https://github.com/ZipCPU/dbgbus/blob/master/hexbus/rtl/hbdeword.v
"""
-from nmigen import Signal, Mux, Module, Elaboratable
+from nmigen import Signal, Mux, Module, Elaboratable, Const
from nmigen.cli import verilog, rtlil
from nmigen.hdl.rec import Record
*BYPASSES* a ControlBase instance ready/valid signalling, which
clearly should not be done without a really, really good reason.
"""
- def __init__(self, stage=None, in_multi=None, stage_ctl=False):
+ def __init__(self, stage=None, in_multi=None, stage_ctl=False, maskwid=0):
""" Base class containing ready/valid/data to previous and next stages
* p: contains ready/valid to the previous stage
StageHelper.__init__(self, stage)
# set up input and output IO ACK (prev/next ready/valid)
- self.p = PrevControl(in_multi, stage_ctl)
- self.n = NextControl(stage_ctl)
+ self.p = PrevControl(in_multi, stage_ctl, maskwid=maskwid)
+ self.n = NextControl(stage_ctl, maskwid=maskwid)
# set up the input and output data
if stage is not None:
| |
+--process->--^
"""
+ def __init__(self, stage, maskwid, in_multi=None, stage_ctl=False):
+ ControlBase.__init__(self, stage, in_multi, stage_ctl, maskwid)
+
def elaborate(self, platform):
self.m = m = ControlBase.elaborate(self, platform)
# XXX EXCEPTIONAL CIRCUMSTANCES: inspection of the data payload
# is NOT "normal" for the Stage API.
p_valid_i = Signal(reset_less=True)
- m.d.comb += p_valid_i.eq((self.p.data_i.ctx.idmask & \
- ~self.cancelmask).bool()) # nonzero
+ print ("self.p.data_i", self.p.data_i)
+ m.d.comb += p_valid_i.eq((self.p.mask_i.bool()))
+ #m.d.comb += p_valid_i.eq((self.p.data_i.ctx.idmask & \
+ # ~self.cancelmask)) # nonzero
# if idmask nonzero, data gets passed on.
m.d.sync += self.n.valid_o.eq(p_valid_i)
with m.If(p_valid_i):
+ m.d.sync += self.n.mask_o.eq(self.p.mask_i)
data_o = self._postprocess(result) # XXX TBD, does nothing right now
m.d.sync += nmoperator.eq(self.n.data_o, data_o) # update output
+ with m.Else():
+ m.d.sync += self.n.mask_o.eq(0)
# output valid if
# input always "ready"
- m.d.comb += self.p._ready_o.eq(1)
+ m.d.comb += self.p._ready_o.eq(Const(1))
return self.m
from nmutil.singlepipe import PassThroughStage
from nmutil.singlepipe import FIFOControl
from nmutil.singlepipe import RecordObject
+from nmutil.singlepipe import MaskCancellable
from random import randint, seed
if self.o == len(self.data):
break
+class TestMask:
+ def __init__(self, dut, resultfn, maskwid, data=None, stage_ctl=False):
+ self.dut = dut
+ self.resultfn = resultfn
+ self.stage_ctl = stage_ctl
+ self.maskwid = maskwid
+ if data:
+ self.data = data
+ else:
+ self.data = []
+ for i in range(num_tests):
+ self.data.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
+ self.i = 0
+ self.o = 0
+
+ def send(self):
+ while self.o != len(self.data):
+ send_range = randint(0, 3)
+ for j in range(randint(1,10)):
+ if send_range == 0:
+ send = True
+ else:
+ send = randint(0, send_range) != 0
+ #send = True
+ o_p_ready = yield self.dut.p.ready_o
+ if not o_p_ready:
+ yield
+ continue
+ if send and self.i != len(self.data):
+ print ("send", self.i, self.data[self.i])
+ yield self.dut.p.valid_i.eq(1)
+ yield self.dut.p.mask_i.eq(1<<self.i) # XXX TODO
+ for v in self.dut.set_input(self.data[self.i]):
+ yield v
+ self.i += 1
+ else:
+ yield self.dut.p.valid_i.eq(0)
+ yield self.dut.p.mask_i.eq(0) # XXX TODO
+ yield
+
+ def rcv(self):
+ while self.o != len(self.data):
+ stall_range = randint(0, 3)
+ for j in range(randint(1,10)):
+ ready = randint(0, stall_range) != 0
+ ready = True
+ yield self.dut.n.ready_i.eq(ready)
+ yield
+ o_n_valid = yield self.dut.n.valid_o
+ i_n_ready = yield self.dut.n.ready_i_test
+ if not o_n_valid or not i_n_ready:
+ continue
+ if isinstance(self.dut.n.data_o, Record):
+ data_o = {}
+ dod = self.dut.n.data_o
+ for k, v in dod.fields.items():
+ data_o[k] = yield v
+ else:
+ data_o = yield self.dut.n.data_o
+ print ("recv", self.o, data_o)
+ self.resultfn(data_o, self.data[self.o], self.i, self.o)
+ self.o += 1
+ if self.o == len(self.data):
+ break
+
def resultfn_5(data_o, expected, i, o):
res = expected[0] + expected[1]
assert data_o == res, \
return m
+######################################################################
+# Test 0
+######################################################################
+
+class ExampleMaskRecord(RecordObject):
+ """ an example of a class used to store 2 operands.
+ requires an eq function, to conform with the pipeline stage API
+ """
+
+ def __init__(self):
+ RecordObject.__init__(self)
+ self.src1 = Signal(16)
+ self.src2 = Signal(16)
+
+ def __eq(self, i):
+ return [self.src1.eq(i.src1), self.src2.eq(i.src2)]
+
+
+class TestInputMask:
+ """ the eq function, called by set_input, needs an incoming object
+ that conforms to the Example2OpClass.eq function requirements
+ easiest way to do that is to create a class that has the exact
+ same member layout (self.op1, self.op2) as Example2OpClass
+ """
+ def __init__(self, src1, src2):
+ self.src1 = src1
+ self.src2 = src2
+
+ def __repr__(self):
+ return "<TestInputMask %x %x" % (self.src1, self.src2)
+
+class ExampleMaskCancellable(StageCls):
+
+ def ispec(self):
+ """ returns an instance of an ExampleMaskRecord.
+ """
+ return ExampleMaskRecord()
+
+ def ospec(self):
+ """ returns the same
+ """
+ return ExampleMaskRecord()
+
+ def process(self, i):
+ """ process the input data: increase op1 and op2
+ """
+ return TestInputMask(i.src1 + 1, i.src2 + 1)
+
+
+class MaskCancellablePipe(MaskCancellable):
+
+ """ connects two stages together as a *single* combinatorial stage.
+ """
+ def __init__(self):
+ self.cancelmask = Signal(16)
+ stage1 = ExampleMaskCancellable()
+ stage2 = ExampleMaskCancellable()
+ combined = StageChain([stage1, stage2])
+ MaskCancellable.__init__(self, combined, 16)
+
+
+def data_chain0():
+ data = []
+ for i in range(num_tests):
+ data.append(TestInputMask(randint(0, 1<<16-1),
+ randint(0, 1<<16-1)))
+ return data
+
+
+def resultfn_0(data_o, expected, i, o):
+ assert data_o['src1'] == expected.src1 + 2, \
+ "src1 %x-%x received data no match\n" \
+ % (data_o['src1'], expected.src1 + 2)
+ assert data_o['src2'] == expected.src2 + 2, \
+ "src2 %x-%x received data no match\n" \
+ % (data_o['src2'] , expected.src2 + 2)
+
+
######################################################################
# Unit Tests
######################################################################
num_tests = 10
+def test0():
+ print ("test 0")
+ dut = MaskCancellablePipe()
+ ports = [dut.p.valid_i, dut.n.ready_i,
+ dut.n.valid_o, dut.p.ready_o] + \
+ [dut.p.data_i] + [dut.n.data_o]
+ #vl = rtlil.convert(dut, ports=ports)
+ #with open("test_bufpipe14.il", "w") as f:
+ # f.write(vl)
+ data = data_chain0()
+ test = TestMask(dut, resultfn_0, 16, data=data)
+ run_simulation(dut, [test.send, test.rcv],
+ vcd_name="test_maskchain0.vcd")
+
def notworking1():
print ("test 1")
dut = ExampleBufPipe()
with open("test_bufunbuf999.il", "w") as f:
f.write(vl)
+if __name__ == '__main__':
+ test0()