896002253b8ad0163c945ce760c90ac1c493b282
[nmutil.git] / src / nmutil / test / test_inout_unary_mux_cancel_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout, with an unary muxid
3 and cancellation
4
5 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
6 and used as a routing ID on the fanout.
7 """
8
9 from random import randint
10 from math import log
11 from nmigen import Module, Signal, Cat, Value, Elaboratable
12 from nmigen.compat.sim import run_simulation
13 from nmigen.cli import verilog, rtlil
14
15 from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe
16 from nmutil.multipipe import PriorityCombMuxInPipe
17 from nmutil.singlepipe import MaskCancellable, RecordObject, Object
18
19
20 class PassData2(RecordObject):
21 def __init__(self):
22 RecordObject.__init__(self)
23 self.muxid = Signal(2, reset_less=True)
24 self.idx = Signal(8, reset_less=True)
25 self.data = Signal(16, reset_less=True)
26
27
28 class PassData(Object):
29 def __init__(self):
30 Object.__init__(self)
31 self.muxid = Signal(2, reset_less=True)
32 self.idx = Signal(8, reset_less=True)
33 self.data = Signal(16, reset_less=True)
34
35
36
37 class PassThroughStage:
38 def ispec(self):
39 return PassData()
40 def ospec(self):
41 return self.ispec() # same as ospec
42
43 def process(self, i):
44 return i # pass-through
45
46
47
48 class PassThroughPipe(MaskCancellable):
49 def __init__(self, maskwid):
50 MaskCancellable.__init__(self, PassThroughStage(), maskwid)
51
52
53 class InputTest:
54 def __init__(self, dut, tlen):
55 self.dut = dut
56 self.di = {}
57 self.do = {}
58 self.sent = {}
59 self.tlen = tlen
60 for muxid in range(dut.num_rows):
61 self.di[muxid] = {}
62 self.do[muxid] = {}
63 self.sent[muxid] = []
64 for i in range(self.tlen):
65 self.di[muxid][i] = randint(0, 255) + (muxid<<8)
66 self.do[muxid][i] = self.di[muxid][i]
67
68 def send(self, muxid):
69 for i in range(self.tlen):
70 op2 = self.di[muxid][i]
71 rs = self.dut.p[muxid]
72 yield rs.i_valid.eq(1)
73 yield rs.data_i.data.eq(op2)
74 yield rs.data_i.idx.eq(i)
75 yield rs.data_i.muxid.eq(muxid)
76 yield rs.mask_i.eq(1)
77 yield
78 o_p_ready = yield rs.o_ready
79 while not o_p_ready:
80 yield
81 o_p_ready = yield rs.o_ready
82
83 print ("send", muxid, i, hex(op2), op2)
84 self.sent[muxid].append(i)
85
86 yield rs.i_valid.eq(0)
87 yield rs.mask_i.eq(0)
88 # wait until it's received
89 while i in self.do[muxid]:
90 yield
91
92 # wait random period of time before queueing another value
93 for i in range(randint(0, 3)):
94 yield
95
96 yield rs.i_valid.eq(0)
97 yield
98
99 print ("send ended", muxid)
100
101 ## wait random period of time before queueing another value
102 #for i in range(randint(0, 3)):
103 # yield
104
105 #send_range = randint(0, 3)
106 #if send_range == 0:
107 # send = True
108 #else:
109 # send = randint(0, send_range) != 0
110
111 def rcv(self, muxid):
112 rs = self.dut.p[muxid]
113 while True:
114
115 # check cancellation
116 if self.sent[muxid] and randint(0, 2) == 0:
117 todel = self.sent[muxid].pop()
118 print ("to delete", muxid, self.sent[muxid], todel)
119 if todel in self.do[muxid]:
120 del self.do[muxid][todel]
121 yield rs.stop_i.eq(1)
122 print ("left", muxid, self.do[muxid])
123 if len(self.do[muxid]) == 0:
124 break
125
126 stall_range = randint(0, 3)
127 for j in range(randint(1,10)):
128 stall = randint(0, stall_range) != 0
129 yield self.dut.n[0].i_ready.eq(stall)
130 yield
131
132 n = self.dut.n[muxid]
133 yield n.i_ready.eq(1)
134 yield
135 yield rs.stop_i.eq(0) # resets cancel mask
136 o_n_valid = yield n.o_valid
137 i_n_ready = yield n.i_ready
138 if not o_n_valid or not i_n_ready:
139 continue
140
141 out_muxid = yield n.data_o.muxid
142 out_i = yield n.data_o.idx
143 out_v = yield n.data_o.data
144
145 print ("recv", out_muxid, out_i, hex(out_v), out_v)
146
147 # see if this output has occurred already, delete it if it has
148 assert muxid == out_muxid, \
149 "out_muxid %d not correct %d" % (out_muxid, muxid)
150 if out_i not in self.sent[muxid]:
151 print ("cancelled/recv", muxid, out_i)
152 continue
153 assert out_i in self.do[muxid], "out_i %d not in array %s" % \
154 (out_i, repr(self.do[muxid]))
155 assert self.do[muxid][out_i] == out_v # pass-through data
156 del self.do[muxid][out_i]
157 todel = self.sent[muxid].index(out_i)
158 del self.sent[muxid][todel]
159
160 # check if there's any more outputs
161 if len(self.do[muxid]) == 0:
162 break
163
164 print ("recv ended", muxid)
165
166
167 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
168 def __init__(self, num_rows):
169 self.num_rows = num_rows
170 stage = PassThroughStage()
171 PriorityCombMuxInPipe.__init__(self, stage,
172 p_len=self.num_rows, maskwid=1)
173
174
175 class TestMuxOutPipe(CombMuxOutPipe):
176 def __init__(self, num_rows):
177 self.num_rows = num_rows
178 stage = PassThroughStage()
179 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows,
180 maskwid=1)
181
182
183 class TestInOutPipe(Elaboratable):
184 def __init__(self, num_rows=4):
185 self.num_rows = nr = num_rows
186 self.inpipe = TestPriorityMuxPipe(nr) # fan-in (combinatorial)
187 self.pipe1 = PassThroughPipe(nr) # stage 1 (clock-sync)
188 self.pipe2 = PassThroughPipe(nr) # stage 2 (clock-sync)
189 self.pipe3 = PassThroughPipe(nr) # stage 3 (clock-sync)
190 self.pipe4 = PassThroughPipe(nr) # stage 4 (clock-sync)
191 self.outpipe = TestMuxOutPipe(nr) # fan-out (combinatorial)
192
193 self.p = self.inpipe.p # kinda annoying,
194 self.n = self.outpipe.n # use pipe in/out as this class in/out
195 self._ports = self.inpipe.ports() + self.outpipe.ports()
196
197 def elaborate(self, platform):
198 m = Module()
199 m.submodules.inpipe = self.inpipe
200 m.submodules.pipe1 = self.pipe1
201 m.submodules.pipe2 = self.pipe2
202 m.submodules.pipe3 = self.pipe3
203 m.submodules.pipe4 = self.pipe4
204 m.submodules.outpipe = self.outpipe
205
206 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
207 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
208 m.d.comb += self.pipe2.connect_to_next(self.pipe3)
209 m.d.comb += self.pipe3.connect_to_next(self.pipe4)
210 m.d.comb += self.pipe4.connect_to_next(self.outpipe)
211
212 return m
213
214 def ports(self):
215 return self._ports
216
217
218 def test1():
219 dut = TestInOutPipe()
220 vl = rtlil.convert(dut, ports=dut.ports())
221 with open("test_inoutmux_unarycancel_pipe.il", "w") as f:
222 f.write(vl)
223
224 tlen = 20
225
226 test = InputTest(dut, tlen)
227 run_simulation(dut, [test.rcv(1), test.rcv(0),
228 test.rcv(3), test.rcv(2),
229 test.send(0), test.send(1),
230 test.send(3), test.send(2),
231 ],
232 vcd_name="test_inoutmux_unarycancel_pipe.vcd")
233
234 if __name__ == '__main__':
235 test1()