speed up ==, hash, <, >, <=, and >= for plain_data
[nmutil.git] / src / nmutil / test / test_inout_mux_pipe.py
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout.
3
4 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
5 and used as a routing ID on the fanout.
6 """
7
8 from random import randint
9 from math import log
10 from nmigen import Module, Signal, Cat, Value, Elaboratable
11 from nmigen.compat.sim import run_simulation
12 from nmigen.cli import verilog, rtlil
13
14 from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe
15 from nmutil.multipipe import PriorityCombMuxInPipe
16 from nmutil.singlepipe import SimpleHandshake, RecordObject, Object
17
18
19 class PassData2(RecordObject):
20 def __init__(self):
21 RecordObject.__init__(self)
22 self.muxid = Signal(2, reset_less=True)
23 self.idx = Signal(8, reset_less=True)
24 self.data = Signal(16, reset_less=True)
25
26
27 class PassData(Object):
28 def __init__(self):
29 Object.__init__(self)
30 self.muxid = Signal(2, reset_less=True)
31 self.idx = Signal(8, reset_less=True)
32 self.data = Signal(16, reset_less=True)
33
34
35
36 class PassThroughStage:
37 def ispec(self):
38 return PassData()
39 def ospec(self):
40 return self.ispec() # same as ospec
41
42 def process(self, i):
43 return i # pass-through
44
45
46
47 class PassThroughPipe(SimpleHandshake):
48 def __init__(self):
49 SimpleHandshake.__init__(self, PassThroughStage())
50
51
52 class InputTest:
53 def __init__(self, dut):
54 self.dut = dut
55 self.di = {}
56 self.do = {}
57 self.tlen = 100
58 for muxid in range(dut.num_rows):
59 self.di[muxid] = {}
60 self.do[muxid] = {}
61 for i in range(self.tlen):
62 self.di[muxid][i] = randint(0, 255) + (muxid<<8)
63 self.do[muxid][i] = self.di[muxid][i]
64
65 def send(self, muxid):
66 for i in range(self.tlen):
67 op2 = self.di[muxid][i]
68 rs = self.dut.p[muxid]
69 yield rs.i_valid.eq(1)
70 yield rs.i_data.data.eq(op2)
71 yield rs.i_data.idx.eq(i)
72 yield rs.i_data.muxid.eq(muxid)
73 yield
74 o_p_ready = yield rs.o_ready
75 while not o_p_ready:
76 yield
77 o_p_ready = yield rs.o_ready
78
79 print ("send", muxid, i, hex(op2))
80
81 yield rs.i_valid.eq(0)
82 # wait random period of time before queueing another value
83 for i in range(randint(0, 3)):
84 yield
85
86 yield rs.i_valid.eq(0)
87 yield
88
89 print ("send ended", muxid)
90
91 ## wait random period of time before queueing another value
92 #for i in range(randint(0, 3)):
93 # yield
94
95 #send_range = randint(0, 3)
96 #if send_range == 0:
97 # send = True
98 #else:
99 # send = randint(0, send_range) != 0
100
101 def rcv(self, muxid):
102 while True:
103 #stall_range = randint(0, 3)
104 #for j in range(randint(1,10)):
105 # stall = randint(0, stall_range) != 0
106 # yield self.dut.n[0].i_ready.eq(stall)
107 # yield
108 n = self.dut.n[muxid]
109 yield n.i_ready.eq(1)
110 yield
111 o_n_valid = yield n.o_valid
112 i_n_ready = yield n.i_ready
113 if not o_n_valid or not i_n_ready:
114 continue
115
116 out_muxid = yield n.o_data.muxid
117 out_i = yield n.o_data.idx
118 out_v = yield n.o_data.data
119
120 print ("recv", out_muxid, out_i, hex(out_v))
121
122 # see if this output has occurred already, delete it if it has
123 assert muxid == out_muxid, \
124 "out_muxid %d not correct %d" % (out_muxid, muxid)
125 assert out_i in self.do[muxid], "out_i %d not in array %s" % \
126 (out_i, repr(self.do[muxid]))
127 assert self.do[muxid][out_i] == out_v # pass-through data
128 del self.do[muxid][out_i]
129
130 # check if there's any more outputs
131 if len(self.do[muxid]) == 0:
132 break
133 print ("recv ended", muxid)
134
135
136 class TestPriorityMuxPipe(PriorityCombMuxInPipe):
137 def __init__(self, num_rows):
138 self.num_rows = num_rows
139 stage = PassThroughStage()
140 PriorityCombMuxInPipe.__init__(self, stage, p_len=self.num_rows)
141
142
143 class OutputTest:
144 def __init__(self, dut):
145 self.dut = dut
146 self.di = []
147 self.do = {}
148 self.tlen = 100
149 for i in range(self.tlen * dut.num_rows):
150 if i < dut.num_rows:
151 muxid = i
152 else:
153 muxid = randint(0, dut.num_rows-1)
154 data = randint(0, 255) + (muxid<<8)
155
156 def send(self):
157 for i in range(self.tlen * dut.num_rows):
158 op2 = self.di[i][0]
159 muxid = self.di[i][1]
160 rs = dut.p
161 yield rs.i_valid.eq(1)
162 yield rs.i_data.data.eq(op2)
163 yield rs.i_data.muxid.eq(muxid)
164 yield
165 o_p_ready = yield rs.o_ready
166 while not o_p_ready:
167 yield
168 o_p_ready = yield rs.o_ready
169
170 print ("send", muxid, i, hex(op2))
171
172 yield rs.i_valid.eq(0)
173 # wait random period of time before queueing another value
174 for i in range(randint(0, 3)):
175 yield
176
177 yield rs.i_valid.eq(0)
178
179
180 class TestMuxOutPipe(CombMuxOutPipe):
181 def __init__(self, num_rows):
182 self.num_rows = num_rows
183 stage = PassThroughStage()
184 CombMuxOutPipe.__init__(self, stage, n_len=self.num_rows)
185
186
187 class TestInOutPipe(Elaboratable):
188 def __init__(self, num_rows=4):
189 self.num_rows = num_rows
190 self.inpipe = TestPriorityMuxPipe(num_rows) # fan-in (combinatorial)
191 self.pipe1 = PassThroughPipe() # stage 1 (clock-sync)
192 self.pipe2 = PassThroughPipe() # stage 2 (clock-sync)
193 self.outpipe = TestMuxOutPipe(num_rows) # fan-out (combinatorial)
194
195 self.p = self.inpipe.p # kinda annoying,
196 self.n = self.outpipe.n # use pipe in/out as this class in/out
197 self._ports = self.inpipe.ports() + self.outpipe.ports()
198
199 def elaborate(self, platform):
200 m = Module()
201 m.submodules.inpipe = self.inpipe
202 m.submodules.pipe1 = self.pipe1
203 m.submodules.pipe2 = self.pipe2
204 m.submodules.outpipe = self.outpipe
205
206 m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
207 m.d.comb += self.pipe1.connect_to_next(self.pipe2)
208 m.d.comb += self.pipe2.connect_to_next(self.outpipe)
209
210 return m
211
212 def ports(self):
213 return self._ports
214
215
216 def test1():
217 dut = TestInOutPipe()
218 vl = rtlil.convert(dut, ports=dut.ports())
219 with open("test_inoutmux_pipe.il", "w") as f:
220 f.write(vl)
221 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
222
223 test = InputTest(dut)
224 run_simulation(dut, [test.rcv(1), test.rcv(0),
225 test.rcv(3), test.rcv(2),
226 test.send(0), test.send(1),
227 test.send(3), test.send(2),
228 ],
229 vcd_name="test_inoutmux_pipe.vcd")
230
231 if __name__ == '__main__':
232 test1()