@property
def o_ready(self):
+ """ public-facing API: indicates (externally) that stage is ready
+ """
if self.stage_ctl:
- return self.s_o_ready
- return self._o_ready
+ return self.s_o_ready # set dynamically by stage
+ return self._o_ready # return this when not under dynamic control
def _connect_in(self, prev):
""" internal helper function to connect stage to an input source.
@property
def o_valid(self):
+ """ public-facing API: indicates (externally) that data is valid
+ """
if self.stage_ctl:
return self.s_o_valid
return self._o_valid
res += self.n.o_data
return res
+ def elaborate(self, platform):
+ """ handles case where stage has dynamic ready/valid functions
+ """
+ m = Module()
+ if not self.n.stage_ctl:
+ return m
+
+ # when the pipeline (buffered or otherwise) says "ready",
+ # test the *stage* "ready".
+ with m.If(self.p._o_ready):
+ m.d.comb += self.p.s_o_ready.eq(self.stage.p_o_ready)
+ with m.Else():
+ m.d.comb += self.p.s_o_ready.eq(0)
+
+ # when the pipeline (buffered or otherwise) says "valid",
+ # test the *stage* "valid".
+ with m.If(self.n._o_valid):
+ m.d.comb += self.n.s_o_valid.eq(self.stage.n_o_valid)
+ with m.Else():
+ m.d.comb += self.n.s_o_valid.eq(0)
+ return m
+
class BufferedPipeline(ControlBase):
""" buffered pipeline stage. data and strobe signals travel in sync.
input may begin to be processed and transferred directly to output.
"""
- def __init__(self, stage):
- ControlBase.__init__(self)
+ def __init__(self, stage, stage_ctl=False):
+ ControlBase.__init__(self, stage_ctl=stage_ctl)
self.stage = stage
# set up the input and output data
def elaborate(self, platform):
- self.m = Module()
+ self.m = ControlBase.elaborate(self, platform)
result = self.stage.ospec()
r_data = self.stage.ospec()
with self.m.If(self.n.i_ready): # next stage is ready
with self.m.If(self.p.o_ready): # not stalled
# nothing in buffer: send (processed) input direct to output
- self.m.d.sync += [self.n.o_valid.eq(p_i_valid),
+ self.m.d.sync += [self.n._o_valid.eq(p_i_valid),
eq(self.n.o_data, result), # update output
]
with self.m.Else(): # p.o_ready is false, and something in buffer
# Flush the [already processed] buffer to the output port.
- self.m.d.sync += [self.n.o_valid.eq(1), # declare reg empty
+ self.m.d.sync += [self.n._o_valid.eq(1), # reg empty
eq(self.n.o_data, r_data), # flush buffer
- self.p.o_ready.eq(1), # clear stall
+ self.p._o_ready.eq(1), # clear stall
]
# ignore input, since p.o_ready is also false.
# (n.i_ready) is false here: next stage is ready
with self.m.Elif(o_n_validn): # next stage being told "ready"
- self.m.d.sync += [self.n.o_valid.eq(p_i_valid),
- self.p.o_ready.eq(1), # Keep the buffer empty
+ self.m.d.sync += [self.n._o_valid.eq(p_i_valid),
+ self.p._o_ready.eq(1), # Keep the buffer empty
eq(self.n.o_data, result), # set output data
]
# (n.i_ready) false and (n.o_valid) true:
with self.m.Elif(i_p_valid_o_p_ready):
# If next stage *is* ready, and not stalled yet, accept input
- self.m.d.sync += self.p.o_ready.eq(~(p_i_valid & self.n.o_valid))
+ self.m.d.sync += self.p._o_ready.eq(~(p_i_valid & self.n.o_valid))
return self.m
COMBINATORIALLY (no clock dependence).
"""
- def __init__(self, stage):
- ControlBase.__init__(self)
+ def __init__(self, stage, stage_ctl=False):
+ ControlBase.__init__(self, stage_ctl=stage_ctl)
self.stage = stage
# set up the input and output data
self.m.d.comb += p_i_valid.eq(self.p.i_valid_logic())
self.m.d.comb += pv.eq(self.p.i_valid & self.p.o_ready)
- self.m.d.comb += self.n.o_valid.eq(data_valid)
- self.m.d.comb += self.p.o_ready.eq(~data_valid | self.n.i_ready)
+ self.m.d.comb += self.n._o_valid.eq(data_valid)
+ self.m.d.comb += self.p._o_ready.eq(~data_valid | self.n.i_ready)
self.m.d.sync += data_valid.eq(p_i_valid | \
(~self.n.i_ready & data_valid))
with self.m.If(pv):
"""
-from nmigen import Module, Signal, Mux
+from nmigen import Module, Signal, Mux, Const
from nmigen.hdl.rec import Record
from nmigen.compat.sim import run_simulation
from nmigen.cli import verilog, rtlil
"""
def __init__(self):
- BufferedPipeline.__init__(self, ExampleStageDelayCls())
+ stage = ExampleStageDelayCls()
+ BufferedPipeline.__init__(self, stage, stage_ctl=True)
class ExampleBufPipe3(ControlBase):
print ("test 12")
dut = ExampleBufPipe3()
- run_simulation(dut, testbench2(dut), vcd_name="test_bufpipe12.vcd")
+ data = data_chain2()
+ test = Test5(dut, test9_resultfn, data=data)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe12.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
[dut.p.i_data] + [dut.n.o_data]