add icmp model
authorFlorent Kermarrec <florent@enjoy-digital.fr>
Fri, 6 Feb 2015 08:08:05 +0000 (09:08 +0100)
committerFlorent Kermarrec <florent@enjoy-digital.fr>
Fri, 6 Feb 2015 08:08:05 +0000 (09:08 +0100)
liteeth/test/Makefile
liteeth/test/model/dumps.py
liteeth/test/model/icmp.py [new file with mode: 0644]
liteeth/test/model/ip.py

index 18155a5f87498f7f3040c0aeb0b6676ceb016545..f02ba1fb960ae2cd0985e6cf615f3290e3c797e8 100644 (file)
@@ -8,6 +8,7 @@ model_tb:
        $(CMD) ./model/arp.py
        $(CMD) ./model/ip.py
        $(CMD) ./model/udp.py
+       $(CMD) ./model/icmp.py
 
 mac_core_tb:
        $(CMD) mac_core_tb.py
index 8245ed5c50b42d6bd436f3748a3cbfe3d4143c0d..ae618e0f231bb40aeb874c9a6b718a3823f3ee37 100644 (file)
@@ -19,14 +19,14 @@ arp_request = format_dump("""
 
 arp_request_infos = {
        "sender_mac"            :       0x00123f979201,
-       "target_mac"    :       0x00221922549e,
-       "ethernet_type"                         :       0x806,
-       "hwtype"                                :       0x1,
-       "opcode"                                        :       0x1,
-       "protosize"     :       0x4,
+       "target_mac"            :       0x00221922549e,
+       "ethernet_type"         :       0x806,
+       "hwtype"                        :       0x1,
+       "opcode"                        :       0x1,
+       "protosize"                     :       0x4,
        "proto"                         :       0x800,
        "sender_ip"                     :       0xa9feff42,
-       "target_ip"     :       0xa9fe6462
+       "target_ip"                     :       0xa9fe6462
 
 }
 
@@ -38,14 +38,14 @@ arp_reply = format_dump("""
 
 arp_reply_infos = {
        "sender_mac"            :       0x00221922549e,
-       "target_mac"    :       0x00123f979201,
-       "ethernet_type"                         :       0x806,
-       "hwtype"                                :       0x1,
-       "opcode"                                        :       0x2,
-       "protosize"     :       0x4,
+       "target_mac"            :       0x00123f979201,
+       "ethernet_type"         :       0x806,
+       "hwtype"                        :       0x1,
+       "opcode"                        :       0x2,
+       "protosize"                     :       0x4,
        "proto"                         :       0x800,
        "sender_ip"                     :       0xa9fe6462,
-       "target_ip"     :       0xa9feff42
+       "target_ip"                     :       0xa9feff42
 }
 
 udp = format_dump("""
@@ -59,10 +59,32 @@ aa 9b 4e 4d f9 2e 51 52 fe ff 65 31 3a 71 34 3a
 
 udp_infos = {
        "sender_mac"            :       0x00140b333327,
-       "target_mac"    :       0xd07ab596cd0a,
-       "protocol"                                      :       0x11,
+       "target_mac"            :       0xd07ab596cd0a,
+       "protocol"                      :       0x11,
        "sender_ip"                     :       0xc0a80165,
-       "target_ip"     :       0xb27b0d78,
-       "src_port"                              :       0xa63f,
+       "target_ip"                     :       0xb27b0d78,
+       "src_port"                      :       0xa63f,
        "dst_port"                      :       0x690f
 }
+
+ping_request = format_dump("""
+00 50 56 e0 14 49 00 0c 29 34 0b de 08 00 45 00
+00 3c d7 43 00 00 80 01 2b 73 c0 a8 9e 8b ae 89
+2a 4d 08 00 2a 5c 02 00 21 00 61 62 63 64 65 66
+67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
+77 61 62 63 64 65 66 67 68 69""")
+
+ping_request_infos = {
+       "code"          : 0x0,
+       "msgtype"       : 0x8,
+       "quench"        : 0x2002100
+}
+
+ping_reply = format_dump("""
+00 0c 29 34 0b de 00 50 56 e0 14 49 08 00 45 00
+00 3c 76 e1 00 00 80 01 8b d5 ae 89 2a 4d c0 a8
+9e 8b 00 00 32 5c 02 00 21 00 61 62 63 64 65 66
+67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
+77 61 62 63 64 65 66 67 68 69""")
+
+ping_reply_infos = {}
diff --git a/liteeth/test/model/icmp.py b/liteeth/test/model/icmp.py
new file mode 100644 (file)
index 0000000..5ee0804
--- /dev/null
@@ -0,0 +1,106 @@
+import math
+
+from liteeth.common import *
+from liteeth.test.common import *
+
+from liteeth.test.model import ip
+
+def print_icmp(s):
+       print_with_prefix(s, "[ICMP]")
+
+# ICMP model
+class ICMPPacket(Packet):
+       def __init__(self, init=[]):
+               Packet.__init__(self, init)
+
+       def decode(self):
+               header = []
+               for byte in self[:icmp_header_len]:
+                       header.append(self.pop(0))
+               for k, v in sorted(icmp_header.items()):
+                       setattr(self, k, get_field_data(v, header))
+
+       def encode(self):
+               header = 0
+               for k, v in sorted(icmp_header.items()):
+                       value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little")
+                       header += (value << v.offset+(v.byte*8))
+               for d in split_bytes(header, icmp_header_len):
+                       self.insert(0, d)
+
+       def __repr__(self):
+               r = "--------\n"
+               for k in sorted(icmp_header.keys()):
+                       r += k + " : 0x%x" %getattr(self,k) + "\n"
+               r += "payload: "
+               for d in self:
+                       r += "%02x" %d
+               return r
+
+class ICMP(Module):
+       def  __init__(self, ip, ip_address, debug=False):
+               self.ip = ip
+               self.ip_address = ip_address
+               self.debug = debug
+               self.loopback = loopback
+               self.tx_packets = []
+               self.tx_packet = ICMPPacket()
+               self.rx_packet = ICMPPacket()
+
+               self.ip.set_icmp_callback(self.callback)
+
+       def send(self, packet):
+               packet.encode()
+               if self.debug:
+                       print_icmp(">>>>>>>>")
+                       print_icmp(packet)
+               ip_packet = ip.IPPacket(packet)
+               ip_packet.version = 0x4
+               ip_packet.ihl = 0x5
+               ip_packet.total_length = len(packet) + ip_packet.ihl
+               ip_packet.identification = 0
+               ip_packet.flags = 0
+               ip_packet.fragment_offset = 0
+               ip_packet.ttl = 0x80
+               ip_packet.sender_ip = self.ip_address
+               ip_packet.target_ip = 0x12345678 # XXX
+               ip_packet.checksum = 0
+               ip_packet.protocol = icmp_protocol
+               self.ip.send(ip_packet)
+
+       def callback(self, packet):
+               packet = ICMPPacket(packet)
+               packet.decode()
+               if self.debug:
+                       print_icmp("<<<<<<<<")
+                       print_icmp(packet)
+               if self.loopback:
+                       self.send(packet)
+               else:
+                       self.process(packet)
+
+       def process(self, packet):
+               pass
+
+if __name__ == "__main__":
+       from liteeth.test.model.dumps import *
+       from liteeth.test.model.mac import *
+       from liteeth.test.model.ip import *
+       errors = 0
+       # ICMP packet
+       packet = MACPacket(ping_request)
+       packet.decode_remove_header()
+       #print(packet)
+       packet = IPPacket(packet)
+       packet.decode()
+       #print(packet)
+       packet = ICMPPacket(packet)
+       packet.decode()
+       #print(packet)
+       errors += verify_packet(packet, ping_request_infos)
+       packet.encode()
+       packet.decode()
+       #print(packet)
+       errors += verify_packet(packet, ping_request_infos)
+
+       print("icmp errors " + str(errors))
\ No newline at end of file
index afac3298029ac56441f469ef7ce10ff26028cdf7..58b95e3d2f2aaa1a23344a4ff8dfff0366c5b3c4 100644 (file)
@@ -74,12 +74,16 @@ class IP(Module):
                self.table = {}
                self.request_pending = False
                self.udp_callback = None
+               self.icmp_callback = None
 
                self.mac.set_ip_callback(self.callback)
 
        def set_udp_callback(self, callback):
                self.udp_callback = callback
 
+       def set_icmp_callback(self, callback):
+               self.icmp_callback = callback
+
        def send(self, packet):
                packet.encode()
                packet.insert_checksum()
@@ -116,6 +120,9 @@ class IP(Module):
                if packet.protocol == udp_protocol:
                        if self.udp_callback is not None:
                                self.udp_callback(packet)
+               elif packet.protocol == icmp_protocol:
+                       if self.icmp_callback is not None:
+                               self.icmp_callback(packet)
 
 if __name__ == "__main__":
        from liteeth.test.model.dumps import *