Revamp wishbone frontend (fixes #41), add tests
authorJean THOMAS <git0@pub.jeanthomas.me>
Wed, 29 Jul 2020 14:17:50 +0000 (16:17 +0200)
committerJean THOMAS <git0@pub.jeanthomas.me>
Wed, 29 Jul 2020 14:17:50 +0000 (16:17 +0200)
gram/frontend/wishbone.py
gram/test/test_frontend_wishbone.py [new file with mode: 0644]

index 64de8c71ca93412a11ec1a214f2041b369c2c2e0..19bfb8f5e616393826bac7f68efa4789a998c596 100644 (file)
@@ -12,21 +12,19 @@ from lambdasoc.periph import Peripheral
 
 
 class gramWishbone(Peripheral, Elaboratable):
-    def __init__(self, core, data_width = 32):
+    def __init__(self, core, data_width=32, granularity=8):
         super().__init__(name="wishbone")
 
-        self.dw = data_width
-        self._port = core.crossbar.get_native_port()
+        self.native_port = core.crossbar.get_native_port()
 
-        dram_size = core.size//4
-        dram_addr_width = log2_int(dram_size)
-        granularity = 8
+        self.ratio = self.native_port.data_width//data_width
 
-        self.bus = wishbone.Interface(addr_width=dram_addr_width,
-                                      data_width=self.dw, granularity=granularity)
+        addr_width = log2_int(core.size//(self.native_port.data_width//data_width))
+        self.bus = wishbone.Interface(addr_width=addr_width+log2_int(self.ratio),
+                                      data_width=data_width, granularity=granularity)
 
-        map = MemoryMap(addr_width=dram_addr_width +
-                        log2_int(granularity)-1, data_width=granularity)
+        map = MemoryMap(addr_width=addr_width+log2_int(self.ratio)+log2_int(data_width//granularity),
+            data_width=granularity)
         self.bus.memory_map = map
 
     def elaborate(self, platform):
@@ -34,53 +32,52 @@ class gramWishbone(Peripheral, Elaboratable):
 
         # Write datapath
         m.d.comb += [
-            self._port.wdata.valid.eq(self.bus.cyc & self.bus.stb & self.bus.we),
+            self.native_port.wdata.valid.eq(self.bus.cyc & self.bus.stb & self.bus.we),
         ]
 
-        with m.Switch(self.bus.adr & 0b11):
-            for i in range(4):
+        ratio_bitmask = Repl(1, log2_int(self.ratio))
+
+        with m.Switch(self.bus.adr & ratio_bitmask):
+            for i in range(self.ratio):
                 with m.Case(i):
-                    with m.If(self.bus.sel):
-                        m.d.comb += self._port.wdata.we.eq(0xF << (4*i))
-                    with m.Else():
-                        m.d.comb += self._port.wdata.we.eq(0)
+                    m.d.comb += self.native_port.wdata.we.eq(Repl(self.bus.sel, self.bus.data_width//self.bus.granularity) << (self.ratio*i))
 
-        with m.Switch(self.bus.adr & 0b11):
-            for i in range(4):
+        with m.Switch(self.bus.adr & ratio_bitmask):
+            for i in range(self.ratio):
                 with m.Case(i):
-                    m.d.comb += self._port.wdata.data.eq(self.bus.dat_w << (32*i))
+                    m.d.comb += self.native_port.wdata.data.eq(self.bus.dat_w << (self.bus.data_width*i))
 
         # Read datapath
         m.d.comb += [
-            self._port.rdata.ready.eq(1),
+            self.native_port.rdata.ready.eq(1),
         ]
 
-        with m.Switch(self.bus.adr & 0b11):
-            for i in range(4):
+        with m.Switch(self.bus.adr & ratio_bitmask):
+            for i in range(self.ratio):
                 with m.Case(i):
-                    m.d.comb += self.bus.dat_r.eq(self._port.rdata.data >> (32*i))
+                    m.d.comb += self.bus.dat_r.eq(self.native_port.rdata.data >> (self.bus.data_width*i))
 
         with m.FSM():
             with m.State("Send-Cmd"):
                 m.d.comb += [
-                    self._port.cmd.valid.eq(self.bus.cyc & self.bus.stb),
-                    self._port.cmd.we.eq(self.bus.we),
-                    self._port.cmd.addr.eq(self.bus.adr >> 2),
+                    self.native_port.cmd.valid.eq(self.bus.cyc & self.bus.stb),
+                    self.native_port.cmd.we.eq(self.bus.we),
+                    self.native_port.cmd.addr.eq(self.bus.adr >> log2_int(self.bus.data_width//self.bus.granularity)),
                 ]
 
-                with m.If(self._port.cmd.valid & self._port.cmd.ready):
+                with m.If(self.native_port.cmd.valid & self.native_port.cmd.ready):
                     with m.If(self.bus.we):
                         m.next = "Wait-Write"
                     with m.Else():
                         m.next = "Wait-Read"
 
             with m.State("Wait-Read"):
-                with m.If(self._port.rdata.valid):
+                with m.If(self.native_port.rdata.valid):
                     m.d.comb += self.bus.ack.eq(1)
                     m.next = "Send-Cmd"
 
             with m.State("Wait-Write"):
-                with m.If(self._port.wdata.ready):
+                with m.If(self.native_port.wdata.ready):
                     m.d.comb += self.bus.ack.eq(1)
                     m.next = "Send-Cmd"
 
diff --git a/gram/test/test_frontend_wishbone.py b/gram/test/test_frontend_wishbone.py
new file mode 100644 (file)
index 0000000..a4ebe1b
--- /dev/null
@@ -0,0 +1,202 @@
+#nmigen: UnusedElaboratable=no
+
+from nmigen import *
+from lambdasoc.periph import Peripheral
+
+from gram.test.utils import *
+
+from gram.common import gramNativePort
+from gram.frontend.wishbone import gramWishbone
+
+class FakeGramCrossbar:
+    def __init__(self):
+        self.port = gramNativePort("both", 3, 128)
+
+    def get_native_port(self):
+        return self.port
+
+class FakeGramCore:
+    def __init__(self):
+        self.crossbar = FakeGramCrossbar()
+        self.size = 2**3*128//8
+
+class GramWishboneTestCase(FHDLTestCase):
+    def read_request(self, *, bus, native_port, adr, sel, reference_value, timeout=128):
+        # Send a read request
+        yield bus.adr.eq(adr)
+        yield bus.stb.eq(1)
+        yield bus.cyc.eq(1)
+        yield bus.sel.eq(sel)
+        yield bus.we.eq(0)
+        yield
+
+        # Answer cmd
+        yield native_port.cmd.ready.eq(1)
+        yield
+
+        # Answer rdata
+        yield native_port.rdata.data.eq(reference_value)
+        yield native_port.rdata.valid.eq(1)
+        yield
+
+        while not (yield bus.ack):
+            timeout -= 1
+            yield
+            self.assertTrue(timeout > 0)
+
+        res = yield bus.dat_r
+
+        yield bus.stb.eq(0)
+        yield bus.cyc.eq(0)
+        yield native_port.rdata.valid.eq(0)
+        yield
+
+        return res
+
+    def write_request(self, *, bus, native_port, adr, sel, value, timeout=128):
+        # Send a write request
+        yield bus.adr.eq(adr)
+        yield bus.stb.eq(1)
+        yield bus.cyc.eq(1)
+        yield bus.sel.eq(sel)
+        yield bus.we.eq(1)
+        yield bus.dat_w.eq(value)
+        yield
+
+        # Answer cmd
+        yield native_port.cmd.ready.eq(1)
+        yield
+
+        # Answer wdata
+        yield native_port.wdata.ready.eq(1)
+
+        while not (yield bus.ack):
+            timeout -= 1
+            yield
+            self.assertTrue(timeout > 0)
+
+        res = yield native_port.wdata.data
+
+        yield bus.stb.eq(0)
+        yield bus.cyc.eq(0)
+        yield native_port.wdata.ready.eq(0)
+        yield
+
+        return res
+
+    def read_test(self, *, data_width, granularity):
+        core = FakeGramCore()
+        native_port = core.crossbar.get_native_port()
+        dut = gramWishbone(core, data_width=data_width, granularity=granularity)
+
+        def process():
+            # Initialize native port
+            yield native_port.cmd.ready.eq(0)
+            yield native_port.wdata.ready.eq(0)
+            yield native_port.rdata.valid.eq(0)
+
+            reference_value = 0xBADDCAFE_FEEDFACE_BEEFCAFE_BAD0DAB0
+
+            data_granularity_radio = data_width//granularity
+
+            for i in range(native_port.data_width//data_width):
+                res = yield from self.read_request(bus=dut.bus,
+                    native_port=native_port,
+                    adr=i,
+                    sel=2**data_granularity_radio-1,
+                    reference_value=reference_value)
+                self.assertEqual(res, (reference_value >> (i*data_width)) & 2**data_width-1)
+
+        runSimulation(dut, process, "test_frontend_wishbone.vcd")
+
+
+    def write_test(self, *, data_width, granularity):
+        core = FakeGramCore()
+        native_port = core.crossbar.get_native_port()
+        dut = gramWishbone(core, data_width=data_width, granularity=granularity)
+
+        def process():
+            # Initialize native port
+            yield native_port.cmd.ready.eq(0)
+            yield native_port.wdata.ready.eq(0)
+            yield native_port.rdata.valid.eq(0)
+
+            reference_value = 0xBADDCAFE_FEEDFACE_BEEFCAFE_BAD0DAB0
+
+            data_granularity_radio = data_width//granularity
+
+            for i in range(native_port.data_width//data_width):
+                res = yield from self.write_request(bus=dut.bus,
+                    native_port=native_port,
+                    adr=i,
+                    sel=2**data_granularity_radio-1,
+                    value=(reference_value >> (i*data_width)) & 2**data_width-1)
+                self.assertEqual((reference_value >> (i*data_width)) & 2**data_width-1, (res >> (i*data_width)) & 2**data_width-1)
+
+        runSimulation(dut, process, "test_frontend_wishbone.vcd")
+
+    def test_init(self):
+        core = FakeGramCore()
+        dut = gramWishbone(core, data_width=32, granularity=8)
+        self.assertEqual(dut.bus.data_width, 32)
+        self.assertEqual(dut.bus.granularity, 8)
+
+    def test_read8_8(self):
+        self.read_test(data_width=8, granularity=8)
+
+    def test_read16_8(self):
+        self.read_test(data_width=16, granularity=8)
+
+    def test_read16_16(self):
+        self.read_test(data_width=16, granularity=16)
+
+    def test_read32_8(self):
+        self.read_test(data_width=32, granularity=8)
+
+    def test_read32_16(self):
+        self.read_test(data_width=32, granularity=16)
+
+    def test_read32_32(self):
+        self.read_test(data_width=32, granularity=32)
+
+    def test_read64_8(self):
+        self.read_test(data_width=64, granularity=8)
+
+    def test_read64_16(self):
+        self.read_test(data_width=64, granularity=16)
+
+    def test_read64_32(self):
+        self.read_test(data_width=64, granularity=32)
+
+    def test_read64_64(self):
+        self.read_test(data_width=64, granularity=64)
+
+    def test_write8_8(self):
+        self.write_test(data_width=8, granularity=8)
+
+    def test_write16_8(self):
+        self.write_test(data_width=16, granularity=8)
+
+    def test_write16_16(self):
+        self.write_test(data_width=16, granularity=16)
+
+    def test_write32_8(self):
+        self.write_test(data_width=32, granularity=8)
+
+    def test_write32_16(self):
+        self.write_test(data_width=32, granularity=16)
+
+    def test_write32_32(self):
+        self.write_test(data_width=32, granularity=32)
+
+    def test_write64_8(self):
+        self.write_test(data_width=64, granularity=8)
+
+    def test_write64_16(self):
+        self.write_test(data_width=64, granularity=16)
+
+    def test_write64_32(self):
+        self.write_test(data_width=64, granularity=32)
+
+    def test_write64_64(self):
+        self.write_test(data_width=64, granularity=64)