1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
7 from nmutil
.singlepipe
import PassThroughStage
8 from nmutil
.multipipe
import (CombMultiInPipeline
, PriorityCombMuxInPipe
)
10 from . import StepLimiter
16 self
.muxid
= Signal(2, reset_less
=True)
17 self
.idx
= Signal(6, reset_less
=True)
18 self
.data
= Signal(16, reset_less
=True)
21 return [self
.muxid
.eq(i
.muxid
), self
.idx
.eq(i
.idx
), self
.data
.eq(i
.data
)]
24 return [self
.muxid
, self
.idx
, self
.data
]
28 stb
= yield dut
.out_op
.stb
30 ack
= yield dut
.out_op
.ack
34 yield dut
.rs
[1].in_op
[0].eq(5)
35 yield dut
.rs
[1].stb
.eq(0b01) # strobe indicate 1st op ready
36 # yield dut.rs[1].ack.eq(1)
39 # check row 1 output (should be inactive)
40 decode
= yield dut
.rs
[1].out_decode
43 op0
= yield dut
.rs
[1].out_op
[0]
44 op1
= yield dut
.rs
[1].out_op
[1]
45 assert op0
== 0 and op1
== 0
47 # output should be inactive
48 out_stb
= yield dut
.out_op
.stb
52 yield dut
.rs
[1].in_op
[1].eq(6)
53 yield dut
.rs
[1].stb
.eq(0b11) # strobe indicate both ops ready
55 # set acknowledgement of output... takes 1 cycle to respond
56 yield dut
.out_op
.ack
.eq(1)
58 yield dut
.out_op
.ack
.eq(0) # clear ack on output
59 yield dut
.rs
[1].stb
.eq(0) # clear row 1 strobe
61 # output strobe should be active, MID should be 0 until "ack" is set...
62 out_stb
= yield dut
.out_op
.stb
64 out_muxid
= yield dut
.muxid
67 # ... and output should not yet be passed through either
68 op0
= yield dut
.out_op
.v
[0]
69 op1
= yield dut
.out_op
.v
[1]
70 assert op0
== 0 and op1
== 0
72 # wait for out_op.ack to activate...
73 yield dut
.rs
[1].stb
.eq(0b00) # set row 1 strobes to zero
76 # *now* output should be passed through
77 op0
= yield dut
.out_op
.v
[0]
78 op1
= yield dut
.out_op
.v
[1]
79 assert op0
== 5 and op1
== 6
82 yield dut
.rs
[2].in_op
[0].eq(3)
83 yield dut
.rs
[2].in_op
[1].eq(4)
84 yield dut
.rs
[2].stb
.eq(0b11) # strobe indicate 1st op ready
85 yield dut
.out_op
.ack
.eq(1) # set output ack
87 yield dut
.rs
[2].stb
.eq(0) # clear row 2 strobe
88 yield dut
.out_op
.ack
.eq(0) # set output ack
90 op0
= yield dut
.out_op
.v
[0]
91 op1
= yield dut
.out_op
.v
[1]
92 assert op0
== 3 and op1
== 4, "op0 %d op1 %d" % (op0
, op1
)
93 out_muxid
= yield dut
.muxid
96 # set row 0 and 3 input
97 yield dut
.rs
[0].in_op
[0].eq(9)
98 yield dut
.rs
[0].in_op
[1].eq(8)
99 yield dut
.rs
[0].stb
.eq(0b11) # strobe indicate 1st op ready
100 yield dut
.rs
[3].in_op
[0].eq(1)
101 yield dut
.rs
[3].in_op
[1].eq(2)
102 yield dut
.rs
[3].stb
.eq(0b11) # strobe indicate 1st op ready
104 # set acknowledgement of output... takes 1 cycle to respond
105 yield dut
.out_op
.ack
.eq(1)
107 yield dut
.rs
[0].stb
.eq(0) # clear row 1 strobe
109 out_muxid
= yield dut
.muxid
110 assert out_muxid
== 0, "out muxid %d" % out_muxid
113 yield dut
.rs
[3].stb
.eq(0) # clear row 1 strobe
114 yield dut
.out_op
.ack
.eq(0) # clear ack on output
116 out_muxid
= yield dut
.muxid
117 assert out_muxid
== 3, "out muxid %d" % out_muxid
121 def __init__(self
, dut
):
126 for muxid
in range(dut
.num_rows
):
129 for i
in range(self
.tlen
):
130 self
.di
[muxid
][i
] = randint(0, 100) + (muxid
<< 8)
131 self
.do
[muxid
][i
] = self
.di
[muxid
][i
]
133 def send(self
, muxid
):
134 for i
in range(self
.tlen
):
135 op2
= self
.di
[muxid
][i
]
136 rs
= self
.dut
.p
[muxid
]
137 yield rs
.i_valid
.eq(1)
138 yield rs
.i_data
.data
.eq(op2
)
139 yield rs
.i_data
.idx
.eq(i
)
140 yield rs
.i_data
.muxid
.eq(muxid
)
142 o_p_ready
= yield rs
.o_ready
143 step_limiter
= StepLimiter(10000)
147 o_p_ready
= yield rs
.o_ready
149 print("send", muxid
, i
, hex(op2
))
151 yield rs
.i_valid
.eq(0)
152 # wait random period of time before queueing another value
153 for i
in range(randint(0, 3)):
156 yield rs
.i_valid
.eq(0)
157 # wait random period of time before queueing another value
158 # for i in range(randint(0, 3)):
161 #send_range = randint(0, 3)
162 # if send_range == 0:
165 # send = randint(0, send_range) != 0
168 for _
in StepLimiter(10000):
169 #stall_range = randint(0, 3)
170 # for j in range(randint(1,10)):
171 # stall = randint(0, stall_range) != 0
172 # yield self.dut.n[0].i_ready.eq(stall)
175 yield n
.i_ready
.eq(1)
177 o_n_valid
= yield n
.o_valid
178 i_n_ready
= yield n
.i_ready
179 if not o_n_valid
or not i_n_ready
:
182 muxid
= yield n
.o_data
.muxid
183 out_i
= yield n
.o_data
.idx
184 out_v
= yield n
.o_data
.data
186 print("recv", muxid
, out_i
, hex(out_v
))
188 # see if this output has occurred already, delete it if it has
189 assert out_i
in self
.do
[muxid
], "out_i %d not in array %s" % \
190 (out_i
, repr(self
.do
[muxid
]))
191 assert self
.do
[muxid
][out_i
] == out_v
# pass-through data
192 del self
.do
[muxid
][out_i
]
194 # check if there's any more outputs
196 for (k
, v
) in self
.do
.items():
203 class TestPriorityMuxPipe(PriorityCombMuxInPipe
):
206 def iospecfn(): return PassData()
207 stage
= PassThroughStage(iospecfn
)
208 PriorityCombMuxInPipe
.__init
__(self
, stage
, p_len
=self
.num_rows
)
211 @unittest.skip("disabled for now: logic loop") # FIXME
213 dut
= TestPriorityMuxPipe()
214 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
215 with
open("test_inputgroup_multi.il", "w") as f
:
217 #run_simulation(dut, tbench(dut), vcd_name="test_inputgroup.vcd")
219 test
= InputTest(dut
)
220 run_simulation(dut
, [test
.send(1), test
.send(0),
221 test
.send(3), test
.send(2),
223 vcd_name
="test_inputgroup_multi.vcd")
226 if __name__
== '__main__':