build.run: implement SSH remote builds using Paramiko.
[nmigen.git] / nmigen / test / test_hdl_ir.py
index 439b791c9819f5a44c61c7115daf3ebb1332dd8d..ab8bdd846153a11efc59166227c0d61d8b0fb01d 100644 (file)
@@ -1,10 +1,12 @@
+# nmigen: UnusedElaboratable=no
+
 from collections import OrderedDict
 
 from ..hdl.ast import *
 from ..hdl.cd import *
 from ..hdl.ir import *
 from ..hdl.mem import *
-from .tools import *
+from .utils import *
 
 
 class BadElaboratable(Elaboratable):
@@ -14,14 +16,14 @@ class BadElaboratable(Elaboratable):
 
 class FragmentGetTestCase(FHDLTestCase):
     def test_get_wrong(self):
-        with self.assertRaises(AttributeError,
-                msg="Object 'None' cannot be elaborated"):
+        with self.assertRaisesRegex(AttributeError,
+                r"^Object None cannot be elaborated$"):
             Fragment.get(None, platform=None)
 
-        with self.assertWarns(UserWarning,
-                msg=".elaborate() returned None; missing return statement?"):
-            with self.assertRaises(AttributeError,
-                    msg="Object 'None' cannot be elaborated"):
+        with self.assertWarnsRegex(UserWarning,
+                r"^\.elaborate\(\) returned None; missing return statement\?$"):
+            with self.assertRaisesRegex(AttributeError,
+                    r"^Object None cannot be elaborated$"):
                 Fragment.get(BadElaboratable(), platform=None)
 
 
@@ -39,11 +41,11 @@ class FragmentGeneratedTestCase(FHDLTestCase):
         f2 = Fragment()
         f1.add_subfragment(f2, "f2")
 
-        with self.assertRaises(NameError,
-                msg="No subfragment at index #1"):
+        with self.assertRaisesRegex(NameError,
+                r"^No subfragment at index #1$"):
             f1.find_subfragment(1)
-        with self.assertRaises(NameError,
-                msg="No subfragment with name 'fx'"):
+        with self.assertRaisesRegex(NameError,
+                r"^No subfragment with name 'fx'$"):
             f1.find_subfragment("fx")
 
     def test_find_generated(self):
@@ -264,6 +266,53 @@ class FragmentPortsTestCase(FHDLTestCase):
             (s, "io")
         ]))
 
+    def test_in_out_same_signal(self):
+        s = Signal()
+
+        f1 = Instance("foo", i_x=s, o_y=s)
+        f2 = Fragment()
+        f2.add_subfragment(f1)
+
+        f2._propagate_ports(ports=(), all_undef_as_ports=True)
+        self.assertEqual(f1.ports, SignalDict([
+            (s, "o")
+        ]))
+
+        f3 = Instance("foo", o_y=s, i_x=s)
+        f4 = Fragment()
+        f4.add_subfragment(f3)
+
+        f4._propagate_ports(ports=(), all_undef_as_ports=True)
+        self.assertEqual(f3.ports, SignalDict([
+            (s, "o")
+        ]))
+
+    def test_clk_rst(self):
+        sync = ClockDomain()
+        f = Fragment()
+        f.add_domains(sync)
+
+        f = f.prepare(ports=(ClockSignal("sync"), ResetSignal("sync")))
+        self.assertEqual(f.ports, SignalDict([
+            (sync.clk, "i"),
+            (sync.rst, "i"),
+        ]))
+
+    def test_port_wrong(self):
+        f = Fragment()
+        with self.assertRaisesRegex(TypeError,
+                r"^Only signals may be added as ports, not \(const 1'd1\)$"):
+            f.prepare(ports=(Const(1),))
+
+    def test_port_not_iterable(self):
+        f = Fragment()
+        with self.assertRaisesRegex(TypeError,
+                r"^`ports` must be either a list or a tuple, not 1$"):
+            f.prepare(ports=1)
+        with self.assertRaisesRegex(TypeError,
+                (r"^`ports` must be either a list or a tuple, not \(const 1'd1\)"
+                    r" \(did you mean `ports=\(<signal>,\)`, rather than `ports=<signal>`\?\)$")):
+            f.prepare(ports=Const(1))
 
 class FragmentDomainsTestCase(FHDLTestCase):
     def test_iter_signals(self):
