"""Minimal mDNS (multicast DNS) client.

Sends a PTR query for a service type to the mDNS multicast group and prints
every resource record found in the responses it receives.
"""

import socket
import struct

# DNS message header: id, flags, qdcount, ancount, nscount, arcount.
_HEADER = struct.Struct(">HHHHHH")
# Fixed part of a resource record that follows its name:
# type, class, ttl, rdlength.
_RR_FIXED = struct.Struct(">HHIH")
# QR bit of the flags word; set in responses, clear in queries.
_QR_RESPONSE_BIT = 0x8000
# Upper bound on compression pointers followed while decoding a single name.
# A legitimate name never needs this many; it only exists to break loops.
_MAX_POINTER_JUMPS = 128
# A DNS label's length byte only has 6 usable bits.
_MAX_LABEL_LENGTH = 63
# Large enough for the biggest mDNS message (9000 bytes including headers).
_RECV_BUFFER_SIZE = 9000


def parse_mdns_response(data):
    """Print every resource record contained in a raw DNS/mDNS message.

    The question section is skipped; the answer, authority and additional
    sections are all walked in wire order so that offsets stay aligned.

    Args:
        data: The complete message as a bytes-like object.

    Raises:
        struct.error: If the message is too short for a header or for a
            record's fixed fields.
        IndexError: If a name runs past the end of the message.
        ValueError: If a record or name is malformed (see ``parse_record``
            and ``read_name``).
    """
    questions, answer_rrs, authority_rrs, additional_rrs = _HEADER.unpack_from(
        data
    )[2:]
    offset = _HEADER.size

    for _ in range(questions):
        # Each question is a name followed by 2-byte type and 2-byte class.
        offset = skip_name(data, offset) + 4

    # The three record sections share one wire format and are laid out back
    # to back, so the authority section must be consumed too; otherwise the
    # additional records would be read from the wrong position.
    for _ in range(answer_rrs + authority_rrs + additional_rrs):
        offset = parse_record(data, offset)


def parse_record(data, offset):
    """Decode and print one resource record.

    Args:
        data: The complete message as a bytes-like object.
        offset: Index of the first byte of the record's name.

    Returns:
        The offset of the first byte after this record.

    Raises:
        ValueError: If the record's declared data length runs past the end
            of the message.
        struct.error: If the fixed record fields are truncated.
    """
    name, offset = read_name(data, offset)
    record_type, _record_class, ttl, data_length = _RR_FIXED.unpack_from(
        data, offset
    )
    offset += _RR_FIXED.size
    end = offset + data_length

    if end > len(data):
        raise ValueError(
            f"record for {name} claims {data_length} data bytes, "
            f"only {len(data) - offset} available"
        )

    if ttl == 0:
        print(f"Zero TTL for {name}")
        return end

    if record_type == 12:  # PTR
        target, _ = read_name(data, offset)
        print(f"PTR Record: {name} -> {target}")
    elif record_type == 1:  # A
        if data_length == 4:
            ip_address = socket.inet_ntoa(bytes(data[offset:end]))
            print(f"A Record: {name} -> {ip_address}")
        else:
            # inet_ntoa raises OSError on anything but exactly four bytes.
            print(f"Malformed A Record for {name} (length {data_length})")
    elif record_type == 33:  # SRV
        priority, weight, port = struct.unpack_from(">HHH", data, offset)
        target, _ = read_name(data, offset + 6)
        print(
            f"SRV Record: {name} -> {target}:{port} (priority: {priority}, weight: {weight})"
        )
    elif record_type == 16:  # TXT
        print(data_length)
        # The raw RDATA (length-prefix bytes included) is shown as text.
        # TXT values may be arbitrary binary, so undecodable bytes are
        # replaced rather than aborting the whole packet.
        txt_data = bytes(data[offset:end]).decode("utf-8", errors="replace")
        print(f"TXT Record: {name} -> {txt_data}")
    elif record_type == 47:  # NSEC
        next_domain, _ = read_name(data, offset)
        print(f"NSEC Record: {name} -> {next_domain}")
    else:
        print(f"{hex(record_type)}")
        print(f"Unknown Record Type {record_type} for {name}")

    return end


