1 from contextlib
import contextmanager
4 from ..tools
import flatten
, union
5 from ..hdl
.ast
import *
7 from ..hdl
.mem
import *
8 from ..hdl
.rec
import *
9 from ..hdl
.dsl
import *
10 from ..hdl
.ir
import *
11 from ..back
.pysim
import *
14 class SimulatorUnitTestCase(FHDLTestCase
):
15 def assertStatement(self
, stmt
, inputs
, output
, reset
=0):
16 inputs
= [Value
.wrap(i
) for i
in inputs
]
17 output
= Value
.wrap(output
)
19 isigs
= [Signal(i
.shape(), name
=n
) for i
, n
in zip(inputs
, "abcd")]
20 osig
= Signal(output
.shape(), name
="y", reset
=reset
)
22 stmt
= stmt(osig
, *isigs
)
24 frag
.add_statements(stmt
)
25 for signal
in flatten(s
._lhs
_signals
() for s
in Statement
.wrap(stmt
)):
26 frag
.add_driver(signal
)
29 vcd_file
=open("test.vcd", "w"),
30 gtkw_file
=open("test.gtkw", "w"),
31 traces
=[*isigs
, osig
]) as sim
:
33 for isig
, input in zip(isigs
, inputs
):
36 self
.assertEqual((yield osig
), output
.value
)
37 sim
.add_process(process
)
def test_invert(self):
    # Exercise the bitwise-NOT operator on 4-bit constants through
    # assertStatement, one (operand, expected) pair at a time.
    def invert(y, a):
        return y.eq(~a)

    # Both members of each pair are wrapped as 4-bit constants below.
    cases = (
        (0b0000, 0b1111),
        (0b1010, 0b0101),
        (0, -1),
    )
    for operand, expected in cases:
        self.assertStatement(invert, [C(operand, 4)], C(expected, 4))
47 stmt
= lambda y
, a
: y
.eq(-a
)
48 self
.assertStatement(stmt
, [C(0b0000, 4)], C(0b0000, 4))
49 self
.assertStatement(stmt
, [C(0b0001, 4)], C(0b1111, 4))
50 self
.assertStatement(stmt
, [C(0b1010, 4)], C(0b0110, 4))
51 self
.assertStatement(stmt
, [C(1, 4)], C(-1, 4))
52 self
.assertStatement(stmt
, [C(5, 4)], C(-5, 4))
55 stmt
= lambda y
, a
: y
.eq(a
.bool())
56 self
.assertStatement(stmt
, [C(0, 4)], C(0))
57 self
.assertStatement(stmt
, [C(1, 4)], C(1))
58 self
.assertStatement(stmt
, [C(2, 4)], C(1))
61 stmt
= lambda y
, a
, b
: y
.eq(a
+ b
)
62 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1, 4))
63 self
.assertStatement(stmt
, [C(-5, 4), C(-5, 4)], C(-10, 5))
66 stmt
= lambda y
, a
, b
: y
.eq(a
- b
)
67 self
.assertStatement(stmt
, [C(2, 4), C(1, 4)], C(1, 4))
68 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(-1, 4))
69 self
.assertStatement(stmt
, [C(0, 4), C(10, 4)], C(-10, 5))
72 stmt
= lambda y
, a
, b
: y
.eq(a
* b
)
73 self
.assertStatement(stmt
, [C(2, 4), C(1, 4)], C(2, 8))
74 self
.assertStatement(stmt
, [C(2, 4), C(2, 4)], C(4, 8))
75 self
.assertStatement(stmt
, [C(7, 4), C(7, 4)], C(49, 8))
78 stmt
= lambda y
, a
, b
: y
.eq(a
& b
)
79 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b1000, 4))
82 stmt
= lambda y
, a
, b
: y
.eq(a | b
)
83 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b1110, 4))
86 stmt
= lambda y
, a
, b
: y
.eq(a ^ b
)
87 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b0110, 4))
90 stmt
= lambda y
, a
, b
: y
.eq(a
<< b
)
91 self
.assertStatement(stmt
, [C(0b1001, 4), C(0)], C(0b1001, 5))
92 self
.assertStatement(stmt
, [C(0b1001, 4), C(3)], C(0b1001000, 7))
93 self
.assertStatement(stmt
, [C(0b1001, 4), C(-2)], C(0b10, 7))
96 stmt
= lambda y
, a
, b
: y
.eq(a
>> b
)
97 self
.assertStatement(stmt
, [C(0b1001, 4), C(0)], C(0b1001, 4))
98 self
.assertStatement(stmt
, [C(0b1001, 4), C(2)], C(0b10, 4))
99 self
.assertStatement(stmt
, [C(0b1001, 4), C(-2)], C(0b100100, 5))
102 stmt
= lambda y
, a
, b
: y
.eq(a
== b
)
103 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
104 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
105 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
108 stmt
= lambda y
, a
, b
: y
.eq(a
!= b
)
109 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
110 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
111 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
114 stmt
= lambda y
, a
, b
: y
.eq(a
< b
)
115 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
116 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
117 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
120 stmt
= lambda y
, a
, b
: y
.eq(a
>= b
)
121 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
122 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
123 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
126 stmt
= lambda y
, a
, b
: y
.eq(a
> b
)
127 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
128 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
129 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
132 stmt
= lambda y
, a
, b
: y
.eq(a
<= b
)
133 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
134 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
135 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
138 stmt
= lambda y
, a
, b
, c
: y
.eq(Mux(c
, a
, b
))
139 self
.assertStatement(stmt
, [C(2, 4), C(3, 4), C(0)], C(3, 4))
140 self
.assertStatement(stmt
, [C(2, 4), C(3, 4), C(1)], C(2, 4))
def test_slice(self):
    # Read a single bit and a two-bit range out of the same 8-bit constant
    # and check each against the expected value.
    def take_bit(y, a):
        return y.eq(a[2])

    def take_range(y, a):
        return y.eq(a[2:4])

    operand = 0b10110100
    self.assertStatement(take_bit, [C(operand, 8)], C(0b1, 1))
    self.assertStatement(take_range, [C(operand, 8)], C(0b01, 2))