@@ -290,6 +339,17 @@ class FragmentDomainsTestCase(FHDLTestCase):
         f1._propagate_domains_up()
         self.assertEqual(f1.domains, {"cd": cd})
 
+    def test_propagate_up_local(self):
+        cd = ClockDomain(local=True)
+
+        f1 = Fragment()
+        f2 = Fragment()
+        f1.add_subfragment(f2)
+        f2.add_domains(cd)
+
+        f1._propagate_domains_up()
+        self.assertEqual(f1.domains, {})
+
     def test_domain_conflict(self):
         cda = ClockDomain("sync")
         cdb = ClockDomain("sync")
@@ -320,10 +380,10 @@ class FragmentDomainsTestCase(FHDLTestCase):
         f.add_subfragment(fa, "a")
         f.add_subfragment(fb)
 
-        with self.assertRaises(DomainError,
-                msg="Domain 'sync' is defined by subfragments 'a', <unnamed #1> of fragment "
-                    "'top'; it is necessary to either rename subfragment domains explicitly, "
-                    "or give names to subfragments"):
+        with self.assertRaisesRegex(DomainError,
+                (r"^Domain 'sync' is defined by subfragments 'a', <unnamed #1> of fragment "
+                    r"'top'; it is necessary to either rename subfragment domains explicitly, "
+                    r"or give names to subfragments$")):
             f._propagate_domains_up()
 
     def test_domain_conflict_name(self):
@@ -338,12 +398,47 @@ class FragmentDomainsTestCase(FHDLTestCase):
         f.add_subfragment(fa, "x")
         f.add_subfragment(fb, "x")
 
-        with self.assertRaises(DomainError,
-                msg="Domain 'sync' is defined by subfragments #0, #1 of fragment 'top', some "
-                    "of which have identical names; it is necessary to either rename subfragment "
-                    "domains explicitly, or give distinct names to subfragments"):
+        with self.assertRaisesRegex(DomainError,
+                (r"^Domain 'sync' is defined by subfragments #0, #1 of fragment 'top', some "
+                    r"of which have identical names; it is necessary to either rename subfragment "
+                    r"domains explicitly, or give distinct names to subfragments$")):
             f._propagate_domains_up()
 
+    def test_domain_conflict_rename_drivers(self):
+        cda = ClockDomain("sync")
+        cdb = ClockDomain("sync")
+
+        fa = Fragment()
+        fa.add_domains(cda)
+        fb = Fragment()
+        fb.add_domains(cdb)
+        fb.add_driver(ResetSignal("sync"), None)
+        f = Fragment()
+        f.add_subfragment(fa, "a")
+        f.add_subfragment(fb, "b")
+
+        f._propagate_domains_up()
+        fb_new, _ = f.subfragments[1]
+        self.assertEqual(fb_new.drivers, OrderedDict({
+            None: SignalSet((ResetSignal("b_sync"),))
+        }))
+
+    def test_domain_conflict_rename_drivers(self):
+        cda = ClockDomain("sync")
+        cdb = ClockDomain("sync")
+        s = Signal()
+
+        fa = Fragment()
+        fa.add_domains(cda)
+        fb = Fragment()
+        fb.add_domains(cdb)
+        f = Fragment()
+        f.add_subfragment(fa, "a")
+        f.add_subfragment(fb, "b")
+        f.add_driver(s, "b_sync")
+
+        f._propagate_domains(lambda name: ClockDomain(name))
+
     def test_propagate_down(self):
         cd = ClockDomain()
 
@@ -376,19 +471,81 @@ class FragmentDomainsTestCase(FHDLTestCase):
         f1.add_domains(cd)
         f1.add_subfragment(f2)
 
-        f1._propagate_domains(ensure_sync_exists=False)
+        new_domains = f1._propagate_domains(missing_domain=lambda name: None)
         self.assertEqual(f1.domains, {"cd": cd})
         self.assertEqual(f2.domains, {"cd": cd})
+        self.assertEqual(new_domains, [])
 
-    def test_propagate_ensure_sync(self):
+    def test_propagate_missing(self):
+        s1 = Signal()
         f1 = Fragment()
