"""
-from nmigen import Module, Signal, Mux, Const
+from nmigen import Module, Signal, Mux, Const, Elaboratable
from nmigen.hdl.rec import Record
from nmigen.compat.sim import run_simulation
from nmigen.cli import verilog, rtlil
from singlepipe import SimpleHandshake
from singlepipe import PassThroughHandshake
from singlepipe import PassThroughStage
+from singlepipe import FIFOControl
+from singlepipe import RecordObject
from random import randint, seed
assert o_n_valid == val
-def testbench(dut):
+def tbench(dut):
#yield dut.i_p_rst.eq(1)
yield dut.n.i_ready.eq(0)
- yield dut.p.o_ready.eq(0)
+ #yield dut.p.o_ready.eq(0)
yield
yield
#yield dut.i_p_rst.eq(0)
yield
-def testbench2(dut):
+def tbench2(dut):
#yield dut.p.i_rst.eq(1)
yield dut.n.i_ready.eq(0)
#yield dut.p.o_ready.eq(0)
if self.o == len(self.data):
break
-def test3_resultfn(o_data, expected, i, o):
+def resultfn_3(o_data, expected, i, o):
assert o_data == expected + 1, \
"%d-%d data %x not match %x\n" \
% (i, o, o_data, expected)
if self.o == len(self.data):
break
-def test5_resultfn(o_data, expected, i, o):
+def resultfn_5(o_data, expected, i, o):
res = expected[0] + expected[1]
assert o_data == res, \
"%d-%d data %x not match %s\n" \
% (i, o, o_data, repr(expected))
-def testbench4(dut):
+def tbench4(dut):
data = []
for i in range(num_tests):
#data.append(randint(0, 1<<16-1))
"""
def elaborate(self, platform):
- m = Module()
+ m = ControlBase.elaborate(self, platform)
pipe1 = ExampleBufPipe()
pipe2 = ExampleBufPipe()
return data
-def test9_resultfn(o_data, expected, i, o):
+def resultfn_9(o_data, expected, i, o):
res = expected + 2
assert o_data == res, \
"%d-%d received data %x not match expected %x\n" \
# Test 6 and 10
######################################################################
-class SetLessThan:
+class SetLessThan(Elaboratable):
def __init__(self, width, signed):
self.m = Module()
self.src1 = Signal((width, signed), name="src1")
def __init__(self):
self.slt = SetLessThan(16, True)
- def ispec(self):
- return (Signal(16, name="sig1"), Signal(16, "sig2"))
+ def ispec(self, name):
+ return (Signal(16, name="%s_sig1" % name),
+ Signal(16, name="%s_sig2" % name))
- def ospec(self):
- return Signal(16, "out")
+ def ospec(self, name):
+ return Signal(16, "%s_out" % name)
def setup(self, m, i):
self.o = Signal(16)
return self.o
-class LTStageDerived(SetLessThan, StageCls):
+class LTStageDerived(SetLessThan, StageCls, Elaboratable):
""" special version of a nmigen module where the module is also a stage
shows that you don't actually need to combinatorially connect
BufferedHandshake.__init__(self, stage)
-def test6_resultfn(o_data, expected, i, o):
+def resultfn_6(o_data, expected, i, o):
res = 1 if expected[0] < expected[1] else 0
assert o_data == res, \
"%d-%d data %x not match %s\n" \
UnbufferedPipeline.__init__(self, stage)
-def test7_resultfn(o_data, expected, i, o):
+def resultfn_7(o_data, expected, i, o):
res = (expected['src1'] + 1, expected['src2'] + 1)
assert o_data['src1'] == res[0] and o_data['src2'] == res[1], \
"%d-%d data %s not match %s\n" \
UnbufferedPipeline.__init__(self, stage)
-def test11_resultfn(o_data, expected, i, o):
+def resultfn_test11(o_data, expected, i, o):
res1 = expected.src1 + 1
res2 = expected.src2 + 1
assert o_data['src1'] == res1 and o_data['src2'] == res2, \
self.op2 = op2
-def test8_resultfn(o_data, expected, i, o):
+def resultfn_8(o_data, expected, i, o):
res = expected.op1 + expected.op2 # these are a TestInputAdd instance
assert o_data == res, \
- "%d-%d data %x not match %s\n" \
- % (i, o, o_data, repr(expected))
+ "%d-%d data %s res %x not match %s\n" \
+ % (i, o, repr(o_data), res, repr(expected))
def data_2op():
data = []
# Test 12
######################################################################
-class ExampleStageDelayCls(StageCls):
+class ExampleStageDelayCls(StageCls, Elaboratable):
""" an example of how to use the buffered pipeline, in a static class
fashion
"""
@property
def d_ready(self):
+ """ data is ready to be accepted when this is true
+ """
return (self.count == 1)# | (self.count == 3)
return Const(1)
def d_valid(self, i_ready):
+ """ data is valid at output when this is true
+ """
return self.count == self.valid_trigger
return Const(1)
return data
-def test12_resultfn(o_data, expected, i, o):
+def resultfn_12(o_data, expected, i, o):
res = expected + 1
assert o_data == res, \
"%d-%d data %x not match %x\n" \
class ExampleBufModeUnBufPipe(ControlBase):
def elaborate(self, platform):
- m = ControlBase._elaborate(self, platform)
+ m = ControlBase.elaborate(self, platform)
pipe1 = ExampleBufModeAdd1Pipe()
pipe2 = ExampleBufAdd1Pipe()
stage = PassThroughStage(self.iospecfn)
PassThroughHandshake.__init__(self, stage)
-def test_identical_resultfn(o_data, expected, i, o):
+def resultfn_identical(o_data, expected, i, o):
res = expected
assert o_data == res, \
"%d-%d data %x not match %x\n" \
class ExampleBufPassThruPipe(ControlBase):
def elaborate(self, platform):
- m = ControlBase._elaborate(self, platform)
+ m = ControlBase.elaborate(self, platform)
# XXX currently fails: any other permutation works fine.
# p1=u,p2=b ok p1=u,p2=u ok p1=b,p2=b ok
return m
+######################################################################
+# Test 20
+######################################################################
+
+def iospecfn():
+ return Signal(16, name="d_in")
+
+class FIFOTest16(FIFOControl):
+
+ def __init__(self):
+ stage = PassThroughStage(iospecfn)
+ FIFOControl.__init__(self, 2, stage)
+
+
+######################################################################
+# Test 21
+######################################################################
+
+class ExampleFIFOPassThruPipe1(ControlBase):
+
+ def elaborate(self, platform):
+ m = ControlBase.elaborate(self, platform)
+
+ pipe1 = FIFOTest16()
+ pipe2 = ExamplePassAdd1Pipe()
+
+ m.submodules.pipe1 = pipe1
+ m.submodules.pipe2 = pipe2
+
+ m.d.comb += self.connect([pipe1, pipe2])
+
+ return m
+
+
+######################################################################
+# Test 22
+######################################################################
+
+class Example2OpRecord(RecordObject):
+ def __init__(self):
+ RecordObject.__init__(self)
+ self.op1 = Signal(16)
+ self.op2 = Signal(16)
+
+
+class ExampleAddRecordObjectStage(StageCls):
+
+ def ispec(self):
+ """ returns an instance of an Example2OpRecord.
+ """
+ return Example2OpRecord()
+
+ def ospec(self):
+ """ returns an output signal which will happen to contain the sum
+ of the two inputs
+ """
+ return Signal(16)
+
+ def process(self, i):
+ """ process the input data (sums the values in the tuple) and returns it
+ """
+ return i.op1 + i.op2
+
+
+class ExampleRecordHandshakeAddClass(SimpleHandshake):
+
+ def __init__(self):
+ addstage = ExampleAddRecordObjectStage()
+ SimpleHandshake.__init__(self, stage=addstage)
+
+
+######################################################################
+# Test 23
+######################################################################
+
+def iospecfnrecord():
+ return Example2OpRecord()
+
+class FIFOTestRecordControl(FIFOControl):
+
+ def __init__(self):
+ stage = PassThroughStage(iospecfnrecord)
+ FIFOControl.__init__(self, 2, stage)
+
+
+class ExampleFIFORecordObjectPipe(ControlBase):
+
+ def elaborate(self, platform):
+ m = ControlBase.elaborate(self, platform)
+
+ pipe1 = FIFOTestRecordControl()
+ pipe2 = ExampleRecordHandshakeAddClass()
+
+ m.submodules.pipe1 = pipe1
+ m.submodules.pipe2 = pipe2
+
+ m.d.comb += self.connect([pipe1, pipe2])
+
+ return m
+
+
+######################################################################
+# Test 24
+######################################################################
+
+class FIFOTestRecordAddStageControl(FIFOControl):
+
+ def __init__(self):
+ stage = ExampleAddRecordObjectStage()
+ FIFOControl.__init__(self, 2, stage)
+
+
+
+######################################################################
+# Test 25
+######################################################################
+
+class FIFOTestAdd16(FIFOControl):
+
+ def __init__(self):
+ stage = ExampleStageCls()
+ FIFOControl.__init__(self, 2, stage)
+
+
+class ExampleFIFOAdd2Pipe(ControlBase):
+
+ def elaborate(self, platform):
+ m = ControlBase.elaborate(self, platform)
+
+ pipe1 = FIFOTestAdd16()
+ pipe2 = FIFOTestAdd16()
+
+ m.submodules.pipe1 = pipe1
+ m.submodules.pipe2 = pipe2
+
+ m.d.comb += self.connect([pipe1, pipe2])
+
+ return m
+
+
+######################################################################
+# Test 26
+######################################################################
+
+def iospecfn24():
+ return (Signal(16, name="src1"), Signal(16, name="src2"))
+
+class FIFOTest2x16(FIFOControl):
+
+ def __init__(self):
+ stage = PassThroughStage(iospecfn2)
+ FIFOControl.__init__(self, 2, stage)
+
+
######################################################################
# Test 997
######################################################################
class ExampleBufPassThruPipe2(ControlBase):
def elaborate(self, platform):
- m = ControlBase._elaborate(self, platform)
+ m = ControlBase.elaborate(self, platform)
# XXX currently fails: any other permutation works fine.
# p1=u,p2=b ok p1=u,p2=u ok p1=b,p2=b ok
"""
def elaborate(self, platform):
- m = ControlBase._elaborate(self, platform)
+ m = ControlBase.elaborate(self, platform)
pipe1 = ExampleBufDelayedPipe()
pipe2 = ExampleBufPipe()
class ExampleBufUnBufPipe(ControlBase):
def elaborate(self, platform):
- m = ControlBase._elaborate(self, platform)
+ m = ControlBase.elaborate(self, platform)
# XXX currently fails: any other permutation works fine.
# p1=u,p2=b ok p1=u,p2=u ok p1=b,p2=b ok
# Unit Tests
######################################################################
-num_tests = 100
+num_tests = 10
if __name__ == '__main__':
- print ("test 1")
- dut = ExampleBufPipe()
- run_simulation(dut, testbench(dut), vcd_name="test_bufpipe.vcd")
-
- print ("test 2")
- dut = ExampleBufPipe2()
- run_simulation(dut, testbench2(dut), vcd_name="test_bufpipe2.vcd")
- ports = [dut.p.i_valid, dut.n.i_ready,
- dut.n.o_valid, dut.p.o_ready] + \
- [dut.p.i_data] + [dut.n.o_data]
- vl = rtlil.convert(dut, ports=ports)
- with open("test_bufpipe2.il", "w") as f:
- f.write(vl)
+ if False:
+ print ("test 1")
+ dut = ExampleBufPipe()
+ run_simulation(dut, tbench(dut), vcd_name="test_bufpipe.vcd")
+
+ print ("test 2")
+ dut = ExampleBufPipe2()
+ run_simulation(dut, tbench2(dut), vcd_name="test_bufpipe2.vcd")
+ ports = [dut.p.i_valid, dut.n.i_ready,
+ dut.n.o_valid, dut.p.o_ready] + \
+ [dut.p.i_data] + [dut.n.o_data]
+ vl = rtlil.convert(dut, ports=ports)
+ with open("test_bufpipe2.il", "w") as f:
+ f.write(vl)
print ("test 3")
dut = ExampleBufPipe()
- test = Test3(dut, test3_resultfn)
+ test = Test3(dut, resultfn_3)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe3.vcd")
print ("test 3.5")
dut = ExamplePipeline()
- test = Test3(dut, test3_resultfn)
+ test = Test3(dut, resultfn_3)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_combpipe3.vcd")
print ("test 4")
dut = ExampleBufPipe2()
- run_simulation(dut, testbench4(dut), vcd_name="test_bufpipe4.vcd")
+ run_simulation(dut, tbench4(dut), vcd_name="test_bufpipe4.vcd")
print ("test 5")
dut = ExampleBufPipeAdd()
- test = Test5(dut, test5_resultfn, stage_ctl=True)
+ test = Test5(dut, resultfn_5, stage_ctl=True)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe5.vcd")
print ("test 6")
dut = ExampleLTPipeline()
- test = Test5(dut, test6_resultfn)
+ test = Test5(dut, resultfn_6)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltcomb6.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
print ("test 7")
dut = ExampleAddRecordPipe()
data=data_dict()
- test = Test5(dut, test7_resultfn, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")
-
+ test = Test5(dut, resultfn_7, data=data)
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready,
dut.p.i_data.src1, dut.p.i_data.src2,
vl = rtlil.convert(dut, ports=ports)
with open("test_recordcomb_pipe.il", "w") as f:
f.write(vl)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")
print ("test 8")
dut = ExampleBufPipeAddClass()
data=data_2op()
- test = Test5(dut, test8_resultfn, data=data)
+ test = Test5(dut, resultfn_8, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe8.vcd")
print ("test 9")
f.write(vl)
data = data_chain2()
- test = Test5(dut, test9_resultfn, data=data)
+ test = Test5(dut, resultfn_9, data=data)
run_simulation(dut, [test.send, test.rcv],
vcd_name="test_bufpipechain2.vcd")
print ("test 10")
dut = ExampleLTBufferedPipeDerived()
- test = Test5(dut, test6_resultfn)
+ test = Test5(dut, resultfn_6)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltbufpipe10.vcd")
vl = rtlil.convert(dut, ports=ports)
with open("test_ltbufpipe10.il", "w") as f:
print ("test 11")
dut = ExampleAddRecordPlaceHolderPipe()
data=data_placeholder()
- test = Test5(dut, test11_resultfn, data=data)
+ test = Test5(dut, resultfn_test11, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")
print ("test 12")
dut = ExampleBufDelayedPipe()
data = data_chain1()
- test = Test5(dut, test12_resultfn, data=data)
+ test = Test5(dut, resultfn_12, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe12.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 13")
dut = ExampleUnBufDelayedPipe()
data = data_chain1()
- test = Test5(dut, test12_resultfn, data=data)
+ test = Test5(dut, resultfn_12, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_unbufpipe13.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 15")
dut = ExampleBufModeAdd1Pipe()
data = data_chain1()
- test = Test5(dut, test12_resultfn, data=data)
+ test = Test5(dut, resultfn_12, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufunbuf15.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 16")
dut = ExampleBufModeUnBufPipe()
data = data_chain1()
- test = Test5(dut, test9_resultfn, data=data)
+ test = Test5(dut, resultfn_9, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufunbuf16.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 17")
dut = ExampleUnBufAdd1Pipe2()
data = data_chain1()
- test = Test5(dut, test12_resultfn, data=data)
+ test = Test5(dut, resultfn_12, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_unbufpipe17.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 18")
dut = PassThroughTest()
data = data_chain1()
- test = Test5(dut, test_identical_resultfn, data=data)
+ test = Test5(dut, resultfn_identical, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_passthru18.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 19")
dut = ExampleBufPassThruPipe()
data = data_chain1()
- test = Test5(dut, test9_resultfn, data=data)
+ test = Test5(dut, resultfn_9, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpass19.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
with open("test_bufpass19.il", "w") as f:
f.write(vl)
+ print ("test 20")
+ dut = FIFOTest16()
+ data = data_chain1()
+ test = Test5(dut, resultfn_identical, data=data)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_fifo20.vcd")
+ ports = [dut.p.i_valid, dut.n.i_ready,
+ dut.n.o_valid, dut.p.o_ready] + \
+ [dut.p.i_data] + [dut.n.o_data]
+ vl = rtlil.convert(dut, ports=ports)
+ with open("test_fifo20.il", "w") as f:
+ f.write(vl)
+
+ print ("test 21")
+ dut = ExampleFIFOPassThruPipe1()
+ data = data_chain1()
+ test = Test5(dut, resultfn_12, data=data)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_fifopass21.vcd")
+ ports = [dut.p.i_valid, dut.n.i_ready,
+ dut.n.o_valid, dut.p.o_ready] + \
+ [dut.p.i_data] + [dut.n.o_data]
+ vl = rtlil.convert(dut, ports=ports)
+ with open("test_fifopass21.il", "w") as f:
+ f.write(vl)
+
+ print ("test 22")
+ dut = ExampleRecordHandshakeAddClass()
+ data=data_2op()
+ test = Test5(dut, resultfn_8, data=data)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord22.vcd")
+ ports = [dut.p.i_valid, dut.n.i_ready,
+ dut.n.o_valid, dut.p.o_ready] + \
+ [dut.p.i_data.op1, dut.p.i_data.op2] + \
+ [dut.n.o_data]
+ vl = rtlil.convert(dut, ports=ports)
+ with open("test_addrecord22.il", "w") as f:
+ f.write(vl)
+
+ print ("test 23")
+ dut = ExampleFIFORecordObjectPipe()
+ data=data_2op()
+ test = Test5(dut, resultfn_8, data=data)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord23.vcd")
+ ports = [dut.p.i_valid, dut.n.i_ready,
+ dut.n.o_valid, dut.p.o_ready] + \
+ [dut.p.i_data.op1, dut.p.i_data.op2] + \
+ [dut.n.o_data]
+ vl = rtlil.convert(dut, ports=ports)
+ with open("test_addrecord23.il", "w") as f:
+ f.write(vl)
+
+ print ("test 24")
+ dut = FIFOTestRecordAddStageControl()
+ data=data_2op()
+ test = Test5(dut, resultfn_8, data=data)
+ ports = [dut.p.i_valid, dut.n.i_ready,
+ dut.n.o_valid, dut.p.o_ready] + \
+ [dut.p.i_data.op1, dut.p.i_data.op2] + \
+ [dut.n.o_data]
+ vl = rtlil.convert(dut, ports=ports)
+ with open("test_addrecord24.il", "w") as f:
+ f.write(vl)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord24.vcd")
+
+ print ("test 25")
+ dut = ExampleFIFOAdd2Pipe()
+ data = data_chain1()
+ test = Test5(dut, resultfn_9, data=data)
+ run_simulation(dut, [test.send, test.rcv], vcd_name="test_add2pipe25.vcd")
+ ports = [dut.p.i_valid, dut.n.i_ready,
+ dut.n.o_valid, dut.p.o_ready] + \
+ [dut.p.i_data] + [dut.n.o_data]
+ vl = rtlil.convert(dut, ports=ports)
+ with open("test_add2pipe25.il", "w") as f:
+ f.write(vl)
+
print ("test 997")
dut = ExampleBufPassThruPipe2()
data = data_chain1()
- test = Test5(dut, test9_resultfn, data=data)
+ test = Test5(dut, resultfn_9, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpass997.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 998 (fails, bug)")
dut = ExampleBufPipe3()
data = data_chain1()
- test = Test5(dut, test9_resultfn, data=data)
+ test = Test5(dut, resultfn_9, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe14.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \
print ("test 999 (expected to fail, which is a bug)")
dut = ExampleBufUnBufPipe()
data = data_chain1()
- test = Test5(dut, test9_resultfn, data=data)
+ test = Test5(dut, resultfn_9, data=data)
run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufunbuf999.vcd")
ports = [dut.p.i_valid, dut.n.i_ready,
dut.n.o_valid, dut.p.o_ready] + \