lib.fifo: add basic formal specification.
authorwhitequark <cz@m-labs.hk>
Thu, 17 Jan 2019 05:26:54 +0000 (05:26 +0000)
committerwhitequark <cz@m-labs.hk>
Thu, 17 Jan 2019 05:40:25 +0000 (05:40 +0000)
.gitignore
nmigen/lib/fifo.py
nmigen/test/test_lib_fifo.py
nmigen/test/tools.py

index e63a727133e7503708a44a78f6cbecd3224e8875..ca8796aaeafcb8ac4b88f34aa78c6ceee644f2ca 100644 (file)
@@ -4,5 +4,6 @@
 *.v
 *.vcd
 *.gtkw
+**/test/spec_*/
 /.coverage
 /htmlcov
index f97fdb479a81dd3db306dd119271ff335e0414b8..9e5bc861370c4d8caff110bd90fd33bd4841f230 100644 (file)
@@ -1,6 +1,7 @@
 """First-in first-out queues."""
 
 from .. import *
+from ..formal import *
 
 
 __all__ = ["FIFOInterface", "SyncFIFO", "SyncFIFOBuffered"]
index 606052c7ba07f4931447ebe017a3667f6675e194..50886b67ca43b6039fc93d784bce09fab259f499 100644 (file)
@@ -29,3 +29,133 @@ class FIFOSmokeTestCase(FHDLTestCase):
     def test_sync_buffered(self):
         fifo = SyncFIFO(width=8, depth=4, fwft=True)
         self.assertSyncFIFOWorks(SyncFIFOBuffered(width=8, depth=4))
+
+
+class SyncFIFOInvariants:
+    def __init__(self, fifo):
+        self.fifo = fifo
+
+    def get_fragment(self, platform):
+        m = Module()
+        m.submodules.dut = fifo = self.fifo
+
+        with m.If(Fell(ResetSignal())):
+            m.d.comb += [
+                Assert(fifo.level == 0),
+                Assert(~fifo.readable),
+                Assert(fifo.writable),
+            ]
+        with m.Elif(~ResetSignal()):
+            m.d.comb += Assert(fifo.level == (Past(fifo.level)
+                                + (Past(fifo.writable) & Past(fifo.we) & ~Past(fifo.replace))
+                                - (Past(fifo.readable) & Past(fifo.re))))
+            with m.If(fifo.level != 0):
+                m.d.comb += Assert(fifo.readable)
+            with m.If(fifo.level != fifo.depth):
+                m.d.comb += Assert(fifo.writable)
+
+            with m.If(Past(1)):
+                with m.If(~Past(fifo.re)):
+                    # Unless `re` is asserted, output should not change, other than for the case of
+                    # an empty FWFT queue, or an FWFT queue with a single entry being replaced.
+                    if fifo.fwft:
+                        with m.If((fifo.level == 1) & Past(fifo.we) &
+                                  (Past(fifo.writable) | Past(fifo.replace))):
+                            m.d.comb += Assert(fifo.dout == Past(fifo.din))
+                        with m.Else():
+                            m.d.comb += Assert(fifo.dout == Past(fifo.dout))
+                    else:
+                        m.d.comb += Assert(fifo.dout == Past(fifo.dout))
+
+        return m.lower(platform)
+
+
+class SyncFIFOBufferedInvariants:
+    def __init__(self, fifo):
+        self.fifo = fifo
+
+    def get_fragment(self, platform):
+        m = Module()
+        m.submodules.dut = fifo = self.fifo
+
+        with m.If(Fell(ResetSignal())):
+            m.d.comb += [
+                Assert(fifo.level == 0),
+                Assert(~fifo.readable),
+                Assert(fifo.writable),
+            ]
+        with m.Elif(~ResetSignal()):
+            m.d.comb += Assert(fifo.level == (Past(fifo.level)
+                                + (Past(fifo.writable) & Past(fifo.we))
+                                - (Past(fifo.readable) & Past(fifo.re))))
+            with m.If(fifo.level == 0):
+                m.d.comb += Assert(~fifo.readable)
+            with m.If(fifo.level == 1):
+                # When there is one entry in the queue, it might be either in the inner unbuffered
+                # queue memory, or in its output register.
+                with m.If(Past(fifo.readable)):
+                    # On the previous cycle, there was an entry in output register.
+                    with m.If(Past(fifo.level) == 1):
+                        # It was the only entry in the queue, so it's only there if it was
+                        # not read.
+                        m.d.comb += Assert(fifo.readable == ~Past(fifo.re))
+                    with m.Else():
+                        # There were more entries in the queue, which would refil the output
+                        # register, if necessary.
+                        m.d.comb += Assert(fifo.readable)
+                with m.Elif(~Fell(ResetSignal(), 1)):
+                    # On the previous cycle, there was no entry in the output register, so there is
+                    # one only if it was written two cycles back.
+                    m.d.comb += Assert(fifo.readable == Past(fifo.we, 2))
+            with m.If(fifo.level > 1):
+                m.d.comb += Assert(fifo.readable)
+            with m.If(fifo.level != fifo.depth):
+                m.d.comb += Assert(fifo.writable)
+
+            with m.If(~Past(fifo.re)):
+                # Unless `re` is asserted, output should not change, other than for the case of
+                # an empty FWFT queue, where it changes with latency 1.
+                with m.If(fifo.readable & ~Past(fifo.readable) &
+                          Past(fifo.we, 2) & Past(fifo.writable, 2)):
+                    m.d.comb += Assert(fifo.dout == Past(fifo.din, 2))
+                with m.Else():
+                    m.d.comb += Assert(fifo.dout == Past(fifo.dout))
+
+        return m.lower(platform)
+
+
+class FIFOFormalCase(FHDLTestCase):
+    def test_sync_fwft_pot(self):
+        fifo = SyncFIFO(width=8, depth=4, fwft=True)
+        self.assertFormal(SyncFIFOInvariants(fifo),
+                          mode="prove", depth=fifo.depth * 2)
+
+    def test_sync_fwft_npot(self):
+        fifo = SyncFIFO(width=8, depth=5, fwft=True)
+        self.assertFormal(SyncFIFOInvariants(fifo),
+                          mode="prove", depth=fifo.depth * 2)
+
+    def test_sync_not_fwft_pot(self):
+        fifo = SyncFIFO(width=8, depth=4, fwft=False)
+        self.assertFormal(SyncFIFOInvariants(fifo),
+                          mode="prove", depth=fifo.depth * 2)
+
+    def test_sync_not_fwft_npot(self):
+        fifo = SyncFIFO(width=8, depth=5, fwft=False)
+        self.assertFormal(SyncFIFOInvariants(fifo),
+                          mode="prove", depth=fifo.depth * 2)
+
+    def test_sync_buffered_pot(self):
+        fifo = SyncFIFOBuffered(width=8, depth=4)
+        self.assertFormal(SyncFIFOBufferedInvariants(fifo),
+                          mode="prove", depth=fifo.depth * 2)
+
+    def test_sync_buffered_potp1(self):
+        fifo = SyncFIFOBuffered(width=8, depth=5)
+        self.assertFormal(SyncFIFOBufferedInvariants(fifo),
+                          mode="prove", depth=fifo.depth * 2)
+
+    def test_sync_buffered_potm1(self):
+        fifo = SyncFIFOBuffered(width=8, depth=3)
+        self.assertFormal(SyncFIFOBufferedInvariants(fifo),
+                          mode="prove", depth=fifo.depth * 2)
index e1b1a3d70347c8eda9d981cd5f957d6ec47d20d0..a797de0576e342dee8b9bd4c8b28ba97738a210b 100644 (file)
@@ -1,9 +1,15 @@
+import os
 import re
