1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
, Elaboratable
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
7 from nmutil
.multipipe
import CombMuxOutPipe
8 from nmutil
.singlepipe
import SimpleHandshake
, PassThroughHandshake
, RecordObject
# PassInData: a RecordObject subclass that creates two reset-less Signals,
# `muxid` (declared with Signal(2, ...)) and `data` (declared with Signal(16, ...)).
# NOTE(review): the method header enclosing the statements below (original
# line 12, presumably `def __init__(self):`) is not visible in this extract.
11 class PassInData(RecordObject
):
13 RecordObject
.__init
__(self
)
14 self
.muxid
= Signal(2, reset_less
=True)
15 self
.data
= Signal(16, reset_less
=True)
# PassThroughStage: stage object handed to the mux pipe (see its use in
# TestPriorityMuxPipe). Only `ospec` is visible in this extract.
# NOTE(review): original lines 19-22 and 25 onwards are missing here; any
# other methods of this class cannot be described from this view.
18 class PassThroughStage
:
# ospec(name): returns a 16-wide reset-less Signal named "<name>_dout".
23 def ospec(self
, name
):
24 return Signal(16, name
="%s_dout" % name
, reset_less
=True)
# PassThroughDataStage: stage whose output spec is whatever `self.ispec()`
# returns and whose processing step returns its input `i` unchanged.
# NOTE(review): the `def` headers for the two return statements below are
# not visible in this extract (original lines 31-33 and 35-36 are missing).
30 class PassThroughDataStage
:
34 return self
.ispec() # same as ospec
37 return i
# pass-through
# PassThroughPipe: a PassThroughHandshake initialised with a fresh
# PassThroughDataStage instance as its stage.
# NOTE(review): the constructor header (original line 42) is not visible in
# this extract.
41 class PassThroughPipe(PassThroughHandshake
):
43 PassThroughHandshake
.__init
__(self
, PassThroughDataStage())
# Constructor of the test-bench helper (instantiated as `OutputTest(dut)`
# further down): pre-computes the stimulus and the expected per-output data.
#   dut -- device under test; must expose `num_rows`.
# NOTE(review): original lines 48-51, 53-55 and 59 are missing from this
# extract, so the initialisation of self.di / self.do / self.tlen and the
# body of the `if muxid not in self.do` branch cannot be seen here.
47 def __init__(self
, dut
):
# one iteration per generated test value: tlen values per output row
52 for i
in range(self
.tlen
* dut
.num_rows
):
# choose a random destination row (randint is inclusive on both ends)
56 muxid
= randint(0, dut
.num_rows
-1)
# random low byte, with the destination row placed above it (muxid << 8)
57 data
= randint(0, 255) + (muxid
<<8)
58 if muxid
not in self
.do
:
# self.di: ordered (data, muxid) stimulus; self.do[muxid]: expected data
# for that output, in the order it was queued
60 self
.di
.append((data
, muxid
))
61 self
.do
[muxid
].append(data
)
# Stimulus generator body: for each of the tlen * num_rows queued values,
# raises i_valid on `rs`, drives i_data.data / i_data.muxid, samples o_ready,
# logs the value, then drops i_valid and idles for a random 0..3 iterations.
# NOTE(review): the enclosing `def` line and the assignments of `op2`,
# `muxid` and `rs` (original lines 63, 65-67) are not visible in this
# extract, nor are lines 71, 73-74, 76, 78 and 82-83 (presumably bare
# `yield`s and a wait loop on o_ready -- confirm against the full file).
64 for i
in range(self
.tlen
* self
.dut
.num_rows
):
# present the value and its destination row on the input port
68 yield rs
.i_valid
.eq(1)
69 yield rs
.i_data
.data
.eq(op2
)
70 yield rs
.i_data
.muxid
.eq(muxid
)
# sample the ready signal coming back from the pipe
72 o_p_ready
= yield rs
.o_ready
75 o_p_ready
= yield rs
.o_ready
77 print ("send", muxid
, i
, hex(op2
))
79 yield rs
.i_valid
.eq(0)
80 # wait random period of time before queueing another value
81 for i
in range(randint(0, 3)):
# make sure i_valid is low once all values have been sent
84 yield rs
.i_valid
.eq(0)
# Receiver generator body (used below as `test.rcv(<muxid>)`): reads values
# from output port `n` until every expected entry in self.do[muxid] has been
# seen, asserting that each received value matches the expected one, and
# randomly de-asserting i_ready to exercise back-pressure.
# NOTE(review): the enclosing `def` line, the initialisation of `out_i`,
# `count` and `n`, the increment statements and the bodies of the `if`s
# (original lines 86-88, 91, 93-95, 99-100, 107, 112, 115) are not visible
# in this extract.
89 stall_range
= randint(0, 3)
# loop until all expected values for this output have been received
90 while out_i
!= len(self
.do
[muxid
]):
# guard against the simulation hanging
92 assert count
!= 2000, "timeout: too long"
# sample the output handshake signals
96 o_n_valid
= yield n
.o_valid
97 i_n_ready
= yield n
.i_ready
# no transfer unless both valid and ready are set
98 if not o_n_valid
or not i_n_ready
:
101 out_v
= yield n
.o_data
103 print ("recv", muxid
, out_i
, hex(out_v
))
105 assert self
.do
[muxid
][out_i
] == out_v
# pass-through data
# one time in six, pick a new maximum stall length
109 if randint(0, 5) == 0:
110 stall_range
= randint(0, 3)
# stall is True whenever the random draw in 0..stall_range is non-zero
111 stall
= randint(0, stall_range
) != 0
# drop ready for stall_range iterations
113 yield n
.i_ready
.eq(0)
114 for i
in range(stall_range
):
class TestPriorityMuxPipe(CombMuxOutPipe):
    """CombMuxOutPipe configured with a PassThroughStage for this test."""

    def __init__(self, num_rows):
        """Store the row count and initialise the CombMuxOutPipe base.

        num_rows is kept on the instance as `self.num_rows` and is also
        handed to CombMuxOutPipe as its `n_len` argument.
        """
        self.num_rows = num_rows
        CombMuxOutPipe.__init__(self, PassThroughStage(), n_len=num_rows)
