from random import randint, seed
-#seed(4)
+import unittest
+
+# seed(4)
def check_o_n_valid(dut, val):
o_n_valid = yield dut.n.valid_o
assert o_n_valid == val
+
def check_o_n_valid2(dut, val):
o_n_valid = yield dut.n.valid_o
assert o_n_valid == val
def tbench(dut):
- #yield dut.i_p_rst.eq(1)
+ # yield dut.i_p_rst.eq(1)
yield dut.n.ready_i.eq(0)
- #yield dut.p.ready_o.eq(0)
+ # yield dut.p.ready_o.eq(0)
yield
yield
- #yield dut.i_p_rst.eq(0)
+ # yield dut.i_p_rst.eq(0)
yield dut.n.ready_i.eq(1)
yield dut.p.data_i.eq(5)
yield dut.p.valid_i.eq(1)
yield
yield dut.p.data_i.eq(7)
- yield from check_o_n_valid(dut, 0) # effects of i_p_valid delayed
+ yield from check_o_n_valid(dut, 0) # effects of i_p_valid delayed
yield
- yield from check_o_n_valid(dut, 1) # ok *now* i_p_valid effect is felt
+ yield from check_o_n_valid(dut, 1) # ok *now* i_p_valid effect is felt
yield dut.p.data_i.eq(2)
yield
- yield dut.n.ready_i.eq(0) # begin going into "stall" (next stage says ready)
+ # begin going into "stall" (next stage says ready)
+ yield dut.n.ready_i.eq(0)
yield dut.p.data_i.eq(9)
yield
yield dut.p.valid_i.eq(0)
yield dut.p.data_i.eq(32)
yield dut.n.ready_i.eq(1)
yield
- yield from check_o_n_valid(dut, 1) # buffer still needs to output
+ yield from check_o_n_valid(dut, 1) # buffer still needs to output
yield
- yield from check_o_n_valid(dut, 1) # buffer still needs to output
+ yield from check_o_n_valid(dut, 1) # buffer still needs to output
yield
- yield from check_o_n_valid(dut, 0) # buffer outputted, *now* we're done.
+ yield from check_o_n_valid(dut, 0) # buffer outputted, *now* we're done.
yield
def tbench2(dut):
- #yield dut.p.i_rst.eq(1)
+ # yield dut.p.i_rst.eq(1)
yield dut.n.ready_i.eq(0)
- #yield dut.p.ready_o.eq(0)
+ # yield dut.p.ready_o.eq(0)
yield
yield
- #yield dut.p.i_rst.eq(0)
+ # yield dut.p.i_rst.eq(0)
yield dut.n.ready_i.eq(1)
yield dut.p.data_i.eq(5)
yield dut.p.valid_i.eq(1)
yield
yield dut.p.data_i.eq(7)
- yield from check_o_n_valid2(dut, 0) # effects of i_p_valid delayed 2 clocks
+ # effects of i_p_valid delayed 2 clocks
+ yield from check_o_n_valid2(dut, 0)
yield
- yield from check_o_n_valid2(dut, 0) # effects of i_p_valid delayed 2 clocks
+ # effects of i_p_valid delayed 2 clocks
+ yield from check_o_n_valid2(dut, 0)
yield dut.p.data_i.eq(2)
yield
- yield from check_o_n_valid2(dut, 1) # ok *now* i_p_valid effect is felt
- yield dut.n.ready_i.eq(0) # begin going into "stall" (next stage says ready)
+ yield from check_o_n_valid2(dut, 1) # ok *now* i_p_valid effect is felt
+ # begin going into "stall" (next stage says ready)
+ yield dut.n.ready_i.eq(0)
yield dut.p.data_i.eq(9)
yield
yield dut.p.valid_i.eq(0)
yield dut.p.data_i.eq(32)
yield dut.n.ready_i.eq(1)
yield
- yield from check_o_n_valid2(dut, 1) # buffer still needs to output
+ yield from check_o_n_valid2(dut, 1) # buffer still needs to output
yield
- yield from check_o_n_valid2(dut, 1) # buffer still needs to output
+ yield from check_o_n_valid2(dut, 1) # buffer still needs to output
yield
- yield from check_o_n_valid2(dut, 1) # buffer still needs to output
+ yield from check_o_n_valid2(dut, 1) # buffer still needs to output
yield
- yield from check_o_n_valid2(dut, 0) # buffer outputted, *now* we're done.
+ yield from check_o_n_valid2(dut, 0) # buffer outputted, *now* we're done.
yield
yield
yield
def send(self):
while self.o != len(self.data):
send_range = randint(0, 3)
- for j in range(randint(1,10)):
+ for j in range(randint(1, 10)):
if send_range == 0:
send = True
else:
def rcv(self):
while self.o != len(self.data):
stall_range = randint(0, 3)
- for j in range(randint(1,10)):
+ for j in range(randint(1, 10)):
stall = randint(0, stall_range) != 0
yield self.dut.n.ready_i.eq(stall)
yield
if self.o == len(self.data):
break
+
def resultfn_3(data_o, expected, i, o):
assert data_o == expected + 1, \
- "%d-%d data %x not match %x\n" \
- % (i, o, data_o, expected)
+ "%d-%d data %x not match %x\n" \
+ % (i, o, data_o, expected)
+
def data_placeholder():
- data = []
- for i in range(num_tests):
- d = PlaceHolder()
- d.src1 = randint(0, 1<<16-1)
- d.src2 = randint(0, 1<<16-1)
- data.append(d)
- return data
+ data = []
+ for i in range(num_tests):
+ d = PlaceHolder()
+ d.src1 = randint(0, 1 << 16-1)
+ d.src2 = randint(0, 1 << 16-1)
+ data.append(d)
+ return data
+
def data_dict():
- data = []
- for i in range(num_tests):
- data.append({'src1': randint(0, 1<<16-1),
- 'src2': randint(0, 1<<16-1)})
- return data
+ data = []
+ for i in range(num_tests):
+ data.append({'src1': randint(0, 1 << 16-1),
+ 'src2': randint(0, 1 << 16-1)})
+ return data
class Test5:
else:
self.data = []
for i in range(num_tests):
- self.data.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
+ self.data.append(
+ (randint(0, 1 << 16-1), randint(0, 1 << 16-1)))
self.i = 0
self.o = 0
def send(self):
while self.o != len(self.data):
send_range = randint(0, 3)
- for j in range(randint(1,10)):
+ for j in range(randint(1, 10)):
if send_range == 0:
send = True
else:
def rcv(self):
while self.o != len(self.data):
stall_range = randint(0, 3)
- for j in range(randint(1,10)):
+ for j in range(randint(1, 10)):
ready = randint(0, stall_range) != 0
#ready = True
yield self.dut.n.ready_i.eq(ready)
if self.o == len(self.data):
break
+
class TestMask:
def __init__(self, dut, resultfn, maskwid, data=None, stage_ctl=False,
- latching=False):
+ latching=False):
self.dut = dut
self.resultfn = resultfn
self.stage_ctl = stage_ctl
else:
self.data = []
for i in range(num_tests):
- self.data.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
+ self.data.append(
+ (randint(0, 1 << 16-1), randint(0, 1 << 16-1)))
self.i = 0
self.o = 0
def send(self):
while self.o != len(self.data):
send_range = randint(0, 3)
- for j in range(randint(1,10)):
+ for j in range(randint(1, 10)):
if send_range == 0:
send = True
else:
self.latchmode = 1 - self.latchmode
yield self.dut.latchmode.eq(self.latchmode)
mode = yield self.dut.latchmode
- print ("latching", mode)
+ print("latching", mode)
if send and self.i != len(self.data):
- print ("send", self.i, self.data[self.i])
+ print("send", self.i, self.data[self.i])
yield self.dut.p.valid_i.eq(1)
- yield self.dut.p.mask_i.eq(1<<self.i) # XXX TODO
+ yield self.dut.p.mask_i.eq(1 << self.i) # XXX TODO
for v in self.dut.set_input(self.data[self.i]):
yield v
self.i += 1
else:
yield self.dut.p.valid_i.eq(0)
- yield self.dut.p.mask_i.eq(0) # XXX TODO
+ yield self.dut.p.mask_i.eq(0) # XXX TODO
yield
def rcv(self):
while self.o != len(self.data):
stall_range = randint(0, 3)
- for j in range(randint(1,10)):
+ for j in range(randint(1, 10)):
ready = randint(0, stall_range) != 0
ready = True
yield self.dut.n.ready_i.eq(ready)
data_o[k] = yield v
else:
data_o = yield self.dut.n.data_o
- print ("recv", self.o, data_o)
+ print("recv", self.o, data_o)
self.resultfn(data_o, self.data[self.o], self.i, self.o)
self.o += 1
if self.o == len(self.data):
break
+
def resultfn_5(data_o, expected, i, o):
res = expected[0] + expected[1]
assert data_o == res, \
- "%d-%d data %x not match %s\n" \
- % (i, o, data_o, repr(expected))
+ "%d-%d data %x not match %s\n" \
+ % (i, o, data_o, repr(expected))
+
def tbench4(dut):
data = []
if o_n_valid and i_n_ready:
data_o = yield dut.n.data_o
assert data_o == data[o] + 2, "%d-%d data %x not match %x\n" \
- % (i, o, data_o, data[o])
+ % (i, o, data_o, data[o])
o += 1
if o == len(data):
break
# Test 2 and 4
######################################################################
+
class ExampleBufPipe2(ControlBase):
""" Example of how to do chained pipeline stages.
"""
class ExampleBufPipeChain2(BufferedHandshake):
""" connects two stages together as a *single* combinatorial stage.
"""
+
def __init__(self):
stage1 = ExampleStageCls()
stage2 = ExampleStageCls()
def data_chain2():
- data = []
- for i in range(num_tests):
- data.append(randint(0, 1<<16-2))
- return data
+ data = []
+ for i in range(num_tests):
+ data.append(randint(0, 1 << 16-2))
+ return data
def resultfn_9(data_o, expected, i, o):
res = expected + 2
assert data_o == res, \
- "%d-%d received data %x not match expected %x\n" \
- % (i, o, data_o, res)
+ "%d-%d received data %x not match expected %x\n" \
+ % (i, o, data_o, res)
######################################################################
class LTStage(StageCls):
""" module-based stage example
"""
+
def __init__(self):
self.slt = SetLessThan(16, True)
Signal(16, name="%s_sig2" % name))
def ospec(self, name):
- return Signal(16, "%s_out" % name)
+ return Signal(16, name="%s_out" % name)
def setup(self, m, i):
self.o = Signal(16)
def resultfn_6(data_o, expected, i, o):
res = 1 if expected[0] < expected[1] else 0
assert data_o == res, \
- "%d-%d data %x not match %s\n" \
- % (i, o, data_o, repr(expected))
+ "%d-%d data %x not match %s\n" \
+ % (i, o, data_o, repr(expected))
######################################################################
"""
record_spec = [('src1', 16), ('src2', 16)]
+
def ispec(self):
""" returns a Record using the specification
"""
# Test 11
######################################################################
+
class ExampleAddRecordPlaceHolderStage(StageCls):
""" example use of a Record, with a placeholder as the processing result
"""
record_spec = [('src1', 16), ('src2', 16)]
+
def ispec(self):
""" returns a Record using the specification
"""
# a dummy class that may have stuff assigned to instances once created
-class PlaceHolder: pass
+class PlaceHolder:
+ pass
class ExampleAddRecordPipe(UnbufferedPipeline):
def resultfn_7(data_o, expected, i, o):
res = (expected['src1'] + 1, expected['src2'] + 1)
assert data_o['src1'] == res[0] and data_o['src2'] == res[1], \
- "%d-%d data %s not match %s\n" \
- % (i, o, repr(data_o), repr(expected))
+ "%d-%d data %s not match %s\n" \
+ % (i, o, repr(data_o), repr(expected))
class ExampleAddRecordPlaceHolderPipe(UnbufferedPipeline):
res1 = expected.src1 + 1
res2 = expected.src2 + 1
assert data_o['src1'] == res1 and data_o['src2'] == res2, \
- "%d-%d data %s not match %s\n" \
- % (i, o, repr(data_o), repr(expected))
+ "%d-%d data %s not match %s\n" \
+ % (i, o, repr(data_o), repr(expected))
######################################################################
easiest way to do that is to create a class that has the exact
same member layout (self.op1, self.op2) as Example2OpClass
"""
+
def __init__(self, op1, op2):
self.op1 = op1
self.op2 = op2
def resultfn_8(data_o, expected, i, o):
- res = expected.op1 + expected.op2 # these are a TestInputAdd instance
+ res = expected.op1 + expected.op2 # these are a TestInputAdd instance
assert data_o == res, \
- "%d-%d data %s res %x not match %s\n" \
- % (i, o, repr(data_o), res, repr(expected))
+ "%d-%d data %s res %x not match %s\n" \
+ % (i, o, repr(data_o), res, repr(expected))
+
def data_2op():
- data = []
- for i in range(num_tests):
- data.append(TestInputAdd(randint(0, 1<<16-1), randint(0, 1<<16-1)))
- return data
+ data = []
+ for i in range(num_tests):
+ data.append(TestInputAdd(randint(0, 1 << 16-1), randint(0, 1 << 16-1)))
+ return data
######################################################################
def d_ready(self):
""" data is ready to be accepted when this is true
"""
- return (self.count == 1)# | (self.count == 3)
+ return (self.count == 1) # | (self.count == 3)
return Const(1)
def d_valid(self, ready_i):
def data_chain1():
- data = []
- for i in range(num_tests):
- data.append(1<<((i*3)%15))
- #data.append(randint(0, 1<<16-2))
- #print (hex(data[-1]))
- return data
+ data = []
+ for i in range(num_tests):
+ data.append(1 << ((i*3) % 15))
+ #data.append(randint(0, 1<<16-2))
+ #print (hex(data[-1]))
+ return data
def resultfn_12(data_o, expected, i, o):
res = expected + 1
assert data_o == res, \
- "%d-%d data %x not match %x\n" \
- % (i, o, data_o, res)
+ "%d-%d data %x not match %x\n" \
+ % (i, o, data_o, res)
######################################################################
# Test 15
######################################################################
+
class ExampleBufModeAdd1Pipe(SimpleHandshake):
def __init__(self):
# Test 17
######################################################################
+
class ExampleUnBufAdd1Pipe2(UnbufferedPipeline2):
def __init__(self):
class PassThroughTest(PassThroughHandshake):
def iospecfn(self):
- return Signal(16, "out")
+ return Signal(16, name="out")
def __init__(self):
stage = PassThroughStage(self.iospecfn)
PassThroughHandshake.__init__(self, stage)
+
def resultfn_identical(data_o, expected, i, o):
res = expected
assert data_o == res, \
- "%d-%d data %x not match %x\n" \
- % (i, o, data_o, res)
+ "%d-%d data %x not match %x\n" \
+ % (i, o, data_o, res)
######################################################################
def iospecfn():
return Signal(16, name="d_in")
+
class FIFOTest16(FIFOControl):
def __init__(self):
def iospecfnrecord():
return Example2OpRecord()
+
class FIFOTestRecordControl(FIFOControl):
def __init__(self):
FIFOControl.__init__(self, 2, stage)
-
######################################################################
# Test 25
######################################################################
def iospecfn24():
return (Signal(16, name="src1"), Signal(16, name="src2"))
+
class FIFOTest2x16(FIFOControl):
def __init__(self):
# http://bugs.libre-riscv.org/show_bug.cgi?id=57
######################################################################
+
class ExampleBufAdd1Pipe(BufferedHandshake):
def __init__(self):
easiest way to do that is to create a class that has the exact
same member layout (self.op1, self.op2) as Example2OpClass
"""
+
def __init__(self, src1, src2):
self.src1 = src1
self.src2 = src2
def __repr__(self):
return "<TestInputMask %x %x" % (self.src1, self.src2)
+
class ExampleMaskCancellable(StageCls):
def ispec(self):
""" connects two stages together as a *single* combinatorial stage.
"""
+
def __init__(self, dynamic=False, maskwid=16):
stage1 = ExampleMaskCancellable()
stage2 = ExampleMaskCancellable()
""" connects a stage to a cancellable pipe with "dynamic" mode on.
"""
+
def __init__(self, dynamic=True, maskwid=16):
stage = ExampleMaskCancellable()
MaskCancellable.__init__(self, stage, maskwid, dynamic=dynamic)
def data_chain0(n_tests):
- data = []
- for i in range(n_tests):
- data.append(TestInputMask(randint(0, 1<<16-1),
- randint(0, 1<<16-1)))
- return data
+ data = []
+ for i in range(n_tests):
+ data.append(TestInputMask(randint(0, 1 << 16-1),
+ randint(0, 1 << 16-1)))
+ return data
def resultfn_0(data_o, expected, i, o):
assert data_o['src1'] == expected.src1 + 2, \
- "src1 %x-%x received data no match\n" \
- % (data_o['src1'], expected.src1 + 2)
+ "src1 %x-%x received data no match\n" \
+ % (data_o['src1'], expected.src1 + 2)
assert data_o['src2'] == expected.src2 + 2, \
- "src2 %x-%x received data no match\n" \
- % (data_o['src2'] , expected.src2 + 2)
+ "src2 %x-%x received data no match\n" \
+ % (data_o['src2'], expected.src2 + 2)
######################################################################
num_tests = 10
+
def test0():
maskwid = num_tests
- print ("test 0")
+ print("test 0")
dut = MaskCancellablePipe(maskwid)
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- dut.p.data_i.ports() + dut.n.data_o.ports()
+ dut.p.data_i.ports() + dut.n.data_o.ports()
vl = rtlil.convert(dut, ports=ports)
with open("test_maskchain0.il", "w") as f:
f.write(vl)
data = data_chain0(maskwid)
test = TestMask(dut, resultfn_0, maskwid, data=data)
- run_simulation(dut, [test.send, test.rcv],
- vcd_name="test_maskchain0.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_maskchain0.vcd")
+
def test0_1():
maskwid = 32
- print ("test 0.1")
+ print("test 0.1")
dut = MaskCancellableDynamic(maskwid=maskwid)
ports = [dut.p.valid_i, dut.n.ready_i,
- dut.n.valid_o, dut.p.ready_o] #+ \
- #dut.p.data_i.ports() + dut.n.data_o.ports()
+ dut.n.valid_o, dut.p.ready_o] # + \
+ #dut.p.data_i.ports() + dut.n.data_o.ports()
vl = rtlil.convert(dut, ports=ports)
with open("test_maskchain0_dynamic.il", "w") as f:
f.write(vl)
data = data_chain0(maskwid)
test = TestMask(dut, resultfn_0, maskwid, data=data, latching=True)
- run_simulation(dut, [test.send, test.rcv],
- vcd_name="test_maskchain0_dynamic.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_maskchain0_dynamic.vcd")
+
def notworking1():
- print ("test 1")
+ print("test 1")
dut = ExampleBufPipe()
run_simulation(dut, tbench(dut), vcd_name="test_bufpipe.vcd")
+
def notworking2():
- print ("test 2")
+ print("test 2")
dut = ExampleBufPipe2()
run_simulation(dut, tbench2(dut), vcd_name="test_bufpipe2.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufpipe2.il", "w") as f:
f.write(vl)
+
def test3():
- print ("test 3")
+ print("test 3")
dut = ExampleBufPipe()
test = Test3(dut, resultfn_3)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe3.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpipe3.vcd")
+
def test3_5():
- print ("test 3.5")
+ print("test 3.5")
dut = ExamplePipeline()
test = Test3(dut, resultfn_3)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_combpipe3.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_combpipe3.vcd")
+
def test4():
- print ("test 4")
+ print("test 4")
dut = ExampleBufPipe2()
run_simulation(dut, tbench4(dut), vcd_name="test_bufpipe4.vcd")
+
def test5():
- print ("test 5")
+ print("test 5")
dut = ExampleBufPipeAdd()
test = Test5(dut, resultfn_5, stage_ctl=True)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe5.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpipe5.vcd")
+
def test6():
- print ("test 6")
+ print("test 6")
dut = ExampleLTPipeline()
test = Test5(dut, resultfn_6)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltcomb6.vcd")
+ run_simulation(dut, [test.send(), test.rcv()], vcd_name="test_ltcomb6.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- list(dut.p.data_i) + [dut.n.data_o]
+ list(dut.p.data_i) + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_ltcomb_pipe.il", "w") as f:
f.write(vl)
+
def test7():
- print ("test 7")
+ print("test 7")
dut = ExampleAddRecordPipe()
- data=data_dict()
+ data = data_dict()
test = Test5(dut, resultfn_7, data=data)
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o,
vl = rtlil.convert(dut, ports=ports)
with open("test_recordcomb_pipe.il", "w") as f:
f.write(vl)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_addrecord.vcd")
+
def test8():
- print ("test 8")
+ print("test 8")
dut = ExampleBufPipeAddClass()
- data=data_2op()
+ data = data_2op()
test = Test5(dut, resultfn_8, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe8.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpipe8.vcd")
+
def test9():
- print ("test 9")
+ print("test 9")
dut = ExampleBufPipeChain2()
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufpipechain2.il", "w") as f:
f.write(vl)
data = data_chain2()
test = Test5(dut, resultfn_9, data=data)
- run_simulation(dut, [test.send, test.rcv],
- vcd_name="test_bufpipechain2.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpipechain2.vcd")
+
def test10():
- print ("test 10")
+ print("test 10")
dut = ExampleLTBufferedPipeDerived()
test = Test5(dut, resultfn_6)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_ltbufpipe10.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_ltbufpipe10.vcd")
ports = dut.ports()
vl = rtlil.convert(dut, ports=ports)
with open("test_ltbufpipe10.il", "w") as f:
f.write(vl)
+
def test11():
- print ("test 11")
+ print("test 11")
dut = ExampleAddRecordPlaceHolderPipe()
- data=data_placeholder()
+ data = data_placeholder()
test = Test5(dut, resultfn_11, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_addrecord.vcd")
def test12():
- print ("test 12")
+ print("test 12")
dut = ExampleBufDelayedPipe()
data = data_chain1()
test = Test5(dut, resultfn_12, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe12.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpipe12.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufpipe12.il", "w") as f:
f.write(vl)
+
def test13():
- print ("test 13")
+ print("test 13")
dut = ExampleUnBufDelayedPipe()
data = data_chain1()
test = Test5(dut, resultfn_12, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_unbufpipe13.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_unbufpipe13.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_unbufpipe13.il", "w") as f:
f.write(vl)
+
def test15():
- print ("test 15")
+ print("test 15")
dut = ExampleBufModeAdd1Pipe()
data = data_chain1()
test = Test5(dut, resultfn_12, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufunbuf15.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufunbuf15.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufunbuf15.il", "w") as f:
f.write(vl)
+
def test16():
- print ("test 16")
+ print("test 16")
dut = ExampleBufModeUnBufPipe()
data = data_chain1()
test = Test5(dut, resultfn_9, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufunbuf16.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufunbuf16.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufunbuf16.il", "w") as f:
f.write(vl)
+
def test17():
- print ("test 17")
+ print("test 17")
dut = ExampleUnBufAdd1Pipe2()
data = data_chain1()
test = Test5(dut, resultfn_12, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_unbufpipe17.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_unbufpipe17.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_unbufpipe17.il", "w") as f:
f.write(vl)
+
def test18():
- print ("test 18")
+ print("test 18")
dut = PassThroughTest()
data = data_chain1()
test = Test5(dut, resultfn_identical, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_passthru18.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_passthru18.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_passthru18.il", "w") as f:
f.write(vl)
+
def test19():
- print ("test 19")
+ print("test 19")
dut = ExampleBufPassThruPipe()
data = data_chain1()
test = Test5(dut, resultfn_9, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpass19.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpass19.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufpass19.il", "w") as f:
f.write(vl)
+
def test20():
- print ("test 20")
+ print("test 20")
dut = FIFOTest16()
data = data_chain1()
test = Test5(dut, resultfn_identical, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_fifo20.vcd")
+ run_simulation(dut, [test.send(), test.rcv()], vcd_name="test_fifo20.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_fifo20.il", "w") as f:
f.write(vl)
+
def test21():
- print ("test 21")
+ print("test 21")
dut = ExampleFIFOPassThruPipe1()
data = data_chain1()
test = Test5(dut, resultfn_12, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_fifopass21.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_fifopass21.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_fifopass21.il", "w") as f:
f.write(vl)
+
def test22():
- print ("test 22")
+ print("test 22")
dut = ExampleRecordHandshakeAddClass()
- data=data_2op()
+ data = data_2op()
test = Test5(dut, resultfn_8, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord22.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_addrecord22.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i.op1, dut.p.data_i.op2] + \
- [dut.n.data_o]
+ [dut.p.data_i.op1, dut.p.data_i.op2] + \
+ [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_addrecord22.il", "w") as f:
f.write(vl)
+
def test23():
- print ("test 23")
+ print("test 23")
dut = ExampleFIFORecordObjectPipe()
- data=data_2op()
+ data = data_2op()
test = Test5(dut, resultfn_8, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord23.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_addrecord23.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i.op1, dut.p.data_i.op2] + \
- [dut.n.data_o]
+ [dut.p.data_i.op1, dut.p.data_i.op2] + \
+ [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_addrecord23.il", "w") as f:
f.write(vl)
+
def test24():
- print ("test 24")
+ print("test 24")
dut = FIFOTestRecordAddStageControl()
- data=data_2op()
+ data = data_2op()
test = Test5(dut, resultfn_8, data=data)
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i.op1, dut.p.data_i.op2] + \
- [dut.n.data_o]
+ [dut.p.data_i.op1, dut.p.data_i.op2] + \
+ [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_addrecord24.il", "w") as f:
f.write(vl)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_addrecord24.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_addrecord24.vcd")
+
def test25():
- print ("test 25")
+ print("test 25")
dut = ExampleFIFOAdd2Pipe()
data = data_chain1()
test = Test5(dut, resultfn_9, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_add2pipe25.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_add2pipe25.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_add2pipe25.il", "w") as f:
f.write(vl)
+
def test997():
- print ("test 997")
+ print("test 997")
dut = ExampleBufPassThruPipe2()
data = data_chain1()
test = Test5(dut, resultfn_9, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpass997.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpass997.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufpass997.il", "w") as f:
f.write(vl)
+
+@unittest.skip("buggy -- fails due to exceeding step count limit") # FIXME
def test998():
- print ("test 998 (fails, bug)")
+ print("test 998 (fails, bug)")
dut = ExampleBufPipe3()
data = data_chain1()
test = Test5(dut, resultfn_9, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufpipe14.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufpipe14.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufpipe14.il", "w") as f:
f.write(vl)
+
def test999():
- print ("test 999 (expected to fail, which is a bug)")
+ print("test 999 (expected to fail, which is a bug)")
dut = ExampleBufUnBufPipe()
data = data_chain1()
test = Test5(dut, resultfn_9, data=data)
- run_simulation(dut, [test.send, test.rcv], vcd_name="test_bufunbuf999.vcd")
+ run_simulation(dut, [test.send(), test.rcv()],
+ vcd_name="test_bufunbuf999.vcd")
ports = [dut.p.valid_i, dut.n.ready_i,
dut.n.valid_o, dut.p.ready_o] + \
- [dut.p.data_i] + [dut.n.data_o]
+ [dut.p.data_i] + [dut.n.data_o]
vl = rtlil.convert(dut, ports=ports)
with open("test_bufunbuf999.il", "w") as f:
f.write(vl)
+
if __name__ == '__main__':
test0()
test0_1()
from nmutil.multipipe import PriorityCombMuxInPipe
from nmutil.singlepipe import MaskCancellable, RecordObject, Object
+from . import StepLimiter
+import unittest
+
class PassData(Object):
def __init__(self):
self.idx = Signal(8, reset_less=True)
self.data = Signal(16, reset_less=True)
self.operator = Signal(2, reset_less=True)
- self.routeid = Signal(2, reset_less=True) # muxidname
+ self.routeid = Signal(2, reset_less=True) # muxidname
class PassThroughStage:
def __init__(self):
self.o = self.ospec()
+
def ispec(self):
return PassData()
+
def ospec(self):
- return self.ispec() # same as ospec
+ return self.ispec() # same as ospec
+
def _setup(self, m, i):
comb = m.d.comb
#comb += self.o.eq(i)
+
def process(self, i):
return i
def ispec(self):
return PassData()
+
def ospec(self):
return PassData()
comb = m.d.comb
comb += self.o.eq(i)
with m.If(i.operator == Const(1, 2)):
- comb += self.o.routeid.eq(1) # selects 2nd output in CombMuxOutPipe
- comb += self.o.data.eq(i.data + 1) # add 1 to say "we did it"
- comb += self.o.operator.eq(2) # don't get into infinite loop
+ # selects 2nd output in CombMuxOutPipe
+ comb += self.o.routeid.eq(1)
+ comb += self.o.data.eq(i.data + 1) # add 1 to say "we did it"
+ comb += self.o.operator.eq(2) # don't get into infinite loop
with m.Else():
- comb += self.o.routeid.eq(0) # selects 2nd output in CombMuxOutPipe
+ # selects 2nd output in CombMuxOutPipe
+ comb += self.o.routeid.eq(0)
def process(self, i):
return self.o
stage = SplitRouteStage()
MaskCancellable.__init__(self, stage, maskwid)
+
class RouteBackPipe(CombMuxOutPipe):
""" routes data back to start of pipeline
"""
+
def __init__(self):
stage = PassThroughStage()
CombMuxOutPipe.__init__(self, stage, n_len=2,
class MergeRoutePipe(PriorityCombMuxInPipe):
""" merges data coming from end of pipe (with operator now == 1)
"""
+
def __init__(self):
stage = PassThroughStage()
PriorityCombMuxInPipe.__init__(self, stage, p_len=2, maskwid=4,
- routemask=True)
-
+ routemask=True)
class PassThroughPipe(MaskCancellable):
self.do[muxid] = {}
self.sent[muxid] = []
for i in range(self.tlen):
- self.di[muxid][i] = randint(0, 255) + (muxid<<8)
+ self.di[muxid][i] = randint(0, 255) + (muxid << 8)
self.do[muxid][i] = self.di[muxid][i]
def send(self, muxid):
yield rs.mask_i.eq(1)
yield
o_p_ready = yield rs.ready_o
+ step_limiter = StepLimiter(10000)
while not o_p_ready:
+ step_limiter.step()
yield
o_p_ready = yield rs.ready_o
- print ("send", muxid, i, hex(op2), op2)
+ print("send", muxid, i, hex(op2), op2)
self.sent[muxid].append(i)
yield rs.valid_i.eq(0)
yield rs.mask_i.eq(0)
# wait until it's received
+ step_limiter = StepLimiter(10000)
while i in self.do[muxid]:
+ step_limiter.step()
yield
# wait random period of time before queueing another value
yield rs.valid_i.eq(0)
yield
- print ("send ended", muxid)
+ print("send ended", muxid)
- ## wait random period of time before queueing another value
- #for i in range(randint(0, 3)):
+ # wait random period of time before queueing another value
+ # for i in range(randint(0, 3)):
# yield
#send_range = randint(0, 3)
- #if send_range == 0:
+ # if send_range == 0:
# send = True
- #else:
+ # else:
# send = randint(0, send_range) != 0
def rcv(self, muxid):
rs = self.dut.p[muxid]
- while True:
+ for _ in StepLimiter(10000):
# check cancellation
if False and self.sent[muxid] and randint(0, 2) == 0:
todel = self.sent[muxid].pop()
- print ("to delete", muxid, self.sent[muxid], todel)
+ print("to delete", muxid, self.sent[muxid], todel)
if todel in self.do[muxid]:
del self.do[muxid][todel]
yield rs.stop_i.eq(1)
- print ("left", muxid, self.do[muxid])
+ print("left", muxid, self.do[muxid])
if len(self.do[muxid]) == 0:
break
#stall_range = randint(0, 3)
- #for j in range(randint(1,10)):
+ # for j in range(randint(1,10)):
# stall = randint(0, stall_range) != 0
# yield self.dut.n[0].ready_i.eq(stall)
# yield
n = self.dut.n[muxid]
yield n.ready_i.eq(1)
yield
- yield rs.stop_i.eq(0) # resets cancel mask
+ yield rs.stop_i.eq(0) # resets cancel mask
o_n_valid = yield n.valid_o
i_n_ready = yield n.ready_i
if not o_n_valid or not i_n_ready:
out_i = yield n.data_o.idx
out_v = yield n.data_o.data
- print ("recv", out_muxid, out_i, hex(out_v), hex(out_v))
+ print("recv", out_muxid, out_i, hex(out_v), hex(out_v))
# see if this output has occurred already, delete it if it has
assert muxid == out_muxid, \
- "out_muxid %d not correct %d" % (out_muxid, muxid)
+ "out_muxid %d not correct %d" % (out_muxid, muxid)
if out_i not in self.sent[muxid]:
- print ("cancelled/recv", muxid, out_i)
+ print("cancelled/recv", muxid, out_i)
continue
assert out_i in self.do[muxid], "out_i %d not in array %s" % \
- (out_i, repr(self.do[muxid]))
- assert self.do[muxid][out_i] + 1 == out_v # check data
+ (out_i, repr(self.do[muxid]))
+ assert self.do[muxid][out_i] + 1 == out_v # check data
del self.do[muxid][out_i]
todel = self.sent[muxid].index(out_i)
del self.sent[muxid][todel]
if len(self.do[muxid]) == 0:
break
- print ("recv ended", muxid)
+ print("recv ended", muxid)
class TestPriorityMuxPipe(PriorityCombMuxInPipe):
class TestInOutPipe(Elaboratable):
def __init__(self, num_rows=4):
self.num_rows = nr = num_rows
- self.inpipe = TestPriorityMuxPipe(nr) # fan-in (combinatorial)
+ self.inpipe = TestPriorityMuxPipe(nr) # fan-in (combinatorial)
self.mergein = MergeRoutePipe() # merge in feedback
self.pipe1 = PassThroughPipe(nr) # stage 1 (clock-sync)
self.pipe2 = DecisionPipe(nr) # stage 2 (clock-sync)
- #self.pipe3 = PassThroughPipe(nr) # stage 3 (clock-sync)
- #self.pipe4 = PassThroughPipe(nr) # stage 4 (clock-sync)
+ # self.pipe3 = PassThroughPipe(nr) # stage 3 (clock-sync)
+ # self.pipe4 = PassThroughPipe(nr) # stage 4 (clock-sync)
self.splitback = RouteBackPipe() # split back to mergein
self.outpipe = TestMuxOutPipe(nr) # fan-out (combinatorial)
self.fifoback = PassThroughPipe(nr) # temp route-back store
self.p = self.inpipe.p # kinda annoying,
- self.n = self.outpipe.n # use pipe in/out as this class in/out
+ self.n = self.outpipe.n # use pipe in/out as this class in/out
self._ports = self.inpipe.ports() + self.outpipe.ports()
def elaborate(self, platform):
return self._ports
+@unittest.skip("buggy -- fails due to exceeding step count limit") # FIXME
def test1():
dut = TestInOutPipe()
vl = rtlil.convert(dut, ports=dut.ports())
tlen = 5
test = InputTest(dut, tlen)
- run_simulation(dut, [test.rcv(0), #test.rcv(1),
+ run_simulation(dut, [test.rcv(0), # test.rcv(1),
#test.rcv(3), test.rcv(2),
- test.send(0), #test.send(1),
+ test.send(0), # test.send(1),
#test.send(3), test.send(2),
- ],
+ ],
vcd_name="test_inoutmux_feedback_pipe.vcd")
if __name__ == '__main__':
#from cProfile import Profile
#p = Profile()
- #p.enable()
+ # p.enable()
test1()
- #p.disable()
- #p.print_stats()
+ # p.disable()
+ # p.print_stats()