self.data = Signal(16, reset_less=True)
-
class PassThroughStage:
def ispec(self):
return PassData()
+
def ospec(self):
- return self.ispec() # same as ospec
+ return self.ispec() # same as ospec
def process(self, i):
- return i # pass-through
-
+ return i # pass-through
class PassThroughPipe(MaskCancellable):
self.do[muxid] = {}
self.sent[muxid] = []
for i in range(self.tlen):
- self.di[muxid][i] = randint(0, 255) + (muxid<<8)
+ self.di[muxid][i] = randint(0, 255) + (muxid << 8)
self.do[muxid][i] = self.di[muxid][i]
def send(self, muxid):
yield
o_p_ready = yield rs.o_ready
- print ("send", muxid, i, hex(op2), op2)
+ print("send", muxid, i, hex(op2), op2)
self.sent[muxid].append(i)
yield rs.i_valid.eq(0)
yield rs.i_valid.eq(0)
yield
- print ("send ended", muxid)
+ print("send ended", muxid)
- ## wait random period of time before queueing another value
- #for i in range(randint(0, 3)):
+ # wait random period of time before queueing another value
+ # for i in range(randint(0, 3)):
# yield
#send_range = randint(0, 3)
- #if send_range == 0:
+ # if send_range == 0:
# send = True
- #else:
+ # else:
# send = randint(0, send_range) != 0
def rcv(self, muxid):
# check cancellation
if self.sent[muxid] and randint(0, 2) == 0:
todel = self.sent[muxid].pop()
- print ("to delete", muxid, self.sent[muxid], todel)
+ print("to delete", muxid, self.sent[muxid], todel)
if todel in self.do[muxid]:
del self.do[muxid][todel]
yield rs.stop_i.eq(1)
- print ("left", muxid, self.do[muxid])
+ print("left", muxid, self.do[muxid])
if len(self.do[muxid]) == 0:
break
stall_range = randint(0, 3)
- for j in range(randint(1,10)):
+ for j in range(randint(1, 10)):
stall = randint(0, stall_range) != 0
yield self.dut.n[0].i_ready.eq(stall)
yield
n = self.dut.n[muxid]
yield n.i_ready.eq(1)
yield
- yield rs.stop_i.eq(0) # resets cancel mask
+ yield rs.stop_i.eq(0) # resets cancel mask
o_n_valid = yield n.o_valid
i_n_ready = yield n.i_ready
if not o_n_valid or not i_n_ready:
out_i = yield n.o_data.idx
out_v = yield n.o_data.data
- print ("recv", out_muxid, out_i, hex(out_v), out_v)
+ print("recv", out_muxid, out_i, hex(out_v), out_v)
# see if this output has occurred already, delete it if it has
assert muxid == out_muxid, \
- "out_muxid %d not correct %d" % (out_muxid, muxid)
+ "out_muxid %d not correct %d" % (out_muxid, muxid)
if out_i not in self.sent[muxid]:
- print ("cancelled/recv", muxid, out_i)
+ print("cancelled/recv", muxid, out_i)
continue
assert out_i in self.do[muxid], "out_i %d not in array %s" % \
- (out_i, repr(self.do[muxid]))
- assert self.do[muxid][out_i] == out_v # pass-through data
+ (out_i, repr(self.do[muxid]))
+ assert self.do[muxid][out_i] == out_v # pass-through data
del self.do[muxid][out_i]
todel = self.sent[muxid].index(out_i)
del self.sent[muxid][todel]
if len(self.do[muxid]) == 0:
break
- print ("recv ended", muxid)
+ print("recv ended", muxid)
class TestPriorityMuxPipe(PriorityCombMuxInPipe):
class TestInOutPipe(Elaboratable):
def __init__(self, num_rows=4):
self.num_rows = nr = num_rows
- self.inpipe = TestPriorityMuxPipe(nr) # fan-in (combinatorial)
+ self.inpipe = TestPriorityMuxPipe(nr) # fan-in (combinatorial)
self.pipe1 = PassThroughPipe(nr) # stage 1 (clock-sync)
self.pipe2 = PassThroughPipe(nr) # stage 2 (clock-sync)
self.pipe3 = PassThroughPipe(nr) # stage 3 (clock-sync)
self.outpipe = TestMuxOutPipe(nr) # fan-out (combinatorial)
self.p = self.inpipe.p # kinda annoying,
- self.n = self.outpipe.n # use pipe in/out as this class in/out
+ self.n = self.outpipe.n # use pipe in/out as this class in/out
self._ports = self.inpipe.ports() + self.outpipe.ports()
def elaborate(self, platform):
test.rcv(3), test.rcv(2),
test.send(0), test.send(1),
test.send(3), test.send(2),
- ],
+ ],
vcd_name="test_inoutmux_unarycancel_pipe.vcd")
+
if __name__ == '__main__':
test1()