# TestSyncToPriorityPipe: top-level device under test. It owns a
# PassThroughPipe (`pipe`) and a TestPriorityMuxPipe (`muxpipe`), connects
# pipe.n to muxpipe.p, and exposes muxpipe.n as its own `n`.
# NOTE(review): the method headers for the constructor and for the port
# list, the assignment of self.num_rows / self.p, the creation of `m`, the
# return statements and original line 146 are not visible in this extract.
125 class TestSyncToPriorityPipe(Elaboratable
):
128 self
.pipe
= PassThroughPipe()
129 self
.muxpipe
= TestPriorityMuxPipe(self
.num_rows
)
# the outputs of this design are the mux pipe's outputs
132 self
.n
= self
.muxpipe
.n
# elaborate: registers both pipes as submodules and adds the statements
# returned by pipe.n.connect_to_next(muxpipe.p) to the combinatorial domain.
134 def elaborate(self
, platform
):
136 m
.submodules
.pipe
= self
.pipe
137 m
.submodules
.muxpipe
= self
.muxpipe
138 m
.d
.comb
+= self
.pipe
.n
.connect_to_next(self
.muxpipe
.p
)
# port list: input-side valid/ready plus the input data record's ports,
# followed by ready/valid for every entry of self.n
142 res
= [self
.p
.i_valid
, self
.p
.o_ready
] + \
143 self
.p
.i_data
.ports()
144 for i
in range(len(self
.n
)):
145 res
+= [self
.n
[i
].i_ready
, self
.n
[i
].o_valid
] + \
147 #self.n[i].o_data.ports()
# Test driver body: builds the design, converts it to RTLIL with its port
# list, opens "test_outmux_pipe.il" for writing, then runs the simulation
# with one receiver generator per output (1, 0, 3, 2) and dumps a VCD trace
# to "test_outmux_pipe.vcd".
# NOTE(review): the enclosing `def` line, the body of the `with` block and
# original line 160 (the end of the generator list) are not visible in this
# extract.
152 dut
= TestSyncToPriorityPipe()
153 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
154 with
open("test_outmux_pipe.il", "w") as f
:
157 test
= OutputTest(dut
)
158 run_simulation(dut
, [test
.rcv(1), test
.rcv(0),
159 test
.rcv(3), test
.rcv(2),
161 vcd_name
="test_outmux_pipe.vcd")
# Script entry guard.
# NOTE(review): the statement it executes is not visible in this extract.
163 if __name__
== '__main__':