From: Florent Kermarrec Date: Thu, 29 Jan 2015 17:22:12 +0000 (+0100) Subject: improve mac model (add header encoding/decoding) X-Git-Tag: 24jan2021_ls180~2604^2~110 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=96b08a0cc76aae4185df1511219d0ae2d21019fc;p=litex.git improve mac model (add header encoding/decoding) --- diff --git a/liteeth/test/mac_core_tb.py b/liteeth/test/mac_core_tb.py index 33680bec..3b9e5a8d 100644 --- a/liteeth/test/mac_core_tb.py +++ b/liteeth/test/mac_core_tb.py @@ -11,15 +11,15 @@ from liteeth.test.model import phy, mac class TB(Module): def __init__(self): - self.submodules.hostphy = phy.PHY(8, debug=False) - self.submodules.hostmac = mac.MAC(self.hostphy, debug=False, loopback=True) - self.submodules.core = LiteEthMACCore(phy=self.hostphy, dw=32, with_hw_preamble_crc=True) + self.submodules.phy_model = phy.PHY(8, debug=False) + self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=True) + self.submodules.core = LiteEthMACCore(phy=self.phy_model, dw=8, with_hw_preamble_crc=True) - self.submodules.streamer = PacketStreamer(eth_phy_description(32), last_be=1) - self.submodules.streamer_randomizer = AckRandomizer(eth_phy_description(32), level=50) + self.submodules.streamer = PacketStreamer(eth_phy_description(8), last_be=1) + self.submodules.streamer_randomizer = AckRandomizer(eth_phy_description(8), level=50) - self.submodules.logger_randomizer = AckRandomizer(eth_phy_description(32), level=50) - self.submodules.logger = PacketLogger(eth_phy_description(32)) + self.submodules.logger_randomizer = AckRandomizer(eth_phy_description(8), level=50) + self.submodules.logger = PacketLogger(eth_phy_description(8)) # use sys_clk for each clock_domain self.clock_domains.cd_eth_rx = ClockDomain() @@ -46,12 +46,16 @@ class TB(Module): selfp.cd_eth_tx.rst = 0 for i in range(8): - streamer_packet = Packet([i for i in range(64)]) - yield from self.streamer.send(streamer_packet) + packet = mac.MACPacket([i for i in range(64)]) + packet.destination_mac_address = 0x010203040506 + packet.source_mac_address = 0x090A0B0C0C0D + packet.ethernet_type = 0x0800 + packet.encode_header() + yield from self.streamer.send(packet) yield from self.logger.receive() # check results - s, l, e = check(streamer_packet, self.logger.packet) + s, l, e = check(packet, self.logger.packet) print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e)) if __name__ == "__main__": diff --git a/liteeth/test/model/mac.py b/liteeth/test/model/mac.py index c08ff7b1..72656af4 100644 --- a/liteeth/test/model/mac.py +++ b/liteeth/test/model/mac.py @@ -1,30 +1,42 @@ -import binascii +import math, binascii from liteeth.common import * from liteeth.mac.common import * from liteeth.test.common import * +def split_bytes(v, n): + r = [] + r_bytes = v.to_bytes(n, byteorder="little") + for byte in r_bytes: + r.append(int(byte)) + return r + +def merge_bytes(b): + return int.from_bytes(bytes(b), "little") + +def get_field_data(field, datas): + v = merge_bytes(datas[field.byte:field.byte+math.ceil(field.width/8)]) + return (v >> field.offset) & (2**field.width-1) + def print_mac(s): print_with_prefix(s, "[MAC]") -preamble = [0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0xD5] +preamble = split_bytes(eth_preamble, 8) def crc32(l): crc = [] - crc_bytes = binascii.crc32(bytes(l)).to_bytes(4, byteorder="little") + crc_bytes = split_bytes(binascii.crc32(bytes(l)), 4) for byte in crc_bytes: crc.append(int(byte)) return crc # MAC model -class MACPacket(list): +class MACPacket(Packet): def __init__(self, init=[]): - self.ongoing = False - self.done = False - for byte in init: - self.append(byte) + Packet.__init__(self, init) + self.preamble_error = False + self.crc_error = False -class MACRXPacket(MACPacket): def check_remove_preamble(self): if comp(self[0:8], preamble): for i in range(8): @@ -41,7 +53,28 @@ class MACRXPacket(MACPacket): else: return True -class MACTXPacket(MACPacket): + def decode_remove_header(self): + header = [] + for byte in self[:mac_header_len]: + header.append(self.pop(0)) + for k, v in sorted(mac_header.items()): + setattr(self, k, get_field_data(v, header)) + + def decode(self): + self.preamble_error = self.check_remove_preamble() + self.crc_error = self.check_remove_crc() + if self.crc_error or self.preamble_error: + raise ValueError # XXX handle this properly + else: + self.decode_remove_header() + + def encode_header(self): + header = 0 + for k, v in sorted(mac_header.items()): + header |= (getattr(self, k) << (v.byte*8+v.offset)) + for d in reversed(split_bytes(header, mac_header_len)): + self.insert(0, d) + def insert_crc(self): for d in crc32(self): self.append(d) @@ -50,49 +83,62 @@ class MACTXPacket(MACPacket): for d in reversed(preamble): self.insert(0, d) + def encode(self): + self.encode_header() + self.insert_crc() + self.insert_preamble() + + def __repr__(self): + r = "--------\n" + for k in sorted(mac_header.keys()): + r += k + " : 0x%x" %getattr(self,k) + "\n" + r += "payload: " + for d in self: + r += "%02x" %d + return r + class MAC(Module): def __init__(self, phy, debug=False, loopback=False): self.phy = phy self.debug = debug self.loopback = loopback self.tx_packets = [] - self.tx_packet = MACTXPacket() - self.rx_packet = MACRXPacket() + self.tx_packet = MACPacket() + self.rx_packet = MACPacket() self.ip_callback = None + self.arp_callback = None def set_ip_callback(self, callback): self.ip_callback = callback - def send(self, datas): - tx_packet = MACTXPacket(datas) + def set_arp_callback(self, callback): + self.arp_callback = callback + + def send(self, packet): if self.debug: - r = ">>>>>>>>\n" - r += "length " + str(len(tx_packet)) + "\n" - for d in tx_packet: - r += "%02x" %d - print_mac(r) - tx_packet.insert_crc() - tx_packet.insert_preamble() - self.tx_packets.append(tx_packet) + print_mac(">>>>>>>>") + print_mac(packet) + packet.encode() + self.tx_packets.append(packet) def callback(self, datas): - rx_packet = MACRXPacket(datas) - preamble_error = rx_packet.check_remove_preamble() - crc_error = rx_packet.check_remove_crc() + packet = MACPacket(datas) + packet.decode() if self.debug: - r = "<<<<<<<<\n" - r += "preamble_error " + str(preamble_error) + "\n" - r += "crc_error " + str(crc_error) + "\n" - r += "length " + str(len(rx_packet)) + "\n" - for d in rx_packet: - r += "%02x" %d - print_mac(r) - if (not preamble_error) and (not crc_error): - if self.loopback: - self.send(rx_packet) - elif self.ip_callback is not None: - self.ip_callback(rx_packet) + print_mac("<<<<<<<<") + print_mac(packet) + if self.loopback: + self.send(packet) + else: + if self.ethernet_type == ethernet_type_ip: + if self.ip_callback is not None: + self.ip_callback(packet) + elif self.ethernet_type == ethernet_type_arp: + if self.arp_callback is not None: + self.arp_callback(packet) + else: + raise ValueError # XXX handle this properly def gen_simulation(self, selfp): self.tx_packet.done = True