# AXILite Data Width Converter ---------------------------------------------------------------------
-class AXILiteDownConverter(Module):
+class _AXILiteDownConverterWrite(Module):
def __init__(self, master, slave):
assert isinstance(master, AXILiteInterface) and isinstance(slave, AXILiteInterface)
- dw_from = len(master.r.data)
- dw_to = len(slave.r.data)
- ratio = dw_from//dw_to
+ dw_from = len(master.w.data)
+ dw_to = len(slave.w.data)
+ ratio = dw_from//dw_to
+ master_align = log2_int(master.data_width//8)
+ slave_align = log2_int(slave.data_width//8)
- # # #
+ skip = Signal()
+ counter = Signal(max=ratio)
+ aw_ready = Signal()
+ w_ready = Signal()
+ resp = Signal.like(master.b.resp)
+ addr_counter = Signal(master_align)
- skip = Signal()
- counter = Signal(max=ratio)
- do_read = Signal()
- do_write = Signal()
- last_was_read = Signal()
- aw_ready = Signal()
- w_ready = Signal()
- resp = Signal.like(master.b.resp)
+ # # #
# Slave address counter
- master_align = log2_int(master.data_width//8)
- slave_align = log2_int(slave.data_width//8)
- addr_counter = Signal(master_align)
self.comb += addr_counter[slave_align:].eq(counter)
- # Write path
+ # Data path
self.comb += [
slave.aw.addr.eq(Cat(addr_counter, master.aw.addr[master_align:])),
Case(counter, {i: slave.w.data.eq(master.w.data[i*dw_to:]) for i in range(ratio)}),
master.b.resp.eq(resp),
]
- # Read path
- # shift the data word
- r_data = Signal(dw_from, reset_less=True)
- self.sync += If(slave.r.ready, r_data.eq(master.r.data))
- self.comb += master.r.data.eq(Cat(r_data[dw_to:], slave.r.data))
- # address, resp
- self.comb += [
- slave.ar.addr.eq(Cat(addr_counter, master.ar.addr[master_align:])),
- master.r.resp.eq(resp),
- ]
-
# Control Path
fsm = FSM(reset_state="IDLE")
fsm = ResetInserter()(fsm)
self.submodules.fsm = fsm
- self.comb += fsm.reset.eq(~(master.aw.valid | master.ar.valid))
+ # Reset the converter state if master breaks a request, we can do that as
+ # aw.valid and w.valid are kept high in CONVERT and RESPOND-SLAVE, and
+ # acknowledged only when moving to RESPOND-MASTER, and then b.valid is 1
+ self.comb += fsm.reset.eq(~((master.aw.valid | master.w.valid) | master.b.valid))
fsm.act("IDLE",
NextValue(counter, 0),
NextValue(resp, RESP_OKAY),
- # If the last access was a read, do a write, and vice versa
- If(master.aw.valid & master.ar.valid,
- do_write.eq(last_was_read),
- do_read.eq(~last_was_read),
- ).Else(
- do_write.eq(master.aw.valid),
- do_read.eq(master.ar.valid),
- ),
- # Start reading/writing immediately not to waste a cycle
- If(do_write & master.w.valid,
- NextValue(last_was_read, 0),
- NextState("WRITE")
- ).Elif(do_read,
- NextValue(last_was_read, 1),
- NextState("READ")
+ If(master.aw.valid & master.w.valid,
+ NextState("CONVERT")
)
)
-
- # Write conversion
- fsm.act("WRITE",
+ fsm.act("CONVERT",
skip.eq(slave.w.strb == 0),
slave.aw.valid.eq(~skip & ~aw_ready),
slave.w.valid.eq(~skip & ~w_ready),
If(counter == (ratio - 1),
master.aw.ready.eq(1),
master.w.ready.eq(1),
- NextState("WRITE-RESPONSE-MASTER")
+ NextState("RESPOND-MASTER")
)
# Write current word and wait for write response
).Elif((slave.aw.ready | aw_ready) & (slave.w.ready | w_ready),
- NextState("WRITE-RESPONSE-SLAVE")
+ NextState("RESPOND-SLAVE")
)
)
- fsm.act("WRITE-RESPONSE-SLAVE",
+ fsm.act("RESPOND-SLAVE",
NextValue(aw_ready, 0),
NextValue(w_ready, 0),
If(slave.b.valid,
slave.b.ready.eq(1),
- # Any errors is sticky, so the first one is always sent
+ # Errors are sticky, so the first one is always sent
If((resp == RESP_OKAY) & (slave.b.resp != RESP_OKAY),
NextValue(resp, slave.b.resp)
),
If(counter == (ratio - 1),
master.aw.ready.eq(1),
master.w.ready.eq(1),
- NextState("WRITE-RESPONSE-MASTER")
+ NextState("RESPOND-MASTER")
).Else(
NextValue(counter, counter + 1),
- NextState("WRITE")
+ NextState("CONVERT")
)
)
)
- fsm.act("WRITE-RESPONSE-MASTER",
+ fsm.act("RESPOND-MASTER",
NextValue(aw_ready, 0),
NextValue(w_ready, 0),
master.b.valid.eq(1),
)
)
- # Read conversion
- fsm.act("READ",
+class _AXILiteDownConverterRead(Module):
+ def __init__(self, master, slave):
+ assert isinstance(master, AXILiteInterface) and isinstance(slave, AXILiteInterface)
+ dw_from = len(master.r.data)
+ dw_to = len(slave.r.data)
+ ratio = dw_from//dw_to
+ master_align = log2_int(master.data_width//8)
+ slave_align = log2_int(slave.data_width//8)
+
+ skip = Signal()
+ counter = Signal(max=ratio)
+ resp = Signal.like(master.r.resp)
+ addr_counter = Signal(master_align)
+
+ # # #
+
+ # Slave address counter
+ self.comb += addr_counter[slave_align:].eq(counter)
+
+ # Data path
+ # shift the data word
+ r_data = Signal(dw_from, reset_less=True)
+ self.sync += If(slave.r.ready, r_data.eq(master.r.data))
+ self.comb += master.r.data.eq(Cat(r_data[dw_to:], slave.r.data))
+ # address, resp
+ self.comb += [
+ slave.ar.addr.eq(Cat(addr_counter, master.ar.addr[master_align:])),
+ master.r.resp.eq(resp),
+ ]
+
+ # Control Path
+ fsm = FSM(reset_state="IDLE")
+ fsm = ResetInserter()(fsm)
+ self.submodules.fsm = fsm
+ # Reset the converter state if master breaks a request, we can do that as
+ # ar.valid is high in CONVERT and RESPOND-SLAVE, and r.valid in RESPOND-MASTER
+ self.comb += fsm.reset.eq(~(master.ar.valid | master.r.valid))
+
+ fsm.act("IDLE",
+ NextValue(counter, 0),
+ NextValue(resp, RESP_OKAY),
+ If(master.ar.valid,
+ NextState("CONVERT")
+ )
+ )
+ fsm.act("CONVERT",
slave.ar.valid.eq(1),
If(slave.ar.ready,
- NextState("READ-RESPONSE-SLAVE")
+ NextState("RESPOND-SLAVE")
)
)
- fsm.act("READ-RESPONSE-SLAVE",
+ fsm.act("RESPOND-SLAVE",
If(slave.r.valid,
- # Any errors is sticky, so the first one is always sent
- If((resp == RESP_OKAY) & (slave.b.resp != RESP_OKAY),
- NextValue(resp, slave.b.resp)
+ # Errors are sticky, so the first one is always sent
+ If((resp == RESP_OKAY) & (slave.r.resp != RESP_OKAY),
+ NextValue(resp, slave.r.resp)
),
# On last word acknowledge ar and hold slave.r.valid until we get master.r.ready
If(counter == (ratio - 1),
master.ar.ready.eq(1),
- NextState("READ-RESPONSE-MASTER")
+ NextState("RESPOND-MASTER")
# Acknowledge the response and continue conversion
).Else(
slave.r.ready.eq(1),
NextValue(counter, counter + 1),
- NextState("READ")
+ NextState("CONVERT")
)
)
)
- fsm.act("READ-RESPONSE-MASTER",
+ fsm.act("RESPOND-MASTER",
master.r.valid.eq(1),
If(master.r.ready,
slave.r.ready.eq(1),
)
)
+class AXILiteDownConverter(Module):
+ def __init__(self, master, slave):
+ self.submodules.write = _AXILiteDownConverterWrite(master, slave)
+ self.submodules.read = _AXILiteDownConverterRead(master, slave)
+
class AXILiteConverter(Module):
"""AXILite data width converter"""
def __init__(self, master, slave):
# AXILite Interconnect -----------------------------------------------------------------------------
-class AXILiteInterconnectPointToPoint(Module):
- def __init__(self, master, slave):
- self.comb += master.connect(slave)
-
-
-class AXILiteRequestCounter(Module):
+class _AXILiteRequestCounter(Module):
def __init__(self, request, response, max_requests=256):
self.counter = counter = Signal(max=max_requests)
self.full = full = Signal()
),
]
+class AXILiteInterconnectPointToPoint(Module):
+ def __init__(self, master, slave):
+ self.comb += master.connect(slave)
+
class AXILiteArbiter(Module):
"""AXI Lite arbiter
self.comb += dest.eq(source)
# allow to change rr.grant only after all requests from a master have been responded to
- self.submodules.wr_lock = wr_lock = AXILiteRequestCounter(
+ self.submodules.wr_lock = wr_lock = _AXILiteRequestCounter(
request=target.aw.valid & target.aw.ready, response=target.b.valid & target.b.ready)
- self.submodules.rd_lock = rd_lock = AXILiteRequestCounter(
+ self.submodules.rd_lock = rd_lock = _AXILiteRequestCounter(
request=target.ar.valid & target.ar.ready, response=target.r.valid & target.r.ready)
# switch to next request only if there are no responses pending
# we need to hold the slave selected until all responses come back
# TODO: we could reuse arbiter counters
locks = {
- "write": AXILiteRequestCounter(
+ "write": _AXILiteRequestCounter(
request=master.aw.valid & master.aw.ready,
response=master.b.valid & master.b.ready),
- "read": AXILiteRequestCounter(
+ "read": _AXILiteRequestCounter(
request=master.ar.valid & master.ar.ready,
response=master.r.valid & master.r.ready),
}
yield from self.handle_read(axi_lite)
yield
+ @passive
+ def _write_handler(self, axi_lite):
+ while True:
+ yield from self.handle_write(axi_lite)
+ yield
+
+ @passive
+ def _read_handler(self, axi_lite):
+ while True:
+ yield from self.handle_read(axi_lite)
+ yield
+
+ def parallel_handlers(self, axi_lite):
+ return self._write_handler(axi_lite), self._read_handler(axi_lite)
+
class AXILitePatternGenerator:
def __init__(self, axi_lite, pattern, delay=0):
# patter: (rw, addr, data)
run_simulation(dut, [generator(dut, init)])
self.assertEqual(dut.errors, 0)
- def converter_test(self, width_from, width_to,
+ def converter_test(self, width_from, width_to, parallel_rw=False,
write_pattern=None, write_expected=None,
read_pattern=None, read_expected=None):
assert not (write_pattern is None and read_pattern is None)
self.slave = AXILiteInterface(data_width=width_to)
self.submodules.converter = AXILiteConverter(self.master, self.slave)
- def generator(axi_lite):
+ prng = random.Random(42)
+
+ def write_generator(axi_lite):
for addr, data, strb in write_pattern or []:
resp = (yield from axi_lite.write(addr, data, strb))
self.assertEqual(resp, RESP_OKAY)
+ for _ in range(prng.randrange(3)):
+ yield
for _ in range(16):
yield
+ def read_generator(axi_lite):
for addr, refdata in read_pattern or []:
data, resp = (yield from axi_lite.read(addr))
self.assertEqual(resp, RESP_OKAY)
self.assertEqual(data, refdata)
+ for _ in range(prng.randrange(3)):
+ yield
for _ in range(4):
yield
+ def sequential_generator(axi_lite):
+ yield from write_generator(axi_lite)
+ yield from read_generator(axi_lite)
+
def rdata_generator(adr):
for a, v in read_expected:
if a == adr:
dut = DUT(width_from=width_from, width_to=width_to)
checker = AXILiteChecker(ready_latency=latency, rdata_generator=rdata_generator)
- run_simulation(dut, [generator(dut.master), checker.handler(dut.slave)])
+ if parallel_rw:
+ generators = [write_generator(dut.master), read_generator(dut.master)]
+ else:
+ generators = [sequential_generator(dut.master)]
+ generators += checker.parallel_handlers(dut.slave)
+ run_simulation(dut, generators)
self.assertEqual(checker.writes, write_expected)
self.assertEqual(checker.reads, read_expected)
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
- self.converter_test(width_from=32, width_to=16,
- write_pattern=write_pattern, write_expected=write_expected,
- read_pattern=read_pattern, read_expected=read_expected)
+ for parallel in [False, True]:
+ with self.subTest(parallel=parallel):
+ self.converter_test(width_from=32, width_to=16, parallel_rw=parallel,
+ write_pattern=write_pattern, write_expected=write_expected,
+ read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_32to8(self):
write_pattern = [
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
- self.converter_test(width_from=32, width_to=8,
- write_pattern=write_pattern, write_expected=write_expected,
- read_pattern=read_pattern, read_expected=read_expected)
+ for parallel in [False, True]:
+ with self.subTest(parallel=parallel):
+ self.converter_test(width_from=32, width_to=8, parallel_rw=parallel,
+ write_pattern=write_pattern, write_expected=write_expected,
+ read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_64to32(self):
write_pattern = [
]
read_pattern = write_pattern
read_expected = [(adr, data) for (adr, data, _) in write_expected]
- self.converter_test(width_from=64, width_to=32,
- write_pattern=write_pattern, write_expected=write_expected,
- read_pattern=read_pattern, read_expected=read_expected)
+ for parallel in [False, True]:
+ with self.subTest(parallel=parallel):
+ self.converter_test(width_from=64, width_to=32, parallel_rw=parallel,
+ write_pattern=write_pattern, write_expected=write_expected,
+ read_pattern=read_pattern, read_expected=read_expected)
def test_axilite_down_converter_strb(self):
write_pattern = [
for i, (slave, checker) in enumerate(zip(dut.slaves, checkers))
if i not in (disconnected_slaves or [])]
generators += [timeout_generator(timeout)]
- run_simulation(dut, generators, vcd_name='sim.vcd')
+ run_simulation(dut, generators)
return pattern_generators, checkers