zeroconf/src/rfc_client.py

140 lines
4.2 KiB
Python

import socket
import struct
import time
def parse_mdns_response(data):
    """Walk one DNS/mDNS message and print every resource record it carries.

    The 12-byte header is unpacked to obtain the four section counts, the
    question section is skipped, and each record in the answer, authority
    and additional sections is handed to ``parse_record`` (which prints it).

    Args:
        data: The raw message as ``bytes``, starting at the DNS header.

    Returns:
        None. Results are printed by ``parse_record``.

    Raises:
        struct.error: If ``data`` is shorter than the 12-byte header.
    """
    # Header layout: ID, flags, QDCOUNT, ANCOUNT, NSCOUNT, ARCOUNT (big-endian
    # 16-bit each). ID and flags are not needed for walking the records.
    (_transaction_id, _flags, questions,
     answer_rrs, authority_rrs, additional_rrs) = struct.unpack('>HHHHHH', data[:12])
    offset = 12

    # Each question is a name followed by a 2-byte type and a 2-byte class.
    for _ in range(questions):
        offset = skip_name(data, offset)
        offset += 4

    # The answer, authority and additional sections are stored back to back
    # and share one record layout. The authority section has to be consumed
    # as well, otherwise the additional records are read from the wrong offset.
    for _ in range(answer_rrs + authority_rrs + additional_rrs):
        offset = parse_record(data, offset)
def parse_record(data, offset):
    """Decode and print the resource record that starts at ``offset``.

    Recognised types are PTR (12), A (1), SRV (33), TXT (16) and NSEC (47);
    anything else is reported as unknown. Records with a TTL of zero are
    reported and skipped without decoding their RDATA.

    Args:
        data: The complete DNS message as ``bytes``. The whole message is
            needed because names inside RDATA may use compression pointers.
        offset: Index of the first byte of the record's owner name.

    Returns:
        The offset of the first byte after this record's RDATA.

    Raises:
        struct.error: If the fixed 10-byte record header or the SRV fixed
            fields are truncated.
    """
    name, offset = read_name(data, offset)
    # Fixed part after the name: TYPE, CLASS, TTL (32-bit), RDLENGTH.
    record_type, _record_class, ttl, data_length = struct.unpack(
        '>HHIH', data[offset:offset + 10]
    )
    offset += 10
    end_offset = offset + data_length

    if ttl == 0:
        print(f"Zero TTL for {name}")
        return end_offset

    rdata = data[offset:end_offset]

    if record_type == 12:  # PTR
        target, _ = read_name(data, offset)
        print(f"PTR Record: {name} -> {target}")
    elif record_type == 1:  # A
        # inet_ntoa raises OSError unless it is given exactly four bytes.
        if len(rdata) == 4:
            ip_address = socket.inet_ntoa(rdata)
            print(f"A Record: {name} -> {ip_address}")
        else:
            print(f"Malformed A Record for {name}: {len(rdata)} byte(s) of address data")
    elif record_type == 33:  # SRV
        priority, weight, port = struct.unpack('>HHH', data[offset:offset + 6])
        target, _ = read_name(data, offset + 6)
        print(f"SRV Record: {name} -> {target}:{port} (priority: {priority}, weight: {weight})")
    elif record_type == 16:  # TXT
        print(data_length)
        # TXT RDATA is a sequence of strings, each prefixed by a one-byte
        # length. Decode them individually so the length bytes do not leak
        # into the text, and never fail on non-UTF-8 payloads.
        strings = []
        position = 0
        while position < len(rdata):
            size = rdata[position]
            position += 1
            strings.append(rdata[position:position + size].decode('utf-8', errors='replace'))
            position += size
        txt_data = ', '.join(strings)
        print(f"TXT Record: {name} -> {txt_data}")
    elif record_type == 47:  # NSEC
        next_domain, _ = read_name(data, offset)
        print(f"NSEC Record: {name} -> {next_domain}")
    else:
        print(f"{hex(record_type)}")
        print(f"Unknown Record Type {record_type} for {name}")

    return end_offset
def read_name(data, offset):
    """Decode the (possibly compressed) domain name starting at ``offset``.

    Args:
        data: The complete DNS message as ``bytes``; compression pointers
            are resolved relative to its first byte.
        offset: Index of the first length byte of the name.

    Returns:
        A ``(name, next_offset)`` tuple. ``name`` is the labels joined with
        dots. ``next_offset`` is the index just past the name as stored at
        the starting position: past the terminating zero byte, or past the
        first compression pointer if one was followed.

    Raises:
        ValueError: If the compression pointers form a loop.
        IndexError: If the name runs past the end of ``data``.
        struct.error: If a compression pointer is cut off after one byte.
    """
    labels = []
    resume_offset = None  # where the caller continues once a pointer was followed
    seen_pointers = set()

    while True:
        length = data[offset]
        if length & 0xC0 == 0xC0:
            # Packets come from the network: a pointer that is reached twice
            # means the name loops and would otherwise never terminate.
            if offset in seen_pointers:
                raise ValueError(f"DNS name compression loop at offset {offset}")
            seen_pointers.add(offset)
            if resume_offset is None:
                resume_offset = offset + 2
            # The low 14 bits of the two-byte pointer are the target offset.
            offset = struct.unpack('>H', data[offset:offset + 2])[0] & 0x3FFF
        elif length == 0:
            offset += 1
            break
        else:
            offset += 1
            labels.append(data[offset:offset + length].decode('utf-8', errors='ignore'))
            offset += length

    name = '.'.join(labels)
    if resume_offset is not None:
        return name, resume_offset
    return name, offset
