"""Protect master against slave timeouts (master _has_ to respond correctly)"""
def __init__(self, master, cycles):
self.error = Signal()
+ wr_error = Signal()
+ rd_error = Signal()
# # #
- timer = WaitTimer(int(cycles))
- self.submodules += timer
- is_write = Signal()
- is_read = Signal()
-
- self.submodules.fsm = fsm = FSM()
- fsm.act("WAIT",
- is_write.eq((master.aw.valid & ~master.aw.ready) | (master.w.valid & ~master.w.ready)),
- is_read.eq(master.ar.valid & ~master.ar.ready),
- timer.wait.eq(is_write | is_read),
- # done is updated in `sync`, so we must make sure that `ready` has not been issued
- # by slave during that single cycle, by checking `timer.wait`
- If(timer.done & timer.wait,
- self.error.eq(1),
- If(is_write,
- NextState("RESPOND-WRITE")
- ).Else(
- NextState("RESPOND-READ")
+ self.comb += self.error.eq(wr_error | rd_error)
+
+ wr_timer = WaitTimer(int(cycles))
+ rd_timer = WaitTimer(int(cycles))
+ self.submodules += wr_timer, rd_timer
+
+ def channel_fsm(timer, wait_cond, error, response):
+ fsm = FSM(reset_state="WAIT")
+ fsm.act("WAIT",
+ timer.wait.eq(wait_cond),
+ # done is updated in `sync`, so we must make sure that `ready` has not been issued
+ # by slave during that single cycle, by checking `timer.wait`
+ If(timer.done & timer.wait,
+ error.eq(1),
+ NextState("RESPOND")
)
)
- )
- fsm.act("RESPOND-WRITE",
- master.aw.ready.eq(master.aw.valid),
- master.w.ready.eq(master.w.valid),
- master.b.valid.eq(~master.aw.valid & ~master.w.valid),
- master.b.resp.eq(RESP_SLVERR),
- If(master.b.valid & master.b.ready,
- NextState("WAIT")
- )
- )
- fsm.act("RESPOND-READ",
- master.ar.ready.eq(master.ar.valid),
- master.r.valid.eq(~master.ar.valid),
- master.r.resp.eq(RESP_SLVERR),
- master.r.data.eq(2**len(master.r.data) - 1),
- If(master.r.valid & master.r.ready,
- NextState("WAIT")
- )
- )
+ fsm.act("RESPOND", *response)
+ return fsm
+
+ self.submodules.wr_fsm = channel_fsm(
+ timer = wr_timer,
+ wait_cond = (master.aw.valid & ~master.aw.ready) | (master.w.valid & ~master.w.ready),
+ error = wr_error,
+ response = [
+ master.aw.ready.eq(master.aw.valid),
+ master.w.ready.eq(master.w.valid),
+ master.b.valid.eq(~master.aw.valid & ~master.w.valid),
+ master.b.resp.eq(RESP_SLVERR),
+ If(master.b.valid & master.b.ready,
+ NextState("WAIT")
+ )
+ ])
+
+ self.submodules.rd_fsm = channel_fsm(
+ timer = rd_timer,
+ wait_cond = master.ar.valid & ~master.ar.ready,
+ error = rd_error,
+ response = [
+ master.ar.ready.eq(master.ar.valid),
+ master.r.valid.eq(~master.ar.valid),
+ master.r.resp.eq(RESP_SLVERR),
+ master.r.data.eq(2**len(master.r.data) - 1),
+ If(master.r.valid & master.r.ready,
+ NextState("WAIT")
+ )
+ ])
# AXILite Interconnect -----------------------------------------------------------------------------
slave_sel = new_slave_sel()
# we need to hold the slave selected until all responses come back
- # TODO: check if this will break Timeout if a slave does not respond?
- # should probably work correctly as it uses master signals
# TODO: we could reuse arbiter counters
locks = {
"write": AXILiteRequestCounter(
def interconnect_shared_test(self, master_patterns, slave_decoders,
master_delay=0, slave_ready_latency=0, slave_response_latency=0,
- timeout=300, **kwargs):
+ disconnected_slaves=None, timeout=300, **kwargs):
# number of masters/slaves is defined by the number of patterns/decoders
# master_patterns: list of patterns per master, pattern = list(tuple(rw, addr, data))
# slave_decoders: list of address decoders per slave
+ # delay/latency: control the speed of masters/slaves
+ # disconnected_slaves: list of slave numbers that shouldn't respond to any transactions
class DUT(Module):
def __init__(self, n_masters, decoders, **kwargs):
self.masters = [AXILiteInterface(name="master") for _ in range(n_masters)]
# run simulator
generators = [gen.handler() for gen in pattern_generators]
- generators += [checker.handler(slave) for (slave, checker) in zip(dut.slaves, checkers)]
+ generators += [checker.handler(slave)
+ for i, (slave, checker) in enumerate(zip(dut.slaves, checkers))
+ if i not in (disconnected_slaves or [])]
generators += [timeout_generator(timeout)]
- run_simulation(dut, generators)
+ run_simulation(dut, generators, vcd_name='sim.vcd')
return pattern_generators, checkers
read_errors = [" 0x{:08x} vs 0x{:08x}".format(v, ref) for v, ref in gen.read_errors]
msg = "\ngen.resp_errors = {}\ngen.read_errors = \n{}".format(
gen.resp_errors, "\n".join(read_errors))
- self.assertEqual(gen.errors, 0, msg=msg)
+ if not kwargs.get("disconnected_slaves", None):
+ self.assertEqual(gen.errors, 0, msg=msg)
+ else: # when some slaves are disconnected we should have some errors
+ self.assertNotEqual(gen.errors, 0, msg=msg)
# make sure all the accesses at slave side are in correct address region
for i_slave, (checker, decoder) in enumerate(zip(checkers, slave_decoders_py)):
master_delay=rand,
slave_ready_latency=rand,
slave_response_latency=rand)
+
+ def test_interconnect_shared_stress_timeout(self):
+ self.interconnect_shared_stress_test(timeout=4000,
+ disconnected_slaves=[1],
+ timeout_cycles=50)