nmigen.lib.cdc: port PulseSynchronizer.
authorawygle <awygle@gmail.com>
Sun, 16 Feb 2020 06:51:53 +0000 (22:51 -0800)
committerGitHub <noreply@github.com>
Sun, 16 Feb 2020 06:51:53 +0000 (06:51 +0000)
Co-authored-by: Luke Wren <wren6991@gmail.com>
nmigen/lib/cdc.py
nmigen/test/test_lib_cdc.py

index 2787efae6eca5cb6b9d25bb96d2e66edcb680b6e..10a5c7d94af646d2edf177120d910d3203f7b4c9 100644 (file)
@@ -2,7 +2,7 @@ from .._utils import deprecated
 from .. import *
 
 
-__all__ = ["FFSynchronizer", "ResetSynchronizer"]
+__all__ = ["FFSynchronizer", "ResetSynchronizer", "PulseSynchronizer"]
 
 
 def _check_stages(stages):
@@ -156,3 +156,47 @@ class ResetSynchronizer(Elaboratable):
             ResetSignal(self._domain).eq(flops[-1])
         ]
         return m
+
+
+class PulseSynchronizer(Elaboratable):
+    """A one-clock pulse on the input produces a one-clock pulse on the output.
+
+    If the output clock is faster than the input clock, then the input may be safely asserted at
+    100% duty cycle. Otherwise, if the clock ratio is n : 1, the input may be asserted at most once
+    in every n input clocks, else pulses may be dropped.
+    Other than this there is no constraint on the ratio of input and output clock frequency.
+
+    Parameters
+    ----------
+    i_domain : str
+        Name of input clock domain.
+    o-domain : str
+        Name of output clock domain.
+    sync_stages : int
+        Number of synchronisation flops between the two clock domains. 2 is the default, and
+        minimum safe value. High-frequency designs may choose to increase this.
+    """
+    def __init__(self, i_domain, o_domain, sync_stages=2):
+        if not isinstance(sync_stages, int) or sync_stages < 1:
+            raise TypeError("sync_stages must be a positive integer, not '{!r}'".format(sync_stages))
+
+        self.i = Signal()
+        self.o = Signal()
+        self.i_domain = i_domain
+        self.o_domain = o_domain
+        self.sync_stages = sync_stages
+
+    def elaborate(self, platform):
+        m = Module()
+
+        itoggle = Signal()
+        otoggle = Signal()
+        ff_sync = m.submodules.ff_sync = \
+            FFSynchronizer(itoggle, otoggle, o_domain=self.o_domain, stages=self.sync_stages)
+        otoggle_prev = Signal()
+
+        m.d[self.i_domain] += itoggle.eq(itoggle ^ self.i)
+        m.d[self.o_domain] += otoggle_prev.eq(otoggle)
+        m.d.comb += self.o.eq(otoggle ^ otoggle_prev)
+
+        return m
index 8c832413d38e2affe6159f2e0e761cafdc27ec61..e58ed1ebf82a4d10f217098e79bbafab753f6c4e 100644 (file)
@@ -100,3 +100,39 @@ class ResetSynchronizerTestCase(FHDLTestCase):
         sim.add_process(process)
         with sim.write_vcd("test.vcd"):
             sim.run()
+
+
+# TODO: test with distinct clocks
+class PulseSynchronizerTestCase(FHDLTestCase):
+    def test_paramcheck(self):
+        with self.assertRaises(TypeError):
+            ps = PulseSynchronizer("w", "r", sync_stages=0)
+        with self.assertRaises(TypeError):
+            ps = PulseSynchronizer("w", "r", sync_stages="abc")
+        ps = PulseSynchronizer("w", "r", sync_stages = 1)
+
+    def test_smoke(self):
+        m = Module()
+        m.domains += ClockDomain("sync")
+        ps = m.submodules.dut = PulseSynchronizer("sync", "sync")
+
+        sim = Simulator(m)
+        sim.add_clock(1e-6)
+        def process():
+            yield ps.i.eq(0)
+            # TODO: think about reset
+            for n in range(5):
+                yield Tick()
+            # Make sure no pulses are generated in quiescent state
+            for n in range(3):
+                yield Tick()
+                self.assertEqual((yield ps.o), 0)
+            # Check conservation of pulses
+            accum = 0
+            for n in range(10):
+                yield ps.i.eq(1 if n < 4 else 0)
+                yield Tick()
+                accum += yield ps.o
+            self.assertEqual(accum, 4)
+        sim.add_process(process)
+        sim.run()