# TestAXILite --------------------------------------------------------------------------------------
+def _int_or_call(int_or_func):
+ if callable(int_or_func):
+ return int_or_func()
+ return int_or_func
+
class AXILiteChecker:
- def __init__(self, ready_latency=None, response_latency=None, rdata_generator=None):
- self.ready_latency = ready_latency or (lambda: 0)
- self.response_latency = response_latency or (lambda: 0)
+ def __init__(self, ready_latency=0, response_latency=0, rdata_generator=None):
+ self.ready_latency = ready_latency
+ self.response_latency = response_latency
self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
self.writes = [] # (addr, data, strb)
self.reads = [] # (addr, data)
def delay(self, latency):
- for _ in range(latency()):
+ for _ in range(_int_or_call(latency)):
yield
def handle_write(self, axi_lite):
yield
@passive
-def timeout(ticks):
- for _ in range(ticks):
+def timeout_generator(ticks):
+ import os
+ for i in range(ticks):
+ if os.environ.get("TIMEOUT_DEBUG", "") == "1":
+ print("tick {}".format(i))
yield
raise TimeoutError("Timeout after %d ticks" % ticks)
# TestAXILiteInterconnet ---------------------------------------------------------------------------
-class TestAXILiteInterconnect(unittest.TestCase):
- def axilite_pattern_generator(self, axi_lite, pattern, delay=0):
- for rw, addr, data in pattern:
+class AXILitePatternGenerator:
+ def __init__(self, axi_lite, pattern, delay=0):
+ self.axi_lite = axi_lite
+ self.pattern = pattern
+ self.delay = delay
+ self.errors = 0
+ self.read_errors = []
+ self.resp_errors = {"w": 0, "r": 0}
+
+ def handler(self):
+ for rw, addr, data in self.pattern:
assert rw in ["w", "r"]
if rw == "w":
- resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1))
- self.assertEqual(resp, RESP_OKAY)
+ strb = 2**len(self.axi_lite.w.strb) - 1
+ resp = (yield from self.axi_lite.write(addr, data, strb))
else:
- rdata, resp = (yield from axi_lite.read(addr))
- self.assertEqual(resp, RESP_OKAY)
- self.assertEqual(rdata, data)
- for _ in range(delay):
+ rdata, resp = (yield from self.axi_lite.read(addr))
+ if rdata != data:
+ self.read_errors.append((rdata, data))
+ self.errors += 1
+ if resp != RESP_OKAY:
+ self.resp_errors[rw] += 1
+ self.errors += 1
+ for _ in range(_int_or_call(self.delay)):
yield
for _ in range(16):
yield
+class TestAXILiteInterconnect(unittest.TestCase):
def test_interconnect_p2p(self):
class DUT(Module):
def __init__(self):
dut = DUT()
checker = AXILiteChecker(rdata_generator=rdata_generator)
generators = [
- self.axilite_pattern_generator(dut.master, pattern),
+ AXILitePatternGenerator(dut.master, pattern).handler(),
checker.handler(dut.slave),
]
run_simulation(dut, generators)
generators = [
generator(dut.master),
checker(dut.slave),
- timeout(300),
+ timeout_generator(300),
]
run_simulation(dut, generators)
dut = DUT(n_masters)
checker = AXILiteChecker()
generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
- generators += [timeout(300), checker.handler(dut.slave)]
+ generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
dut = DUT(n_masters)
checker = AXILiteChecker()
generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
- generators += [timeout(300), checker.handler(dut.slave)]
+ generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
dut = DUT(n_masters)
checker = AXILiteChecker(response_latency=lambda: 3)
generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
- generators += [timeout(300), checker.handler(dut.slave)]
+ generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
dut = DUT(n_masters)
checker = AXILiteChecker(response_latency=lambda: 3)
generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
- generators += [timeout(300), checker.handler(dut.slave)]
+ generators += [timeout_generator(300), checker.handler(dut.slave)]
run_simulation(dut, generators)
order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order)
+ def address_decoder(self, i, size=0x100, python=False):
+ # bytes to 32-bit words aligned
+ _size = (size) >> 2
+ _origin = (size * i) >> 2
+ if python: # for python integers
+ shift = log2_int(_size)
+ return lambda a: ((a >> shift) == (_origin >> shift))
+ # for migen signals
+ return lambda a: (a[log2_int(_size):] == (_origin >> log2_int(_size)))
+
def decoder_test(self, n_slaves, pattern, generator_delay=0):
class DUT(Module):
def __init__(self, decoders):
slaves = list(zip(decoders, self.slaves))
self.submodules.decoder = AXILiteDecoder(self.master, slaves)
- def decoder(i):
- # bytes to 32-bit words aligned
- size = (0x100) >> 2
- origin = (0x100 * i) >> 2
- return lambda a: (a[log2_int(size):] == (origin >> log2_int(size)))
-
def rdata_generator(adr):
for rw, a, v in pattern:
if rw == "r" and a == adr:
return v
return 0xbaadc0de
- dut = DUT([decoder(i) for i in range(n_slaves)])
+ dut = DUT([self.address_decoder(i) for i in range(n_slaves)])
checkers = [AXILiteChecker(rdata_generator=rdata_generator) for _ in dut.slaves]
- generators = [self.axilite_pattern_generator(dut.master, pattern, delay=generator_delay)]
+ generators = [AXILitePatternGenerator(dut.master, pattern, delay=generator_delay).handler()]
generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
- generators += [timeout(300)]
- run_simulation(dut, generators, vcd_name='sim.vcd')
+ generators += [timeout_generator(300)]
+ run_simulation(dut, generators)
return checkers
self.decoder_test(n_slaves=3, pattern=[
("r", 0x300, 1),
])
+
+ def interconnect_shared_test(self, master_patterns, slave_decoders,
+ master_delay=0, slave_ready_latency=0, slave_response_latency=0,
+ timeout=300, **kwargs):
+ # number of masters/slaves is defined by the number of patterns/decoders
+ # master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
+ # slave_decoders: list of address decoders per slave
+ class DUT(Module):
+ def __init__(self, n_masters, decoders, **kwargs):
+ self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
+ self.slaves = [AXILiteInterface(name="slave") for _ in range(len(decoders))]
+ slaves = list(zip(decoders, self.slaves))
+ self.submodules.interconnect = AXILiteInterconnectShared(self.masters, slaves, **kwargs)
+
+ class ReadDataGenerator:
+ # Generates data based on decoded addresses and data defined in master_patterns
+ def __init__(self, patterns):
+ self.mem = {}
+ for pattern in patterns:
+ for rw, addr, val in pattern:
+ if rw == "r":
+ assert addr not in self.mem
+ self.mem[addr] = val
+
+ def getter(self, n):
+ # on miss will give default data depending on slave n
+ return lambda addr: self.mem.get(addr, 0xbaad0000 + n)
+
+ def new_checker(rdata_generator):
+ return AXILiteChecker(ready_latency=slave_ready_latency,
+ response_latency=slave_response_latency,
+ rdata_generator=rdata_generator)
+
+ # perpare test
+ dut = DUT(len(master_patterns), slave_decoders, **kwargs)
+ rdata_generator = ReadDataGenerator(master_patterns)
+ checkers = [new_checker(rdata_generator.getter(i)) for i, _ in enumerate(master_patterns)]
+ pattern_generators = [AXILitePatternGenerator(dut.masters[i], pattern, delay=master_delay)
+ for i, pattern in enumerate(master_patterns)]
+
+ # run simulator
+ generators = [gen.handler() for gen in pattern_generators]
+ generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
+ generators += [timeout_generator(timeout)]
+ run_simulation(dut, generators)
+
+ return pattern_generators, checkers
+
+ def test_interconnect_shared_basic(self):
+ master_patterns = [
+ [("w", 0x000, 0), ("w", 0x101, 0), ("w", 0x202, 0)],
+ [("w", 0x010, 0), ("w", 0x111, 0), ("w", 0x112, 0)],
+ [("w", 0x220, 0), ("w", 0x221, 0), ("w", 0x222, 0)],
+ ]
+ slave_decoders = [self.address_decoder(i) for i in range(3)]
+
+ generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders,
+ master_delay=1)
+
+ for gen in generators:
+ self.assertEqual(gen.errors, 0)
+
+ def addr(checker_list):
+ return [entry[0] for entry in checker_list]
+
+ self.assertEqual(addr(checkers[0].writes), [0x000, 0x010])
+ self.assertEqual(addr(checkers[1].writes), [0x101, 0x111, 0x112])
+ self.assertEqual(addr(checkers[2].writes), [0x220, 0x221, 0x202, 0x222])
+ self.assertEqual(addr(checkers[0].reads), [])
+ self.assertEqual(addr(checkers[1].reads), [])
+ self.assertEqual(addr(checkers[2].reads), [])
+
+ def interconnect_shared_stress_test(self, timeout=1000, **kwargs):
+ prng = random.Random(42)
+
+ n_masters = 3
+ n_slaves = 3
+ pattern_length = 64
+ slave_region_size = 0x10000000
+ # for testing purpose each master will access only its own region of a slave
+ master_region_size = 0x1000
+ assert n_masters*master_region_size < slave_region_size
+
+ def gen_pattern(n, length):
+ assert length < master_region_size
+ for i_access in range(length):
+ rw = "w" if prng.randint(0, 1) == 0 else "r"
+ i_slave = prng.randrange(n_slaves)
+ addr = i_slave*slave_region_size + n*master_region_size + i_access
+ data = addr
+ yield rw, addr, data
+
+ master_patterns = [list(gen_pattern(i, pattern_length)) for i in range(n_masters)]
+ slave_decoders = [self.address_decoder(i, size=slave_region_size) for i in range(n_slaves)]
+ slave_decoders_py = [self.address_decoder(i, size=slave_region_size, python=True)
+ for i in range(n_slaves)]
+
+ generators, checkers = self.interconnect_shared_test(master_patterns, slave_decoders,
+ timeout=timeout, **kwargs)
+
+ for gen in generators:
+ read_errors = [" 0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
+ msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
+ gen.resp_errors, "\n".join(read_errors))
+ self.assertEqual(gen.errors, 0, msg=msg)
+
+ # make sure all the accesses at slave side are in correct address region
+ for i_slave, (checker, decoder) in enumerate(zip(checkers, slave_decoders_py)):
+ for addr in (entry[0] for entry in checker.writes + checker.reads):
+ # compensate for the fact that decoders work on word-aligned addresses
+ self.assertNotEqual(decoder(addr >> 2), 0)
+
+ def test_interconnect_shared_stress_no_delay(self):
+ self.interconnect_shared_stress_test(timeout=1000,
+ master_delay=0,
+ slave_ready_latency=0,
+ slave_response_latency=0)
+
+ def test_interconnect_shared_stress_rand_short(self):
+ prng = random.Random(42)
+ rand = lambda: prng.randrange(4)
+ self.interconnect_shared_stress_test(timeout=2000,
+ master_delay=rand,
+ slave_ready_latency=rand,
+ slave_response_latency=rand)
+
+ def test_interconnect_shared_stress_rand_long(self):
+ prng = random.Random(42)
+ rand = lambda: prng.randrange(16)
+ self.interconnect_shared_stress_test(timeout=4000,
+ master_delay=rand,
+ slave_ready_latency=rand,
+ slave_response_latency=rand)