From: Luke Kenneth Casson Leighton Date: Thu, 2 May 2019 13:16:45 +0000 (+0100) Subject: move more unit tests X-Git-Tag: ls180-24jan2020~1079 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=79a438bdad47b7f9e87d84b64b8e412f9ff2f6f8;p=ieee754fpu.git move more unit tests --- diff --git a/src/ieee754/add/test_inout_mux_pipe.py b/src/ieee754/add/test_inout_mux_pipe.py deleted file mode 100644 index 221ece1d..00000000 --- a/src/ieee754/add/test_inout_mux_pipe.py +++ /dev/null @@ -1,229 +0,0 @@ -""" key strategic example showing how to do multi-input fan-in into a - multi-stage pipeline, then multi-output fanout. - - the multiplex ID from the fan-in is passed in to the pipeline, preserved, - and used as a routing ID on the fanout. -""" - -from random import randint -from math import log -from nmigen import Module, Signal, Cat, Value, Elaboratable -from nmigen.compat.sim import run_simulation -from nmigen.cli import verilog, rtlil - -from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe -from nmutil.multipipe import PriorityCombMuxInPipe -from nmutil.singlepipe import SimpleHandshake, RecordObject, Object - - -class PassData2(RecordObject): - def __init__(self): - RecordObject.__init__(self) - self.mid = Signal(2, reset_less=True) - self.idx = Signal(8, reset_less=True) - self.data = Signal(16, reset_less=True) - - -class PassData(Object): - def __init__(self): - Object.__init__(self) - self.mid = Signal(2, reset_less=True) - self.idx = Signal(8, reset_less=True) - self.data = Signal(16, reset_less=True) - - - -class PassThroughStage: - def ispec(self): - return PassData() - def ospec(self): - return self.ispec() # same as ospec - - def process(self, i): - return i # pass-through - - - -class PassThroughPipe(SimpleHandshake): - def __init__(self): - SimpleHandshake.__init__(self, PassThroughStage()) - - -class InputTest: - def __init__(self, dut): - self.dut = dut - self.di = {} - self.do = {} - self.tlen = 100 - for mid in range(dut.num_rows): - self.di[mid] = {} - self.do[mid] = {} - for i in range(self.tlen): - self.di[mid][i] = randint(0, 255) + (mid<<8) - self.do[mid][i] = self.di[mid][i] - - def send(self, mid): - for i in range(self.tlen): - op2 = self.di[mid][i] - rs = dut.p[mid] - yield rs.valid_i.eq(1) - yield rs.data_i.data.eq(op2) - yield rs.data_i.idx.eq(i) - yield rs.data_i.mid.eq(mid) - yield - o_p_ready = yield rs.ready_o - while not o_p_ready: - yield - o_p_ready = yield rs.ready_o - - print ("send", mid, i, hex(op2)) - - yield rs.valid_i.eq(0) - # wait random period of time before queueing another value - for i in range(randint(0, 3)): - yield - - yield rs.valid_i.eq(0) - yield - - print ("send ended", mid) - - ## wait random period of time before queueing another value - #for i in range(randint(0, 3)): - # yield - - #send_range = randint(0, 3) - #if send_range == 0: - # send = True - #else: - # send = randint(0, send_range) != 0 - - def rcv(self, mid): - while True: - #stall_range = randint(0, 3) - #for j in range(randint(1,10)): - # stall = randint(0, stall_range) != 0 - # yield self.dut.n[0].ready_i.eq(stall) - # yield - n = self.dut.n[mid] - yield n.ready_i.eq(1) - yield - o_n_valid = yield n.valid_o - i_n_ready = yield n.ready_i - if not o_n_valid or not i_n_ready: - continue - - out_mid = yield n.data_o.mid - out_i = yield n.data_o.idx - out_v = yield n.data_o.data - - print ("recv", out_mid, out_i, hex(out_v)) - - # see if this output has occurred already, delete it if it has - assert mid == out_mid, "out_mid %d not correct %d" % (out_mid, mid) - assert out_i in self.do[mid], "out_i %d not in array %s" % \ - (out_i, repr(self.do[mid])) - assert self.do[mid][out_i] == out_v # pass-through data - del self.do[mid][out_i] - - # check if there's any more outputs - if len(self.do[mid]) == 0: - break - print ("recv ended", mid) - - -class TestPriorityMuxPipe(PriorityCombMuxInPipe): - def __init__(self, num_rows): - self.num_rows = num_rows - stage = PassThroughStage() - PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows) - - -class OutputTest: - def __init__(self, dut): - self.dut = dut - self.di = [] - self.do = {} - self.tlen = 100 - for i in range(self.tlen * dut.num_rows): - if i < dut.num_rows: - mid = i - else: - mid = randint(0, dut.num_rows-1) - data = randint(0, 255) + (mid<<8) - - def send(self): - for i in range(self.tlen * dut.num_rows): - op2 = self.di[i][0] - mid = self.di[i][1] - rs = dut.p - yield rs.valid_i.eq(1) - yield rs.data_i.data.eq(op2) - yield rs.data_i.mid.eq(mid) - yield - o_p_ready = yield rs.ready_o - while not o_p_ready: - yield - o_p_ready = yield rs.ready_o - - print ("send", mid, i, hex(op2)) - - yield rs.valid_i.eq(0) - # wait random period of time before queueing another value - for i in range(randint(0, 3)): - yield - - yield rs.valid_i.eq(0) - - -class TestMuxOutPipe(CombMuxOutPipe): - def __init__(self, num_rows): - self.num_rows = num_rows - stage = PassThroughStage() - CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows) - - -class TestInOutPipe(Elaboratable): - def __init__(self, num_rows=4): - self.num_rows = num_rows - self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial) - self.pipe1 = PassThroughPipe() # stage 1 (clock-sync) - self.pipe2 = PassThroughPipe() # stage 2 (clock-sync) - self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial) - - self.p = self.inpipe.p # kinda annoying, - self.n = self.outpipe.n # use pipe in/out as this class in/out - self._ports = self.inpipe.ports() + self.outpipe.ports() - - def elaborate(self, platform): - m = Module() - m.submodules.inpipe = self.inpipe - m.submodules.pipe1 = self.pipe1 - m.submodules.pipe2 = self.pipe2 - m.submodules.outpipe = self.outpipe - - m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p) - m.d.comb += self.pipe1.connect_to_next(self.pipe2) - m.d.comb += self.pipe2.connect_to_next(self.outpipe) - - return m - - def ports(self): - return self._ports - - -if __name__ == '__main__': - dut = TestInOutPipe() - vl = rtlil.convert(dut, ports=dut.ports()) - with open("test_inoutmux_pipe.il", "w") as f: - f.write(vl) - #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd") - - test = InputTest(dut) - run_simulation(dut, [test.rcv(1), test.rcv(0), - test.rcv(3), test.rcv(2), - test.send(0), test.send(1), - test.send(3), test.send(2), - ], - vcd_name="test_inoutmux_pipe.vcd") - diff --git a/src/ieee754/add/test_outmux_pipe.py b/src/ieee754/add/test_outmux_pipe.py deleted file mode 100644 index 768d9d9a..00000000 --- a/src/ieee754/add/test_outmux_pipe.py +++ /dev/null @@ -1,162 +0,0 @@ -from random import randint -from math import log -from nmigen import Module, Signal, Cat, Elaboratable -from nmigen.compat.sim import run_simulation -from nmigen.cli import verilog, rtlil - -from nmutil.multipipe import CombMuxOutPipe -from nmutil.singlepipe import SimpleHandshake, PassThroughHandshake, RecordObject - - -class PassInData(RecordObject): - def __init__(self): - RecordObject.__init__(self) - self.mid = Signal(2, reset_less=True) - self.data = Signal(16, reset_less=True) - - -class PassThroughStage: - - def ispec(self): - return PassInData() - - def ospec(self, name): - return Signal(16, name="%s_dout" % name, reset_less=True) - - def process(self, i): - return i.data - - -class PassThroughDataStage: - def ispec(self): - return PassInData() - def ospec(self): - return self.ispec() # same as ospec - - def process(self, i): - return i # pass-through - - - -class PassThroughPipe(PassThroughHandshake): - def __init__(self): - PassThroughHandshake.__init__(self, PassThroughDataStage()) - - -class OutputTest: - def __init__(self, dut): - self.dut = dut - self.di = [] - self.do = {} - self.tlen = 10 - for i in range(self.tlen * dut.num_rows): - if i < dut.num_rows: - mid = i - else: - mid = randint(0, dut.num_rows-1) - data = randint(0, 255) + (mid<<8) - if mid not in self.do: - self.do[mid] = [] - self.di.append((data, mid)) - self.do[mid].append(data) - - def send(self): - for i in range(self.tlen * dut.num_rows): - op2 = self.di[i][0] - mid = self.di[i][1] - rs = dut.p - yield rs.valid_i.eq(1) - yield rs.data_i.data.eq(op2) - yield rs.data_i.mid.eq(mid) - yield - o_p_ready = yield rs.ready_o - while not o_p_ready: - yield - o_p_ready = yield rs.ready_o - - print ("send", mid, i, hex(op2)) - - yield rs.valid_i.eq(0) - # wait random period of time before queueing another value - for i in range(randint(0, 3)): - yield - - yield rs.valid_i.eq(0) - - def rcv(self, mid): - out_i = 0 - count = 0 - stall_range = randint(0, 3) - while out_i != len(self.do[mid]): - count += 1 - assert count != 2000, "timeout: too long" - n = self.dut.n[mid] - yield n.ready_i.eq(1) - yield - o_n_valid = yield n.valid_o - i_n_ready = yield n.ready_i - if not o_n_valid or not i_n_ready: - continue - - out_v = yield n.data_o - - print ("recv", mid, out_i, hex(out_v)) - - assert self.do[mid][out_i] == out_v # pass-through data - - out_i += 1 - - if randint(0, 5) == 0: - stall_range = randint(0, 3) - stall = randint(0, stall_range) != 0 - if stall: - yield n.ready_i.eq(0) - for i in range(stall_range): - yield - - -class TestPriorityMuxPipe(CombMuxOutPipe): - def __init__(self, num_rows): - self.num_rows = num_rows - stage = PassThroughStage() - CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows) - - -class TestSyncToPriorityPipe(Elaboratable): - def __init__(self): - self.num_rows = 4 - self.pipe = PassThroughPipe() - self.muxpipe = TestPriorityMuxPipe(self.num_rows) - - self.p = self.pipe.p - self.n = self.muxpipe.n - - def elaborate(self, platform): - m = Module() - m.submodules.pipe = self.pipe - m.submodules.muxpipe = self.muxpipe - m.d.comb += self.pipe.n.connect_to_next(self.muxpipe.p) - return m - - def ports(self): - res = [self.p.valid_i, self.p.ready_o] + \ - self.p.data_i.ports() - for i in range(len(self.n)): - res += [self.n[i].ready_i, self.n[i].valid_o] + \ - [self.n[i].data_o] - #self.n[i].data_o.ports() - return res - - -if __name__ == '__main__': - dut = TestSyncToPriorityPipe() - vl = rtlil.convert(dut, ports=dut.ports()) - with open("test_outmux_pipe.il", "w") as f: - f.write(vl) - - test = OutputTest(dut) - run_simulation(dut, [test.rcv(1), test.rcv(0), - test.rcv(3), test.rcv(2), - test.send()], - vcd_name="test_outmux_pipe.vcd") - diff --git a/src/ieee754/add/test_prioritymux_pipe.py b/src/ieee754/add/test_prioritymux_pipe.py deleted file mode 100644 index ca7181d0..00000000 --- a/src/ieee754/add/test_prioritymux_pipe.py +++ /dev/null @@ -1,218 +0,0 @@ -from random import randint -from math import log -from nmigen import Module, Signal, Cat -from nmigen.compat.sim import run_simulation -from nmigen.cli import verilog, rtlil - -from nmutil.singlepipe import PassThroughStage -from nmutil.multipipe import (CombMultiInPipeline, PriorityCombMuxInPipe) - - -class PassData: - def __init__(self): - self.mid = Signal(2, reset_less=True) - self.idx = Signal(6, reset_less=True) - self.data = Signal(16, reset_less=True) - - def eq(self, i): - return [self.mid.eq(i.mid), self.idx.eq(i.idx), self.data.eq(i.data)] - - def ports(self): - return [self.mid, self.idx, self.data] - - -def testbench(dut): - stb = yield dut.out_op.stb - assert stb == 0 - ack = yield dut.out_op.ack - assert ack == 0 - - # set row 1 input 0 - yield dut.rs[1].in_op[0].eq(5) - yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready - #yield dut.rs[1].ack.eq(1) - yield - - # check row 1 output (should be inactive) - decode = yield dut.rs[1].out_decode - assert decode == 0 - if False: - op0 = yield dut.rs[1].out_op[0] - op1 = yield dut.rs[1].out_op[1] - assert op0 == 0 and op1 == 0 - - # output should be inactive - out_stb = yield dut.out_op.stb - assert out_stb == 1 - - # set row 0 input 1 - yield dut.rs[1].in_op[1].eq(6) - yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready - - # set acknowledgement of output... takes 1 cycle to respond - yield dut.out_op.ack.eq(1) - yield - yield dut.out_op.ack.eq(0) # clear ack on output - yield dut.rs[1].stb.eq(0) # clear row 1 strobe - - # output strobe should be active, MID should be 0 until "ack" is set... - out_stb = yield dut.out_op.stb - assert out_stb == 1 - out_mid = yield dut.mid - assert out_mid == 0 - - # ... and output should not yet be passed through either - op0 = yield dut.out_op.v[0] - op1 = yield dut.out_op.v[1] - assert op0 == 0 and op1 == 0 - - # wait for out_op.ack to activate... - yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero - yield - - # *now* output should be passed through - op0 = yield dut.out_op.v[0] - op1 = yield dut.out_op.v[1] - assert op0 == 5 and op1 == 6 - - # set row 2 input - yield dut.rs[2].in_op[0].eq(3) - yield dut.rs[2].in_op[1].eq(4) - yield dut.rs[2].stb.eq(0b11) # strobe indicate 1st op ready - yield dut.out_op.ack.eq(1) # set output ack - yield - yield dut.rs[2].stb.eq(0) # clear row 2 strobe - yield dut.out_op.ack.eq(0) # set output ack - yield - op0 = yield dut.out_op.v[0] - op1 = yield dut.out_op.v[1] - assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1) - out_mid = yield dut.mid - assert out_mid == 2 - - # set row 0 and 3 input - yield dut.rs[0].in_op[0].eq(9) - yield dut.rs[0].in_op[1].eq(8) - yield dut.rs[0].stb.eq(0b11) # strobe indicate 1st op ready - yield dut.rs[3].in_op[0].eq(1) - yield dut.rs[3].in_op[1].eq(2) - yield dut.rs[3].stb.eq(0b11) # strobe indicate 1st op ready - - # set acknowledgement of output... takes 1 cycle to respond - yield dut.out_op.ack.eq(1) - yield - yield dut.rs[0].stb.eq(0) # clear row 1 strobe - yield - out_mid = yield dut.mid - assert out_mid == 0, "out mid %d" % out_mid - - yield - yield dut.rs[3].stb.eq(0) # clear row 1 strobe - yield dut.out_op.ack.eq(0) # clear ack on output - yield - out_mid = yield dut.mid - assert out_mid == 3, "out mid %d" % out_mid - - -class InputTest: - def __init__(self, dut): - self.dut = dut - self.di = {} - self.do = {} - self.tlen = 10 - for mid in range(dut.num_rows): - self.di[mid] = {} - self.do[mid] = {} - for i in range(self.tlen): - self.di[mid][i] = randint(0, 100) + (mid<<8) - self.do[mid][i] = self.di[mid][i] - - def send(self, mid): - for i in range(self.tlen): - op2 = self.di[mid][i] - rs = dut.p[mid] - yield rs.valid_i.eq(1) - yield rs.data_i.data.eq(op2) - yield rs.data_i.idx.eq(i) - yield rs.data_i.mid.eq(mid) - yield - o_p_ready = yield rs.ready_o - while not o_p_ready: - yield - o_p_ready = yield rs.ready_o - - print ("send", mid, i, hex(op2)) - - yield rs.valid_i.eq(0) - # wait random period of time before queueing another value - for i in range(randint(0, 3)): - yield - - yield rs.valid_i.eq(0) - ## wait random period of time before queueing another value - #for i in range(randint(0, 3)): - # yield - - #send_range = randint(0, 3) - #if send_range == 0: - # send = True - #else: - # send = randint(0, send_range) != 0 - - def rcv(self): - while True: - #stall_range = randint(0, 3) - #for j in range(randint(1,10)): - # stall = randint(0, stall_range) != 0 - # yield self.dut.n[0].ready_i.eq(stall) - # yield - n = self.dut.n - yield n.ready_i.eq(1) - yield - o_n_valid = yield n.valid_o - i_n_ready = yield n.ready_i - if not o_n_valid or not i_n_ready: - continue - - mid = yield n.data_o.mid - out_i = yield n.data_o.idx - out_v = yield n.data_o.data - - print ("recv", mid, out_i, hex(out_v)) - - # see if this output has occurred already, delete it if it has - assert out_i in self.do[mid], "out_i %d not in array %s" % \ - (out_i, repr(self.do[mid])) - assert self.do[mid][out_i] == out_v # pass-through data - del self.do[mid][out_i] - - # check if there's any more outputs - zerolen = True - for (k, v) in self.do.items(): - if v: - zerolen = False - if zerolen: - break - - -class TestPriorityMuxPipe(PriorityCombMuxInPipe): - def __init__(self): - self.num_rows = 4 - def iospecfn(): return PassData() - stage = PassThroughStage(iospecfn) - PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows) - - -if __name__ == '__main__': - dut = TestPriorityMuxPipe() - vl = rtlil.convert(dut, ports=dut.ports()) - with open("test_inputgroup_multi.il", "w") as f: - f.write(vl) - #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd") - - test = InputTest(dut) - run_simulation(dut, [test.send(1), test.send(0), - test.send(3), test.send(2), - test.rcv()], - vcd_name="test_inputgroup_multi.vcd") - diff --git a/src/nmutil/test/test_inout_mux_pipe.py b/src/nmutil/test/test_inout_mux_pipe.py new file mode 100644 index 00000000..221ece1d --- /dev/null +++ b/src/nmutil/test/test_inout_mux_pipe.py @@ -0,0 +1,229 @@ +""" key strategic example showing how to do multi-input fan-in into a + multi-stage pipeline, then multi-output fanout. + + the multiplex ID from the fan-in is passed in to the pipeline, preserved, + and used as a routing ID on the fanout. +""" + +from random import randint +from math import log +from nmigen import Module, Signal, Cat, Value, Elaboratable +from nmigen.compat.sim import run_simulation +from nmigen.cli import verilog, rtlil + +from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe +from nmutil.multipipe import PriorityCombMuxInPipe +from nmutil.singlepipe import SimpleHandshake, RecordObject, Object + + +class PassData2(RecordObject): + def __init__(self): + RecordObject.__init__(self) + self.mid = Signal(2, reset_less=True) + self.idx = Signal(8, reset_less=True) + self.data = Signal(16, reset_less=True) + + +class PassData(Object): + def __init__(self): + Object.__init__(self) + self.mid = Signal(2, reset_less=True) + self.idx = Signal(8, reset_less=True) + self.data = Signal(16, reset_less=True) + + + +class PassThroughStage: + def ispec(self): + return PassData() + def ospec(self): + return self.ispec() # same as ospec + + def process(self, i): + return i # pass-through + + + +class PassThroughPipe(SimpleHandshake): + def __init__(self): + SimpleHandshake.__init__(self, PassThroughStage()) + + +class InputTest: + def __init__(self, dut): + self.dut = dut + self.di = {} + self.do = {} + self.tlen = 100 + for mid in range(dut.num_rows): + self.di[mid] = {} + self.do[mid] = {} + for i in range(self.tlen): + self.di[mid][i] = randint(0, 255) + (mid<<8) + self.do[mid][i] = self.di[mid][i] + + def send(self, mid): + for i in range(self.tlen): + op2 = self.di[mid][i] + rs = dut.p[mid] + yield rs.valid_i.eq(1) + yield rs.data_i.data.eq(op2) + yield rs.data_i.idx.eq(i) + yield rs.data_i.mid.eq(mid) + yield + o_p_ready = yield rs.ready_o + while not o_p_ready: + yield + o_p_ready = yield rs.ready_o + + print ("send", mid, i, hex(op2)) + + yield rs.valid_i.eq(0) + # wait random period of time before queueing another value + for i in range(randint(0, 3)): + yield + + yield rs.valid_i.eq(0) + yield + + print ("send ended", mid) + + ## wait random period of time before queueing another value + #for i in range(randint(0, 3)): + # yield + + #send_range = randint(0, 3) + #if send_range == 0: + # send = True + #else: + # send = randint(0, send_range) != 0 + + def rcv(self, mid): + while True: + #stall_range = randint(0, 3) + #for j in range(randint(1,10)): + # stall = randint(0, stall_range) != 0 + # yield self.dut.n[0].ready_i.eq(stall) + # yield + n = self.dut.n[mid] + yield n.ready_i.eq(1) + yield + o_n_valid = yield n.valid_o + i_n_ready = yield n.ready_i + if not o_n_valid or not i_n_ready: + continue + + out_mid = yield n.data_o.mid + out_i = yield n.data_o.idx + out_v = yield n.data_o.data + + print ("recv", out_mid, out_i, hex(out_v)) + + # see if this output has occurred already, delete it if it has + assert mid == out_mid, "out_mid %d not correct %d" % (out_mid, mid) + assert out_i in self.do[mid], "out_i %d not in array %s" % \ + (out_i, repr(self.do[mid])) + assert self.do[mid][out_i] == out_v # pass-through data + del self.do[mid][out_i] + + # check if there's any more outputs + if len(self.do[mid]) == 0: + break + print ("recv ended", mid) + + +class TestPriorityMuxPipe(PriorityCombMuxInPipe): + def __init__(self, num_rows): + self.num_rows = num_rows + stage = PassThroughStage() + PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows) + + +class OutputTest: + def __init__(self, dut): + self.dut = dut + self.di = [] + self.do = {} + self.tlen = 100 + for i in range(self.tlen * dut.num_rows): + if i < dut.num_rows: + mid = i + else: + mid = randint(0, dut.num_rows-1) + data = randint(0, 255) + (mid<<8) + + def send(self): + for i in range(self.tlen * dut.num_rows): + op2 = self.di[i][0] + mid = self.di[i][1] + rs = dut.p + yield rs.valid_i.eq(1) + yield rs.data_i.data.eq(op2) + yield rs.data_i.mid.eq(mid) + yield + o_p_ready = yield rs.ready_o + while not o_p_ready: + yield + o_p_ready = yield rs.ready_o + + print ("send", mid, i, hex(op2)) + + yield rs.valid_i.eq(0) + # wait random period of time before queueing another value + for i in range(randint(0, 3)): + yield + + yield rs.valid_i.eq(0) + + +class TestMuxOutPipe(CombMuxOutPipe): + def __init__(self, num_rows): + self.num_rows = num_rows + stage = PassThroughStage() + CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows) + + +class TestInOutPipe(Elaboratable): + def __init__(self, num_rows=4): + self.num_rows = num_rows + self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial) + self.pipe1 = PassThroughPipe() # stage 1 (clock-sync) + self.pipe2 = PassThroughPipe() # stage 2 (clock-sync) + self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial) + + self.p = self.inpipe.p # kinda annoying, + self.n = self.outpipe.n # use pipe in/out as this class in/out + self._ports = self.inpipe.ports() + self.outpipe.ports() + + def elaborate(self, platform): + m = Module() + m.submodules.inpipe = self.inpipe + m.submodules.pipe1 = self.pipe1 + m.submodules.pipe2 = self.pipe2 + m.submodules.outpipe = self.outpipe + + m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p) + m.d.comb += self.pipe1.connect_to_next(self.pipe2) + m.d.comb += self.pipe2.connect_to_next(self.outpipe) + + return m + + def ports(self): + return self._ports + + +if __name__ == '__main__': + dut = TestInOutPipe() + vl = rtlil.convert(dut, ports=dut.ports()) + with open("test_inoutmux_pipe.il", "w") as f: + f.write(vl) + #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd") + + test = InputTest(dut) + run_simulation(dut, [test.rcv(1), test.rcv(0), + test.rcv(3), test.rcv(2), + test.send(0), test.send(1), + test.send(3), test.send(2), + ], + vcd_name="test_inoutmux_pipe.vcd") + diff --git a/src/nmutil/test/test_outmux_pipe.py b/src/nmutil/test/test_outmux_pipe.py new file mode 100644 index 00000000..768d9d9a --- /dev/null +++ b/src/nmutil/test/test_outmux_pipe.py @@ -0,0 +1,162 @@ +from random import randint +from math import log +from nmigen import Module, Signal, Cat, Elaboratable +from nmigen.compat.sim import run_simulation +from nmigen.cli import verilog, rtlil + +from nmutil.multipipe import CombMuxOutPipe +from nmutil.singlepipe import SimpleHandshake, PassThroughHandshake, RecordObject + + +class PassInData(RecordObject): + def __init__(self): + RecordObject.__init__(self) + self.mid = Signal(2, reset_less=True) + self.data = Signal(16, reset_less=True) + + +class PassThroughStage: + + def ispec(self): + return PassInData() + + def ospec(self, name): + return Signal(16, name="%s_dout" % name, reset_less=True) + + def process(self, i): + return i.data + + +class PassThroughDataStage: + def ispec(self): + return PassInData() + def ospec(self): + return self.ispec() # same as ospec + + def process(self, i): + return i # pass-through + + + +class PassThroughPipe(PassThroughHandshake): + def __init__(self): + PassThroughHandshake.__init__(self, PassThroughDataStage()) + + +class OutputTest: + def __init__(self, dut): + self.dut = dut + self.di = [] + self.do = {} + self.tlen = 10 + for i in range(self.tlen * dut.num_rows): + if i < dut.num_rows: + mid = i + else: + mid = randint(0, dut.num_rows-1) + data = randint(0, 255) + (mid<<8) + if mid not in self.do: + self.do[mid] = [] + self.di.append((data, mid)) + self.do[mid].append(data) + + def send(self): + for i in range(self.tlen * dut.num_rows): + op2 = self.di[i][0] + mid = self.di[i][1] + rs = dut.p + yield rs.valid_i.eq(1) + yield rs.data_i.data.eq(op2) + yield rs.data_i.mid.eq(mid) + yield + o_p_ready = yield rs.ready_o + while not o_p_ready: + yield + o_p_ready = yield rs.ready_o + + print ("send", mid, i, hex(op2)) + + yield rs.valid_i.eq(0) + # wait random period of time before queueing another value + for i in range(randint(0, 3)): + yield + + yield rs.valid_i.eq(0) + + def rcv(self, mid): + out_i = 0 + count = 0 + stall_range = randint(0, 3) + while out_i != len(self.do[mid]): + count += 1 + assert count != 2000, "timeout: too long" + n = self.dut.n[mid] + yield n.ready_i.eq(1) + yield + o_n_valid = yield n.valid_o + i_n_ready = yield n.ready_i + if not o_n_valid or not i_n_ready: + continue + + out_v = yield n.data_o + + print ("recv", mid, out_i, hex(out_v)) + + assert self.do[mid][out_i] == out_v # pass-through data + + out_i += 1 + + if randint(0, 5) == 0: + stall_range = randint(0, 3) + stall = randint(0, stall_range) != 0 + if stall: + yield n.ready_i.eq(0) + for i in range(stall_range): + yield + + +class TestPriorityMuxPipe(CombMuxOutPipe): + def __init__(self, num_rows): + self.num_rows = num_rows + stage = PassThroughStage() + CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows) + + +class TestSyncToPriorityPipe(Elaboratable): + def __init__(self): + self.num_rows = 4 + self.pipe = PassThroughPipe() + self.muxpipe = TestPriorityMuxPipe(self.num_rows) + + self.p = self.pipe.p + self.n = self.muxpipe.n + + def elaborate(self, platform): + m = Module() + m.submodules.pipe = self.pipe + m.submodules.muxpipe = self.muxpipe + m.d.comb += self.pipe.n.connect_to_next(self.muxpipe.p) + return m + + def ports(self): + res = [self.p.valid_i, self.p.ready_o] + \ + self.p.data_i.ports() + for i in range(len(self.n)): + res += [self.n[i].ready_i, self.n[i].valid_o] + \ + [self.n[i].data_o] + #self.n[i].data_o.ports() + return res + + +if __name__ == '__main__': + dut = TestSyncToPriorityPipe() + vl = rtlil.convert(dut, ports=dut.ports()) + with open("test_outmux_pipe.il", "w") as f: + f.write(vl) + + test = OutputTest(dut) + run_simulation(dut, [test.rcv(1), test.rcv(0), + test.rcv(3), test.rcv(2), + test.send()], + vcd_name="test_outmux_pipe.vcd") + diff --git a/src/nmutil/test/test_prioritymux_pipe.py b/src/nmutil/test/test_prioritymux_pipe.py new file mode 100644 index 00000000..ca7181d0 --- /dev/null +++ b/src/nmutil/test/test_prioritymux_pipe.py @@ -0,0 +1,218 @@ +from random import randint +from math import log +from nmigen import Module, Signal, Cat +from nmigen.compat.sim import run_simulation +from nmigen.cli import verilog, rtlil + +from nmutil.singlepipe import PassThroughStage +from nmutil.multipipe import (CombMultiInPipeline, PriorityCombMuxInPipe) + + +class PassData: + def __init__(self): + self.mid = Signal(2, reset_less=True) + self.idx = Signal(6, reset_less=True) + self.data = Signal(16, reset_less=True) + + def eq(self, i): + return [self.mid.eq(i.mid), self.idx.eq(i.idx), self.data.eq(i.data)] + + def ports(self): + return [self.mid, self.idx, self.data] + + +def testbench(dut): + stb = yield dut.out_op.stb + assert stb == 0 + ack = yield dut.out_op.ack + assert ack == 0 + + # set row 1 input 0 + yield dut.rs[1].in_op[0].eq(5) + yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready + #yield dut.rs[1].ack.eq(1) + yield + + # check row 1 output (should be inactive) + decode = yield dut.rs[1].out_decode + assert decode == 0 + if False: + op0 = yield dut.rs[1].out_op[0] + op1 = yield dut.rs[1].out_op[1] + assert op0 == 0 and op1 == 0 + + # output should be inactive + out_stb = yield dut.out_op.stb + assert out_stb == 1 + + # set row 0 input 1 + yield dut.rs[1].in_op[1].eq(6) + yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready + + # set acknowledgement of output... takes 1 cycle to respond + yield dut.out_op.ack.eq(1) + yield + yield dut.out_op.ack.eq(0) # clear ack on output + yield dut.rs[1].stb.eq(0) # clear row 1 strobe + + # output strobe should be active, MID should be 0 until "ack" is set... + out_stb = yield dut.out_op.stb + assert out_stb == 1 + out_mid = yield dut.mid + assert out_mid == 0 + + # ... and output should not yet be passed through either + op0 = yield dut.out_op.v[0] + op1 = yield dut.out_op.v[1] + assert op0 == 0 and op1 == 0 + + # wait for out_op.ack to activate... + yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero + yield + + # *now* output should be passed through + op0 = yield dut.out_op.v[0] + op1 = yield dut.out_op.v[1] + assert op0 == 5 and op1 == 6 + + # set row 2 input + yield dut.rs[2].in_op[0].eq(3) + yield dut.rs[2].in_op[1].eq(4) + yield dut.rs[2].stb.eq(0b11) # strobe indicate 1st op ready + yield dut.out_op.ack.eq(1) # set output ack + yield + yield dut.rs[2].stb.eq(0) # clear row 2 strobe + yield dut.out_op.ack.eq(0) # set output ack + yield + op0 = yield dut.out_op.v[0] + op1 = yield dut.out_op.v[1] + assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1) + out_mid = yield dut.mid + assert out_mid == 2 + + # set row 0 and 3 input + yield dut.rs[0].in_op[0].eq(9) + yield dut.rs[0].in_op[1].eq(8) + yield dut.rs[0].stb.eq(0b11) # strobe indicate 1st op ready + yield dut.rs[3].in_op[0].eq(1) + yield dut.rs[3].in_op[1].eq(2) + yield dut.rs[3].stb.eq(0b11) # strobe indicate 1st op ready + + # set acknowledgement of output... takes 1 cycle to respond + yield dut.out_op.ack.eq(1) + yield + yield dut.rs[0].stb.eq(0) # clear row 1 strobe + yield + out_mid = yield dut.mid + assert out_mid == 0, "out mid %d" % out_mid + + yield + yield dut.rs[3].stb.eq(0) # clear row 1 strobe + yield dut.out_op.ack.eq(0) # clear ack on output + yield + out_mid = yield dut.mid + assert out_mid == 3, "out mid %d" % out_mid + + +class InputTest: + def __init__(self, dut): + self.dut = dut + self.di = {} + self.do = {} + self.tlen = 10 + for mid in range(dut.num_rows): + self.di[mid] = {} + self.do[mid] = {} + for i in range(self.tlen): + self.di[mid][i] = randint(0, 100) + (mid<<8) + self.do[mid][i] = self.di[mid][i] + + def send(self, mid): + for i in range(self.tlen): + op2 = self.di[mid][i] + rs = dut.p[mid] + yield rs.valid_i.eq(1) + yield rs.data_i.data.eq(op2) + yield rs.data_i.idx.eq(i) + yield rs.data_i.mid.eq(mid) + yield + o_p_ready = yield rs.ready_o + while not o_p_ready: + yield + o_p_ready = yield rs.ready_o + + print ("send", mid, i, hex(op2)) + + yield rs.valid_i.eq(0) + # wait random period of time before queueing another value + for i in range(randint(0, 3)): + yield + + yield rs.valid_i.eq(0) + ## wait random period of time before queueing another value + #for i in range(randint(0, 3)): + # yield + + #send_range = randint(0, 3) + #if send_range == 0: + # send = True + #else: + # send = randint(0, send_range) != 0 + + def rcv(self): + while True: + #stall_range = randint(0, 3) + #for j in range(randint(1,10)): + # stall = randint(0, stall_range) != 0 + # yield self.dut.n[0].ready_i.eq(stall) + # yield + n = self.dut.n + yield n.ready_i.eq(1) + yield + o_n_valid = yield n.valid_o + i_n_ready = yield n.ready_i + if not o_n_valid or not i_n_ready: + continue + + mid = yield n.data_o.mid + out_i = yield n.data_o.idx + out_v = yield n.data_o.data + + print ("recv", mid, out_i, hex(out_v)) + + # see if this output has occurred already, delete it if it has + assert out_i in self.do[mid], "out_i %d not in array %s" % \ + (out_i, repr(self.do[mid])) + assert self.do[mid][out_i] == out_v # pass-through data + del self.do[mid][out_i] + + # check if there's any more outputs + zerolen = True + for (k, v) in self.do.items(): + if v: + zerolen = False + if zerolen: + break + + +class TestPriorityMuxPipe(PriorityCombMuxInPipe): + def __init__(self): + self.num_rows = 4 + def iospecfn(): return PassData() + stage = PassThroughStage(iospecfn) + PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows) + + +if __name__ == '__main__': + dut = TestPriorityMuxPipe() + vl = rtlil.convert(dut, ports=dut.ports()) + with open("test_inputgroup_multi.il", "w") as f: + f.write(vl) + #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd") + + test = InputTest(dut) + run_simulation(dut, [test.send(1), test.send(0), + test.send(3), test.send(2), + test.rcv()], + vcd_name="test_inputgroup_multi.vcd") +