""" Key strategic example showing how to do multi-input fan-in into a
    multi-stage pipeline, followed by multi-output fan-out.

    The multiplex ID from the fan-in is passed in to the pipeline, preserved,
    and used as a routing ID on the fan-out.
"""
from random import randint

from nmigen import Module, Signal, Cat, Value, Elaboratable
from nmigen.compat.sim import run_simulation
from nmigen.cli import verilog, rtlil

from nmutil.multipipe import CombMultiOutPipeline, CombMuxOutPipe
from nmutil.multipipe import PriorityCombMuxInPipe
from nmutil.singlepipe import SimpleHandshake, RecordObject, Object
class PassData2(RecordObject):
    """RecordObject-based payload: mux ID, index and data word.

    All three fields are reset-less signals.
    """

    # NOTE(review): the ``def __init__(self):`` header was missing from the
    # damaged text; it is implied by ``RecordObject.__init__(self)`` below.
    def __init__(self):
        RecordObject.__init__(self)
        # 2-bit multiplexer ID, used to route the item on fan-out
        self.muxid = Signal(2, reset_less=True)
        # 8-bit index of the item within its stream
        self.idx = Signal(8, reset_less=True)
        # 16-bit data word
        self.data = Signal(16, reset_less=True)
class PassData(Object):
    """Object-based payload: mux ID, index and data word.

    All three fields are reset-less signals.
    """

    # NOTE(review): the constructor header and the base-class initialiser
    # were missing from the damaged text; both are reconstructed by analogy
    # with PassData2 -- confirm against the pristine file.
    def __init__(self):
        Object.__init__(self)
        # 2-bit multiplexer ID, used to route the item on fan-out
        self.muxid = Signal(2, reset_less=True)
        # 8-bit index of the item within its stream
        self.idx = Signal(8, reset_less=True)
        # 16-bit data word
        self.data = Signal(16, reset_less=True)
class PassThroughStage:
    """Pipeline stage that forwards its input unchanged.

    NOTE(review): only the two ``return`` statements survived in the damaged
    text.  The method headers and the body of ``ispec`` are reconstructed
    (``ispec`` is implied by the ``self.ispec()`` call, ``ospec`` by the
    original comment); the name ``process`` and the choice of ``PassData``
    over ``PassData2`` are assumptions -- confirm against the pristine file.
    """

    def ispec(self):
        """Return a fresh input-data specification."""
        return PassData()

    def ospec(self):
        """Return the output-data specification."""
        return self.ispec()  # output spec is the same as the input spec

    def process(self, i):
        """Return the stage output for input ``i``."""
        return i  # pass-through
class PassThroughPipe(SimpleHandshake):
    """SimpleHandshake pipeline element wrapping a PassThroughStage."""

    # NOTE(review): the ``def __init__(self):`` header was missing from the
    # damaged text; a no-argument constructor matches the ``PassThroughPipe()``
    # calls made elsewhere in this file.
    def __init__(self):
        SimpleHandshake.__init__(self, PassThroughStage())
