1 from nmigen
import Module
, Signal
2 from nmigen
.compat
.sim
import run_simulation
3 from example_buf_pipe
import ExampleBufPipe
, ExampleBufPipeAdd
4 from random
import randint
7 def check_o_n_valid(dut
, val
):
8 o_n_valid
= yield dut
.o
.n_valid
9 assert o_n_valid
== val
11 def check_o_n_valid2(dut
, val
):
12 o_n_valid
= yield dut
.o_n_valid
13 assert o_n_valid
== val
17 #yield dut.i_p_rst.eq(1)
18 yield dut
.i
.n_ready
.eq(0)
19 yield dut
.o
.p_ready
.eq(0)
22 #yield dut.i_p_rst.eq(0)
23 yield dut
.i
.n_ready
.eq(1)
24 yield dut
.i
.data
.eq(5)
25 yield dut
.i
.p_valid
.eq(1)
28 yield dut
.i
.data
.eq(7)
29 yield from check_o_n_valid(dut
, 0) # effects of i_p_valid delayed
31 yield from check_o_n_valid(dut
, 1) # ok *now* i_p_valid effect is felt
33 yield dut
.i
.data
.eq(2)
35 yield dut
.i
.n_ready
.eq(0) # begin going into "stall" (next stage says ready)
36 yield dut
.i
.data
.eq(9)
38 yield dut
.i
.p_valid
.eq(0)
39 yield dut
.i
.data
.eq(12)
41 yield dut
.i
.data
.eq(32)
42 yield dut
.i
.n_ready
.eq(1)
44 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
46 yield from check_o_n_valid(dut
, 1) # buffer still needs to output
48 yield from check_o_n_valid(dut
, 0) # buffer outputted, *now* we're done.
53 #yield dut.i.p_rst.eq(1)
54 yield dut
.i_n_ready
.eq(0)
55 #yield dut.o.p_ready.eq(0)
58 #yield dut.i.p_rst.eq(0)
59 yield dut
.i_n_ready
.eq(1)
60 yield dut
.i_data
.eq(5)
61 yield dut
.i_p_valid
.eq(1)
64 yield dut
.i_data
.eq(7)
65 yield from check_o_n_valid2(dut
, 0) # effects of i_p_valid delayed 2 clocks
67 yield from check_o_n_valid2(dut
, 0) # effects of i_p_valid delayed 2 clocks
69 yield dut
.i_data
.eq(2)
71 yield from check_o_n_valid2(dut
, 1) # ok *now* i_p_valid effect is felt
72 yield dut
.i_n_ready
.eq(0) # begin going into "stall" (next stage says ready)
73 yield dut
.i_data
.eq(9)
75 yield dut
.i_p_valid
.eq(0)
76 yield dut
.i_data
.eq(12)
78 yield dut
.i_data
.eq(32)
79 yield dut
.i_n_ready
.eq(1)
81 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
83 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
85 yield from check_o_n_valid2(dut
, 1) # buffer still needs to output
87 yield from check_o_n_valid2(dut
, 0) # buffer outputted, *now* we're done.
94 def __init__(self
, dut
):
97 for i
in range(num_tests
):
98 #data.append(randint(0, 1<<16-1))
104 while self
.o
!= len(self
.data
):
105 send_range
= randint(0, 3)
106 for j
in range(randint(1,10)):
110 send
= randint(0, send_range
) != 0
111 o_p_ready
= yield self
.dut
.o
.p_ready
115 if send
and self
.i
!= len(self
.data
):
116 yield self
.dut
.i
.p_valid
.eq(1)
117 yield self
.dut
.i
.data
.eq(self
.data
[self
.i
])
120 yield self
.dut
.i
.p_valid
.eq(0)
124 while self
.o
!= len(self
.data
):
125 stall_range
= randint(0, 3)
126 for j
in range(randint(1,10)):
127 stall
= randint(0, stall_range
) != 0
128 yield self
.dut
.i
.n_ready
.eq(stall
)
130 o_n_valid
= yield self
.dut
.o
.n_valid
131 i_n_ready
= yield self
.dut
.i
.n_ready
132 if not o_n_valid
or not i_n_ready
:
134 o_data
= yield self
.dut
.o
.data
135 assert o_data
== self
.data
[self
.o
] + 1, \
136 "%d-%d data %x not match %x\n" \
137 % (self
.i
, self
.o
, o_data
, self
.data
[self
.o
])
139 if self
.o
== len(self
.data
):
144 def __init__(self
, dut
):
147 for i
in range(num_tests
):
148 self
.data
.append((randint(0, 1<<14), randint(0, 1<<14)))
153 while self
.o
!= len(self
.data
):
154 send_range
= randint(0, 3)
155 for j
in range(randint(1,10)):
159 send
= randint(0, send_range
) != 0
160 o_p_ready
= yield self
.dut
.o
.p_ready
164 if send
and self
.i
!= len(self
.data
):
165 yield self
.dut
.i
.p_valid
.eq(1)
166 for v
in self
.dut
.set_input(self
.data
[self
.i
]):
170 yield self
.dut
.i
.p_valid
.eq(0)
174 while self
.o
!= len(self
.data
):
175 stall_range
= randint(0, 3)
176 for j
in range(randint(1,10)):
177 stall
= randint(0, stall_range
) != 0
178 yield self
.dut
.i
.n_ready
.eq(stall
)
180 o_n_valid
= yield self
.dut
.o
.n_valid
181 i_n_ready
= yield self
.dut
.i
.n_ready
182 if not o_n_valid
or not i_n_ready
:
184 o_data
= yield self
.dut
.o
.data
185 res
= self
.data
[self
.o
][0] + self
.data
[self
.o
][1]
186 assert o_data
== res
, \
187 "%d-%d data %x not match %s\n" \
188 % (self
.i
, self
.o
, o_data
, repr(self
.data
[self
.o
]))
190 if self
.o
== len(self
.data
):
196 for i
in range(num_tests
):
197 #data.append(randint(0, 1<<16-1))
202 stall
= randint(0, 3) != 0
203 send
= randint(0, 5) != 0
204 yield dut
.i_n_ready
.eq(stall
)
205 o_p_ready
= yield dut
.o_p_ready
207 if send
and i
!= len(data
):
208 yield dut
.i_p_valid
.eq(1)
209 yield dut
.i_data
.eq(data
[i
])
212 yield dut
.i_p_valid
.eq(0)
214 o_n_valid
= yield dut
.o_n_valid
215 i_n_ready
= yield dut
.i_n_ready
216 if o_n_valid
and i_n_ready
:
217 o_data
= yield dut
.o_data
218 assert o_data
== data
[o
] + 2, "%d-%d data %x not match %x\n" \
219 % (i
, o
, o_data
, data
[o
])
225 class ExampleBufPipe2
:
227 connect these: ------|---------------|
229 i_p_valid >>in pipe1 o_n_valid out>> i_p_valid >>in pipe2
230 o_p_ready <<out pipe1 i_n_ready <<in o_p_ready <<out pipe2
231 i_data >>in pipe1 o_data out>> i_data >>in pipe2
234 self
.pipe1
= ExampleBufPipe()
235 self
.pipe2
= ExampleBufPipe()
238 self
.i_p_valid
= Signal() # >>in - comes in from PREVIOUS stage
239 self
.i_n_ready
= Signal() # in<< - comes in from the NEXT stage
240 self
.i_data
= Signal(32) # >>in - comes in from the PREVIOUS stage
243 self
.o_n_valid
= Signal() # out>> - goes out to the NEXT stage
244 self
.o_p_ready
= Signal() # <<out - goes out to the PREVIOUS stage
245 self
.o_data
= Signal(32) # out>> - goes out to the NEXT stage
247 def elaborate(self
, platform
):
249 m
.submodules
.pipe1
= self
.pipe1
250 m
.submodules
.pipe2
= self
.pipe2
252 # connect inter-pipe input/output valid/ready/data
253 m
.d
.comb
+= self
.pipe2
.i
.p_valid
.eq(self
.pipe1
.o
.n_valid
)
254 m
.d
.comb
+= self
.pipe1
.i
.n_ready
.eq(self
.pipe2
.o
.p_ready
)
255 m
.d
.comb
+= self
.pipe2
.i
.data
.eq(self
.pipe1
.o
.data
)
257 # inputs/outputs to the module: pipe1 connections here (LHS)
258 m
.d
.comb
+= self
.pipe1
.i
.p_valid
.eq(self
.i_p_valid
)
259 m
.d
.comb
+= self
.o_p_ready
.eq(self
.pipe1
.o
.p_ready
)
260 m
.d
.comb
+= self
.pipe1
.i
.data
.eq(self
.i_data
)
262 # now pipe2 connections (RHS)
263 m
.d
.comb
+= self
.o_n_valid
.eq(self
.pipe2
.o
.n_valid
)
264 m
.d
.comb
+= self
.pipe2
.i
.n_ready
.eq(self
.i_n_ready
)
265 m
.d
.comb
+= self
.o_data
.eq(self
.pipe2
.o
.data
)
271 if __name__
== '__main__':
273 dut
= ExampleBufPipe()
274 run_simulation(dut
, testbench(dut
), vcd_name
="test_bufpipe.vcd")
277 dut
= ExampleBufPipe2()
278 run_simulation(dut
, testbench2(dut
), vcd_name
="test_bufpipe2.vcd")
281 dut
= ExampleBufPipe()
283 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe3.vcd")
286 dut
= ExampleBufPipe2()
287 run_simulation(dut
, testbench4(dut
), vcd_name
="test_bufpipe4.vcd")
290 dut
= ExampleBufPipeAdd()
292 run_simulation(dut
, [test
.send
, test
.rcv
], vcd_name
="test_bufpipe5.vcd")