+        f1.add_driver(s1, "sync")
+
+        with self.assertRaisesRegex(DomainError,
+                r"^Domain 'sync' is used but not defined$"):
+            f1._propagate_domains(missing_domain=lambda name: None)
+
+    def test_propagate_create_missing(self):
+        s1 = Signal()
+        f1 = Fragment()
+        f1.add_driver(s1, "sync")
         f2 = Fragment()
         f1.add_subfragment(f2)
 
-        f1._propagate_domains(ensure_sync_exists=True)
+        new_domains = f1._propagate_domains(missing_domain=lambda name: ClockDomain(name))
         self.assertEqual(f1.domains.keys(), {"sync"})
         self.assertEqual(f2.domains.keys(), {"sync"})
         self.assertEqual(f1.domains["sync"], f2.domains["sync"])
+        self.assertEqual(new_domains, [f1.domains["sync"]])
+
+    def test_propagate_create_missing_fragment(self):
+        s1 = Signal()
+        f1 = Fragment()
+        f1.add_driver(s1, "sync")
+
+        cd = ClockDomain("sync")
+        f2 = Fragment()
+        f2.add_domains(cd)
+
+        new_domains = f1._propagate_domains(missing_domain=lambda name: f2)
+        self.assertEqual(f1.domains.keys(), {"sync"})
+        self.assertEqual(f1.domains["sync"], f2.domains["sync"])
+        self.assertEqual(new_domains, [])
+        self.assertEqual(f1.subfragments, [
+            (f2, "cd_sync")
+        ])
+
+    def test_propagate_create_missing_fragment_many_domains(self):
+        s1 = Signal()
+        f1 = Fragment()
+        f1.add_driver(s1, "sync")
+
+        cd_por  = ClockDomain("por")
+        cd_sync = ClockDomain("sync")
+        f2 = Fragment()
+        f2.add_domains(cd_por, cd_sync)
+
+        new_domains = f1._propagate_domains(missing_domain=lambda name: f2)
+        self.assertEqual(f1.domains.keys(), {"sync", "por"})
+        self.assertEqual(f2.domains.keys(), {"sync", "por"})
+        self.assertEqual(f1.domains["sync"], f2.domains["sync"])
+        self.assertEqual(new_domains, [])
+        self.assertEqual(f1.subfragments, [
+            (f2, "cd_sync")
+        ])
+
+    def test_propagate_create_missing_fragment_wrong(self):
+        s1 = Signal()
+        f1 = Fragment()
+        f1.add_driver(s1, "sync")
+
+        f2 = Fragment()
+        f2.add_domains(ClockDomain("foo"))
+
+        with self.assertRaisesRegex(DomainError,
+                (r"^Fragment returned by missing domain callback does not define requested "
+                    r"domain 'sync' \(defines 'foo'\)\.$")):
+            f1._propagate_domains(missing_domain=lambda name: f2)
 
 
 class FragmentHierarchyConflictTestCase(FHDLTestCase):
@@ -440,16 +597,16 @@ class FragmentHierarchyConflictTestCase(FHDLTestCase):
     def test_conflict_self_sub_error(self):
         self.setUp_self_sub()
 
-        with self.assertRaises(DriverConflict,
-                msg="Signal '(sig s1)' is driven from multiple fragments: top, top.<unnamed #1>"):
+        with self.assertRaisesRegex(DriverConflict,
+               r"^Signal '\(sig s1\)' is driven from multiple fragments: top, top.<unnamed #1>$"):
             self.f1._resolve_hierarchy_conflicts(mode="error")
 
     def test_conflict_self_sub_warning(self):
         self.setUp_self_sub()
 
-        with self.assertWarns(DriverConflict,
-                msg="Signal '(sig s1)' is driven from multiple fragments: top, top.<unnamed #1>; "
-                    "hierarchy will be flattened"):
+        with self.assertWarnsRegex(DriverConflict,
+                (r"^Signal '\(sig s1\)' is driven from multiple fragments: top, top.<unnamed #1>; "
+                    r"hierarchy will be flattened$")):
             self.f1._resolve_hierarchy_conflicts(mode="warn")
 
     def setUp_sub_sub(self):
@@ -534,17 +691,17 @@ class FragmentHierarchyConflictTestCase(FHDLTestCase):
     def test_conflict_memory_error(self):
         self.setUp_memory()
 
