self.data = Signal(16, name="data"+name, reset_less=True)
-
class PassThroughStage:
def ispec(self, name=None):
return PassData(name=name)
+
def ospec(self, name=None):
- return self.ispec(name) # same as ospec
+ return self.ispec(name) # same as ospec
def process(self, i):
- return i # pass-through
-
+ return i # pass-through
class PassThroughPipe(SimpleHandshake):
self.di[muxid] = {}
self.do[muxid] = {}
for i in range(self.tlen):
- self.di[muxid][i] = randint(0, 255) + (muxid<<8)
+ self.di[muxid][i] = randint(0, 255) + (muxid << 8)
self.do[muxid][i] = self.di[muxid][i]
def send(self, muxid):
yield
o_p_ready = yield rs.o_ready
- print ("send", muxid, i, hex(op2))
+ print("send", muxid, i, hex(op2))
yield rs.i_valid.eq(0)
# wait random period of time before queueing another value
yield rs.i_valid.eq(0)
yield
- print ("send ended", muxid)
+ print("send ended", muxid)
- ## wait random period of time before queueing another value
- #for i in range(randint(0, 3)):
+ # wait random period of time before queueing another value
+ # for i in range(randint(0, 3)):
# yield
#send_range = randint(0, 3)
- #if send_range == 0:
+ # if send_range == 0:
# send = True
- #else:
+ # else:
# send = randint(0, send_range) != 0
def rcv(self, muxid):
while True:
#stall_range = randint(0, 3)
- #for j in range(randint(1,10)):
+ # for j in range(randint(1,10)):
# stall = randint(0, stall_range) != 0
# yield self.dut.n[0].i_ready.eq(stall)
# yield
out_i = yield n.o_data.idx
out_v = yield n.o_data.data
- print ("recv", out_muxid, out_i, hex(out_v))
+ print("recv", out_muxid, out_i, hex(out_v))
# see if this output has occurred already, delete it if it has
assert muxid == out_muxid, \
- "out_muxid %d not correct %d" % (out_muxid, muxid)
+ "out_muxid %d not correct %d" % (out_muxid, muxid)
assert out_i in self.do[muxid], "out_i %d not in array %s" % \
- (out_i, repr(self.do[muxid]))
- assert self.do[muxid][out_i] == out_v # pass-through data
+ (out_i, repr(self.do[muxid]))
+ assert self.do[muxid][out_i] == out_v # pass-through data
del self.do[muxid][out_i]
# check if there's any more outputs
if len(self.do[muxid]) == 0:
break
- print ("recv ended", muxid)
+ print("recv ended", muxid)
class TestALU(Elaboratable):
test.rcv(3), test.rcv(2),
test.send(0), test.send(1),
test.send(3), test.send(2),
- ],
+ ],
vcd_name="test_reservation_stations.vcd")
+
if __name__ == '__main__':
test1()