def __init__(self, master, slave):
self.comb += master.connect(slave)
+
+class AXILiteRequestCounter(Module):
+ def __init__(self, request, response, max_requests=256):
+ self.counter = counter = Signal(max=max_requests)
+ self.full = full = Signal()
+ self.empty = empty = Signal()
+ self.stall = stall = Signal()
+
+ self.comb += [
+ full.eq(counter == max_requests - 1),
+ empty.eq(counter == 0),
+ stall.eq(request & full),
+ ]
+
+ self.sync += [
+ If(request & response,
+ counter.eq(counter)
+ ).Elif(request & ~full,
+ counter.eq(counter + 1)
+ ).Elif(response & ~empty,
+ counter.eq(counter - 1)
+ ),
+ ]
+
class AXILiteArbiter(Module):
"""AXI Lite arbiter
Arbitrate between master interfaces and connect one to the target.
- Arbitration is done separately for write and read channels.
+ New master will not be selected until all requests have been responded to.
+ Arbitration for write and read channels is done separately.
"""
def __init__(self, masters, target):
- self.submodules.rr_write = roundrobin.RoundRobin(len(masters))
- self.submodules.rr_read = roundrobin.RoundRobin(len(masters))
+ self.submodules.rr_write = roundrobin.RoundRobin(len(masters), roundrobin.SP_CE)
+ self.submodules.rr_read = roundrobin.RoundRobin(len(masters), roundrobin.SP_CE)
def get_sig(interface, channel, name):
return getattr(getattr(interface, channel), name)
else:
self.comb += dest.eq(source)
+ # allow to change rr.grant only after all requests from a master have been responded to
+ self.submodules.wr_counter = wr_counter = AXILiteRequestCounter(
+ request=target.aw.valid & target.aw.ready, response=target.b.valid & target.b.ready)
+ self.submodules.rd_counter = rd_counter = AXILiteRequestCounter(
+ request=target.ar.valid & target.ar.ready, response=target.r.valid & target.r.ready)
+
+ # switch to next request only if there are no responses pending
+ self.comb += [
+ self.rr_write.ce.eq(~(target.aw.valid | target.w.valid | target.b.valid) & wr_counter.empty),
+ self.rr_read.ce.eq(~(target.ar.valid | target.r.valid) & rd_counter.empty),
+ ]
+
# connect bus requests to round-robin selectors
- self.comb += self.rr_write.request.eq(Cat(*[m.aw.valid | m.w.valid | m.b.valid for m in masters]))
- self.comb += self.rr_read.request.eq(Cat(*[m.ar.valid | m.r.valid for m in masters]))
+ self.comb += [
+ self.rr_write.request.eq(Cat(*[m.aw.valid | m.w.valid | m.b.valid for m in masters])),
+ self.rr_read.request.eq(Cat(*[m.ar.valid | m.r.valid for m in masters])),
+ ]
class AXILiteDecoder(Module):
# slaves is a list of pairs:
# TestAXILite --------------------------------------------------------------------------------------
class AXILiteChecker:
- def __init__(self, latency=None, rdata_generator=None):
- self.latency = latency or (lambda: 0)
+ def __init__(self, ready_latency=None, response_latency=None, rdata_generator=None):
+ self.ready_latency = ready_latency or (lambda: 0)
+ self.response_latency = response_latency or (lambda: 0)
self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
self.writes = [] # (addr, data, strb)
self.reads = [] # (addr, data)
- def delay(self):
- for _ in range(self.latency()):
+ def delay(self, latency):
+ for _ in range(latency()):
yield
def handle_write(self, axi_lite):
while not (yield axi_lite.aw.valid):
yield
- yield from self.delay()
+ yield from self.delay(self.ready_latency)
addr = (yield axi_lite.aw.addr)
yield axi_lite.aw.ready.eq(1)
yield
yield axi_lite.aw.ready.eq(0)
while not (yield axi_lite.w.valid):
yield
- yield from self.delay()
+ yield from self.delay(self.ready_latency)
data = (yield axi_lite.w.data)
strb = (yield axi_lite.w.strb)
yield axi_lite.w.ready.eq(1)
yield
yield axi_lite.w.ready.eq(0)
+ yield from self.delay(self.response_latency)
yield axi_lite.b.valid.eq(1)
yield axi_lite.b.resp.eq(RESP_OKAY)
yield
def handle_read(self, axi_lite):
while not (yield axi_lite.ar.valid):
yield
- yield from self.delay()
+ yield from self.delay(self.ready_latency)
addr = (yield axi_lite.ar.addr)
yield axi_lite.ar.ready.eq(1)
yield
yield axi_lite.ar.ready.eq(0)
+ yield from self.delay(self.response_latency)
data = self.rdata_generator(addr)
yield axi_lite.r.valid.eq(1)
yield axi_lite.r.resp.eq(RESP_OKAY)
return _latency
dut = DUT(width_from=width_from, width_to=width_to)
- checker = AXILiteChecker(latency, rdata_generator)
+ checker = AXILiteChecker(ready_latency=latency, rdata_generator=rdata_generator)
run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)])
self.assertEqual(checker.writes, write_expected)
self.assertEqual(checker.reads, read_expected)
self.converter_test(width_from=32, width_to=16,
write_pattern=write_pattern, write_expected=write_expected)
+# TestAXILiteInterconnet ---------------------------------------------------------------------------
+
+class TestAXILiteInterconnect(unittest.TestCase):
def axilite_pattern_generator(self, axi_lite, pattern):
for rw, addr, data in pattern:
assert rw in ["w", "r"]
for _ in range(16):
yield
- def test_axilite_interconnect_p2p(self):
+ def test_interconnect_p2p(self):
class DUT(Module):
def __init__(self):
self.master = master = AXILiteInterface()
self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"])
- def test_axilite_timeout(self):
+ def test_timeout(self):
class DUT(Module):
def __init__(self):
self.master = master = AXILiteInterface()
]
run_simulation(dut, generators)
- def test_axilite_arbiter(self):
+ def test_arbiter_order(self):
class DUT(Module):
def __init__(self, n_masters):
self.masters = [AXILiteInterface() for _ in range(n_masters)]
checker = AXILiteChecker()
generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
generators += [timeout(300), checker.handler(dut.slave)]
- run_simulation(dut, generators, vcd_name='sim.vcd')
+ run_simulation(dut, generators)
order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order)
order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
self.assertEqual([addr for addr, data, strb in checker.writes], order)
self.assertEqual([addr for addr, data in checker.reads], order)
+
+ def test_arbiter_holds_grant_until_response(self):
+ class DUT(Module):
+ def __init__(self, n_masters):
+ self.masters = [AXILiteInterface() for _ in range(n_masters)]
+ self.slave = AXILiteInterface()
+ self.submodules.arbiter = AXILiteArbiter(self.masters, self.slave)
+
+ def generator(n, axi_lite, delay=0):
+ def gen(i):
+ return 100*n + i
+
+ for i in range(4):
+ resp = (yield from axi_lite.write(gen(i), gen(i)))
+ self.assertEqual(resp, RESP_OKAY)
+ for _ in range(delay):
+ yield
+ for i in range(4):
+ data, resp = (yield from axi_lite.read(gen(i)))
+ self.assertEqual(resp, RESP_OKAY)
+ for _ in range(delay):
+ yield
+ for _ in range(8):
+ yield
+
+
+ n_masters = 3
+
+ # with no delay each master will do all transfers at once
+ with self.subTest(delay=0):
+ dut = DUT(n_masters)
+ checker = AXILiteChecker(response_latency=lambda: 3)
+ generators = [generator(i, master, delay=0) for i, master in enumerate(dut.masters)]
+ generators += [timeout(300), checker.handler(dut.slave)]
+ run_simulation(dut, generators)
+ order = [0, 1, 2, 3, 100, 101, 102, 103, 200, 201, 202, 203]
+ self.assertEqual([addr for addr, data, strb in checker.writes], order)
+ self.assertEqual([addr for addr, data in checker.reads], order)
+
+ # with some delay, the round-robin arbiter will iterate over masters
+ with self.subTest(delay=1):
+ dut = DUT(n_masters)
+ checker = AXILiteChecker(response_latency=lambda: 3)
+ generators = [generator(i, master, delay=1) for i, master in enumerate(dut.masters)]
+ generators += [timeout(300), checker.handler(dut.slave)]
+ run_simulation(dut, generators, vcd_name='sim.vcd')
+ order = [0, 100, 200, 1, 101, 201, 2, 102, 202, 3, 103, 203]
+ self.assertEqual([addr for addr, data, strb in checker.writes], order)
+ self.assertEqual([addr for addr, data in checker.reads], order)