1 from nmigen
import Module
, Signal
, Mux
2 from nmigen
.hdl
.rec
import Record
3 from nmigen
.compat
.sim
import run_simulation
4 from nmigen
.cli
import verilog
, rtlil
6 from example_buf_pipe
import ExampleBufPipe
, ExampleBufPipeAdd
7 from example_buf_pipe
import ExamplePipeline
, Pipeline
, ExampleStageCls
8 from example_buf_pipe
import PrevControl
, NextControl
, BufferedPipeline
9 from example_buf_pipe
import StageChain
11 from random
import randint
14 def check_o_n_valid(dut
, val
):
15 o_n_valid
= yield dut
.n
.o_valid
16 assert o_n_valid
== val
20 #yield dut.i_p_rst.eq(1)
21 yield dut
.n
.i_ready
.eq(0)
22 yield dut
.p
.o_ready
.eq(0)
25 #yield dut.i_p_rst.eq(0)
26 yield dut
.n
.i_ready
.eq(1)
27 yield dut
.p
.i_data
.eq(5)
28 yield dut
.p
.i_valid
.eq(1)
31 yield dut
.p
.i_data
.eq(7)
32 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed
34 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
36 yield dut
.p
.i_data
.eq(2)
38 yield dut
.n
.i_ready
.eq(0) # begin going into "stall" (next stage says ready)
39 yield dut
.p
.i_data
.eq(9)
41 yield dut
.p
.i_valid
.eq(0)
42 yield dut
.p
.i_data
.eq(12)
44 yield dut
.p
.i_data
.eq(32)
45 yield dut
.n
.i_ready
.eq(1)
47 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
49 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
51 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
56 #yield dut.p.i_rst.eq(1)
57 yield dut
.n
.i_ready
.eq(0)
58 #yield dut.p.o_ready.eq(0)
61 #yield dut.p.i_rst.eq(0)
62 yield dut
.n
.i_ready
.eq(1)
63 yield dut
.p
.i_data
.eq(5)
64 yield dut
.p
.i_valid
.eq(1)
67 yield dut
.p
.i_data
.eq(7)
68 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed 2 clocks
70 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed 2 clocks
72 yield dut
.p
.i_data
.eq(2)
74 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
75 yield dut
.n
.i_ready
.eq(0) # begin going into "stall" (next stage says ready)
76 yield dut
.p
.i_data
.eq(9)
78 yield dut
.p
.i_valid
.eq(0)
79 yield dut
.p
.i_data
.eq(12)
81 yield dut
.p
.i_data
.eq(32)
82 yield dut
.n
.i_ready
.eq(1)
84 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
86 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
88 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
90 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
97 def __init__(self
, dut
, resultfn
):
99 self
.resultfn
= resultfn
101 for i
in range(num_tests
):
102 #data.append(randint(0, 1<<16-1))
103 self
.data
.append(i
+1)
108 while self
.o
!= len(self
.data
):
109 send_range
= randint(0, 3)
110 for j
in range(randint(1,10)):
114 send
= randint(0, send_range
) != 0
115 o_p_ready
= yield self
.dut
.p
.o_ready
119 if send
and self
.i
!= len(self
.data
):
120 yield self
.dut
.p
.i_valid
.eq(1)
121 yield self
.dut
.p
.i_data
.eq(self
.data
[self
.i
])
124 yield self
.dut
.p
.i_valid
.eq(0)
128 while self
.o
!= len(self
.data
):
129 stall_range
= randint(0, 3)
130 for j
in range(randint(1,10)):
131 stall
= randint(0, stall_range
) != 0
132 yield self
.dut
.n
.i_ready
.eq(stall
)
134 o_n_valid
= yield self
.dut
.n
.o_valid
135 i_n_ready
= yield self
.dut
.n
.i_ready
136 if not o_n_valid
or not i_n_ready
:
138 o_data
= yield self
.dut
.n
.o_data
139 self
.resultfn(o_data
, self
.data
[self
.o
], self
.i
, self
.o
)
141 if self
.o
== len(self
.data
):
144 def test3_resultfn(o_data
, expected
, i
, o
):
145 assert o_data
== expected
+ 1, \
146 "%d-%d data %x not match %x\n" \
147 % (i
, o
, o_data
, expected
)
151 for i
in range(num_tests
):
152 data
.append({'src1': randint(0, 1<<16-1),
153 'src2': randint(0, 1<<16-1)})
158 def __init__(self
, dut
, resultfn
, data
=None):
160 self
.resultfn
= resultfn
165 for i
in range(num_tests
):
166 self
.data
.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
171 while self
.o
!= len(self
.data
):
172 send_range
= randint(0, 3)
173 for j
in range(randint(1,10)):
177 send
= randint(0, send_range
) != 0
178 o_p_ready
= yield self
.dut
.p
.o_ready
182 if send
and self
.i
!= len(self
.data
):
183 yield self
.dut
.p
.i_valid
.eq(1)
184 for v
in self
.dut
.set_input(self
.data
[self
.i
]):
188 yield self
.dut
.p
.i_valid
.eq(0)
192 while self
.o
!= len(self
.data
):
193 stall_range
= randint(0, 3)
194 for j
in range(randint(1,10)):
195 stall
= randint(0, stall_range
) != 0
196 yield self
.dut
.n
.i_ready
.eq(stall
)
198 o_n_valid
= yield self
.dut
.n
.o_valid
199 i_n_ready
= yield self
.dut
.n
.i_ready
200 if not o_n_valid
or not i_n_ready
:
202 if isinstance(self
.dut
.n
.o_data
, Record
):
204 dod
= self
.dut
.n
.o_data
205 for k
, v
in dod
.fields
.items():
208 o_data
= yield self
.dut
.n
.o_data
209 self
.resultfn(o_data
, self
.data
[self
.o
], self
.i
, self
.o
)
211 if self
.o
== len(self
.data
):
214 def test5_resultfn(o_data
, expected
, i
, o
):
215 res
= expected
[0] + expected
[1]
216 assert o_data
== res
, \
217 "%d-%d data %x not match %s\n" \
218 % (i
, o
, o_data
, repr(expected
))
222 for i
in range(num_tests
):
223 #data.append(randint(0, 1<<16-1))
228 stall
= randint(0, 3) != 0
229 send
= randint(0, 5) != 0
230 yield dut
.n
.i_ready
.eq(stall
)
231 o_p_ready
= yield dut
.p
.o_ready
233 if send
and i
!= len(data
):
234 yield dut
.p
.i_valid
.eq(1)
235 yield dut
.p
.i_data
.eq(data
[i
])
238 yield dut
.p
.i_valid
.eq(0)
240 o_n_valid
= yield dut
.n
.o_valid
241 i_n_ready
= yield dut
.n
.i_ready
242 if o_n_valid
and i_n_ready
:
243 o_data
= yield dut
.n
.o_data
244 assert o_data
== data
[o
] + 2, "%d-%d data %x not match %x\n" \
245 % (i
, o
, o_data
, data
[o
])
251 class ExampleBufPipe2
:
253 connect these: ------|---------------|
255 i_p_valid >>in pipe1 o_n_valid out>> i_p_valid >>in pipe2
256 o_p_ready <<out pipe1 i_n_ready <<in o_p_ready <<out pipe2
257 p_i_data >>in pipe1 p_i_data out>> n_o_data >>in pipe2
260 self
.pipe1
= ExampleBufPipe()
261 self
.pipe2
= ExampleBufPipe()
264 self
.p
= PrevControl()
265 self
.p
.i_data
= Signal(32) # >>in - comes in from the PREVIOUS stage
268 self
.n
= NextControl()
269 self
.n
.o_data
= Signal(32) # out>> - goes out to the NEXT stage
271 def elaborate(self
, platform
):
273 m
.submodules
.pipe1
= self
.pipe1
274 m
.submodules
.pipe2
= self
.pipe2
276 # connect inter-pipe input/output valid/ready/data
277 m
.d
.comb
+= self
.pipe1
.connect_to_next(self
.pipe2
)
279 # inputs/outputs to the module: pipe1 connections here (LHS)
280 m
.d
.comb
+= self
.pipe1
.connect_in(self
)
282 # now pipe2 connections (RHS)
283 m
.d
.comb
+= self
.pipe2
.connect_out(self
)
288 class ExampleBufPipeChain2(BufferedPipeline
):
289 """ connects two stages together as a *single* combinatorial stage.
292 stage1
= ExampleStageCls()
293 stage2
= ExampleStageCls()
294 combined
= StageChain([stage1
, stage2
])
295 BufferedPipeline
.__init
__(self
, combined
)
300 for i
in range(num_tests
):
301 data
.append(randint(0, 1<<16-2))
305 def test9_resultfn(o_data
, expected
, i
, o
):
307 assert o_data
== res
, \
308 "%d-%d data %x not match %s\n" \
309 % (i
, o
, o_data
, repr(expected
))
313 def __init__(self
, width
, signed
):
314 self
.src1
= Signal((width
, signed
))
315 self
.src2
= Signal((width
, signed
))
316 self
.output
= Signal(width
)
318 def elaborate(self
, platform
):
320 m
.d
.comb
+= self
.output
.eq(Mux(self
.src1
< self
.src2
, 1, 0))
326 self
.slt
= SetLessThan(16, True)
329 return (Signal(16), Signal(16))
334 def setup(self
, m
, i
):
336 m
.submodules
.slt
= self
.slt
337 m
.d
.comb
+= self
.slt
.src1
.eq(i
[0])
338 m
.d
.comb
+= self
.slt
.src2
.eq(i
[1])
339 m
.d
.comb
+= self
.o
.eq(self
.slt
.output
)
341 def process(self
, i
):
345 class LTStageDerived(SetLessThan
):
348 SetLessThan
.__init
__(self
, 16, True)
351 return (Signal(16), Signal(16))
356 def setup(self
, m
, i
):
357 m
.submodules
.slt
= self
358 m
.d
.comb
+= self
.src1
.eq(i
[0])
359 m
.d
.comb
+= self
.src2
.eq(i
[1])
361 def process(self
, i
):
365 class ExampleLTPipeline(Pipeline
):
366 """ an example of how to use the combinatorial pipeline.
371 Pipeline
.__init
__(self
, stage
)
374 class ExampleLTBufferedPipeDerived(BufferedPipeline
):
375 """ an example of how to use the combinatorial pipeline.
379 stage
= LTStageDerived()
380 BufferedPipeline
.__init
__(self
, stage
)
383 def test6_resultfn(o_data
, expected
, i
, o
):
384 res
= 1 if expected
[0] < expected
[1] else 0
385 assert o_data
== res
, \
386 "%d-%d data %x not match %s\n" \
387 % (i
, o
, o_data
, repr(expected
))
390 class ExampleAddRecordStage
:
391 """ example use of a Record
394 record_spec
= [('src1', 16), ('src2', 16)]
396 """ returns a tuple of input signals which will be the incoming data
398 return Record(self
.record_spec
)
401 return Record(self
.record_spec
)
403 def process(self
, i
):
404 """ process the input data (sums the values in the tuple) and returns it
406 return {'src1': i
.src1
+ 1,
410 class ExampleAddRecordPipe(Pipeline
):
411 """ an example of how to use the combinatorial pipeline.
415 stage
= ExampleAddRecordStage()
416 Pipeline
.__init
__(self
, stage
)
419 def test7_resultfn(o_data
, expected
, i
, o
):
420 res
= (expected
['src1'] + 1, expected
['src2'] + 1)
421 assert o_data
['src1'] == res
[0] and o_data
['src2'] == res
[1], \
422 "%d-%d data %s not match %s\n" \
423 % (i
, o
, repr(o_data
), repr(expected
))
426 class Example2OpClass
:
427 """ an example of a class used to store 2 operands.
428 requires an eq function, to conform with the pipeline stage API
432 self
.op1
= Signal(16)
433 self
.op2
= Signal(16)
436 return [self
.op1
.eq(i
.op1
), self
.op2
.eq(i
.op2
)]
439 class ExampleAddClassStage
:
440 """ an example of how to use the buffered pipeline, as a class instance
444 """ returns an instance of an Example2OpClass.
446 return Example2OpClass()
449 """ returns an output signal which will happen to contain the sum
454 def process(self
, i
):
455 """ process the input data (sums the values in the tuple) and returns it
460 class ExampleBufPipeAddClass(BufferedPipeline
):
461 """ an example of how to use the buffered pipeline, using a class instance
465 addstage
= ExampleAddClassStage()
466 BufferedPipeline
.__init
__(self
, addstage
)
470 """ the eq function, called by set_input, needs an incoming object
471 that conforms to the Example2OpClass.eq function requirements
472 easiest way to do that is to create a class that has the exact
473 same member layout (self.op1, self.op2) as Example2OpClass
475 def __init__(self
, op1
, op2
):
480 def test8_resultfn(o_data
, expected
, i
, o
):
481 res
= expected
.op1
+ expected
.op2
# these are a TestInputAdd instance
482 assert o_data
== res
, \
483 "%d-%d data %x not match %s\n" \
484 % (i
, o
, o_data
, repr(expected
))
488 for i
in range(num_tests
):
489 data
.append(TestInputAdd(randint(0, 1<<16-1), randint(0, 1<<16-1)))
495 if __name__
== '__main__':
497 dut
= ExampleBufPipe()
498 run_simulation(dut
, testbench(dut
), vcd_name
="test_bufpipe.vcd")
501 dut
= ExampleBufPipe2()
502 run_simulation(dut
, testbench2(dut
), vcd_name
="test_bufpipe2.vcd")
505 dut
= ExampleBufPipe()
506 test
= Test3(dut
, test3_resultfn
)
507 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe3.vcd")
510 dut
= ExamplePipeline()
511 test
= Test3(dut
, test3_resultfn
)
512 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_combpipe3.vcd")
515 dut
= ExampleBufPipe2()
516 run_simulation(dut
, testbench4(dut
), vcd_name
="test_bufpipe4.vcd")
519 dut
= ExampleBufPipeAdd()
520 test
= Test5(dut
, test5_resultfn
)
521 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe5.vcd")
524 dut
= ExampleLTPipeline()
525 test
= Test5(dut
, test6_resultfn
)
526 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_ltcomb6.vcd")
528 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
529 dut
.n
.o_valid
, dut
.p
.o_ready
] + \
530 list(dut
.p
.i_data
) + [dut
.n
.o_data
]
531 vl
= rtlil
.convert(dut
, ports
=ports
)
532 with
open("test_ltcomb_pipe.il", "w") as f
:
536 dut
= ExampleAddRecordPipe()
538 test
= Test5(dut
, test7_resultfn
, data
=data
)
539 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_addrecord.vcd")
541 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
542 dut
.n
.o_valid
, dut
.p
.o_ready
,
543 dut
.p
.i_data
.src1
, dut
.p
.i_data
.src2
,
544 dut
.n
.o_data
.src1
, dut
.n
.o_data
.src2
]
545 vl
= rtlil
.convert(dut
, ports
=ports
)
546 with
open("test_recordcomb_pipe.il", "w") as f
:
550 dut
= ExampleBufPipeAddClass()
552 test
= Test5(dut
, test8_resultfn
, data
=data
)
553 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe8.vcd")
556 dut
= ExampleBufPipeChain2()
557 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
558 dut
.n
.o_valid
, dut
.p
.o_ready
] + \
559 [dut
.p
.i_data
] + [dut
.n
.o_data
]
560 vl
= rtlil
.convert(dut
, ports
=ports
)
561 with
open("test_bufpipechain2.il", "w") as f
:
565 test
= Test5(dut
, test9_resultfn
, data
=data
)
566 run_simulation(dut
, [test
.send
, test
.rcv
],
567 vcd_name
="test_bufpipechain2.vcd")
570 dut
= ExampleLTBufferedPipeDerived()
571 test
= Test5(dut
, test6_resultfn
)
572 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_ltbufpipe10.vcd")
573 vl
= rtlil
.convert(dut
, ports
=ports
)
574 with
open("test_ltbufpipe10.il", "w") as f
: