from random import randint
from math import log
-from nmigen import Module, Signal, Cat
+from nmigen import Module, Signal, Cat, Elaboratable
from nmigen.compat.sim import run_simulation
from nmigen.cli import verilog, rtlil
from multipipe import CombMuxOutPipe
-from singlepipe import SimpleHandshake
+from singlepipe import SimpleHandshake, PassThroughHandshake, RecordObject
-class PassInData:
+class PassInData(RecordObject):
def __init__(self):
+ RecordObject.__init__(self)
self.mid = Signal(2, reset_less=True)
self.data = Signal(16, reset_less=True)
- def eq(self, i):
- return [self.mid.eq(i.mid), self.data.eq(i.data)]
-
- def ports(self):
- return [self.mid, self.data]
-
class PassThroughStage:
def ispec(self):
return PassInData()
- def ospec(self):
- return Signal(16, name="data_out", reset_less=True)
+ def ospec(self, name):
+ return Signal(16, name="%s_dout" % name, reset_less=True)
def process(self, i):
return i.data
-class PassThroughPipe(SimpleHandshake):
+class PassThroughPipe(PassThroughHandshake):
def __init__(self):
- SimpleHandshake.__init__(self, PassThroughDataStage())
-
-
-
-
-def testbench(dut):
- stb = yield dut.out_op.stb
- assert stb == 0
- ack = yield dut.out_op.ack
- assert ack == 0
-
- # set row 1 input 0
- yield dut.rs[1].in_op[0].eq(5)
- yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready
- #yield dut.rs[1].ack.eq(1)
- yield
-
- # check row 1 output (should be inactive)
- decode = yield dut.rs[1].out_decode
- assert decode == 0
- if False:
- op0 = yield dut.rs[1].out_op[0]
- op1 = yield dut.rs[1].out_op[1]
- assert op0 == 0 and op1 == 0
-
- # output should be inactive
- out_stb = yield dut.out_op.stb
- assert out_stb == 1
-
- # set row 0 input 1
- yield dut.rs[1].in_op[1].eq(6)
- yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready
-
- # set acknowledgement of output... takes 1 cycle to respond
- yield dut.out_op.ack.eq(1)
- yield
- yield dut.out_op.ack.eq(0) # clear ack on output
- yield dut.rs[1].stb.eq(0) # clear row 1 strobe
-
- # output strobe should be active, MID should be 0 until "ack" is set...
- out_stb = yield dut.out_op.stb
- assert out_stb == 1
- out_mid = yield dut.mid
- assert out_mid == 0
-
- # ... and output should not yet be passed through either
- op0 = yield dut.out_op.v[0]
- op1 = yield dut.out_op.v[1]
- assert op0 == 0 and op1 == 0
-
- # wait for out_op.ack to activate...
- yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero
- yield
-
- # *now* output should be passed through
- op0 = yield dut.out_op.v[0]
- op1 = yield dut.out_op.v[1]
- assert op0 == 5 and op1 == 6
-
- # set row 2 input
- yield dut.rs[2].in_op[0].eq(3)
- yield dut.rs[2].in_op[1].eq(4)
- yield dut.rs[2].stb.eq(0b11) # strobe indicate 1st op ready
- yield dut.out_op.ack.eq(1) # set output ack
- yield
- yield dut.rs[2].stb.eq(0) # clear row 2 strobe
- yield dut.out_op.ack.eq(0) # set output ack
- yield
- op0 = yield dut.out_op.v[0]
- op1 = yield dut.out_op.v[1]
- assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
- out_mid = yield dut.mid
- assert out_mid == 2
-
- # set row 0 and 3 input
- yield dut.rs[0].in_op[0].eq(9)
- yield dut.rs[0].in_op[1].eq(8)
- yield dut.rs[0].stb.eq(0b11) # strobe indicate 1st op ready
- yield dut.rs[3].in_op[0].eq(1)
- yield dut.rs[3].in_op[1].eq(2)
- yield dut.rs[3].stb.eq(0b11) # strobe indicate 1st op ready
-
- # set acknowledgement of output... takes 1 cycle to respond
- yield dut.out_op.ack.eq(1)
- yield
- yield dut.rs[0].stb.eq(0) # clear row 1 strobe
- yield
- out_mid = yield dut.mid
- assert out_mid == 0, "out mid %d" % out_mid
-
- yield
- yield dut.rs[3].stb.eq(0) # clear row 1 strobe
- yield dut.out_op.ack.eq(0) # clear ack on output
- yield
- out_mid = yield dut.mid
- assert out_mid == 3, "out mid %d" % out_mid
+ PassThroughHandshake.__init__(self, PassThroughDataStage())
class OutputTest:
op2 = self.di[i][0]
mid = self.di[i][1]
rs = dut.p
- yield rs.i_valid.eq(1)
- yield rs.i_data.data.eq(op2)
- yield rs.i_data.mid.eq(mid)
+ yield rs.valid_i.eq(1)
+ yield rs.data_i.data.eq(op2)
+ yield rs.data_i.mid.eq(mid)
yield
- o_p_ready = yield rs.o_ready
+ o_p_ready = yield rs.ready_o
while not o_p_ready:
yield
- o_p_ready = yield rs.o_ready
+ o_p_ready = yield rs.ready_o
print ("send", mid, i, hex(op2))
- yield rs.i_valid.eq(0)
+ yield rs.valid_i.eq(0)
# wait random period of time before queueing another value
for i in range(randint(0, 3)):
yield
- yield rs.i_valid.eq(0)
+ yield rs.valid_i.eq(0)
def rcv(self, mid):
out_i = 0
count += 1
assert count != 2000, "timeout: too long"
n = self.dut.n[mid]
- yield n.i_ready.eq(1)
+ yield n.ready_i.eq(1)
yield
- o_n_valid = yield n.o_valid
- i_n_ready = yield n.i_ready
+ o_n_valid = yield n.valid_o
+ i_n_ready = yield n.ready_i
if not o_n_valid or not i_n_ready:
continue
- out_v = yield n.o_data
+ out_v = yield n.data_o
print ("recv", mid, out_i, hex(out_v))
stall_range = randint(0, 3)
stall = randint(0, stall_range) != 0
if stall:
- yield n.i_ready.eq(0)
+ yield n.ready_i.eq(0)
for i in range(stall_range):
yield
CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
-class TestSyncToPriorityPipe:
+class TestSyncToPriorityPipe(Elaboratable):
def __init__(self):
self.num_rows = 4
self.pipe = PassThroughPipe()
def elaborate(self, platform):
m = Module()
- m.submodules += self.pipe
- m.submodules += self.muxpipe
+ m.submodules.pipe = self.pipe
+ m.submodules.muxpipe = self.muxpipe
m.d.comb += self.pipe.n.connect_to_next(self.muxpipe.p)
return m
def ports(self):
- res = [self.p.i_valid, self.p.o_ready] + \
- self.p.i_data.ports()
+ res = [self.p.valid_i, self.p.ready_o] + \
+ self.p.data_i.ports()
for i in range(len(self.n)):
- res += [self.n[i].i_ready, self.n[i].o_valid] + \
- [self.n[i].o_data]
- #self.n[i].o_data.ports()
+ res += [self.n[i].ready_i, self.n[i].valid_o] + \
+ [self.n[i].data_o]
+ #self.n[i].data_o.ports()
return res
vl = rtlil.convert(dut, ports=dut.ports())
with open("test_outmux_pipe.il", "w") as f:
f.write(vl)
- #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
test = OutputTest(dut)
run_simulation(dut, [test.rcv(1), test.rcv(0),