6e85f3a1286a3adf64247de75ad0e2c7b3d97348
1 """ key strategic example showing how to do multi-input fan-in into a
2 multi-stage pipeline, then multi-output fanout, with an unary muxid
5 the multiplex ID from the fan-in is passed in to the pipeline, preserved,
6 and used as a routing ID on the fanout.
9 from random
import randint
11 from nmigen
import Module
, Signal
, Cat
, Value
, Elaboratable
12 from nmigen
.compat
.sim
import run_simulation
13 from nmigen
.cli
import verilog
, rtlil
15 from nmutil
.multipipe
import CombMultiOutPipeline
, CombMuxOutPipe
16 from nmutil
.multipipe
import PriorityCombMuxInPipe
17 from nmutil
.singlepipe
import MaskCancellable
, RecordObject
, Object
20 class PassData2(RecordObject
):
22 RecordObject
.__init
__(self
)
23 self
.muxid
= Signal(2, reset_less
=True)
24 self
.idx
= Signal(8, reset_less
=True)
25 self
.data
= Signal(16, reset_less
=True)
28 class PassData(Object
):
31 self
.muxid
= Signal(2, reset_less
=True)
32 self
.idx
= Signal(8, reset_less
=True)
33 self
.data
= Signal(16, reset_less
=True)
37 class PassThroughStage
:
41 return self
.ispec() # same as ospec
44 return i
# pass-through
48 class PassThroughPipe(MaskCancellable
):
49 def __init__(self
, maskwid
):
50 MaskCancellable
.__init
__(self
, PassThroughStage(), maskwid
)
54 def __init__(self
, dut
):
59 for muxid
in range(dut
.num_rows
):
62 for i
in range(self
.tlen
):
63 self
.di
[muxid
][i
] = randint(0, 255) + (muxid
<<8)
64 self
.do
[muxid
][i
] = self
.di
[muxid
][i
]
66 def send(self
, muxid
):
67 for i
in range(self
.tlen
):
68 op2
= self
.di
[muxid
][i
]
69 rs
= self
.dut
.p
[muxid
]
70 yield rs
.valid_i
.eq(1)
71 yield rs
.data_i
.data
.eq(op2
)
72 yield rs
.data_i
.idx
.eq(i
)
73 yield rs
.data_i
.muxid
.eq(muxid
)
76 o_p_ready
= yield rs
.ready_o
79 o_p_ready
= yield rs
.ready_o
81 print ("send", muxid
, i
, hex(op2
))
83 yield rs
.valid_i
.eq(0)
84 # wait random period of time before queueing another value
85 for i
in range(randint(0, 3)):
88 yield rs
.valid_i
.eq(0)
91 print ("send ended", muxid
)
93 ## wait random period of time before queueing another value
94 #for i in range(randint(0, 3)):
97 #send_range = randint(0, 3)
101 # send = randint(0, send_range) != 0
103 def rcv(self
, muxid
):
105 #stall_range = randint(0, 3)
106 #for j in range(randint(1,10)):
107 # stall = randint(0, stall_range) != 0
108 # yield self.dut.n[0].ready_i.eq(stall)
110 n
= self
.dut
.n
[muxid
]
111 yield n
.ready_i
.eq(1)
113 o_n_valid
= yield n
.valid_o
114 i_n_ready
= yield n
.ready_i
115 if not o_n_valid
or not i_n_ready
:
118 out_muxid
= yield n
.data_o
.muxid
119 out_i
= yield n
.data_o
.idx
120 out_v
= yield n
.data_o
.data
122 print ("recv", out_muxid
, out_i
, hex(out_v
))
124 # see if this output has occurred already, delete it if it has
125 assert muxid
== out_muxid
, \
126 "out_muxid %d not correct %d" % (out_muxid
, muxid
)
127 assert out_i
in self
.do
[muxid
], "out_i %d not in array %s" % \
128 (out_i
, repr(self
.do
[muxid
]))
129 assert self
.do
[muxid
][out_i
] == out_v
# pass-through data
130 del self
.do
[muxid
][out_i
]
132 # check if there's any more outputs
133 if len(self
.do
[muxid
]) == 0:
135 print ("recv ended", muxid
)
138 class TestPriorityMuxPipe(PriorityCombMuxInPipe
):
139 def __init__(self
, num_rows
):
140 self
.num_rows
= num_rows
141 stage
= PassThroughStage()
142 PriorityCombMuxInPipe
.__init
__(self
, stage
,
143 p_len
=self
.num_rows
, maskwid
=1)
147 def __init__(self
, dut
):
152 for i
in range(self
.tlen
* dut
.num_rows
):
156 muxid
= randint(0, dut
.num_rows
-1)
157 data
= randint(0, 255) + (muxid
<<8)
160 for i
in range(self
.tlen
* dut
.num_rows
):
162 muxid
= self
.di
[i
][1]
164 yield rs
.valid_i
.eq(1)
165 yield rs
.data_i
.data
.eq(op2
)
166 yield rs
.data_i
.muxid
.eq(muxid
)
168 o_p_ready
= yield rs
.ready_o
171 o_p_ready
= yield rs
.ready_o
173 print ("send", muxid
, i
, hex(op2
))
175 yield rs
.valid_i
.eq(0)
176 # wait random period of time before queueing another value
177 for i
in range(randint(0, 3)):
180 yield rs
.valid_i
.eq(0)
183 class TestMuxOutPipe(CombMuxOutPipe
):
184 def __init__(self
, num_rows
):
185 self
.num_rows
= num_rows
186 stage
= PassThroughStage()
187 CombMuxOutPipe
.__init
__(self
, stage
, n_len
=self
.num_rows
,
191 class TestInOutPipe(Elaboratable
):
192 def __init__(self
, num_rows
=4):
193 self
.num_rows
= nr
= num_rows
194 self
.inpipe
= TestPriorityMuxPipe(nr
) # fan-in (combinatorial)
195 self
.pipe1
= PassThroughPipe(nr
) # stage 1 (clock-sync)
196 self
.pipe2
= PassThroughPipe(nr
) # stage 2 (clock-sync)
197 self
.outpipe
= TestMuxOutPipe(nr
) # fan-out (combinatorial)
199 self
.p
= self
.inpipe
.p
# kinda annoying,
200 self
.n
= self
.outpipe
.n
# use pipe in/out as this class in/out
201 self
._ports
= self
.inpipe
.ports() + self
.outpipe
.ports()
203 def elaborate(self
, platform
):
205 m
.submodules
.inpipe
= self
.inpipe
206 m
.submodules
.pipe1
= self
.pipe1
207 m
.submodules
.pipe2
= self
.pipe2
208 m
.submodules
.outpipe
= self
.outpipe
210 m
.d
.comb
+= self
.inpipe
.n
.connect_to_next(self
.pipe1
.p
)
211 m
.d
.comb
+= self
.pipe1
.connect_to_next(self
.pipe2
)
212 m
.d
.comb
+= self
.pipe2
.connect_to_next(self
.outpipe
)
221 dut
= TestInOutPipe()
222 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
223 with
open("test_inoutmux_pipe.il", "w") as f
:
225 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
227 test
= InputTest(dut
)
228 run_simulation(dut
, [test
.rcv(1), test
.rcv(0),
229 test
.rcv(3), test
.rcv(2),
230 test
.send(0), test
.send(1),
231 test
.send(3), test
.send(2),
233 vcd_name
="test_inoutmux_pipe.vcd")
235 if __name__
== '__main__':