1 from nmigen
import Module
, Signal
, Mux
2 from nmigen
.compat
.sim
import run_simulation
3 from example_buf_pipe
import ExampleBufPipe
, ExampleBufPipeAdd
4 from example_buf_pipe
import ExampleCombPipe
, CombPipe
5 from example_buf_pipe
import PrevControl
, NextControl
6 from random
import randint
def check_o_n_valid(dut, val):
    """Simulation helper: check the current value of ``dut.n.o_valid``.

    This is a generator intended to be driven with ``yield from`` inside a
    test bench.  It yields the ``dut.n.o_valid`` object to whatever is
    driving the generator, receives the value sent back, and asserts that
    it equals ``val``.

    Args:
        dut: object exposing ``n.o_valid``.
        val: the value ``o_valid`` is expected to hold right now.

    Raises:
        AssertionError: if the value sent back differs from ``val``.
    """
    # Yielding the signal asks the driver for its present value.
    observed = yield dut.n.o_valid
    assert observed == val
15 #yield dut.i_p_rst.eq(1)
16 yield dut
.n
.i_ready
.eq(0)
17 yield dut
.p
.o_ready
.eq(0)
20 #yield dut.i_p_rst.eq(0)
21 yield dut
.n
.i_ready
.eq(1)
22 yield dut
.p
.data
.eq(5)
23 yield dut
.p
.i_valid
.eq(1)
26 yield dut
.p
.data
.eq(7)
27 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed
29 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
31 yield dut
.p
.data
.eq(2)
33 yield dut
.n
.i_ready
.eq(0) # begin going into "stall" (next stage says ready)
34 yield dut
.p
.data
.eq(9)
36 yield dut
.p
.i_valid
.eq(0)
37 yield dut
.p
.data
.eq(12)
39 yield dut
.p
.data
.eq(32)
40 yield dut
.n
.i_ready
.eq(1)
42 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
44 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
46 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
51 #yield dut.p.i_rst.eq(1)
52 yield dut
.n
.i_ready
.eq(0)
53 #yield dut.p.o_ready.eq(0)
56 #yield dut.p.i_rst.eq(0)
57 yield dut
.n
.i_ready
.eq(1)
58 yield dut
.p
.data
.eq(5)
59 yield dut
.p
.i_valid
.eq(1)
62 yield dut
.p
.data
.eq(7)
63 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed 2 clocks
65 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed 2 clocks
67 yield dut
.p
.data
.eq(2)
69 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
70 yield dut
.n
.i_ready
.eq(0) # begin going into "stall" (next stage says ready)
71 yield dut
.p
.data
.eq(9)
73 yield dut
.p
.i_valid
.eq(0)
74 yield dut
.p
.data
.eq(12)
76 yield dut
.p
.data
.eq(32)
77 yield dut
.n
.i_ready
.eq(1)
79 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
81 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
83 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
85 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
92 def __init__(self
, dut
, resultfn
):
94 self
.resultfn
= resultfn
96 for i
in range(num_tests
):
97 #data.append(randint(0, 1<<16-1))
103 while self
.o
!= len(self
.data
):
104 send_range
= randint(0, 3)
105 for j
in range(randint(1,10)):
109 send
= randint(0, send_range
) != 0
110 o_p_ready
= yield self
.dut
.p
.o_ready
114 if send
and self
.i
!= len(self
.data
):
115 yield self
.dut
.p
.i_valid
.eq(1)
116 yield self
.dut
.p
.data
.eq(self
.data
[self
.i
])
119 yield self
.dut
.p
.i_valid
.eq(0)
123 while self
.o
!= len(self
.data
):
124 stall_range
= randint(0, 3)
125 for j
in range(randint(1,10)):
126 stall
= randint(0, stall_range
) != 0
127 yield self
.dut
.n
.i_ready
.eq(stall
)
129 o_n_valid
= yield self
.dut
.n
.o_valid
130 i_n_ready
= yield self
.dut
.n
.i_ready
131 if not o_n_valid
or not i_n_ready
:
133 o_data
= yield self
.dut
.n
.data
134 self
.resultfn(o_data
, self
.data
[self
.o
], self
.i
, self
.o
)
136 if self
.o
== len(self
.data
):
139 def test3_resultfn(o_data
, expected
, i
, o
):
140 assert o_data
== expected
+ 1, \
141 "%d-%d data %x not match %x\n" \
142 % (i
, o
, o_data
, expected
)
145 def __init__(self
, dut
, resultfn
):
147 self
.resultfn
= resultfn
149 for i
in range(num_tests
):
150 self
.data
.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
155 while self
.o
!= len(self
.data
):
156 send_range
= randint(0, 3)
157 for j
in range(randint(1,10)):
161 send
= randint(0, send_range
) != 0
162 o_p_ready
= yield self
.dut
.p
.o_ready
166 if send
and self
.i
!= len(self
.data
):
167 yield self
.dut
.p
.i_valid
.eq(1)
168 for v
in self
.dut
.set_input(self
.data
[self
.i
]):
172 yield self
.dut
.p
.i_valid
.eq(0)
176 while self
.o
!= len(self
.data
):
177 stall_range
= randint(0, 3)
178 for j
in range(randint(1,10)):
179 stall
= randint(0, stall_range
) != 0
180 yield self
.dut
.n
.i_ready
.eq(stall
)
182 o_n_valid
= yield self
.dut
.n
.o_valid
183 i_n_ready
= yield self
.dut
.n
.i_ready
184 if not o_n_valid
or not i_n_ready
:
186 o_data
= yield self
.dut
.n
.data
187 self
.resultfn(o_data
, self
.data
[self
.o
], self
.i
, self
.o
)
189 if self
.o
== len(self
.data
):
192 def test5_resultfn(o_data
, expected
, i
, o
):
193 res
= expected
[0] + expected
[1]
194 assert o_data
== res
, \
195 "%d-%d data %x not match %s\n" \
196 % (i
, o
, o_data
, repr(expected
))
200 for i
in range(num_tests
):
201 #data.append(randint(0, 1<<16-1))
206 stall
= randint(0, 3) != 0
207 send
= randint(0, 5) != 0
208 yield dut
.n
.i_ready
.eq(stall
)
209 o_p_ready
= yield dut
.p
.o_ready
211 if send
and i
!= len(data
):
212 yield dut
.p
.i_valid
.eq(1)
213 yield dut
.p
.data
.eq(data
[i
])
216 yield dut
.p
.i_valid
.eq(0)
218 o_n_valid
= yield dut
.n
.o_valid
219 i_n_ready
= yield dut
.n
.i_ready
220 if o_n_valid
and i_n_ready
:
221 o_data
= yield dut
.n
.data
222 assert o_data
== data
[o
] + 2, "%d-%d data %x not match %x\n" \
223 % (i
, o
, o_data
, data
[o
])
229 class ExampleBufPipe2
:
231 connect these: ------|---------------|
233 i_p_valid >>in pipe1 o_n_valid out>> i_p_valid >>in pipe2
234 o_p_ready <<out pipe1 i_n_ready <<in o_p_ready <<out pipe2
235 p_data >>in pipe1 o_data out>> p_data >>in pipe2
238 self
.pipe1
= ExampleBufPipe()
239 self
.pipe2
= ExampleBufPipe()
242 self
.p
= PrevControl()
243 self
.p
.data
= Signal(32) # >>in - comes in from the PREVIOUS stage
246 self
.n
= NextControl()
247 self
.n
.data
= Signal(32) # out>> - goes out to the NEXT stage
249 def elaborate(self
, platform
):
251 m
.submodules
.pipe1
= self
.pipe1
252 m
.submodules
.pipe2
= self
.pipe2
254 # connect inter-pipe input/output valid/ready/data
255 m
.d
.comb
+= self
.pipe1
.connect_to_next(self
.pipe2
)
257 # inputs/outputs to the module: pipe1 connections here (LHS)
258 m
.d
.comb
+= self
.pipe1
.connect_in(self
)
260 # now pipe2 connections (RHS)
261 m
.d
.comb
+= self
.pipe2
.connect_out(self
)
def __init__(self, width, signed):
    """Create the two comparison inputs and the result signal.

    Args:
        width: bit width used for both inputs and for the output.
        signed: signedness flag, passed together with ``width`` as the
            ``(width, signed)`` shape of the two input signals.
    """
    # Both operands share the same (width, signed) shape.
    operand_shape = (width, signed)
    self.src1 = Signal(operand_shape)
    self.src2 = Signal(operand_shape)
    # The output is declared with the width alone (no signedness given).
    self.output = Signal(width)
