From: Florent Kermarrec Date: Wed, 4 Feb 2015 19:30:17 +0000 (+0100) Subject: udp: add model X-Git-Tag: 24jan2021_ls180~2604^2~90 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=7a4713b3854d7edf5e1f9f3d85c679e26d705395;p=litex.git udp: add model --- diff --git a/liteeth/common.py b/liteeth/common.py index 1d32885e..ec006492 100644 --- a/liteeth/common.py +++ b/liteeth/common.py @@ -77,6 +77,8 @@ udp_header = { "checksum": HField( 6, 0, 16) } +udp_protocol = 0x11 + def reverse_bytes(v): n = math.ceil(flen(v)/8) r = [] diff --git a/liteeth/test/Makefile b/liteeth/test/Makefile index f6a6d864..24e95db9 100644 --- a/liteeth/test/Makefile +++ b/liteeth/test/Makefile @@ -7,6 +7,7 @@ model_tb: $(CMD) ./model/mac.py $(CMD) ./model/arp.py $(CMD) ./model/ip.py + $(CMD) ./model/udp.py mac_core_tb: $(CMD) mac_core_tb.py @@ -18,4 +19,7 @@ arp_tb: $(CMD) arp_tb.py ip_tb: - $(CMD) ip_tb.py + $(CMD) ip_tb.py + +udp_tb: + $(CMD) udp_tb.py diff --git a/liteeth/test/model/dumps.py b/liteeth/test/model/dumps.py index be124ee0..8c6cee86 100644 --- a/liteeth/test/model/dumps.py +++ b/liteeth/test/model/dumps.py @@ -62,5 +62,7 @@ udp_infos = { "destination_mac_address" : 0xd07ab596cd0a, "protocol" : 0x11, "source_ip_address" : 0xc0a80165, - "destination_ip_address" : 0xb27b0d78 + "destination_ip_address" : 0xb27b0d78, + "source_port" : 0xa63f, + "destination_port" : 0x690f } diff --git a/liteeth/test/model/ip.py b/liteeth/test/model/ip.py index 61a861be..9c5db894 100644 --- a/liteeth/test/model/ip.py +++ b/liteeth/test/model/ip.py @@ -8,8 +8,6 @@ from liteeth.test.model import mac def print_ip(s): print_with_prefix(s, "[IP]") -preamble = split_bytes(eth_preamble, 8) - def carry_around_add(a, b): c = a + b return (c & 0xffff) + (c >> 16) @@ -75,9 +73,13 @@ class IP(Module): self.rx_packet = IPPacket() self.table = {} self.request_pending = False + self.udp_callback = None self.mac.set_ip_callback(self.callback) + def set_udp_callback(self, callback): + self.udp_callback = callback + def send(self, packet): packet.encode() packet.insert_checksum() @@ -104,16 +106,22 @@ class IP(Module): if self.loopback: self.send(packet) else: + if packet.version != 0x4: + raise ValueError + if packet.ihl != 0x5: + raise ValueError self.process(packet) def process(self, packet): - pass + if packet.protocol == udp_protocol: + if self.udp_callback is not None: + self.udp_callback(packet) if __name__ == "__main__": from liteeth.test.model.dumps import * from liteeth.test.model.mac import * errors = 0 - # ARP request + # UDP packet packet = MACPacket(udp) packet.decode_remove_header() #print(packet) diff --git a/liteeth/test/model/udp.py b/liteeth/test/model/udp.py new file mode 100644 index 00000000..2f82f965 --- /dev/null +++ b/liteeth/test/model/udp.py @@ -0,0 +1,109 @@ +import math + +from liteeth.common import * +from liteeth.test.common import * + +from liteeth.test.model import ip + +def print_udp(s): + print_with_prefix(s, "[UDP]") + +# UDP model +class UDPPacket(Packet): + def __init__(self, init=[]): + Packet.__init__(self, init) + + def decode(self): + header = [] + for byte in self[:udp_header_len]: + header.append(self.pop(0)) + for k, v in sorted(udp_header.items()): + setattr(self, k, get_field_data(v, header)) + + def encode(self): + header = 0 + for k, v in sorted(udp_header.items()): + value = merge_bytes(split_bytes(getattr(self, k), math.ceil(v.width/8)), "little") + header += (value << v.offset+(v.byte*8)) + for d in split_bytes(header, udp_header_len): + self.insert(0, d) + + def __repr__(self): + r = "--------\n" + for k in sorted(udp_header.keys()): + r += k + " : 0x%x" %getattr(self,k) + "\n" + r += "payload: " + for d in self: + r += "%02x" %d + return r + +class UDP(Module): + def __init__(self, ip, ip_address, debug=False, loopback=False): + self.ip = ip + self.debug = debug + self.loopback = loopback + self.tx_packets = [] + self.tx_packet = UDPPacket() + self.rx_packet = UDPPacket() + + self.ip.set_udp_callback(self.callback) + + def send(self, packet): + packet.encode() + if self.debug: + print_udp(">>>>>>>>") + print_udp(packet) + ip_packet = ip.IPPacket(packet) + ip_packet.version = 0x4 + ip_packet.ihl = 0x5 + ip_packet.dscp = 0x0 + ip_packet.ecn = 0x0 + ip_packet.total_length = len(packet) + ip_packet.ihl + ip_packet.identification = 0 + ip_packet.flags = 0 + ip_packet.fragment_offset = 0 + ip_packet.time_to_live = 0x80 + ip_packet.source_ip_address = ip_address, + ip_packet.destination_ip_address = 0x12345678 # XXX + self.ip.send(ip_packet) + + def callback(self, packet): + packet = UDPPacket(packet) + packet.decode() + if self.debug: + print_udp("<<<<<<<<") + print_udp(packet) + if self.loopback: + self.send(packet) + else: + self.process(packet) + + def process(self, packet): + pass + +if __name__ == "__main__": + from liteeth.test.model.dumps import * + from liteeth.test.model.mac import * + from liteeth.test.model.ip import * + errors = 0 + # UDP packet + packet = MACPacket(udp) + packet.decode_remove_header() + #print(packet) + packet = IPPacket(packet) + packet.decode() + #print(packet) + packet = UDPPacket(packet) + packet.decode() + #print(packet) + if packet.length != (len(packet)+udp_header_len): + errors += 1 + errors += verify_packet(packet, udp_infos) + packet.encode() + packet.decode() + #print(packet) + if packet.length != (len(packet)+udp_header_len): + errors += 1 + errors += verify_packet(packet, udp_infos) + + print("udp errors " + str(errors)) \ No newline at end of file