add start of outputmux pipe test
[ieee754fpu.git] / src / add / test_outmux_pipe.py
1 from random import randint
2 from math import log
3 from nmigen import Module, Signal, Cat
4 from nmigen.compat.sim import run_simulation
5 from nmigen.cli import verilog, rtlil
6
7 from multipipe import CombMultiOutPipeline
8
9
class MuxUnbufferedPipeline(CombMultiOutPipeline):
    """Multi-output combinatorial pipeline whose stage doubles as its mux.

    The same ``stage`` object is handed to the base class both as the
    processing stage and as the ``n_mux`` selector.
    """

    def __init__(self, stage, n_len):
        """Build the pipeline.

        stage -- object used as both the pipeline stage and the n-way mux
        n_len -- forwarded to the base class as ``n_len``
        """
        # HACK: the stage object is reused as the n-way multiplexer.
        super().__init__(stage, n_len=n_len, n_mux=stage)

        # HACK: because the mux *is* the stage, point its mux id straight
        # at the "mid" field of the incoming data.
        stage.m_id = self.p.i_data.mid

    def ports(self):
        """Return whatever ``self.p_mux.ports()`` reports."""
        return self.p_mux.ports()
20
21
class PassInData:
    """Input record: a 2-bit ``mid`` plus a 16-bit ``data`` signal."""

    def __init__(self):
        # Both signals are created reset-less.
        self.mid = Signal(2, reset_less=True)
        self.data = Signal(16, reset_less=True)

    def eq(self, i):
        """Return the assignments copying ``i``'s fields into this record.

        The result is a two-element list: the ``mid`` assignment followed
        by the ``data`` assignment.
        """
        assign_mid = self.mid.eq(i.mid)
        assign_data = self.data.eq(i.data)
        return [assign_mid, assign_data]

    def ports(self):
        """Return the record's signals as ``[mid, data]``."""
        signals = [self.mid]
        signals.append(self.data)
        return signals
32
33
class PassThroughStage:
    """Stage that forwards the ``data`` field of its input unchanged."""

    def ispec(self):
        """Return a fresh input record (a ``PassInData``)."""
        return PassInData()

    def ospec(self):
        """Return a fresh 16-bit reset-less output signal."""
        return Signal(16, reset_less=True)

    def process(self, i):
        """Return the ``data`` attribute of ``i``; ``mid`` is not used."""
        payload = i.data
        return payload
44
45
46
def testbench(dut):
    """Hand-written simulation sequence driving ``dut.rs[...]`` rows.

    This is a generator for the simulator: each ``yield <signal>`` reads a
    value, each ``yield <signal>.eq(...)`` schedules an assignment, and a
    bare ``yield`` advances one clock cycle.

    NOTE(review): the only call to this function in this file is the
    commented-out ``run_simulation`` line in ``__main__`` (which writes
    "test_inputgroup.vcd").  It reads ``dut.out_op``, ``dut.rs`` and
    ``dut.mid``, none of which are defined in this file -- presumably it
    was carried over from the input-group test; confirm before re-enabling.
    """
    # initially neither the output strobe nor the output ack is set
    stb = yield dut.out_op.stb
    assert stb == 0
    ack = yield dut.out_op.ack
    assert ack == 0

    # set row 1 input 0
    yield dut.rs[1].in_op[0].eq(5)
    yield dut.rs[1].stb.eq(0b01) # strobe indicate 1st op ready
    #yield dut.rs[1].ack.eq(1)
    yield

    # check row 1 output (should be inactive)
    decode = yield dut.rs[1].out_decode
    assert decode == 0
    if False:
        # disabled check: row 1 operands not yet visible on its outputs
        op0 = yield dut.rs[1].out_op[0]
        op1 = yield dut.rs[1].out_op[1]
        assert op0 == 0 and op1 == 0

    # NOTE(review): the original comment said "output should be inactive",
    # but the assertion expects the output strobe to be 1 -- confirm which
    # is intended.
    out_stb = yield dut.out_op.stb
    assert out_stb == 1

    # set row 1 input 1
    yield dut.rs[1].in_op[1].eq(6)
    yield dut.rs[1].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield dut.rs[1].stb.eq(0) # clear row 1 strobe

    # output strobe should be active, MID should be 0 until "ack" is set...
    out_stb = yield dut.out_op.stb
    assert out_stb == 1
    out_mid = yield dut.mid
    assert out_mid == 0

    # ... and output should not yet be passed through either
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 0 and op1 == 0

    # wait for out_op.ack to activate...
    yield dut.rs[1].stb.eq(0b00) # set row 1 strobes to zero
    yield

    # *now* output should be passed through
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 5 and op1 == 6

    # set row 2 input
    yield dut.rs[2].in_op[0].eq(3)
    yield dut.rs[2].in_op[1].eq(4)
    yield dut.rs[2].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.out_op.ack.eq(1) # set output ack
    yield
    yield dut.rs[2].stb.eq(0) # clear row 2 strobe
    yield dut.out_op.ack.eq(0) # clear output ack
    yield
    op0 = yield dut.out_op.v[0]
    op1 = yield dut.out_op.v[1]
    assert op0 == 3 and op1 == 4, "op0 %d op1 %d" % (op0, op1)
    out_mid = yield dut.mid
    assert out_mid == 2

    # set row 0 and 3 input
    yield dut.rs[0].in_op[0].eq(9)
    yield dut.rs[0].in_op[1].eq(8)
    yield dut.rs[0].stb.eq(0b11) # strobe indicate both ops ready
    yield dut.rs[3].in_op[0].eq(1)
    yield dut.rs[3].in_op[1].eq(2)
    yield dut.rs[3].stb.eq(0b11) # strobe indicate both ops ready

    # set acknowledgement of output... takes 1 cycle to respond
    yield dut.out_op.ack.eq(1)
    yield
    yield dut.rs[0].stb.eq(0) # clear row 0 strobe
    yield
    # with rows 0 and 3 both strobed, row 0 is expected on the output first
    out_mid = yield dut.mid
    assert out_mid == 0, "out mid %d" % out_mid

    yield
    yield dut.rs[3].stb.eq(0) # clear row 3 strobe
    yield dut.out_op.ack.eq(0) # clear ack on output
    yield
    # ... followed by row 3
    out_mid = yield dut.mid
    assert out_mid == 3, "out mid %d" % out_mid
