1 # nmigen: UnusedElaboratable=no
3 from collections
import OrderedDict
5 from nmigen
.hdl
.ast
import *
6 from nmigen
.hdl
.cd
import *
7 from nmigen
.hdl
.ir
import *
8 from nmigen
.hdl
.mem
import *
13 class BadElaboratable(Elaboratable
):
14 def elaborate(self
, platform
):
class FragmentGetTestCase(FHDLTestCase):
    """Exercise the failure path of ``Fragment.get``."""

    def test_get_wrong(self):
        """``Fragment.get`` must raise ``AttributeError`` when given nothing usable.

        Two inputs are checked: a literal ``None``, and a ``BadElaboratable()``
        instance. For the latter a ``UserWarning`` about ``.elaborate()``
        returning ``None`` is expected in addition to the error.
        """
        cannot_elaborate = r"^Object None cannot be elaborated$"
        returned_none = r"^\.elaborate\(\) returned None; missing return statement\?$"

        with self.assertRaisesRegex(AttributeError, cannot_elaborate):
            Fragment.get(None, platform=None)

        # The warning context wraps the error context: both must trigger
        # during the same call.
        with self.assertWarnsRegex(UserWarning, returned_none):
            with self.assertRaisesRegex(AttributeError, cannot_elaborate):
                Fragment.get(BadElaboratable(), platform=None)
