1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
7 from multipipe
import CombMultiOutPipeline
10 class MuxUnbufferedPipeline(CombMultiOutPipeline
):
11 def __init__(self
, stage
, n_len
):
12 # HACK: stage is also the n-way multiplexer
13 CombMultiOutPipeline
.__init
__(self
, stage
, n_len
=n_len
, n_mux
=stage
)
15 # HACK: n-mux is also the stage... so set the muxid equal to input mid
16 stage
.m_id
= self
.p
.i_data
.mid
19 return self
.p_mux
.ports()
24 self
.mid
= Signal(2, reset_less
=True)
25 self
.data
= Signal(16, reset_less
=True)
28 return [self
.mid
.eq(i
.mid
), self
.data
.eq(i
.data
)]
31 return [self
.mid
, self
.data
]
34 class PassThroughStage
:
40 return Signal(16, reset_less
=True)
48 stb
= yield dut
.out_op
.stb
50 ack
= yield dut
.out_op
.ack
54 yield dut
.rs
[1].in_op
[0].eq(5)
55 yield dut
.rs
[1].stb
.eq(0b01) # strobe indicate 1st op ready
56 #yield dut.rs[1].ack.eq(1)
59 # check row 1 output (should be inactive)
60 decode
= yield dut
.rs
[1].out_decode
63 op0
= yield dut
.rs
[1].out_op
[0]
64 op1
= yield dut
.rs
[1].out_op
[1]
65 assert op0
== 0 and op1
== 0
67 # output should be inactive
68 out_stb
= yield dut
.out_op
.stb
72 yield dut
.rs
[1].in_op
[1].eq(6)
73 yield dut
.rs
[1].stb
.eq(0b11) # strobe indicate both ops ready
75 # set acknowledgement of output... takes 1 cycle to respond
76 yield dut
.out_op
.ack
.eq(1)
78 yield dut
.out_op
.ack
.eq(0) # clear ack on output
79 yield dut
.rs
[1].stb
.eq(0) # clear row 1 strobe
81 # output strobe should be active, MID should be 0 until "ack" is set...
82 out_stb
= yield dut
.out_op
.stb
84 out_mid
= yield dut
.mid
87 # ... and output should not yet be passed through either
88 op0
= yield dut
.out_op
.v
[0]
89 op1
= yield dut
.out_op
.v
[1]
90 assert op0
== 0 and op1
== 0
92 # wait for out_op.ack to activate...
93 yield dut
.rs
[1].stb
.eq(0b00) # set row 1 strobes to zero
96 # *now* output should be passed through
97 op0
= yield dut
.out_op
.v
[0]
98 op1
= yield dut
.out_op
.v
[1]
99 assert op0
== 5 and op1
== 6
102 yield dut
.rs
[2].in_op
[0].eq(3)
103 yield dut
.rs
[2].in_op
[1].eq(4)
104 yield dut
.rs
[2].stb
.eq(0b11) # strobe indicate 1st op ready
105 yield dut
.out_op
.ack
.eq(1) # set output ack
107 yield dut
.rs
[2].stb
.eq(0) # clear row 2 strobe
108 yield dut
.out_op
.ack
.eq(0) # set output ack
110 op0
= yield dut
.out_op
.v
[0]
111 op1
= yield dut
.out_op
.v
[1]
112 assert op0
== 3 and op1
== 4, "op0 %d op1 %d" % (op0
, op1
)
113 out_mid
= yield dut
.mid
116 # set row 0 and 3 input
117 yield dut
.rs
[0].in_op
[0].eq(9)
118 yield dut
.rs
[0].in_op
[1].eq(8)
119 yield dut
.rs
[0].stb
.eq(0b11) # strobe indicate 1st op ready
120 yield dut
.rs
[3].in_op
[0].eq(1)
121 yield dut
.rs
[3].in_op
[1].eq(2)
122 yield dut
.rs
[3].stb
.eq(0b11) # strobe indicate 1st op ready
124 # set acknowledgement of output... takes 1 cycle to respond
125 yield dut
.out_op
.ack
.eq(1)
127 yield dut
.rs
[0].stb
.eq(0) # clear row 1 strobe
129 out_mid
= yield dut
.mid
130 assert out_mid
== 0, "out mid %d" % out_mid
133 yield dut
.rs
[3].stb
.eq(0) # clear row 1 strobe
134 yield dut
.out_op
.ack
.eq(0) # clear ack on output
136 out_mid
= yield dut
.mid
137 assert out_mid
== 3, "out mid %d" % out_mid
141 def __init__(self
, dut
):
146 for i
in range(self
.tlen
* dut
.num_rows
):
147 mid
= randint(0, dut
.num_rows
-1)
148 data
= randint(0, 255) + (mid
<<8)
149 if mid
not in self
.do
:
151 self
.di
.append((data
, mid
))
152 self
.do
[mid
].append(data
)
155 for i
in range(self
.tlen
* dut
.num_rows
):
159 yield rs
.i_valid
.eq(1)
160 yield rs
.i_data
.data
.eq(op2
)
161 yield rs
.i_data
.mid
.eq(mid
)
163 o_p_ready
= yield rs
.o_ready
166 o_p_ready
= yield rs
.o_ready
168 print ("send", mid
, i
, hex(op2
))
170 yield rs
.i_valid
.eq(0)
171 # wait random period of time before queueing another value
172 for i
in range(randint(0, 3)):
175 yield rs
.i_valid
.eq(0)
180 while out_i
!= self
.tlen
:
184 #stall_range = randint(0, 3)
185 #for j in range(randint(1,10)):
186 # stall = randint(0, stall_range) != 0
187 # yield self.dut.n[0].i_ready.eq(stall)
190 yield n
.i_ready
.eq(1)
192 o_n_valid
= yield n
.o_valid
193 i_n_ready
= yield n
.i_ready
194 if not o_n_valid
or not i_n_ready
:
197 out_v
= yield n
.o_data
199 print ("recv", mid
, out_i
, hex(out_v
))
201 assert self
.do
[mid
][out_i
] == out_v
# pass-through data
206 class TestPriorityMuxPipe(MuxUnbufferedPipeline
):
209 stage
= PassThroughStage()
210 MuxUnbufferedPipeline
.__init
__(self
, stage
, n_len
=self
.num_rows
)
213 res
= [self
.p
.i_valid
, self
.p
.o_ready
] + \
214 self
.p
.i_data
.ports()
215 for i
in range(len(self
.n
)):
216 res
+= [self
.n
[i
].i_ready
, self
.n
[i
].o_valid
] + \
218 #self.n[i].o_data.ports()
222 if __name__
== '__main__':
223 dut
= TestPriorityMuxPipe()
224 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
225 with
open("test_outmux_pipe.il", "w") as f
:
227 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
229 test
= OutputTest(dut
)
230 run_simulation(dut
, [test
.rcv(1), test
.rcv(0),
231 test
.rcv(3), test
.rcv(2),
233 vcd_name
="test_outmux_pipe.vcd")