1 from contextlib
import contextmanager
4 from ..tools
import flatten
, union
5 from ..hdl
.ast
import *
7 from ..hdl
.mem
import *
8 from ..hdl
.rec
import *
9 from ..hdl
.dsl
import *
10 from ..hdl
.ir
import *
11 from ..back
.pysim
import *
14 class SimulatorUnitTestCase(FHDLTestCase
):
15 def assertStatement(self
, stmt
, inputs
, output
, reset
=0):
16 inputs
= [Value
.wrap(i
) for i
in inputs
]
17 output
= Value
.wrap(output
)
19 isigs
= [Signal(i
.shape(), name
=n
) for i
, n
in zip(inputs
, "abcd")]
20 osig
= Signal(output
.shape(), name
="y", reset
=reset
)
22 stmt
= stmt(osig
, *isigs
)
24 frag
.add_statements(stmt
)
25 for signal
in flatten(s
._lhs
_signals
() for s
in Statement
.wrap(stmt
)):
26 frag
.add_driver(signal
)
29 vcd_file
=open("test.vcd", "w"),
30 gtkw_file
=open("test.gtkw", "w"),
31 traces
=[*isigs
, osig
]) as sim
:
33 for isig
, input in zip(isigs
, inputs
):
36 self
.assertEqual((yield osig
), output
.value
)
37 sim
.add_process(process
)
def test_invert(self):
    """Run ``y.eq(~a)`` through ``assertStatement`` for several constants.

    Each case supplies one input constant and the constant the output is
    expected to equal after the statement is simulated.
    """
    def invert(y, a):
        return y.eq(~a)

    # (input, expected output) pairs, checked in this order.
    cases = (
        (C(0b0000, 4), C(0b1111, 4)),
        (C(0b1010, 4), C(0b0101, 4)),
        (C(0, 4), C(-1, 4)),
    )
    for operand, expected in cases:
        self.assertStatement(invert, [operand], expected)
