-import random
-
-from migen.fhdl.std import *
-from migen.flow.actor import Sink, Source
-
-from misoclib.ethmac.common import *
-
-class PacketStreamer(Module):
- def __init__(self, data):
- self.source = Source(eth_description(8))
- self.data = data
-
- def gen_simulation(self, selfp):
- for n, data in enumerate(self.data):
- selfp.source.stb = 1
- selfp.source.sop = (n == 0)
- selfp.source.eop = (n == len(self.data)-1)
- selfp.source.payload.d = data
- yield
- while selfp.source.ack == 0:
- yield
- selfp.source.stb = 0
- while random.getrandbits(1):
- yield
-
-class PacketLogger(Module):
- def __init__(self):
- self.sink = Sink(eth_description(8))
- self.data = []
-
- def do_simulation(self, selfp):
- selfp.sink.ack = bool(random.getrandbits(1))
- if selfp.sink.stb and selfp.sink.ack:
- self.data.append(selfp.sink.payload.d)
-
-def print_results(s, l1, l2):
- def comp(l1, l2):
- r = True
- try:
- for i, val in enumerate(l1):
- if val != l2[i]:
- print(s + " : val : {:02X}, exp : {:02X}".format(val, l2[i]))
- r = False
- except:
- r = False
- return r
-
- c = comp(l1, l2)
- r = s + " "
- if c:
- r += "[OK]"
- else:
- r += "[KO]"
- print(r)
--- /dev/null
+import random, copy
+
+from migen.fhdl.std import *
+from migen.flow.actor import Sink, Source
+from migen.genlib.record import *
+
+from misoclib.ethmac.common import *
+
+def seed_to_data(seed, random=True):
+ if random:
+ return (seed * 0x31415979 + 1) & 0xffffffff
+ else:
+ return seed
+
+def check(p1, p2):
+ p1 = copy.deepcopy(p1)
+ p2 = copy.deepcopy(p2)
+ if isinstance(p1, int):
+ return 0, 1, int(p1 != p2)
+ else:
+ if len(p1) >= len(p2):
+ ref, res = p1, p2
+ else:
+ ref, res = p2, p1
+ shift = 0
+ while((ref[0] != res[0]) and (len(res)>1)):
+ res.pop(0)
+ shift += 1
+ length = min(len(ref), len(res))
+ errors = 0
+ for i in range(length):
+ if ref.pop(0) != res.pop(0):
+ errors += 1
+ return shift, length, errors
+
+def randn(max_n):
+ return random.randint(0, max_n-1)
+
+class Packet(list):
+ def __init__(self, init=[]):
+ self.ongoing = False
+ self.done = False
+ for data in init:
+ self.append(data)
+
+class PacketStreamer(Module):
+ def __init__(self, description):
+ self.source = Source(description)
+ ###
+ self.packets = []
+ self.packet = Packet()
+ self.packet.done = 1
+
+ def send(self, packet):
+ packet = copy.deepcopy(packet)
+ self.packets.append(packet)
+
+ def do_simulation(self, selfp):
+ if len(self.packets) and self.packet.done:
+ self.packet = self.packets.pop(0)
+ if not self.packet.ongoing and not self.packet.done:
+ selfp.source.stb = 1
+ selfp.source.sop = 1
+ selfp.source.d = self.packet.pop(0)
+ self.packet.ongoing = True
+ elif selfp.source.stb == 1 and selfp.source.ack == 1:
+ selfp.source.sop = 0
+ selfp.source.eop = (len(self.packet) == 1)
+ if len(self.packet) > 0:
+ selfp.source.stb = 1
+ selfp.source.d = self.packet.pop(0)
+ else:
+ self.packet.done = 1
+ selfp.source.stb = 0
+
+class PacketLogger(Module):
+ def __init__(self, description):
+ self.sink = Sink(description)
+ ###
+ self.packet = Packet()
+
+ def receive(self):
+ self.packet.done = 0
+ while self.packet.done == 0:
+ yield
+
+ def do_simulation(self, selfp):
+ selfp.sink.ack = 1
+ if selfp.sink.stb == 1 and selfp.sink.sop == 1:
+ self.packet = Packet()
+ self.packet.append(selfp.sink.d)
+ elif selfp.sink.stb:
+ self.packet.append(selfp.sink.d)
+ if selfp.sink.stb == 1 and selfp.sink.eop == 1:
+ self.packet.done = True
+
+class AckRandomizer(Module):
+ def __init__(self, description, level=0):
+ self.level = level
+
+ self.sink = Sink(description)
+ self.source = Source(description)
+
+ self.run = Signal()
+
+ self.comb += \
+ If(self.run,
+ Record.connect(self.sink, self.source)
+ ).Else(
+ self.source.stb.eq(0),
+ self.sink.ack.eq(0),
+ )
+
+ def do_simulation(self, selfp):
+ n = randn(100)
+ if n < self.level:
+ selfp.run = 0
+ else:
+ selfp.run = 1
+
from migen.actorlib.crc import *
from misoclib.ethmac.common import *
-from misoclib.ethmac.test import *
+from misoclib.ethmac.test.common import *
-frame_data = [
+payload = [
0x00, 0x0A, 0xE6, 0xF0, 0x05, 0xA3, 0x00, 0x12,
0x34, 0x56, 0x78, 0x90, 0x08, 0x00, 0x45, 0x00,
0x00, 0x30, 0xB3, 0xFE, 0x00, 0x00, 0x80, 0x11,
0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13
]
-frame_crc = [
+crc = [
0x7A, 0xD5, 0x6B, 0xB3
]
+mux = {
+ "inserter": 0,
+ "checker": 1,
+ "both": 2
+}
+
class TB(Module):
- def __init__(self):
+ def __init__(self, random_level=50):
sm = self.submodules
+ sm.streamer = PacketStreamer(eth_description(8))
+ sm.streamer_randomizer = AckRandomizer(eth_description(8), random_level)
+ sm.logger = PacketLogger(eth_description(8))
+ sm.logger_randomizer = AckRandomizer(eth_description(8), random_level)
- # Streamer (DATA) --> CRC32Inserter --> Logger (expect DATA + CRC)
- sm.inserter_streamer = PacketStreamer(frame_data)
- sm.crc32_inserter = CRC32Inserter(eth_description(8))
- sm.inserter_logger = PacketLogger()
- self.comb +=[
- self.inserter_streamer.source.connect(self.crc32_inserter.sink),
- self.crc32_inserter.source.connect(self.inserter_logger.sink),
+ self.comb += [
+ self.streamer.source.connect(self.streamer_randomizer.sink),
+ self.logger_randomizer.source.connect(self.logger.sink)
]
- # Streamer (DATA + CRC) --> CRC32Checher --> Logger (except DATA + CRC + check)
- sm.checker_streamer = PacketStreamer(frame_data + frame_crc)
+ sm.crc32_inserter = CRC32Inserter(eth_description(8))
sm.crc32_checker = CRC32Checker(eth_description(8))
- sm.checker_logger = PacketLogger()
- self.comb +=[
- self.checker_streamer.source.connect(self.crc32_checker.sink),
- self.crc32_checker.source.connect(self.checker_logger.sink),
+
+ self.mux = Signal(2)
+ self.comb += [
+ If(self.mux == mux["inserter"],
+ self.streamer_randomizer.source.connect(self.crc32_inserter.sink),
+ self.crc32_inserter.source.connect(self.logger_randomizer.sink)
+ ).Elif(self.mux == mux["checker"],
+ self.streamer_randomizer.source.connect(self.crc32_checker.sink),
+ self.crc32_checker.source.connect(self.logger_randomizer.sink)
+ ).Elif(self.mux == mux["both"],
+ self.streamer_randomizer.source.connect(self.crc32_inserter.sink),
+ self.crc32_inserter.source.connect(self.crc32_checker.sink),
+ self.crc32_checker.source.connect(self.logger_randomizer.sink)
+ )
]
def gen_simulation(self, selfp):
- for i in range(500):
- yield
- inserter_reference = frame_data + frame_crc
- inserter_generated = self.inserter_logger.data
+ selfp.mux = mux["inserter"]
+ print("streamer --> crc32_inserter --> logger:")
+ self.streamer.send(Packet(payload))
+ yield from self.logger.receive()
+ s, l, e = check(payload+crc, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
- checker_reference = frame_data
- checker_generated = self.checker_logger.data
+ selfp.mux = mux["checker"]
+ print("streamer --> crc32_checker --> logger:")
+ self.streamer.send(Packet(payload+crc))
+ yield from self.logger.receive()
+ s, l, e = check(payload, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
- print_results("inserter", inserter_reference, inserter_generated)
- print_results("checker", checker_reference, checker_generated)
+ selfp.mux = mux["both"]
+ print("streamer --> crc32_inserter --> crc32_checker --> logger:")
+ self.streamer.send(Packet(payload))
+ yield from self.logger.receive()
+ s, l, e = check(payload, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
from migen.sim.generic import run_simulation
- run_simulation(TB(), ncycles=1000, vcd_name="my.vcd", keep_files=True)
+ run_simulation(TB(), ncycles=1000, vcd_name="my.vcd")
from misoclib.ethmac import EthMAC
from misoclib.ethmac.phy import loopback
+from misoclib.ethmac.test.common import *
+
class WishboneMaster:
def __init__(self, obj):
self.obj = obj
length = 1500-2
- payload = [i % 0xFF for i in range(length)] + [0, 0, 0, 0]
+ tx_payload = [seed_to_data(i, True) % 0xFF for i in range(length)] + [0, 0, 0, 0]
errors = 0
for slot in range(2):
+ print("slot {}:".format(slot))
# fill tx memory
for i in range(length//4+1):
- dat = 0
- dat |= payload[4*i+0] << 24
- dat |= payload[4*i+1] << 16
- dat |= payload[4*i+2] << 8
- dat |= payload[4*i+3] << 0
+ dat = int.from_bytes(tx_payload[4*i:4*(i+1)], "big")
yield from wishbone_master.write(sram_reader_slots_offset[slot]+i, dat)
- # send tx data & wait
+ # send tx payload & wait
yield from sram_reader_driver.start(slot, length)
yield from sram_reader_driver.wait_done()
yield from sram_reader_driver.clear_done()
- # get rx data (loopback on PHY Model)
- rx_dat = []
+ # get rx payload (loopback on PHY Model)
+ rx_payload = []
for i in range(length//4+1):
yield from wishbone_master.read(sram_writer_slots_offset[slot]+i)
dat = wishbone_master.dat
- rx_dat.append((dat >> 24) & 0xFF)
- rx_dat.append((dat >> 16) & 0xFF)
- rx_dat.append((dat >> 8) & 0xFF)
- rx_dat.append((dat >> 0) & 0xFF)
-
- # check rx data
- for i in range(length):
- #print("{:02x} / {:02x}".format(rx_dat[i], payload[i]))
- if rx_dat[i] != payload[i]:
- errors += 1
-
- for i in range(200):
- yield
- #print(selfp.ethmac.sram_reader._length.storage)
+ rx_payload += list(dat.to_bytes(4, byteorder='big'))
- print("Errors : {}".format(errors))
+ # check results
+ s, l, e = check(tx_payload[:length], rx_payload[:min(length, len(rx_payload))])
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
- run_simulation(TB(), ncycles=16000, vcd_name="my.vcd", keep_files=True)
+ run_simulation(TB(), vcd_name="my.vcd")
from misoclib.ethmac.common import *
from misoclib.ethmac.preamble import *
-from misoclib.ethmac.test import *
+from misoclib.ethmac.test.common import *
-frame_preamble = [
+preamble = [
0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0xD5
]
-frame_data = [
+payload = [
0x00, 0x0A, 0xE6, 0xF0, 0x05, 0xA3, 0x00, 0x12,
- 0x34, 0x56, 0x78, 0x90, 0x08, 0x00, 0x45, 0x00,
- 0x00, 0x30, 0xB3, 0xFE, 0x00, 0x00, 0x80, 0x11,
- 0x72, 0xBA, 0x0A, 0x00, 0x00, 0x03, 0x0A, 0x00,
- 0x00, 0x02, 0x04, 0x00, 0x04, 0x00, 0x00, 0x1C,
- 0x89, 0x4D, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
- 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D,
- 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13
+ 0x34, 0x56, 0x78, 0x90, 0x08, 0x00, 0x45, 0x00,
+ 0x00, 0x30, 0xB3, 0xFE, 0x00, 0x00, 0x80, 0x11,
+ 0x72, 0xBA, 0x0A, 0x00, 0x00, 0x03, 0x0A, 0x00,
+ 0x00, 0x02, 0x04, 0x00, 0x04, 0x00, 0x00, 0x1C,
+ 0x89, 0x4D, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
+ 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D,
+ 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13
]
+mux = {
+ "inserter": 0,
+ "checker": 1,
+ "both": 2
+}
+
class TB(Module):
- def __init__(self):
+ def __init__(self, random_level=50):
sm = self.submodules
+ sm.streamer = PacketStreamer(eth_description(8))
+ sm.streamer_randomizer = AckRandomizer(eth_description(8), random_level)
+ sm.logger = PacketLogger(eth_description(8))
+ sm.logger_randomizer = AckRandomizer(eth_description(8), random_level)
- # Streamer (DATA) --> PreambleInserter --> Logger (expect PREAMBLE + DATA)
- sm.inserter_streamer = PacketStreamer(frame_data)
- sm.preamble_inserter = PreambleInserter(8)
- sm.inserter_logger = PacketLogger()
- self.comb +=[
- self.inserter_streamer.source.connect(self.preamble_inserter.sink),
- self.preamble_inserter.source.connect(self.inserter_logger.sink),
+ self.comb += [
+ self.streamer.source.connect(self.streamer_randomizer.sink),
+ self.logger_randomizer.source.connect(self.logger.sink)
]
- # Streamer (PREAMBLE + DATA) --> CRC32Checher --> Logger (except DATA + check)
- sm.checker_streamer = PacketStreamer(frame_preamble + frame_data)
+ sm.preamble_inserter = PreambleInserter(8)
sm.preamble_checker = PreambleChecker(8)
- sm.checker_logger = PacketLogger()
- self.comb +=[
- self.checker_streamer.source.connect(self.preamble_checker.sink),
- self.preamble_checker.source.connect(self.checker_logger.sink),
- ]
+ self.mux = Signal(2)
+ self.comb += [
+ If(self.mux == mux["inserter"],
+ self.streamer_randomizer.source.connect(self.preamble_inserter.sink),
+ self.preamble_inserter.source.connect(self.logger_randomizer.sink)
+ ).Elif(self.mux == mux["checker"],
+ self.streamer_randomizer.source.connect(self.preamble_checker.sink),
+ self.preamble_checker.source.connect(self.logger_randomizer.sink)
+ ).Elif(self.mux == mux["both"],
+ self.streamer_randomizer.source.connect(self.preamble_inserter.sink),
+ self.preamble_inserter.source.connect(self.preamble_checker.sink),
+ self.preamble_checker.source.connect(self.logger_randomizer.sink)
+ )
+ ]
def gen_simulation(self, selfp):
- for i in range(500):
- yield
- inserter_reference = frame_preamble + frame_data
- inserter_generated = self.inserter_logger.data
+ selfp.mux = mux["inserter"]
+ print("streamer --> preamble_inserter --> logger:")
+ self.streamer.send(Packet(payload))
+ yield from self.logger.receive()
+ s, l, e = check(preamble+payload, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
- checker_reference = frame_data
- checker_generated = self.checker_logger.data
+ selfp.mux = mux["checker"]
+ print("streamer --> preamble_checker --> logger:")
+ self.streamer.send(Packet(preamble+payload))
+ yield from self.logger.receive()
+ s, l, e = check(payload, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
- print_results("inserter", inserter_reference, inserter_generated)
- print_results("checker", checker_reference, checker_generated)
+ selfp.mux = mux["both"]
+ print("streamer --> preamble_inserter --> preamble_checker --> logger:")
+ self.streamer.send(Packet(payload))
+ yield from self.logger.receive()
+ s, l, e = check(payload, self.logger.packet)
+ print("shift "+ str(s) + " / length " + str(l) + " / errors " + str(e))
if __name__ == "__main__":
from migen.sim.generic import run_simulation
- run_simulation(TB(), ncycles=1000, vcd_name="my.vcd", keep_files=True)
+ run_simulation(TB(), ncycles=1000, vcd_name="my.vcd")