From charlesreid1

Source

Github repo: https://github.com/jerry-0824/008_black_hat_python/

File: https://github.com/jerry-0824/008_black_hat_python/blob/master/p44_chapter_03_scanner.py

Explanation

Let's walk through this step-by-step.

Libraries

Start with an import of libraries:

import socket
import os
import struct
import threading

from netaddr import IPNetwork,IPAddress
from ctypes import *
  • socket: This module provides access to the low-level C API for network sockets. Sockets are the endpoints of a bidirectional communications channel. This library is fundamental for any network communication, allowing the script to create raw sockets for sniffing traffic and UDP sockets for sending packets.
  • os: This module provides a way of using operating system dependent functionality. In this script, it's primarily used to check the name of the operating system (os.name) to apply OS-specific configurations, such as enabling promiscuous mode on Windows.
  • struct: This module is used to perform conversions between Python values and C structs represented as Python bytes objects. Network headers are defined by strict binary layouts (like C structs), so this module is essential for packing data into the correct format for sending (though not explicitly used for sending custom packets in this script's sender) and, more importantly here, for unpacking raw byte data received from the network into understandable Python data types, especially when working with ctypes.
  • threading: This module provides a high-level interface for working with threads, which are separate flows of execution. This script uses threading to run the udp_sender function concurrently with the main packet sniffing loop, allowing the scanner to send packets and listen for replies simultaneously.
  • netaddr: This is a third-party Python library for representing and manipulating network addresses (IPs, MACs). Here, IPNetwork is used to easily iterate over all IP addresses within a given subnet string (e.g., "192.168.1.0/24"), and IPAddress is used to represent and compare IP addresses.
  • ctypes: This module provides a foreign function interface (FFI) for Python, allowing it to call functions in C-compiled shared libraries or DLLs. More relevant to this script, it allows the creation of C-compatible data types, like structures (Structure), which can be mapped directly onto blocks of binary data, such as network packet headers. This is crucial for parsing the IP and ICMP headers from raw packet data.

Next we set the host machine IP address, and the subnet that is being scanned:

host   = "192.168.1.140"
subnet = "192.168.1.0/24"
  • host: This string variable stores the IP address of the network interface on the machine running the scanner. The sniffing socket will be bound to this address, meaning it will primarily listen for packets arriving at this interface.
  • subnet: This string defines the target network range that the scanner will probe. The "/24" is CIDR (Classless Inter-Domain Routing) notation, representing the network mask. In this case, "192.168.1.0/24" includes all IP addresses from 192.168.1.0 to 192.168.1.255.

Next, a function to send UDP packets out on port 65212 using the socket library is defined:

def udp_sender(subnet,magic_message):
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    for ip in IPNetwork(subnet):
        try:
            sender.sendto(magic_message,("%s" % ip,65212))
        except:
            pass

This function, udp_sender, is designed to send a specific UDP packet (containing magic_message) to every IP address within the given subnet.

  • sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM): This line creates a new socket.
    • socket.AF_INET specifies the address family as IPv4.
    • socket.SOCK_DGRAM specifies the socket type as UDP (User Datagram Protocol), which is a connectionless protocol. UDP is chosen because if a host is up but the specific port is closed, it typically responds with an ICMP "Port Unreachable" message, which this scanner listens for.
  • for ip in IPNetwork(subnet):: This loop iterates through each individual IP address within the specified subnet. The IPNetwork object from the netaddr library conveniently generates these IP addresses.
  • sender.sendto(magic_message,("%s" % ip,65212)): For each IP address, this attempts to send the magic_message (a predefined string, "PYTHONRULES!") to that IP address on UDP port 65212. The port number 65212 is a high, unprivileged port, unlikely to be in legitimate use, increasing the chances of eliciting a "Port Unreachable" ICMP response.
  • except: pass: This is a broad exception handler. If an error occurs during the sendto operation (e.g., a particular IP is not routable, or a network error occurs), the script will ignore the error and pass, continuing to the next IP address. This makes the sender resilient.

