"""AXI4 Full/Lite support for LiteX"""
from migen import *
+from migen.genlib import roundrobin
+from migen.genlib.misc import split, displacer, chooser, WaitTimer
from litex.soc.interconnect import stream
from litex.build.generic_platform import *
+from litex.soc.interconnect import csr_bus
+
# AXI Definition -----------------------------------------------------------------------------------
BURST_FIXED = 0b00
r.extend(m.connect(s))
return r
+def _axi_layout_flat(axi):
+ # yields tuples (channel, name, direction)
+ def get_dir(channel, direction):
+ if channel in ["b", "r"]:
+ return {DIR_M_TO_S: DIR_S_TO_M, DIR_S_TO_M: DIR_M_TO_S}[direction]
+ return direction
+ for ch in ["aw", "w", "b", "ar", "r"]:
+ channel = getattr(axi, ch)
+ for group in channel.layout:
+ if len(group) == 3:
+ name, _, direction = group
+ yield ch, name, get_dir(ch, direction)
+ else:
+ _, subgroups = group
+ for subgroup in subgroups:
+ name, _, direction = subgroup
+ yield ch, name, get_dir(ch, direction)
+
class AXIInterface:
def __init__(self, data_width=32, address_width=32, id_width=1, clock_domain="sys"):
self.data_width = data_width
def connect(self, slave):
return _connect_axi(self, slave)
+ def layout_flat(self):
+ return list(_axi_layout_flat(self))
+
# AXI Lite Definition ------------------------------------------------------------------------------
def ax_lite_description(address_width):
def connect(self, slave):
return _connect_axi(self, slave)
+ def layout_flat(self):
+ return list(_axi_layout_flat(self))
+
def write(self, addr, data, strb=None):
if strb is None:
strb = 2**len(self.w.strb) - 1
return fsm, comb
class AXILite2CSR(Module):
- def __init__(self, axi_lite=None, csr=None):
+ def __init__(self, axi_lite=None, bus_csr=None):
if axi_lite is None:
axi_lite = AXILiteInterface()
- if csr is None:
- csr = csr.bus.Interface()
+ if bus_csr is None:
+ bus_csr = csr_bus.Interface()
self.axi_lite = axi_lite
- self.csr = csr
+ self.csr = bus_csr
fsm, comb = axi_lite_to_simple(self.axi_lite,
port_adr=self.csr.adr, port_dat_r=self.csr.dat_r,
raise NotImplementedError("AXILiteUpConverter")
else:
self.comb += master.connect(slave)
+
+# AXILite Timeout ----------------------------------------------------------------------------------
+
+class AXILiteTimeout(Module):
+ """Protect master against slave timeouts (master _has_ to respond correctly)"""
+ def __init__(self, master, cycles):
+ self.error = Signal()
+
+ # # #
+
+ timer = WaitTimer(int(cycles))
+ self.submodules += timer
+ is_write = Signal()
+ is_read = Signal()
+
+ self.submodules.fsm = fsm = FSM()
+ fsm.act("WAIT",
+ is_write.eq((master.aw.valid & ~master.aw.ready) | (master.w.valid & ~master.w.ready)),
+ is_read.eq(master.ar.valid & ~master.ar.ready),
+ timer.wait.eq(is_write | is_read),
+ # done is updated in `sync`, so we must make sure that `ready` has not been issued
+ # by slave during that single cycle, by checking `timer.wait`
+ If(timer.done & timer.wait,
+ self.error.eq(1),
+ If(is_write,
+ NextState("RESPOND-WRITE")
+ ).Else(
+ NextState("RESPOND-READ")
+ )
+ )
+ )
+ fsm.act("RESPOND-WRITE",
+ master.aw.ready.eq(master.aw.valid),
+ master.w.ready.eq(master.w.valid),
+ master.b.valid.eq(~master.aw.valid & ~master.w.valid),
+ master.b.resp.eq(RESP_SLVERR),
+ If(master.b.valid & master.b.ready,
+ NextState("WAIT")
+ )
+ )
+ fsm.act("RESPOND-READ",
+ master.ar.ready.eq(master.ar.valid),
+ master.r.valid.eq(~master.ar.valid),
+ master.r.resp.eq(RESP_SLVERR),
+ master.r.data.eq(2**len(master.r.data) - 1),
+ If(master.r.valid & master.r.ready,
+ NextState("WAIT")
+ )
+ )
+
+# AXILite Interconnect -----------------------------------------------------------------------------
+
+class AXILiteInterconnectPointToPoint(Module):
+ def __init__(self, master, slave):
+ self.comb += master.connect(slave)
+
def __init__(self, latency=None, rdata_generator=None):
self.latency = latency or (lambda: 0)
self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
- self.writes = []
- self.reads = []
+ self.writes = [] # (addr, data, strb)
+ self.reads = [] # (addr, data)
def delay(self):
for _ in range(self.latency()):
dut = DUT(width_from=width_from, width_to=width_to)
checker = AXILiteChecker(latency, rdata_generator)
- run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)], vcd_name='sim.vcd')
+ run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)])
self.assertEqual(checker.writes, write_expected)
self.assertEqual(checker.reads, read_expected)
]
self.converter_test(width_from=32, width_to=16,
write_pattern=write_pattern, write_expected=write_expected)
+
+ def axilite_pattern_generator(self, axi_lite, pattern):
+ for rw, addr, data in pattern:
+ assert rw in ["w", "r"]
+ if rw == "w":
+ resp = (yield from axi_lite.write(addr, data, 2**len(axi_lite.w.strb) - 1))
+ self.assertEqual(resp, RESP_OKAY)
+ else:
+ rdata, resp = (yield from axi_lite.read(addr))
+ self.assertEqual(resp, RESP_OKAY)
+ self.assertEqual(rdata, data)
+ for _ in range(16):
+ yield
+
+ def test_axilite_interconnect_p2p(self):
+ class DUT(Module):
+ def __init__(self):
+ self.master = master = AXILiteInterface()
+ self.slave = slave = AXILiteInterface()
+ self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
+
+ pattern = [
+ ("w", 0x00000004, 0x11111111),
+ ("w", 0x0000000c, 0x22222222),
+ ("r", 0x00000010, 0x33333333),
+ ("r", 0x00000018, 0x44444444),
+ ]
+
+ def rdata_generator(adr):
+ for rw, a, v in pattern:
+ if rw == "r" and a == adr:
+ return v
+ return 0xbaadc0de
+
+ dut = DUT()
+ checker = AXILiteChecker(rdata_generator=rdata_generator)
+ generators = [
+ self.axilite_pattern_generator(dut.master, pattern),
+ checker.handler(dut.slave),
+ ]
+ run_simulation(dut, generators)
+ self.assertEqual(checker.writes, [(addr, data, 0b1111) for rw, addr, data in pattern if rw == "w"])
+ self.assertEqual(checker.reads, [(addr, data) for rw, addr, data in pattern if rw == "r"])
+
+ def test_axilite_timeout(self):
+ class DUT(Module):
+ def __init__(self):
+ self.master = master = AXILiteInterface()
+ self.slave = slave = AXILiteInterface()
+ self.submodules.interconnect = AXILiteInterconnectPointToPoint(master, slave)
+ self.submodules.timeout = AXILiteTimeout(master, 16)
+
+ @passive
+ def timeout(ticks):
+ for _ in range(ticks):
+ yield
+ raise TimeoutError("Timeout after %d ticks" % ticks)
+
+ def generator(axi_lite):
+ resp = (yield from axi_lite.write(0x00001000, 0x11111111))
+ self.assertEqual(resp, RESP_OKAY)
+ resp = (yield from axi_lite.write(0x00002000, 0x22222222))
+ self.assertEqual(resp, RESP_SLVERR)
+ data, resp = (yield from axi_lite.read(0x00003000))
+ self.assertEqual(resp, RESP_SLVERR)
+ self.assertEqual(data, 0xffffffff)
+ yield
+
+ def checker(axi_lite):
+ for _ in range(16):
+ yield
+ yield axi_lite.aw.ready.eq(1)
+ yield axi_lite.w.ready.eq(1)
+ yield
+ yield axi_lite.aw.ready.eq(0)
+ yield axi_lite.w.ready.eq(0)
+ yield axi_lite.b.valid.eq(1)
+ yield
+ while not (yield axi_lite.b.ready):
+ yield
+ yield axi_lite.b.valid.eq(0)
+
+ dut = DUT()
+ generators = [
+ generator(dut.master),
+ checker(dut.slave),
+ timeout(300),
+ ]
+ run_simulation(dut, generators, vcd_name='sim.vcd')