{interface}
level : out
Number of unread entries.
+ replace : in
+ Replaces the last entry written into the FIFO with `din`. Does nothing
+ if that entry has already been read (i.e. the FIFO is empty).
+ Assert in conjunction with `we`.
"""
__doc__ = __doc__.format(interface=_FIFOInterface.__doc__)
_FIFOInterface.__init__(self, width_or_layout, depth)
self.level = Signal(max=depth+1)
+ self.replace = Signal()
###
- do_write = Signal()
- do_read = Signal()
- self.comb += [
- do_write.eq(self.writable & self.we),
- do_read.eq(self.readable & self.re)
- ]
-
produce = Signal(max=depth)
consume = Signal(max=depth)
storage = Memory(self.width, depth)
wrport = storage.get_port(write_capable=True)
self.specials += wrport
self.comb += [
- wrport.adr.eq(produce),
+ If(self.replace,
+ wrport.adr.eq(produce-1)
+ ).Else(
+ wrport.adr.eq(produce)
+ ),
wrport.dat_w.eq(self.din_bits),
- wrport.we.eq(do_write)
+ wrport.we.eq(self.we & (self.writable | self.replace))
]
- self.sync += If(do_write, _inc(produce, depth))
+ self.sync += If(self.we & self.writable & ~self.replace,
+ _inc(produce, depth))
+
+ do_read = Signal()
+ self.comb += do_read.eq(self.readable & self.re)
rdport = storage.get_port(async_read=fwft, has_re=not fwft)
self.specials += rdport
self.sync += If(do_read, _inc(consume, depth))
self.sync += \
- If(do_write,
+ If(self.we & self.writable & ~self.replace,
If(~do_read, self.level.eq(self.level + 1))
).Elif(do_read,
self.level.eq(self.level - 1)
self.dout_bits = fifo.dout_bits
self.dout = fifo.dout
self.level = fifo.level
+ self.replace = fifo.replace
###
try:
i = seq.pop(0)
except IndexError:
- print(tbp.dut.level)
+ raise StopSimulation
+ self.assertEqual(tbp.dut.dout.a, i)
+ self.assertEqual(tbp.dut.dout.b, i*2)
+ self.run_with(cb)
+
+ def test_replace(self):
+ seq = [x for x in range(20) if x % 5]
+ def cb(tb, tbp):
+ tbp.dut.we = tbp.simulator.cycle_counter % 2 == 0
+ tbp.dut.re = tbp.simulator.cycle_counter % 3 == 0
+ tbp.dut.replace = tbp.dut.din.a % 5 == 1
+ if tbp.dut.readable and tbp.dut.re:
+ try:
+ i = seq.pop(0)
+ except IndexError:
raise StopSimulation
self.assertEqual(tbp.dut.dout.a, i)
self.assertEqual(tbp.dut.dout.b, i*2)