sim.run()
+class ArbiterTestCase(unittest.TestCase):
+ def setUp(self):
+ self.dut = Arbiter(addr_width=31, data_width=32, granularity=16)
+
+ def test_add_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Initiator bus must be an instance of wishbone\.Interface, not 'foo'"):
+ self.dut.add("foo")
+
+ def test_add_wrong_addr_width(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has address width 15, which is not the same as arbiter "
+ r"address width 31"):
+ self.dut.add(Interface(addr_width=15, data_width=32, granularity=16))
+
+ def test_add_wrong_granularity(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has granularity 8, which is lesser than "
+ r"the arbiter granularity 16"):
+ self.dut.add(Interface(addr_width=31, data_width=32, granularity=8))
+
+ def test_add_wrong_data_width(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has data width 16, which is not the same as arbiter "
+ r"data width 32"):
+ self.dut.add(Interface(addr_width=31, data_width=16, granularity=16))
+
+ def test_add_wrong_optional_output(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has optional output 'lock', but the arbiter does "
+ r"not have a corresponding input"):
+ self.dut.add(Interface(addr_width=31, data_width=32, granularity=16,
+ features={"lock"}))
+
+
+class ArbiterSimulationTestCase(unittest.TestCase):
+ def test_simple(self):
+ dut = Arbiter(addr_width=30, data_width=32, granularity=8,
+ features={"err", "rty", "stall", "lock", "cti", "bte"})
+ itor_1 = Interface(addr_width=30, data_width=32, granularity=8)
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, granularity=16,
+ features={"err", "rty", "stall", "lock", "cti", "bte"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.adr.eq(0x7ffffffc >> 2)
+ yield itor_1.cyc.eq(1)
+ yield itor_1.stb.eq(1)
+ yield itor_1.sel.eq(0b1111)
+ yield itor_1.we.eq(1)
+ yield itor_1.dat_w.eq(0x12345678)
+ yield dut.bus.dat_r.eq(0xabcdef01)
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-7)
+ self.assertEqual((yield dut.bus.adr), 0x7ffffffc >> 2)
+ self.assertEqual((yield dut.bus.cyc), 1)
+ self.assertEqual((yield dut.bus.stb), 1)
+ self.assertEqual((yield dut.bus.sel), 0b1111)
+ self.assertEqual((yield dut.bus.we), 1)
+ self.assertEqual((yield dut.bus.dat_w), 0x12345678)
+ self.assertEqual((yield dut.bus.lock), 1)
+ self.assertEqual((yield dut.bus.cti), CycleType.CLASSIC.value)
+ self.assertEqual((yield dut.bus.bte), BurstTypeExt.LINEAR.value)
+ self.assertEqual((yield itor_1.dat_r), 0xabcdef01)
+ self.assertEqual((yield itor_1.ack), 1)
+
+ yield itor_1.cyc.eq(0)
+ yield itor_2.adr.eq(0xe0000000 >> 2)
+ yield itor_2.cyc.eq(1)
+ yield itor_2.stb.eq(1)
+ yield itor_2.sel.eq(0b10)
+ yield itor_2.we.eq(1)
+ yield itor_2.dat_w.eq(0x43218765)
+ yield itor_2.lock.eq(0)
+ yield itor_2.cti.eq(CycleType.INCR_BURST)
+ yield itor_2.bte.eq(BurstTypeExt.WRAP_4)
+ yield Tick()
+
+ yield dut.bus.err.eq(1)
+ yield dut.bus.rty.eq(1)
+ yield dut.bus.stall.eq(0)
+ yield Delay(1e-7)
+ self.assertEqual((yield dut.bus.adr), 0xe0000000 >> 2)
+ self.assertEqual((yield dut.bus.cyc), 1)
+ self.assertEqual((yield dut.bus.stb), 1)
+ self.assertEqual((yield dut.bus.sel), 0b1100)
+ self.assertEqual((yield dut.bus.we), 1)
+ self.assertEqual((yield dut.bus.dat_w), 0x43218765)
+ self.assertEqual((yield dut.bus.lock), 0)
+ self.assertEqual((yield dut.bus.cti), CycleType.INCR_BURST.value)
+ self.assertEqual((yield dut.bus.bte), BurstTypeExt.WRAP_4.value)
+ self.assertEqual((yield itor_2.dat_r), 0xabcdef01)
+ self.assertEqual((yield itor_2.ack), 1)
+ self.assertEqual((yield itor_2.err), 1)
+ self.assertEqual((yield itor_2.rty), 1)
+ self.assertEqual((yield itor_2.stall), 0)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
+
+ def test_lock(self):
+ dut = Arbiter(addr_width=30, data_width=32, features={"lock"})
+ itor_1 = Interface(addr_width=30, data_width=32, features={"lock"})
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, features={"lock"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_1.lock.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield itor_1.lock.eq(0)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 1)
+
+ yield itor_2.cyc.eq(0)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield itor_1.stb.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield itor_1.stb.eq(0)
+ yield itor_2.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
+
+ def test_stall(self):
+ dut = Arbiter(addr_width=30, data_width=32, features={"stall"})
+ itor_1 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield dut.bus.stall.eq(0)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 0)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ yield dut.bus.stall.eq(1)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 1)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_process(sim_test())
+ sim.run()
+
+ def test_stall_compat(self):
+ dut = Arbiter(addr_width=30, data_width=32)
+ itor_1 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 1)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 0)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_process(sim_test())
+ sim.run()
+
+ def test_roundrobin(self):
+ dut = Arbiter(addr_width=30, data_width=32)
+ itor_1 = Interface(addr_width=30, data_width=32)
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32)
+ dut.add(itor_2)
+ itor_3 = Interface(addr_width=30, data_width=32)
+ dut.add(itor_3)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(0)
+ yield itor_3.cyc.eq(1)
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 0)
+
+ yield itor_1.cyc.eq(0)
+ yield itor_2.cyc.eq(0)
+ yield itor_3.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 1)
+
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield itor_3.cyc.eq(0)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 0)
+
+ yield itor_1.cyc.eq(0)
+ yield itor_2.cyc.eq(1)
+ yield itor_3.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 1)
+ self.assertEqual((yield itor_3.ack), 0)
+
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(0)
+ yield itor_3.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
+
+
class InterconnectSharedSimulationTestCase(unittest.TestCase):
def setUp(self):
self.shared = Interface(addr_width=30,
features={"err","cti","bte"},
name="sub01")
self.dut = InterconnectShared(
- shared_bus=self.shared,
- masters=[
+ addr_width=30, data_width=32, granularity=8,
+ features={"err","cti","bte"},
+ itors=[
self.master01,
self.master02
],
targets=[
- (self.ram.bus, 0),
+ (self.ram.bus, 0),
(self.sub01, (2**21) << 2)
]
)
for _ in range(5):
yield self.master01.cyc.eq(1)
yield self.master02.cyc.eq(1)
- yield
+ yield
ram_cyc = (yield self.ram.bus.cyc)
sub01_cyc = (yield self.sub01.cyc)
if ram_cyc == 1:
class Arbiter(Elaboratable):
"""Wishbone bus arbiter.
- An arbiter for selecting the Wishbone master from several devices.
+ An arbiter for initiators (masters) to access a shared Wishbone bus.
Parameters
----------
addr_width : int
- Address width. See :class:`Interface`.
+ Address width of the shared bus. See :class:`Interface`.
data_width : int
- Data width. See :class:`Interface`.
+ Data width of the shared bus. See :class:`Interface`.
granularity : int
- Granularity. See :class:`Interface`.
+ Granularity of the shared bus. See :class:`Interface`.
features : iter(str)
- Optional signal set. See :class:`Interface`.
- alignment : int
- Window alignment. See :class:`Interface`.
+ Optional signal set for the shared bus. See :class:`Interface`.
+ scheduler : str or None
+ Method for bus arbitration. Defaults to "rr" (Round Robin, see
+ :class:`scheduler.RoundRobin`).
Attributes
----------
bus : :class:`Interface`
- Bus providing access to the selected master.
+ Shared bus to which the selected initiator gains access.
"""
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
- alignment=0, scheduler="rr"):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features,
- alignment=alignment)
- self._masters = dict()
+ scheduler="rr"):
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features)
+ self._itors = []
if scheduler not in ["rr"]:
raise ValueError("Scheduling mode must be \"rr\", not {!r}"
.format(scheduler))
self._scheduler = scheduler
- self._next_index = 0
- def add(self, master_bus):
- """Add a device bus to the list of master candidates
+ def add(self, itor_bus):
+ """Add an initiator bus to the arbiter.
+
+ The initiator bus must have the same address width and data width as the arbiter. The
+ granularity of the initiator bus must be greater than or equal to the granularity of
+ the arbiter.
"""
- if not isinstance(master_bus, Interface):
- raise TypeError("Master bus must be an instance of wishbone.Interface, not {!r}"
- .format(master_bus))
- if master_bus.granularity != self.bus.granularity:
- raise ValueError("Master bus has granularity {}, which is not the same as "
+ if not isinstance(itor_bus, Interface):
+ raise TypeError("Initiator bus must be an instance of wishbone.Interface, not {!r}"
+ .format(itor_bus))
+ if itor_bus.addr_width != self.bus.addr_width:
+ raise ValueError("Initiator bus has address width {}, which is not the same as "
+ "arbiter address width {}"
+ .format(itor_bus.addr_width, self.bus.addr_width))
+ if itor_bus.granularity < self.bus.granularity:
+ raise ValueError("Initiator bus has granularity {}, which is lesser than the "
"arbiter granularity {}"
- .format(master_bus.granularity, self.bus.granularity))
- if master_bus.data_width != self.bus.data_width:
- raise ValueError("Master bus has data width {}, which is not the same as "
- "arbiter data width {})"
- .format(master_bus.data_width, self.bus.data_width))
- for opt_output in {"err", "rty", "stall"}:
- if hasattr(master_bus, opt_output) and not hasattr(self.bus, opt_output):
- raise ValueError("Master bus has optional output {!r}, but the arbiter "
+ .format(itor_bus.granularity, self.bus.granularity))
+ if itor_bus.data_width != self.bus.data_width:
+ raise ValueError("Initiator bus has data width {}, which is not the same as "
+ "arbiter data width {}"
+ .format(itor_bus.data_width, self.bus.data_width))
+ for opt_output in {"lock", "cti", "bte"}:
+ if hasattr(itor_bus, opt_output) and not hasattr(self.bus, opt_output):
+ raise ValueError("Initiator bus has optional output {!r}, but the arbiter "
"does not have a corresponding input"
.format(opt_output))
- self._masters[master_bus.memory_map] = self._next_index, master_bus
- self._next_index += 1
+ self._itors.append(itor_bus)
def elaborate(self, platform):
m = Module()
if self._scheduler == "rr":
- m.submodules.scheduler = scheduler = RoundRobin(self._next_index)
- grant = Signal(self._next_index)
+ m.submodules.scheduler = scheduler = RoundRobin(len(self._itors))
+ grant = Signal(range(len(self._itors)))
+
+ # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4)
+ bus_busy = self.bus.cyc
+ if hasattr(self.bus, "lock"):
+ # If LOCK is not asserted, we also wait for STB to be deasserted before granting bus
+ # ownership to the next initiator. If we didn't, the next bus owner could receive
+ # an ACK (or ERR, RTY) from the previous transaction when targeting the same
+ # peripheral.
+ bus_busy &= self.bus.lock | self.bus.stb
+
m.d.comb += [
- # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4)
- scheduler.stb.eq(~self.bus.cyc),
- grant.eq(scheduler.grant)
+ scheduler.stb.eq(~bus_busy),
+ grant.eq(scheduler.grant),
+ scheduler.request.eq(Cat(itor_bus.cyc for itor_bus in self._itors))
]
- for signal_name, (_, signal_direction) in self.bus.layout.fields.items():
- # FANOUT signals: only mux the granted master with the interface
- if signal_direction == Direction.FANOUT:
- master_signals = Array(getattr(master_bus, signal_name)
- for __, (___, master_bus)
- in self._masters.items())
- m.d.comb += getattr(self.bus, signal_name).eq(master_signals[grant])
- # FANIN signals: ACK and ERR are ORed to all masters;
- # all other signals are asserted to the granted master only
- if signal_direction == Direction.FANIN:
- for __, (index, master_bus) in self._masters.items():
- source = getattr(self.bus, signal_name)
- dest = getattr(master_bus, signal_name)
- if signal_name in ["ack", "err"]:
- m.d.comb += dest.eq(source & (grant == index))
- else:
- m.d.comb += dest.eq(source)
-
- master_requests = [master_bus.cyc & ~master_bus.ack
- for __, (___, master_bus) in self._masters.items()]
- m.d.comb += scheduler.request.eq(Cat(*master_requests))
+ with m.Switch(grant):
+ for i, itor_bus in enumerate(self._itors):
+ m.d.comb += itor_bus.dat_r.eq(self.bus.dat_r)
+ if hasattr(itor_bus, "stall"):
+ itor_bus_stall = Signal(reset=1)
+ m.d.comb += itor_bus.stall.eq(itor_bus_stall)
+
+ with m.Case(i):
+ ratio = itor_bus.granularity // self.bus.granularity
+ m.d.comb += [
+ self.bus.adr.eq(itor_bus.adr),
+ self.bus.dat_w.eq(itor_bus.dat_w),
+ self.bus.sel.eq(Cat(Repl(sel, ratio) for sel in itor_bus.sel)),
+ self.bus.we.eq(itor_bus.we),
+ self.bus.stb.eq(itor_bus.stb),
+ ]
+ m.d.comb += self.bus.cyc.eq(itor_bus.cyc)
+ if hasattr(self.bus, "lock"):
+ m.d.comb += self.bus.lock.eq(getattr(itor_bus, "lock", 1))
+ if hasattr(self.bus, "cti"):
+ m.d.comb += self.bus.cti.eq(getattr(itor_bus, "cti", CycleType.CLASSIC))
+ if hasattr(self.bus, "bte"):
+ m.d.comb += self.bus.bte.eq(getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
+
+ m.d.comb += itor_bus.ack.eq(self.bus.ack)
+ if hasattr(itor_bus, "err"):
+ m.d.comb += itor_bus.err.eq(getattr(self.bus, "err", 0))
+ if hasattr(itor_bus, "rty"):
+ m.d.comb += itor_bus.rty.eq(getattr(self.bus, "rty", 0))
+ if hasattr(itor_bus, "stall"):
+ m.d.comb += itor_bus_stall.eq(getattr(self.bus, "stall", ~self.bus.ack))
return m
class InterconnectShared(Elaboratable):
+ """Wishbone bus interconnect module.
+
+ This is initialised using the following components:
+ (1) A shared Wishbone bus connecting multiple initiators (MASTERs) with
+ multiple targeted SLAVEs;
+ (2) A list of initiator Wishbone busses; and
+ (3) A list of SLAVE Wishbone busses targeted by the MASTERs.
+
+ This instantiates the following components:
+ (1) An arbiter (:class:`Arbiter`) controlling access of
+ multiple MASTERs to the shared bus; and
+ (2) A decoder (:class:`Decoder`) specifying which targeted SLAVE is to be accessed
+ using address translation on the shared bus.
+
+ See Section 8.2.3 of Wishbone B4 for implemenation specifications.
+
+ Parameters
+ ----------
+ shared_bus : :class:`Interface`
+ Shared bus for the interconnect module between the arbiter and decoder.
+ itors : list of :class:`Interface`
+ List of MASTERs on the arbiter to request access to the shared bus.
+ targets : list of :class:`Interface`
+ List of SLAVEs on the decoder whose accesses are to be targeted by the shared bus.
+
+ Attributes
+ ----------
+ addr_width : int
+ Address width of the shared bus. See :class:`Interface`.
+ data_width : int
+ Data width of the shared bus. See :class:`Interface`.
+ granularity : int
+ Granularity of the shared bus. See :class:`Interface`
"""
- """
- def __init__(self, shared_bus, masters, targets):
- self.addr_width = shared_bus.addr_width
- self.data_width = shared_bus.data_width
- self.granularity = shared_bus.granularity
- self._features = shared_bus._features
- self._alignment = shared_bus._alignment
-
- self._masters = []
+ def __init__(self, *, addr_width, data_width, itors, targets,
+ scheduler="rr", **kwargs):
+ self.addr_width = addr_width
+ self.data_width = data_width
+
+ self._itors = []
self._targets = []
- self._masters_convert_stmts = []
- for master_bus in masters:
- if isinstance(master_bus, Interface):
- self._masters.append(master_bus)
- elif isinstance(master_bus, Record):
- master_interface = Interface.from_pure_record(master_bus)
- self._masters_convert_stmts.append(
- master_bus.connect(master_interface)
+ self._itors_convert_stmts = []
+ for itor_bus in itors:
+ if isinstance(itor_bus, Interface):
+ self._itors.append(itor_bus)
+ elif isinstance(itor_bus, Record):
+ master_interface = Interface.from_pure_record(itor_bus)
+ self._itors_convert_stmts.append(
+ itor_bus.connect(master_interface)
)
- self._masters.append(master_interface)
+ self._itors.append(master_interface)
else:
raise TypeError("Master {!r} must be a Wishbone interface"
- .format(master_bus))
+ .format(itor_bus))
for target_bus in targets:
self._targets.append(target_bus)
+ arbiter_kwargs = dict()
+ for name in ["granularity", "features", "scheduler"]:
+ if name in kwargs:
+ arbiter_kwargs[name] = kwargs[name]
self.arbiter = Arbiter(
- addr_width=self.addr_width,
- data_width=self.data_width,
- granularity=self.granularity,
- features=self._features,
- alignment=self._alignment
+ addr_width=self.addr_width, data_width=self.data_width, **arbiter_kwargs
)
- for master_bus in self._masters:
- self.arbiter.add(master_bus)
-
+ self.arbiter.bus.name = "arbiter_shared"
+ for itor_bus in self._itors:
+ self.arbiter.add(itor_bus)
+
+ decoder_kwargs = dict()
+ for name in ["granularity", "features", "alignment"]:
+ if name in kwargs:
+ decoder_kwargs[name] = kwargs[name]
self.decoder = Decoder(
- addr_width=self.addr_width,
- data_width=self.data_width,
- granularity=self.granularity,
- features=self._features,
- alignment=self._alignment
+ addr_width=self.addr_width, data_width=self.data_width, **decoder_kwargs
)
+ self.decoder.bus.name = "decoder_shared"
for item in self._targets:
if isinstance(item, Interface):
self.decoder.add(item)
m.submodules.decoder = self.decoder
m.d.comb += (
- self._masters_convert_stmts +
+ self._itors_convert_stmts +
self.arbiter.bus.connect(self.decoder.bus)
)