1 from collections
import OrderedDict
3 from ..hdl
.ast
import *
6 from ..hdl
.mem
import *
10 class BadElaboratable(Elaboratable
):
11 def elaborate(self
, platform
):
15 class FragmentGetTestCase(FHDLTestCase
):
16 def test_get_wrong(self
):
17 with self
.assertRaises(AttributeError,
18 msg
="Object 'None' cannot be elaborated"):
19 Fragment
.get(None, platform
=None)
21 with self
.assertRaises(AttributeError,
22 msg
="Object 'None' cannot be elaborated"):
23 Fragment
.get(BadElaboratable(), platform
=None)
26 class FragmentGeneratedTestCase(FHDLTestCase
):
27 def test_find_subfragment(self
):
30 f1
.add_subfragment(f2
, "f2")
32 self
.assertEqual(f1
.find_subfragment(0), f2
)
33 self
.assertEqual(f1
.find_subfragment("f2"), f2
)
35 def test_find_subfragment_wrong(self
):
38 f1
.add_subfragment(f2
, "f2")
40 with self
.assertRaises(NameError,
41 msg
="No subfragment at index #1"):
42 f1
.find_subfragment(1)
43 with self
.assertRaises(NameError,
44 msg
="No subfragment with name 'fx'"):
45 f1
.find_subfragment("fx")
47 def test_find_generated(self
):
50 f2
.generated
["sig"] = sig
= Signal()
51 f1
.add_subfragment(f2
, "f2")
53 self
.assertEqual(SignalKey(f1
.find_generated("f2", "sig")),
57 class FragmentDriversTestCase(FHDLTestCase
):
60 self
.assertEqual(list(f
.iter_comb()), [])
61 self
.assertEqual(list(f
.iter_sync()), [])
64 class FragmentPortsTestCase(FHDLTestCase
):
75 self
.assertEqual(list(f
.iter_ports()), [])
77 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
78 self
.assertEqual(f
.ports
, SignalDict([]))
80 def test_iter_signals(self
):
82 f
.add_ports(self
.s1
, self
.s2
, dir="io")
83 self
.assertEqual(SignalSet((self
.s1
, self
.s2
)), f
.iter_signals())
85 def test_self_contained(self
):
92 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
93 self
.assertEqual(f
.ports
, SignalDict([]))
95 def test_infer_input(self
):
101 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
102 self
.assertEqual(f
.ports
, SignalDict([
106 def test_request_output(self
):
112 f
._propagate
_ports
(ports
=(self
.c1
,), all_undef_as_ports
=True)
113 self
.assertEqual(f
.ports
, SignalDict([
118 def test_input_in_subfragment(self
):
127 f1
.add_subfragment(f2
)
128 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
129 self
.assertEqual(f1
.ports
, SignalDict())
130 self
.assertEqual(f2
.ports
, SignalDict([
134 def test_input_only_in_subfragment(self
):
140 f1
.add_subfragment(f2
)
141 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
142 self
.assertEqual(f1
.ports
, SignalDict([
145 self
.assertEqual(f2
.ports
, SignalDict([
149 def test_output_from_subfragment(self
):
158 f1
.add_subfragment(f2
)
160 f1
._propagate_ports(ports
=(self
.c2
,), all_undef_as_ports
=True)
161 self
.assertEqual(f1
.ports
, SignalDict([
164 self
.assertEqual(f2
.ports
, SignalDict([
168 def test_output_from_subfragment_2(self
):
177 f1
.add_subfragment(f2
)
182 f2
.add_subfragment(f3
)
184 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
185 self
.assertEqual(f2
.ports
, SignalDict([
189 def test_input_output_sibling(self
):
195 f1
.add_subfragment(f2
)
200 f3
.add_driver(self
.c2
)
201 f1
.add_subfragment(f3
)
203 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
204 self
.assertEqual(f1
.ports
, SignalDict())
206 def test_output_input_sibling(self
):
212 f2
.add_driver(self
.c2
)
213 f1
.add_subfragment(f2
)
218 f1
.add_subfragment(f3
)
220 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
221 self
.assertEqual(f1
.ports
, SignalDict())
223 def test_input_cd(self
):
230 f
.add_driver(self
.c1
, "sync")
232 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
233 self
.assertEqual(f
.ports
, SignalDict([
239 def test_input_cd_reset_less(self
):
240 sync
= ClockDomain(reset_less
=True)
246 f
.add_driver(self
.c1
, "sync")
248 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
249 self
.assertEqual(f
.ports
, SignalDict([
254 def test_inout(self
):
257 f2
= Instance("foo", io_x
=s
)
258 f1
.add_subfragment(f2
)
260 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
261 self
.assertEqual(f1
.ports
, SignalDict([
266 class FragmentDomainsTestCase(FHDLTestCase
):
267 def test_iter_signals(self
):
269 cd2
= ClockDomain(reset_less
=True)
274 f
.add_domains(cd1
, cd2
)
275 f
.add_driver(s1
, "cd1")
276 self
.assertEqual(SignalSet((cd1
.clk
, cd1
.rst
, s1
)), f
.iter_signals())
277 f
.add_driver(s2
, "cd2")
278 self
.assertEqual(SignalSet((cd1
.clk
, cd1
.rst
, cd2
.clk
, s1
, s2
)), f
.iter_signals())
280 def test_propagate_up(self
):
285 f1
.add_subfragment(f2
)
288 f1
._propagate_domains_up()
289 self
.assertEqual(f1
.domains
, {"cd": cd
})
291 def test_domain_conflict(self
):
292 cda
= ClockDomain("sync")
293 cdb
= ClockDomain("sync")
300 f
.add_subfragment(fa
, "a")
301 f
.add_subfragment(fb
, "b")
303 f
._propagate
_domains
_up
()
304 self
.assertEqual(f
.domains
, {"a_sync": cda
, "b_sync": cdb
})
305 (fa
, _
), (fb
, _
) = f
.subfragments
306 self
.assertEqual(fa
.domains
, {"a_sync": cda
})
307 self
.assertEqual(fb
.domains
, {"b_sync": cdb
})
309 def test_domain_conflict_anon(self
):
310 cda
= ClockDomain("sync")
311 cdb
= ClockDomain("sync")
318 f
.add_subfragment(fa
, "a")
319 f
.add_subfragment(fb
)
321 with self
.assertRaises(DomainError
,
322 msg
="Domain 'sync' is defined by subfragments 'a', <unnamed #1> of fragment "
323 "'top'; it is necessary to either rename subfragment domains explicitly, "
324 "or give names to subfragments"):
325 f
._propagate
_domains
_up
()
327 def test_domain_conflict_name(self
):
328 cda
= ClockDomain("sync")
329 cdb
= ClockDomain("sync")
336 f
.add_subfragment(fa
, "x")
337 f
.add_subfragment(fb
, "x")
339 with self
.assertRaises(DomainError
,
340 msg
="Domain 'sync' is defined by subfragments #0, #1 of fragment 'top', some "
341 "of which have identical names; it is necessary to either rename subfragment "
342 "domains explicitly, or give distinct names to subfragments"):
343 f
._propagate
_domains
_up
()
345 def test_propagate_down(self
):
351 f1
.add_subfragment(f2
)
353 f1
._propagate_domains_down()
354 self
.assertEqual(f2
.domains
, {"cd": cd
})
356 def test_propagate_down_idempotent(self
):
363 f1
.add_subfragment(f2
)
365 f1
._propagate_domains_down()
366 self
.assertEqual(f1
.domains
, {"cd": cd
})
367 self
.assertEqual(f2
.domains
, {"cd": cd
})
369 def test_propagate(self
):
375 f1
.add_subfragment(f2
)
377 f1
._propagate_domains(ensure_sync_exists
=False)
378 self
.assertEqual(f1
.domains
, {"cd": cd
})
379 self
.assertEqual(f2
.domains
, {"cd": cd
})
381 def test_propagate_ensure_sync(self
):
384 f1
.add_subfragment(f2
)
386 f1
._propagate_domains(ensure_sync_exists
=True)
387 self
.assertEqual(f1
.domains
.keys(), {"sync"})
388 self
.assertEqual(f2
.domains
.keys(), {"sync"})
389 self
.assertEqual(f1
.domains
["sync"], f2
.domains
["sync"])
392 class FragmentHierarchyConflictTestCase(FHDLTestCase
):
393 def setUp_self_sub(self
):
399 self
.f1
.add_statements(self
.c1
.eq(0))
400 self
.f1
.add_driver(self
.s1
)
401 self
.f1
.add_driver(self
.c1
, "sync")
403 self
.f1a
= Fragment()
404 self
.f1
.add_subfragment(self
.f1a
, "f1a")
407 self
.f2
.add_statements(self
.c2
.eq(1))
408 self
.f2
.add_driver(self
.s1
)
409 self
.f2
.add_driver(self
.c2
, "sync")
410 self
.f1
.add_subfragment(self
.f2
)
412 self
.f1b
= Fragment()
413 self
.f1
.add_subfragment(self
.f1b
, "f1b")
415 self
.f2a
= Fragment()
416 self
.f2
.add_subfragment(self
.f2a
, "f2a")
418 def test_conflict_self_sub(self
):
419 self
.setUp_self_sub()
421 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
422 self
.assertEqual(self
.f1
.subfragments
, [
427 self
.assertRepr(self
.f1
.statements
, """
429 (eq (sig c1) (const 1'd0))
430 (eq (sig c2) (const 1'd1))
433 self
.assertEqual(self
.f1
.drivers
, {
434 None: SignalSet((self
.s1
,)),
435 "sync": SignalSet((self
.c1
, self
.c2
)),
438 def test_conflict_self_sub_error(self
):
439 self
.setUp_self_sub()
441 with self
.assertRaises(DriverConflict
,
442 msg
="Signal '(sig s1)' is driven from multiple fragments: top, top.<unnamed #1>"):
443 self
.f1
._resolve_hierarchy_conflicts(mode
="error")
445 def test_conflict_self_sub_warning(self
):
446 self
.setUp_self_sub()
448 with self
.assertWarns(DriverConflict
,
449 msg
="Signal '(sig s1)' is driven from multiple fragments: top, top.<unnamed #1>; "
450 "hierarchy will be flattened"):
451 self
.f1
._resolve_hierarchy_conflicts(mode
="warn")
453 def setUp_sub_sub(self
):
461 self
.f2
.add_driver(self
.s1
)
462 self
.f2
.add_statements(self
.c1
.eq(0))
463 self
.f1
.add_subfragment(self
.f2
)
466 self
.f3
.add_driver(self
.s1
)
467 self
.f3
.add_statements(self
.c2
.eq(1))
468 self
.f1
.add_subfragment(self
.f3
)
470 def test_conflict_sub_sub(self
):
473 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
474 self
.assertEqual(self
.f1
.subfragments
, [])
475 self
.assertRepr(self
.f1
.statements
, """
477 (eq (sig c1) (const 1'd0))
478 (eq (sig c2) (const 1'd1))
482 def setUp_self_subsub(self
):
488 self
.f1
.add_driver(self
.s1
)
491 self
.f2
.add_statements(self
.c1
.eq(0))
492 self
.f1
.add_subfragment(self
.f2
)
495 self
.f3
.add_driver(self
.s1
)
496 self
.f3
.add_statements(self
.c2
.eq(1))
497 self
.f2
.add_subfragment(self
.f3
)
499 def test_conflict_self_subsub(self
):
500 self
.setUp_self_subsub()
502 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
503 self
.assertEqual(self
.f1
.subfragments
, [])
504 self
.assertRepr(self
.f1
.statements
, """
506 (eq (sig c1) (const 1'd0))
507 (eq (sig c2) (const 1'd1))
511 def setUp_memory(self
):
512 self
.m
= Memory(width
=8, depth
=4)
513 self
.fr
= self
.m
.read_port().elaborate(platform
=None)
514 self
.fw
= self
.m
.write_port().elaborate(platform
=None)
517 self
.f2
.add_subfragment(self
.fr
)
518 self
.f1
.add_subfragment(self
.f2
)
520 self
.f3
.add_subfragment(self
.fw
)
521 self
.f1
.add_subfragment(self
.f3
)
523 def test_conflict_memory(self
):
526 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
527 self
.assertEqual(self
.f1
.subfragments
, [
532 def test_conflict_memory_error(self
):
535 with self
.assertRaises(DriverConflict
,
536 msg
="Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
538 self
.f1
._resolve_hierarchy_conflicts(mode
="error")
540 def test_conflict_memory_warning(self
):
543 with self
.assertWarns(DriverConflict
,
544 msg
="Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
545 "top.<unnamed #1>; hierarchy will be flattened"):
546 self
.f1
._resolve_hierarchy_conflicts(mode
="warn")
548 def test_explicit_flatten(self
):
551 self
.f2
.flatten
= True
552 self
.f1
.add_subfragment(self
.f2
)
554 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
555 self
.assertEqual(self
.f1
.subfragments
, [])
558 class InstanceTestCase(FHDLTestCase
):
559 def test_construct(self
):
566 inst
= Instance("foo",
568 ("p", "PARAM1", 0x1234),
578 self
.assertEqual(inst
.attrs
, OrderedDict([
582 self
.assertEqual(inst
.parameters
, OrderedDict([
586 self
.assertEqual(inst
.named_ports
, OrderedDict([
595 def test_wrong_construct_arg(self
):
597 with self
.assertRaises(NameError,
598 msg
="Instance argument ('', 's1', (sig s)) should be a tuple "
599 "(kind, name, value) where kind is one of \"p\", \"i\", \"o\", or \"io\""):
600 Instance("foo", ("", "s1", s
))
602 def test_wrong_construct_kwarg(self
):
604 with self
.assertRaises(NameError,
605 msg
="Instance keyword argument x_s1=(sig s) does not start with one of "
606 "\"p_\", \"i_\", \"o_\", or \"io_\""):
607 Instance("foo", x_s1
=s
)
612 self
.pins
= Signal(8)
613 self
.datal
= Signal(4)
614 self
.datah
= Signal(4)
615 self
.inst
= Instance("cpu",
620 o_data
=Cat(self
.datal
, self
.datah
),
623 self
.wrap
= Fragment()
624 self
.wrap
.add_subfragment(self
.inst
)
629 self
.assertEqual(f
.type, "cpu")
630 self
.assertEqual(f
.parameters
, OrderedDict([("RESET", 0x1234)]))
631 self
.assertEqual(list(f
.named_ports
.keys()), ["clk", "rst", "stb", "data", "pins"])
632 self
.assertEqual(f
.ports
, SignalDict([]))
634 def test_prepare(self
):
636 f
= self
.wrap
.prepare()
637 sync_clk
= f
.domains
["sync"].clk
638 self
.assertEqual(f
.ports
, SignalDict([
644 def test_prepare_explicit_ports(self
):
646 f
= self
.wrap
.prepare(ports
=[self
.rst
, self
.stb
])
647 sync_clk
= f
.domains
["sync"].clk
648 sync_rst
= f
.domains
["sync"].rst
649 self
.assertEqual(f
.ports
, SignalDict([
657 def test_prepare_slice_in_port(self
):
660 f
.add_subfragment(Instance("foo", o_O
=s
[0]))
661 f
.add_subfragment(Instance("foo", o_O
=s
[1]))
662 fp
= f
.prepare(ports
=[s
], ensure_sync_exists
=False)
663 self
.assertEqual(fp
.ports
, SignalDict([
667 def test_prepare_attrs(self
):
669 self
.inst
.attrs
["ATTR"] = 1
670 f
= self
.inst
.prepare()
671 self
.assertEqual(f
.attrs
, OrderedDict([