return Mux(signal == modulo - 1, 0, signal + 1)
-def _decr(signal, modulo):
- if modulo == 2 ** len(signal):
- return signal - 1
- else:
- return Mux(signal == 0, modulo - 1, signal - 1)
-
-
class SyncFIFO(Elaboratable, FIFOInterface):
__doc__ = FIFOInterface._doc_template.format(
description="""
level : out
Number of unread entries.
""".strip(),
- w_attributes="""
- replace : in
- If asserted at the same time as ``we``, replaces the last entry written into the queue
- with ``din``. For FWFT queues, if ``level`` is 1, this replaces the value at ``dout``
- as well. Does nothing if the queue is empty.
- """.strip())
+ w_attributes="")
def __init__(self, width, depth, fwft=True):
super().__init__(width, depth, fwft)
- self.level = Signal.range(depth + 1)
- self.replace = Signal()
+ self.level = Signal.range(depth + 1)
def elaborate(self, platform):
m = Module()
]
do_read = self.readable & self.re
- do_write = self.writable & self.we & ~self.replace
+ do_write = self.writable & self.we
storage = Memory(self.width, self.depth)
wrport = m.submodules.wrport = storage.write_port()
m.d.comb += [
wrport.addr.eq(produce),
wrport.data.eq(self.din),
- wrport.en.eq(self.we & (self.writable | self.replace))
+ wrport.en.eq(self.we & self.writable)
]
- with m.If(self.replace):
- m.d.comb += wrport.addr.eq(_decr(produce, self.depth))
with m.If(do_write):
m.d.sync += produce.eq(_incr(produce, self.depth))
fifo.din.eq(self.din),
fifo.we.eq(self.we),
self.writable.eq(fifo.writable),
- fifo.replace.eq(0),
]
m.d.comb += [
self.assertEqual((yield self.tb.dut.dout[32:]), i*2)
yield
self.run_with(gen())
-
- def test_replace(self):
- seq = [x for x in range(20) if x % 5]
- def gen():
- for cycle in count():
- yield self.tb.dut.we.eq(cycle % 2 == 0)
- yield self.tb.dut.re.eq(cycle % 7 == 0)
- yield self.tb.dut.replace.eq(
- (yield self.tb.dut.din[:32]) % 5 == 1)
- if (yield self.tb.dut.readable) and (yield self.tb.dut.re):
- try:
- i = seq.pop(0)
- except IndexError:
- break
- self.assertEqual((yield self.tb.dut.dout[:32]), i)
- self.assertEqual((yield self.tb.dut.dout[32:]), i*2)
- yield
- self.run_with(gen())
self.rdomain = rdomain
self.wdomain = wdomain
- self.replace = Signal()
- self.level = Signal.range(self.depth + 1)
+ self.level = Signal.range(self.depth + 1)
def elaborate(self, platform):
m = Module()
m.d.comb += self.writable.eq(self.level < self.depth)
m.d.comb += wrport.data.eq(self.din)
- with m.If(self.we):
- with m.If(~self.replace & self.writable):
- m.d.comb += wrport.addr.eq((produce + 1) % self.depth)
- m.d.comb += wrport.en.eq(1)
- m.d[self.wdomain] += produce.eq(wrport.addr)
- with m.If(self.replace):
- # The result of trying to replace an element in an empty queue is irrelevant.
- # The result of trying to replace the element that is currently being read
- # is undefined.
- m.d.comb += Assume(self.level > 0)
- m.d.comb += wrport.addr.eq(produce)
- m.d.comb += wrport.en.eq(1)
+ with m.If(self.we & self.writable):
+ m.d.comb += wrport.addr.eq((produce + 1) % self.depth)
+ m.d.comb += wrport.en.eq(1)
+ m.d[self.wdomain] += produce.eq(wrport.addr)
with m.If(ResetSignal(self.rdomain) | ResetSignal(self.wdomain)):
m.d.sync += self.level.eq(0)
with m.Else():
m.d.sync += self.level.eq(self.level
- + (self.writable & self.we & ~self.replace)
+ + (self.writable & self.we)
- (self.readable & self.re))
m.d.comb += Assert(ResetSignal(self.rdomain) == ResetSignal(self.wdomain))
gold.we.eq(dut.we),
gold.din.eq(dut.din),
]
- if hasattr(dut, "replace"):
- m.d.comb += gold.replace.eq(dut.replace)
- else:
- m.d.comb += gold.replace.eq(0)
m.d.comb += Assert(dut.readable.implies(gold.readable))
m.d.comb += Assert(dut.writable.implies(gold.writable))
m.domains += ClockDomain(self.rdomain)
m.d.comb += ResetSignal(self.rdomain).eq(0)
- if hasattr(fifo, "replace"):
- m.d.comb += fifo.replace.eq(0)
-
entry_1 = AnyConst(fifo.width)
entry_2 = AnyConst(fifo.width)