selfp.source.last_be = self.last_be
else:
selfp.source.eop = 0
- selfp.source.last_be = 0
+ if self.last_be is not None:
+ selfp.source.last_be = 0
if len(self.packet) > 0:
selfp.source.stb = 1
selfp.source.data = self.packet.pop(0)
class TB(Module):
def __init__(self):
- self.submodules.hostphy = phy.PHY(8, debug=True)
- self.submodules.hostmac = mac.MAC(self.hostphy, debug=True, random_level=0)
+ self.submodules.hostphy = phy.PHY(8, debug=False)
+ self.submodules.hostmac = mac.MAC(self.hostphy, debug=False, loopback=True)
self.submodules.ethmac = LiteEthMAC(phy=self.hostphy, dw=32, interface="core", with_hw_preamble_crc=True)
self.submodules.streamer = PacketStreamer(eth_mac_description(32), last_be=1)
- self.submodules.streamer_randomizer = AckRandomizer(eth_mac_description(32), level=0)
+ self.submodules.streamer_randomizer = AckRandomizer(eth_mac_description(32), level=50)
- self.submodules.logger_randomizer = AckRandomizer(eth_mac_description(32), level=0)
+ self.submodules.logger_randomizer = AckRandomizer(eth_mac_description(32), level=50)
self.submodules.logger = PacketLogger(eth_mac_description(32))
# use sys_clk for each clock_domain
for i in range(8):
streamer_packet = Packet([i for i in range(64)])
- print(streamer_packet)
yield from self.streamer.send(streamer_packet)
+ yield from self.logger.receive()
+
+ # check results
+ s, l, e = check(streamer_packet, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
- run_simulation(TB(), ncycles=1000, vcd_name="my.vcd", keep_files=True)
+ run_simulation(TB(), ncycles=4000, vcd_name="my.vcd", keep_files=True)
from liteeth.mac.common import *
from liteeth.test.common import *
+def print_mac(s):
+ print_with_prefix(s, "[MAC]")
+
+preamble = [0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0xD5]
+
def crc32(l):
crc = []
crc_bytes = binascii.crc32(bytes(l)).to_bytes(4, byteorder="little")
class MACRXPacket(MACPacket):
def check_remove_preamble(self):
- if comp(self[0:8], [0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0xD5]):
+ if comp(self[0:8], preamble):
for i in range(8):
self.pop(0)
return False
class MACTXPacket(MACPacket):
def insert_crc(self):
- return self
+ for d in crc32(self):
+ self.append(d)
def insert_preamble(self):
- return self
+ for d in reversed(preamble):
+ self.insert(0, d)
class MAC(Module):
- def __init__(self, phy, debug=False, random_level=0):
+ def __init__(self, phy, debug=False, loopback=False):
self.phy = phy
self.debug = debug
- self.random_level = random_level
+ self.loopback = loopback
self.tx_packets = []
self.tx_packet = MACTXPacket()
self.rx_packet = MACRXPacket()
def send(self, datas):
tx_packet = MACTXPacket(datas)
+ if self.debug:
+ r = ">>>>>>>>\n"
+ r += "length " + str(len(tx_packet)) + "\n"
+ for d in tx_packet:
+ r += "%02x" %d
+ print_mac(r)
tx_packet.insert_crc()
tx_packet.insert_preamble()
self.tx_packets.append(tx_packet)
rx_packet = MACRXPacket(datas)
preamble_error = rx_packet.check_remove_preamble()
crc_error = rx_packet.check_remove_crc()
+ if self.debug:
+ r = "<<<<<<<<\n"
+ r += "preamble_error " + str(preamble_error) + "\n"
+ r += "crc_error " + str(crc_error) + "\n"
+ r += "length " + str(len(rx_packet)) + "\n"
+ for d in rx_packet:
+ r += "%02x" %d
+ print_mac(r)
if (not preamble_error) and (not crc_error):
- if self.ip_callback is not None:
+ if self.loopback:
+ self.send(rx_packet)
+ elif self.ip_callback is not None:
self.ip_callback(rx_packet)
def gen_simulation(self, selfp):
from liteeth.mac.common import *
from liteeth.test.common import *
+def print_phy(s):
+ print_with_prefix(s, "[PHY]")
+
# PHY model
class PHYSource(PacketStreamer):
def __init__(self, dw):
PacketLogger.__init__(self, eth_phy_description(dw))
class PHY(Module):
- def __init__(self, dw, debug):
+ def __init__(self, dw, debug=False):
self.dw = dw
self.debug = debug
def send(self, datas):
packet = Packet(datas)
- yield from self.phy_source.send(packet, blocking)
+ if self.debug:
+ r = ">>>>>>>>\n"
+ r += "length " + str(len(datas)) + "\n"
+ for d in datas:
+ r += "%02x" %d
+ print_phy(r)
+ yield from self.phy_source.send(packet)
def receive(self):
yield from self.phy_sink.receive()
+ if self.debug:
+ r = "<<<<<<<<\n"
+ r += "length " + str(len(self.phy_sink.packet)) + "\n"
+ for d in self.phy_sink.packet:
+ r += "%02x" %d
+ print_phy(r)
self.packet = self.phy_sink.packet