From: whitequark Date: Sat, 19 Jan 2019 08:57:18 +0000 (+0000) Subject: lib.fifo: use model equivalence to simplify formal specification. X-Git-Tag: working~59 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=6ea0a12dd410b0a2a6163b9e835de86ac4d95909;p=nmigen.git lib.fifo: use model equivalence to simplify formal specification. This is unfortunately slow, and should probably be using theory of arrays. --- diff --git a/nmigen/test/test_lib_fifo.py b/nmigen/test/test_lib_fifo.py index 16d19de..720643f 100644 --- a/nmigen/test/test_lib_fifo.py +++ b/nmigen/test/test_lib_fifo.py @@ -32,10 +32,102 @@ class FIFOSmokeTestCase(FHDLTestCase): self.assertSyncFIFOWorks(SyncFIFOBuffered(width=8, depth=4)) -class FIFOContract: - def __init__(self, fifo, fwft, bound): +class FIFOModel(FIFOInterface): + """ + Non-synthesizable first-in first-out queue, implemented naively as a chain of registers. + """ + def __init__(self, width, depth, fwft): + super().__init__(width, depth) + + self.fwft = fwft + + self.replace = Signal() + self.level = Signal(max=self.depth + 1) + + def get_fragment(self, platform): + m = Module() + + storage = Array(Signal(self.width, reset_less=True, name="storage<{}>".format(n)) + for n in range(self.depth)) + + m.d.comb += self.readable.eq(self.level > 0) + with m.If(self.readable): + with m.If(self.re): + for (entry0, entry1) in zip(storage[:], storage[1:]): + m.d.sync += entry0.eq(entry1) + if self.fwft: + m.d.comb += self.dout.eq(storage[0]) + else: + with m.If(self.re): + m.d.sync += self.dout.eq(storage[0]) + + m.d.comb += self.writable.eq(self.level < self.depth) + write_at = self.level - (self.readable & self.re) + with m.If(self.we): + with m.If(~self.replace & self.writable): + m.d.sync += storage[write_at].eq(self.din) + with m.If(self.replace): + # The result of trying to replace an element in an empty queue is irrelevant. + # The result of trying to replace the element that is currently being read + # is undefined. + m.d.comb += Assume(write_at > 0) + m.d.sync += storage[write_at - 1].eq(self.din) + + m.d.sync += self.level.eq(self.level + + (self.writable & self.we & ~self.replace) + - (self.readable & self.re)) + + return m.lower(platform) + + +class FIFOModelEquivalenceSpec: + """ + The first-in first-out queue model equivalence specification: for any inputs and control + signals, the behavior of the implementation under test exactly matches the ideal model, + except for behavior not defined by the model. + """ + def __init__(self, fifo): + self.fifo = fifo + + def get_fragment(self, platform): + m = Module() + m.submodules.dut = dut = self.fifo + m.submodules.gold = gold = FIFOModel(dut.width, dut.depth, dut.fwft) + + m.d.comb += [ + gold.re.eq(dut.readable & dut.re), + gold.we.eq(dut.we), + gold.din.eq(dut.din), + ] + if hasattr(dut, "replace"): + m.d.comb += gold.replace.eq(dut.replace) + else: + m.d.comb += gold.replace.eq(0) + + m.d.comb += Assert(dut.readable.implies(gold.readable)) + if dut.fwft: + m.d.comb += Assert(dut.readable + .implies(dut.dout == gold.dout)) + else: + m.d.comb += Assert((Past(dut.readable) & Past(dut.re)) + .implies(dut.dout == gold.dout)) + + m.d.comb += Assert(dut.writable == gold.writable) + + if hasattr(dut, "level"): + m.d.comb += Assert(dut.level == gold.level) + + return m.lower(platform) + + +class FIFOContractSpec: + """ + The first-in first-out queue contract specification: if two elements are written to the queue + consecutively, they must be read out consecutively at some later point, no matter all other + circumstances, with the exception of reset. + """ + def __init__(self, fifo, bound): self.fifo = fifo - self.fwft = fwft self.bound = bound def get_fragment(self, platform): @@ -89,123 +181,30 @@ class FIFOContract: return m.lower(platform) -class SyncFIFOInvariants: - def __init__(self, fifo): - self.fifo = fifo - - def get_fragment(self, platform): - m = Module() - m.submodules.dut = fifo = self.fifo - - with m.If(Fell(ResetSignal())): - m.d.comb += [ - Assert(fifo.level == 0), - Assert(~fifo.readable), - Assert(fifo.writable), - ] - with m.Elif(~ResetSignal()): - m.d.comb += Assert(fifo.level == (Past(fifo.level) - + (Past(fifo.writable) & Past(fifo.we) & ~Past(fifo.replace)) - - (Past(fifo.readable) & Past(fifo.re)))) - with m.If(fifo.level != 0): - m.d.comb += Assert(fifo.readable) - with m.If(fifo.level != fifo.depth): - m.d.comb += Assert(fifo.writable) - - with m.If(Past(1)): - with m.If(~Past(fifo.re)): - # Unless `re` is asserted, output should not change, other than for the case of - # an empty FWFT queue, or an FWFT queue with a single entry being replaced. - if fifo.fwft: - with m.If((fifo.level == 1) & Past(fifo.we) & - (Past(fifo.writable) | Past(fifo.replace))): - m.d.comb += Assert(fifo.dout == Past(fifo.din)) - with m.Else(): - m.d.comb += Assert(fifo.dout == Past(fifo.dout)) - else: - m.d.comb += Assert(fifo.dout == Past(fifo.dout)) - - return m.lower(platform) - - -class SyncFIFOBufferedInvariants: - def __init__(self, fifo): - self.fifo = fifo - - def get_fragment(self, platform): - m = Module() - m.submodules.dut = fifo = self.fifo - - with m.If(Fell(ResetSignal())): - m.d.comb += [ - Assert(fifo.level == 0), - Assert(~fifo.readable), - Assert(fifo.writable), - ] - with m.Elif(~ResetSignal()): - m.d.comb += Assert(fifo.level == (Past(fifo.level) - + (Past(fifo.writable) & Past(fifo.we)) - - (Past(fifo.readable) & Past(fifo.re)))) - with m.If(fifo.level == 0): - m.d.comb += Assert(~fifo.readable) - with m.If(fifo.level == 1): - # When there is one entry in the queue, it might be either in the inner unbuffered - # queue memory, or in its output register. - with m.If(Past(fifo.readable)): - # On the previous cycle, there was an entry in output register. - with m.If(Past(fifo.level) == 1): - # It was the only entry in the queue, so it's only there if it was - # not read. - m.d.comb += Assert(fifo.readable == ~Past(fifo.re)) - with m.Else(): - # There were more entries in the queue, which would refil the output - # register, if necessary. - m.d.comb += Assert(fifo.readable) - with m.Elif(~Fell(ResetSignal(), 1)): - # On the previous cycle, there was no entry in the output register, so there is - # one only if it was written two cycles back. - m.d.comb += Assert(fifo.readable == Past(fifo.we, 2)) - with m.If(fifo.level > 1): - m.d.comb += Assert(fifo.readable) - with m.If(fifo.level != fifo.depth): - m.d.comb += Assert(fifo.writable) - - with m.If(~Past(fifo.re)): - # Unless `re` is asserted, output should not change, other than for the case of - # an empty FWFT queue, where it changes with latency 1. - with m.If(fifo.readable & ~Past(fifo.readable) & - Past(fifo.we, 2) & Past(fifo.writable, 2)): - m.d.comb += Assert(fifo.dout == Past(fifo.din, 2)) - with m.Else(): - m.d.comb += Assert(fifo.dout == Past(fifo.dout)) - - return m.lower(platform) - - class FIFOFormalCase(FHDLTestCase): - def check_fifo(self, fifo, invariants_cls): - self.assertFormal(FIFOContract(fifo, fwft=fifo.fwft, bound=fifo.depth * 2 + 1), + def check_fifo(self, fifo): + self.assertFormal(FIFOModelEquivalenceSpec(fifo), + mode="bmc", depth=fifo.depth * 2) + self.assertFormal(FIFOContractSpec(fifo, bound=fifo.depth * 2 + 1), mode="hybrid", depth=fifo.depth * 2 + 1) - self.assertFormal(invariants_cls(fifo), - mode="prove", depth=fifo.depth * 2) def test_sync_fwft_pot(self): - self.check_fifo(SyncFIFO(width=8, depth=4, fwft=True), SyncFIFOInvariants) + self.check_fifo(SyncFIFO(width=8, depth=4, fwft=True)) def test_sync_fwft_npot(self): - self.check_fifo(SyncFIFO(width=8, depth=5, fwft=True), SyncFIFOInvariants) + self.check_fifo(SyncFIFO(width=8, depth=5, fwft=True)) def test_sync_not_fwft_pot(self): - self.check_fifo(SyncFIFO(width=8, depth=4, fwft=False), SyncFIFOInvariants) + self.check_fifo(SyncFIFO(width=8, depth=4, fwft=False)) def test_sync_not_fwft_npot(self): - self.check_fifo(SyncFIFO(width=8, depth=5, fwft=False), SyncFIFOInvariants) + self.check_fifo(SyncFIFO(width=8, depth=5, fwft=False)) def test_sync_buffered_pot(self): - self.check_fifo(SyncFIFOBuffered(width=8, depth=4), SyncFIFOBufferedInvariants) + self.check_fifo(SyncFIFOBuffered(width=8, depth=4)) def test_sync_buffered_potp1(self): - self.check_fifo(SyncFIFOBuffered(width=8, depth=5), SyncFIFOBufferedInvariants) + self.check_fifo(SyncFIFOBuffered(width=8, depth=5)) def test_sync_buffered_potm1(self): - self.check_fifo(SyncFIFOBuffered(width=8, depth=3), SyncFIFOBufferedInvariants) + self.check_fifo(SyncFIFOBuffered(width=8, depth=3))