138
139
class OutputTest:
    """Random send/receive test harness for a multi-output pipeline.

    On construction, ``tlen * dut.num_rows`` random (data, mid) pairs are
    generated.  ``send`` feeds them into ``dut.p`` in order; ``rcv(mid)``
    checks that output ``dut.n[mid]`` delivers the values queued for that
    ``mid`` in the same order.

    Attributes:
        dut  -- device under test; must expose ``num_rows``, ``p`` and ``n``
        di   -- list of (data, mid) tuples in send order
        do   -- dict mapping mid -> list of data values expected on it
        tlen -- per-output item count used to size the test (3)
    """

    def __init__(self, dut):
        self.dut = dut
        self.di = []
        self.do = {}
        self.tlen = 3
        for _ in range(self.tlen * dut.num_rows):
            mid = randint(0, dut.num_rows-1)
            # the mid is folded into the upper bits so that every value
            # also identifies the output it is destined for
            data = randint(0, 255) + (mid<<8)
            self.di.append((data, mid))
            self.do.setdefault(mid, []).append(data)

    def send(self):
        """Simulation generator: push every queued item into ``dut.p``.

        Uses ``self.dut`` rather than a module-level ``dut`` global, so the
        harness also works when it is not driven from ``__main__``.
        """
        # bound before the loop so the final de-assert is valid even when
        # there is nothing to send
        rs = self.dut.p
        for i in range(self.tlen * self.dut.num_rows):
            op2, mid = self.di[i]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.mid.eq(mid)
            yield
            # hold the item until the input side reports ready
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print ("send", mid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)

    def rcv(self, mid):
        """Simulation generator: check the values arriving on output ``mid``.

        Stops after ``tlen`` values have been checked, or silently after
        1000 polling iterations: the mids are random, so a given output may
        legitimately receive fewer than ``tlen`` items.
        """
        out_i = 0
        count = 0
        while out_i != self.tlen:
            count += 1
            if count == 1000:
                break
            # NOTE: the receiver is always ready; random stalling of
            # i_ready is not exercised here.
            n = self.dut.n[mid]
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            if not o_n_valid or not i_n_ready:
                continue

            out_v = yield n.o_data

            print ("recv", mid, out_i, hex(out_v))

            assert self.do[mid][out_i] == out_v # pass-through data

            out_i += 1
204
205
class TestPriorityMuxPipe(MuxUnbufferedPipeline):
    """Four-output mux pipeline built around a ``PassThroughStage``."""

    def __init__(self):
        # num_rows is set first: it is passed on as the output count here
        # and is also read by the test harness.
        self.num_rows = 4
        MuxUnbufferedPipeline.__init__(self, PassThroughStage(),
                                       n_len=self.num_rows)

    def ports(self):
        """Return the input handshake/data signals, then for each output
        its ``i_ready``, ``o_valid`` and ``o_data``."""
        port_list = [self.p.i_valid, self.p.o_ready]
        port_list = port_list + self.p.i_data.ports()
        for idx in range(len(self.n)):
            out_port = self.n[idx]
            # o_data is appended directly, not expanded via a ports() call
            port_list += [out_port.i_ready, out_port.o_valid,
                          out_port.o_data]
        return port_list
220
221
if __name__ == '__main__':
    # The module-level name "dut" is kept on purpose: other code in this
    # file refers to it as a global.
    dut = TestPriorityMuxPipe()

    # Dump the design as RTLIL before simulating.
    rtlil_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_outmux_pipe.il", "w") as il_file:
        il_file.write(rtlil_text)
    #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")

    # One receiver process per output plus a single sender, in this order.
    test = OutputTest(dut)
    processes = [test.rcv(1), test.rcv(0),
                 test.rcv(3), test.rcv(2),
                 test.send()]
    run_simulation(dut, processes, vcd_name="test_outmux_pipe.vcd")
234