def test_slice_lhs(self):
    # Assign to a single bit and to a two-bit range of the output, starting
    # from a given reset value, and check the resulting 8-bit output.
    def set_bit(y, a):
        return y[2].eq(a)

    def set_range(y, a):
        return y[2:4].eq(a)

    # (statement builder, input constant, expected output, output reset)
    cases = (
        (set_bit, C(0b0, 1), 0b11111011, 0b11111111),
        (set_range, C(0b01, 2), 0b11110111, 0b11111011),
    )
    for build, operand, expected, initial in cases:
        self.assertStatement(build, [operand], C(expected, 8), reset=initial)
155 stmt
= lambda y
, a
, b
: y
.eq(a
.part(b
, 3))
156 self
.assertStatement(stmt
, [C(0b10110100, 8), C(0)], C(0b100, 3))
157 self
.assertStatement(stmt
, [C(0b10110100, 8), C(2)], C(0b101, 3))
158 self
.assertStatement(stmt
, [C(0b10110100, 8), C(3)], C(0b110, 3))
def test_part_lhs(self):
    # Assign a 3-bit value into `y.part(offset, 3)` with the output reset
    # to all ones, and check the resulting 8-bit output for each offset.
    def write_part(y, a, b):
        return y.part(a, 3).eq(b)

    all_ones = 0b11111111
    # (offset, 3-bit value written, expected 8-bit output)
    cases = (
        (0, 0b100, 0b11111100),
        (2, 0b101, 0b11110111),
        (3, 0b110, 0b11110111),
    )
    for offset, value, expected in cases:
        self.assertStatement(
            write_part,
            [C(offset), C(value, 3)],
            C(expected, 8),
            reset=all_ones,
        )
167 stmt
= lambda y
, *xs
: y
.eq(Cat(*xs
))
168 self
.assertStatement(stmt
, [C(0b10, 2), C(0b01, 2)], C(0b0110, 4))
170 def test_cat_lhs(self
):
174 stmt
= lambda y
, a
: [Cat(l
, m
, n
).eq(a
), y
.eq(Cat(n
, m
, l
))]
175 self
.assertStatement(stmt
, [C(0b100101110, 9)], C(0b110101100, 9))
177 def test_record(self
):
182 stmt
= lambda y
, a
: [rec
.eq(a
), y
.eq(rec
)]
183 self
.assertStatement(stmt
, [C(0b101, 3)], C(0b101, 3))
186 stmt
= lambda y
, a
: y
.eq(Repl(a
, 3))
187 self
.assertStatement(stmt
, [C(0b10, 2)], C(0b101010, 6))
def test_array(self):
    # Index a three-element Array with each in-range index and check that
    # the element at that position comes back.
    table = Array([1, 4, 10])

    def select(y, a):
        return y.eq(table[a])

    for index, expected in ((0, 1), (1, 4), (2, 10)):
        self.assertStatement(select, [C(index)], C(expected))
