From: whitequark Date: Thu, 17 Jan 2019 05:26:54 +0000 (+0000) Subject: lib.fifo: add basic formal specification. X-Git-Tag: working~71 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=5a831ce31cf222352d1c85523e233de31eda68d1;p=nmigen.git lib.fifo: add basic formal specification. --- diff --git a/.gitignore b/.gitignore index e63a727..ca8796a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,5 +4,6 @@ *.v *.vcd *.gtkw +**/test/spec_*/ /.coverage /htmlcov diff --git a/nmigen/lib/fifo.py b/nmigen/lib/fifo.py index f97fdb4..9e5bc86 100644 --- a/nmigen/lib/fifo.py +++ b/nmigen/lib/fifo.py @@ -1,6 +1,7 @@ """First-in first-out queues.""" from .. import * +from ..formal import * __all__ = ["FIFOInterface", "SyncFIFO", "SyncFIFOBuffered"] diff --git a/nmigen/test/test_lib_fifo.py b/nmigen/test/test_lib_fifo.py index 606052c..50886b6 100644 --- a/nmigen/test/test_lib_fifo.py +++ b/nmigen/test/test_lib_fifo.py @@ -29,3 +29,133 @@ class FIFOSmokeTestCase(FHDLTestCase): def test_sync_buffered(self): fifo = SyncFIFO(width=8, depth=4, fwft=True) self.assertSyncFIFOWorks(SyncFIFOBuffered(width=8, depth=4)) + + +class SyncFIFOInvariants: + def __init__(self, fifo): + self.fifo = fifo + + def get_fragment(self, platform): + m = Module() + m.submodules.dut = fifo = self.fifo + + with m.If(Fell(ResetSignal())): + m.d.comb += [ + Assert(fifo.level == 0), + Assert(~fifo.readable), + Assert(fifo.writable), + ] + with m.Elif(~ResetSignal()): + m.d.comb += Assert(fifo.level == (Past(fifo.level) + + (Past(fifo.writable) & Past(fifo.we) & ~Past(fifo.replace)) + - (Past(fifo.readable) & Past(fifo.re)))) + with m.If(fifo.level != 0): + m.d.comb += Assert(fifo.readable) + with m.If(fifo.level != fifo.depth): + m.d.comb += Assert(fifo.writable) + + with m.If(Past(1)): + with m.If(~Past(fifo.re)): + # Unless `re` is asserted, output should not change, other than for the case of + # an empty FWFT queue, or an FWFT queue with a single entry being replaced. + if fifo.fwft: + with m.If((fifo.level == 1) & Past(fifo.we) & + (Past(fifo.writable) | Past(fifo.replace))): + m.d.comb += Assert(fifo.dout == Past(fifo.din)) + with m.Else(): + m.d.comb += Assert(fifo.dout == Past(fifo.dout)) + else: + m.d.comb += Assert(fifo.dout == Past(fifo.dout)) + + return m.lower(platform) + + +class SyncFIFOBufferedInvariants: + def __init__(self, fifo): + self.fifo = fifo + + def get_fragment(self, platform): + m = Module() + m.submodules.dut = fifo = self.fifo + + with m.If(Fell(ResetSignal())): + m.d.comb += [ + Assert(fifo.level == 0), + Assert(~fifo.readable), + Assert(fifo.writable), + ] + with m.Elif(~ResetSignal()): + m.d.comb += Assert(fifo.level == (Past(fifo.level) + + (Past(fifo.writable) & Past(fifo.we)) + - (Past(fifo.readable) & Past(fifo.re)))) + with m.If(fifo.level == 0): + m.d.comb += Assert(~fifo.readable) + with m.If(fifo.level == 1): + # When there is one entry in the queue, it might be either in the inner unbuffered + # queue memory, or in its output register. + with m.If(Past(fifo.readable)): + # On the previous cycle, there was an entry in output register. + with m.If(Past(fifo.level) == 1): + # It was the only entry in the queue, so it's only there if it was + # not read. + m.d.comb += Assert(fifo.readable == ~Past(fifo.re)) + with m.Else(): + # There were more entries in the queue, which would refil the output + # register, if necessary. + m.d.comb += Assert(fifo.readable) + with m.Elif(~Fell(ResetSignal(), 1)): + # On the previous cycle, there was no entry in the output register, so there is + # one only if it was written two cycles back. + m.d.comb += Assert(fifo.readable == Past(fifo.we, 2)) + with m.If(fifo.level > 1): + m.d.comb += Assert(fifo.readable) + with m.If(fifo.level != fifo.depth): + m.d.comb += Assert(fifo.writable) + + with m.If(~Past(fifo.re)): + # Unless `re` is asserted, output should not change, other than for the case of + # an empty FWFT queue, where it changes with latency 1. + with m.If(fifo.readable & ~Past(fifo.readable) & + Past(fifo.we, 2) & Past(fifo.writable, 2)): + m.d.comb += Assert(fifo.dout == Past(fifo.din, 2)) + with m.Else(): + m.d.comb += Assert(fifo.dout == Past(fifo.dout)) + + return m.lower(platform) + + +class FIFOFormalCase(FHDLTestCase): + def test_sync_fwft_pot(self): + fifo = SyncFIFO(width=8, depth=4, fwft=True) + self.assertFormal(SyncFIFOInvariants(fifo), + mode="prove", depth=fifo.depth * 2) + + def test_sync_fwft_npot(self): + fifo = SyncFIFO(width=8, depth=5, fwft=True) + self.assertFormal(SyncFIFOInvariants(fifo), + mode="prove", depth=fifo.depth * 2) + + def test_sync_not_fwft_pot(self): + fifo = SyncFIFO(width=8, depth=4, fwft=False) + self.assertFormal(SyncFIFOInvariants(fifo), + mode="prove", depth=fifo.depth * 2) + + def test_sync_not_fwft_npot(self): + fifo = SyncFIFO(width=8, depth=5, fwft=False) + self.assertFormal(SyncFIFOInvariants(fifo), + mode="prove", depth=fifo.depth * 2) + + def test_sync_buffered_pot(self): + fifo = SyncFIFOBuffered(width=8, depth=4) + self.assertFormal(SyncFIFOBufferedInvariants(fifo), + mode="prove", depth=fifo.depth * 2) + + def test_sync_buffered_potp1(self): + fifo = SyncFIFOBuffered(width=8, depth=5) + self.assertFormal(SyncFIFOBufferedInvariants(fifo), + mode="prove", depth=fifo.depth * 2) + + def test_sync_buffered_potm1(self): + fifo = SyncFIFOBuffered(width=8, depth=3) + self.assertFormal(SyncFIFOBufferedInvariants(fifo), + mode="prove", depth=fifo.depth * 2) diff --git a/nmigen/test/tools.py b/nmigen/test/tools.py index e1b1a3d..a797de0 100644 --- a/nmigen/test/tools.py +++ b/nmigen/test/tools.py @@ -1,9 +1,15 @@ +import os import re +import shutil +import subprocess +import textwrap +import traceback import unittest import warnings from contextlib import contextmanager from ..hdl.ast import * +from ..back import rtlil __all__ = ["FHDLTestCase"] @@ -43,3 +49,42 @@ class FHDLTestCase(unittest.TestCase): self.assertEqual(warns[0].category, category) if msg is not None: self.assertEqual(str(warns[0].message), msg) + + def assertFormal(self, spec, mode="bmc", depth=1): + caller, *_ = traceback.extract_stack(limit=2) + spec_root, _ = os.path.splitext(caller.filename) + spec_dir = os.path.dirname(spec_root) + spec_name = "{}_{}".format( + os.path.basename(spec_root).replace("test_", "spec_"), + caller.name.replace("test_", "") + ) + + # The sby -f switch seems not fully functional when sby is reading from stdin. + if os.path.exists(os.path.join(spec_dir, spec_name)): + shutil.rmtree(os.path.join(spec_dir, spec_name)) + + config = textwrap.dedent("""\ + [options] + mode {mode} + depth {depth} + + [engines] + smtbmc + + [script] + read_ilang top.il + prep + + [file top.il] + {rtlil} + """).format( + mode=mode, + depth=depth, + rtlil=rtlil.convert(spec.get_fragment("formal")) + ) + with subprocess.Popen(["sby", "-f", "-d", spec_name], cwd=spec_dir, + universal_newlines=True, + stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc: + stdout, stderr = proc.communicate(config) + if proc.returncode != 0: + self.fail("Formal verification failed:\n" + stdout)