udp: add model
authorFlorent Kermarrec <florent@enjoy-digital.fr>
Wed, 4 Feb 2015 19:30:17 +0000 (20:30 +0100)
committerFlorent Kermarrec <florent@enjoy-digital.fr>
Wed, 4 Feb 2015 19:30:17 +0000 (20:30 +0100)
liteeth/common.py
liteeth/test/Makefile
liteeth/test/model/dumps.py
liteeth/test/model/ip.py
liteeth/test/model/udp.py [new file with mode: 0644]

index 1d32885e8aa0fbc4b8bb1cb14799212838fcaac6..ec006492ac724a4547cd944a3cf33f611be72599 100644 (file)
@@ -77,6 +77,8 @@ udp_header = {
        "checksum":                                     HField( 6,  0, 16)
 }
 
+udp_protocol = 0x11
+
 def reverse_bytes(v):
        n = math.ceil(flen(v)/8)
        r = []
index f6a6d864eb673a83eabb46ff128b948f748c1046..24e95db93e41d2612d895c47b148d2d3b9883cb3 100644 (file)
@@ -7,6 +7,7 @@ model_tb:
        $(CMD) ./model/mac.py
        $(CMD) ./model/arp.py
        $(CMD) ./model/ip.py
+       $(CMD) ./model/udp.py
 
 mac_core_tb:
        $(CMD) mac_core_tb.py
@@ -18,4 +19,7 @@ arp_tb:
        $(CMD) arp_tb.py
 
 ip_tb:
-       $(CMD) ip_tb.py 
+       $(CMD) ip_tb.py
+
+udp_tb:
+       $(CMD) udp_tb.py
index be124ee092523c296cd33f3235c090bf2bb6302b..8c6cee86831e0b8c8307ca0e85514fd71d48f5e1 100644 (file)
@@ -62,5 +62,7 @@ udp_infos = {
        "destination_mac_address"       :       0xd07ab596cd0a,
        "protocol"                                      :       0x11,
        "source_ip_address"                     :       0xc0a80165,
-       "destination_ip_address"        :       0xb27b0d78
+       "destination_ip_address"        :       0xb27b0d78,
+       "source_port"                           :       0xa63f,
+       "destination_port"                      :       0x690f
 }
index 61a861be1851e4b3b3fc9325c707500165acc080..9c5db8947d864e4a0db0c20cce728eed97879cb5 100644 (file)
@@ -8,8 +8,6 @@ from liteeth.test.model import mac
 def print_ip(s):
        print_with_prefix(s, "[IP]")
 
-preamble = split_bytes(eth_preamble, 8)
-
 def carry_around_add(a, b):
     c = a + b
     return (c & 0xffff) + (c >> 16)
@@ -75,9 +73,13 @@ class IP(Module):
                self.rx_packet = IPPacket()
                self.table = {}
                self.request_pending = False
+               self.udp_callback = None
 
                self.mac.set_ip_callback(self.callback)
 
+       def set_udp_callback(self, callback):
+               self.udp_callback = callback
+
        def send(self, packet):
                packet.encode()
                packet.insert_checksum()
@@ -104,16 +106,22 @@ class IP(Module):
                if self.loopback:
                        self.send(packet)
                else:
+                       if packet.version != 0x4:
+                               raise ValueError
+                       if packet.ihl != 0x5:
+                               raise ValueError
                        self.process(packet)
 
        def process(self, packet):
-               pass
+               if packet.protocol == udp_protocol:
+                       if self.udp_callback is not None:
+                               self.udp_callback(packet)
 
 if __name__ == "__main__":
        from liteeth.test.model.dumps import *
        from liteeth.test.model.mac import *
        errors = 0
-       # ARP request
+       # UDP packet
        packet = MACPacket(udp)
        packet.decode_remove_header()
        #print(packet)
diff --git a/liteeth/test/model/udp.py b/liteeth/test/model/udp.py
new file mode 100644 (file)
index 0000000..2f82f96
--- /dev/null
@@ -0,0 +1,109 @@
+import math
+
+from liteeth.common import *
+from liteeth.test.common import *
+
+from liteeth.test.model import ip
+
+def print_udp(s):
+       print_with_prefix(s, "[UDP]")
+
+# UDP model
+class UDPPacket(Packet):
+       def __init__(self, init=[]):
+               Packet.__init__(self, init)
+
+       def decode(self):
+               header = []
+               for byte in self[:udp_header_len]:
+                       header.append(self.pop(0))
+               for k, v in sorted(udp_header.items()):
+                       setattr(self, k, get_field_data(v, header))
+
+       def encode(self):
+               header = 0
+               for k, v in sorted(udp_header.items()):
+                       value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little")
+                       header += (value << v.offset+(v.byte*8))
+               for d in split_bytes(header, udp_header_len):
+                       self.insert(0, d)
+
+       def __repr__(self):
+               r = "--------\n"
+               for k in sorted(udp_header.keys()):
+                       r += k + " : 0x%x" %getattr(self,k) + "\n"
+               r += "payload: "
+               for d in self:
+                       r += "%02x" %d
+               return r
+
+class UDP(Module):
+       def  __init__(self, ip, ip_address, debug=False, loopback=False):
+               self.ip = ip
+               self.debug = debug
+               self.loopback = loopback
+               self.tx_packets = []
+               self.tx_packet = UDPPacket()
+               self.rx_packet = UDPPacket()
+
+               self.ip.set_udp_callback(self.callback)
+
+       def send(self, packet):
+               packet.encode()
+               if self.debug:
+                       print_udp(">>>>>>>>")
+                       print_udp(packet)
+               ip_packet = ip.IPPacket(packet)
+               ip_packet.version = 0x4
+               ip_packet.ihl = 0x5
+               ip_packet.dscp = 0x0
+               ip_packet.ecn = 0x0
+               ip_packet.total_length = len(packet) + ip_packet.ihl
+               ip_packet.identification = 0
+               ip_packet.flags = 0
+               ip_packet.fragment_offset = 0
+               ip_packet.time_to_live = 0x80
+               ip_packet.source_ip_address = ip_address,
+               ip_packet.destination_ip_address = 0x12345678 # XXX
+               self.ip.send(ip_packet)
+
+       def callback(self, packet):
+               packet = UDPPacket(packet)
+               packet.decode()
+               if self.debug:
+                       print_udp("<<<<<<<<")
+                       print_udp(packet)
+               if self.loopback:
+                       self.send(packet)
+               else:
+                       self.process(packet)
+
+       def process(self, packet):
+               pass
+
+if __name__ == "__main__":
+       from liteeth.test.model.dumps import *
+       from liteeth.test.model.mac import *
+       from liteeth.test.model.ip import *
+       errors = 0
+       # UDP packet
+       packet = MACPacket(udp)
+       packet.decode_remove_header()
+       #print(packet)
+       packet = IPPacket(packet)
+       packet.decode()
+       #print(packet)
+       packet = UDPPacket(packet)
+       packet.decode()
+       #print(packet)
+       if packet.length != (len(packet)+udp_header_len):
+               errors += 1
+       errors += verify_packet(packet, udp_infos)
+       packet.encode()
+       packet.decode()
+       #print(packet)
+       if packet.length != (len(packet)+udp_header_len):
+               errors += 1
+       errors += verify_packet(packet, udp_infos)
+
+       print("udp errors " + str(errors))
\ No newline at end of file