31 class FragmentGeneratedTestCase(FHDLTestCase
):
32 def test_find_subfragment(self
):
35 f1
.add_subfragment(f2
, "f2")
37 self
.assertEqual(f1
.find_subfragment(0), f2
)
38 self
.assertEqual(f1
.find_subfragment("f2"), f2
)
40 def test_find_subfragment_wrong(self
):
43 f1
.add_subfragment(f2
, "f2")
45 with self
.assertRaisesRegex(NameError,
46 r
"^No subfragment at index #1$"):
47 f1
.find_subfragment(1)
48 with self
.assertRaisesRegex(NameError,
49 r
"^No subfragment with name 'fx'$"):
50 f1
.find_subfragment("fx")
52 def test_find_generated(self
):
55 f2
.generated
["sig"] = sig
= Signal()
56 f1
.add_subfragment(f2
, "f2")
58 self
.assertEqual(SignalKey(f1
.find_generated("f2", "sig")),
62 class FragmentDriversTestCase(FHDLTestCase
):
65 self
.assertEqual(list(f
.iter_comb()), [])
66 self
.assertEqual(list(f
.iter_sync()), [])
69 class FragmentPortsTestCase(FHDLTestCase
):
80 self
.assertEqual(list(f
.iter_ports()), [])
82 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
83 self
.assertEqual(f
.ports
, SignalDict([]))
85 def test_iter_signals(self
):
87 f
.add_ports(self
.s1
, self
.s2
, dir="io")
88 self
.assertEqual(SignalSet((self
.s1
, self
.s2
)), f
.iter_signals())
90 def test_self_contained(self
):
97 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
98 self
.assertEqual(f
.ports
, SignalDict([]))
100 def test_infer_input(self
):
106 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
107 self
.assertEqual(f
.ports
, SignalDict([
111 def test_request_output(self
):
117 f
._propagate
_ports
(ports
=(self
.c1
,), all_undef_as_ports
=True)
118 self
.assertEqual(f
.ports
, SignalDict([
123 def test_input_in_subfragment(self
):
132 f1
.add_subfragment(f2
)
133 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
134 self
.assertEqual(f1
.ports
, SignalDict())
135 self
.assertEqual(f2
.ports
, SignalDict([
139 def test_input_only_in_subfragment(self
):
145 f1
.add_subfragment(f2
)
146 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
147 self
.assertEqual(f1
.ports
, SignalDict([
150 self
.assertEqual(f2
.ports
, SignalDict([
154 def test_output_from_subfragment(self
):
163 f1
.add_subfragment(f2
)
165 f1
._propagate_ports(ports
=(self
.c2
,), all_undef_as_ports
=True)
166 self
.assertEqual(f1
.ports
, SignalDict([
169 self
.assertEqual(f2
.ports
, SignalDict([
173 def test_output_from_subfragment_2(self
):
182 f1
.add_subfragment(f2
)
187 f2
.add_subfragment(f3
)
189 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
190 self
.assertEqual(f2
.ports
, SignalDict([
194 def test_input_output_sibling(self
):
200 f1
.add_subfragment(f2
)
205 f3
.add_driver(self
.c2
)
206 f1
.add_subfragment(f3
)
208 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
209 self
.assertEqual(f1
.ports
, SignalDict())
211 def test_output_input_sibling(self
):
217 f2
.add_driver(self
.c2
)
218 f1
.add_subfragment(f2
)
223 f1
.add_subfragment(f3
)
225 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
226 self
.assertEqual(f1
.ports
, SignalDict())
228 def test_input_cd(self
):
235 f
.add_driver(self
.c1
, "sync")
237 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
238 self
.assertEqual(f
.ports
, SignalDict([
244 def test_input_cd_reset_less(self
):
245 sync
= ClockDomain(reset_less
=True)
251 f
.add_driver(self
.c1
, "sync")
253 f
._propagate
_ports
(ports
=(), all_undef_as_ports
=True)
254 self
.assertEqual(f
.ports
, SignalDict([
259 def test_inout(self
):
262 f2
= Instance("foo", io_x
=s
)
263 f1
.add_subfragment(f2
)
265 f1
._propagate_ports(ports
=(), all_undef_as_ports
=True)
266 self
.assertEqual(f1
.ports
, SignalDict([
270 def test_in_out_same_signal(self
):
273 f1
= Instance("foo", i_x
=s
, o_y
=s
)
275 f2
.add_subfragment(f1
)
277 f2
._propagate_ports(ports
=(), all_undef_as_ports
=True)
278 self
.assertEqual(f1
.ports
, SignalDict([
282 f3
= Instance("foo", o_y
=s
, i_x
=s
)
284 f4
.add_subfragment(f3
)
286 f4
._propagate_ports(ports
=(), all_undef_as_ports
=True)
287 self
.assertEqual(f3
.ports
, SignalDict([
291 def test_clk_rst(self
):
296 f
= f
.prepare(ports
=(ClockSignal("sync"), ResetSignal("sync")))
297 self
.assertEqual(f
.ports
, SignalDict([
302 def test_port_wrong(self
):
304 with self
.assertRaisesRegex(TypeError,
305 r
"^Only signals may be added as ports, not \(const 1'd1\)$"):
306 f
.prepare(ports
=(Const(1),))
308 def test_port_not_iterable(self
):
310 with self
.assertRaisesRegex(TypeError,
311 r
"^`ports` must be either a list or a tuple, not 1$"):
313 with self
.assertRaisesRegex(TypeError,
314 (r
"^`ports` must be either a list or a tuple, not \(const 1'd1\)"
315 r
" \(did you mean `ports=\(<signal>,\)`, rather than `ports=<signal>`\?\)$")):
316 f
.prepare(ports
=Const(1))
318 class FragmentDomainsTestCase(FHDLTestCase
):
319 def test_iter_signals(self
):
321 cd2
= ClockDomain(reset_less
=True)
326 f
.add_domains(cd1
, cd2
)
327 f
.add_driver(s1
, "cd1")
328 self
.assertEqual(SignalSet((cd1
.clk
, cd1
.rst
, s1
)), f
.iter_signals())
329 f
.add_driver(s2
, "cd2")
330 self
.assertEqual(SignalSet((cd1
.clk
, cd1
.rst
, cd2
.clk
, s1
, s2
)), f
.iter_signals())
332 def test_propagate_up(self
):
337 f1
.add_subfragment(f2
)
340 f1
._propagate_domains_up()
341 self
.assertEqual(f1
.domains
, {"cd": cd
})
343 def test_propagate_up_local(self
):
344 cd
= ClockDomain(local
=True)
348 f1
.add_subfragment(f2
)
351 f1
._propagate_domains_up()
352 self
.assertEqual(f1
.domains
, {})
354 def test_domain_conflict(self
):
355 cda
= ClockDomain("sync")
356 cdb
= ClockDomain("sync")
363 f
.add_subfragment(fa
, "a")
364 f
.add_subfragment(fb
, "b")
366 f
._propagate
_domains
_up
()
367 self
.assertEqual(f
.domains
, {"a_sync": cda
, "b_sync": cdb
})
368 (fa
, _
), (fb
, _
) = f
.subfragments
369 self
.assertEqual(fa
.domains
, {"a_sync": cda
})
370 self
.assertEqual(fb
.domains
, {"b_sync": cdb
})
372 def test_domain_conflict_anon(self
):
373 cda
= ClockDomain("sync")
374 cdb
= ClockDomain("sync")
381 f
.add_subfragment(fa
, "a")
382 f
.add_subfragment(fb
)
384 with self
.assertRaisesRegex(DomainError
,
385 (r
"^Domain 'sync' is defined by subfragments 'a', <unnamed #1> of fragment "
386 r
"'top'; it is necessary to either rename subfragment domains explicitly, "
387 r
"or give names to subfragments$")):
388 f
._propagate
_domains
_up
()
390 def test_domain_conflict_name(self
):
391 cda
= ClockDomain("sync")
392 cdb
= ClockDomain("sync")
399 f
.add_subfragment(fa
, "x")
400 f
.add_subfragment(fb
, "x")
402 with self
.assertRaisesRegex(DomainError
,
403 (r
"^Domain 'sync' is defined by subfragments #0, #1 of fragment 'top', some "
404 r
"of which have identical names; it is necessary to either rename subfragment "
405 r
"domains explicitly, or give distinct names to subfragments$")):
406 f
._propagate
_domains
_up
()
408 def test_domain_conflict_rename_drivers(self
):
409 cda
= ClockDomain("sync")
410 cdb
= ClockDomain("sync")
416 fb
.add_driver(ResetSignal("sync"), None)
418 f
.add_subfragment(fa
, "a")
419 f
.add_subfragment(fb
, "b")
421 f
._propagate
_domains
_up
()
422 fb_new
, _
= f
.subfragments
[1]
423 self
.assertEqual(fb_new
.drivers
, OrderedDict({
424 None: SignalSet((ResetSignal("b_sync"),))
427 def test_domain_conflict_rename_drivers(self
):
428 cda
= ClockDomain("sync")
429 cdb
= ClockDomain("sync")
437 f
.add_subfragment(fa
, "a")
438 f
.add_subfragment(fb
, "b")
439 f
.add_driver(s
, "b_sync")
441 f
._propagate
_domains
(lambda name
: ClockDomain(name
))
443 def test_propagate_down(self
):
449 f1
.add_subfragment(f2
)
451 f1
._propagate_domains_down()
452 self
.assertEqual(f2
.domains
, {"cd": cd
})
454 def test_propagate_down_idempotent(self
):
461 f1
.add_subfragment(f2
)
463 f1
._propagate_domains_down()
464 self
.assertEqual(f1
.domains
, {"cd": cd
})
465 self
.assertEqual(f2
.domains
, {"cd": cd
})
467 def test_propagate(self
):
473 f1
.add_subfragment(f2
)
475 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: None)
476 self
.assertEqual(f1
.domains
, {"cd": cd
})
477 self
.assertEqual(f2
.domains
, {"cd": cd
})
478 self
.assertEqual(new_domains
, [])
480 def test_propagate_missing(self
):
483 f1
.add_driver(s1
, "sync")
485 with self
.assertRaisesRegex(DomainError
,
486 r
"^Domain 'sync' is used but not defined$"):
487 f1
._propagate_domains(missing_domain
=lambda name
: None)
489 def test_propagate_create_missing(self
):
492 f1
.add_driver(s1
, "sync")
494 f1
.add_subfragment(f2
)
496 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: ClockDomain(name
))
497 self
.assertEqual(f1
.domains
.keys(), {"sync"})
498 self
.assertEqual(f2
.domains
.keys(), {"sync"})
499 self
.assertEqual(f1
.domains
["sync"], f2
.domains
["sync"])
500 self
.assertEqual(new_domains
, [f1
.domains
["sync"]])
502 def test_propagate_create_missing_fragment(self
):
505 f1
.add_driver(s1
, "sync")
507 cd
= ClockDomain("sync")
511 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: f2
)
512 self
.assertEqual(f1
.domains
.keys(), {"sync"})
513 self
.assertEqual(f1
.domains
["sync"], f2
.domains
["sync"])
514 self
.assertEqual(new_domains
, [])
515 self
.assertEqual(f1
.subfragments
, [
519 def test_propagate_create_missing_fragment_many_domains(self
):
522 f1
.add_driver(s1
, "sync")
524 cd_por
= ClockDomain("por")
525 cd_sync
= ClockDomain("sync")
527 f2
.add_domains(cd_por
, cd_sync
)
529 new_domains
= f1
._propagate_domains(missing_domain
=lambda name
: f2
)
530 self
.assertEqual(f1
.domains
.keys(), {"sync", "por"})
531 self
.assertEqual(f2
.domains
.keys(), {"sync", "por"})
532 self
.assertEqual(f1
.domains
["sync"], f2
.domains
["sync"])
533 self
.assertEqual(new_domains
, [])
534 self
.assertEqual(f1
.subfragments
, [
538 def test_propagate_create_missing_fragment_wrong(self
):
541 f1
.add_driver(s1
, "sync")
544 f2
.add_domains(ClockDomain("foo"))
546 with self
.assertRaisesRegex(DomainError
,
547 (r
"^Fragment returned by missing domain callback does not define requested "
548 r
"domain 'sync' \(defines 'foo'\)\.$")):
549 f1
._propagate_domains(missing_domain
=lambda name
: f2
)
552 class FragmentHierarchyConflictTestCase(FHDLTestCase
):
553 def setUp_self_sub(self
):
559 self
.f1
.add_statements(self
.c1
.eq(0))
560 self
.f1
.add_driver(self
.s1
)
561 self
.f1
.add_driver(self
.c1
, "sync")
563 self
.f1a
= Fragment()
564 self
.f1
.add_subfragment(self
.f1a
, "f1a")
567 self
.f2
.add_statements(self
.c2
.eq(1))
568 self
.f2
.add_driver(self
.s1
)
569 self
.f2
.add_driver(self
.c2
, "sync")
570 self
.f1
.add_subfragment(self
.f2
)
572 self
.f1b
= Fragment()
573 self
.f1
.add_subfragment(self
.f1b
, "f1b")
575 self
.f2a
= Fragment()
576 self
.f2
.add_subfragment(self
.f2a
, "f2a")
578 def test_conflict_self_sub(self
):
579 self
.setUp_self_sub()
581 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
582 self
.assertEqual(self
.f1
.subfragments
, [
587 self
.assertRepr(self
.f1
.statements
, """
589 (eq (sig c1) (const 1'd0))
590 (eq (sig c2) (const 1'd1))
593 self
.assertEqual(self
.f1
.drivers
, {
594 None: SignalSet((self
.s1
,)),
595 "sync": SignalSet((self
.c1
, self
.c2
)),
def test_conflict_self_sub_error(self):
    """``mode="error"`` must raise ``DriverConflict`` for the ``s1`` conflict.

    The hierarchy comes from ``setUp_self_sub``, where both ``f1`` and its
    unnamed subfragment ``f2`` add a driver for ``s1``.
    """
    self.setUp_self_sub()

    # The "." in the fragment path is escaped so that it matches only a
    # literal dot; unescaped it would accept any character in that position.
    expected = (r"^Signal '\(sig s1\)' is driven from multiple fragments: "
                r"top, top\.<unnamed #1>$")
    with self.assertRaisesRegex(DriverConflict, expected):
        self.f1._resolve_hierarchy_conflicts(mode="error")
def test_conflict_self_sub_warning(self):
    """``mode="warn"`` must emit a ``DriverConflict`` warning for ``s1``.

    The hierarchy comes from ``setUp_self_sub``, where both ``f1`` and its
    unnamed subfragment ``f2`` add a driver for ``s1``. The expected message
    also states that the hierarchy will be flattened.
    """
    self.setUp_self_sub()

    # The "." in the fragment path is escaped so that it matches only a
    # literal dot; unescaped it would accept any character in that position.
    expected = (r"^Signal '\(sig s1\)' is driven from multiple fragments: "
                r"top, top\.<unnamed #1>; "
                r"hierarchy will be flattened$")
    with self.assertWarnsRegex(DriverConflict, expected):
        self.f1._resolve_hierarchy_conflicts(mode="warn")
613 def setUp_sub_sub(self
):
621 self
.f2
.add_driver(self
.s1
)
622 self
.f2
.add_statements(self
.c1
.eq(0))
623 self
.f1
.add_subfragment(self
.f2
)
626 self
.f3
.add_driver(self
.s1
)
627 self
.f3
.add_statements(self
.c2
.eq(1))
628 self
.f1
.add_subfragment(self
.f3
)
630 def test_conflict_sub_sub(self
):
633 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
634 self
.assertEqual(self
.f1
.subfragments
, [])
635 self
.assertRepr(self
.f1
.statements
, """
637 (eq (sig c1) (const 1'd0))
638 (eq (sig c2) (const 1'd1))
642 def setUp_self_subsub(self
):
648 self
.f1
.add_driver(self
.s1
)
651 self
.f2
.add_statements(self
.c1
.eq(0))
652 self
.f1
.add_subfragment(self
.f2
)
655 self
.f3
.add_driver(self
.s1
)
656 self
.f3
.add_statements(self
.c2
.eq(1))
657 self
.f2
.add_subfragment(self
.f3
)
659 def test_conflict_self_subsub(self
):
660 self
.setUp_self_subsub()
662 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
663 self
.assertEqual(self
.f1
.subfragments
, [])
664 self
.assertRepr(self
.f1
.statements
, """
666 (eq (sig c1) (const 1'd0))
667 (eq (sig c2) (const 1'd1))
671 def setUp_memory(self
):
672 self
.m
= Memory(width
=8, depth
=4)
673 self
.fr
= self
.m
.read_port().elaborate(platform
=None)
674 self
.fw
= self
.m
.write_port().elaborate(platform
=None)
677 self
.f2
.add_subfragment(self
.fr
)
678 self
.f1
.add_subfragment(self
.f2
)
680 self
.f3
.add_subfragment(self
.fw
)
681 self
.f1
.add_subfragment(self
.f3
)
683 def test_conflict_memory(self
):
686 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
687 self
.assertEqual(self
.f1
.subfragments
, [
692 def test_conflict_memory_error(self
):
695 with self
.assertRaisesRegex(DriverConflict
,
696 r
"^Memory 'm' is accessed from multiple fragments: top\.<unnamed #0>, "
697 r
"top\.<unnamed #1>$"):
698 self
.f1
._resolve_hierarchy_conflicts(mode
="error")
700 def test_conflict_memory_warning(self
):
703 with self
.assertWarnsRegex(DriverConflict
,
704 (r
"^Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
705 r
"top.<unnamed #1>; hierarchy will be flattened$")):
706 self
.f1
._resolve_hierarchy_conflicts(mode
="warn")
708 def test_explicit_flatten(self
):
711 self
.f2
.flatten
= True
712 self
.f1
.add_subfragment(self
.f2
)
714 self
.f1
._resolve_hierarchy_conflicts(mode
="silent")
715 self
.assertEqual(self
.f1
.subfragments
, [])
717 def test_no_conflict_local_domains(self
):
719 cd1
= ClockDomain("d", local
=True)
721 f1
.add_driver(ClockSignal("d"))
723 cd2
= ClockDomain("d", local
=True)
725 f2
.add_driver(ClockSignal("d"))
727 f3
.add_subfragment(f1
)
728 f3
.add_subfragment(f2
)
732 class InstanceTestCase(FHDLTestCase
):
733 def test_construct(self
):
740 inst
= Instance("foo",
742 ("p", "PARAM1", 0x1234),
752 self
.assertEqual(inst
.attrs
, OrderedDict([
756 self
.assertEqual(inst
.parameters
, OrderedDict([
760 self
.assertEqual(inst
.named_ports
, OrderedDict([
769 def test_cast_ports(self
):
770 inst
= Instance("foo",
778 self
.assertRepr(inst
.named_ports
["s1"][0], "(const 1'd1)")
779 self
.assertRepr(inst
.named_ports
["s2"][0], "(const 2'd2)")
780 self
.assertRepr(inst
.named_ports
["s3"][0], "(const 2'd3)")
781 self
.assertRepr(inst
.named_ports
["s4"][0], "(const 3'd4)")
782 self
.assertRepr(inst
.named_ports
["s5"][0], "(const 3'd5)")
783 self
.assertRepr(inst
.named_ports
["s6"][0], "(const 3'd6)")
785 def test_wrong_construct_arg(self
):
787 with self
.assertRaisesRegex(NameError,
788 (r
"^Instance argument \('', 's1', \(sig s\)\) should be a tuple "
789 r
"\(kind, name, value\) where kind is one of \"p
\", \"i
\", \"o
\", or \"io
\"$
")):
790 Instance("foo
", ("", "s1
", s))
792 def test_wrong_construct_kwarg(self):
794 with self.assertRaisesRegex(NameError,
795 (r"^Instance keyword argument x_s1
=\
(sig s\
) does
not start with one of
"
796 r"\"p_
\", \"i_
\", \"o_
\", or \"io_
\"$
")):
797 Instance("foo
", x_s1=s)
802 self.pins = Signal(8)
803 self.datal = Signal(4)
804 self.datah = Signal(4)
805 self.inst = Instance("cpu
",
810 o_data=Cat(self.datal, self.datah),
813 self.wrap = Fragment()
814 self.wrap.add_subfragment(self.inst)
819 self.assertEqual(f.type, "cpu
")
820 self.assertEqual(f.parameters, OrderedDict([("RESET
", 0x1234)]))
821 self.assertEqual(list(f.named_ports.keys()), ["clk
", "rst
", "stb
", "data
", "pins
"])
822 self.assertEqual(f.ports, SignalDict([]))
824 def test_prepare(self):
826 f = self.wrap.prepare()
827 sync_clk = f.domains["sync
"].clk
828 self.assertEqual(f.ports, SignalDict([
834 def test_prepare_explicit_ports(self):
836 f = self.wrap.prepare(ports=[self.rst, self.stb])
837 sync_clk = f.domains["sync
"].clk
838 sync_rst = f.domains["sync
"].rst
839 self.assertEqual(f.ports, SignalDict([
847 def test_prepare_slice_in_port(self):
850 f.add_subfragment(Instance("foo
", o_O=s[0]))
851 f.add_subfragment(Instance("foo
", o_O=s[1]))
852 fp = f.prepare(ports=[s], missing_domain=lambda name: None)
853 self.assertEqual(fp.ports, SignalDict([
857 def test_prepare_attrs(self):
859 self.inst.attrs["ATTR
"] = 1
860 f = self.inst.prepare()
861 self.assertEqual(f.attrs, OrderedDict([