+import shutil
+import subprocess
+import textwrap
+import traceback
 import unittest
 import warnings
 from contextlib import contextmanager
 
 from ..hdl.ast import *
+from ..back import rtlil
 
 
 __all__ = ["FHDLTestCase"]
@@ -43,3 +49,42 @@ class FHDLTestCase(unittest.TestCase):
         self.assertEqual(warns[0].category, category)
         if msg is not None:
             self.assertEqual(str(warns[0].message), msg)
+
+    def assertFormal(self, spec, mode="bmc", depth=1):
+        caller, *_ = traceback.extract_stack(limit=2)
+        spec_root, _ = os.path.splitext(caller.filename)
+        spec_dir = os.path.dirname(spec_root)
+        spec_name = "{}_{}".format(
+            os.path.basename(spec_root).replace("test_", "spec_"),
+            caller.name.replace("test_", "")
+        )
+
+        # The sby -f switch seems not fully functional when sby is reading from stdin.
+        if os.path.exists(os.path.join(spec_dir, spec_name)):
+            shutil.rmtree(os.path.join(spec_dir, spec_name))
+
+        config = textwrap.dedent("""\
+        [options]
+        mode {mode}
+        depth {depth}
+
+        [engines]
+        smtbmc
+
+        [script]
+        read_ilang top.il
+        prep
+
+        [file top.il]
+        {rtlil}
+        """).format(
+            mode=mode,
+            depth=depth,
+            rtlil=rtlil.convert(spec.get_fragment("formal"))
+        )
+        with subprocess.Popen(["sby", "-f", "-d", spec_name], cwd=spec_dir,
+                              universal_newlines=True,
+                              stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
+            stdout, stderr = proc.communicate(config)
+            if proc.returncode != 0:
+                self.fail("Formal verification failed:\n" + stdout)