def read_name(data, offset):
    """Decode a (possibly compressed) domain name.

    Args:
        data: The complete message as a bytes-like object.
        offset: Index of the first length byte of the name.

    Returns:
        A ``(name, next_offset)`` tuple. ``name`` is the dotted name and
        ``next_offset`` is the index just past the name *as it appears at
        the original position* (i.e. past the first compression pointer if
        one was followed).

    Raises:
        ValueError: If more than ``_MAX_POINTER_JUMPS`` compression pointers
            are followed, which indicates a pointer loop.
        IndexError: If the name runs past the end of the message.
        struct.error: If a compression pointer is truncated.
    """
    labels = []
    resume_offset = None  # where parsing continues after the first pointer
    jumps = 0

    while True:
        length = data[offset]
        if length & 0xC0 == 0xC0:
            # Two high bits set: a 14-bit pointer to an earlier name suffix.
            if resume_offset is None:
                resume_offset = offset + 2
            jumps += 1
            if jumps > _MAX_POINTER_JUMPS:
                raise ValueError(
                    "too many compression pointers in name (pointer loop?)"
                )
            (pointer,) = struct.unpack_from(">H", data, offset)
            offset = pointer & 0x3FFF
        elif length == 0:
            offset += 1
            break
        else:
            offset += 1
            label = bytes(data[offset : offset + length])
            labels.append(label.decode("utf-8", errors="ignore"))
            offset += length

    name = ".".join(labels)
    if resume_offset is not None:
        return name, resume_offset
    return name, offset


def skip_name(data, offset):
    """Return the offset just past the domain name starting at ``offset``.

    The name is not decoded; a compression pointer ends the name and
    occupies two bytes.

    Raises:
        IndexError: If the name runs past the end of the message.
    """
    while True:
        length = data[offset]
        if length == 0:
            return offset + 1
        if length & 0xC0 == 0xC0:
            return offset + 2
        offset += length + 1


def create_query(service_type):
    """Build a one-question PTR/IN query message for ``service_type``.

    Args:
        service_type: Dotted service name such as ``"_http._tcp.local"``.
            Empty labels (e.g. from a trailing dot) are ignored.

    Returns:
        The encoded query as a ``bytearray``.

    Raises:
        ValueError: If a label is longer than 63 bytes once encoded.
    """
    # id=0, flags=0 (standard query), one question, no records.
    query = bytearray(_HEADER.pack(0, 0, 1, 0, 0, 0))
    for label in service_type.split("."):
        if not label:
            # An empty label would be written as a zero byte, terminating
            # the name early and corrupting the rest of the question.
            continue
        encoded = label.encode()
        if len(encoded) > _MAX_LABEL_LENGTH:
            raise ValueError(f"label too long for DNS: {label!r}")
        # The length prefix counts encoded bytes, not characters.
        query.append(len(encoded))
        query += encoded
    query += b"\x00"  # root label terminates the name
    query += struct.pack(">HH", 12, 1)  # QTYPE=PTR, QCLASS=IN
    return query


def send_mdns_query(service_type, interface_ip="192.168.137.1"):
    """Send an mDNS PTR query and print responses until interrupted.

    Args:
        service_type: Dotted service name to query for.
        interface_ip: IPv4 address of the local interface to bind to and to
            join the multicast group on.

    Raises:
        ValueError: If ``service_type`` contains an over-long label.
        OSError: If the socket cannot be configured, bound or used.
    """
    multicast_address = "224.0.0.251"
    multicast_port = 5353

    # Build the packet first so a bad service type fails before any socket
    # is opened.
    query_packet = create_query(service_type)

    # The context manager closes the socket even if setup (setsockopt/bind)
    # fails, not only once the receive loop has started.
    with socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP
    ) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((interface_ip, multicast_port))
        membership = struct.pack(
            "=4s4s",
            socket.inet_aton(multicast_address),
            socket.inet_aton(interface_ip),
        )
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)

        sock.sendto(query_packet, (multicast_address, multicast_port))
        print(f"Sent mDNS query for {service_type}")

        try:
            while True:
                data, addr = sock.recvfrom(_RECV_BUFFER_SIZE)
                print(f"Received response from {addr[0]}")
                try:
                    (flags,) = struct.unpack_from(">H", data, 2)
                    # Only the QR bit distinguishes a response from a query;
                    # queries may legitimately carry other flag bits.
                    if not flags & _QR_RESPONSE_BIT:
                        print("Ignoring non-response packet")
                        continue
                    parse_mdns_response(data)
                except (ValueError, IndexError, struct.error) as exc:
                    # One bad packet from the network must not stop the
                    # client from listening for further responses.
                    print(f"Ignoring malformed packet from {addr[0]}: {exc}")
        except KeyboardInterrupt:
            print("Client stopped.")


if __name__ == "__main__":
    send_mdns_query("_http._tcp.local")