1 from contextlib
import contextmanager
4 from ..tools
import flatten
, union
5 from ..hdl
.ast
import *
7 from ..hdl
.mem
import *
8 from ..hdl
.dsl
import *
10 from ..back
.pysim
import *
13 class SimulatorUnitTestCase(FHDLTestCase
):
14 def assertStatement(self
, stmt
, inputs
, output
, reset
=0):
15 inputs
= [Value
.wrap(i
) for i
in inputs
]
16 output
= Value
.wrap(output
)
18 isigs
= [Signal(i
.shape(), name
=n
) for i
, n
in zip(inputs
, "abcd")]
19 osig
= Signal(output
.shape(), name
="y", reset
=reset
)
21 stmt
= stmt(osig
, *isigs
)
23 frag
.add_statements(stmt
)
24 for signal
in flatten(s
._lhs
_signals
() for s
in Statement
.wrap(stmt
)):
25 frag
.add_driver(signal
)
28 vcd_file
=open("test.vcd", "w"),
29 gtkw_file
=open("test.gtkw", "w"),
30 traces
=[*isigs
, osig
]) as sim
:
32 for isig
, input in zip(isigs
, inputs
):
35 self
.assertEqual((yield osig
), output
.value
)
36 sim
.add_process(process
)
39 def test_invert(self
):
40 stmt
= lambda y
, a
: y
.eq(~a
)
41 self
.assertStatement(stmt
, [C(0b0000, 4)], C(0b1111, 4))
42 self
.assertStatement(stmt
, [C(0b1010, 4)], C(0b0101, 4))
43 self
.assertStatement(stmt
, [C(0, 4)], C(-1, 4))
46 stmt
= lambda y
, a
: y
.eq(-a
)
47 self
.assertStatement(stmt
, [C(0b0000, 4)], C(0b0000, 4))
48 self
.assertStatement(stmt
, [C(0b0001, 4)], C(0b1111, 4))
49 self
.assertStatement(stmt
, [C(0b1010, 4)], C(0b0110, 4))
50 self
.assertStatement(stmt
, [C(1, 4)], C(-1, 4))
51 self
.assertStatement(stmt
, [C(5, 4)], C(-5, 4))
54 stmt
= lambda y
, a
: y
.eq(a
.bool())
55 self
.assertStatement(stmt
, [C(0, 4)], C(0))
56 self
.assertStatement(stmt
, [C(1, 4)], C(1))
57 self
.assertStatement(stmt
, [C(2, 4)], C(1))
60 stmt
= lambda y
, a
, b
: y
.eq(a
+ b
)
61 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1, 4))
62 self
.assertStatement(stmt
, [C(-5, 4), C(-5, 4)], C(-10, 5))
65 stmt
= lambda y
, a
, b
: y
.eq(a
- b
)
66 self
.assertStatement(stmt
, [C(2, 4), C(1, 4)], C(1, 4))
67 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(-1, 4))
68 self
.assertStatement(stmt
, [C(0, 4), C(10, 4)], C(-10, 5))
71 stmt
= lambda y
, a
, b
: y
.eq(a
* b
)
72 self
.assertStatement(stmt
, [C(2, 4), C(1, 4)], C(2, 8))
73 self
.assertStatement(stmt
, [C(2, 4), C(2, 4)], C(4, 8))
74 self
.assertStatement(stmt
, [C(7, 4), C(7, 4)], C(49, 8))
77 stmt
= lambda y
, a
, b
: y
.eq(a
& b
)
78 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b1000, 4))
81 stmt
= lambda y
, a
, b
: y
.eq(a | b
)
82 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b1110, 4))
85 stmt
= lambda y
, a
, b
: y
.eq(a ^ b
)
86 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b0110, 4))
89 stmt
= lambda y
, a
, b
: y
.eq(a
<< b
)
90 self
.assertStatement(stmt
, [C(0b1001, 4), C(0)], C(0b1001, 5))
91 self
.assertStatement(stmt
, [C(0b1001, 4), C(3)], C(0b1001000, 7))
92 self
.assertStatement(stmt
, [C(0b1001, 4), C(-2)], C(0b10, 7))
95 stmt
= lambda y
, a
, b
: y
.eq(a
>> b
)
96 self
.assertStatement(stmt
, [C(0b1001, 4), C(0)], C(0b1001, 4))
97 self
.assertStatement(stmt
, [C(0b1001, 4), C(2)], C(0b10, 4))
98 self
.assertStatement(stmt
, [C(0b1001, 4), C(-2)], C(0b100100, 5))
101 stmt
= lambda y
, a
, b
: y
.eq(a
== b
)
102 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
103 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
104 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
107 stmt
= lambda y
, a
, b
: y
.eq(a
!= b
)
108 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
109 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
110 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
113 stmt
= lambda y
, a
, b
: y
.eq(a
< b
)
114 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
115 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
116 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
119 stmt
= lambda y
, a
, b
: y
.eq(a
>= b
)
120 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
121 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
122 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
125 stmt
= lambda y
, a
, b
: y
.eq(a
> b
)
126 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
127 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
128 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
131 stmt
= lambda y
, a
, b
: y
.eq(a
<= b
)
132 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
133 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
134 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
137 stmt
= lambda y
, a
, b
, c
: y
.eq(Mux(c
, a
, b
))
138 self
.assertStatement(stmt
, [C(2, 4), C(3, 4), C(0)], C(3, 4))
139 self
.assertStatement(stmt
, [C(2, 4), C(3, 4), C(1)], C(2, 4))
141 def test_slice(self
):
142 stmt1
= lambda y
, a
: y
.eq(a
[2])
143 self
.assertStatement(stmt1
, [C(0b10110100, 8)], C(0b1, 1))
144 stmt2
= lambda y
, a
: y
.eq(a
[2:4])
145 self
.assertStatement(stmt2
, [C(0b10110100, 8)], C(0b01, 2))
147 def test_slice_lhs(self
):
148 stmt1
= lambda y
, a
: y
[2].eq(a
)
149 self
.assertStatement(stmt1
, [C(0b0, 1)], C(0b11111011, 8), reset
=0b11111111)
150 stmt2
= lambda y
, a
: y
[2:4].eq(a
)
151 self
.assertStatement(stmt2
, [C(0b01, 2)], C(0b11110111, 8), reset
=0b11111011)
154 stmt
= lambda y
, a
, b
: y
.eq(a
.part(b
, 3))
155 self
.assertStatement(stmt
, [C(0b10110100, 8), C(0)], C(0b100, 3))
156 self
.assertStatement(stmt
, [C(0b10110100, 8), C(2)], C(0b101, 3))
157 self
.assertStatement(stmt
, [C(0b10110100, 8), C(3)], C(0b110, 3))
159 def test_part_lhs(self
):
160 stmt
= lambda y
, a
, b
: y
.part(a
, 3).eq(b
)
161 self
.assertStatement(stmt
, [C(0), C(0b100, 3)], C(0b11111100, 8), reset
=0b11111111)
162 self
.assertStatement(stmt
, [C(2), C(0b101, 3)], C(0b11110111, 8), reset
=0b11111111)
163 self
.assertStatement(stmt
, [C(3), C(0b110, 3)], C(0b11110111, 8), reset
=0b11111111)
166 stmt
= lambda y
, *xs
: y
.eq(Cat(*xs
))
167 self
.assertStatement(stmt
, [C(0b10, 2), C(0b01, 2)], C(0b0110, 4))
169 def test_cat_lhs(self
):
173 stmt
= lambda y
, a
: [Cat(l
, m
, n
).eq(a
), y
.eq(Cat(n
, m
, l
))]
174 self
.assertStatement(stmt
, [C(0b100101110, 9)], C(0b110101100, 9))
177 stmt
= lambda y
, a
: y
.eq(Repl(a
, 3))
178 self
.assertStatement(stmt
, [C(0b10, 2)], C(0b101010, 6))
180 def test_array(self
):
181 array
= Array([1, 4, 10])
182 stmt
= lambda y
, a
: y
.eq(array
[a
])
183 self
.assertStatement(stmt
, [C(0)], C(1))
184 self
.assertStatement(stmt
, [C(1)], C(4))
185 self
.assertStatement(stmt
, [C(2)], C(10))
187 def test_array_oob(self
):
188 array
= Array([1, 4, 10])
189 stmt
= lambda y
, a
: y
.eq(array
[a
])
190 self
.assertStatement(stmt
, [C(3)], C(10))
191 self
.assertStatement(stmt
, [C(4)], C(10))
193 def test_array_lhs(self
):
194 l
= Signal(3, reset
=1)
195 m
= Signal(3, reset
=4)
196 n
= Signal(3, reset
=7)
197 array
= Array([l
, m
, n
])
198 stmt
= lambda y
, a
, b
: [array
[a
].eq(b
), y
.eq(Cat(*array
))]
199 self
.assertStatement(stmt
, [C(0), C(0b000)], C(0b111100000))
200 self
.assertStatement(stmt
, [C(1), C(0b010)], C(0b111010001))
201 self
.assertStatement(stmt
, [C(2), C(0b100)], C(0b100100001))
203 def test_array_lhs_oob(self
):
207 array
= Array([l
, m
, n
])
208 stmt
= lambda y
, a
, b
: [array
[a
].eq(b
), y
.eq(Cat(*array
))]
209 self
.assertStatement(stmt
, [C(3), C(0b001)], C(0b001000000))
210 self
.assertStatement(stmt
, [C(4), C(0b010)], C(0b010000000))
212 def test_array_index(self
):
213 array
= Array(Array(x
* y
for y
in range(10)) for x
in range(10))
214 stmt
= lambda y
, a
, b
: y
.eq(array
[a
][b
])
217 self
.assertStatement(stmt
, [C(x
), C(y
)], C(x
* y
))
219 def test_array_attr(self
):
220 from collections
import namedtuple
221 pair
= namedtuple("pair", ("p", "n"))
223 array
= Array(pair(x
, -x
) for x
in range(10))
224 stmt
= lambda y
, a
: y
.eq(array
[a
].p
+ array
[a
].n
)
226 self
.assertStatement(stmt
, [C(i
)], C(0))
229 class SimulatorIntegrationTestCase(FHDLTestCase
):
231 def assertSimulation(self
, module
, deadline
=None):
232 with
Simulator(module
.lower(platform
=None)) as sim
:
237 sim
.run_until(deadline
)
239 def setUp_counter(self
):
240 self
.count
= Signal(3, reset
=4)
241 self
.sync
= ClockDomain()
244 self
.m
.d
.sync
+= self
.count
.eq(self
.count
+ 1)
245 self
.m
.domains
+= self
.sync
247 def test_counter_process(self
):
249 with self
.assertSimulation(self
.m
) as sim
:
251 self
.assertEqual((yield self
.count
), 4)
253 self
.assertEqual((yield self
.count
), 4)
254 yield self
.sync
.clk
.eq(1)
255 self
.assertEqual((yield self
.count
), 5)
257 self
.assertEqual((yield self
.count
), 5)
258 yield self
.sync
.clk
.eq(0)
259 self
.assertEqual((yield self
.count
), 5)
262 yield self
.sync
.clk
.eq(1)
264 yield self
.sync
.clk
.eq(0)
265 self
.assertEqual((yield self
.count
), 0)
266 sim
.add_process(process
)
268 def test_counter_clock_and_sync_process(self
):
270 with self
.assertSimulation(self
.m
) as sim
:
271 sim
.add_clock(1e-6, domain
="sync")
273 self
.assertEqual((yield self
.count
), 4)
274 self
.assertEqual((yield self
.sync
.clk
), 0)
276 self
.assertEqual((yield self
.count
), 5)
277 self
.assertEqual((yield self
.sync
.clk
), 1)
280 self
.assertEqual((yield self
.count
), 0)
281 sim
.add_sync_process(process
)
289 self
.sync
= ClockDomain(reset_less
=True)
292 self
.m
.d
.comb
+= self
.x
.eq(self
.a ^ self
.b
)
293 with self
.m
.Switch(self
.s
):
295 self
.m
.d
.sync
+= self
.o
.eq(self
.a
+ self
.b
)
297 self
.m
.d
.sync
+= self
.o
.eq(self
.a
- self
.b
)
299 self
.m
.d
.sync
+= self
.o
.eq(0)
300 self
.m
.domains
+= self
.sync
304 with self
.assertSimulation(self
.m
) as sim
:
310 self
.assertEqual((yield self
.x
), 4)
311 self
.assertEqual((yield self
.o
), 6)
314 self
.assertEqual((yield self
.o
), 4)
317 self
.assertEqual((yield self
.o
), 0)
318 sim
.add_sync_process(process
)
320 def setUp_multiclock(self
):
321 self
.sys
= ClockDomain()
322 self
.pix
= ClockDomain()
325 self
.m
.domains
+= self
.sys
, self
.pix
327 def test_multiclock(self
):
328 self
.setUp_multiclock()
329 with self
.assertSimulation(self
.m
) as sim
:
330 sim
.add_clock(1e-6, domain
="sys")
331 sim
.add_clock(0.3e-6, domain
="pix")
342 sim
.add_sync_process(sys_process
, domain
="sys")
343 sim
.add_sync_process(pix_process
, domain
="pix")
345 def setUp_lhs_rhs(self
):
350 self
.m
.d
.comb
+= self
.o
.eq(self
.i
)
352 def test_complex_lhs_rhs(self
):
354 with self
.assertSimulation(self
.m
) as sim
:
356 yield self
.i
.eq(0b10101010)
357 yield self
.i
[:4].eq(-1)
359 self
.assertEqual((yield self
.i
[:4]), 0b1111)
360 self
.assertEqual((yield self
.i
), 0b10101111)
361 sim
.add_process(process
)
363 def test_run_until(self
):
364 with self
.assertSimulation(Module(), deadline
=100e-6) as sim
:
371 def test_add_process_wrong(self
):
372 with self
.assertSimulation(Module()) as sim
:
373 with self
.assertRaises(TypeError,
374 msg
="Cannot add a process '1' because it is not a generator or "
375 "a generator function"):
378 def test_eq_signal_unused_wrong(self
):
381 with self
.assertSimulation(self
.m
) as sim
:
383 with self
.assertRaisesRegex(ValueError,
384 regex
=r
"Process '.+?' sent a request to set signal '\(sig s\)', "
385 r
"which is not a part of simulation"):
388 sim
.add_process(process
)
390 def test_eq_signal_comb_wrong(self
):
392 with self
.assertSimulation(self
.m
) as sim
:
394 with self
.assertRaisesRegex(ValueError,
395 regex
=r
"Process '.+?' sent a request to set signal '\(sig o\)', "
396 r
"which is a part of combinatorial assignment in simulation"):
399 sim
.add_process(process
)
401 def test_command_wrong(self
):
402 with self
.assertSimulation(Module()) as sim
:
404 with self
.assertRaisesRegex(TypeError,
405 regex
=r
"Received unsupported command '1' from process '.+?'"):
408 sim
.add_process(process
)
410 def setUp_memory(self
, rd_synchronous
=True, rd_transparent
=True, wr_granularity
=None):
412 self
.memory
= Memory(width
=8, depth
=4, init
=[0xaa, 0x55])
413 self
.m
.submodules
.rdport
= self
.rdport
= \
414 self
.memory
.read_port(synchronous
=rd_synchronous
, transparent
=rd_transparent
)
415 self
.m
.submodules
.wrport
= self
.wrport
= \
416 self
.memory
.write_port(granularity
=wr_granularity
)
418 def test_memory_init(self
):
420 with self
.assertSimulation(self
.m
) as sim
:
422 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
423 yield self
.rdport
.addr
.eq(1)
426 self
.assertEqual((yield self
.rdport
.data
), 0x55)
427 yield self
.rdport
.addr
.eq(2)
430 self
.assertEqual((yield self
.rdport
.data
), 0x00)
432 sim
.add_sync_process(process
)
434 def test_memory_write(self
):
436 with self
.assertSimulation(self
.m
) as sim
:
438 yield self
.wrport
.addr
.eq(4)
439 yield self
.wrport
.data
.eq(0x33)
440 yield self
.wrport
.en
.eq(1)
442 yield self
.wrport
.en
.eq(0)
443 yield self
.rdport
.addr
.eq(4)
445 self
.assertEqual((yield self
.rdport
.data
), 0x33)
447 sim
.add_sync_process(process
)
449 def test_memory_write_granularity(self
):
450 self
.setUp_memory(wr_granularity
=4)
451 with self
.assertSimulation(self
.m
) as sim
:
453 yield self
.wrport
.data
.eq(0x50)
454 yield self
.wrport
.en
.eq(0b00)
456 yield self
.wrport
.en
.eq(0)
458 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
459 yield self
.wrport
.en
.eq(0b10)
461 yield self
.wrport
.en
.eq(0)
463 self
.assertEqual((yield self
.rdport
.data
), 0x5a)
464 yield self
.wrport
.data
.eq(0x33)
465 yield self
.wrport
.en
.eq(0b01)
467 yield self
.wrport
.en
.eq(0)
469 self
.assertEqual((yield self
.rdport
.data
), 0x53)
471 sim
.add_sync_process(process
)
473 def test_memory_read_before_write(self
):
474 self
.setUp_memory(rd_transparent
=False)
475 with self
.assertSimulation(self
.m
) as sim
:
477 yield self
.wrport
.data
.eq(0x33)
478 yield self
.wrport
.en
.eq(1)
479 yield self
.rdport
.en
.eq(1)
481 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
482 yield Delay(1e-6) # let comb propagate
483 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
485 sim
.add_sync_process(process
)
487 def test_memory_write_through(self
):
488 self
.setUp_memory(rd_transparent
=True)
489 with self
.assertSimulation(self
.m
) as sim
:
491 yield self
.wrport
.data
.eq(0x33)
492 yield self
.wrport
.en
.eq(1)
494 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
495 yield Delay(1e-6) # let comb propagate
496 self
.assertEqual((yield self
.rdport
.data
), 0x33)
498 yield self
.rdport
.addr
.eq(1)
499 yield Delay(1e-6) # let comb propagate
500 self
.assertEqual((yield self
.rdport
.data
), 0x33)
502 sim
.add_sync_process(process
)
504 def test_memory_async_read_write(self
):
505 self
.setUp_memory(rd_synchronous
=False)
506 with self
.assertSimulation(self
.m
) as sim
:
508 yield self
.rdport
.addr
.eq(0)
510 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
511 yield self
.rdport
.addr
.eq(1)
513 self
.assertEqual((yield self
.rdport
.data
), 0x55)
514 yield self
.rdport
.addr
.eq(0)
515 yield self
.wrport
.addr
.eq(0)
516 yield self
.wrport
.data
.eq(0x33)
517 yield self
.wrport
.en
.eq(1)
519 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
520 yield Delay(1e-6) # let comb propagate
521 self
.assertEqual((yield self
.rdport
.data
), 0x33)
523 sim
.add_process(process
)