def test_array_oob(self):
    # Index a three-element Array past its end; both out-of-range indices
    # are expected to yield the last element (10).
    table = Array([1, 4, 10])

    def select(y, a):
        return y.eq(table[a])

    for index in (3, 4):
        self.assertStatement(select, [C(index)], C(10))
def test_array_lhs(self):
    # Write a value into one element of an Array of signals and check the
    # concatenation of all three elements afterwards.
    # NOTE(review): the three assignments below are left exactly as written
    # in case Signal derives its name from the assignment target — confirm
    # before renaming or folding them into a comprehension.
    l = Signal(3, reset=1)
    m = Signal(3, reset=4)
    n = Signal(3, reset=7)
    slots = Array([l, m, n])

    def store_and_pack(y, a, b):
        return [slots[a].eq(b), y.eq(Cat(*slots))]

    # (element index, value written, expected concatenated output)
    cases = (
        (0, 0b000, 0b111100000),
        (1, 0b010, 0b111010001),
        (2, 0b100, 0b100100001),
    )
    for index, value, expected in cases:
        self.assertStatement(store_and_pack, [C(index), C(value)], C(expected))
212 def test_array_lhs_oob(self
):
216 array
= Array([l
, m
, n
])
217 stmt
= lambda y
, a
, b
: [array
[a
].eq(b
), y
.eq(Cat(*array
))]
218 self
.assertStatement(stmt
, [C(3), C(0b001)], C(0b001000000))
219 self
.assertStatement(stmt
, [C(4), C(0b010)], C(0b010000000))
221 def test_array_index(self
):
222 array
= Array(Array(x
* y
for y
in range(10)) for x
in range(10))
223 stmt
= lambda y
, a
, b
: y
.eq(array
[a
][b
])
226 self
.assertStatement(stmt
, [C(x
), C(y
)], C(x
* y
))
228 def test_array_attr(self
):
229 from collections
import namedtuple
230 pair
= namedtuple("pair", ("p", "n"))
232 array
= Array(pair(x
, -x
) for x
in range(10))
233 stmt
= lambda y
, a
: y
.eq(array
[a
].p
+ array
[a
].n
)
235 self
.assertStatement(stmt
, [C(i
)], C(0))
238 class SimulatorIntegrationTestCase(FHDLTestCase
):
240 def assertSimulation(self
, module
, deadline
=None):
241 with
Simulator(module
.elaborate(platform
=None)) as sim
:
246 sim
.run_until(deadline
)
248 def setUp_counter(self
):
249 self
.count
= Signal(3, reset
=4)
250 self
.sync
= ClockDomain()
253 self
.m
.d
.sync
+= self
.count
.eq(self
.count
+ 1)
254 self
.m
.domains
+= self
.sync
256 def test_counter_process(self
):
258 with self
.assertSimulation(self
.m
) as sim
:
260 self
.assertEqual((yield self
.count
), 4)
262 self
.assertEqual((yield self
.count
), 4)
263 yield self
.sync
.clk
.eq(1)
264 self
.assertEqual((yield self
.count
), 5)
266 self
.assertEqual((yield self
.count
), 5)
267 yield self
.sync
.clk
.eq(0)
268 self
.assertEqual((yield self
.count
), 5)
271 yield self
.sync
.clk
.eq(1)
273 yield self
.sync
.clk
.eq(0)
274 self
.assertEqual((yield self
.count
), 0)
275 sim
.add_process(process
)
277 def test_counter_clock_and_sync_process(self
):
279 with self
.assertSimulation(self
.m
) as sim
:
280 sim
.add_clock(1e-6, domain
="sync")
282 self
.assertEqual((yield self
.count
), 4)
283 self
.assertEqual((yield self
.sync
.clk
), 1)
285 self
.assertEqual((yield self
.count
), 5)
286 self
.assertEqual((yield self
.sync
.clk
), 1)
289 self
.assertEqual((yield self
.count
), 0)
290 sim
.add_sync_process(process
)
298 self
.sync
= ClockDomain(reset_less
=True)
301 self
.m
.d
.comb
+= self
.x
.eq(self
.a ^ self
.b
)
302 with self
.m
.Switch(self
.s
):
304 self
.m
.d
.sync
+= self
.o
.eq(self
.a
+ self
.b
)
306 self
.m
.d
.sync
+= self
.o
.eq(self
.a
- self
.b
)
308 self
.m
.d
.sync
+= self
.o
.eq(0)
309 self
.m
.domains
+= self
.sync
313 with self
.assertSimulation(self
.m
) as sim
:
319 self
.assertEqual((yield self
.x
), 4)
321 self
.assertEqual((yield self
.o
), 6)
325 self
.assertEqual((yield self
.o
), 4)
329 self
.assertEqual((yield self
.o
), 0)
330 sim
.add_sync_process(process
)
332 def setUp_multiclock(self
):
333 self
.sys
= ClockDomain()
334 self
.pix
= ClockDomain()
337 self
.m
.domains
+= self
.sys
, self
.pix
339 def test_multiclock(self
):
340 self
.setUp_multiclock()
341 with self
.assertSimulation(self
.m
) as sim
:
342 sim
.add_clock(1e-6, domain
="sys")
343 sim
.add_clock(0.3e-6, domain
="pix")
354 sim
.add_sync_process(sys_process
, domain
="sys")
355 sim
.add_sync_process(pix_process
, domain
="pix")
357 def setUp_lhs_rhs(self
):
362 self
.m
.d
.comb
+= self
.o
.eq(self
.i
)
364 def test_complex_lhs_rhs(self
):
366 with self
.assertSimulation(self
.m
) as sim
:
368 yield self
.i
.eq(0b10101010)
369 yield self
.i
[:4].eq(-1)
371 self
.assertEqual((yield self
.i
[:4]), 0b1111)
372 self
.assertEqual((yield self
.i
), 0b10101111)
373 sim
.add_process(process
)
375 def test_run_until(self
):
376 with self
.assertSimulation(Module(), deadline
=100e-6) as sim
:
383 def test_add_process_wrong(self
):
384 with self
.assertSimulation(Module()) as sim
:
385 with self
.assertRaises(TypeError,
386 msg
="Cannot add a process '1' because it is not a generator or "
387 "a generator function"):
390 def test_eq_signal_unused_wrong(self
):
393 with self
.assertSimulation(self
.m
) as sim
:
395 with self
.assertRaisesRegex(ValueError,
396 regex
=r
"Process '.+?' sent a request to set signal '\(sig s\)', "
397 r
"which is not a part of simulation"):
400 sim
.add_process(process
)
402 def test_eq_signal_comb_wrong(self
):
404 with self
.assertSimulation(self
.m
) as sim
:
406 with self
.assertRaisesRegex(ValueError,
407 regex
=r
"Process '.+?' sent a request to set signal '\(sig o\)', "
408 r
"which is a part of combinatorial assignment in simulation"):
411 sim
.add_process(process
)
413 def test_command_wrong(self
):
414 with self
.assertSimulation(Module()) as sim
:
416 with self
.assertRaisesRegex(TypeError,
417 regex
=r
"Received unsupported command '1' from process '.+?'"):
420 sim
.add_process(process
)
422 def setUp_memory(self
, rd_synchronous
=True, rd_transparent
=True, wr_granularity
=None):
424 self
.memory
= Memory(width
=8, depth
=4, init
=[0xaa, 0x55])
425 self
.m
.submodules
.rdport
= self
.rdport
= \
426 self
.memory
.read_port(synchronous
=rd_synchronous
, transparent
=rd_transparent
)
427 self
.m
.submodules
.wrport
= self
.wrport
= \
428 self
.memory
.write_port(granularity
=wr_granularity
)
430 def test_memory_init(self
):
432 with self
.assertSimulation(self
.m
) as sim
:
434 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
435 yield self
.rdport
.addr
.eq(1)
438 self
.assertEqual((yield self
.rdport
.data
), 0x55)
439 yield self
.rdport
.addr
.eq(2)
442 self
.assertEqual((yield self
.rdport
.data
), 0x00)
444 sim
.add_sync_process(process
)
446 def test_memory_write(self
):
448 with self
.assertSimulation(self
.m
) as sim
:
450 yield self
.wrport
.addr
.eq(4)
451 yield self
.wrport
.data
.eq(0x33)
452 yield self
.wrport
.en
.eq(1)
454 yield self
.wrport
.en
.eq(0)
455 yield self
.rdport
.addr
.eq(4)
457 self
.assertEqual((yield self
.rdport
.data
), 0x33)
459 sim
.add_sync_process(process
)
461 def test_memory_write_granularity(self
):
462 self
.setUp_memory(wr_granularity
=4)
463 with self
.assertSimulation(self
.m
) as sim
:
465 yield self
.wrport
.data
.eq(0x50)
466 yield self
.wrport
.en
.eq(0b00)
468 yield self
.wrport
.en
.eq(0)
470 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
471 yield self
.wrport
.en
.eq(0b10)
473 yield self
.wrport
.en
.eq(0)
475 self
.assertEqual((yield self
.rdport
.data
), 0x5a)
476 yield self
.wrport
.data
.eq(0x33)
477 yield self
.wrport
.en
.eq(0b01)
479 yield self
.wrport
.en
.eq(0)
481 self
.assertEqual((yield self
.rdport
.data
), 0x53)
483 sim
.add_sync_process(process
)
485 def test_memory_read_before_write(self
):
486 self
.setUp_memory(rd_transparent
=False)
487 with self
.assertSimulation(self
.m
) as sim
:
489 yield self
.wrport
.data
.eq(0x33)
490 yield self
.wrport
.en
.eq(1)
491 yield self
.rdport
.en
.eq(1)
493 self
.assertEqual((yield self
.rdport
.data
), 0x00)
495 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
496 yield Delay(1e-6) # let comb propagate
497 self
.assertEqual((yield self
.rdport
.data
), 0x33)
499 sim
.add_sync_process(process
)
501 def test_memory_write_through(self
):
502 self
.setUp_memory(rd_transparent
=True)
503 with self
.assertSimulation(self
.m
) as sim
:
505 yield self
.wrport
.data
.eq(0x33)
506 yield self
.wrport
.en
.eq(1)
508 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
509 yield Delay(1e-6) # let comb propagate
510 self
.assertEqual((yield self
.rdport
.data
), 0x33)
512 yield self
.rdport
.addr
.eq(1)
513 yield Delay(1e-6) # let comb propagate
514 self
.assertEqual((yield self
.rdport
.data
), 0x33)
516 sim
.add_sync_process(process
)
518 def test_memory_async_read_write(self
):
519 self
.setUp_memory(rd_synchronous
=False)
520 with self
.assertSimulation(self
.m
) as sim
:
522 yield self
.rdport
.addr
.eq(0)
524 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
525 yield self
.rdport
.addr
.eq(1)
527 self
.assertEqual((yield self
.rdport
.data
), 0x55)
528 yield self
.rdport
.addr
.eq(0)
529 yield self
.wrport
.addr
.eq(0)
530 yield self
.wrport
.data
.eq(0x33)
531 yield self
.wrport
.en
.eq(1)
533 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
534 yield Delay(1e-6) # let comb propagate
535 self
.assertEqual((yield self
.rdport
.data
), 0x33)
537 sim
.add_process(process
)
539 def test_memory_read_only(self
):
541 self
.memory
= Memory(width
=8, depth
=4, init
=[0xaa, 0x55])
542 self
.m
.submodules
.rdport
= self
.rdport
= self
.memory
.read_port()
543 with self
.assertSimulation(self
.m
) as sim
:
545 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
546 yield self
.rdport
.addr
.eq(1)
549 self
.assertEqual((yield self
.rdport
.data
), 0x55)
551 sim
.add_sync_process(process
)
553 def test_sample_helpers(self
):
560 p0
, r0
, f0
, s0
= mk(Past(s
, 0)), mk(Rose(s
)), mk(Fell(s
)), mk(Stable(s
))
561 p1
, r1
, f1
, s1
= mk(Past(s
)), mk(Rose(s
, 1)), mk(Fell(s
, 1)), mk(Stable(s
, 1))
562 p2
, r2
, f2
, s2
= mk(Past(s
, 2)), mk(Rose(s
, 2)), mk(Fell(s
, 2)), mk(Stable(s
, 2))
563 p3
, r3
, f3
, s3
= mk(Past(s
, 3)), mk(Rose(s
, 3)), mk(Fell(s
, 3)), mk(Stable(s
, 3))
564 with self
.assertSimulation(m
) as sim
:
576 self
.assertEqual((yield p0
), 0b01)
577 self
.assertEqual((yield p1
), 0b10)
578 self
.assertEqual((yield p2
), 0b10)
579 self
.assertEqual((yield p3
), 0b00)
581 self
.assertEqual((yield s0
), 0b0)
582 self
.assertEqual((yield s1
), 0b1)
583 self
.assertEqual((yield s2
), 0b0)
584 self
.assertEqual((yield s3
), 0b1)
586 self
.assertEqual((yield r0
), 0b01)
587 self
.assertEqual((yield r1
), 0b00)
588 self
.assertEqual((yield r2
), 0b10)
589 self
.assertEqual((yield r3
), 0b00)
591 self
.assertEqual((yield f0
), 0b10)
592 self
.assertEqual((yield f1
), 0b00)
593 self
.assertEqual((yield f2
), 0b00)
594 self
.assertEqual((yield f3
), 0b00)
596 sim
.add_sync_process(process_gen
)
597 sim
.add_sync_process(process_check
)
599 def test_wrong_not_run(self
):
600 with self
.assertWarns(UserWarning,
601 msg
="Simulation created, but not run"):
602 with
Simulator(Fragment()) as sim
: