csr.wishbone: add WishboneCSRBridge.
authorwhitequark <whitequark@whitequark.org>
Fri, 25 Oct 2019 23:47:52 +0000 (23:47 +0000)
committerwhitequark <whitequark@whitequark.org>
Sat, 26 Oct 2019 01:36:36 +0000 (01:36 +0000)
nmigen_soc/csr/bus.py
nmigen_soc/csr/wishbone.py [new file with mode: 0644]
nmigen_soc/test/test_csr_wishbone.py [new file with mode: 0644]

index ffb8c3b99fc2d6235cfab79e32c8034af502a981..31a237dee745382aba9094c1ff45ed0fe2a296e6 100644 (file)
@@ -201,7 +201,8 @@ class Multiplexer(Elaboratable):
         CSR bus providing access to registers.
     """
     def __init__(self, *, addr_width, data_width, alignment=0):
-        self.bus  = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment)
+        self.bus  = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment,
+                              name="csr")
         self._map = self.bus.memory_map
 
     def align_to(self, alignment):
@@ -305,7 +306,8 @@ class Decoder(Elaboratable):
         CSR bus providing access to subordinate buses.
     """
     def __init__(self, *, addr_width, data_width, alignment=0):
-        self.bus   = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment)
+        self.bus   = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment,
+                               name="csr")
         self._map  = self.bus.memory_map
         self._subs = dict()
 
