From cf0b36c6cf237042d838a8a8746a8021f1b0c443 Mon Sep 17 00:00:00 2001 From: Florent Kermarrec Date: Fri, 6 Feb 2015 09:08:05 +0100 Subject: [PATCH] add icmp model --- liteeth/test/Makefile | 1 + liteeth/test/model/dumps.py | 54 ++++++++++++------ liteeth/test/model/icmp.py | 106 ++++++++++++++++++++++++++++++++++++ liteeth/test/model/ip.py | 7 +++ 4 files changed, 152 insertions(+), 16 deletions(-) create mode 100644 liteeth/test/model/icmp.py diff --git a/liteeth/test/Makefile b/liteeth/test/Makefile index 18155a5f..f02ba1fb 100644 --- a/liteeth/test/Makefile +++ b/liteeth/test/Makefile @@ -8,6 +8,7 @@ model_tb: $(CMD) ./model/arp.py $(CMD) ./model/ip.py $(CMD) ./model/udp.py + $(CMD) ./model/icmp.py mac_core_tb: $(CMD) mac_core_tb.py diff --git a/liteeth/test/model/dumps.py b/liteeth/test/model/dumps.py index 8245ed5c..ae618e0f 100644 --- a/liteeth/test/model/dumps.py +++ b/liteeth/test/model/dumps.py @@ -19,14 +19,14 @@ arp_request = format_dump(""" arp_request_infos = { "sender_mac" : 0x00123f979201, - "target_mac" : 0x00221922549e, - "ethernet_type" : 0x806, - "hwtype" : 0x1, - "opcode" : 0x1, - "protosize" : 0x4, + "target_mac" : 0x00221922549e, + "ethernet_type" : 0x806, + "hwtype" : 0x1, + "opcode" : 0x1, + "protosize" : 0x4, "proto" : 0x800, "sender_ip" : 0xa9feff42, - "target_ip" : 0xa9fe6462 + "target_ip" : 0xa9fe6462 } @@ -38,14 +38,14 @@ arp_reply = format_dump(""" arp_reply_infos = { "sender_mac" : 0x00221922549e, - "target_mac" : 0x00123f979201, - "ethernet_type" : 0x806, - "hwtype" : 0x1, - "opcode" : 0x2, - "protosize" : 0x4, + "target_mac" : 0x00123f979201, + "ethernet_type" : 0x806, + "hwtype" : 0x1, + "opcode" : 0x2, + "protosize" : 0x4, "proto" : 0x800, "sender_ip" : 0xa9fe6462, - "target_ip" : 0xa9feff42 + "target_ip" : 0xa9feff42 } udp = format_dump(""" @@ -59,10 +59,32 @@ aa 9b 4e 4d f9 2e 51 52 fe ff 65 31 3a 71 34 3a udp_infos = { "sender_mac" : 0x00140b333327, - "target_mac" : 0xd07ab596cd0a, - "protocol" : 0x11, + "target_mac" : 0xd07ab596cd0a, + "protocol" : 0x11, "sender_ip" : 0xc0a80165, - "target_ip" : 0xb27b0d78, - "src_port" : 0xa63f, + "target_ip" : 0xb27b0d78, + "src_port" : 0xa63f, "dst_port" : 0x690f } + +ping_request = format_dump(""" +00 50 56 e0 14 49 00 0c 29 34 0b de 08 00 45 00 +00 3c d7 43 00 00 80 01 2b 73 c0 a8 9e 8b ae 89 +2a 4d 08 00 2a 5c 02 00 21 00 61 62 63 64 65 66 +67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 +77 61 62 63 64 65 66 67 68 69""") + +ping_request_infos = { + "code" : 0x0, + "msgtype" : 0x8, + "quench" : 0x2002100 +} + +ping_reply = format_dump(""" +00 0c 29 34 0b de 00 50 56 e0 14 49 08 00 45 00 +00 3c 76 e1 00 00 80 01 8b d5 ae 89 2a 4d c0 a8 +9e 8b 00 00 32 5c 02 00 21 00 61 62 63 64 65 66 +67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76 +77 61 62 63 64 65 66 67 68 69""") + +ping_reply_infos = {} diff --git a/liteeth/test/model/icmp.py b/liteeth/test/model/icmp.py new file mode 100644 index 00000000..5ee08042 --- /dev/null +++ b/liteeth/test/model/icmp.py @@ -0,0 +1,106 @@ +import math + +from liteeth.common import * +from liteeth.test.common import * + +from liteeth.test.model import ip + +def print_icmp(s): + print_with_prefix(s, "[ICMP]") + +# ICMP model +class ICMPPacket(Packet): + def __init__(self, init=[]): + Packet.__init__(self, init) + + def decode(self): + header = [] + for byte in self[:icmp_header_len]: + header.append(self.pop(0)) + for k, v in sorted(icmp_header.items()): + setattr(self, k, get_field_data(v, header)) + + def encode(self): + header = 0 + for k, v in sorted(icmp_header.items()): + value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little") + header += (value << v.offset+(v.byte*8)) + for d in split_bytes(header, icmp_header_len): + self.insert(0, d) + + def __repr__(self): + r = "--------\n" + for k in sorted(icmp_header.keys()): + r += k + " : 0x%x" %getattr(self,k) + "\n" + r += "payload: " + for d in self: + r += "%02x" %d + return r + +class ICMP(Module): + def __init__(self, ip, ip_address, debug=False): + self.ip = ip + self.ip_address = ip_address + self.debug = debug + self.loopback = loopback + self.tx_packets = [] + self.tx_packet = ICMPPacket() + self.rx_packet = ICMPPacket() + + self.ip.set_icmp_callback(self.callback) + + def send(self, packet): + packet.encode() + if self.debug: + print_icmp(">>>>>>>>") + print_icmp(packet) + ip_packet = ip.IPPacket(packet) + ip_packet.version = 0x4 + ip_packet.ihl = 0x5 + ip_packet.total_length = len(packet) + ip_packet.ihl + ip_packet.identification = 0 + ip_packet.flags = 0 + ip_packet.fragment_offset = 0 + ip_packet.ttl = 0x80 + ip_packet.sender_ip = self.ip_address + ip_packet.target_ip = 0x12345678 # XXX + ip_packet.checksum = 0 + ip_packet.protocol = icmp_protocol + self.ip.send(ip_packet) + + def callback(self, packet): + packet = ICMPPacket(packet) + packet.decode() + if self.debug: + print_icmp("<<<<<<<<") + print_icmp(packet) + if self.loopback: + self.send(packet) + else: + self.process(packet) + + def process(self, packet): + pass + +if __name__ == "__main__": + from liteeth.test.model.dumps import * + from liteeth.test.model.mac import * + from liteeth.test.model.ip import * + errors = 0 + # ICMP packet + packet = MACPacket(ping_request) + packet.decode_remove_header() + #print(packet) + packet = IPPacket(packet) + packet.decode() + #print(packet) + packet = ICMPPacket(packet) + packet.decode() + #print(packet) + errors += verify_packet(packet, ping_request_infos) + packet.encode() + packet.decode() + #print(packet) + errors += verify_packet(packet, ping_request_infos) + + print("icmp errors " + str(errors)) \ No newline at end of file diff --git a/liteeth/test/model/ip.py b/liteeth/test/model/ip.py index afac3298..58b95e3d 100644 --- a/liteeth/test/model/ip.py +++ b/liteeth/test/model/ip.py @@ -74,12 +74,16 @@ class IP(Module): self.table = {} self.request_pending = False self.udp_callback = None + self.icmp_callback = None self.mac.set_ip_callback(self.callback) def set_udp_callback(self, callback): self.udp_callback = callback + def set_icmp_callback(self, callback): + self.icmp_callback = callback + def send(self, packet): packet.encode() packet.insert_checksum() @@ -116,6 +120,9 @@ class IP(Module): if packet.protocol == udp_protocol: if self.udp_callback is not None: self.udp_callback(packet) + elif packet.protocol == icmp_protocol: + if self.icmp_callback is not None: + self.icmp_callback(packet) if __name__ == "__main__": from liteeth.test.model.dumps import * -- 2.30.2