class InputTest:
    """Testbench that feeds each input port and checks each output port.

    For every mux ID, ``di`` holds the values to send and ``do`` holds the
    values still expected on the matching output; both are dicts keyed by
    item index.

    NOTE(review): the ``class`` header was missing from the damaged text; the
    name comes from the ``InputTest(dut)`` call in the test driver.  Bare
    ``yield`` statements, the ``while``/``continue`` scaffolding and the
    attribute initialisation in ``__init__`` are reconstructed as well --
    confirm against the pristine file.
    """

    def __init__(self, dut):
        """Generate the test vectors for ``dut``.

        :param dut: device under test; must expose ``num_rows`` and, for
                    ``send``/``rcv``, the port lists ``p`` and ``n``.
        """
        self.dut = dut
        self.di = {}
        self.do = {}
        # NOTE(review): the stream length was lost; 100 is an assumption.
        # It must stay <= 256 for the index to fit the 8-bit ``idx`` field.
        self.tlen = 100
        for muxid in range(dut.num_rows):
            self.di[muxid] = {}
            self.do[muxid] = {}
            for i in range(self.tlen):
                # random low byte, with the mux ID stored from bit 8 upwards
                self.di[muxid][i] = randint(0, 255) + (muxid << 8)
                self.do[muxid][i] = self.di[muxid][i]

    def send(self, muxid):
        """Simulation process: push all ``tlen`` items into input ``muxid``.

        Each item is presented with ``i_valid`` high until ``o_ready`` is
        sampled high, after which the port idles for 0 to 3 steps.
        """
        # Bound once up front so the final de-assert below can never hit an
        # unbound name, even if tlen were zero.
        rs = self.dut.p[muxid]
        for i in range(self.tlen):
            op2 = self.di[muxid][i]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.idx.eq(i)
            yield rs.i_data.muxid.eq(muxid)
            yield
            # hold the item until the pipeline reports it is ready
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print("send", muxid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            # (separate loop variable: must not clobber the item index ``i``)
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)
        yield

        print("send ended", muxid)

    def rcv(self, muxid):
        """Simulation process: drain output ``muxid`` and verify every item.

        ``i_ready`` is driven high on every step (no random stalling).  Each
        accepted item must carry this port's mux ID and match the value
        recorded in ``do``; the process ends once every expected item has
        been seen.

        :raises AssertionError: on a wrong mux ID, an unknown or repeated
                                index, or a data mismatch.
        """
        n = self.dut.n[muxid]
        pending = self.do[muxid]
        while pending:
            yield n.i_ready.eq(1)
            yield
            o_n_valid = yield n.o_valid
            i_n_ready = yield n.i_ready
            if not o_n_valid or not i_n_ready:
                continue

            out_muxid = yield n.o_data.muxid
            out_i = yield n.o_data.idx
            out_v = yield n.o_data.data

            print("recv", out_muxid, out_i, hex(out_v))

            # each expected item is deleted once seen, so a repeated index
            # fails the membership check below
            assert muxid == out_muxid, \
                "out_muxid %d not correct %d" % (out_muxid, muxid)
            assert out_i in pending, "out_i %d not in array %s" % \
                (out_i, repr(pending))
            assert pending[out_i] == out_v  # pass-through data
            del pending[out_i]

        print("recv ended", muxid)
class TestPriorityMuxPipe(PriorityCombMuxInPipe):
    """PriorityCombMuxInPipe with ``num_rows`` inputs and a pass-through stage."""

    def __init__(self, num_rows):
        # Record the row count first, then hand the same value to the base
        # class as its input count (``p_len``).
        self.num_rows = num_rows
        PriorityCombMuxInPipe.__init__(
            self, PassThroughStage(), p_len=self.num_rows)
# NOTE(review): the ``class`` header and the ``def send`` header were missing
# from the damaged text; the names ``OutputTest`` and ``send`` are
# assumptions -- confirm against the pristine file.
class OutputTest:
    """Testbench sender for a DUT with a single input port.

    ``di`` is a list of ``(data, muxid)`` pairs; ``send`` drives them one by
    one into ``dut.p``.
    """

    def __init__(self, dut):
        """Generate ``tlen * dut.num_rows`` random ``(data, muxid)`` pairs.

        :param dut: device under test; must expose ``num_rows`` (and ``p``
                    for ``send``).
        """
        self.dut = dut
        self.di = []
        self.do = {}
        # NOTE(review): the stream length was lost; 100 is an assumption.
        self.tlen = 100
        for i in range(self.tlen * dut.num_rows):
            # NOTE(review): three statements were lost ahead of the random
            # choice; presumably they made the first num_rows items cover
            # every mux ID once -- confirm.
            if i < dut.num_rows:
                muxid = i
            else:
                muxid = randint(0, dut.num_rows - 1)
            # random low byte, with the mux ID stored from bit 8 upwards
            data = randint(0, 255) + (muxid << 8)
            # send() reads the mux ID from element [1] of each entry, so the
            # generated pair has to be stored rather than discarded.
            self.di.append((data, muxid))

    def send(self):
        """Simulation process: push every generated pair into the input port.

        Each item is presented with ``i_valid`` high until ``o_ready`` is
        sampled high, after which the port idles for 0 to 3 steps.
        """
        # NOTE(review): the statement binding ``rs`` was lost; a single input
        # port ``dut.p`` is assumed.  ``self.dut`` is used throughout because
        # no name ``dut`` is in scope inside this method.
        rs = self.dut.p
        for i in range(self.tlen * self.dut.num_rows):
            op2 = self.di[i][0]
            muxid = self.di[i][1]
            yield rs.i_valid.eq(1)
            yield rs.i_data.data.eq(op2)
            yield rs.i_data.muxid.eq(muxid)
            yield
            # hold the item until the pipeline reports it is ready
            o_p_ready = yield rs.o_ready
            while not o_p_ready:
                yield
                o_p_ready = yield rs.o_ready

            print("send", muxid, i, hex(op2))

            yield rs.i_valid.eq(0)
            # wait random period of time before queueing another value
            # (separate loop variable: must not clobber the item index ``i``)
            for _ in range(randint(0, 3)):
                yield

        yield rs.i_valid.eq(0)
