self.data = Signal(16, reset_less=True)
-
class PassThroughStage:
def ispec(self):
return PassData()
+
def ospec(self):
- return self.ispec() # same as ospec
+ return self.ispec() # same as ospec
def process(self, i):
- return i # pass-through
-
+ return i # pass-through
class PassThroughPipe(SimpleHandshake):
self.di[muxid] = {}
self.do[muxid] = {}
for i in range(self.tlen):
- self.di[muxid][i] = randint(0, 255) + (muxid<<8)
+ self.di[muxid][i] = randint(0, 255) + (muxid << 8)
self.do[muxid][i] = self.di[muxid][i]
def send(self, muxid):
yield
o_p_ready = yield rs.o_ready
- print ("send", muxid, i, hex(op2))
+ print("send", muxid, i, hex(op2))
yield rs.i_valid.eq(0)
# wait random period of time before queueing another value
yield rs.i_valid.eq(0)
yield
- print ("send ended", muxid)
+ print("send ended", muxid)
- ## wait random period of time before queueing another value
- #for i in range(randint(0, 3)):
+ # wait random period of time before queueing another value
+ # for i in range(randint(0, 3)):
# yield
#send_range = randint(0, 3)
- #if send_range == 0:
+ # if send_range == 0:
# send = True
- #else:
+ # else:
# send = randint(0, send_range) != 0
def rcv(self, muxid):
while True:
#stall_range = randint(0, 3)
- #for j in range(randint(1,10)):
+ # for j in range(randint(1,10)):
# stall = randint(0, stall_range) != 0
# yield self.dut.n[0].i_ready.eq(stall)
# yield
out_i = yield n.o_data.idx
out_v = yield n.o_data.data
- print ("recv", out_muxid, out_i, hex(out_v))
+ print("recv", out_muxid, out_i, hex(out_v))
# see if this output has occurred already, delete it if it has
assert muxid == out_muxid, \
- "out_muxid %d not correct %d" % (out_muxid, muxid)
+ "out_muxid %d not correct %d" % (out_muxid, muxid)
assert out_i in self.do[muxid], "out_i %d not in array %s" % \
- (out_i, repr(self.do[muxid]))
- assert self.do[muxid][out_i] == out_v # pass-through data
+ (out_i, repr(self.do[muxid]))
+ assert self.do[muxid][out_i] == out_v # pass-through data
del self.do[muxid][out_i]
# check if there's any more outputs
if len(self.do[muxid]) == 0:
break
- print ("recv ended", muxid)
+ print("recv ended", muxid)
class TestPriorityMuxPipe(PriorityCombMuxInPipe):
muxid = i
else:
muxid = randint(0, dut.num_rows-1)
- data = randint(0, 255) + (muxid<<8)
+ data = randint(0, 255) + (muxid << 8)
def send(self):
for i in range(self.tlen * dut.num_rows):
yield
o_p_ready = yield rs.o_ready
- print ("send", muxid, i, hex(op2))
+ print("send", muxid, i, hex(op2))
yield rs.i_valid.eq(0)
# wait random period of time before queueing another value
class TestInOutPipe(Elaboratable):
def __init__(self, num_rows=4):
self.num_rows = num_rows
- self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)
+ self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)
self.pipe1 = PassThroughPipe() # stage 1 (clock-sync)
self.pipe2 = PassThroughPipe() # stage 2 (clock-sync)
self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial)
self.p = self.inpipe.p # kinda annoying,
- self.n = self.outpipe.n # use pipe in/out as this class in/out
+ self.n = self.outpipe.n # use pipe in/out as this class in/out
self._ports = self.inpipe.ports() + self.outpipe.ports()
def elaborate(self, platform):
test.rcv(3), test.rcv(2),
test.send(0), test.send(1),
test.send(3), test.send(2),
- ],
+ ],
vcd_name="test_inoutmux_pipe.vcd")
+
if __name__ == '__main__':
test1()