""" Unit tests for Buffered and Unbuffered pipelines contains useful worked examples of how to use the Pipeline API, including: * Combinatorial Stage "Chaining" * class-based data stages * nmigen module-based data stages * special nmigen module-based data stage, where the stage *is* the module * Record-based data stages * static-class data stages * multi-stage pipelines (and how to connect them) * how to *use* the pipelines (see Test5) - how to get data in and out """ from nmigen import Module, Signal, Mux, Const, Elaboratable from nmigen.hdl.rec import Record from nmigen.compat.sim import run_simulation from nmigen.cli import verilog, rtlil from nmutil.test.example_buf_pipe import ExampleBufPipe, ExampleBufPipeAdd from nmutil.test.example_buf_pipe import ExamplePipeline, UnbufferedPipeline from nmutil.test.example_buf_pipe import ExampleStageCls from nmutil.iocontrol import PrevControl, NextControl from nmutil.stageapi import StageChain, StageCls from nmutil.singlepipe import ControlBase from nmutil.singlepipe import UnbufferedPipeline2 from nmutil.singlepipe import SimpleHandshake from nmutil.singlepipe import BufferedHandshake from nmutil.singlepipe import PassThroughHandshake from nmutil.singlepipe import PassThroughStage from nmutil.singlepipe import FIFOControl from nmutil.singlepipe import RecordObject from nmutil.singlepipe import MaskCancellable from random import randint, seed #seed(4) def check_o_n_valid(dut, val): o_n_valid = yield dut.n.valid_o assert o_n_valid == val def check_o_n_valid2(dut, val): o_n_valid = yield dut.n.valid_o assert o_n_valid == val def tbench(dut): #yield dut.i_p_rst.eq(1) yield dut.n.ready_i.eq(0) #yield dut.p.ready_o.eq(0) yield yield #yield dut.i_p_rst.eq(0) yield dut.n.ready_i.eq(1) yield dut.p.data_i.eq(5) yield dut.p.valid_i.eq(1) yield yield dut.p.data_i.eq(7) yield from check_o_n_valid(dut, 0) # effects of i_p_valid delayed yield yield from check_o_n_valid(dut, 1) # ok *now* i_p_valid effect is felt yield dut.p.data_i.eq(2) yield yield dut.n.ready_i.eq(0) # begin going into "stall" (next stage says ready) yield dut.p.data_i.eq(9) yield yield dut.p.valid_i.eq(0) yield dut.p.data_i.eq(12) yield yield dut.p.data_i.eq(32) yield dut.n.ready_i.eq(1) yield yield from check_o_n_valid(dut, 1) # buffer still needs to output yield yield from check_o_n_valid(dut, 1) # buffer still needs to output yield yield from check_o_n_valid(dut, 0) # buffer outputted, *now* we're done. yield def tbench2(dut): #yield dut.p.i_rst.eq(1) yield dut.n.ready_i.eq(0) #yield dut.p.ready_o.eq(0) yield yield #yield dut.p.i_rst.eq(0) yield dut.n.ready_i.eq(1) yield dut.p.data_i.eq(5) yield dut.p.valid_i.eq(1) yield yield dut.p.data_i.eq(7) yield from check_o_n_valid2(dut, 0) # effects of i_p_valid delayed 2 clocks yield yield from check_o_n_valid2(dut, 0) # effects of i_p_valid delayed 2 clocks yield dut.p.data_i.eq(2) yield yield from check_o_n_valid2(dut, 1) # ok *now* i_p_valid effect is felt yield dut.n.ready_i.eq(0) # begin going into "stall" (next stage says ready) yield dut.p.data_i.eq(9) yield yield dut.p.valid_i.eq(0) yield dut.p.data_i.eq(12) yield yield dut.p.data_i.eq(32) yield dut.n.ready_i.eq(1) yield yield from check_o_n_valid2(dut, 1) # buffer still needs to output yield yield from check_o_n_valid2(dut, 1) # buffer still needs to output yield yield from check_o_n_valid2(dut, 1) # buffer still needs to output yield yield from check_o_n_valid2(dut, 0) # buffer outputted, *now* we're done. yield yield yield class Test3: def __init__(self, dut, resultfn): self.dut = dut self.resultfn = resultfn self.data = [] for i in range(num_tests): #data.append(randint(0, 1<<16-1)) self.data.append(i+1) self.i = 0 self.o = 0 def send(self): while self.o != len(self.data): send_range = randint(0, 3) for j in range(randint(1,10)): if send_range == 0: send = True else: send = randint(0, send_range) != 0 o_p_ready = yield self.dut.p.ready_o if not o_p_ready: yield continue if send and self.i != len(self.data): yield self.dut.p.valid_i.eq(1) yield self.dut.p.data_i.eq(self.data[self.i]) self.i += 1 else: yield self.dut.p.valid_i.eq(0) yield def rcv(self): while self.o != len(self.data): stall_range = randint(0, 3) for j in range(randint(1,10)): stall = randint(0, stall_range) != 0 yield self.dut.n.ready_i.eq(stall) yield o_n_valid = yield self.dut.n.valid_o i_n_ready = yield self.dut.n.ready_i_test if not o_n_valid or not i_n_ready: continue data_o = yield self.dut.n.data_o self.resultfn(data_o, self.data[self.o], self.i, self.o) self.o += 1 if self.o == len(self.data): break def resultfn_3(data_o, expected, i, o): assert data_o == expected + 1, \ "%d-%d data %x not match %x\n" \ % (i, o, data_o, expected) def data_placeholder(): data = [] for i in range(num_tests): d = PlaceHolder() d.src1 = randint(0, 1<<16-1) d.src2 = randint(0, 1<<16-1) data.append(d) return data def data_dict(): data = [] for i in range(num_tests): data.append({'src1': randint(0, 1<<16-1), 'src2': randint(0, 1<<16-1)}) return data class Test5: def __init__(self, dut, resultfn, data=None, stage_ctl=False): self.dut = dut self.resultfn = resultfn self.stage_ctl = stage_ctl if data: self.data = data else: self.data = [] for i in range(num_tests): self.data.append((randint(0, 1<<16-1), randint(0, 1<<16-1))) self.i = 0 self.o = 0 def send(self): while self.o != len(self.data): send_range = randint(0, 3) for j in range(randint(1,10)): if send_range == 0: send = True else: send = randint(0, send_range) != 0 #send = True o_p_ready = yield self.dut.p.ready_o if not o_p_ready: yield continue if send and self.i != len(self.data): yield self.dut.p.valid_i.eq(1) for v in self.dut.set_input(self.data[self.i]): yield v self.i += 1 else: yield self.dut.p.valid_i.eq(0) yield def rcv(self): while self.o != len(self.data): stall_range = randint(0, 3) for j in range(randint(1,10)): ready = randint(0, stall_range) != 0 #ready = True yield self.dut.n.ready_i.eq(ready) yield o_n_valid = yield self.dut.n.valid_o i_n_ready = yield self.dut.n.ready_i_test if not o_n_valid or not i_n_ready: continue if isinstance(self.dut.n.data_o, Record): data_o = {} dod = self.dut.n.data_o for k, v in dod.fields.items(): data_o[k] = yield v else: data_o = yield self.dut.n.data_o self.resultfn(data_o, self.data[self.o], self.i, self.o) self.o += 1 if self.o == len(self.data): break class TestMask: def __init__(self, dut, resultfn, maskwid, data=None, stage_ctl=False, latching=False): self.dut = dut self.resultfn = resultfn self.stage_ctl = stage_ctl self.maskwid = maskwid self.latching = latching self.latchmode = 0 if data: self.data = data else: self.data = [] for i in range(num_tests): self.data.append((randint(0, 1<<16-1), randint(0, 1<<16-1))) self.i = 0 self.o = 0 def send(self): while self.o != len(self.data): send_range = randint(0, 3) for j in range(randint(1,10)): if send_range == 0: send = True else: send = randint(0, send_range) != 0 #send = True o_p_ready = yield self.dut.p.ready_o if not o_p_ready: yield continue if self.latching: latchtest = randint(0, 3) == 0 if latchtest: yield self.dut.p.valid_i.eq(0) yield self.dut.p.mask_i.eq(0) # wait for pipeline to flush, then invert state for i in range(10): yield self.latchmode = 1 - self.latchmode yield self.dut.latchmode.eq(self.latchmode) mode = yield self.dut.latchmode print ("latching", mode) if send and self.i != len(self.data): print ("send", self.i, self.data[self.i]) yield self.dut.p.valid_i.eq(1) yield self.dut.p.mask_i.eq(1<