271 def elaborate(self
, platform
):
273 m
.d
.comb
+= self
.output
.eq(Mux(self
.src1
< self
.src2
, 1, 0))
279 self
.slt
= SetLessThan(16, True)
282 return (Signal(16), Signal(16))
287 def setup(self
, m
, i
):
289 m
.submodules
.slt
= self
.slt
290 m
.d
.comb
+= self
.slt
.src1
.eq(i
[0])
291 m
.d
.comb
+= self
.slt
.src2
.eq(i
[1])
292 m
.d
.comb
+= self
.o
.eq(self
.slt
.output
)
294 def process(self
, i
):
298 class ExampleLTCombPipe(CombPipe
):
299 """ an example of how to use the combinatorial pipeline.
304 CombPipe
.__init
__(self
, stage
)
307 def test6_resultfn(o_data
, expected
, i
, o
):
308 res
= 1 if expected
[0] < expected
[1] else 0
309 assert o_data
== res
, \
310 "%d-%d data %x not match %s\n" \
311 % (i
, o
, o_data
, repr(expected
))
316 if __name__
== '__main__':
318 dut
= ExampleBufPipe()
319 run_simulation(dut
, testbench(dut
), vcd_name
="test_bufpipe.vcd")
322 dut
= ExampleBufPipe2()
323 run_simulation(dut
, testbench2(dut
), vcd_name
="test_bufpipe2.vcd")
326 dut
= ExampleBufPipe()
327 test
= Test3(dut
, test3_resultfn
)
328 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe3.vcd")
331 dut
= ExampleCombPipe()
332 test
= Test3(dut
, test3_resultfn
)
333 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_combpipe3.vcd")
336 dut
= ExampleBufPipe2()
337 run_simulation(dut
, testbench4(dut
), vcd_name
="test_bufpipe4.vcd")
340 dut
= ExampleBufPipeAdd()
341 test
= Test5(dut
, test5_resultfn
)
342 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe5.vcd")
345 dut
= ExampleLTCombPipe()
346 test
= Test5(dut
, test6_resultfn
)
347 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_ltcomb6.vcd")