47 stmt
= lambda y
, a
: y
.eq(-a
)
48 self
.assertStatement(stmt
, [C(0b0000, 4)], C(0b0000, 4))
49 self
.assertStatement(stmt
, [C(0b0001, 4)], C(0b1111, 4))
50 self
.assertStatement(stmt
, [C(0b1010, 4)], C(0b0110, 4))
51 self
.assertStatement(stmt
, [C(1, 4)], C(-1, 4))
52 self
.assertStatement(stmt
, [C(5, 4)], C(-5, 4))
55 stmt
= lambda y
, a
: y
.eq(a
.bool())
56 self
.assertStatement(stmt
, [C(0, 4)], C(0))
57 self
.assertStatement(stmt
, [C(1, 4)], C(1))
58 self
.assertStatement(stmt
, [C(2, 4)], C(1))
61 stmt
= lambda y
, a
, b
: y
.eq(a
+ b
)
62 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1, 4))
63 self
.assertStatement(stmt
, [C(-5, 4), C(-5, 4)], C(-10, 5))
66 stmt
= lambda y
, a
, b
: y
.eq(a
- b
)
67 self
.assertStatement(stmt
, [C(2, 4), C(1, 4)], C(1, 4))
68 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(-1, 4))
69 self
.assertStatement(stmt
, [C(0, 4), C(10, 4)], C(-10, 5))
72 stmt
= lambda y
, a
, b
: y
.eq(a
* b
)
73 self
.assertStatement(stmt
, [C(2, 4), C(1, 4)], C(2, 8))
74 self
.assertStatement(stmt
, [C(2, 4), C(2, 4)], C(4, 8))
75 self
.assertStatement(stmt
, [C(7, 4), C(7, 4)], C(49, 8))
78 stmt
= lambda y
, a
, b
: y
.eq(a
& b
)
79 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b1000, 4))
82 stmt
= lambda y
, a
, b
: y
.eq(a | b
)
83 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b1110, 4))
86 stmt
= lambda y
, a
, b
: y
.eq(a ^ b
)
87 self
.assertStatement(stmt
, [C(0b1100, 4), C(0b1010, 4)], C(0b0110, 4))
90 stmt
= lambda y
, a
, b
: y
.eq(a
<< b
)
91 self
.assertStatement(stmt
, [C(0b1001, 4), C(0)], C(0b1001, 5))
92 self
.assertStatement(stmt
, [C(0b1001, 4), C(3)], C(0b1001000, 7))
93 self
.assertStatement(stmt
, [C(0b1001, 4), C(-2)], C(0b10, 7))
96 stmt
= lambda y
, a
, b
: y
.eq(a
>> b
)
97 self
.assertStatement(stmt
, [C(0b1001, 4), C(0)], C(0b1001, 4))
98 self
.assertStatement(stmt
, [C(0b1001, 4), C(2)], C(0b10, 4))
99 self
.assertStatement(stmt
, [C(0b1001, 4), C(-2)], C(0b100100, 5))
102 stmt
= lambda y
, a
, b
: y
.eq(a
== b
)
103 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
104 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
105 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
108 stmt
= lambda y
, a
, b
: y
.eq(a
!= b
)
109 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
110 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
111 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
114 stmt
= lambda y
, a
, b
: y
.eq(a
< b
)
115 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
116 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
117 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
120 stmt
= lambda y
, a
, b
: y
.eq(a
>= b
)
121 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
122 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
123 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
126 stmt
= lambda y
, a
, b
: y
.eq(a
> b
)
127 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(0))
128 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(0))
129 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(1))
132 stmt
= lambda y
, a
, b
: y
.eq(a
<= b
)
133 self
.assertStatement(stmt
, [C(0, 4), C(0, 4)], C(1))
134 self
.assertStatement(stmt
, [C(0, 4), C(1, 4)], C(1))
135 self
.assertStatement(stmt
, [C(1, 4), C(0, 4)], C(0))
138 stmt
= lambda y
, a
, b
, c
: y
.eq(Mux(c
, a
, b
))
139 self
.assertStatement(stmt
, [C(2, 4), C(3, 4), C(0)], C(3, 4))
140 self
.assertStatement(stmt
, [C(2, 4), C(3, 4), C(1)], C(2, 4))
def test_slice(self):
    """Check reading a single bit (``a[2]``) and a range (``a[2:4]``).

    Both statements are driven with ``C(0b10110100, 8)`` and compared
    against the expected output constant via ``assertStatement``.
    """
    pattern = 0b10110100

    def read_bit(y, a):
        return y.eq(a[2])

    def read_range(y, a):
        return y.eq(a[2:4])

    self.assertStatement(read_bit, [C(pattern, 8)], C(0b1, 1))
    self.assertStatement(read_range, [C(pattern, 8)], C(0b01, 2))
def test_slice_lhs(self):
    """Check assigning to a single bit (``y[2]``) and a range (``y[2:4]``).

    The output signal starts from the given ``reset`` value, so the
    expected constant shows which bits the assignment replaced.
    """
    def write_bit(y, a):
        return y[2].eq(a)

    def write_range(y, a):
        return y[2:4].eq(a)

    # (statement builder, input, expected output, output reset value)
    cases = (
        (write_bit, C(0b0, 1), C(0b11111011, 8), 0b11111111),
        (write_range, C(0b01, 2), C(0b11110111, 8), 0b11111011),
    )
    for build, operand, expected, initial in cases:
        self.assertStatement(build, [operand], expected, reset=initial)
155 stmt
= lambda y
, a
, b
: y
.eq(a
.part(b
, 3))
156 self
.assertStatement(stmt
, [C(0b10110100, 8), C(0)], C(0b100, 3))
157 self
.assertStatement(stmt
, [C(0b10110100, 8), C(2)], C(0b101, 3))
158 self
.assertStatement(stmt
, [C(0b10110100, 8), C(3)], C(0b110, 3))
def test_part_lhs(self):
    """Check assigning through ``y.part(a, 3)`` at several offsets.

    Every case starts the output from ``0b11111111`` and passes the
    offset ``a`` and the assigned value ``b`` as inputs.
    """
    all_ones = 0b11111111

    def write_part(y, a, b):
        return y.part(a, 3).eq(b)

    # (offset, assigned value, expected output)
    cases = (
        (C(0), C(0b100, 3), C(0b11111100, 8)),
        (C(2), C(0b101, 3), C(0b11110111, 8)),
        (C(3), C(0b110, 3), C(0b11110111, 8)),
    )
    for offset, value, expected in cases:
        self.assertStatement(write_part, [offset, value], expected, reset=all_ones)
