.format(sub_bus))
if sub_bus.data_width != self.bus.data_width:
raise ValueError("Subordinate bus has data width {}, which is not the same as "
- "multiplexer data width {}"
+ "decoder data width {}"
.format(sub_bus.data_width, self.bus.data_width))
self._subs[sub_bus.memory_map] = sub_bus
return self._map.add_window(sub_bus.memory_map, addr=addr)
r_data_fanin = 0
with m.Switch(self.bus.addr):
- for sub_map, sub_pat in self._map.window_patterns():
+ for sub_map, (sub_pat, sub_ratio) in self._map.window_patterns():
+ assert sub_ratio == 1
+
sub_bus = self._subs[sub_map]
m.d.comb += sub_bus.addr.eq(self.bus.addr[:sub_bus.addr_width])
with m.Default():
m.d.sync += wb_bus.dat_r[segment(index)].eq(csr_bus.r_data)
m.d.sync += wb_bus.ack.eq(1)
+ m.d.sync += cycle.eq(0)
with m.Else():
- m.d.sync += cycle.eq(0)
m.d.sync += wb_bus.ack.eq(0)
+ m.d.sync += cycle.eq(0)
return m
Yield values
------------
- A tuple ``window, pattern`` describing the address range assigned to the window.
+ A tuple ``window, (pattern, ratio)`` describing the address range assigned to the window.
``pattern`` is a ``self.addr_width`` wide pattern that may be used in ``Case`` or ``match``
- to determine if an address signal is within the address range of ``window``.
+ to determine if an address signal is within the address range of ``window``. When bridging
+ buses of unequal data width, ``ratio`` is the amount of contiguous addresses on
+ the narrower bus that are accessed for each transaction on the wider bus. Otherwise,
+ it is always 1.
"""
for window, window_range in self._windows.items():
pattern = "{:0{}b}{}".format(window_range.start >> window.addr_width,
self.addr_width - window.addr_width,
"-" * window.addr_width)
- yield window, pattern
+ yield window, (pattern, window_range.step)
@staticmethod
def _translate(start, end, width, window, window_range):
+# nmigen: UnusedElaboratable=no
+
import unittest
from nmigen import *
from nmigen.hdl.rec import Layout
class MultiplexerTestCase(unittest.TestCase):
def setUp(self):
self.dut = Multiplexer(addr_width=16, data_width=8)
- Fragment.get(self.dut, platform=None) # silence UnusedElaboratable
def test_add_4b(self):
self.assertEqual(self.dut.add(Element(4, "rw")),
class MultiplexerAlignedTestCase(unittest.TestCase):
def setUp(self):
self.dut = Multiplexer(addr_width=16, data_width=8, alignment=2)
- Fragment.get(self.dut, platform=None) # silence UnusedElaboratable
def test_add_two(self):
self.assertEqual(self.dut.add(Element(8, "rw")),
class DecoderTestCase(unittest.TestCase):
def setUp(self):
self.dut = Decoder(addr_width=16, data_width=8)
- Fragment.get(self.dut, platform=None) # silence UnusedElaboratable
def test_align_to(self):
self.assertEqual(self.dut.add(Interface(addr_width=10, data_width=8)),
with self.assertRaisesRegex(ValueError,
r"Subordinate bus has data width 16, which is not the same as "
- r"multiplexer data width 8"):
+ r"decoder data width 8"):
self.dut.add(mux.bus)
def test_sim(self):
+# nmigen: UnusedElaboratable=no
+
import unittest
from nmigen import *
from nmigen.back.pysim import *
window_2 = MemoryMap(addr_width=12, data_width=16)
memory_map.add_window(window_2)
self.assertEqual(list(memory_map.window_patterns()), [
- (window_1, "000000----------"),
- (window_2, "0001------------"),
+ (window_1, ("000000----------", 2)),
+ (window_2, ("0001------------", 1)),
])
def test_align_to(self):
+# nmigen: UnusedElaboratable=no
+
import unittest
from nmigen import *
from nmigen.hdl.rec import *
+from nmigen.back.pysim import *
from ..wishbone.bus import *
("ack", 1, DIR_FANIN),
]))
- def test_optional(self):
+ def test_features(self):
iface = Interface(addr_width=32, data_width=32,
- optional={"rty", "err", "stall", "lock", "cti", "bte"})
+ features={"rty", "err", "stall", "lock", "cti", "bte"})
self.assertEqual(iface.layout, Layout.cast([
("adr", 32, DIR_FANOUT),
("dat_w", 32, DIR_FANOUT),
r"Granularity 32 may not be greater than data width 8"):
Interface(addr_width=0, data_width=8, granularity=32)
- def test_wrong_optional(self):
+ def test_wrong_features(self):
with self.assertRaisesRegex(ValueError,
r"Optional signal\(s\) 'foo' are not supported"):
- Interface(addr_width=0, data_width=8, optional={"foo"})
+ Interface(addr_width=0, data_width=8, features={"foo"})
+
+
+class DecoderTestCase(unittest.TestCase):
+ def setUp(self):
+ self.dut = Decoder(addr_width=31, data_width=32, granularity=16)
+
+ def test_add_align_to(self):
+ sub_1 = Interface(addr_width=15, data_width=32, granularity=16)
+ sub_2 = Interface(addr_width=15, data_width=32, granularity=16)
+ self.assertEqual(self.dut.add(sub_1), (0x00000000, 0x00010000, 1))
+ self.assertEqual(self.dut.align_to(18), 0x000040000)
+ self.assertEqual(self.dut.add(sub_2), (0x00040000, 0x00050000, 1))
+
+ def test_add_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Subordinate bus must be an instance of wishbone\.Interface, not 'foo'"):
+ self.dut.add("foo")
+
+ def test_add_wrong_granularity(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Subordinate bus has granularity 32, which is greater than "
+ r"the decoder granularity 16"):
+ self.dut.add(Interface(addr_width=15, data_width=32, granularity=32))
+
+ def test_add_wrong_width_dense(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Subordinate bus has data width 16, which is not the same as decoder "
+ r"data width 32 \(required for dense address translation\)"):
+ self.dut.add(Interface(addr_width=15, data_width=16, granularity=16))
+
+ def test_add_wrong_granularity_sparse(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Subordinate bus has data width 64, which is not the same as subordinate "
+ r"bus granularity 16 \(required for sparse address translation\)"):
+ self.dut.add(Interface(addr_width=15, data_width=64, granularity=16), sparse=True)
+
+ def test_add_wrong_optional_output(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Subordinate bus has optional output 'err', but the decoder does "
+ r"not have a corresponding input"):
+ self.dut.add(Interface(addr_width=15, data_width=32, granularity=16, features={"err"}))
+
+
+class DecoderSimulationTestCase(unittest.TestCase):
+ def test_simple(self):
+ dut = Decoder(addr_width=30, data_width=32, granularity=8,
+ features={"err", "rty", "stall", "lock", "cti", "bte"})
+ sub_1 = Interface(addr_width=14, data_width=32, granularity=8)
+ dut.add(sub_1, addr=0x10000)
+ sub_2 = Interface(addr_width=14, data_width=32, granularity=8,
+ features={"err", "rty", "stall", "lock", "cti", "bte"})
+ dut.add(sub_2)
+
+ def sim_test():
+ yield dut.bus.adr.eq(0x10400 >> 2)
+ yield dut.bus.cyc.eq(1)
+ yield dut.bus.stb.eq(1)
+ yield dut.bus.sel.eq(0b11)
+ yield dut.bus.dat_w.eq(0x12345678)
+ yield dut.bus.lock.eq(1)
+ yield dut.bus.cti.eq(CycleType.INCR_BURST)
+ yield dut.bus.bte.eq(BurstTypeExt.WRAP_4)
+ yield sub_1.ack.eq(1)
+ yield sub_1.dat_r.eq(0xabcdef01)
+ yield sub_2.dat_r.eq(0x5678abcd)
+ yield Delay(1e-6)
+ self.assertEqual((yield sub_1.adr), 0x400 >> 2)
+ self.assertEqual((yield sub_1.cyc), 1)
+ self.assertEqual((yield sub_2.cyc), 0)
+ self.assertEqual((yield sub_1.stb), 1)
+ self.assertEqual((yield sub_1.sel), 0b11)
+ self.assertEqual((yield sub_1.dat_w), 0x12345678)
+ self.assertEqual((yield dut.bus.ack), 1)
+ self.assertEqual((yield dut.bus.err), 0)
+ self.assertEqual((yield dut.bus.rty), 0)
+ self.assertEqual((yield dut.bus.dat_r), 0xabcdef01)
+
+ yield dut.bus.adr.eq(0x20400 >> 2)
+ yield sub_1.ack.eq(0)
+ yield sub_2.err.eq(1)
+ yield sub_2.rty.eq(1)
+ yield sub_2.stall.eq(1)
+ yield Delay(1e-6)
+ self.assertEqual((yield sub_2.adr), 0x400 >> 2)
+ self.assertEqual((yield sub_1.cyc), 0)
+ self.assertEqual((yield sub_2.cyc), 1)
+ self.assertEqual((yield sub_1.stb), 1)
+ self.assertEqual((yield sub_1.sel), 0b11)
+ self.assertEqual((yield sub_1.dat_w), 0x12345678)
+ self.assertEqual((yield sub_2.lock), 1)
+ self.assertEqual((yield sub_2.cti), CycleType.INCR_BURST.value)
+ self.assertEqual((yield sub_2.bte), BurstTypeExt.WRAP_4.value)
+ self.assertEqual((yield dut.bus.ack), 0)
+ self.assertEqual((yield dut.bus.err), 1)
+ self.assertEqual((yield dut.bus.rty), 1)
+ self.assertEqual((yield dut.bus.stall), 1)
+ self.assertEqual((yield dut.bus.dat_r), 0x5678abcd)
+
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_process(sim_test())
+ sim.run()
+
+ def test_addr_translate(self):
+ class AddressLoopback(Elaboratable):
+ def __init__(self, **kwargs):
+ self.bus = Interface(**kwargs)
+
+ def elaborate(self, platform):
+ m = Module()
+
+ for index, sel_bit in enumerate(self.bus.sel):
+ with m.If(sel_bit):
+ segment = self.bus.dat_r.word_select(index, self.bus.granularity)
+ m.d.comb += segment.eq(self.bus.adr + index)
+
+ return m
+
+ dut = Decoder(addr_width=20, data_width=32, granularity=16)
+ loop_1 = AddressLoopback(addr_width=7, data_width=32, granularity=16)
+ self.assertEqual(dut.add(loop_1.bus, addr=0x10000),
+ (0x10000, 0x10100, 1))
+ loop_2 = AddressLoopback(addr_width=6, data_width=32, granularity=8)
+ self.assertEqual(dut.add(loop_2.bus, addr=0x20000),
+ (0x20000, 0x20080, 2))
+ loop_3 = AddressLoopback(addr_width=8, data_width=16, granularity=16)
+ self.assertEqual(dut.add(loop_3.bus, addr=0x30000, sparse=True),
+ (0x30000, 0x30100, 1))
+ loop_4 = AddressLoopback(addr_width=8, data_width=8, granularity=8)
+ self.assertEqual(dut.add(loop_4.bus, addr=0x40000, sparse=True),
+ (0x40000, 0x40100, 1))
+
+ def sim_test():
+ yield dut.bus.cyc.eq(1)
+
+ yield dut.bus.adr.eq(0x10010 >> 1)
+
+ yield dut.bus.sel.eq(0b11)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x00090008)
+
+ yield dut.bus.sel.eq(0b01)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x00000008)
+
+ yield dut.bus.sel.eq(0b10)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x00090000)
+
+ yield dut.bus.adr.eq(0x20010 >> 1)
+
+ yield dut.bus.sel.eq(0b11)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x13121110)
+
+ yield dut.bus.sel.eq(0b01)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x00001110)
+
+ yield dut.bus.sel.eq(0b10)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x13120000)
+
+ yield dut.bus.adr.eq(0x30010 >> 1)
+
+ yield dut.bus.sel.eq(0b11)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x0008)
+
+ yield dut.bus.sel.eq(0b01)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x0008)
+
+ yield dut.bus.sel.eq(0b10)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x0000)
+
+ yield dut.bus.adr.eq(0x30012 >> 1)
+
+ yield dut.bus.sel.eq(0b11)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x0009)
+
+ yield dut.bus.adr.eq(0x40010 >> 1)
+
+ yield dut.bus.sel.eq(0b11)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x08)
+
+ yield dut.bus.sel.eq(0b01)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x08)
+
+ yield dut.bus.sel.eq(0b10)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x00)
+
+ yield dut.bus.adr.eq(0x40012 >> 1)
+
+ yield dut.bus.sel.eq(0b11)
+ yield Delay(1e-6)
+ self.assertEqual((yield dut.bus.dat_r), 0x09)
+
+ m = Module()
+ m.submodules += dut, loop_1, loop_2, loop_3, loop_4
+ with Simulator(m, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_process(sim_test())
+ sim.run()
from ..memory import MemoryMap
-__all__ = ["CycleType", "BurstTypeExt", "Interface"]
+__all__ = ["CycleType", "BurstTypeExt", "Interface", "Decoder"]
class CycleType(Enum):
granularity : int
Granularity of select signals ("port granularity" in Wishbone terminology).
One of 8, 16, 32, 64.
- optional : iter(str)
+ features : iter(str)
Selects the optional signals that will be a part of this interface.
alignment : int
Resource and window alignment. See :class:`MemoryMap`.
bte : Signal()
Optional. Corresponds to Wishbone signal ``BTE_O`` (initiator) or ``BTE_I`` (target).
"""
- def __init__(self, *, addr_width, data_width, granularity=None, optional=frozenset(),
+ def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
alignment=0, name=None):
if not isinstance(addr_width, int) or addr_width < 0:
raise ValueError("Address width must be a non-negative integer, not {!r}"
data_width=data_width >> granularity_bits,
alignment=alignment)
- optional = set(optional)
- unknown = optional - {"rty", "err", "stall", "lock", "cti", "bte"}
+ features = set(features)
+ unknown = features - {"rty", "err", "stall", "lock", "cti", "bte"}
if unknown:
raise ValueError("Optional signal(s) {} are not supported"
.format(", ".join(map(repr, unknown))))
("we", 1, Direction.FANOUT),
("ack", 1, Direction.FANIN),
]
- if "err" in optional:
+ if "err" in features:
layout += [("err", 1, Direction.FANIN)]
- if "rty" in optional:
+ if "rty" in features:
layout += [("rty", 1, Direction.FANIN)]
- if "stall" in optional:
+ if "stall" in features:
layout += [("stall", 1, Direction.FANIN)]
- if "lock" in optional:
+ if "lock" in features:
layout += [("lock", 1, Direction.FANOUT)]
- if "cti" in optional:
+ if "cti" in features:
layout += [("cti", CycleType, Direction.FANOUT)]
- if "bte" in optional:
+ if "bte" in features:
layout += [("bte", BurstTypeExt, Direction.FANOUT)]
super().__init__(layout, name=name, src_loc_at=1)
+
+
+class Decoder(Elaboratable):
+ """Wishbone bus decoder.
+
+ An address decoder for subordinate Wishbone buses.
+
+ Parameters
+ ----------
+ addr_width : int
+ Address width. See :class:`Interface`.
+ data_width : int
+ Data width. See :class:`Interface`.
+ granularity : int
+ Granularity. See :class:`Interface`
+ features : iter(str)
+ Optional signal set. See :class:`Interface`.
+ alignment : int
+ Window alignment. See :class:`Interface`.
+
+ Attributes
+ ----------
+ bus : :class:`Interface`
+ CSR bus providing access to subordinate buses.
+ """
+ def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
+ alignment=0):
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features,
+ alignment=alignment)
+ self._map = self.bus.memory_map
+ self._subs = dict()
+
+ def align_to(self, alignment):
+ """Align the implicit address of the next window.
+
+ See :meth:`MemoryMap.align_to` for details.
+ """
+ return self._map.align_to(alignment)
+
+ def add(self, sub_bus, *, addr=None, sparse=False):
+ """Add a window to a subordinate bus.
+
+ The decoder can perform either sparse or dense address translation. If dense address
+ translation is used (the default), the subordinate bus must have the same data width as
+ the decoder; the window will be contiguous. If sparse address translation is used,
+ the subordinate bus may have data width less than the data width of the decoder;
+ the window may be discontiguous. In either case, the granularity of the subordinate bus
+ must be equal to or less than the granularity of the decoder.
+
+ See :meth:`MemoryMap.add_resource` for details.
+ """
+ if not isinstance(sub_bus, Interface):
+ raise TypeError("Subordinate bus must be an instance of wishbone.Interface, not {!r}"
+ .format(sub_bus))
+ if sub_bus.granularity > self.bus.granularity:
+ raise ValueError("Subordinate bus has granularity {}, which is greater than the "
+ "decoder granularity {}"
+ .format(sub_bus.granularity, self.bus.granularity))
+ if not sparse:
+ if sub_bus.data_width != self.bus.data_width:
+ raise ValueError("Subordinate bus has data width {}, which is not the same as "
+ "decoder data width {} (required for dense address translation)"
+ .format(sub_bus.data_width, self.bus.data_width))
+ else:
+ if sub_bus.granularity != sub_bus.data_width:
+ raise ValueError("Subordinate bus has data width {}, which is not the same as "
+ "subordinate bus granularity {} (required for sparse address "
+ "translation)"
+ .format(sub_bus.data_width, sub_bus.granularity))
+ for opt_output in {"err", "rty", "stall"}:
+ if hasattr(sub_bus, opt_output) and not hasattr(self.bus, opt_output):
+ raise ValueError("Subordinate bus has optional output {!r}, but the decoder "
+ "does not have a corresponding input"
+ .format(opt_output))
+
+ self._subs[sub_bus.memory_map] = sub_bus
+ return self._map.add_window(sub_bus.memory_map, addr=addr, sparse=sparse)
+
+ def elaborate(self, platform):
+ m = Module()
+
+ ack_fanin = 0
+ err_fanin = 0
+ rty_fanin = 0
+ stall_fanin = 0
+
+ with m.Switch(self.bus.adr):
+ for sub_map, (sub_pat, sub_ratio) in self._map.window_patterns():
+ sub_bus = self._subs[sub_map]
+
+ m.d.comb += [
+ sub_bus.adr.eq(self.bus.adr << log2_int(sub_ratio)),
+ sub_bus.dat_w.eq(self.bus.dat_w),
+ sub_bus.sel.eq(Cat(Repl(sel, sub_ratio) for sel in self.bus.sel)),
+ sub_bus.we.eq(self.bus.we),
+ sub_bus.stb.eq(self.bus.stb),
+ ]
+ if hasattr(sub_bus, "lock"):
+ m.d.comb += sub_bus.lock.eq(getattr(self.bus, "lock", 0))
+ if hasattr(sub_bus, "cti"):
+ m.d.comb += sub_bus.cti.eq(getattr(self.bus, "cti", CycleType.CLASSIC))
+ if hasattr(sub_bus, "bte"):
+ m.d.comb += sub_bus.bte.eq(getattr(self.bus, "bte", BurstTypeExt.LINEAR))
+
+ with m.Case(sub_pat[:-log2_int(self.bus.data_width // self.bus.granularity)]):
+ m.d.comb += [
+ sub_bus.cyc.eq(self.bus.cyc),
+ self.bus.dat_r.eq(sub_bus.dat_r),
+ ]
+ ack_fanin |= sub_bus.ack
+ if hasattr(sub_bus, "err"):
+ err_fanin |= sub_bus.err
+ if hasattr(sub_bus, "rty"):
+ rty_fanin |= sub_bus.rty
+ if hasattr(sub_bus, "stall"):
+ stall_fanin |= sub_bus.stall
+
+ m.d.comb += self.bus.ack.eq(ack_fanin)
+ if hasattr(self.bus, "err"):
+ m.d.comb += self.bus.err.eq(err_fanin)
+ if hasattr(self.bus, "rty"):
+ m.d.comb += self.bus.rty.eq(rty_fanin)
+ if hasattr(self.bus, "stall"):
+ m.d.comb += self.bus.stall.eq(stall_fanin)
+
+ return m