yield
while not (yield self.aw.ready):
yield
+ yield self.aw.valid.eq(0)
while not (yield self.w.ready):
yield
+ yield self.w.valid.eq(0)
+ yield self.b.ready.eq(1)
while not (yield self.b.valid):
yield
- yield self.b.ready.eq(1)
resp = (yield self.b.resp)
- yield
yield self.b.ready.eq(0)
return resp
yield
while not (yield self.ar.ready):
yield
+ yield self.ar.valid.eq(0)
+ yield self.r.ready.eq(1)
while not (yield self.r.valid):
yield
- yield self.r.ready.eq(1)
data = (yield self.r.data)
resp = (yield self.r.resp)
- yield
yield self.r.ready.eq(0)
return (data, resp)
# AXILite Data Width Converter ---------------------------------------------------------------------
+class AXILiteDownConverter(Module):
+ def __init__(self, master, slave):
+ assert isinstance(master, AXILiteInterface) and isinstance(slave, AXILiteInterface)
+ dw_from = len(master.r.data)
+ dw_to = len(slave.r.data)
+ ratio = dw_from//dw_to
+
+ # # #
+
+ skip = Signal()
+ counter = Signal(max=ratio)
+ do_read = Signal()
+ do_write = Signal()
+ last_was_read = Signal()
+ aw_ready = Signal()
+ w_ready = Signal()
+
+ # Slave address counter
+ master_align = log2_int(master.data_width//8)
+ slave_align = log2_int(slave.data_width//8)
+ addr_counter = Signal(master_align)
+ self.comb += addr_counter[slave_align:].eq(counter)
+
+ # Write path
+ self.comb += [
+ slave.aw.addr.eq(Cat(addr_counter, master.aw.addr[master_align:])),
+ Case(counter, {i: slave.w.data.eq(master.w.data[i*dw_to:]) for i in range(ratio)}),
+ Case(counter, {i: slave.w.strb.eq(master.w.strb[i*dw_to//8:]) for i in range(ratio)}),
+ master.b.resp.eq(RESP_OKAY), # FIXME: error handling?
+ ]
+
+ # Read path
+ # shift the data word
+ r_data = Signal(dw_from, reset_less=True)
+ self.sync += If(slave.r.ready, r_data.eq(master.r.data))
+ self.comb += master.r.data.eq(Cat(r_data[dw_to:], slave.r.data))
+ # address, resp
+ self.comb += [
+ slave.ar.addr.eq(Cat(addr_counter, master.ar.addr[master_align:])),
+ master.r.resp.eq(RESP_OKAY), # FIXME: error handling?
+ ]
+
+ # Control Path
+ fsm = FSM(reset_state="IDLE")
+ fsm = ResetInserter()(fsm)
+ self.submodules.fsm = fsm
+ self.comb += fsm.reset.eq(~(master.aw.valid | master.ar.valid))
+
+ fsm.act("IDLE",
+ NextValue(counter, 0),
+ # If the last access was a read, do a write, and vice versa
+ If(master.aw.valid & master.ar.valid,
+ do_write.eq(last_was_read),
+ do_read.eq(~last_was_read),
+ ).Else(
+ do_write.eq(master.aw.valid),
+ do_read.eq(master.ar.valid),
+ ),
+ # Start reading/writing immediately not to waste a cycle
+ If(do_write & master.w.valid,
+ NextValue(last_was_read, 0),
+ NextState("WRITE")
+ ).Elif(do_read,
+ NextValue(last_was_read, 1),
+ NextState("READ")
+ )
+ )
+
+ # Write conversion
+ fsm.act("WRITE",
+ skip.eq(slave.w.strb == 0),
+ slave.aw.valid.eq(~skip & ~aw_ready),
+ slave.w.valid.eq(~skip & ~w_ready),
+ If(slave.aw.ready,
+ NextValue(aw_ready, 1)
+ ),
+ If(slave.w.ready,
+ NextValue(w_ready, 1)
+ ),
+ # When skipping, we just increment the counter
+ If(skip,
+ NextValue(counter, counter + 1),
+ # Corner-case: when the last word is being skipped, we must send the response
+ If(counter == (ratio - 1),
+ master.aw.ready.eq(1),
+ master.w.ready.eq(1),
+ NextState("WRITE-RESPONSE-MASTER")
+ )
+ # Write current word and wait for write response
+ ).Elif((slave.aw.ready | aw_ready) & (slave.w.ready | w_ready),
+ NextState("WRITE-RESPONSE-SLAVE")
+ )
+ )
+ fsm.act("WRITE-RESPONSE-SLAVE",
+ NextValue(aw_ready, 0),
+ NextValue(w_ready, 0),
+ If(slave.b.valid,
+ slave.b.ready.eq(1),
+ If(counter == (ratio - 1),
+ master.aw.ready.eq(1),
+ master.w.ready.eq(1),
+ NextState("WRITE-RESPONSE-MASTER")
+ ).Else(
+ NextValue(counter, counter + 1),
+ NextState("WRITE")
+ )
+ )
+ )
+ fsm.act("WRITE-RESPONSE-MASTER",
+ NextValue(aw_ready, 0),
+ NextValue(w_ready, 0),
+ master.b.valid.eq(1),
+ If(master.b.ready,
+ NextState("IDLE")
+ )
+ )
+
+ # Read conversion
+ fsm.act("READ",
+ slave.ar.valid.eq(1),
+ If(slave.ar.ready,
+ NextState("READ-RESPONSE-SLAVE")
+ )
+ )
+ fsm.act("READ-RESPONSE-SLAVE",
+ If(slave.r.valid,
+ # On last word acknowledge ar and hold slave.r.valid until we get master.r.ready
+ If(counter == (ratio - 1),
+ master.ar.ready.eq(1),
+ NextState("READ-RESPONSE-MASTER")
+ # Acknowledge the response and continue conversion
+ ).Else(
+ slave.r.ready.eq(1),
+ NextValue(counter, counter + 1),
+ NextState("READ")
+ )
+ )
+ )
+ fsm.act("READ-RESPONSE-MASTER",
+ master.r.valid.eq(1),
+ If(master.r.ready,
+ slave.r.ready.eq(1),
+ NextState("IDLE")
+ )
+ )
+
class AXILiteConverter(Module):
"""AXILite data width converter"""
def __init__(self, master, slave):
dw_from = len(master.r.data)
dw_to = len(slave.r.data)
if dw_from > dw_to:
- raise NotImplementedError
+ print("AXILiteConverter (Down): {} -> {}".format(master.data_width, slave.data_width))
+ self.submodules += AXILiteDownConverter(master, slave)
elif dw_from < dw_to:
+ print("AXILiteConverter (Up): {} -> {}".format(master.data_width, slave.data_width))
raise NotImplementedError
else:
self.comb += master.connect(slave)
class Read(Access):
pass
-# Tests --------------------------------------------------------------------------------------------
+# TestAXI ------------------------------------------------------------------------------------------
class TestAXI(unittest.TestCase):
def test_burst2beat(self):
r_ready_random = 90
)
+# TestAXILite --------------------------------------------------------------------------------------
+
+class AXILiteChecker:
+ def __init__(self, latency=None, rdata_generator=None):
+ self.latency = latency or (lambda: 0)
+ self.rdata_generator = rdata_generator or (lambda adr: 0xbaadc0de)
+ self.writes = []
+ self.reads = []
+
+ def delay(self):
+ for _ in range(self.latency()):
+ yield
+
+ def handle_write(self, axi_lite):
+ while not (yield axi_lite.aw.valid):
+ yield
+ yield from self.delay()
+ addr = (yield axi_lite.aw.addr)
+ yield axi_lite.aw.ready.eq(1)
+ yield
+ yield axi_lite.aw.ready.eq(0)
+ while not (yield axi_lite.w.valid):
+ yield
+ yield from self.delay()
+ data = (yield axi_lite.w.data)
+ strb = (yield axi_lite.w.strb)
+ yield axi_lite.w.ready.eq(1)
+ yield
+ yield axi_lite.w.ready.eq(0)
+ yield axi_lite.b.valid.eq(1)
+ yield axi_lite.b.resp.eq(RESP_OKAY)
+ yield
+ while not (yield axi_lite.b.ready):
+ yield
+ yield axi_lite.b.valid.eq(0)
+ self.writes.append((addr, data, strb))
+
+ def handle_read(self, axi_lite):
+ while not (yield axi_lite.ar.valid):
+ yield
+ yield from self.delay()
+ addr = (yield axi_lite.ar.addr)
+ yield axi_lite.ar.ready.eq(1)
+ yield
+ yield axi_lite.ar.ready.eq(0)
+ data = self.rdata_generator(addr)
+ yield axi_lite.r.valid.eq(1)
+ yield axi_lite.r.resp.eq(RESP_OKAY)
+ yield axi_lite.r.data.eq(data)
+ yield
+ while not (yield axi_lite.r.ready):
+ yield
+ yield axi_lite.r.valid.eq(0)
+ self.reads.append((addr, data))
+
+ @passive
+ def handler(self, axi_lite):
+ while True:
+ if (yield axi_lite.aw.valid):
+ yield from self.handle_write(axi_lite)
+ if (yield axi_lite.ar.valid):
+ yield from self.handle_read(axi_lite)
+ yield
+
+class TestAXILite(unittest.TestCase):
def test_wishbone2axi2wishbone(self):
class DUT(Module):
def __init__(self):
dut = DUT(size=len(init)*4, init=[v for v in init])
run_simulation(dut, [generator(dut, init)])
self.assertEqual(dut.errors, 0)
+
+ def converter_test(self, width_from, width_to,
+ write_pattern=None, write_expected=None,
+ read_pattern=None, read_expected=None):
+ assert not (write_pattern is None and read_pattern is None)
+
+ if write_pattern is None:
+ write_pattern = []
+ write_expected = []
+ elif len(write_pattern[0]) == 2:
+ # add w.strb
+ write_pattern = [(adr, data, 2**(width_from//8)-1) for adr, data in write_pattern]
+
+ if read_pattern is None:
+ read_pattern = []
+ read_expected = []
+
+ class DUT(Module):
+ def __init__(self, width_from, width_to):
+ self.master = AXILiteInterface(data_width=width_from)
+ self.slave = AXILiteInterface(data_width=width_to)
+ self.submodules.converter = AXILiteConverter(self.master, self.slave)
+
+ def generator(axi_lite):
+ for addr, data, strb in write_pattern or []:
+ resp = (yield from axi_lite.write(addr, data, strb))
+ self.assertEqual(resp, RESP_OKAY)
+ for _ in range(16):
+ yield
+
+ for addr, refdata in read_pattern or []:
+ data, resp = (yield from axi_lite.read(addr))
+ self.assertEqual(resp, RESP_OKAY)
+ self.assertEqual(data, refdata)
+ for _ in range(4):
+ yield
+
+ def rdata_generator(adr):
+ for a, v in read_expected:
+ if a == adr:
+ return v
+ return 0xbaadc0de
+
+ _latency = 0
+ def latency():
+ nonlocal _latency
+ _latency = (_latency + 1) % 3
+ return _latency
+
+ dut = DUT(width_from=width_from, width_to=width_to)
+ checker = AXILiteChecker(latency, rdata_generator)
+ run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)], vcd_name='sim.vcd')
+ self.assertEqual(checker.writes, write_expected)
+ self.assertEqual(checker.reads, read_expected)
+
+ def test_axilite_down_converter_32to16(self):
+ write_pattern = [
+ (0x00000000, 0x22221111),
+ (0x00000004, 0x44443333),
+ (0x00000008, 0x66665555),
+ (0x00000100, 0x88887777),
+ ]
+ write_expected = [
+ (0x00000000, 0x1111, 0b11),
+ (0x00000002, 0x2222, 0b11),
+ (0x00000004, 0x3333, 0b11),
+ (0x00000006, 0x4444, 0b11),
+ (0x00000008, 0x5555, 0b11),
+ (0x0000000a, 0x6666, 0b11),
+ (0x00000100, 0x7777, 0b11),
+ (0x00000102, 0x8888, 0b11),
+ ]
+ read_pattern = write_pattern
+ read_expected = [(adr, data) for (adr, data, _) in write_expected]
+ self.converter_test(width_from=32, width_to=16,
+ write_pattern=write_pattern, write_expected=write_expected,
+ read_pattern=read_pattern, read_expected=read_expected)
+
+ def test_axilite_down_converter_32to8(self):
+ write_pattern = [
+ (0x00000000, 0x44332211),
+ (0x00000004, 0x88776655),
+ ]
+ write_expected = [
+ (0x00000000, 0x11, 0b1),
+ (0x00000001, 0x22, 0b1),
+ (0x00000002, 0x33, 0b1),
+ (0x00000003, 0x44, 0b1),
+ (0x00000004, 0x55, 0b1),
+ (0x00000005, 0x66, 0b1),
+ (0x00000006, 0x77, 0b1),
+ (0x00000007, 0x88, 0b1),
+ ]
+ read_pattern = write_pattern
+ read_expected = [(adr, data) for (adr, data, _) in write_expected]
+ self.converter_test(width_from=32, width_to=8,
+ write_pattern=write_pattern, write_expected=write_expected,
+ read_pattern=read_pattern, read_expected=read_expected)
+
+ def test_axilite_down_converter_64to32(self):
+ write_pattern = [
+ (0x00000000, 0x2222222211111111),
+ (0x00000008, 0x4444444433333333),
+ ]
+ write_expected = [
+ (0x00000000, 0x11111111, 0b1111),
+ (0x00000004, 0x22222222, 0b1111),
+ (0x00000008, 0x33333333, 0b1111),
+ (0x0000000c, 0x44444444, 0b1111),
+ ]
+ read_pattern = write_pattern
+ read_expected = [(adr, data) for (adr, data, _) in write_expected]
+ self.converter_test(width_from=64, width_to=32,
+ write_pattern=write_pattern, write_expected=write_expected,
+ read_pattern=read_pattern, read_expected=read_expected)
+
+ def test_axilite_down_converter_strb(self):
+ write_pattern = [
+ (0x00000000, 0x22221111, 0b1100),
+ (0x00000004, 0x44443333, 0b1111),
+ (0x00000008, 0x66665555, 0b1011),
+ (0x00000100, 0x88887777, 0b0011),
+ ]
+ write_expected = [
+ (0x00000002, 0x2222, 0b11),
+ (0x00000004, 0x3333, 0b11),
+ (0x00000006, 0x4444, 0b11),
+ (0x00000008, 0x5555, 0b11),
+ (0x0000000a, 0x6666, 0b10),
+ (0x00000100, 0x7777, 0b11),
+ ]
+ self.converter_test(width_from=32, width_to=16,
+ write_pattern=write_pattern, write_expected=write_expected)