sim.run()
- shared_bus=self.shared,
- masters=[
+ class ArbiterTestCase(unittest.TestCase):
+ def setUp(self):
+ self.dut = Arbiter(addr_width=31, data_width=32, granularity=16)
+
+ def test_add_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Initiator bus must be an instance of wishbone\.Interface, not 'foo'"):
+ self.dut.add("foo")
+
+ def test_add_wrong_addr_width(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has address width 15, which is not the same as arbiter "
+ r"address width 31"):
+ self.dut.add(Interface(addr_width=15, data_width=32, granularity=16))
+
+ def test_add_wrong_granularity(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has granularity 8, which is lesser than "
+ r"the arbiter granularity 16"):
+ self.dut.add(Interface(addr_width=31, data_width=32, granularity=8))
+
+ def test_add_wrong_data_width(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has data width 16, which is not the same as arbiter "
+ r"data width 32"):
+ self.dut.add(Interface(addr_width=31, data_width=16, granularity=16))
+
+ def test_add_wrong_optional_output(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Initiator bus has optional output 'lock', but the arbiter does "
+ r"not have a corresponding input"):
+ self.dut.add(Interface(addr_width=31, data_width=32, granularity=16,
+ features={"lock"}))
+
+
+ class ArbiterSimulationTestCase(unittest.TestCase):
+ def test_simple(self):
+ dut = Arbiter(addr_width=30, data_width=32, granularity=8,
+ features={"err", "rty", "stall", "lock", "cti", "bte"})
+ itor_1 = Interface(addr_width=30, data_width=32, granularity=8)
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, granularity=16,
+ features={"err", "rty", "stall", "lock", "cti", "bte"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.adr.eq(0x7ffffffc >> 2)
+ yield itor_1.cyc.eq(1)
+ yield itor_1.stb.eq(1)
+ yield itor_1.sel.eq(0b1111)
+ yield itor_1.we.eq(1)
+ yield itor_1.dat_w.eq(0x12345678)
+ yield dut.bus.dat_r.eq(0xabcdef01)
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-7)
+ self.assertEqual((yield dut.bus.adr), 0x7ffffffc >> 2)
+ self.assertEqual((yield dut.bus.cyc), 1)
+ self.assertEqual((yield dut.bus.stb), 1)
+ self.assertEqual((yield dut.bus.sel), 0b1111)
+ self.assertEqual((yield dut.bus.we), 1)
+ self.assertEqual((yield dut.bus.dat_w), 0x12345678)
+ self.assertEqual((yield dut.bus.lock), 1)
+ self.assertEqual((yield dut.bus.cti), CycleType.CLASSIC.value)
+ self.assertEqual((yield dut.bus.bte), BurstTypeExt.LINEAR.value)
+ self.assertEqual((yield itor_1.dat_r), 0xabcdef01)
+ self.assertEqual((yield itor_1.ack), 1)
+
+ yield itor_1.cyc.eq(0)
+ yield itor_2.adr.eq(0xe0000000 >> 2)
+ yield itor_2.cyc.eq(1)
+ yield itor_2.stb.eq(1)
+ yield itor_2.sel.eq(0b10)
+ yield itor_2.we.eq(1)
+ yield itor_2.dat_w.eq(0x43218765)
+ yield itor_2.lock.eq(0)
+ yield itor_2.cti.eq(CycleType.INCR_BURST)
+ yield itor_2.bte.eq(BurstTypeExt.WRAP_4)
+ yield Tick()
+
+ yield dut.bus.err.eq(1)
+ yield dut.bus.rty.eq(1)
+ yield dut.bus.stall.eq(0)
+ yield Delay(1e-7)
+ self.assertEqual((yield dut.bus.adr), 0xe0000000 >> 2)
+ self.assertEqual((yield dut.bus.cyc), 1)
+ self.assertEqual((yield dut.bus.stb), 1)
+ self.assertEqual((yield dut.bus.sel), 0b1100)
+ self.assertEqual((yield dut.bus.we), 1)
+ self.assertEqual((yield dut.bus.dat_w), 0x43218765)
+ self.assertEqual((yield dut.bus.lock), 0)
+ self.assertEqual((yield dut.bus.cti), CycleType.INCR_BURST.value)
+ self.assertEqual((yield dut.bus.bte), BurstTypeExt.WRAP_4.value)
+ self.assertEqual((yield itor_2.dat_r), 0xabcdef01)
+ self.assertEqual((yield itor_2.ack), 1)
+ self.assertEqual((yield itor_2.err), 1)
+ self.assertEqual((yield itor_2.rty), 1)
+ self.assertEqual((yield itor_2.stall), 0)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
+
+ def test_lock(self):
+ dut = Arbiter(addr_width=30, data_width=32, features={"lock"})
+ itor_1 = Interface(addr_width=30, data_width=32, features={"lock"})
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, features={"lock"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_1.lock.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield itor_1.lock.eq(0)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 1)
+
+ yield itor_2.cyc.eq(0)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield itor_1.stb.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+
+ yield itor_1.stb.eq(0)
+ yield itor_2.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
+
+ def test_stall(self):
+ dut = Arbiter(addr_width=30, data_width=32, features={"stall"})
+ itor_1 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield dut.bus.stall.eq(0)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 0)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ yield dut.bus.stall.eq(1)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 1)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_process(sim_test())
+ sim.run()
+
+ def test_stall_compat(self):
+ dut = Arbiter(addr_width=30, data_width=32)
+ itor_1 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32, features={"stall"})
+ dut.add(itor_2)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 1)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-6)
+ self.assertEqual((yield itor_1.stall), 0)
+ self.assertEqual((yield itor_2.stall), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_process(sim_test())
+ sim.run()
+
+ def test_roundrobin(self):
+ dut = Arbiter(addr_width=30, data_width=32)
+ itor_1 = Interface(addr_width=30, data_width=32)
+ dut.add(itor_1)
+ itor_2 = Interface(addr_width=30, data_width=32)
+ dut.add(itor_2)
+ itor_3 = Interface(addr_width=30, data_width=32)
+ dut.add(itor_3)
+
+ def sim_test():
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(0)
+ yield itor_3.cyc.eq(1)
+ yield dut.bus.ack.eq(1)
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 0)
+
+ yield itor_1.cyc.eq(0)
+ yield itor_2.cyc.eq(0)
+ yield itor_3.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 1)
+
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(1)
+ yield itor_3.cyc.eq(0)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 1)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 0)
+
+ yield itor_1.cyc.eq(0)
+ yield itor_2.cyc.eq(1)
+ yield itor_3.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 1)
+ self.assertEqual((yield itor_3.ack), 0)
+
+ yield itor_1.cyc.eq(1)
+ yield itor_2.cyc.eq(0)
+ yield itor_3.cyc.eq(1)
+ yield Tick()
+ yield Delay(1e-7)
+ self.assertEqual((yield itor_1.ack), 0)
+ self.assertEqual((yield itor_2.ack), 0)
+ self.assertEqual((yield itor_3.ack), 1)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
++
++
+class InterconnectSharedSimulationTestCase(unittest.TestCase):
+ def setUp(self):
+ self.shared = Interface(addr_width=30,
+ data_width=32,
+ granularity=8,
+ features={"err","cti","bte"},
+ name="shared")
+ self.master01 = Interface(addr_width=30,
+ data_width=32,
+ granularity=8,
+ features={"err","cti","bte"},
+ name="master01")
+ self.master02 = Record([
+ ("adr", 30, DIR_FANOUT),
+ ("dat_w", 32, DIR_FANOUT),
+ ("dat_r", 32, DIR_FANIN),
+ ("sel", 4, DIR_FANOUT),
+ ("cyc", 1, DIR_FANOUT),
+ ("stb", 1, DIR_FANOUT),
+ ("ack", 1, DIR_FANIN),
+ ("we", 1, DIR_FANOUT),
+ ("cti", 3, DIR_FANOUT),
+ ("bte", 2, DIR_FANOUT),
+ ("err", 1, DIR_FANIN)
+ ])
+ self.ram = SRAM(Memory(width=32, depth=2048, init=[]),
+ granularity=8, features={"err","cti","bte"})
+ self.sub01 = Interface(addr_width=21,
+ data_width=32,
+ granularity=8,
+ features={"err","cti","bte"},
+ name="sub01")
+ self.dut = InterconnectShared(
- (self.ram.bus, 0),
++ addr_width=30, data_width=32, granularity=8,
++ features={"err","cti","bte"},
++ itors=[
+ self.master01,
+ self.master02
+ ],
+ targets=[
- yield
++ (self.ram.bus, 0),
+ (self.sub01, (2**21) << 2)
+ ]
+ )
+
+ def test_basic(self):
+ def sim_test():
+ yield self.master01.adr.eq(0)
+ yield self.master02.adr.eq(2**21)
+ yield self.master01.we.eq(0)
+ yield self.master02.we.eq(0)
+ #
+ for _ in range(5):
+ yield self.master01.cyc.eq(1)
+ yield self.master02.cyc.eq(1)
++ yield
+ ram_cyc = (yield self.ram.bus.cyc)
+ sub01_cyc = (yield self.sub01.cyc)
+ if ram_cyc == 1:
+ yield self.master01.stb.eq(1)
+ yield
+ yield self.ram.bus.ack.eq(1)
+ yield self.master01.stb.eq(0)
+ yield
+ yield self.ram.bus.ack.eq(0)
+ yield self.master01.cyc.eq(0)
+ elif sub01_cyc == 1:
+ yield self.master02.stb.eq(1)
+ yield
+ yield self.sub01.ack.eq(1)
+ yield self.master02.stb.eq(0)
+ yield
+ yield self.sub01.ack.eq(0)
+ yield self.master02.cyc.eq(0)
+ yield
+
+ m = Module()
+ m.submodules += self.dut, self.ram
+ with Simulator(m, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
class Arbiter(Elaboratable):
"""Wishbone bus arbiter.
- An arbiter for selecting the Wishbone master from several devices.
- A round-robin arbiter for initiators accessing a shared Wishbone bus.
++ An arbiter for initiators (masters) to access a shared Wishbone bus.
Parameters
----------
addr_width : int
-- Address width. See :class:`Interface`.
++ Address width of the shared bus. See :class:`Interface`.
data_width : int
-- Data width. See :class:`Interface`.
++ Data width of the shared bus. See :class:`Interface`.
granularity : int
- Granularity. See :class:`Interface`.
- Granularity. See :class:`Interface`
++ Granularity of the shared bus. See :class:`Interface`.
features : iter(str)
-- Optional signal set. See :class:`Interface`.
- alignment : int
- Window alignment. See :class:`Interface`.
++ Optional signal set for the shared bus. See :class:`Interface`.
++ scheduler : str or None
++ Method for bus arbitration. Defaults to "rr" (Round Robin, see
++ :class:`scheduler.RoundRobin`).
Attributes
----------
bus : :class:`Interface`
- Bus providing access to the selected master.
- Shared Wishbone bus.
++ Shared bus to which the selected initiator gains access.
"""
- def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset()):
+ def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
- alignment=0, scheduler="rr"):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features,
- alignment=alignment)
- self._masters = dict()
++ scheduler="rr"):
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features)
+ self._itors = []
+ if scheduler not in ["rr"]:
+ raise ValueError("Scheduling mode must be \"rr\", not {!r}"
+ .format(scheduler))
+ self._scheduler = scheduler
- self._next_index = 0
- def add(self, master_bus):
- """Add a device bus to the list of master candidates
+ def add(self, itor_bus):
+ """Add an initiator bus to the arbiter.
+
+ The initiator bus must have the same address width and data width as the arbiter. The
+ granularity of the initiator bus must be greater than or equal to the granularity of
+ the arbiter.
"""
- if not isinstance(master_bus, Interface):
- raise TypeError("Master bus must be an instance of wishbone.Interface, not {!r}"
- .format(master_bus))
- if master_bus.granularity != self.bus.granularity:
- raise ValueError("Master bus has granularity {}, which is not the same as "
+ if not isinstance(itor_bus, Interface):
+ raise TypeError("Initiator bus must be an instance of wishbone.Interface, not {!r}"
+ .format(itor_bus))
+ if itor_bus.addr_width != self.bus.addr_width:
+ raise ValueError("Initiator bus has address width {}, which is not the same as "
+ "arbiter address width {}"
+ .format(itor_bus.addr_width, self.bus.addr_width))
+ if itor_bus.granularity < self.bus.granularity:
+ raise ValueError("Initiator bus has granularity {}, which is lesser than the "
"arbiter granularity {}"
- .format(master_bus.granularity, self.bus.granularity))
- if master_bus.data_width != self.bus.data_width:
- raise ValueError("Master bus has data width {}, which is not the same as "
- "arbiter data width {})"
- .format(master_bus.data_width, self.bus.data_width))
- for opt_output in {"err", "rty", "stall"}:
- if hasattr(master_bus, opt_output) and not hasattr(self.bus, opt_output):
- raise ValueError("Master bus has optional output {!r}, but the arbiter "
+ .format(itor_bus.granularity, self.bus.granularity))
+ if itor_bus.data_width != self.bus.data_width:
+ raise ValueError("Initiator bus has data width {}, which is not the same as "
+ "arbiter data width {}"
+ .format(itor_bus.data_width, self.bus.data_width))
+ for opt_output in {"lock", "cti", "bte"}:
+ if hasattr(itor_bus, opt_output) and not hasattr(self.bus, opt_output):
+ raise ValueError("Initiator bus has optional output {!r}, but the arbiter "
"does not have a corresponding input"
.format(opt_output))
def elaborate(self, platform):
m = Module()
- requests = Signal(len(self._itors))
- grant = Signal(range(len(self._itors)))
- m.d.comb += requests.eq(Cat(itor_bus.cyc for itor_bus in self._itors))
+ if self._scheduler == "rr":
- m.submodules.scheduler = scheduler = RoundRobin(self._next_index)
- grant = Signal(self._next_index)
++ m.submodules.scheduler = scheduler = RoundRobin(len(self._itors))
++ grant = Signal(range(len(self._itors)))
+
++ # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4)
+ bus_busy = self.bus.cyc
+ if hasattr(self.bus, "lock"):
+ # If LOCK is not asserted, we also wait for STB to be deasserted before granting bus
+ # ownership to the next initiator. If we didn't, the next bus owner could receive
+ # an ACK (or ERR, RTY) from the previous transaction when targeting the same
+ # peripheral.
+ bus_busy &= self.bus.lock | self.bus.stb
+
- with m.If(~bus_busy):
- with m.Switch(grant):
- for i in range(len(requests)):
- with m.Case(i):
- for pred in reversed(range(i)):
- with m.If(requests[pred]):
- m.d.sync += grant.eq(pred)
- for succ in reversed(range(i + 1, len(requests))):
- with m.If(requests[succ]):
- m.d.sync += grant.eq(succ)
+ m.d.comb += [
- # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4)
- scheduler.stb.eq(~self.bus.cyc),
- grant.eq(scheduler.grant)
++ scheduler.stb.eq(~bus_busy),
++ grant.eq(scheduler.grant),
++ scheduler.request.eq(Cat(itor_bus.cyc for itor_bus in self._itors))
+ ]
- for signal_name, (_, signal_direction) in self.bus.layout.fields.items():
- # FANOUT signals: only mux the granted master with the interface
- if signal_direction == Direction.FANOUT:
- master_signals = Array(getattr(master_bus, signal_name)
- for __, (___, master_bus)
- in self._masters.items())
- m.d.comb += getattr(self.bus, signal_name).eq(master_signals[grant])
- # FANIN signals: ACK and ERR are ORed to all masters;
- # all other signals are asserted to the granted master only
- if signal_direction == Direction.FANIN:
- for __, (index, master_bus) in self._masters.items():
- source = getattr(self.bus, signal_name)
- dest = getattr(master_bus, signal_name)
- if signal_name in ["ack", "err"]:
- m.d.comb += dest.eq(source & (grant == index))
- else:
- m.d.comb += dest.eq(source)
-
- master_requests = [master_bus.cyc & ~master_bus.ack
- for __, (___, master_bus) in self._masters.items()]
- m.d.comb += scheduler.request.eq(Cat(*master_requests))
+ with m.Switch(grant):
+ for i, itor_bus in enumerate(self._itors):
+ m.d.comb += itor_bus.dat_r.eq(self.bus.dat_r)
+ if hasattr(itor_bus, "stall"):
+ itor_bus_stall = Signal(reset=1)
+ m.d.comb += itor_bus.stall.eq(itor_bus_stall)
+
+ with m.Case(i):
+ ratio = itor_bus.granularity // self.bus.granularity
+ m.d.comb += [
+ self.bus.adr.eq(itor_bus.adr),
+ self.bus.dat_w.eq(itor_bus.dat_w),
+ self.bus.sel.eq(Cat(Repl(sel, ratio) for sel in itor_bus.sel)),
+ self.bus.we.eq(itor_bus.we),
+ self.bus.stb.eq(itor_bus.stb),
+ ]
+ m.d.comb += self.bus.cyc.eq(itor_bus.cyc)
+ if hasattr(self.bus, "lock"):
+ m.d.comb += self.bus.lock.eq(getattr(itor_bus, "lock", 1))
+ if hasattr(self.bus, "cti"):
+ m.d.comb += self.bus.cti.eq(getattr(itor_bus, "cti", CycleType.CLASSIC))
+ if hasattr(self.bus, "bte"):
+ m.d.comb += self.bus.bte.eq(getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
+
+ m.d.comb += itor_bus.ack.eq(self.bus.ack)
+ if hasattr(itor_bus, "err"):
+ m.d.comb += itor_bus.err.eq(getattr(self.bus, "err", 0))
+ if hasattr(itor_bus, "rty"):
+ m.d.comb += itor_bus.rty.eq(getattr(self.bus, "rty", 0))
+ if hasattr(itor_bus, "stall"):
+ m.d.comb += itor_bus_stall.eq(getattr(self.bus, "stall", ~self.bus.ack))
return m
- """
- def __init__(self, shared_bus, masters, targets):
- self.addr_width = shared_bus.addr_width
- self.data_width = shared_bus.data_width
- self.granularity = shared_bus.granularity
- self._features = shared_bus._features
- self._alignment = shared_bus._alignment
-
- self._masters = []
+
+
+class InterconnectShared(Elaboratable):
++ """Wishbone bus interconnect module.
++
++ This is initialised using the following components:
++ (1) A shared Wishbone bus connecting multiple initiators (MASTERs) with
++ multiple targeted SLAVEs;
++ (2) A list of initiator Wishbone busses; and
++ (3) A list of SLAVE Wishbone busses targeted by the MASTERs.
++
++ This instantiates the following components:
++ (1) An arbiter (:class:`Arbiter`) controlling access of
++ multiple MASTERs to the shared bus; and
++ (2) A decoder (:class:`Decoder`) specifying which targeted SLAVE is to be accessed
++ using address translation on the shared bus.
++
++ See Section 8.2.3 of Wishbone B4 for implemenation specifications.
++
++ Parameters
++ ----------
++ shared_bus : :class:`Interface`
++ Shared bus for the interconnect module between the arbiter and decoder.
++ itors : list of :class:`Interface`
++ List of MASTERs on the arbiter to request access to the shared bus.
++ targets : list of :class:`Interface`
++ List of SLAVEs on the decoder whose accesses are to be targeted by the shared bus.
++
++ Attributes
++ ----------
++ addr_width : int
++ Address width of the shared bus. See :class:`Interface`.
++ data_width : int
++ Data width of the shared bus. See :class:`Interface`.
++ granularity : int
++ Granularity of the shared bus. See :class:`Interface`
+ """
- self._masters_convert_stmts = []
- for master_bus in masters:
- if isinstance(master_bus, Interface):
- self._masters.append(master_bus)
- elif isinstance(master_bus, Record):
- master_interface = Interface.from_pure_record(master_bus)
- self._masters_convert_stmts.append(
- master_bus.connect(master_interface)
++ def __init__(self, *, addr_width, data_width, itors, targets,
++ scheduler="rr", **kwargs):
++ self.addr_width = addr_width
++ self.data_width = data_width
++
++ self._itors = []
+ self._targets = []
+
- self._masters.append(master_interface)
++ self._itors_convert_stmts = []
++ for itor_bus in itors:
++ if isinstance(itor_bus, Interface):
++ self._itors.append(itor_bus)
++ elif isinstance(itor_bus, Record):
++ master_interface = Interface.from_pure_record(itor_bus)
++ self._itors_convert_stmts.append(
++ itor_bus.connect(master_interface)
+ )
- .format(master_bus))
++ self._itors.append(master_interface)
+ else:
+ raise TypeError("Master {!r} must be a Wishbone interface"
- addr_width=self.addr_width,
- data_width=self.data_width,
- granularity=self.granularity,
- features=self._features,
- alignment=self._alignment
++ .format(itor_bus))
+
+ for target_bus in targets:
+ self._targets.append(target_bus)
+
++ arbiter_kwargs = dict()
++ for name in ["granularity", "features", "scheduler"]:
++ if name in kwargs:
++ arbiter_kwargs[name] = kwargs[name]
+ self.arbiter = Arbiter(
- for master_bus in self._masters:
- self.arbiter.add(master_bus)
-
++ addr_width=self.addr_width, data_width=self.data_width, **arbiter_kwargs
+ )
- addr_width=self.addr_width,
- data_width=self.data_width,
- granularity=self.granularity,
- features=self._features,
- alignment=self._alignment
++ self.arbiter.bus.name = "arbiter_shared"
++ for itor_bus in self._itors:
++ self.arbiter.add(itor_bus)
++
++ decoder_kwargs = dict()
++ for name in ["granularity", "features", "alignment"]:
++ if name in kwargs:
++ decoder_kwargs[name] = kwargs[name]
+ self.decoder = Decoder(
- self._masters_convert_stmts +
++ addr_width=self.addr_width, data_width=self.data_width, **decoder_kwargs
+ )
++ self.decoder.bus.name = "decoder_shared"
+ for item in self._targets:
+ if isinstance(item, Interface):
+ self.decoder.add(item)
+ elif isinstance(item, tuple) and len(item) == 2:
+ self.decoder.add(item[0], addr=item[1])
+ else:
+ raise TypeError("Target must be a Wishbone interface, "
+ "or a (Wishbone interface, start address) tuple, not {!r}"
+ .format(item))
+
+ def elaborate(self, platform):
+ m = Module()
+
+ m.submodules.arbiter = self.arbiter
+ m.submodules.decoder = self.decoder
+
+ m.d.comb += (
++ self._itors_convert_stmts +
+ self.arbiter.bus.connect(self.decoder.bus)
+ )
+
+ return m