diff --git a/nmigen_soc/csr/wishbone.py b/nmigen_soc/csr/wishbone.py
new file mode 100644 (file)
index 0000000..f570ece
--- /dev/null
@@ -0,0 +1,88 @@
+from nmigen import *
+from nmigen.utils import log2_int
+
+from . import Interface as CSRInterface
+from ..wishbone import Interface as WishboneInterface
+
+
+__all__ = ["WishboneCSRBridge"]
+
+
+class WishboneCSRBridge(Elaboratable):
+    """Wishbone to CSR bridge.
+
+    A bus bridge for accessing CSR registers from Wishbone. This bridge supports any Wishbone
+    data width greater or equal to CSR data width and performs appropriate address translation.
+
+    Latency
+    -------
+
+    Reads and writes always take ``self.data_width // csr_bus.data_width + 1`` cycles to complete,
+    regardless of the select inputs. Write side effects occur simultaneously with acknowledgement.
+
+    Parameters
+    ----------
+    csr_bus : :class:`..csr.Interface`
+        CSR bus driven by the bridge.
+    data_width : int or None
+        Wishbone bus data width. If not specified, defaults to ``csr_bus.data_width``.
+
+    Attributes
+    ----------
+    wb_bus : :class:`..wishbone.Interface`
+        Wishbone bus provided by the bridge.
+    """
+    def __init__(self, csr_bus, *, data_width=None):
+        if not isinstance(csr_bus, CSRInterface):
+            raise ValueError("CSR bus must be an instance of CSRInterface, not {!r}"
+                             .format(csr_bus))
+        if csr_bus.data_width not in (8, 16, 32, 64):
+            raise ValueError("CSR bus data width must be one of 8, 16, 32, 64, not {!r}"
+                             .format(csr_bus.data_width))
+        if data_width is None:
+            data_width = csr_bus.data_width
+
+        self.csr_bus = csr_bus
+        self.wb_bus  = WishboneInterface(
+            addr_width=max(0, csr_bus.addr_width - log2_int(data_width // csr_bus.data_width)),
+            data_width=data_width,
+            granularity=csr_bus.data_width,
+            name="wb")
+
+        # Since granularity of the Wishbone interface matches the data width of the CSR bus,
+        # no width conversion is performed, even if the Wishbone data width is greater.
+        self.wb_bus.memory_map.add_window(self.csr_bus.memory_map)
+
+    def elaborate(self, platform):
+        csr_bus = self.csr_bus
+        wb_bus  = self.wb_bus
+
+        m = Module()
+
+        cycle = Signal(range(len(wb_bus.sel) + 1))
+        m.d.comb += csr_bus.addr.eq(Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr))
+
+        with m.If(wb_bus.cyc & wb_bus.stb):
+            with m.Switch(cycle):
+                def segment(index):
+                    return slice(index * wb_bus.granularity, (index + 1) * wb_bus.granularity)
+
+                for index, sel_index in enumerate(wb_bus.sel):
+                    with m.Case(index):
+                        if index > 0:
+                            # CSR reads are registered, and we need to re-register them.
+                            m.d.sync += wb_bus.dat_r[segment(index - 1)].eq(csr_bus.r_data)
+                        m.d.comb += csr_bus.r_stb.eq(sel_index & ~wb_bus.we)
+                        m.d.comb += csr_bus.w_data.eq(wb_bus.dat_w[segment(index)])
+                        m.d.comb += csr_bus.w_stb.eq(sel_index & wb_bus.we)
+                        m.d.sync += cycle.eq(index + 1)
+
+                with m.Default():
+                    m.d.sync += wb_bus.dat_r[segment(index)].eq(csr_bus.r_data)
+                    m.d.sync += wb_bus.ack.eq(1)
+
+        with m.Else():
+            m.d.sync += cycle.eq(0)
+            m.d.sync += wb_bus.ack.eq(0)
+
+        return m
diff --git a/nmigen_soc/test/test_csr_wishbone.py b/nmigen_soc/test/test_csr_wishbone.py
new file mode 100644 (file)
index 0000000..e7474c5
--- /dev/null
@@ -0,0 +1,216 @@
+import unittest
+from nmigen import *
+from nmigen.back.pysim import *
+
+from .. import csr
+from ..csr.wishbone import *
+
+
+class MockRegister(Elaboratable):
+    def __init__(self, width):
+        self.element = csr.Element(width, "rw")
+        self.r_count = Signal(8)
+        self.w_count = Signal(8)
+        self.data    = Signal(width)
+
+    def elaborate(self, platform):
+        m = Module()
+
+        with m.If(self.element.r_stb):
+            m.d.sync += self.r_count.eq(self.r_count + 1)
+        m.d.comb += self.element.r_data.eq(self.data)
+
+        with m.If(self.element.w_stb):
+            m.d.sync += self.w_count.eq(self.w_count + 1)
+            m.d.sync += self.data.eq(self.element.w_data)
+
+        return m
+
+
+class WishboneCSRBridgeTestCase(unittest.TestCase):
+    def test_wrong_csr_bus(self):
+        with self.assertRaisesRegex(ValueError,
+                r"CSR bus must be an instance of CSRInterface, not 'foo'"):
+            WishboneCSRBridge(csr_bus="foo")
+
+    def test_wrong_csr_bus_data_width(self):
+        with self.assertRaisesRegex(ValueError,
+                r"CSR bus data width must be one of 8, 16, 32, 64, not 7"):
+            WishboneCSRBridge(csr_bus=csr.Interface(addr_width=10, data_width=7))
+
+    def test_narrow(self):
+        mux   = csr.Multiplexer(addr_width=10, data_width=8)
+        reg_1 = MockRegister(8)
+        mux.add(reg_1.element)
+        reg_2 = MockRegister(16)
+        mux.add(reg_2.element)
+        dut   = WishboneCSRBridge(mux.bus)
+
+        def sim_test():
+            yield dut.wb_bus.cyc.eq(1)
+            yield dut.wb_bus.sel.eq(0b1)
+
+            yield dut.wb_bus.we.eq(1)
+
+            yield dut.wb_bus.adr.eq(0)
+            yield dut.wb_bus.stb.eq(1)
+            yield dut.wb_bus.dat_w.eq(0x55)
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield reg_1.r_count), 0)
+            self.assertEqual((yield reg_1.w_count), 1)
+            self.assertEqual((yield reg_1.data), 0x55)
+
+            yield dut.wb_bus.adr.eq(1)
+            yield dut.wb_bus.stb.eq(1)
+            yield dut.wb_bus.dat_w.eq(0xaa)
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield reg_2.r_count), 0)
+            self.assertEqual((yield reg_2.w_count), 0)
+            self.assertEqual((yield reg_2.data), 0)
+
+            yield dut.wb_bus.adr.eq(2)
+            yield dut.wb_bus.stb.eq(1)
+            yield dut.wb_bus.dat_w.eq(0xbb)
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield reg_2.r_count), 0)
+            self.assertEqual((yield reg_2.w_count), 1)
+            self.assertEqual((yield reg_2.data), 0xbbaa)
+
+            yield dut.wb_bus.we.eq(0)
+
+            yield dut.wb_bus.adr.eq(0)
+            yield dut.wb_bus.stb.eq(1)
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield dut.wb_bus.dat_r), 0x55)
+            self.assertEqual((yield reg_1.r_count), 1)
+            self.assertEqual((yield reg_1.w_count), 1)
+
+            yield dut.wb_bus.adr.eq(1)
+            yield dut.wb_bus.stb.eq(1)
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield dut.wb_bus.dat_r), 0xaa)
+            self.assertEqual((yield reg_2.r_count), 1)
+            self.assertEqual((yield reg_2.w_count), 1)
+
+            yield reg_2.data.eq(0x33333)
+
+            yield dut.wb_bus.adr.eq(2)
+            yield dut.wb_bus.stb.eq(1)
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield dut.wb_bus.dat_r), 0xbb)
+            self.assertEqual((yield reg_2.r_count), 1)
+            self.assertEqual((yield reg_2.w_count), 1)
+
+        m = Module()
+        m.submodules += mux, reg_1, reg_2, dut
+        with Simulator(m, vcd_file=open("test.vcd", "w")) as sim:
+            sim.add_clock(1e-6)
+            sim.add_sync_process(sim_test())
+            sim.run()
+
+    def test_wide(self):
+        mux = csr.Multiplexer(addr_width=10, data_width=8)
+        reg = MockRegister(32)
+        mux.add(reg.element)
+        dut = WishboneCSRBridge(mux.bus, data_width=32)
+
+        def sim_test():
+            yield dut.wb_bus.cyc.eq(1)
+            yield dut.wb_bus.adr.eq(0)
+
+            yield dut.wb_bus.we.eq(1)
+
+            yield dut.wb_bus.dat_w.eq(0x44332211)
+            yield dut.wb_bus.sel.eq(0b1111)
+            yield dut.wb_bus.stb.eq(1)
+            yield
+            yield
+            yield
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield reg.r_count), 0)
+            self.assertEqual((yield reg.w_count), 1)
+            self.assertEqual((yield reg.data), 0x44332211)
+
+            # partial write
+            yield dut.wb_bus.dat_w.eq(0xaabbccdd)
+            yield dut.wb_bus.sel.eq(0b0110)
+            yield dut.wb_bus.stb.eq(1)
+            yield
+            yield
+            yield
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield reg.r_count), 0)
+            self.assertEqual((yield reg.w_count), 1)
+            self.assertEqual((yield reg.data), 0x44332211)
+
+            yield dut.wb_bus.we.eq(0)
+
+            yield dut.wb_bus.sel.eq(0b1111)
+            yield dut.wb_bus.stb.eq(1)
+            yield
+            yield
+            yield
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield dut.wb_bus.dat_r), 0x44332211)
+            self.assertEqual((yield reg.r_count), 1)
+            self.assertEqual((yield reg.w_count), 1)
+
+            yield reg.data.eq(0xaaaaaaaa)
+
+            # partial read
+            yield dut.wb_bus.sel.eq(0b0110)
+            yield dut.wb_bus.stb.eq(1)
+            yield
+            yield
+            yield
+            yield
+            yield
+            yield dut.wb_bus.stb.eq(0)
+            yield
+            self.assertEqual((yield dut.wb_bus.ack), 1)
+            self.assertEqual((yield dut.wb_bus.dat_r), 0x00332200)
+            self.assertEqual((yield reg.r_count), 1)
+            self.assertEqual((yield reg.w_count), 1)
+
+        m = Module()
+        m.submodules += mux, reg, dut
+        with Simulator(m, vcd_file=open("test.vcd", "w")) as sim:
+            sim.add_clock(1e-6)
+            sim.add_sync_process(sim_test())
+            sim.run()