a new entry.
w_en : in
Write strobe. Latches ``w_data`` into the queue. Does nothing if ``w_rdy`` is not asserted.
+ w_level : out
+ Number of unread entries.
{w_attributes}
r_data : out, width
Output data. {r_data_valid}
r_en : in
Read strobe. Makes the next entry (if any) available on ``r_data`` at the next cycle.
Does nothing if ``r_rdy`` is not asserted.
+ r_level : out
+ Number of unread entries.
{r_attributes}
"""
self.w_data = Signal(width, reset_less=True)
self.w_rdy = Signal() # writable; not full
self.w_en = Signal()
+ self.w_level = Signal(range(depth + 1))
self.r_data = Signal(width, reset_less=True)
self.r_rdy = Signal() # readable; not empty
self.r_en = Signal()
+ self.r_level = Signal(range(depth + 1))
def _incr(signal, modulo):
cycle after ``r_rdy`` and ``r_en`` have been asserted.
""".strip(),
attributes="",
- r_attributes="""
- level : out
- Number of unread entries.
- """.strip(),
+ r_attributes="",
w_attributes="")
def __init__(self, *, width, depth, fwft=True):
m.d.comb += [
self.w_rdy.eq(self.level != self.depth),
- self.r_rdy.eq(self.level != 0)
+ self.r_rdy.eq(self.level != 0),
+ self.w_level.eq(self.level),
+ self.r_level.eq(self.level),
]
do_read = self.r_rdy & self.r_en
m.d.comb += [
w_port.addr.eq(produce),
w_port.data.eq(self.w_data),
- w_port.en.eq(self.w_en & self.w_rdy)
+ w_port.en.eq(self.w_en & self.w_rdy),
]
with m.If(do_write):
m.d.sync += produce.eq(_incr(produce, self.depth))
with m.Elif(self.r_en):
m.d.sync += self.r_rdy.eq(0)
- m.d.comb += self.level.eq(fifo.level + self.r_rdy)
+ m.d.comb += [
+ self.level.eq(fifo.level + self.r_rdy),
+ self.w_level.eq(self.level),
+ self.r_level.eq(self.level),
+ ]
return m
m.d.comb += consume_enc.i.eq(consume_r_nxt)
m.d[self._r_domain] += consume_r_gry.eq(consume_enc.o)
+ consume_w_bin = Signal(self._ctr_bits)
+ consume_dec = m.submodules.consume_dec = \
+ GrayDecoder(self._ctr_bits)
+ m.d.comb += consume_dec.i.eq(consume_w_gry),
+ m.d[self._w_domain] += consume_w_bin.eq(consume_dec.o)
+
+ produce_r_bin = Signal(self._ctr_bits)
+ produce_dec = m.submodules.produce_dec = \
+ GrayDecoder(self._ctr_bits)
+ m.d.comb += produce_dec.i.eq(produce_r_gry),
+ m.d[self._r_domain] += produce_r_bin.eq(produce_dec.o)
+
w_full = Signal()
r_empty = Signal()
m.d.comb += [
r_empty.eq(consume_r_gry == produce_r_gry),
]
+ m.d[self._w_domain] += self.w_level.eq((produce_w_bin - consume_w_bin)[:self._ctr_bits-1])
+ m.d[self._r_domain] += self.r_level.eq((produce_r_bin - consume_r_bin)[:self._ctr_bits-1])
+
storage = Memory(width=self.width, depth=self.depth)
w_port = m.submodules.w_port = storage.write_port(domain=self._w_domain)
r_port = m.submodules.r_port = storage.read_port (domain=self._r_domain,
fifo.w_data.eq(self.w_data),
self.w_rdy.eq(fifo.w_rdy),
fifo.w_en.eq(self.w_en),
+ self.w_level.eq(fifo.w_level),
]
with m.If(self.r_en | ~self.r_rdy):
self.r_data.eq(fifo.r_data),
self.r_rdy.eq(fifo.r_rdy),
self.r_rst.eq(fifo.r_rst),
+ self.r_level.eq(fifo.r_level),
]
m.d.comb += [
fifo.r_en.eq(1)
self.w_domain = w_domain
self.level = Signal(range(self.depth + 1))
+ self.r_level = Signal(range(self.depth + 1))
+ self.w_level = Signal(range(self.depth + 1))
def elaborate(self, platform):
m = Module()
+ (self.w_rdy & self.w_en)
- (self.r_rdy & self.r_en))
+ m.d.comb += [
+ self.r_level.eq(self.level),
+ self.w_level.eq(self.level),
+ ]
m.d.comb += Assert(ResetSignal(self.r_domain) == ResetSignal(self.w_domain))
return m
m.d.comb += Assert(dut.r_rdy.implies(gold.r_rdy))
m.d.comb += Assert(dut.w_rdy.implies(gold.w_rdy))
- if hasattr(dut, "level"):
- m.d.comb += Assert(dut.level == gold.level)
+ m.d.comb += Assert(dut.r_level == gold.r_level)
+ m.d.comb += Assert(dut.w_level == gold.w_level)
if dut.fwft:
m.d.comb += Assert(dut.r_rdy