Add test for CommandChooser
authorJean THOMAS <git0@pub.jeanthomas.me>
Mon, 27 Jul 2020 16:54:57 +0000 (18:54 +0200)
committerJean THOMAS <git0@pub.jeanthomas.me>
Mon, 27 Jul 2020 16:54:57 +0000 (18:54 +0200)
gram/test/test_core_multiplexer.py

index 05e89c2f14a29fda1415a94d7600b52bb99c1625..4d5c85865025daf14fe57421cc1b73465a63e779 100644 (file)
@@ -1,8 +1,83 @@
 from nmigen import *
 
-from gram.core.multiplexer import _AntiStarvation
+from gram.core.multiplexer import _AntiStarvation, _CommandChooser
+from gram.common import cmd_request_rw_layout
+import gram.stream as stream
 from utils import *
 
+class CommandChooserTestCase(FHDLTestCase):
+    def prepare_testbench(self):
+        a = 16
+        ba = 3
+        requests = []
+        for i in range(10):
+            requests += [stream.Endpoint(cmd_request_rw_layout(a, ba))]
+        dut = _CommandChooser(requests)
+
+        return (requests, dut)
+
+    def test_wants(self):
+        requests, dut = self.prepare_testbench()
+
+        def process():
+            for i in range(10):
+                yield requests[i].a.eq(i)
+
+            # Fake requests, non valid requests shouldn't be picked
+            yield requests[5].is_read.eq(1)
+            yield requests[5].valid.eq(1)
+            yield requests[6].is_read.eq(1)
+            yield requests[6].valid.eq(0)
+            yield requests[7].is_write.eq(1)
+            yield requests[7].valid.eq(1)
+            yield requests[8].is_write.eq(1)
+            yield requests[8].valid.eq(0)
+
+            # want_writes
+            yield dut.want_writes.eq(1)
+            yield; yield Delay(1e-8)
+            self.assertEqual((yield dut.cmd.a), 7)
+
+            # want_reads
+            yield dut.want_writes.eq(0)
+            yield dut.want_reads.eq(1)
+            yield; yield Delay(1e-8)
+            self.assertEqual((yield dut.cmd.a), 5)
+
+        runSimulation(dut, process, "test_core_multiplexer_commandchooser.vcd")
+
+    def test_helpers(self):
+        requests, dut = self.prepare_testbench()
+
+        def process():
+            for i in range(10):
+                yield requests[i].a.eq(i)
+
+            # Fake requests
+            yield requests[5].is_read.eq(1)
+            yield requests[5].valid.eq(1)
+            yield requests[6].is_read.eq(1)
+            yield requests[6].valid.eq(0)
+            yield requests[7].is_write.eq(1)
+            yield requests[7].valid.eq(1)
+            yield requests[8].is_write.eq(1)
+            yield requests[8].valid.eq(0)
+
+            # want_writes
+            yield dut.want_writes.eq(1)
+            yield; yield Delay(1e-8)
+            self.assertTrue((yield dut.write()))
+            self.assertFalse((yield dut.read()))
+
+            # want_reads
+            yield dut.want_writes.eq(0)
+            yield dut.want_reads.eq(1)
+            yield; yield Delay(1e-8)
+            self.assertTrue((yield dut.read()))
+            self.assertFalse((yield dut.write()))
+
+        runSimulation(dut, process, "test_core_multiplexer_commandchooser.vcd")
+
 class AntiStarvationTestCase(FHDLTestCase):
     def test_duration(self):
         def generic_test(timeout):