class TB(Module):
def __init__(self):
- self.submodules.hostphy = phy.PHY(8, debug=False)
- self.submodules.hostmac = mac.MAC(self.hostphy, debug=False, loopback=True)
- self.submodules.core = LiteEthMACCore(phy=self.hostphy, dw=32, with_hw_preamble_crc=True)
+ self.submodules.phy_model = phy.PHY(8, debug=False)
+ self.submodules.mac_model = mac.MAC(self.phy_model, debug=True, loopback=True)
+ self.submodules.core = LiteEthMACCore(phy=self.phy_model, dw=8, with_hw_preamble_crc=True)
- self.submodules.streamer = PacketStreamer(eth_phy_description(32), last_be=1)
- self.submodules.streamer_randomizer = AckRandomizer(eth_phy_description(32), level=50)
+ self.submodules.streamer = PacketStreamer(eth_phy_description(8), last_be=1)
+ self.submodules.streamer_randomizer = AckRandomizer(eth_phy_description(8), level=50)
- self.submodules.logger_randomizer = AckRandomizer(eth_phy_description(32), level=50)
- self.submodules.logger = PacketLogger(eth_phy_description(32))
+ self.submodules.logger_randomizer = AckRandomizer(eth_phy_description(8), level=50)
+ self.submodules.logger = PacketLogger(eth_phy_description(8))
# use sys_clk for each clock_domain
self.clock_domains.cd_eth_rx = ClockDomain()
selfp.cd_eth_tx.rst = 0
for i in range(8):
- streamer_packet = Packet([i for i in range(64)])
- yield from self.streamer.send(streamer_packet)
+ packet = mac.MACPacket([i for i in range(64)])
+ packet.destination_mac_address = 0x010203040506
+ packet.source_mac_address = 0x090A0B0C0C0D
+ packet.ethernet_type = 0x0800
+ packet.encode_header()
+ yield from self.streamer.send(packet)
yield from self.logger.receive()
# check results
- s, l, e = check(streamer_packet, self.logger.packet)
+ s, l, e = check(packet, self.logger.packet)
print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
-import binascii
+import math, binascii
from liteeth.common import *
from liteeth.mac.common import *
from liteeth.test.common import *
+def split_bytes(v, n):
+ r = []
+ r_bytes = v.to_bytes(n, byteorder="little")
+ for byte in r_bytes:
+ r.append(int(byte))
+ return r
+
+def merge_bytes(b):
+ return int.from_bytes(bytes(b), "little")
+
+def get_field_data(field, datas):
+ v = merge_bytes(datas[field.byte:field.byte+math.ceil(field.width/8)])
+ return (v >> field.offset) & (2**field.width-1)
+
def print_mac(s):
print_with_prefix(s, "[MAC]")
-preamble = [0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0xD5]
+preamble = split_bytes(eth_preamble, 8)
def crc32(l):
crc = []
- crc_bytes = binascii.crc32(bytes(l)).to_bytes(4, byteorder="little")
+ crc_bytes = split_bytes(binascii.crc32(bytes(l)), 4)
for byte in crc_bytes:
crc.append(int(byte))
return crc
# MAC model
-class MACPacket(list):
+class MACPacket(Packet):
def __init__(self, init=[]):
- self.ongoing = False
- self.done = False
- for byte in init:
- self.append(byte)
+ Packet.__init__(self, init)
+ self.preamble_error = False
+ self.crc_error = False
-class MACRXPacket(MACPacket):
def check_remove_preamble(self):
if comp(self[0:8], preamble):
for i in range(8):
else:
return True
-class MACTXPacket(MACPacket):
+ def decode_remove_header(self):
+ header = []
+ for byte in self[:mac_header_len]:
+ header.append(self.pop(0))
+ for k, v in sorted(mac_header.items()):
+ setattr(self, k, get_field_data(v, header))
+
+ def decode(self):
+ self.preamble_error = self.check_remove_preamble()
+ self.crc_error = self.check_remove_crc()
+ if self.crc_error or self.preamble_error:
+ raise ValueError # XXX handle this properly
+ else:
+ self.decode_remove_header()
+
+ def encode_header(self):
+ header = 0
+ for k, v in sorted(mac_header.items()):
+ header |= (getattr(self, k) << (v.byte*8+v.offset))
+ for d in reversed(split_bytes(header, mac_header_len)):
+ self.insert(0, d)
+
def insert_crc(self):
for d in crc32(self):
self.append(d)
for d in reversed(preamble):
self.insert(0, d)
+ def encode(self):
+ self.encode_header()
+ self.insert_crc()
+ self.insert_preamble()
+
+ def __repr__(self):
+ r = "--------\n"
+ for k in sorted(mac_header.keys()):
+ r += k + " : 0x%x" %getattr(self,k) + "\n"
+ r += "payload: "
+ for d in self:
+ r += "%02x" %d
+ return r
+
class MAC(Module):
def __init__(self, phy, debug=False, loopback=False):
self.phy = phy
self.debug = debug
self.loopback = loopback
self.tx_packets = []
- self.tx_packet = MACTXPacket()
- self.rx_packet = MACRXPacket()
+ self.tx_packet = MACPacket()
+ self.rx_packet = MACPacket()
self.ip_callback = None
+ self.arp_callback = None
def set_ip_callback(self, callback):
self.ip_callback = callback
- def send(self, datas):
- tx_packet = MACTXPacket(datas)
+ def set_arp_callback(self, callback):
+ self.arp_callback = callback
+
+ def send(self, packet):
if self.debug:
- r = ">>>>>>>>\n"
- r += "length " + str(len(tx_packet)) + "\n"
- for d in tx_packet:
- r += "%02x" %d
- print_mac(r)
- tx_packet.insert_crc()
- tx_packet.insert_preamble()
- self.tx_packets.append(tx_packet)
+ print_mac(">>>>>>>>")
+ print_mac(packet)
+ packet.encode()
+ self.tx_packets.append(packet)
def callback(self, datas):
- rx_packet = MACRXPacket(datas)
- preamble_error = rx_packet.check_remove_preamble()
- crc_error = rx_packet.check_remove_crc()
+ packet = MACPacket(datas)
+ packet.decode()
if self.debug:
- r = "<<<<<<<<\n"
- r += "preamble_error " + str(preamble_error) + "\n"
- r += "crc_error " + str(crc_error) + "\n"
- r += "length " + str(len(rx_packet)) + "\n"
- for d in rx_packet:
- r += "%02x" %d
- print_mac(r)
- if (not preamble_error) and (not crc_error):
- if self.loopback:
- self.send(rx_packet)
- elif self.ip_callback is not None:
- self.ip_callback(rx_packet)
+ print_mac("<<<<<<<<")
+ print_mac(packet)
+ if self.loopback:
+ self.send(packet)
+ else:
+ if self.ethernet_type == ethernet_type_ip:
+ if self.ip_callback is not None:
+ self.ip_callback(packet)
+ elif self.ethernet_type == ethernet_type_arp:
+ if self.arp_callback is not None:
+ self.arp_callback(packet)
+ else:
+ raise ValueError # XXX handle this properly
def gen_simulation(self, selfp):
self.tx_packet.done = True