So far so good. Now the port scanner defines a few classes, namely, an IP class. The original comment "basically just wraps a bunch of constants and data" means that this class is designed to interpret a raw sequence of bytes (from a network packet) as a structured IP header. It uses ctypes.Structure to map Python attributes to the binary fields of an actual IPv4 header.

class IP(Structure):
    _fields_ = [
        ("ihl",          c_ubyte, 4),
        ("version",      c_ubyte, 4),
        ("tos",          c_ubyte),
        ("len",          c_ushort),
        ("id",           c_ushort),
        ("offset",       c_ushort),
        ("ttl",          c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum",          c_ushort),
        # ("src",          c_ulong),
        # ("dst",          c_ulong)
        ("src",          c_uint32),
        ("dst",          c_uint32)
    ]
    
    def __new__(self, socket_buffer=None):
            return self.from_buffer_copy(socket_buffer)   
        
    def __init__(self, socket_buffer=None):

        # map protocol constants to their names
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
        
        # human readable IP addresses
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
    
        # human readable protocol
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

The IP class inherits from ctypes.Structure, enabling it to represent the structure of an IPv4 packet header.

  • _fields_: This special attribute defines the fields of the IP header, their C-compatible data types from ctypes, and, for some, their bit widths. This precise definition is critical for correctly parsing raw byte data from a packet into meaningful header information.
    • ("ihl", c_ubyte, 4): Internet Header Length, a 4-bit field indicating the length of the IP header in 32-bit words (e.g., a value of 5 means $5 \times 4 = 20$ bytes). It's defined as a c_ubyte (unsigned 1-byte integer), with : 4 specifying it uses 4 bits of that byte.
    • ("version", c_ubyte, 4): IP Version, a 4-bit field (typically 4 for IPv4). Shares the first byte with ihl.
    • ("tos", c_ubyte): Type of Service (now Differentiated Services Code Point), an 8-bit field used for quality of service. (1 byte)
    • ("len", c_ushort): Total Length, a 16-bit field specifying the entire packet size in bytes, including header and data. c_ushort is an unsigned 2-byte integer.
    • ("id", c_ushort): Identification, a 16-bit field used for uniquely identifying fragments of an original IP datagram. (2 bytes)
    • ("offset", c_ushort): Fragment Offset, a 16-bit field (of which 13 bits are used for the offset value) indicating where a particular fragment belongs in the original datagram. (2 bytes)
    • ("ttl", c_ubyte): Time To Live, an 8-bit field that limits the lifespan of a datagram to prevent it from circulating indefinitely on the network. (1 byte)
    • ("protocol_num", c_ubyte): Protocol, an 8-bit field identifying the protocol used in the data portion of the IP datagram (e.g., 1 for ICMP, 6 for TCP, 17 for UDP). (1 byte)
    • ("sum", c_ushort): Header Checksum, a 16-bit field used for error-checking of the IP header. (2 bytes)
    • ("src", c_uint32): Source IP Address, a 32-bit numerical representation. c_uint32 is an unsigned 4-byte integer.
    • ("dst", c_uint32): Destination IP Address, a 32-bit numerical representation. (4 bytes)

The use of specific ctypes like c_ubyte (1 byte), c_ushort (2 bytes), and c_uint32 (4 bytes) is crucial because they directly correspond to the fixed sizes of these fields as defined in the IP protocol specification. This ensures that when a raw byte string (from a packet) is mapped to this structure, each field is correctly interpreted. The ihl field, once read, is particularly important later for calculating the actual start of the data payload (e.g., an ICMP header) because the IP header can have a variable length due to options.

  • def __new__(self, socket_buffer=None): This special method is responsible for creating the instance of the class. self.from_buffer_copy(socket_buffer) is a ctypes method that creates an instance of the IP structure and populates its fields directly from the data in socket_buffer. This is how the raw bytes of an incoming packet's IP header are parsed into the structure's defined fields.
  • def __init__(self, socket_buffer=None): This initializer is called after __new__ has created and populated the instance. It performs additional processing to make the header data more usable:
    • self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}: Creates a simple dictionary to map common protocol numbers to their human-readable names.
    • self.src_address = socket.inet_ntoa(struct.pack("<L",self.src)): Converts the 32-bit integer source IP address (self.src, which was populated by from_buffer_copy) into the familiar dotted-quad string format (e.g., "192.168.1.1").
      • struct.pack("<L",self.src) packs the 32-bit unsigned integer (L) self.src into a 4-byte string. The < specifies little-endian byte order. socket.inet_ntoa expects this packed binary string format to perform the conversion.
    • self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst)): Performs the same conversion for the destination IP address.
    • The try-except block attempts to look up the human-readable name of the protocol using self.protocol_num (e.g., converting 1 to "ICMP"). If the protocol number isn't in self.protocol_map, it defaults to storing the protocol number as a string.

