--- /dev/null
+from nmigen import *
+from nmigen.utils import log2_int
+
+from . import Interface as CSRInterface
+from ..wishbone import Interface as WishboneInterface
+
+
+__all__ = ["WishboneCSRBridge"]
+
+
+class WishboneCSRBridge(Elaboratable):
+ """Wishbone to CSR bridge.
+
+ A bus bridge for accessing CSR registers from Wishbone. This bridge supports any Wishbone
+ data width greater or equal to CSR data width and performs appropriate address translation.
+
+ Latency
+ -------
+
+ Reads and writes always take ``self.data_width // csr_bus.data_width + 1`` cycles to complete,
+ regardless of the select inputs. Write side effects occur simultaneously with acknowledgement.
+
+ Parameters
+ ----------
+ csr_bus : :class:`..csr.Interface`
+ CSR bus driven by the bridge.
+ data_width : int or None
+ Wishbone bus data width. If not specified, defaults to ``csr_bus.data_width``.
+
+ Attributes
+ ----------
+ wb_bus : :class:`..wishbone.Interface`
+ Wishbone bus provided by the bridge.
+ """
+ def __init__(self, csr_bus, *, data_width=None):
+ if not isinstance(csr_bus, CSRInterface):
+ raise ValueError("CSR bus must be an instance of CSRInterface, not {!r}"
+ .format(csr_bus))
+ if csr_bus.data_width not in (8, 16, 32, 64):
+ raise ValueError("CSR bus data width must be one of 8, 16, 32, 64, not {!r}"
+ .format(csr_bus.data_width))
+ if data_width is None:
+ data_width = csr_bus.data_width
+
+ self.csr_bus = csr_bus
+ self.wb_bus = WishboneInterface(
+ addr_width=max(0, csr_bus.addr_width - log2_int(data_width // csr_bus.data_width)),
+ data_width=data_width,
+ granularity=csr_bus.data_width,
+ name="wb")
+
+ # Since granularity of the Wishbone interface matches the data width of the CSR bus,
+ # no width conversion is performed, even if the Wishbone data width is greater.
+ self.wb_bus.memory_map.add_window(self.csr_bus.memory_map)
+
+ def elaborate(self, platform):
+ csr_bus = self.csr_bus
+ wb_bus = self.wb_bus
+
+ m = Module()
+
+ cycle = Signal(range(len(wb_bus.sel) + 1))
+ m.d.comb += csr_bus.addr.eq(Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr))
+
+ with m.If(wb_bus.cyc & wb_bus.stb):
+ with m.Switch(cycle):
+ def segment(index):
+ return slice(index * wb_bus.granularity, (index + 1) * wb_bus.granularity)
+
+ for index, sel_index in enumerate(wb_bus.sel):
+ with m.Case(index):
+ if index > 0:
+ # CSR reads are registered, and we need to re-register them.
+ m.d.sync += wb_bus.dat_r[segment(index - 1)].eq(csr_bus.r_data)
+ m.d.comb += csr_bus.r_stb.eq(sel_index & ~wb_bus.we)
+ m.d.comb += csr_bus.w_data.eq(wb_bus.dat_w[segment(index)])
+ m.d.comb += csr_bus.w_stb.eq(sel_index & wb_bus.we)
+ m.d.sync += cycle.eq(index + 1)
+
+ with m.Default():
+ m.d.sync += wb_bus.dat_r[segment(index)].eq(csr_bus.r_data)
+ m.d.sync += wb_bus.ack.eq(1)
+
+ with m.Else():
+ m.d.sync += cycle.eq(0)
+ m.d.sync += wb_bus.ack.eq(0)
+
+ return m
--- /dev/null
+import unittest
+from nmigen import *
+from nmigen.back.pysim import *
+
+from .. import csr
+from ..csr.wishbone import *
+
+
+class MockRegister(Elaboratable):
+ def __init__(self, width):
+ self.element = csr.Element(width, "rw")
+ self.r_count = Signal(8)
+ self.w_count = Signal(8)
+ self.data = Signal(width)
+
+ def elaborate(self, platform):
+ m = Module()
+
+ with m.If(self.element.r_stb):
+ m.d.sync += self.r_count.eq(self.r_count + 1)
+ m.d.comb += self.element.r_data.eq(self.data)
+
+ with m.If(self.element.w_stb):
+ m.d.sync += self.w_count.eq(self.w_count + 1)
+ m.d.sync += self.data.eq(self.element.w_data)
+
+ return m
+
+
+class WishboneCSRBridgeTestCase(unittest.TestCase):
+ def test_wrong_csr_bus(self):
+ with self.assertRaisesRegex(ValueError,
+ r"CSR bus must be an instance of CSRInterface, not 'foo'"):
+ WishboneCSRBridge(csr_bus="foo")
+
+ def test_wrong_csr_bus_data_width(self):
+ with self.assertRaisesRegex(ValueError,
+ r"CSR bus data width must be one of 8, 16, 32, 64, not 7"):
+ WishboneCSRBridge(csr_bus=csr.Interface(addr_width=10, data_width=7))
+
+ def test_narrow(self):
+ mux = csr.Multiplexer(addr_width=10, data_width=8)
+ reg_1 = MockRegister(8)
+ mux.add(reg_1.element)
+ reg_2 = MockRegister(16)
+ mux.add(reg_2.element)
+ dut = WishboneCSRBridge(mux.bus)
+
+ def sim_test():
+ yield dut.wb_bus.cyc.eq(1)
+ yield dut.wb_bus.sel.eq(0b1)
+
+ yield dut.wb_bus.we.eq(1)
+
+ yield dut.wb_bus.adr.eq(0)
+ yield dut.wb_bus.stb.eq(1)
+ yield dut.wb_bus.dat_w.eq(0x55)
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield reg_1.r_count), 0)
+ self.assertEqual((yield reg_1.w_count), 1)
+ self.assertEqual((yield reg_1.data), 0x55)
+
+ yield dut.wb_bus.adr.eq(1)
+ yield dut.wb_bus.stb.eq(1)
+ yield dut.wb_bus.dat_w.eq(0xaa)
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield reg_2.r_count), 0)
+ self.assertEqual((yield reg_2.w_count), 0)
+ self.assertEqual((yield reg_2.data), 0)
+
+ yield dut.wb_bus.adr.eq(2)
+ yield dut.wb_bus.stb.eq(1)
+ yield dut.wb_bus.dat_w.eq(0xbb)
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield reg_2.r_count), 0)
+ self.assertEqual((yield reg_2.w_count), 1)
+ self.assertEqual((yield reg_2.data), 0xbbaa)
+
+ yield dut.wb_bus.we.eq(0)
+
+ yield dut.wb_bus.adr.eq(0)
+ yield dut.wb_bus.stb.eq(1)
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield dut.wb_bus.dat_r), 0x55)
+ self.assertEqual((yield reg_1.r_count), 1)
+ self.assertEqual((yield reg_1.w_count), 1)
+
+ yield dut.wb_bus.adr.eq(1)
+ yield dut.wb_bus.stb.eq(1)
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield dut.wb_bus.dat_r), 0xaa)
+ self.assertEqual((yield reg_2.r_count), 1)
+ self.assertEqual((yield reg_2.w_count), 1)
+
+ yield reg_2.data.eq(0x33333)
+
+ yield dut.wb_bus.adr.eq(2)
+ yield dut.wb_bus.stb.eq(1)
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield dut.wb_bus.dat_r), 0xbb)
+ self.assertEqual((yield reg_2.r_count), 1)
+ self.assertEqual((yield reg_2.w_count), 1)
+
+ m = Module()
+ m.submodules += mux, reg_1, reg_2, dut
+ with Simulator(m, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()
+
+ def test_wide(self):
+ mux = csr.Multiplexer(addr_width=10, data_width=8)
+ reg = MockRegister(32)
+ mux.add(reg.element)
+ dut = WishboneCSRBridge(mux.bus, data_width=32)
+
+ def sim_test():
+ yield dut.wb_bus.cyc.eq(1)
+ yield dut.wb_bus.adr.eq(0)
+
+ yield dut.wb_bus.we.eq(1)
+
+ yield dut.wb_bus.dat_w.eq(0x44332211)
+ yield dut.wb_bus.sel.eq(0b1111)
+ yield dut.wb_bus.stb.eq(1)
+ yield
+ yield
+ yield
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield reg.r_count), 0)
+ self.assertEqual((yield reg.w_count), 1)
+ self.assertEqual((yield reg.data), 0x44332211)
+
+ # partial write
+ yield dut.wb_bus.dat_w.eq(0xaabbccdd)
+ yield dut.wb_bus.sel.eq(0b0110)
+ yield dut.wb_bus.stb.eq(1)
+ yield
+ yield
+ yield
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield reg.r_count), 0)
+ self.assertEqual((yield reg.w_count), 1)
+ self.assertEqual((yield reg.data), 0x44332211)
+
+ yield dut.wb_bus.we.eq(0)
+
+ yield dut.wb_bus.sel.eq(0b1111)
+ yield dut.wb_bus.stb.eq(1)
+ yield
+ yield
+ yield
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield dut.wb_bus.dat_r), 0x44332211)
+ self.assertEqual((yield reg.r_count), 1)
+ self.assertEqual((yield reg.w_count), 1)
+
+ yield reg.data.eq(0xaaaaaaaa)
+
+ # partial read
+ yield dut.wb_bus.sel.eq(0b0110)
+ yield dut.wb_bus.stb.eq(1)
+ yield
+ yield
+ yield
+ yield
+ yield
+ yield dut.wb_bus.stb.eq(0)
+ yield
+ self.assertEqual((yield dut.wb_bus.ack), 1)
+ self.assertEqual((yield dut.wb_bus.dat_r), 0x00332200)
+ self.assertEqual((yield reg.r_count), 1)
+ self.assertEqual((yield reg.w_count), 1)
+
+ m = Module()
+ m.submodules += mux, reg, dut
+ with Simulator(m, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(sim_test())
+ sim.run()