167 stmt
= lambda y
, *xs
: y
.eq(Cat(*xs
))
168 self
.assertStatement(stmt
, [C(0b10, 2), C(0b01, 2)], C(0b0110, 4))
170 def test_cat_lhs(self
):
174 stmt
= lambda y
, a
: [Cat(l
, m
, n
).eq(a
), y
.eq(Cat(n
, m
, l
))]
175 self
.assertStatement(stmt
, [C(0b100101110, 9)], C(0b110101100, 9))
177 def test_record(self
):
182 stmt
= lambda y
, a
: [rec
.eq(a
), y
.eq(rec
)]
183 self
.assertStatement(stmt
, [C(0b101, 3)], C(0b101, 3))
186 stmt
= lambda y
, a
: y
.eq(Repl(a
, 3))
187 self
.assertStatement(stmt
, [C(0b10, 2)], C(0b101010, 6))
def test_array(self):
    """Check that indexing ``Array([1, 4, 10])`` by the input yields each element."""
    array = Array([1, 4, 10])

    def select(y, a):
        return y.eq(array[a])

    # (index, expected element) pairs, checked in index order.
    for index, element in ((0, 1), (1, 4), (2, 10)):
        self.assertStatement(select, [C(index)], C(element))
def test_array_oob(self):
    """Check indexing ``Array([1, 4, 10])`` with indices 3 and 4.

    Both indices are past the end of the three-element array; the test
    expects the output to equal ``C(10)`` in each case.
    """
    array = Array([1, 4, 10])

    def select(y, a):
        return y.eq(array[a])

    for index in (3, 4):
        self.assertStatement(select, [C(index)], C(10))
def test_array_lhs(self):
    """Check assigning to an ``Array`` element selected by an input.

    Three signals with reset values 1, 4 and 7 form the array. Each case
    assigns ``b`` to ``array[a]`` and compares ``Cat(*array)`` with the
    expected constant.
    """
    # NOTE(review): the locals are kept as plain `l`/`m`/`n` assignments;
    # Signal may derive its name from the assignment target -- confirm
    # before renaming or restructuring these three lines.
    l = Signal(3, reset=1)
    m = Signal(3, reset=4)
    n = Signal(3, reset=7)
    array = Array([l, m, n])

    def write_element(y, a, b):
        return [array[a].eq(b), y.eq(Cat(*array))]

    # (index, assigned value, expected concatenation)
    cases = (
        (0, 0b000, 0b111100000),
        (1, 0b010, 0b111010001),
        (2, 0b100, 0b100100001),
    )
    for index, value, expected in cases:
        self.assertStatement(write_element, [C(index), C(value)], C(expected))