Next comes an ICMP class:

class ICMP(Structure):
    
    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort)
        ]
    
    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)   

    def __init__(self, socket_buffer):
        pass

This ICMP class, also inheriting from ctypes.Structure, defines the layout of an ICMP (Internet Control Message Protocol) header. This allows the script to parse and interpret the ICMP messages it receives, which are key to its host discovery mechanism.

  • _fields_: Defines the fields at the beginning of an ICMP header.
    • ("type", c_ubyte): An 8-bit field indicating the type of ICMP message. For this scanner, Type 3 ("Destination Unreachable") is of particular interest. (1 byte)
    • ("code", c_ubyte): An 8-bit field providing more specific information about the ICMP message, qualifying the type. For Type 3, a Code of 3 indicates "Port Unreachable". (1 byte)
    • ("checksum", c_ushort): A 16-bit checksum for the ICMP message (header and data). (2 bytes)
    • ("unused", c_ushort) and ("next_hop_mtu", c_ushort): These fields represent the next 4 bytes in an ICMP header. For "Destination Unreachable" messages (Type 3), the standard specifies that these bytes are unused (must be zero) for some codes, or may carry information like "Next-Hop MTU" for others. For the specific Type 3, Code 3 ("Port Unreachable") message this scanner looks for, the primary interest is in the type and code. The rest of the ICMP payload often contains the IP header and first 8 bytes of the datagram that caused the error.
  • def __new__(self, socket_buffer): Similar to the IP class, this method creates an ICMP instance and populates its fields from the raw socket_buffer byte data.
  • def __init__(self, socket_buffer): This initializer is empty (pass), indicating that no further processing is needed beyond what __new__ and from_buffer_copy provide. The raw type and code values are directly used by the scanner's logic.

Okay, so now we get to the good part.

# create a raw socket and bind it to the public interface
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP 
else:
    socket_protocol = socket.IPPROTO_ICMP
    
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

Here, the script sets up a raw socket to listen for incoming network packets.

  • if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP: This conditional statement selects the appropriate protocol for the raw socket based on the operating system.
    • On Windows (os.name == "nt"), socket.IPPROTO_IP is used. This means the socket will receive entire IP packets, including the IP header.
    • On POSIX-compliant systems (Linux, macOS, etc.), socket.IPPROTO_ICMP is used. This tells the operating system that the application is specifically interested in ICMP packets. The OS might provide the ICMP packet with or without the IP header depending on the system, but the script later explicitly requests the IP header with IP_HDRINCL.
  • sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol): This line creates the raw socket.
    • socket.AF_INET: Specifies the address family as IPv4.
    • socket.SOCK_RAW: Specifies the socket type as "raw". Raw sockets allow applications to bypass much of the OS's TCP/IP stack and directly access IP packets (and other protocols). This level of access is necessary to read the headers of received packets and to receive ICMP messages that might not be delivered to regular sockets.
    • socket_protocol: The protocol constant determined by the OS check.
  • sniffer.bind((host, 0)): This binds the raw socket to the network interface associated with the host IP address. The port number 0 is used because for raw IP or ICMP sockets, the concept of a specific port for listening across all protocols is not strictly applicable in the same way as for TCP/UDP sockets; the socket receives packets based on the protocol type specified. Binding ensures the sniffer listens on the correct network interface if the machine has multiple.