class TestMuxOutPipe(CombMuxOutPipe):
    """CombMuxOutPipe with ``num_rows`` outputs and a pass-through stage."""

    def __init__(self, num_rows):
        # Record the row count first, then hand the same value to the base
        # class as its output count (``n_len``).
        self.num_rows = num_rows
        CombMuxOutPipe.__init__(
            self, PassThroughStage(), n_len=self.num_rows)
class TestInOutPipe(Elaboratable):
    """Fan-in mux, two pass-through pipeline stages, then fan-out mux.

    The fan-in's input ports and the fan-out's output ports are re-exported
    as ``p`` and ``n`` so the testbench can treat this class as one pipeline.

    NOTE(review): ``m = Module()``, ``return m`` and the ``ports`` method
    were missing from the damaged text.  They are reconstructed from the
    uses of ``m`` in ``elaborate``, the ``_ports`` attribute and the
    ``dut.ports()`` call in the test driver -- confirm against the pristine
    file.
    """

    def __init__(self, num_rows=4):
        """
        :param num_rows: number of fan-in inputs and of fan-out outputs.
        """
        self.num_rows = num_rows
        self.inpipe = TestPriorityMuxPipe(num_rows)  # fan-in (combinatorial)
        self.pipe1 = PassThroughPipe()               # stage 1 (clock-sync)
        self.pipe2 = PassThroughPipe()               # stage 2 (clock-sync)
        self.outpipe = TestMuxOutPipe(num_rows)      # fan-out (combinatorial)

        # use the fan-in inputs / fan-out outputs as this class's in/out
        self.p = self.inpipe.p
        self.n = self.outpipe.n
        self._ports = self.inpipe.ports() + self.outpipe.ports()

    def elaborate(self, platform):
        """Register the four sub-pipelines and chain them in order."""
        m = Module()
        m.submodules.inpipe = self.inpipe
        m.submodules.pipe1 = self.pipe1
        m.submodules.pipe2 = self.pipe2
        m.submodules.outpipe = self.outpipe

        # inpipe -> pipe1 -> pipe2 -> outpipe
        m.d.comb += self.inpipe.n.connect_to_next(self.pipe1.p)
        m.d.comb += self.pipe1.connect_to_next(self.pipe2)
        m.d.comb += self.pipe2.connect_to_next(self.outpipe)

        return m

    def ports(self):
        """Return the combined port list of the fan-in and fan-out pipes."""
        return self._ports
# NOTE(review): the ``def`` header, the ``f.write(vl)`` body of the ``with``
# block and the call under the main guard were missing from the damaged text;
# the function name ``test1`` is an assumption -- confirm against the
# pristine file.
def test1():
    """Write the DUT out as RTLIL, then simulate four senders and receivers.

    Writes ``test_inoutmux_pipe.il`` and ``test_inoutmux_pipe.vcd`` to the
    current directory.
    """
    dut = TestInOutPipe()
    vl = rtlil.convert(dut, ports=dut.ports())
    with open("test_inoutmux_pipe.il", "w") as f:
        f.write(vl)

    test = InputTest(dut)
    # one receiver and one sender process per mux ID (0..3), matching the
    # default num_rows=4 of TestInOutPipe
    run_simulation(dut, [test.rcv(1), test.rcv(0),
                         test.rcv(3), test.rcv(2),
                         test.send(0), test.send(1),
                         test.send(3), test.send(2),
                         ],
                   vcd_name="test_inoutmux_pipe.vcd")


if __name__ == '__main__':
    test1()