1 """ Unit tests for Buffered and Unbuffered pipelines
3 contains useful worked examples of how to use the Pipeline API,
6 * Combinatorial Stage "Chaining"
7 * class-based data stages
8 * nmigen module-based data stages
9 * special nmigen module-based data stage, where the stage *is* the module
10 * Record-based data stages
11 * static-class data stages
12 * multi-stage pipelines (and how to connect them)
13 * how to *use* the pipelines (see Test5) - how to get data in and out
17 from nmigen
import Module
, Signal
, Mux
, Const
18 from nmigen
.hdl
.rec
import Record
19 from nmigen
.compat
.sim
import run_simulation
20 from nmigen
.cli
import verilog
, rtlil
22 from example_buf_pipe
import ExampleBufPipe
, ExampleBufPipeAdd
23 from example_buf_pipe
import ExamplePipeline
, UnbufferedPipeline
24 from example_buf_pipe
import ExampleStageCls
25 from example_buf_pipe
import PrevControl
, NextControl
, BufferedPipeline
26 from example_buf_pipe
import StageChain
, ControlBase
, StageCls
28 from random
import randint
def check_o_n_valid(dut, val):
    """ simulation helper: checks dut.n.o_valid against val.

        This is a generator.  It yields the dut.n.o_valid object and
        asserts that the value sent back in reply to that yield is
        equal to val.
    """
    observed = yield dut.n.o_valid
    assert observed == val
def check_o_n_valid2(dut, val):
    """ generator-based check of the dut's outgoing "valid" flag.

        Yields dut.n.o_valid, then asserts that whatever was sent back
        into the generator matches val.
    """
    sampled = yield dut.n.o_valid
    assert sampled == val
41 #yield dut.i_p_rst.eq(1)
42 yield dut
.n
.i_ready
.eq(0)
43 yield dut
.p
.o_ready
.eq(0)
46 #yield dut.i_p_rst.eq(0)
47 yield dut
.n
.i_ready
.eq(1)
48 yield dut
.p
.i_data
.eq(5)
49 yield dut
.p
.i_valid
.eq(1)
52 yield dut
.p
.i_data
.eq(7)
53 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed
55 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
57 yield dut
.p
.i_data
.eq(2)
59 yield dut
.n
.i_ready
.eq(0) # begin going into "stall" (next stage says ready)
60 yield dut
.p
.i_data
.eq(9)
62 yield dut
.p
.i_valid
.eq(0)
63 yield dut
.p
.i_data
.eq(12)
65 yield dut
.p
.i_data
.eq(32)
66 yield dut
.n
.i_ready
.eq(1)
68 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
70 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
72 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
77 #yield dut.p.i_rst.eq(1)
78 yield dut
.n
.i_ready
.eq(0)
79 #yield dut.p.o_ready.eq(0)
82 #yield dut.p.i_rst.eq(0)
83 yield dut
.n
.i_ready
.eq(1)
84 yield dut
.p
.i_data
.eq(5)
85 yield dut
.p
.i_valid
.eq(1)
88 yield dut
.p
.i_data
.eq(7)
89 yield from check_o_n_valid2(dut
, 0) # effects of i_p_valid delayed 2 clocks
91 yield from check_o_n_valid2(dut
, 0) # effects of i_p_valid delayed 2 clocks
93 yield dut
.p
.i_data
.eq(2)
95 yield from check_o_n_valid2(dut
, 1) # ok *now* i_p_valid effect is felt
96 yield dut
.n
.i_ready
.eq(0) # begin going into "stall" (next stage says ready)
97 yield dut
.p
.i_data
.eq(9)
99 yield dut
.p
.i_valid
.eq(0)
100 yield dut
.p
.i_data
.eq(12)
102 yield dut
.p
.i_data
.eq(32)
103 yield dut
.n
.i_ready
.eq(1)
105 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
107 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
109 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
111 yield from check_o_n_valid2(dut
, 0) # buffer outputted, *now* we're done.
118 def __init__(self
, dut
, resultfn
):
120 self
.resultfn
= resultfn
122 for i
in range(num_tests
):
123 #data.append(randint(0, 1<<16-1))
124 self
.data
.append(i
+1)
129 while self
.o
!= len(self
.data
):
130 send_range
= randint(0, 3)
131 for j
in range(randint(1,10)):
135 send
= randint(0, send_range
) != 0
136 o_p_ready
= yield self
.dut
.p
.o_ready
140 if send
and self
.i
!= len(self
.data
):
141 yield self
.dut
.p
.i_valid
.eq(1)
142 yield self
.dut
.p
.i_data
.eq(self
.data
[self
.i
])
145 yield self
.dut
.p
.i_valid
.eq(0)
149 while self
.o
!= len(self
.data
):
150 stall_range
= randint(0, 3)
151 for j
in range(randint(1,10)):
152 stall
= randint(0, stall_range
) != 0
153 yield self
.dut
.n
.i_ready
.eq(stall
)
155 o_n_valid
= yield self
.dut
.n
.o_valid
156 i_n_ready
= yield self
.dut
.n
.i_ready_test
157 if not o_n_valid
or not i_n_ready
:
159 o_data
= yield self
.dut
.n
.o_data
160 self
.resultfn(o_data
, self
.data
[self
.o
], self
.i
, self
.o
)
162 if self
.o
== len(self
.data
):
165 def test3_resultfn(o_data
, expected
, i
, o
):
166 assert o_data
== expected
+ 1, \
167 "%d-%d data %x not match %x\n" \
168 % (i
, o
, o_data
, expected
)
170 def data_placeholder():
172 for i
in range(num_tests
):
174 d
.src1
= randint(0, 1<<16-1)
175 d
.src2
= randint(0, 1<<16-1)
181 for i
in range(num_tests
):
182 data
.append({'src1': randint(0, 1<<16-1),
183 'src2': randint(0, 1<<16-1)})
188 def __init__(self
, dut
, resultfn
, data
=None, stage_ctl
=False):
190 self
.resultfn
= resultfn
191 self
.stage_ctl
= stage_ctl
196 for i
in range(num_tests
):
197 self
.data
.append((randint(0, 1<<16-1), randint(0, 1<<16-1)))
202 while self
.o
!= len(self
.data
):
203 send_range
= randint(0, 3)
204 for j
in range(randint(1,10)):
208 send
= randint(0, send_range
) != 0
210 o_p_ready
= yield self
.dut
.p
.o_ready
214 if send
and self
.i
!= len(self
.data
):
215 yield self
.dut
.p
.i_valid
.eq(1)
216 for v
in self
.dut
.set_input(self
.data
[self
.i
]):
220 yield self
.dut
.p
.i_valid
.eq(0)
224 while self
.o
!= len(self
.data
):
225 stall_range
= randint(0, 3)
226 for j
in range(randint(1,10)):
227 stall
= randint(0, stall_range
) != 0
228 yield self
.dut
.n
.i_ready
.eq(stall
)
230 o_n_valid
= yield self
.dut
.n
.o_valid
231 i_n_ready
= yield self
.dut
.n
.i_ready_test
232 if not o_n_valid
or not i_n_ready
:
234 if isinstance(self
.dut
.n
.o_data
, Record
):
236 dod
= self
.dut
.n
.o_data
237 for k
, v
in dod
.fields
.items():
240 o_data
= yield self
.dut
.n
.o_data
241 self
.resultfn(o_data
, self
.data
[self
.o
], self
.i
, self
.o
)
243 if self
.o
== len(self
.data
):
246 def test5_resultfn(o_data
, expected
, i
, o
):
247 res
= expected
[0] + expected
[1]
248 assert o_data
== res
, \
249 "%d-%d data %x not match %s\n" \
250 % (i
, o
, o_data
, repr(expected
))
254 for i
in range(num_tests
):
255 #data.append(randint(0, 1<<16-1))
260 stall
= randint(0, 3) != 0
261 send
= randint(0, 5) != 0
262 yield dut
.n
.i_ready
.eq(stall
)
263 o_p_ready
= yield dut
.p
.o_ready
265 if send
and i
!= len(data
):
266 yield dut
.p
.i_valid
.eq(1)
267 yield dut
.p
.i_data
.eq(data
[i
])
270 yield dut
.p
.i_valid
.eq(0)
272 o_n_valid
= yield dut
.n
.o_valid
273 i_n_ready
= yield dut
.n
.i_ready_test
274 if o_n_valid
and i_n_ready
:
275 o_data
= yield dut
.n
.o_data
276 assert o_data
== data
[o
] + 2, "%d-%d data %x not match %x\n" \
277 % (i
, o
, o_data
, data
[o
])
282 ######################################################################
284 ######################################################################
286 class ExampleBufPipe2(ControlBase
):
287 """ Example of how to do chained pipeline stages.
290 def elaborate(self
, platform
):
293 pipe1
= ExampleBufPipe()
294 pipe2
= ExampleBufPipe()
296 m
.submodules
.pipe1
= pipe1
297 m
.submodules
.pipe2
= pipe2
299 m
.d
.comb
+= self
.connect([pipe1
, pipe2
])
304 ######################################################################
306 ######################################################################
308 class ExampleBufPipeChain2(BufferedPipeline
):
309 """ connects two stages together as a *single* combinatorial stage.
312 stage1
= ExampleStageCls()
313 stage2
= ExampleStageCls()
314 combined
= StageChain([stage1
, stage2
])
315 BufferedPipeline
.__init
__(self
, combined
)
320 for i
in range(num_tests
):
321 data
.append(randint(0, 1<<16-2))
325 def test9_resultfn(o_data
, expected
, i
, o
):
327 assert o_data
== res
, \
328 "%d-%d data %x not match %s\n" \
329 % (i
, o
, o_data
, repr(expected
))
332 ######################################################################
334 ######################################################################
337 def __init__(self
, width
, signed
):
339 self
.src1
= Signal((width
, signed
), name
="src1")
340 self
.src2
= Signal((width
, signed
), name
="src2")
341 self
.output
= Signal(width
, name
="out")
343 def elaborate(self
, platform
):
344 self
.m
.d
.comb
+= self
.output
.eq(Mux(self
.src1
< self
.src2
, 1, 0))
348 class LTStage(StageCls
):
349 """ module-based stage example
352 self
.slt
= SetLessThan(16, True)
355 return (Signal(16, name
="sig1"), Signal(16, "sig2"))
358 return Signal(16, "out")
360 def setup(self
, m
, i
):
362 m
.submodules
.slt
= self
.slt
363 m
.d
.comb
+= self
.slt
.src1
.eq(i
[0])
364 m
.d
.comb
+= self
.slt
.src2
.eq(i
[1])
365 m
.d
.comb
+= self
.o
.eq(self
.slt
.output
)
367 def process(self
, i
):
371 class LTStageDerived(SetLessThan
, StageCls
):
372 """ special version of a nmigen module where the module is also a stage
374 shows that you don't actually need to combinatorially connect
375 to the outputs, or add the module as a submodule: just return
376 the module output parameter(s) from the Stage.process() function
380 SetLessThan
.__init
__(self
, 16, True)
383 return (Signal(16), Signal(16))
388 def setup(self
, m
, i
):
389 m
.submodules
.slt
= self
390 m
.d
.comb
+= self
.src1
.eq(i
[0])
391 m
.d
.comb
+= self
.src2
.eq(i
[1])
393 def process(self
, i
):
397 class ExampleLTPipeline(UnbufferedPipeline
):
398 """ an example of how to use the unbuffered pipeline.
403 UnbufferedPipeline
.__init
__(self
, stage
)
406 class ExampleLTBufferedPipeDerived(BufferedPipeline
):
407 """ an example of how to use the buffered pipeline.
411 stage
= LTStageDerived()
412 BufferedPipeline
.__init
__(self
, stage
)
415 def test6_resultfn(o_data
, expected
, i
, o
):
416 res
= 1 if expected
[0] < expected
[1] else 0
417 assert o_data
== res
, \
418 "%d-%d data %x not match %s\n" \
419 % (i
, o
, o_data
, repr(expected
))
422 ######################################################################
424 ######################################################################
426 class ExampleAddRecordStage(StageCls
):
427 """ example use of a Record
430 record_spec
= [('src1', 16), ('src2', 16)]
432 """ returns a Record using the specification
434 return Record(self
.record_spec
)
437 return Record(self
.record_spec
)
439 def process(self
, i
):
440 """ process the input data, returning a dictionary with key names
441 that exactly match the Record's attributes.
443 return {'src1': i
.src1
+ 1,
446 ######################################################################
448 ######################################################################
450 class ExampleAddRecordPlaceHolderStage(StageCls
):
451 """ example use of a Record, with a placeholder as the processing result
454 record_spec
= [('src1', 16), ('src2', 16)]
456 """ returns a Record using the specification
458 return Record(self
.record_spec
)
461 return Record(self
.record_spec
)
463 def process(self
, i
):
464 """ process the input data, returning a PlaceHolder class instance
465 with attributes that exactly match those of the Record.
class PlaceHolder:
    """ empty attribute container: defines nothing itself, fields
        (such as src1 and src2) are assigned onto instances afterwards.
    """
476 class ExampleAddRecordPipe(UnbufferedPipeline
):
477 """ an example of how to use the combinatorial pipeline.
481 stage
= ExampleAddRecordStage()
482 UnbufferedPipeline
.__init
__(self
, stage
)
485 def test7_resultfn(o_data
, expected
, i
, o
):
486 res
= (expected
['src1'] + 1, expected
['src2'] + 1)
487 assert o_data
['src1'] == res
[0] and o_data
['src2'] == res
[1], \
488 "%d-%d data %s not match %s\n" \
489 % (i
, o
, repr(o_data
), repr(expected
))
492 class ExampleAddRecordPlaceHolderPipe(UnbufferedPipeline
):
493 """ an example of how to use the combinatorial pipeline.
497 stage
= ExampleAddRecordPlaceHolderStage()
498 UnbufferedPipeline
.__init
__(self
, stage
)
501 def test11_resultfn(o_data
, expected
, i
, o
):
502 res1
= expected
.src1
+ 1
503 res2
= expected
.src2
+ 1
504 assert o_data
['src1'] == res1
and o_data
['src2'] == res2
, \
505 "%d-%d data %s not match %s\n" \
506 % (i
, o
, repr(o_data
), repr(expected
))
509 ######################################################################
511 ######################################################################
514 class Example2OpClass
:
515 """ an example of a class used to store 2 operands.
516 requires an eq function, to conform with the pipeline stage API
520 self
.op1
= Signal(16)
521 self
.op2
= Signal(16)
524 return [self
.op1
.eq(i
.op1
), self
.op2
.eq(i
.op2
)]
527 class ExampleAddClassStage(StageCls
):
528 """ an example of how to use the buffered pipeline, as a class instance
532 """ returns an instance of an Example2OpClass.
534 return Example2OpClass()
537 """ returns an output signal which will happen to contain the sum
542 def process(self
, i
):
543 """ process the input data (sums the values in the tuple) and returns it
548 class ExampleBufPipeAddClass(BufferedPipeline
):
549 """ an example of how to use the buffered pipeline, using a class instance
553 addstage
= ExampleAddClassStage()
554 BufferedPipeline
.__init
__(self
, addstage
)
558 """ the eq function, called by set_input, needs an incoming object
559 that conforms to the Example2OpClass.eq function requirements
560 easiest way to do that is to create a class that has the exact
561 same member layout (self.op1, self.op2) as Example2OpClass
563 def __init__(self
, op1
, op2
):
568 def test8_resultfn(o_data
, expected
, i
, o
):
569 res
= expected
.op1
+ expected
.op2
# these are a TestInputAdd instance
570 assert o_data
== res
, \
571 "%d-%d data %x not match %s\n" \
572 % (i
, o
, o_data
, repr(expected
))
576 for i
in range(num_tests
):
577 data
.append(TestInputAdd(randint(0, 1<<16-1), randint(0, 1<<16-1)))
581 ######################################################################
583 ######################################################################
585 class ExampleStageDelayCls(StageCls
):
586 """ an example of how to use the buffered pipeline, in a static class
591 self
.count
= Signal(2)
594 return Signal(16, name
="example_input_signal")
597 return Signal(16, name
="example_output_signal")
602 return self
.count
== 2
606 return self
.count
== 0
609 def process(self
, i
):
610 """ process the input data and returns it (adds 1)
614 def elaborate(self
, platform
):
616 m
.d
.sync
+= self
.count
.eq(self
.count
+ 1)
620 class ExampleBufDelayedPipe(BufferedPipeline
):
621 """ an example of how to use the buffered pipeline.
625 stage
= ExampleStageDelayCls()
626 BufferedPipeline
.__init
__(self
, stage
, stage_ctl
=True)
628 def elaborate(self
, platform
):
629 m
= BufferedPipeline
.elaborate(self
, platform
)
630 m
.submodules
.stage
= self
.stage
634 class ExampleBufPipe3(ControlBase
):
635 """ Example of how to do delayed pipeline, where the stage signals
639 def elaborate(self
, platform
):
640 m
= ControlBase
._elaborate
(self
, platform
)
642 #pipe1 = ExampleBufPipe()
643 pipe1
= ExampleBufDelayedPipe()
644 pipe2
= ExampleBufDelayedPipe()
646 m
.submodules
.pipe1
= pipe1
647 m
.submodules
.pipe2
= pipe2
649 m
.d
.comb
+= self
.connect([pipe1
, pipe2
])
655 for i
in range(num_tests
):
656 data
.append(randint(0, 1<<16-2))
660 def test12_resultfn(o_data
, expected
, i
, o
):
662 assert o_data
== res
, \
663 "%d-%d data %x not match %x\n" \
664 % (i
, o
, o_data
, res
)
667 ######################################################################
669 ######################################################################
673 if __name__
== '__main__':
675 dut
= ExampleBufPipe()
676 run_simulation(dut
, testbench(dut
), vcd_name
="test_bufpipe.vcd")
679 dut
= ExampleBufPipe2()
680 run_simulation(dut
, testbench2(dut
), vcd_name
="test_bufpipe2.vcd")
681 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
682 dut
.n
.o_valid
, dut
.p
.o_ready
] + \
683 [dut
.p
.i_data
] + [dut
.n
.o_data
]
684 vl
= rtlil
.convert(dut
, ports
=ports
)
685 with
open("test_bufpipe2.il", "w") as f
:
690 dut
= ExampleBufPipe()
691 test
= Test3(dut
, test3_resultfn
)
692 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe3.vcd")
695 dut
= ExamplePipeline()
696 test
= Test3(dut
, test3_resultfn
)
697 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_combpipe3.vcd")
700 dut
= ExampleBufPipe2()
701 run_simulation(dut
, testbench4(dut
), vcd_name
="test_bufpipe4.vcd")
704 dut
= ExampleBufPipeAdd()
705 test
= Test5(dut
, test5_resultfn
, stage_ctl
=True)
706 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe5.vcd")
709 dut
= ExampleLTPipeline()
710 test
= Test5(dut
, test6_resultfn
)
711 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_ltcomb6.vcd")
713 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
714 dut
.n
.o_valid
, dut
.p
.o_ready
] + \
715 list(dut
.p
.i_data
) + [dut
.n
.o_data
]
716 vl
= rtlil
.convert(dut
, ports
=ports
)
717 with
open("test_ltcomb_pipe.il", "w") as f
:
721 dut
= ExampleAddRecordPipe()
723 test
= Test5(dut
, test7_resultfn
, data
=data
)
724 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_addrecord.vcd")
726 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
727 dut
.n
.o_valid
, dut
.p
.o_ready
,
728 dut
.p
.i_data
.src1
, dut
.p
.i_data
.src2
,
729 dut
.n
.o_data
.src1
, dut
.n
.o_data
.src2
]
730 vl
= rtlil
.convert(dut
, ports
=ports
)
731 with
open("test_recordcomb_pipe.il", "w") as f
:
735 dut
= ExampleBufPipeAddClass()
737 test
= Test5(dut
, test8_resultfn
, data
=data
)
738 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe8.vcd")
741 dut
= ExampleBufPipeChain2()
742 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
743 dut
.n
.o_valid
, dut
.p
.o_ready
] + \
744 [dut
.p
.i_data
] + [dut
.n
.o_data
]
745 vl
= rtlil
.convert(dut
, ports
=ports
)
746 with
open("test_bufpipechain2.il", "w") as f
:
750 test
= Test5(dut
, test9_resultfn
, data
=data
)
751 run_simulation(dut
, [test
.send
, test
.rcv
],
752 vcd_name
="test_bufpipechain2.vcd")
755 dut
= ExampleLTBufferedPipeDerived()
756 test
= Test5(dut
, test6_resultfn
)
757 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_ltbufpipe10.vcd")
758 vl
= rtlil
.convert(dut
, ports
=ports
)
759 with
open("test_ltbufpipe10.il", "w") as f
:
763 dut
= ExampleAddRecordPlaceHolderPipe()
764 data
=data_placeholder()
765 test
= Test5(dut
, test11_resultfn
, data
=data
)
766 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_addrecord.vcd")
770 #dut = ExampleBufPipe3()
771 dut
= ExampleBufDelayedPipe()
773 test
= Test5(dut
, test12_resultfn
, data
=data
)
774 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe12.vcd")
775 ports
= [dut
.p
.i_valid
, dut
.n
.i_ready
,
776 dut
.n
.o_valid
, dut
.p
.o_ready
] + \
777 [dut
.p
.i_data
] + [dut
.n
.o_data
]
778 vl
= rtlil
.convert(dut
, ports
=ports
)
779 with
open("test_bufpipe12.il", "w") as f
: