# [Hosting-page notice, not part of the test module] Code fetch complete; the page will refresh automatically.
#
# unit tests using the py.test framework
#
# Dependency: python-pytest python-pytest-cov
# Run as: /usr/bin/pytest --cov=.
#
# Note: examples of packets and checksums can be found using wireshark.
# You can then right-click and Copy->...As Escaped string.
import struct
# We can't use import directly because of the hyphen in the file name,
# so we do this via importlib
import importlib
mr = importlib.import_module('multicast-relay')
def test_net_checksum_ipv4():
    """Check PacketRelay.net_checksum against captured IPv4 headers.

    Each case pairs the checksum carried in a captured header with the raw
    header bytes.  Two properties are asserted for every header:

    * with the checksum field (bytes 10-11) replaced by zero, net_checksum
      returns the checksum the header originally carried;
    * over the unmodified header, net_checksum returns 0.
    """
    # Bytes literals keep the fixtures concatenable with struct.pack() output
    # on Python 3; on Python 2 the b prefix is a no-op.
    ipv4_header_tests = (
        # IGMPv2 multicast to 224.0.0.1:
        (0x9fda, b"\x46\xc0\x00\x20\xe2\x92\x00\x00\x01\x02\x9f\xda\xc0\xa8"
                 b"\x01\x01\xe0\x00\x00\x01\x94\x04\x00\x00"),
        # 216.58.198.174 -> 192.168.1.196 unicast tcp:
        (0xffba, b"\x45\x00\x00\x34\xe0\xb3\x00\x00\x79\x06\xff\xba\xd8\x3a"
                 b"\xc6\xae\xc0\xa8\x01\xc4"),
        # 192.168.1.1 -> 192.168.1.196 UDP DNS response
        (0xc000, b"\x45\x00\x00\xb4\xf6\x22\x40\x00\x40\x11\xc0\x00\xc0\xa8"
                 b"\x01\x01\xc0\xa8\x01\xc4"),
    )

    # Two zero bytes that stand in for the checksum field.
    zeroed_checksum = struct.pack('!H', 0)

    for (cksum, ip_header) in ipv4_header_tests:
        assert cksum == mr.PacketRelay.net_checksum(ip_header[:10]
                                                    + zeroed_checksum
                                                    + ip_header[12:])
        assert 0 == mr.PacketRelay.net_checksum(ip_header)
def test_net_checksum_udp():
    """Check PacketRelay.net_checksum against captured UDP datagrams.

    Every fixture is a UDP pseudo header followed by the UDP header and its
    payload.  Two properties are asserted for every fixture:

    * with the UDP checksum field (bytes 18-19 of the fixture) replaced by
      zero, net_checksum returns the checksum the datagram carried;
    * over the unmodified fixture, net_checksum returns 0.

    The last fixture is one byte shorter than the one before it, which
    exercises input with an odd number of bytes.
    """
    # UDP pseudo header (used to calculate the checksum):
    # src_ip, dst_ip, 0, protocol, udp_length, udp_header, data
    #
    # Bytes literals keep the fixtures concatenable with struct.pack() output
    # on Python 3; on Python 2 the b prefix is a no-op.
    udp_pseudo_header_tests = (
        # NTP v4, client:
        (0x101f, b"\xc0\xa8\x01\xc4" b"\x5b\xbd\x5b\x9d" b"\x00" b"\x11" b"\x00\x38"
                 b"\xd7\x6e\x00\x7b\x00\x38\x10\x1f"
                 b"\x23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
                 b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
                 b"\x00\x00\x00\x00\x00\x00\x00\x00\xe0\x01\x7b\x46\x34\x2f\xeb\x36"),
        # NTP v4, server
        (0x0d66, b"\x5b\xbd\x5b\x9d" b"\xc0\xa8\x01\xc4" b"\x00" b"\x11" b"\x00\x38"
                 b"\x00\x7b\xd5\x1e\x00\x38\x0d\x66"
                 b"\x24\x02\x03\xe8\x00\x00\x08\x29\x00\x00\x09\xab\x84\xf6\x0b\xe7"
                 b"\xe0\x01\x76\xa5\x62\xb6\x0f\x58\xe0\x01\x7c\x07\x16\x61\x28\xc4"
                 b"\xe0\x01\x7c\x07\x53\xd6\x52\x7d\xe0\x01\x7c\x07\x53\xda\xc1\xf6"),
        # DNS standard query:
        (0x874e, b"\xc0\xa8\x01\xc4" b"\xc0\xa8\x01\x01" b"\x00" b"\x11" b"\x00\x4e"
                 b"\xad\xa5\x00\x35\x00\x4e\x87\x4e"
                 b"\xae\x06\x01\x00\x00\x01\x00\x00\x00\x00\x00\x01\x0d\x6e\x31\x2d"
                 b"\x32\x31\x31\x36\x39\x37\x33\x36\x34\x35\x09\x65\x75\x2d\x77\x65"
                 b"\x73\x74\x2d\x31\x03\x65\x6c\x62\x09\x61\x6d\x61\x7a\x6f\x6e\x61"
                 b"\x77\x73\x03\x63\x6f\x6d\x00\x00\x01\x00\x01\x00\x00\x29\x02\x00"
                 b"\x00\x00\x00\x00\x00\x00"),
        # DNS standard query response
        (0x73ad, b"\xc0\xa8\x01\x01" b"\xc0\xa8\x01\xc4" b"\x00" b"\x11" b"\x00\x44"
                 b"\x00\x35\x9f\xe2\x00\x44\x73\xad"
                 b"\x22\x97\x81\x80\x00\x01\x00\x01\x00\x00\x00\x01\x04\x64\x6f\x63"
                 b"\x73\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x01\x00"
                 b"\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x00\x78\x00\x04\xd8\x3a\xd4"
                 b"\x4e\x00\x00\x29\x10\x00\x00\x00\x00\x00\x00\x00"),
        # DNS standard query response - with one less padding to test odd size scenario:
        (0x73ad, b"\xc0\xa8\x01\x01" b"\xc0\xa8\x01\xc4" b"\x00" b"\x11" b"\x00\x44"
                 b"\x00\x35\x9f\xe2\x00\x44\x73\xad"
                 b"\x22\x97\x81\x80\x00\x01\x00\x01\x00\x00\x00\x01\x04\x64\x6f\x63"
                 b"\x73\x06\x67\x6f\x6f\x67\x6c\x65\x03\x63\x6f\x6d\x00\x00\x01\x00"
                 b"\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x00\x78\x00\x04\xd8\x3a\xd4"
                 b"\x4e\x00\x00\x29\x10\x00\x00\x00\x00\x00\x00"),
    )

    # Two zero bytes that stand in for the UDP checksum field.
    zeroed_checksum = struct.pack('!H', 0)

    for (cksum, udp_header) in udp_pseudo_header_tests:
        assert cksum == mr.PacketRelay.net_checksum(udp_header[:18] + zeroed_checksum + udp_header[20:])
        assert 0 == mr.PacketRelay.net_checksum(udp_header)
def test_unicast_ip2mac_str():
    """Check PacketRelay.unicast_ip2mac_str against a canned ARP table.

    The table text is handed in through proc_net_arp_content.  The two
    listed addresses must map to their hardware address with its letter
    case unchanged, and an address missing from the table must give None.
    """
    # Header row plus two entries, joined with newlines.
    arp_table = "\n".join((
        "IP address HW type Flags HW address Mask Device",
        "192.168.0.1 0x1 0x2 30:65:EC:6F:C4:58 * eth0",
        "192.168.0.2 0x1 0x2 30:65:ec:6f:c4:59 * eth0",
    ))
    lookup = mr.PacketRelay.unicast_ip2mac_str

    upper_case_mac = lookup("192.168.0.1", proc_net_arp_content=arp_table)
    assert upper_case_mac == "30:65:EC:6F:C4:58"

    lower_case_mac = lookup("192.168.0.2", proc_net_arp_content=arp_table)
    assert lower_case_mac == "30:65:ec:6f:c4:59"

    unknown_host = lookup("172.16.5.1", proc_net_arp_content=arp_table)
    assert unknown_host is None
def test_modify_udp_packet():
    """Check PacketRelay.modify_udp_packet against two captured packets.

    Signature under test:
        modify_udp_packet(data, ipHeaderLength, newSrcAddr=None,
                          newSrcPort=None, newDstAddr=None, newDstPort=None)

    For each capture (a DNS query and an SSDP multicast query) the test
    asserts that:

    * calling with only the packet and its 20-byte IP header length returns
      the packet unchanged;
    * calling with replacement addresses/ports returns exactly the expected
      packet listed below.
    """
    # Packet fixtures are bytes literals: raw packet data is binary, and on
    # Python 2 the b prefix is a no-op, so the fixtures are unchanged there.
    modify = mr.PacketRelay.modify_udp_packet

    # DNS query:
    dns_query = (
        b"\x45\x00"
        b"\x00\x54\x33\x91\x40\x00\x40\x11\x84\x93\xc0\xa8\x00\x26\xc0\xa8"
        b"\x00\xfe\xdf\xdd\x00\x35\x00\x40\xfe\x24\x72\xb7\x01\x00\x00\x01"
        b"\x00\x00\x00\x00\x00\x01\x09\x67\x6f\x6f\x67\x6c\x65\x61\x64\x73"
        b"\x01\x67\x0b\x64\x6f\x75\x62\x6c\x65\x63\x6c\x69\x63\x6b\x03\x6e"
        b"\x65\x74\x00\x00\x01\x00\x01\x00\x00\x29\x02\x00\x00\x00\x00\x00"
        b"\x00\x00"
    )

    # Make sure the packet is reconstructed the same if we don't change anything:
    assert modify(dns_query, 20) == dns_query

    # Now change IPs and ports:
    dns_query_modified = (
        b"\x45\x00"
        b"\x00\x54\x33\x91\x40\x00\x40\x11\x8d\x41\xc0\xa8\x01\xc4\x5b\xbd"
        b"\x5b\x9d\xde\xde\x01\x34\x00\x40\x06\xd3\x72\xb7\x01\x00\x00\x01"
        b"\x00\x00\x00\x00\x00\x01\x09\x67\x6f\x6f\x67\x6c\x65\x61\x64\x73"
        b"\x01\x67\x0b\x64\x6f\x75\x62\x6c\x65\x63\x6c\x69\x63\x6b\x03\x6e"
        b"\x65\x74\x00\x00\x01\x00\x01\x00\x00\x29\x02\x00\x00\x00\x00\x00"
        b"\x00\x00"
    )
    assert modify(dns_query, 20, '192.168.1.196',
                  57054, '91.189.91.157', 308) == dns_query_modified

    # SSDP multicast query:
    ssdp_query = (
        b"\x45\x00"
        b"\x00\x99\x4a\x5f\x40\x00\x01\x11\x7c\xe2\xc0\xa8\x01\x70\xef\xff"
        b"\xff\xfa\xd9\x79\x07\x6c\x00\x85\x3e\x62\x4d\x2d\x53\x45\x41\x52"
        b"\x43\x48\x20\x2a\x20\x48\x54\x54\x50\x2f\x31\x2e\x31\x0d\x0a\x48"
        b"\x4f\x53\x54\x3a\x20\x32\x33\x39\x2e\x32\x35\x35\x2e\x32\x35\x35"
        b"\x2e\x32\x35\x30\x3a\x31\x39\x30\x30\x0d\x0a\x4d\x41\x4e\x3a\x20"
        b"\x22\x73\x73\x64\x70\x3a\x64\x69\x73\x63\x6f\x76\x65\x72\x22\x0d"
        b"\x0a\x4d\x58\x3a\x20\x31\x0d\x0a\x53\x54\x3a\x20\x75\x72\x6e\x3a"
        b"\x64\x69\x61\x6c\x2d\x6d\x75\x6c\x74\x69\x73\x63\x72\x65\x65\x6e"
        b"\x2d\x6f\x72\x67\x3a\x73\x65\x72\x76\x69\x63\x65\x3a\x64\x69\x61"
        b"\x6c\x3a\x31\x0d\x0a\x0d\x0a"
    )

    # Make sure the packet is reconstructed the same if we don't change anything:
    assert modify(ssdp_query, 20) == ssdp_query

    # Now change the source IP and source port only:
    ssdp_query_modified = (
        b"\x45\x00"
        b"\x00\x99\x4a\x5f\x40\x00\x01\x11\x34\xfa\x0a\x00\x00\x01\xef\xff"
        b"\xff\xfa\x07\x6d\x07\x6c\x00\x85\xc8\x86\x4d\x2d\x53\x45\x41\x52"
        b"\x43\x48\x20\x2a\x20\x48\x54\x54\x50\x2f\x31\x2e\x31\x0d\x0a\x48"
        b"\x4f\x53\x54\x3a\x20\x32\x33\x39\x2e\x32\x35\x35\x2e\x32\x35\x35"
        b"\x2e\x32\x35\x30\x3a\x31\x39\x30\x30\x0d\x0a\x4d\x41\x4e\x3a\x20"
        b"\x22\x73\x73\x64\x70\x3a\x64\x69\x73\x63\x6f\x76\x65\x72\x22\x0d"
        b"\x0a\x4d\x58\x3a\x20\x31\x0d\x0a\x53\x54\x3a\x20\x75\x72\x6e\x3a"
        b"\x64\x69\x61\x6c\x2d\x6d\x75\x6c\x74\x69\x73\x63\x72\x65\x65\x6e"
        b"\x2d\x6f\x72\x67\x3a\x73\x65\x72\x76\x69\x63\x65\x3a\x64\x69\x61"
        b"\x6c\x3a\x31\x0d\x0a\x0d\x0a"
    )
    assert modify(ssdp_query, 20, newSrcAddr='10.0.0.1',
                  newSrcPort=1901) == ssdp_query_modified
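# [Hosting-page notice, not part of the test module] This section may contain content that is unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing functions.
# If you confirm that the content does not involve inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false or worthless content, or content that violates national laws and regulations, you can click submit to appeal, and we will handle it as soon as possible.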