fhdl.xfrm: implement DomainLowerer.
authorwhitequark <whitequark@whitequark.org>
Fri, 14 Dec 2018 10:56:53 +0000 (10:56 +0000)
committerwhitequark <whitequark@whitequark.org>
Fri, 14 Dec 2018 10:56:53 +0000 (10:56 +0000)
nmigen/fhdl/ast.py
nmigen/fhdl/cd.py
nmigen/fhdl/ir.py
nmigen/fhdl/xfrm.py
nmigen/test/test_fhdl_cd.py
nmigen/test/test_fhdl_xfrm.py

index cbfa7999b8efe776ea8b1bf56f723f310c2e2863..027f70032eae9fa50f9bcd4fc543bd9d99edef7a 100644 (file)
@@ -626,12 +626,15 @@ class ResetSignal(Value):
     ----------
     domain : str
         Clock domain to obtain a reset signal for. Defaults to ``"sync"``.
+    allow_reset_less : bool
+        If the clock domain is reset-less, act as a constant ``0`` instead of reporting an error.
     """
-    def __init__(self, domain="sync"):
+    def __init__(self, domain="sync", allow_reset_less=False):
         super().__init__()
         if not isinstance(domain, str):
             raise TypeError("Clock domain name must be a string, not {!r}".format(domain))
         self.domain = domain
+        self.allow_reset_less = allow_reset_less
 
     def __repr__(self):
         return "(rst {})".format(self.domain)
index 85c6b71be66aee20097e82349529a2596fabbf3c..941a9bab3f06eed0704b74207e498e529994b11e 100644 (file)
@@ -2,7 +2,11 @@ from .. import tracer
 from .ast import Signal
 
 
-__all__ = ["ClockDomain"]
+__all__ = ["ClockDomain", "DomainError"]
+
+
+class DomainError(Exception):
+    pass
 
 
 class ClockDomain:
index 493aa2f61c6243ec4f983fba67a9b34a376b504a..519f69b9e914c6863af3a73818eed9e87d8e3f4a 100644 (file)
@@ -146,9 +146,13 @@ class Fragment:
     def _insert_domain_resets(self):
         from .xfrm import ResetInserter
 
-        return ResetInserter({
-            cd.name: cd.rst for cd in self.domains.values() if cd.rst is not None
-        })(self)
+        resets = {cd.name: cd.rst for cd in self.domains.values() if cd.rst is not None}
+        return ResetInserter(resets)(self)
+
+    def _lower_domain_signals(self):
+        from .xfrm import DomainLowerer
+
+        return DomainLowerer(self.domains)(self)
 
     def _propagate_ports(self, ports):
         # Collect all signals we're driving (on LHS of statements), and signals we're using
@@ -194,5 +198,6 @@ class Fragment:
         fragment = FragmentTransformer()(self)
         fragment._propagate_domains(ensure_sync_exists)
         fragment = fragment._insert_domain_resets()
+        fragment = fragment._lower_domain_signals()
         fragment._propagate_ports(ports)
         return fragment
index c32083e0d0edf450af0a28770dd6071aceb67594..9c99cce62149c69324db4d5f31ea64bb4b93d3e8 100644 (file)
@@ -2,11 +2,12 @@ from collections import OrderedDict, Iterable
 
 from ..tools import flatten
 from .ast import *
+from .ast import _StatementList
 from .ir import *
 
 
 __all__ = ["ValueTransformer", "StatementTransformer", "FragmentTransformer",
-           "DomainRenamer", "ResetInserter", "CEInserter"]
+           "DomainRenamer", "DomainLowerer", "ResetInserter", "CEInserter"]
 
 
 class ValueTransformer:
@@ -81,7 +82,7 @@ class StatementTransformer:
         return Switch(self.on_value(stmt.test), cases)
 
     def on_statements(self, stmt):
-        return list(flatten(self.on_statement(stmt) for stmt in stmt))
+        return _StatementList(flatten(self.on_statement(stmt) for stmt in stmt))
 
     def on_unknown_statement(self, stmt):
         raise TypeError("Cannot transform statement {!r}".format(stmt)) # :nocov:
@@ -166,6 +167,31 @@ class DomainRenamer(FragmentTransformer, ValueTransformer, StatementTransformer)
                 new_fragment.drive(signal, domain)
 
 
+class DomainLowerer(FragmentTransformer, ValueTransformer, StatementTransformer):
+    def __init__(self, domains):
+        self.domains = domains
+
+    def _resolve(self, domain, context):
+        if domain not in self.domains:
+            raise DomainError("Signal {!r} refers to nonexistent domain '{}'"
+                              .format(context, domain))
+        return self.domains[domain]
+
+    def on_ClockSignal(self, value):
+        cd = self._resolve(value.domain, value)
+        return cd.clk
+
+    def on_ResetSignal(self, value):
+        cd = self._resolve(value.domain, value)
+        if cd.rst is None:
+            if value.allow_reset_less:
+                return Const(0)
+            else:
+                raise DomainError("Signal {!r} refers to reset of reset-less domain '{}'"
+                                  .format(value, value.domain))
+        return cd.rst
+
+
 class _ControlInserter(FragmentTransformer):
     def __init__(self, controls):
         if isinstance(controls, Value):
index 2ec8fb6e7bc065d5fc838244a9ba8d7c95609a90..7dc7fe45a7844588df33a8e35f0071c647fbbbb7 100644 (file)
@@ -47,3 +47,11 @@ class ClockDomainCase(FHDLTestCase):
         self.assertEqual(sync.name, "pix")
         self.assertEqual(sync.clk.name, "pix_clk")
         self.assertEqual(sync.rst.name, "pix_rst")
+
+    def test_rename_reset_less(self):
+        sync = ClockDomain(reset_less=True)
+        self.assertEqual(sync.name, "sync")
+        self.assertEqual(sync.clk.name, "clk")
+        sync.rename("pix")
+        self.assertEqual(sync.name, "pix")
+        self.assertEqual(sync.clk.name, "pix_clk")
index faacf3b8bc809e86a0f51f4c7fa238ee1ff0de87..ca3c2424a73131d89bdec33a9a16b1b588b91661 100644 (file)
@@ -89,6 +89,75 @@ class DomainRenamerTestCase(FHDLTestCase):
         })
 
 
+class DomainLowererTestCase(FHDLTestCase):
+    def setUp(self):
+        self.s = Signal()
+
+    def test_lower_clk(self):
+        sync = ClockDomain()
+        f = Fragment()
+        f.add_statements(
+            self.s.eq(ClockSignal("sync"))
+        )
+
+        f = DomainLowerer({"sync": sync})(f)
+        self.assertRepr(f.statements, """
+        (
+            (eq (sig s) (sig clk))
+        )
+        """)
+
+    def test_lower_rst(self):
+        sync = ClockDomain()
+        f = Fragment()
+        f.add_statements(
+            self.s.eq(ResetSignal("sync"))
+        )
+
+        f = DomainLowerer({"sync": sync})(f)
+        self.assertRepr(f.statements, """
+        (
+            (eq (sig s) (sig rst))
+        )
+        """)
+
+    def test_lower_rst_reset_less(self):
+        sync = ClockDomain(reset_less=True)
+        f = Fragment()
+        f.add_statements(
+            self.s.eq(ResetSignal("sync", allow_reset_less=True))
+        )
+
+        f = DomainLowerer({"sync": sync})(f)
+        self.assertRepr(f.statements, """
+        (
+            (eq (sig s) (const 1'd0))
+        )
+        """)
+
+    def test_lower_wrong_domain(self):
+        sync = ClockDomain()
+        f = Fragment()
+        f.add_statements(
+            self.s.eq(ClockSignal("xxx"))
+        )
+
+        with self.assertRaises(DomainError,
+                msg="Signal (clk xxx) refers to nonexistent domain 'xxx'"):
+            DomainLowerer({"sync": sync})(f)
+
+    def test_lower_wrong_reset_less_domain(self):
+        sync = ClockDomain(reset_less=True)
+        f = Fragment()
+        f.add_statements(
+            self.s.eq(ResetSignal("sync"))
+        )
+
+        with self.assertRaises(DomainError,
+                msg="Signal (rst sync) refers to reset of reset-less domain 'sync'"):
+            DomainLowerer({"sync": sync})(f)
+
+
 class ResetInserterTestCase(FHDLTestCase):
     def setUp(self):
         self.s1 = Signal()