From: Jędrzej Boczar Date: Fri, 17 Jul 2020 14:48:46 +0000 (+0200) Subject: soc/interconnect/axi: point-to-point interconnect and timeout module with tests X-Git-Tag: 24jan2021_ls180~64^2~10 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=f47ccdae99b657143a84d9d1d48346eaf2aca54e;p=litex.git soc/interconnect/axi: point-to-point interconnect and timeout module with tests --- diff --git a/litex/soc/interconnect/axi.py b/litex/soc/interconnect/axi.py index 75b992ef..dc8a32b3 100644 --- a/litex/soc/interconnect/axi.py +++ b/litex/soc/interconnect/axi.py @@ -5,10 +5,14 @@ """AXI4 Full/Lite support for LiteX""" from migen import * +from migen.genlib import roundrobin +from migen.genlib.misc import split, displacer, chooser, WaitTimer from litex.soc.interconnect import stream from litex.build.generic_platform import * +from litex.soc.interconnect import csr_bus + # AXI Definition ----------------------------------------------------------------------------------- BURST_FIXED = 0b00 @@ -72,6 +76,24 @@ def _connect_axi(master, slave): r.extend(m.connect(s)) return r +def _axi_layout_flat(axi): + # yields tuples (channel, name, direction) + def get_dir(channel, direction): + if channel in ["b", "r"]: + return {DIR_M_TO_S: DIR_S_TO_M, DIR_S_TO_M: DIR_M_TO_S}[direction] + return direction + for ch in ["aw", "w", "b", "ar", "r"]: + channel = getattr(axi, ch) + for group in channel.layout: + if len(group) == 3: + name, _, direction = group + yield ch, name, get_dir(ch, direction) + else: + _, subgroups = group + for subgroup in subgroups: + name, _, direction = subgroup + yield ch, name, get_dir(ch, direction) + class AXIInterface: def __init__(self, data_width=32, address_width=32, id_width=1, clock_domain="sys"): self.data_width = data_width @@ -88,6 +110,9 @@ class AXIInterface: def connect(self, slave): return _connect_axi(self, slave) + def layout_flat(self): + return list(_axi_layout_flat(self)) + # AXI Lite Definition ------------------------------------------------------------------------------ def ax_lite_description(address_width): @@ -161,6 +186,9 @@ class AXILiteInterface: def connect(self, slave): return _connect_axi(self, slave) + def layout_flat(self): + return list(_axi_layout_flat(self)) + def write(self, addr, data, strb=None): if strb is None: strb = 2**len(self.w.strb) - 1 @@ -621,14 +649,14 @@ def axi_lite_to_simple(axi_lite, port_adr, port_dat_r, port_dat_w=None, port_we= return fsm, comb class AXILite2CSR(Module): - def __init__(self, axi_lite=None, csr=None): + def __init__(self, axi_lite=None, bus_csr=None): if axi_lite is None: axi_lite = AXILiteInterface() - if csr is None: - csr = csr.bus.Interface() + if bus_csr is None: + bus_csr = csr_bus.Interface() self.axi_lite = axi_lite - self.csr = csr + self.csr = bus_csr fsm, comb = axi_lite_to_simple(self.axi_lite, port_adr=self.csr.adr, port_dat_r=self.csr.dat_r, @@ -852,3 +880,59 @@ class AXILiteConverter(Module): raise NotImplementedError("AXILiteUpConverter") else: self.comb += master.connect(slave) + +# AXILite Timeout ---------------------------------------------------------------------------------- + +class AXILiteTimeout(Module): + """Protect master against slave timeouts (master _has_ to respond correctly)""" + def __init__(self, master, cycles): + self.error = Signal() + + # # # + + timer = WaitTimer(int(cycles)) + self.submodules += timer + is_write = Signal() + is_read = Signal() + + self.submodules.fsm = fsm = FSM() + fsm.act("WAIT", + is_write.eq((master.aw.valid & ~master.aw.ready) | (master.w.valid & ~master.w.ready)), + is_read.eq(master.ar.valid & ~master.ar.ready), + timer.wait.eq(is_write | is_read), + # done is updated in `sync`, so we must make sure that `ready` has not been issued + # by slave during that single cycle, by checking `timer.wait` + If(timer.done & timer.wait, + self.error.eq(1), + If(is_write, + NextState("RESPOND-WRITE") + ).Else( + NextState("RESPOND-READ") + ) + ) + ) + fsm.act("RESPOND-WRITE", + master.aw.ready.eq(master.aw.valid), + master.w.ready.eq(master.w.valid), + master.b.valid.eq(~master.aw.valid & ~master.w.valid), + master.b.resp.eq(RESP_SLVERR), + If(master.b.valid & master.b.ready, + NextState("WAIT") + ) + ) + fsm.act("RESPOND-READ", + master.ar.ready.eq(master.ar.valid), + master.r.valid.eq(~master.ar.valid), + master.r.resp.eq(RESP_SLVERR), + master.r.data.eq(2**len(master.r.data) - 1), + If(master.r.valid & master.r.ready, + NextState("WAIT") + ) + ) + +# AXILite Interconnect ----------------------------------------------------------------------------- + +class AXILiteInterconnectPointToPoint(Module): + def __init__(self, master, slave): + self.comb += master.connect(slave) + diff --git a/test/test_axi.py b/test/test_axi.py index 84b2b52e..6a6059e7 100644 --- a/test/test_axi.py +++ b/test/test_axi.py @@ -333,8 +333,8 @@ class AXILiteChecker: def __init__(self, latency=None, rdata_generator=None): self.latency = latency or (lambda: 0) self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de) - self.writes = [] - self.reads = [] + self.writes = [] # (addr, data, strb) + self.reads = [] # (addr, data) def delay(self): for _ in range(self.latency()): @@ -555,7 +555,7 @@ class TestAXILite(unittest.TestCase): dut = DUT(width_from=width_from, width_to=width_to) checker = AXILiteChecker(latency, rdata_generator) - run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)], vcd_name='sim.vcd') + run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)]) self.assertEqual(checker.writes, write_expected) self.assertEqual(checker.reads, read_expected) @@ -637,3 +637,92 @@ class TestAXILite(unittest.TestCase): ] self.converter_test(width_from=32, width_to=16, write_pattern=write_pattern, write_expected=write_expected) + + def axilite_pattern_generator(self, axi_lite, pattern): + for rw, addr, data in pattern: + assert rw in ["w", "r"] + if rw == "w": + resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1)) + self.assertEqual(resp, RESP_OKAY) + else: + rdata, resp = (yield from axi_lite.read(addr)) + self.assertEqual(resp, RESP_OKAY) + self.assertEqual(rdata, data) + for _ in range(16): + yield + + def test_axilite_interconnect_p2p(self): + class DUT(Module): + def __init__(self): + self.master = master = AXILiteInterface() + self.slave = slave = AXILiteInterface() + self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave) + + pattern = [ + ("w", 0x00000004, 0x11111111), + ("w", 0x0000000c, 0x22222222), + ("r", 0x00000010, 0x33333333), + ("r", 0x00000018, 0x44444444), + ] + + def rdata_generator(adr): + for rw, a, v in pattern: + if rw == "r" and a == adr: + return v + return 0xbaadc0de + + dut = DUT() + checker = AXILiteChecker(rdata_generator=rdata_generator) + generators = [ + self.axilite_pattern_generator(dut.master, pattern), + checker.handler(dut.slave), + ] + run_simulation(dut, generators) + self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"]) + self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"]) + + def test_axilite_timeout(self): + class DUT(Module): + def __init__(self): + self.master = master = AXILiteInterface() + self.slave = slave = AXILiteInterface() + self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave) + self.submodules.timeout = AXILiteTimeout(master, 16) + + @passive + def timeout(ticks): + for _ in range(ticks): + yield + raise TimeoutError("Timeout after %d ticks" % ticks) + + def generator(axi_lite): + resp = (yield from axi_lite.write(0x00001000, 0x11111111)) + self.assertEqual(resp, RESP_OKAY) + resp = (yield from axi_lite.write(0x00002000, 0x22222222)) + self.assertEqual(resp, RESP_SLVERR) + data, resp = (yield from axi_lite.read(0x00003000)) + self.assertEqual(resp, RESP_SLVERR) + self.assertEqual(data, 0xffffffff) + yield + + def checker(axi_lite): + for _ in range(16): + yield + yield axi_lite.aw.ready.eq(1) + yield axi_lite.w.ready.eq(1) + yield + yield axi_lite.aw.ready.eq(0) + yield axi_lite.w.ready.eq(0) + yield axi_lite.b.valid.eq(1) + yield + while not (yield axi_lite.b.ready): + yield + yield axi_lite.b.valid.eq(0) + + dut = DUT() + generators = [ + generator(dut.master), + checker(dut.slave), + timeout(300), + ] + run_simulation(dut, generators, vcd_name='sim.vcd')