Here we've created an open socket, and bound it to the host IP. That's the socket through which our machine will primarily capture traffic relevant to its IP address, and with promiscuous mode (set later for Windows), traffic on its network segment.

# we want the IP headers included in the capture
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# if we're on Windows we need to send some ioctls
# to setup promiscuous mode
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# start sending packets
t = threading.Thread(target=udp_sender,args=(subnet,magic_message))
t.start()        

This section configures the newly created raw socket and initiates the concurrent packet sending.

  • sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1): This sets a socket option.
    • socket.IPPROTO_IP: Specifies that the option applies at the IP protocol level.
    • socket.IP_HDRINCL: This option, when set to 1 (true), indicates that the IP header should be included in the packets received by this raw socket. This is crucial because the script needs to parse the IP header (using the IP class) to determine the protocol, source IP, and IP header length.
  • if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON): This block is Windows-specific.
    • sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON): ioctl (Input/Output Control) is a system call used to perform device-specific operations.
    • socket.SIO_RCVALL is a control code that, when used with socket.RCVALL_ON, puts the network interface associated with the socket into promiscuous mode. In promiscuous mode, the network card passes all received frames to the operating system, regardless of whether they are addressed to the machine's MAC address. This allows the sniffer to capture a wider range of traffic on the local network segment.
  • t = threading.Thread(target=udp_sender,args=(subnet,magic_message)): This line creates a new thread of execution.
    • target=udp_sender: Specifies that the udp_sender function is the function to be executed in this new thread.
    • args=(subnet,magic_message): Passes the subnet to scan and the magic_message string as arguments to the udp_sender function.
  • t.start(): This starts the execution of the udp_sender function in the newly created thread. This allows the script to send out its UDP probe packets across the subnet while the main thread concurrently listens for incoming ICMP responses.

This just starts up a thread that is sending UDP packets with our "magic message". The main thread will now proceed to listen for responses.

try:
    while True:
        
        # read in a single packet
        raw_buffer = sniffer.recvfrom(65535)[0]
        
        # create an IP header from the first 20 bytes of the buffer
        ip_header = IP(raw_buffer[0:20])
    
        #print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)
    
        # if it's ICMP we want it
        if ip_header.protocol == "ICMP":
            
            # calculate where our ICMP packet starts
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]
            
            # create our ICMP structure
            icmp_header = ICMP(buf)
            
            #print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)

            # now check for the TYPE 3 and CODE 3 which indicates
            # a host is up but no port available to talk to             
            if icmp_header.code == 3 and icmp_header.type == 3:
                
                # check to make sure we are receiving the response 
                # that lands in our subnet
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                    
                    # test for our magic message
                    if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message.encode("utf-8"):
                        print("Host Up: %s" % ip_header.src_address)

# handle CTRL-C
except KeyboardInterrupt:
    # if we're on Windows turn off promiscuous mode
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

This is the main loop of the sniffer, where it continuously reads and processes packets from the network.

  • try...except KeyboardInterrupt: This structure encloses the main loop, allowing the program to run indefinitely until the user manually interrupts it (e.g., by pressing Ctrl+C). The except block handles this interruption gracefully.
  • while True:: An infinite loop to keep listening for packets.
  • raw_buffer = sniffer.recvfrom(65535)[0]: This is a blocking call that waits to receive a packet on the sniffer socket.
    • 65535 is the maximum size of an IP packet in bytes, so it allocates a buffer large enough for any packet.
    • recvfrom() returns a tuple (bytes, address), where bytes is the received packet data and address is the source address. [0] selects just the packet data.
  • ip_header = IP(raw_buffer[0:20]): Creates an instance of the IP class using the first 20 bytes of the received raw_buffer. An IPv4 header is at least 20 bytes long. This parses these bytes into the ip_header object, allowing access to fields like protocol_num and ihl.
  • if ip_header.protocol == "ICMP":: Checks if the protocol encapsulated within the IP packet is ICMP. The ip_header.protocol attribute was conveniently converted to a string (e.g., "ICMP", "TCP", "UDP") in the IP class's __init__ method.
  • offset = ip_header.ihl * 4: If the packet is ICMP, this line calculates the starting position of the ICMP header within the raw_buffer. The ip_header.ihl field gives the IP header length in 4-byte words, so multiplying by 4 converts it to bytes. This is important because the IP header can contain options, making it longer than the minimum 20 bytes.
  • buf = raw_buffer[offset:offset + sizeof(ICMP)]: Extracts the portion of the raw_buffer that corresponds to the ICMP header. It starts at the calculated offset and takes a slice of bytes equal to the size of the ICMP structure (defined by its _fields_).
  • icmp_header = ICMP(buf): Creates an instance of the ICMP class from the extracted buf, parsing the ICMP header fields (like type and code).
  • if icmp_header.code == 3 and icmp_header.type == 3:: This is the core detection logic. It checks if the received ICMP message is of Type 3 ("Destination Unreachable") and Code 3 ("Port Unreachable"). This specific ICMP message is expected if one of the UDP packets sent by udp_sender reaches a live host, but the destination port (65212) is closed on that host.
  • if IPAddress(ip_header.src_address) in IPNetwork(subnet):: This checks if the source IP address of the ICMP "Port Unreachable" message is within the subnet that the scanner is targeting. This helps ensure that the response is from one of the hosts being probed and not from an unrelated source.
  • if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message.encode("utf-8"):: This is a crucial verification step. ICMP "Destination Unreachable" messages typically include the IP header and the first 8 bytes of the original datagram that caused the error. Since the udp_sender sent a UDP packet with magic_message as its payload, this check verifies if the tail end of the received ICMP error packet's payload (which contains part of the original UDP packet) matches the magic_message. Note: magic_message is a string, and network data is bytes, so it's compared against an encoded version of magic_message. This confirms that the ICMP response is a direct reply to one of the scanner's probe packets. The len(raw_buffer)-len(magic_message): slice attempts to get the last part of the payload that would correspond to the length of the magic message.
  • print("Host Up: %s" % ip_header.src_address): If all the above conditions are met, it means a host at ip_header.src_address is considered "up" and has responded to the probe. The script then prints this information.
  • except KeyboardInterrupt:: If the user presses Ctrl+C, the while loop is broken.
  • if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF): If the script is running on Windows, this line is executed upon exiting the loop (due to KeyboardInterrupt). It turns off promiscuous mode by calling ioctl with socket.RCVALL_OFF. This is good practice to restore the network interface to its normal operational mode.

Original Code

# /usr/bin/python python3
# -*- coding:utf-8 -*-
##################################################
# Filename: p43_chapter_03_scanner.py
# Author:   jerry_0824
# Email:    63935127##qq.com
# Phone:    +86-155-8287-7999
# Date:     2016-03-07
# Version:  v1.0.0
##################################################

import socket
import os
import struct
import threading

from netaddr import IPNetwork,IPAddress
from ctypes import *

# host to listen on
host   = "192.168.1.140"

# subnet to target
subnet = "192.168.1.0/24"

# magic we'll check ICMP responses for
magic_message = "PYTHONRULES!"

def udp_sender(subnet,magic_message):
    # In Python 3, magic_message (if string) needs to be encoded to bytes for sendto
    # If magic_message is already bytes, this is fine.
    # Assuming magic_message is a string as defined globally.
    payload = magic_message.encode('utf-8') 
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    for ip in IPNetwork(subnet):
        try:
            sender.sendto(payload,("%s" % ip,65212))
        except:
            pass

class IP(Structure):
    _fields_ = [
        ("ihl",          c_ubyte, 4),
        ("version",      c_ubyte, 4),
        ("tos",          c_ubyte),
        ("len",          c_ushort),
        ("id",           c_ushort),
        ("offset",       c_ushort),
        ("ttl",          c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum",          c_ushort),
        # ("src",          c_ulong),
        # ("dst",          c_ulong)
        ("src",          c_uint32),
        ("dst",          c_uint32)
    ]
    
    def __new__(self, socket_buffer=None):
            return self.from_buffer_copy(socket_buffer)   
        
    def __init__(self, socket_buffer=None):

        # map protocol constants to their names
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
        
        # human readable IP addresses
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
    
        # human readable protocol
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except KeyError: # More specific exception
            self.protocol = str(self.protocol_num)

class ICMP(Structure):
    
    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort)
        ]
    
    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)   

    def __init__(self, socket_buffer):
        pass

# create a raw socket and bind it to the public interface
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP 
else:
    socket_protocol = socket.IPPROTO_ICMP
    
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

# we want the IP headers included in the capture
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# if we're on Windows we need to send some ioctls
# to setup promiscuous mode
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# start sending packets
# Ensure magic_message is passed as bytes to the thread if it's used as bytes there
magic_message_bytes_for_thread = magic_message.encode('utf-8') if isinstance(magic_message, str) else magic_message
t = threading.Thread(target=udp_sender,args=(subnet,magic_message_bytes_for_thread))
t.start()        

try:
    while True:
        
        # read in a single packet
        raw_buffer = sniffer.recvfrom(65535)[0]
        
        # create an IP header from the first 20 bytes of the buffer
        # A more robust approach would first parse just enough to get ip_header.ihl,
        # then parse the full IP header based on that length.
        # For simplicity here, it assumes at least 20 bytes and parses that.
        ip_header = IP(raw_buffer[0:20]) 
    
        #print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)
    
        # if it's ICMP we want it
        if ip_header.protocol == "ICMP":
            
            # calculate where our ICMP packet starts
            offset = ip_header.ihl * 4
            # Ensure offset + sizeof(ICMP) is within raw_buffer bounds
            if offset + sizeof(ICMP) <= len(raw_buffer):
                buf = raw_buffer[offset:offset + sizeof(ICMP)]
                
                # create our ICMP structure
                icmp_header = ICMP(buf)
                
                #print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)

                # now check for the TYPE 3 and CODE 3 which indicates
                # a host is up but no port available to talk to             
                if icmp_header.code == 3 and icmp_header.type == 3:
                    
                    # check to make sure we are receiving the response 
                    # that lands in our subnet
                    if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                        
                        # test for our magic message
                        # ICMP error message payload typically contains:
                        # IP header of original datagram + first 8 bytes of original datagram's payload.
                        # The original datagram's payload was our magic_message.
                        # So we look for magic_message within this returned part.
                        # The start of the original datagram's payload within the ICMP data:
                        original_payload_in_icmp_offset = offset + sizeof(ICMP) + (ip_header.ihl * 4) # Start of original IP header in ICMP payload
                        original_payload_in_icmp_offset += 8 # Skip original IP header's first 8 bytes if that's how it's structured
                                                            # More simply, check if magic_message is in the latter part of raw_buffer
                        
                        # The original script's check: raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message_bytes_for_thread
                        # This assumes magic_message is at the very end of the received raw_buffer, which might be true
                        # if the UDP packet was small and the ICMP response includes it fully at the end of its own payload.
                        # Let's stick to the original script's logic for this part.
                        # Ensure magic_message_bytes_for_thread is defined (it is from thread setup)
                        if len(raw_buffer) >= len(magic_message_bytes_for_thread) and \
                           raw_buffer[len(raw_buffer)-len(magic_message_bytes_for_thread):] == magic_message_bytes_for_thread:
                            print("Host Up: %s" % ip_header.src_address)

# handle CTRL-C
except KeyboardInterrupt:
    # if we're on Windows turn off promiscuous mode
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

Flags