speed up ==, hash, <, >, <=, and >= for plain_data
[nmutil.git] / src / nmutil / test / test_inout_mux_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout.
3
4 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
5 and used as a routing ID on the fanout.
6 """
7
8 from random import randint
9 from math import log
10 from nmigen import Module, Signal, Cat, Value, Elaboratable
11 from nmigen.compat.sim import run_simulation
12 from nmigen.cli import verilog, rtlil
13
14 from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe
15 from nmutil.multipipe import PriorityCombMuxInPipe
16 from nmutil.singlepipe import SimpleHandshake, RecordObject, Object
17
18
19 class PassData2(RecordObject):
20 def __init__(self):
21 RecordObject.__init__(self)
22 self.muxid = Signal(2, reset_less=True)
23 self.idx = Signal(8, reset_less=True)
24 self.data = Signal(16, reset_less=True)
25
26
27 class PassData(Object):
28 def __init__(self):
29 Object.__init__(self)
30 self.muxid = Signal(2, reset_less=True)
31 self.idx = Signal(8, reset_less=True)
32 self.data = Signal(16, reset_less=True)
33
34
35 class PassThroughStage:
36 def ispec(self):
37 return PassData()
38
39 def ospec(self):
40 return self.ispec() # same as ospec
41
42 def process(self, i):
43 return i # pass-through
44
45
46 class PassThroughPipe(SimpleHandshake):
47 def __init__(self):
48 SimpleHandshake.__init__(self, PassThroughStage())
49
50
51 class InputTest:
52 def __init__(self, dut):
53 self.dut = dut
54 self.di = {}
55 self.do = {}
56 self.tlen = 100
57 for muxid in range(dut.num_rows):
58 self.di[muxid] = {}
59 self.do[muxid] = {}
60 for i in range(self.tlen):
61 self.di[muxid][i] = randint(0, 255) + (muxid << 8)
62 self.do[muxid][i] = self.di[muxid][i]
63
64 def send(self, muxid):
65 for i in range(self.tlen):
66 op2 = self.di[muxid][i]
67 rs = self.dut.p[muxid]
68 yield rs.i_valid.eq(1)
69 yield rs.i_data.data.eq(op2)
70 yield rs.i_data.idx.eq(i)
71 yield rs.i_data.muxid.eq(muxid)
72 yield
73 o_p_ready = yield rs.o_ready
74 while not o_p_ready:
75 yield
76 o_p_ready = yield rs.o_ready
77
78 print("send", muxid, i, hex(op2))
79
80 yield rs.i_valid.eq(0)
81 # wait random period of time before queueing another value
82 for i in range(randint(0, 3)):
83 yield
84
85 yield rs.i_valid.eq(0)
86 yield
87
88 print("send ended", muxid)
89
90 # wait random period of time before queueing another value
91 # for i in range(randint(0, 3)):
92 # yield
93
94 #send_range = randint(0, 3)
95 # if send_range == 0:
96 # send = True
97 # else:
98 # send = randint(0, send_range) != 0
99
100 def rcv(self, muxid):
101 while True:
102 #stall_range = randint(0, 3)
103 # for j in range(randint(1,10)):
104 # stall = randint(0, stall_range) != 0
105 # yield self.dut.n[0].i_ready.eq(stall)
106 # yield
107 n = self.dut.n[muxid]
108 yield n.i_ready.eq(1)
109 yield
110 o_n_valid = yield n.o_valid
111 i_n_ready = yield n.i_ready
112 if not o_n_valid or not i_n_ready:
113 continue
114
115 out_muxid = yield n.o_data.muxid
116 out_i = yield n.o_data.idx
117 out_v = yield n.o_data.data
118
119 print("recv", out_muxid, out_i, hex(out_v))
120
121 # see if this output has occurred already, delete it if it has
122 assert muxid == out_muxid, \
123 "out_muxid %d not correct %d" % (out_muxid, muxid)
124 assert out_i in self.do[muxid], "out_i %d not in array %s" % \
125 (out_i, repr(self.do[muxid]))
126 assert self.do[muxid][out_i] == out_v # pass-through data
127 del self.do[muxid][out_i]
128
129 # check if there's any more outputs
130 if len(self.do[muxid]) == 0:
131 break
132 print("recv ended", muxid)
133
134
135 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
136 def __init__(self, num_rows):
137 self.num_rows = num_rows
138 stage = PassThroughStage()
139 PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows)
140
141
142 class OutputTest:
143 def __init__(self, dut):
144 self.dut = dut
145 self.di = []
146 self.do = {}
147 self.tlen = 100
148 for i in range(self.tlen * dut.num_rows):
149 if i < dut.num_rows:
150 muxid = i
151 else:
152 muxid = randint(0, dut.num_rows-1)
153 data = randint(0, 255) + (muxid << 8)
154
155 def send(self):
156 for i in range(self.tlen * dut.num_rows):
157 op2 = self.di[i][0]
158 muxid = self.di[i][1]
159 rs = dut.p
160 yield rs.i_valid.eq(1)
161 yield rs.i_data.data.eq(op2)
162 yield rs.i_data.muxid.eq(muxid)
163 yield
164 o_p_ready = yield rs.o_ready
165 while not o_p_ready:
166 yield
167 o_p_ready = yield rs.o_ready
168
169 print("send", muxid, i, hex(op2))
170
171 yield rs.i_valid.eq(0)
172 # wait random period of time before queueing another value
173 for i in range(randint(0, 3)):
174 yield
175
176 yield rs.i_valid.eq(0)
177
178
179 class TestMuxOutPipe(CombMuxOutPipe):
180 def __init__(self, num_rows):
181 self.num_rows = num_rows
182 stage = PassThroughStage()
183 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
184
185
186 class TestInOutPipe(Elaboratable):
187 def __init__(self, num_rows=4):
188 self.num_rows = num_rows
189 self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)
190 self.pipe1 = PassThroughPipe() # stage 1 (clock-sync)
191 self.pipe2 = PassThroughPipe() # stage 2 (clock-sync)
192 self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial)
193
194 self.p = self.inpipe.p # kinda annoying,
195 self.n = self.outpipe.n # use pipe in/out as this class in/out
196 self._ports = self.inpipe.ports() + self.outpipe.ports()
197
198 def elaborate(self, platform):
199 m = Module()
200 m.submodules.inpipe = self.inpipe
201 m.submodules.pipe1 = self.pipe1
202 m.submodules.pipe2 = self.pipe2
203 m.submodules.outpipe = self.outpipe
204
205 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
206 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
207 m.d.comb += self.pipe2.connect_to_next(self.outpipe)
208
209 return m
210
211 def ports(self):
212 return self._ports
213
214
215 def test1():
216 dut = TestInOutPipe()
217 vl = rtlil.convert(dut, ports=dut.ports())
218 with open("test_inoutmux_pipe.il", "w") as f:
219 f.write(vl)
220 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
221
222 test = InputTest(dut)
223 run_simulation(dut, [test.rcv(1), test.rcv(0),
224 test.rcv(3), test.rcv(2),
225 test.send(0), test.send(1),
226 test.send(3), test.send(2),
227 ],
228 vcd_name="test_inoutmux_pipe.vcd")
229
230
231 if __name__ == '__main__':
232 test1()