-        with self.assertRaises(DriverConflict,
-                msg="Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
-                    "top.<unnamed #1>"):
+        with self.assertRaisesRegex(DriverConflict,
+                r"^Memory 'm' is accessed from multiple fragments: top\.<unnamed #0>, "
+                    r"top\.<unnamed #1>$"):
             self.f1._resolve_hierarchy_conflicts(mode="error")
 
     def test_conflict_memory_warning(self):
         self.setUp_memory()
 
-        with self.assertWarns(DriverConflict,
-                msg="Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
-                    "top.<unnamed #1>; hierarchy will be flattened"):
+        with self.assertWarnsRegex(DriverConflict,
+                (r"^Memory 'm' is accessed from multiple fragments: top.<unnamed #0>, "
+                    r"top.<unnamed #1>; hierarchy will be flattened$")):
             self.f1._resolve_hierarchy_conflicts(mode="warn")
 
     def test_explicit_flatten(self):
@@ -556,6 +713,20 @@ class FragmentHierarchyConflictTestCase(FHDLTestCase):
         self.f1._resolve_hierarchy_conflicts(mode="silent")
         self.assertEqual(self.f1.subfragments, [])
 
+    def test_no_conflict_local_domains(self):
+        f1 = Fragment()
+        cd1 = ClockDomain("d", local=True)
+        f1.add_domains(cd1)
+        f1.add_driver(ClockSignal("d"))
+        f2 = Fragment()
+        cd2 = ClockDomain("d", local=True)
+        f2.add_domains(cd2)
+        f2.add_driver(ClockSignal("d"))
+        f3 = Fragment()
+        f3.add_subfragment(f1)
+        f3.add_subfragment(f2)
+        f3.prepare()
+
 
 class InstanceTestCase(FHDLTestCase):
     def test_construct(self):
@@ -594,18 +765,34 @@ class InstanceTestCase(FHDLTestCase):
             ("s6", (s6, "io")),
         ]))
 
+    def test_cast_ports(self):
+        inst = Instance("foo",
+            ("i", "s1", 1),
+            ("o", "s2", 2),
+            ("io", "s3", 3),
+            i_s4=4,
+            o_s5=5,
+            io_s6=6,
+        )
+        self.assertRepr(inst.named_ports["s1"][0], "(const 1'd1)")
+        self.assertRepr(inst.named_ports["s2"][0], "(const 2'd2)")
+        self.assertRepr(inst.named_ports["s3"][0], "(const 2'd3)")
+        self.assertRepr(inst.named_ports["s4"][0], "(const 3'd4)")
+        self.assertRepr(inst.named_ports["s5"][0], "(const 3'd5)")
+        self.assertRepr(inst.named_ports["s6"][0], "(const 3'd6)")
+
     def test_wrong_construct_arg(self):
         s = Signal()
-        with self.assertRaises(NameError,
-                msg="Instance argument ('', 's1', (sig s)) should be a tuple "
-                    "(kind, name, value) where kind is one of \"p\", \"i\", \"o\", or \"io\""):
+        with self.assertRaisesRegex(NameError,
+                (r"^Instance argument \('', 's1', \(sig s\)\) should be a tuple "
+                    r"\(kind, name, value\) where kind is one of \"p\", \"i\", \"o\", or \"io\"$")):
             Instance("foo", ("", "s1", s))
 
     def test_wrong_construct_kwarg(self):
         s = Signal()
-        with self.assertRaises(NameError,
-                msg="Instance keyword argument x_s1=(sig s) does not start with one of "
-                    "\"p_\", \"i_\", \"o_\", or \"io_\""):
+        with self.assertRaisesRegex(NameError,
+                (r"^Instance keyword argument x_s1=\(sig s\) does not start with one of "
+                    r"\"p_\", \"i_\", \"o_\", or \"io_\"$")):
             Instance("foo", x_s1=s)
 
     def setUp_cpu(self):
@@ -661,7 +848,7 @@ class InstanceTestCase(FHDLTestCase):
         f = Fragment()
         f.add_subfragment(Instance("foo", o_O=s[0]))
         f.add_subfragment(Instance("foo", o_O=s[1]))
-        fp = f.prepare(ports=[s], ensure_sync_exists=False)
+        fp = f.prepare(ports=[s], missing_domain=lambda name: None)
         self.assertEqual(fp.ports, SignalDict([
             (s, "o"),
         ]))