1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
7 from singlepipe
import PassThroughStage
8 from multipipe
import (CombMultiInPipeline
, PriorityCombMuxInPipe
)
13 self
.mid
= Signal(2, reset_less
=True)
14 self
.idx
= Signal(6, reset_less
=True)
15 self
.data
= Signal(16, reset_less
=True)
18 return [self
.mid
.eq(i
.mid
), self
.idx
.eq(i
.idx
), self
.data
.eq(i
.data
)]
21 return [self
.mid
, self
.idx
, self
.data
]
25 stb
= yield dut
.out_op
.stb
27 ack
= yield dut
.out_op
.ack
31 yield dut
.rs
[1].in_op
[0].eq(5)
32 yield dut
.rs
[1].stb
.eq(0b01) # strobe indicate 1st op ready
33 #yield dut.rs[1].ack.eq(1)
36 # check row 1 output (should be inactive)
37 decode
= yield dut
.rs
[1].out_decode
40 op0
= yield dut
.rs
[1].out_op
[0]
41 op1
= yield dut
.rs
[1].out_op
[1]
42 assert op0
== 0 and op1
== 0
44 # output should be inactive
45 out_stb
= yield dut
.out_op
.stb
49 yield dut
.rs
[1].in_op
[1].eq(6)
50 yield dut
.rs
[1].stb
.eq(0b11) # strobe indicate both ops ready
52 # set acknowledgement of output... takes 1 cycle to respond
53 yield dut
.out_op
.ack
.eq(1)
55 yield dut
.out_op
.ack
.eq(0) # clear ack on output
56 yield dut
.rs
[1].stb
.eq(0) # clear row 1 strobe
58 # output strobe should be active, MID should be 0 until "ack" is set...
59 out_stb
= yield dut
.out_op
.stb
61 out_mid
= yield dut
.mid
64 # ... and output should not yet be passed through either
65 op0
= yield dut
.out_op
.v
[0]
66 op1
= yield dut
.out_op
.v
[1]
67 assert op0
== 0 and op1
== 0
69 # wait for out_op.ack to activate...
70 yield dut
.rs
[1].stb
.eq(0b00) # set row 1 strobes to zero
73 # *now* output should be passed through
74 op0
= yield dut
.out_op
.v
[0]
75 op1
= yield dut
.out_op
.v
[1]
76 assert op0
== 5 and op1
== 6
79 yield dut
.rs
[2].in_op
[0].eq(3)
80 yield dut
.rs
[2].in_op
[1].eq(4)
81 yield dut
.rs
[2].stb
.eq(0b11) # strobe indicate 1st op ready
82 yield dut
.out_op
.ack
.eq(1) # set output ack
84 yield dut
.rs
[2].stb
.eq(0) # clear row 2 strobe
85 yield dut
.out_op
.ack
.eq(0) # set output ack
87 op0
= yield dut
.out_op
.v
[0]
88 op1
= yield dut
.out_op
.v
[1]
89 assert op0
== 3 and op1
== 4, "op0 %d op1 %d" % (op0
, op1
)
90 out_mid
= yield dut
.mid
93 # set row 0 and 3 input
94 yield dut
.rs
[0].in_op
[0].eq(9)
95 yield dut
.rs
[0].in_op
[1].eq(8)
96 yield dut
.rs
[0].stb
.eq(0b11) # strobe indicate 1st op ready
97 yield dut
.rs
[3].in_op
[0].eq(1)
98 yield dut
.rs
[3].in_op
[1].eq(2)
99 yield dut
.rs
[3].stb
.eq(0b11) # strobe indicate 1st op ready
101 # set acknowledgement of output... takes 1 cycle to respond
102 yield dut
.out_op
.ack
.eq(1)
104 yield dut
.rs
[0].stb
.eq(0) # clear row 1 strobe
106 out_mid
= yield dut
.mid
107 assert out_mid
== 0, "out mid %d" % out_mid
110 yield dut
.rs
[3].stb
.eq(0) # clear row 1 strobe
111 yield dut
.out_op
.ack
.eq(0) # clear ack on output
113 out_mid
= yield dut
.mid
114 assert out_mid
== 3, "out mid %d" % out_mid
118 def __init__(self
, dut
):
123 for mid
in range(dut
.num_rows
):
126 for i
in range(self
.tlen
):
127 self
.di
[mid
][i
] = randint(0, 100) + (mid
<<8)
128 self
.do
[mid
][i
] = self
.di
[mid
][i
]
131 for i
in range(self
.tlen
):
132 op2
= self
.di
[mid
][i
]
134 yield rs
.i_valid
.eq(1)
135 yield rs
.i_data
.data
.eq(op2
)
136 yield rs
.i_data
.idx
.eq(i
)
137 yield rs
.i_data
.mid
.eq(mid
)
139 o_p_ready
= yield rs
.o_ready
142 o_p_ready
= yield rs
.o_ready
144 print ("send", mid
, i
, hex(op2
))
146 yield rs
.i_valid
.eq(0)
147 # wait random period of time before queueing another value
148 for i
in range(randint(0, 3)):
151 yield rs
.i_valid
.eq(0)
152 ## wait random period of time before queueing another value
153 #for i in range(randint(0, 3)):
156 #send_range = randint(0, 3)
160 # send = randint(0, send_range) != 0
164 #stall_range = randint(0, 3)
165 #for j in range(randint(1,10)):
166 # stall = randint(0, stall_range) != 0
167 # yield self.dut.n[0].i_ready.eq(stall)
170 yield n
.i_ready
.eq(1)
172 o_n_valid
= yield n
.o_valid
173 i_n_ready
= yield n
.i_ready
174 if not o_n_valid
or not i_n_ready
:
177 mid
= yield n
.o_data
.mid
178 out_i
= yield n
.o_data
.idx
179 out_v
= yield n
.o_data
.data
181 print ("recv", mid
, out_i
, hex(out_v
))
183 # see if this output has occurred already, delete it if it has
184 assert out_i
in self
.do
[mid
], "out_i %d not in array %s" % \
185 (out_i
, repr(self
.do
[mid
]))
186 assert self
.do
[mid
][out_i
] == out_v
# pass-through data
187 del self
.do
[mid
][out_i
]
189 # check if there's any more outputs
191 for (k
, v
) in self
.do
.items():
198 class TestPriorityMuxPipe(PriorityCombMuxInPipe
):
201 def iospecfn(): return PassData()
202 stage
= PassThroughStage(iospecfn
)
203 PriorityCombMuxInPipe
.__init
__(self
, stage
, p_len
=self
.num_rows
)
206 if __name__
== '__main__':
207 dut
= TestPriorityMuxPipe()
208 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
209 with
open("test_inputgroup_multi.il", "w") as f
:
211 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
213 test
= InputTest(dut
)
214 run_simulation(dut
, [test
.send(1), test
.send(0),
215 test
.send(3), test
.send(2),
217 vcd_name
="test_inputgroup_multi.vcd")