212 def test_array_lhs_oob(self
):
216 array
= Array([l
, m
, n
])
217 stmt
= lambda y
, a
, b
: [array
[a
].eq(b
), y
.eq(Cat(*array
))]
218 self
.assertStatement(stmt
, [C(3), C(0b001)], C(0b001000000))
219 self
.assertStatement(stmt
, [C(4), C(0b010)], C(0b010000000))
221 def test_array_index(self
):
222 array
= Array(Array(x
* y
for y
in range(10)) for x
in range(10))
223 stmt
= lambda y
, a
, b
: y
.eq(array
[a
][b
])
226 self
.assertStatement(stmt
, [C(x
), C(y
)], C(x
* y
))
228 def test_array_attr(self
):
229 from collections
import namedtuple
230 pair
= namedtuple("pair", ("p", "n"))
232 array
= Array(pair(x
, -x
) for x
in range(10))
233 stmt
= lambda y
, a
: y
.eq(array
[a
].p
+ array
[a
].n
)
235 self
.assertStatement(stmt
, [C(i
)], C(0))
238 class SimulatorIntegrationTestCase(FHDLTestCase
):
240 def assertSimulation(self
, module
, deadline
=None):
241 with
Simulator(module
.lower(platform
=None)) as sim
:
246 sim
.run_until(deadline
)
248 def setUp_counter(self
):
249 self
.count
= Signal(3, reset
=4)
250 self
.sync
= ClockDomain()
253 self
.m
.d
.sync
+= self
.count
.eq(self
.count
+ 1)
254 self
.m
.domains
+= self
.sync
256 def test_counter_process(self
):
258 with self
.assertSimulation(self
.m
) as sim
:
260 self
.assertEqual((yield self
.count
), 4)
262 self
.assertEqual((yield self
.count
), 4)
263 yield self
.sync
.clk
.eq(1)
264 self
.assertEqual((yield self
.count
), 5)
266 self
.assertEqual((yield self
.count
), 5)
267 yield self
.sync
.clk
.eq(0)
268 self
.assertEqual((yield self
.count
), 5)
271 yield self
.sync
.clk
.eq(1)
273 yield self
.sync
.clk
.eq(0)
274 self
.assertEqual((yield self
.count
), 0)
275 sim
.add_process(process
)
277 def test_counter_clock_and_sync_process(self
):
279 with self
.assertSimulation(self
.m
) as sim
:
280 sim
.add_clock(1e-6, domain
="sync")
282 self
.assertEqual((yield self
.count
), 4)
283 self
.assertEqual((yield self
.sync
.clk
), 0)
285 self
.assertEqual((yield self
.count
), 5)
286 self
.assertEqual((yield self
.sync
.clk
), 1)
289 self
.assertEqual((yield self
.count
), 0)
290 sim
.add_sync_process(process
)
298 self
.sync
= ClockDomain(reset_less
=True)
301 self
.m
.d
.comb
+= self
.x
.eq(self
.a ^ self
.b
)
302 with self
.m
.Switch(self
.s
):
304 self
.m
.d
.sync
+= self
.o
.eq(self
.a
+ self
.b
)
306 self
.m
.d
.sync
+= self
.o
.eq(self
.a
- self
.b
)
308 self
.m
.d
.sync
+= self
.o
.eq(0)
309 self
.m
.domains
+= self
.sync
313 with self
.assertSimulation(self
.m
) as sim
:
319 self
.assertEqual((yield self
.x
), 4)
320 self
.assertEqual((yield self
.o
), 6)
323 self
.assertEqual((yield self
.o
), 4)
326 self
.assertEqual((yield self
.o
), 0)
327 sim
.add_sync_process(process
)
329 def setUp_multiclock(self
):
330 self
.sys
= ClockDomain()
331 self
.pix
= ClockDomain()
334 self
.m
.domains
+= self
.sys
, self
.pix
336 def test_multiclock(self
):
337 self
.setUp_multiclock()
338 with self
.assertSimulation(self
.m
) as sim
:
339 sim
.add_clock(1e-6, domain
="sys")
340 sim
.add_clock(0.3e-6, domain
="pix")
351 sim
.add_sync_process(sys_process
, domain
="sys")
352 sim
.add_sync_process(pix_process
, domain
="pix")
354 def setUp_lhs_rhs(self
):
359 self
.m
.d
.comb
+= self
.o
.eq(self
.i
)
361 def test_complex_lhs_rhs(self
):
363 with self
.assertSimulation(self
.m
) as sim
:
365 yield self
.i
.eq(0b10101010)
366 yield self
.i
[:4].eq(-1)
368 self
.assertEqual((yield self
.i
[:4]), 0b1111)
369 self
.assertEqual((yield self
.i
), 0b10101111)
370 sim
.add_process(process
)
372 def test_run_until(self
):
373 with self
.assertSimulation(Module(), deadline
=100e-6) as sim
:
380 def test_add_process_wrong(self
):
381 with self
.assertSimulation(Module()) as sim
:
382 with self
.assertRaises(TypeError,
383 msg
="Cannot add a process '1' because it is not a generator or "
384 "a generator function"):
387 def test_eq_signal_unused_wrong(self
):
390 with self
.assertSimulation(self
.m
) as sim
:
392 with self
.assertRaisesRegex(ValueError,
393 regex
=r
"Process '.+?' sent a request to set signal '\(sig s\)', "
394 r
"which is not a part of simulation"):
397 sim
.add_process(process
)
399 def test_eq_signal_comb_wrong(self
):
401 with self
.assertSimulation(self
.m
) as sim
:
403 with self
.assertRaisesRegex(ValueError,
404 regex
=r
"Process '.+?' sent a request to set signal '\(sig o\)', "
405 r
"which is a part of combinatorial assignment in simulation"):
408 sim
.add_process(process
)
410 def test_command_wrong(self
):
411 with self
.assertSimulation(Module()) as sim
:
413 with self
.assertRaisesRegex(TypeError,
414 regex
=r
"Received unsupported command '1' from process '.+?'"):
417 sim
.add_process(process
)
419 def setUp_memory(self
, rd_synchronous
=True, rd_transparent
=True, wr_granularity
=None):
421 self
.memory
= Memory(width
=8, depth
=4, init
=[0xaa, 0x55])
422 self
.m
.submodules
.rdport
= self
.rdport
= \
423 self
.memory
.read_port(synchronous
=rd_synchronous
, transparent
=rd_transparent
)
424 self
.m
.submodules
.wrport
= self
.wrport
= \
425 self
.memory
.write_port(granularity
=wr_granularity
)
427 def test_memory_init(self
):
429 with self
.assertSimulation(self
.m
) as sim
:
431 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
432 yield self
.rdport
.addr
.eq(1)
435 self
.assertEqual((yield self
.rdport
.data
), 0x55)
436 yield self
.rdport
.addr
.eq(2)
439 self
.assertEqual((yield self
.rdport
.data
), 0x00)
441 sim
.add_sync_process(process
)
443 def test_memory_write(self
):
445 with self
.assertSimulation(self
.m
) as sim
:
447 yield self
.wrport
.addr
.eq(4)
448 yield self
.wrport
.data
.eq(0x33)
449 yield self
.wrport
.en
.eq(1)
451 yield self
.wrport
.en
.eq(0)
452 yield self
.rdport
.addr
.eq(4)
454 self
.assertEqual((yield self
.rdport
.data
), 0x33)
456 sim
.add_sync_process(process
)
458 def test_memory_write_granularity(self
):
459 self
.setUp_memory(wr_granularity
=4)
460 with self
.assertSimulation(self
.m
) as sim
:
462 yield self
.wrport
.data
.eq(0x50)
463 yield self
.wrport
.en
.eq(0b00)
465 yield self
.wrport
.en
.eq(0)
467 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
468 yield self
.wrport
.en
.eq(0b10)
470 yield self
.wrport
.en
.eq(0)
472 self
.assertEqual((yield self
.rdport
.data
), 0x5a)
473 yield self
.wrport
.data
.eq(0x33)
474 yield self
.wrport
.en
.eq(0b01)
476 yield self
.wrport
.en
.eq(0)
478 self
.assertEqual((yield self
.rdport
.data
), 0x53)
480 sim
.add_sync_process(process
)
482 def test_memory_read_before_write(self
):
483 self
.setUp_memory(rd_transparent
=False)
484 with self
.assertSimulation(self
.m
) as sim
:
486 yield self
.wrport
.data
.eq(0x33)
487 yield self
.wrport
.en
.eq(1)
488 yield self
.rdport
.en
.eq(1)
490 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
491 yield Delay(1e-6) # let comb propagate
492 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
494 sim
.add_sync_process(process
)
496 def test_memory_write_through(self
):
497 self
.setUp_memory(rd_transparent
=True)
498 with self
.assertSimulation(self
.m
) as sim
:
500 yield self
.wrport
.data
.eq(0x33)
501 yield self
.wrport
.en
.eq(1)
503 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
504 yield Delay(1e-6) # let comb propagate
505 self
.assertEqual((yield self
.rdport
.data
), 0x33)
507 yield self
.rdport
.addr
.eq(1)
508 yield Delay(1e-6) # let comb propagate
509 self
.assertEqual((yield self
.rdport
.data
), 0x33)
511 sim
.add_sync_process(process
)
513 def test_memory_async_read_write(self
):
514 self
.setUp_memory(rd_synchronous
=False)
515 with self
.assertSimulation(self
.m
) as sim
:
517 yield self
.rdport
.addr
.eq(0)
519 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
520 yield self
.rdport
.addr
.eq(1)
522 self
.assertEqual((yield self
.rdport
.data
), 0x55)
523 yield self
.rdport
.addr
.eq(0)
524 yield self
.wrport
.addr
.eq(0)
525 yield self
.wrport
.data
.eq(0x33)
526 yield self
.wrport
.en
.eq(1)
528 self
.assertEqual((yield self
.rdport
.data
), 0xaa)
529 yield Delay(1e-6) # let comb propagate
530 self
.assertEqual((yield self
.rdport
.data
), 0x33)
532 sim
.add_process(process
)
534 def test_wrong_not_run(self
):
535 with self
.assertWarns(UserWarning,
536 msg
="Simulation created, but not run"):
537 with
Simulator(Fragment()) as sim
: