1 from random
import randint
3 from nmigen
import Module
, Signal
, Cat
4 from nmigen
.compat
.sim
import run_simulation
5 from nmigen
.cli
import verilog
, rtlil
6 from nmigen
.lib
.coding
import PriorityEncoder
8 from example_buf_pipe
import UnbufferedPipeline
class InputPriorityArbiter:
    """Arbiter that picks one valid input row of a pipeline by priority.

    The "valid" indication of every input in ``pipe.p`` is fed into a
    ``PriorityEncoder``; the encoder's output becomes the multiplex id
    (``m_id``) and ``active`` is driven from the inverse of the encoder's
    ``n`` output.

    Attributes:
        pipe: the pipeline whose ``p`` inputs are arbitrated.
        num_rows: number of inputs being arbitrated.
        mmax: bit-width of ``m_id``.
        m_id: Signal carrying the index of the selected input.
        active: Signal driven from ``~pe.n`` (encoder active).
    """

    def __init__(self, pipe, num_rows):
        """Create the arbiter's output signals.

        Args:
            pipe: object exposing a sequence ``p`` of inputs, each of which
                provides ``i_valid_logic()``.
            num_rows: number of inputs; must equal ``len(pipe.p)`` by the
                time ``elaborate`` is called.

        Raises:
            ValueError: if ``num_rows`` is smaller than 1.
        """
        self.pipe = pipe
        self.num_rows = num_rows
        self.mmax = self._id_width(num_rows)
        self.m_id = Signal(self.mmax, reset_less=True)  # multiplex id
        self.active = Signal(reset_less=True)

    @staticmethod
    def _id_width(num_rows):
        """Return the number of bits needed to index ``num_rows`` rows.

        Uses exact integer arithmetic instead of ``int(log(n) / log(2))``:
        the result is the same for powers of two, but it rounds *up* for
        other sizes (so the largest index always fits) and cannot be thrown
        off by floating-point rounding.
        """
        if num_rows < 1:
            # math.log() raised ValueError for such input; keep that contract
            raise ValueError("num_rows must be at least 1")
        return (num_rows - 1).bit_length()

    def elaborate(self, platform):
        """Build the combinatorial arbitration logic.

        Args:
            platform: not used by this method.

        Returns:
            The ``Module`` containing the priority encoder and the
            combinatorial assignments to ``m_id`` and ``active``.

        Raises:
            AssertionError: if ``len(self.pipe.p)`` differs from
                ``self.num_rows``.
        """
        m = Module()

        assert len(self.pipe.p) == self.num_rows, \
            "must declare input to be same size"
        pe = PriorityEncoder(self.num_rows)
        m.submodules.selector = pe

        # connect priority encoder: one intermediate "valid" signal per input
        in_ready = []
        for i in range(self.num_rows):
            p_i_valid = Signal(reset_less=True)
            m.d.comb += p_i_valid.eq(self.pipe.p[i].i_valid_logic())
            in_ready.append(p_i_valid)
        m.d.comb += pe.i.eq(Cat(*in_ready))  # array of input "valids"
        m.d.comb += self.active.eq(~pe.n)    # encoder active (one input valid)
        m.d.comb += self.m_id.eq(pe.o)       # output one active input

        return m

    def ports(self):
        """Return the arbiter's externally visible signals."""
        return [self.m_id, self.active]
class PriorityUnbufferedPipeline(UnbufferedPipeline):
    """UnbufferedPipeline whose inputs are selected by an InputPriorityArbiter."""

    def __init__(self, stage, p_len=4):
        """Set up the pipeline with a priority arbiter as its input mux.

        Args:
            stage: the stage object handed on to ``UnbufferedPipeline``.
            p_len: number of inputs; also used as the arbiter's row count.
        """
        # the arbiter keeps a reference to this pipeline, so it is created
        # first and then passed to the base-class constructor
        arbiter = InputPriorityArbiter(self, p_len)
        super().__init__(stage, p_len=p_len, p_mux=arbiter)

    def ports(self):
        """Return the ports of the input arbiter (``self.p_mux``).

        NOTE: the base-class ports are not included; an earlier variant
        returned ``UnbufferedPipeline.ports(self) + self.p_mux.ports()``.
        """
        mux = self.p_mux
        return mux.ports()
56 self
.data
= Signal(16)
59 return [self
.mid
.eq(i
.mid
), self
.idx
.eq(i
.idx
), self
.data
.eq(i
.data
)]
62 return [self
.mid
, self
.idx
, self
.data
]
64 class PassThroughStage
:
68 return self
.ispec() # same as ospec
71 return i
# pass-through
76 stb
= yield dut
.out_op
.stb
78 ack
= yield dut
.out_op
.ack
82 yield dut
.rs
[1].in_op
[0].eq(5)
83 yield dut
.rs
[1].stb
.eq(0b01) # strobe indicate 1st op ready
84 #yield dut.rs[1].ack.eq(1)
87 # check row 1 output (should be inactive)
88 decode
= yield dut
.rs
[1].out_decode
91 op0
= yield dut
.rs
[1].out_op
[0]
92 op1
= yield dut
.rs
[1].out_op
[1]
93 assert op0
== 0 and op1
== 0
95 # output should be inactive
96 out_stb
= yield dut
.out_op
.stb
100 yield dut
.rs
[1].in_op
[1].eq(6)
101 yield dut
.rs
[1].stb
.eq(0b11) # strobe indicate both ops ready
103 # set acknowledgement of output... takes 1 cycle to respond
104 yield dut
.out_op
.ack
.eq(1)
106 yield dut
.out_op
.ack
.eq(0) # clear ack on output
107 yield dut
.rs
[1].stb
.eq(0) # clear row 1 strobe
109 # output strobe should be active, MID should be 0 until "ack" is set...
110 out_stb
= yield dut
.out_op
.stb
112 out_mid
= yield dut
.mid
115 # ... and output should not yet be passed through either
116 op0
= yield dut
.out_op
.v
[0]
117 op1
= yield dut
.out_op
.v
[1]
118 assert op0
== 0 and op1
== 0
120 # wait for out_op.ack to activate...
121 yield dut
.rs
[1].stb
.eq(0b00) # set row 1 strobes to zero
124 # *now* output should be passed through
125 op0
= yield dut
.out_op
.v
[0]
126 op1
= yield dut
.out_op
.v
[1]
127 assert op0
== 5 and op1
== 6
130 yield dut
.rs
[2].in_op
[0].eq(3)
131 yield dut
.rs
[2].in_op
[1].eq(4)
132 yield dut
.rs
[2].stb
.eq(0b11) # strobe indicate both ops ready
133 yield dut
.out_op
.ack
.eq(1) # set output ack
135 yield dut
.rs
[2].stb
.eq(0) # clear row 2 strobe
136 yield dut
.out_op
.ack
.eq(0) # clear output ack
138 op0
= yield dut
.out_op
.v
[0]
139 op1
= yield dut
.out_op
.v
[1]
140 assert op0
== 3 and op1
== 4, "op0 %d op1 %d" % (op0
, op1
)
141 out_mid
= yield dut
.mid
144 # set row 0 and 3 input
145 yield dut
.rs
[0].in_op
[0].eq(9)
146 yield dut
.rs
[0].in_op
[1].eq(8)
147 yield dut
.rs
[0].stb
.eq(0b11) # strobe indicate both ops ready
148 yield dut
.rs
[3].in_op
[0].eq(1)
149 yield dut
.rs
[3].in_op
[1].eq(2)
150 yield dut
.rs
[3].stb
.eq(0b11) # strobe indicate both ops ready
152 # set acknowledgement of output... takes 1 cycle to respond
153 yield dut
.out_op
.ack
.eq(1)
155 yield dut
.rs
[0].stb
.eq(0) # clear row 0 strobe
157 out_mid
= yield dut
.mid
158 assert out_mid
== 0, "out mid %d" % out_mid
161 yield dut
.rs
[3].stb
.eq(0) # clear row 3 strobe
162 yield dut
.out_op
.ack
.eq(0) # clear ack on output
164 out_mid
= yield dut
.mid
165 assert out_mid
== 3, "out mid %d" % out_mid
169 def __init__(self
, dut
):
174 for mid
in range(dut
.num_rows
):
177 for i
in range(self
.tlen
):
178 self
.di
[mid
][i
] = randint(0, 100)
179 self
.do
[mid
][i
] = self
.di
[mid
][i
]
182 for i
in range(self
.tlen
):
183 op2
= self
.di
[mid
][i
]
185 yield rs
.i_valid
.eq(0)
186 o_p_ready
= yield rs
.o_ready
189 o_p_ready
= yield rs
.o_ready
191 print ("send", mid
, i
, op2
)
193 yield rs
.i_valid
.eq(1)
194 yield rs
.i_data
.data
.eq(op2
)
195 yield rs
.i_data
.idx
.eq(i
)
196 yield rs
.i_data
.mid
.eq(mid
)
197 #for v in self.dut.set_input((op2, i, mid)):
201 yield rs
.i_valid
.eq(0)
202 ## wait random period of time before queueing another value
203 #for i in range(randint(0, 3)):
206 #send_range = randint(0, 3)
210 # send = randint(0, send_range) != 0
214 #stall_range = randint(0, 3)
215 #for j in range(randint(1,10)):
216 # stall = randint(0, stall_range) != 0
217 # yield self.dut.n[0].i_ready.eq(stall)
220 yield n
.i_ready
.eq(1)
222 o_n_valid
= yield n
.o_valid
223 i_n_ready
= yield n
.i_ready
224 if not o_n_valid
or not i_n_ready
:
227 mid
= yield n
.o_data
.mid
228 out_i
= yield n
.o_data
.idx
229 out_v
= yield n
.o_data
.data
231 print ("recv", mid
, out_i
, out_v
)
233 # see if this output has occurred already, delete it if it has
234 assert out_i
in self
.do
[mid
], "out_i %d not in array %s" % \
235 (out_i
, repr(self
.do
[mid
]))
236 assert self
.do
[mid
][out_i
] == out_v
# pass-through data
237 del self
.do
[mid
][out_i
]
239 # check if there's any more outputs
241 for (k
, v
) in self
.do
.items():
248 class TestPriorityMuxPipe(PriorityUnbufferedPipeline
):
251 stage
= PassThroughStage()
252 PriorityUnbufferedPipeline
.__init
__(self
, stage
, p_len
=self
.num_rows
)
256 for i
in range(len(self
.p
)):
257 res
+= [self
.p
[i
].i_valid
, self
.p
[i
].o_ready
] + \
258 self
.p
[i
].i_data
.ports()
259 for i
in range(len(self
.n
)):
260 res
+= [self
.n
[i
].i_ready
, self
.n
[i
].o_valid
] + \
261 self
.n
[i
].o_data
.ports()
265 if __name__
== '__main__':
266 dut
= TestPriorityMuxPipe()
267 vl
= rtlil
.convert(dut
, ports
=dut
.ports())
268 with
open("test_inputgroup.il", "w") as f
:
270 #run_simulation(dut, testbench(dut), vcd_name="test_inputgroup.vcd")
272 test
= InputTest(dut
)
273 run_simulation(dut
, [test
.send(1), test
.send(0),
274 #test.send(3), test.send(2),
276 vcd_name
="test_inputgroup_parallel.vcd")