def skip_name(data, offset):
    """Return the offset just past the encoded name that starts at ``offset``.

    The name is not decoded. Labels are stepped over one by one until either
    the terminating zero byte (one byte wide) or a compression pointer
    (two bytes wide) is reached; both end the name.

    Args:
        data: The DNS message as an indexable byte sequence.
        offset: Index of the first length byte of the name.

    Returns:
        Index of the first byte following the name.
    """
    cursor = offset
    while True:
        length = data[cursor]
        if length == 0:
            # Root label: the name ends here.
            return cursor + 1
        if (length & 0xC0) == 0xC0:
            # Compression pointer: always the last element of a name.
            return cursor + 2
        # Ordinary label: one length byte plus ``length`` bytes of text.
        cursor += 1 + length
def send_mdns_query(service_type, interface_ip="192.168.137.1"):
    """Multicast a PTR query for ``service_type`` and print the responses.

    A UDP socket is bound to ``interface_ip`` on port 5353, joined to the
    224.0.0.251 multicast group, and used to send one PTR/IN question. The
    function then receives packets in a loop, passing each response to
    ``parse_mdns_response``, until interrupted with Ctrl-C.

    Args:
        service_type: Dotted service name to browse for, e.g.
            ``"_http._tcp.local"``. A trailing dot is tolerated.
        interface_ip: IPv4 address of the local interface to bind to and to
            join the multicast group on.

    Raises:
        ValueError: If a label of ``service_type`` is longer than 63 bytes.
        OSError: If the socket cannot be bound, joined or written to.
    """
    multicast_address = "224.0.0.251"
    multicast_port = 5353
    # RFC 6762 allows mDNS messages of up to 9000 bytes; a smaller receive
    # buffer silently truncates the datagram and corrupts parsing.
    max_packet_size = 9000

    def create_query():
        query = bytearray()
        query += b'\x00\x00'  # transaction ID
        query += b'\x00\x00'  # flags: standard query
        query += b'\x00\x01'  # one question
        query += b'\x00\x00'  # no answer records
        query += b'\x00\x00'  # no authority records
        query += b'\x00\x00'  # no additional records
        for part in service_type.split('.'):
            if not part:
                # An empty label (e.g. from a trailing dot) would terminate
                # the name early and shift the type/class fields.
                continue
            label = part.encode()
            # The length prefix counts encoded bytes, and values above 63
            # would be read as a compression pointer by the receiver.
            if len(label) > 63:
                raise ValueError(f"DNS label too long ({len(label)} bytes): {part!r}")
            query += bytes([len(label)]) + label
        query += b'\x00'      # root label terminates the name
        query += b'\x00\x0C'  # QTYPE  = PTR
        query += b'\x00\x01'  # QCLASS = IN
        return query

    # Build the packet first so an invalid name fails before a socket exists.
    query_packet = create_query()

    # The context manager closes the socket on every exit path, including
    # failures in bind/setsockopt/sendto.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((interface_ip, multicast_port))
        sock.setsockopt(
            socket.IPPROTO_IP,
            socket.IP_ADD_MEMBERSHIP,
            struct.pack("=4s4s", socket.inet_aton(multicast_address), socket.inet_aton(interface_ip))
        )

        sock.sendto(query_packet, (multicast_address, multicast_port))
        print(f"Sent mDNS query for {service_type}")

        try:
            while True:
                data, addr = sock.recvfrom(max_packet_size)
                print(f"Received response from {addr[0]}")
                if len(data) < 12:
                    print("Ignoring truncated packet")
                    continue
                flags = struct.unpack('>H', data[2:4])[0]
                # Only the QR bit (0x8000) distinguishes a response from a
                # query; queries may still carry other flag bits.
                if not flags & 0x8000:
                    print("Ignoring non-response packet")
                    continue
                try:
                    parse_mdns_response(data)
                except (struct.error, IndexError, ValueError, OSError) as exc:
                    # One malformed packet must not stop the client.
                    print(f"Skipping malformed packet from {addr[0]}: {exc}")
        except KeyboardInterrupt:
            print("Client stopped.")
if __name__ == "__main__":
    # Script entry point: browse for HTTP services on the local link.
    default_service = "_http._tcp.local"
    send_mdns_query(default_service)