https://github.com/ZipCPU/dbgbus/blob/master/hexbus/rtl/hbdeword.v
"""
-from nmigen import Signal, Mux, Module, Elaboratable
+from nmigen import Signal, Mux, Module, Elaboratable, Const
from nmigen.cli import verilog, rtlil
from nmigen.hdl.rec import Record
from nmutil.iocontrol import (PrevControl, NextControl, Object, RecordObject)
from nmutil.stageapi import (_spec, StageCls, Stage, StageChain, StageHelper)
from nmutil import nmoperator
-
+
class RecordBasedStage(Stage):
""" convenience class which provides a Records-based layout.
*BYPASSES* a ControlBase instance ready/valid signalling, which
clearly should not be done without a really, really good reason.
"""
- def __init__(self, stage=None, in_multi=None, stage_ctl=False):
+ def __init__(self, stage=None, in_multi=None, stage_ctl=False, maskwid=0):
""" Base class containing ready/valid/data to previous and next stages
* p: contains ready/valid to the previous stage
* add data_o member to NextControl (n)
Calling ControlBase._new_data is a good way to do that.
"""
+ print ("ControlBase", self, stage, in_multi, stage_ctl)
StageHelper.__init__(self, stage)
# set up input and output IO ACK (prev/next ready/valid)
- self.p = PrevControl(in_multi, stage_ctl)
- self.n = NextControl(stage_ctl)
+ self.p = PrevControl(in_multi, stage_ctl, maskwid=maskwid)
+ self.n = NextControl(stage_ctl, maskwid=maskwid)
# set up the input and output data
if stage is not None:
return self.m
+class MaskNoDelayCancellable(ControlBase):
+ """ Mask-activated Cancellable pipeline (that does not respect "ready")
+
+ Based on (identical behaviour to) SimpleHandshake.
+ TODO: decide whether to merge *into* SimpleHandshake.
+
+ Argument: stage. see Stage API above
+
+ stage-1 p.valid_i >>in stage n.valid_o out>> stage+1
+ stage-1 p.ready_o <<out stage n.ready_i <<in stage+1
+ stage-1 p.data_i >>in stage n.data_o out>> stage+1
+ | |
+ +--process->--^
+ """
+ def __init__(self, stage, maskwid, in_multi=None, stage_ctl=False):
+ ControlBase.__init__(self, stage, in_multi, stage_ctl, maskwid)
+
+ def elaborate(self, platform):
+ self.m = m = ControlBase.elaborate(self, platform)
+
+ # store result of processing in combinatorial temporary
+ result = _spec(self.stage.ospec, "r_tmp")
+ m.d.comb += nmoperator.eq(result, self.data_r)
+
+ # establish if the data should be passed on. cancellation is
+ # a global signal.
+ # XXX EXCEPTIONAL CIRCUMSTANCES: inspection of the data payload
+ # is NOT "normal" for the Stage API.
+ p_valid_i = Signal(reset_less=True)
+ #print ("self.p.data_i", self.p.data_i)
+ maskedout = Signal(len(self.p.mask_i), reset_less=True)
+ m.d.comb += maskedout.eq(self.p.mask_i & ~self.p.stop_i)
+ m.d.comb += p_valid_i.eq(maskedout.bool())
+
+ # if idmask nonzero, mask gets passed on (and register set).
+ # register is left as-is if idmask is zero, but out-mask is set to zero
+ # note however: only the *uncancelled* mask bits get passed on
+ m.d.sync += self.n.valid_o.eq(p_valid_i)
+ m.d.sync += self.n.mask_o.eq(Mux(p_valid_i, maskedout, 0))
+ with m.If(p_valid_i):
+ data_o = self._postprocess(result) # XXX TBD, does nothing right now
+ m.d.sync += nmoperator.eq(self.n.data_o, data_o) # update output
+
+ # output valid if
+ # input always "ready"
+ #m.d.comb += self.p._ready_o.eq(self.n.ready_i_test)
+ m.d.comb += self.p._ready_o.eq(Const(1))
+
+ # always pass on stop (as combinatorial: single signal)
+ m.d.comb += self.n.stop_o.eq(self.p.stop_i)
+
+ return self.m
+
+
+class MaskCancellable(ControlBase):
+ """ Mask-activated Cancellable pipeline
+
+ Arguments:
+
+ * stage. see Stage API above
+ * maskwid - sets up cancellation capability (mask and stop).
+ * in_multi
+ * stage_ctl
+ * dynamic - allows switching from sync to combinatorial (passthrough)
+ USE WITH CARE. will need the entire pipe to be quiescent
+ before switching, otherwise data WILL be destroyed.
+
+ stage-1 p.valid_i >>in stage n.valid_o out>> stage+1
+ stage-1 p.ready_o <<out stage n.ready_i <<in stage+1
+ stage-1 p.data_i >>in stage n.data_o out>> stage+1
+ | |
+ +--process->--^
+ """
+ def __init__(self, stage, maskwid, in_multi=None, stage_ctl=False,
+ dynamic=False):
+ ControlBase.__init__(self, stage, in_multi, stage_ctl, maskwid)
+ self.dynamic = dynamic
+ if dynamic:
+ self.latchmode = Signal()
+ else:
+ self.latchmode = Const(1)
+
+ def elaborate(self, platform):
+ self.m = m = ControlBase.elaborate(self, platform)
+
+ r_mask = Signal(len(self.p.mask_i), reset_less=True)
+ data_r = _spec(self.stage.ospec, "data_r")
+ m.d.comb += nmoperator.eq(data_r, self._postprocess(self.data_r))
+
+ with m.If(self.latchmode):
+ r_busy = Signal()
+ r_latch = _spec(self.stage.ospec, "r_latch")
+
+ # establish if the data should be passed on. cancellation is
+ # a global signal.
+ p_valid_i = Signal(reset_less=True)
+ #print ("self.p.data_i", self.p.data_i)
+ maskedout = Signal(len(self.p.mask_i), reset_less=True)
+ m.d.comb += maskedout.eq(self.p.mask_i & ~self.p.stop_i)
+
+ # establish some combinatorial temporaries
+ n_ready_i = Signal(reset_less=True, name="n_i_rdy_data")
+ p_valid_i_p_ready_o = Signal(reset_less=True)
+ m.d.comb += [p_valid_i.eq(self.p.valid_i_test & maskedout.bool()),
+ n_ready_i.eq(self.n.ready_i_test),
+ p_valid_i_p_ready_o.eq(p_valid_i & self.p.ready_o),
+ ]
+
+ # if idmask nonzero, mask gets passed on (and register set).
+ # register is left as-is if idmask is zero, but out-mask is set to
+ # zero
+ # note however: only the *uncancelled* mask bits get passed on
+ m.d.sync += r_mask.eq(Mux(p_valid_i, maskedout, 0))
+ m.d.comb += self.n.mask_o.eq(r_mask)
+
+ # always pass on stop (as combinatorial: single signal)
+ m.d.comb += self.n.stop_o.eq(self.p.stop_i)
+
+ stor = Signal(reset_less=True)
+ m.d.comb += stor.eq(p_valid_i_p_ready_o | n_ready_i)
+ with m.If(stor):
+ # store result of processing in combinatorial temporary
+ m.d.sync += nmoperator.eq(r_latch, data_r)
+
+ # previous valid and ready
+ with m.If(p_valid_i_p_ready_o):
+ m.d.sync += r_busy.eq(1) # output valid
+ # previous invalid or not ready, however next is accepting
+ with m.Elif(n_ready_i):
+ m.d.sync += r_busy.eq(0) # ...so set output invalid
+
+ # output set combinatorially from latch
+ m.d.comb += nmoperator.eq(self.n.data_o, r_latch)
+
+ m.d.comb += self.n.valid_o.eq(r_busy)
+ # if next is ready, so is previous
+ m.d.comb += self.p._ready_o.eq(n_ready_i)
+
+ with m.Else():
+ # pass everything straight through. p connected to n: data,
+ # valid, mask, everything. this is "effectively" just a
+ # StageChain: MaskCancellable is doing "nothing" except
+ # combinatorially passing everything through
+ # (except now it's *dynamically selectable* whether to do that)
+ m.d.comb += self.n.valid_o.eq(self.p.valid_i_test)
+ m.d.comb += self.p._ready_o.eq(self.n.ready_i_test)
+ m.d.comb += self.n.stop_o.eq(self.p.stop_i)
+ m.d.comb += self.n.mask_o.eq(self.p.mask_i)
+ m.d.comb += nmoperator.eq(self.n.data_o, data_r)
+
+ return self.m
+
+
class SimpleHandshake(ControlBase):
""" simple handshake control. data and strobe signals travel in sync.
implements the protocol used by Wishbone and AXI4.
return self.m
+
class UnbufferedPipeline2(ControlBase):
""" A simple pipeline stage with single-clock synchronisation
and two-way valid/ready synchronised signalling.