Implement control path and unit test.
authorCesar Strauss <cestrauss@gmail.com>
Sun, 19 Jul 2020 00:39:32 +0000 (21:39 -0300)
committerCesar Strauss <cestrauss@gmail.com>
Sun, 19 Jul 2020 00:46:11 +0000 (21:46 -0300)
src/soc/experiment/alu_fsm.py

index d695a256dea93b06e6aeb3a169290b2055b82677..0ca1b11888f1ba9e2fe96a66631b1362eb26bc75 100644 (file)
@@ -20,6 +20,7 @@ from nmigen import Elaboratable, Signal, Module, Cat
 from nmigen.back.pysim import Simulator
 from nmigen.cli import rtlil
 from soc.fu.cr.cr_input_record import CompCROpSubset
+from math import log2
 
 
 class Dummy:
@@ -96,6 +97,7 @@ class Shifter(Elaboratable):
         # the control signals
         load = Signal()
         shift = Signal()
+        direction = Signal()
         # the data flow
         shift_in = Signal(self.width)
         shift_left_by_1 = Signal(self.width)
@@ -119,7 +121,7 @@ class Shifter(Elaboratable):
         with m.If(load):
             m.d.comb += next_shift.eq(shift_in)
         with m.Elif(shift):
-            with m.If(self.p.data_i.dir):
+            with m.If(direction):
                 m.d.comb += next_shift.eq(shift_right_by_1)
             with m.Else():
                 m.d.comb += next_shift.eq(shift_left_by_1)
@@ -127,7 +129,53 @@ class Shifter(Elaboratable):
         # register the next value
         m.d.sync += shift_reg.eq(next_shift)
 
-        # TODO: Implement the control path
+        # Control path
+        # ------------
+        # The idea is to have a SHIFT state where the shift register
+        # is shifted every cycle, while a counter decrements.
+        # This counter is loaded with shift amount in the initial state.
+        # The SHIFT state is left when the counter goes to zero.
+
+        # Shift counter
+        shift_width = int(log2(self.width)) + 1
+        next_count = Signal(shift_width)
+        count = Signal(shift_width, reset_less=True)
+        m.d.sync += count.eq(next_count)
+
+        with m.FSM():
+            with m.State("IDLE"):
+                m.d.comb += [
+                    # keep p.ready_o active on IDLE
+                    self.p.ready_o.eq(1),
+                    # keep loading the shift register and shift count
+                    load.eq(1),
+                    next_count.eq(self.p.data_i.shift),
+                ]
+                # capture the direction bit as well
+                m.d.sync += direction.eq(self.p.data_i.dir)
+                with m.If(self.p.valid_i):
+                    # Leave IDLE when data arrives
+                    with m.If(next_count == 0):
+                        # short-circuit for zero shift
+                        m.next = "DONE"
+                    with m.Else():
+                        m.next = "SHIFT"
+            with m.State("SHIFT"):
+                m.d.comb += [
+                    # keep shifting, while counter is not zero
+                    shift.eq(1),
+                    # decrement the shift counter
+                    next_count.eq(count - 1),
+                ]
+                with m.If(next_count == 0):
+                    # exit when shift counter goes to zero
+                    m.next = "DONE"
+            with m.State("DONE"):
+                # keep n.valid_o active while the data is not accepted
+                m.d.comb += self.n.valid_o.eq(1)
+                with m.If(self.n.ready_i):
+                    # go back to IDLE when the data is accepted
+                    m.next = "IDLE"
 
         return m
 
@@ -157,7 +205,65 @@ def test_shifter():
     with open("test_shifter.il", "w") as f:
         f.write(il)
     sim = Simulator(m)
-    # Todo: Implement Simulation
+    sim.add_clock(1e-6)
+
+    def send(data, shift, direction):
+        # present input data and assert valid_i
+        yield dut.p.data_i.data.eq(data)
+        yield dut.p.data_i.shift.eq(shift)
+        yield dut.p.data_i.dir.eq(direction)
+        yield dut.p.valid_i.eq(1)
+        yield
+        # wait for p.ready_o to be asserted
+        while not (yield dut.p.ready_o):
+            yield
+        # clear input data and negate p.valid_i
+        yield dut.p.valid_i.eq(0)
+        yield dut.p.data_i.data.eq(0)
+        yield dut.p.data_i.shift.eq(0)
+        yield dut.p.data_i.dir.eq(0)
+
+    def receive(expected):
+        # signal readiness to receive data
+        yield dut.n.ready_i.eq(1)
+        yield
+        # wait for n.valid_o to be asserted
+        while not (yield dut.n.valid_o):
+            yield
+        # read result
+        result = yield dut.n.data_o.data
+        # negate n.ready_i
+        yield dut.n.ready_i.eq(0)
+        # check result
+        assert result == expected
+
+    def producer():
+        # 13 >> 2
+        yield from send(13, 2, 1)
+        # 3 << 4
+        yield from send(3, 4, 0)
+        # 21 << 0
+        yield from send(21, 0, 0)
+
+    def consumer():
+        # the consumer is not in step with the producer, but the
+        # order of the results are preserved
+        # 13 >> 2 = 3
+        yield from receive(3)
+        # 3 << 4 = 48
+        yield from receive(48)
+        # 21 << 0 = 21
+        yield from receive(21)
+
+    sim.add_sync_process(producer)
+    sim.add_sync_process(consumer)
+    sim_writer = sim.write_vcd(
+        "test_shifter.vcd",
+        "test_shifter.gtkw",
+        traces=dut.ports()
+    )
+    with sim_writer:
+        sim.run()
 
 
 if __name__ == "__main__":