build.res: add ConstraintManager.
authorJean-François Nguyen <jf@lambdaconcept.com>
Fri, 26 Apr 2019 12:37:08 +0000 (14:37 +0200)
committerwhitequark <cz@m-labs.hk>
Sun, 26 May 2019 01:26:58 +0000 (01:26 +0000)
nmigen/build/res.py [new file with mode: 0644]
nmigen/test/test_build_res.py [new file with mode: 0644]

diff --git a/nmigen/build/res.py b/nmigen/build/res.py
new file mode 100644 (file)
index 0000000..f447ff7
--- /dev/null
@@ -0,0 +1,172 @@
+from collections import OrderedDict
+
+from .. import *
+from ..hdl.rec import *
+from ..lib.io import *
+
+from .dsl import *
+
+
+__all__ = ["ConstraintError", "ConstraintManager"]
+
+
+class ConstraintError(Exception):
+    pass
+
+
+class ConstraintManager:
+    def __init__(self, resources):
+        self.resources  = OrderedDict()
+        self.requested  = OrderedDict()
+        self.clocks     = OrderedDict()
+
+        self._ports     = []
+        self._tristates = []
+        self._diffpairs = []
+
+        self.add_resources(resources)
+
+    def add_resources(self, resources):
+        for r in resources:
+            if not isinstance(r, Resource):
+                raise TypeError("Object {!r} is not a Resource".format(r))
+            if (r.name, r.number) in self.resources:
+                raise NameError("Trying to add {!r}, but {!r} has the same name and number"
+                                .format(r, self.resources[r.name, r.number]))
+            self.resources[r.name, r.number] = r
+
+    def add_clock(self, name, number, frequency):
+        resource = self.lookup(name, number)
+        if isinstance(resource.io[0], Subsignal):
+            raise ConstraintError("Cannot constrain frequency of resource {}#{} because it has "
+                                  "subsignals"
+                                  .format(resource.name, resource.number, frequency))
+        if (resource.name, resource.number) in self.clocks:
+            other = self.clocks[resource.name, resource.number]
+            raise ConstraintError("Resource {}#{} is already constrained to a frequency of "
+                                  "{:f} MHz"
+                                  .format(resource.name, resource.number, other / 1e6))
+        self.clocks[resource.name, resource.number] = frequency
+
+    def lookup(self, name, number):
+        if (name, number) not in self.resources:
+            raise NameError("Resource {}#{} does not exist"
+                            .format(name, number))
+        return self.resources[name, number]
+
+    def request(self, name, number, dir=None, xdr=None):
+        resource = self.lookup(name, number)
+        if (resource.name, resource.number) in self.requested:
+            raise ConstraintError("Resource {}#{} has already been requested"
+                                  .format(name, number))
+
+        def resolve_dir_xdr(subsignal, dir, xdr):
+            if isinstance(subsignal.io[0], Subsignal):
+                if dir is None:
+                    dir = dict()
+                if xdr is None:
+                    xdr = dict()
+                if not isinstance(dir, dict):
+                    raise TypeError("Directions must be a dict, not {!r}, because {!r} "
+                                    "has subsignals"
+                                    .format(dir, subsignal))
+                if not isinstance(xdr, dict):
+                    raise TypeError("Data rate must be a dict, not {!r}, because {!r} "
+                                    "has subsignals"
+                                    .format(xdr, subsignal))
+                for sub in subsignal.io:
+                    sub_dir = dir.get(sub.name, None)
+                    sub_xdr = xdr.get(sub.name, None)
+                    dir[sub.name], xdr[sub.name] = resolve_dir_xdr(sub, sub_dir, sub_xdr)
+            else:
+                if dir is None:
+                    dir = subsignal.io[0].dir
+                if xdr is None:
+                    xdr = 1
+                if dir not in ("i", "o", "io"):
+                    raise TypeError("Direction must be one of \"i\", \"o\" or \"io\", not {!r}"
+                                    .format(dir))
+                if subsignal.io[0].dir != "io" and dir != subsignal.io[0].dir:
+                    raise ValueError("Direction of {!r} cannot be changed from \"{}\" to \"{}\"; "
+                                     "direction can be changed from \"io\" to \"i\" or from \"io\""
+                                     "to \"o\""
+                                     .format(subsignal.io[0], subsignal.io[0].dir, dir))
+                if not isinstance(xdr, int) or xdr < 1:
+                    raise ValueError("Data rate of {!r} must be a positive integer, not {!r}"
+                                     .format(subsignal.io[0], xdr))
+            return dir, xdr
+
+        dir, xdr = resolve_dir_xdr(resource, dir, xdr)
+
+        def get_value(subsignal, dir, xdr, name):
+            if isinstance(subsignal.io[0], Subsignal):
+                fields = OrderedDict()
+                for sub in subsignal.io:
+                    fields[sub.name] = get_value(sub, dir[sub.name], xdr[sub.name],
+                                                 "{}__{}".format(name, sub.name))
+                rec = Record([
+                    (f_name, f.layout) for (f_name, f) in fields.items()
+                ], fields=fields, name=name)
+                return rec
+            elif isinstance(subsignal.io[0], DiffPairs):
+                pairs = subsignal.io[0]
+                return Pin(len(pairs), dir, xdr, name=name)
+            elif isinstance(subsignal.io[0], Pins):
+                pins = subsignal.io[0]
+                return Pin(len(pins), dir, xdr, name=name)
+            else:
+                assert False # :nocov:
+
+        value_name = "{}_{}".format(resource.name, resource.number)
+        value = get_value(resource, dir, xdr, value_name)
+
+        def match_constraints(value, subsignal):
+            if isinstance(subsignal.io[0], Subsignal):
+                for sub in subsignal.io:
+                    yield from match_constraints(value[sub.name], sub)
+            else:
+                assert isinstance(value, Pin)
+                yield (value, subsignal.io[0], subsignal.extras)
+
+        for (pin, io, extras) in match_constraints(value, resource):
+            if isinstance(io, DiffPairs):
+                p = Signal(pin.width, name="{}_p".format(pin.name))
+                n = Signal(pin.width, name="{}_n".format(pin.name))
+                self._diffpairs.append((pin, p, n))
+                self._ports.append((p, io.p.names, extras))
+                self._ports.append((n, io.n.names, extras))
+            elif isinstance(io, Pins):
+                if pin.dir == "io":
+                    port = Signal(pin.width, name="{}_io".format(pin.name))
+                    self._tristates.append((pin, port))
+                else:
+                    port = getattr(pin, pin.dir)
+                self._ports.append((port, io.names, extras))
+            else:
+                assert False # :nocov:
+
+        self.requested[resource.name, resource.number] = value
+        return value
+
+    def iter_ports(self):
+        for port, pins, extras in self._ports:
+            yield port
+
+    def iter_port_constraints(self):
+        for port, pins, extras in self._ports:
+            yield (port.name, pins, extras)
+
+    def iter_clock_constraints(self):
+        for name, number in self.clocks.keys() & self.requested.keys():
+            resource = self.resources[name, number]
+            pin      = self.requested[name, number]
+            period   = self.clocks[name, number]
+            if pin.dir == "io":
+                raise ConstraintError("Cannot constrain frequency of resource {}#{} because "
+                                      "it has been requested as a tristate buffer"
+                                      .format(name, number))
+            if isinstance(resource.io[0], DiffPairs):
+                port_name = "{}_p".format(pin.name)
+            else:
+                port_name = getattr(pin, pin.dir).name
+            yield (port_name, period)
diff --git a/nmigen/test/test_build_res.py b/nmigen/test/test_build_res.py
new file mode 100644 (file)
index 0000000..adc25bf
--- /dev/null
@@ -0,0 +1,185 @@
+from .. import *
+from ..hdl.rec import *
+from ..lib.io import *
+from ..build.dsl import *
+from ..build.res import *
+from .tools import *
+
+
+class ConstraintManagerTestCase(FHDLTestCase):
+    def setUp(self):
+        self.resources = [
+            Resource("clk100", 0, DiffPairs("H1", "H2", dir="i")),
+            Resource("clk50", 0, Pins("K1")),
+            Resource("user_led", 0, Pins("A0", dir="o")),
+            Resource("i2c", 0,
+                Subsignal("scl", Pins("N10", dir="o")),
+                Subsignal("sda", Pins("N11"))
+            )
+        ]
+        self.cm = ConstraintManager(self.resources)
+
+    def test_basic(self):
+        self.assertEqual(self.cm.resources, {
+            ("clk100",   0): self.resources[0],
+            ("clk50",    0): self.resources[1],
+            ("user_led", 0): self.resources[2],
+            ("i2c",      0): self.resources[3]
+        })
+
+    def test_add_resources(self):
+        new_resources = [
+            Resource("user_led", 1, Pins("A1", dir="o"))
+        ]
+        self.cm.add_resources(new_resources)
+        self.assertEqual(self.cm.resources, {
+            ("clk100",   0): self.resources[0],
+            ("clk50",    0): self.resources[1],
+            ("user_led", 0): self.resources[2],
+            ("i2c",      0): self.resources[3],
+            ("user_led", 1): new_resources[0]
+        })
+
+    def test_lookup(self):
+        r = self.cm.lookup("user_led", 0)
+        self.assertIs(r, self.cm.resources["user_led", 0])
+
+    def test_request_basic(self):
+        r = self.cm.lookup("user_led", 0)
+        user_led = self.cm.request("user_led", 0)
+
+        self.assertIsInstance(user_led, Pin)
+        self.assertEqual(user_led.name, "user_led_0")
+        self.assertEqual(user_led.width, 1)
+        self.assertEqual(user_led.dir, "o")
+
+        ports = list(self.cm.iter_ports())
+        self.assertEqual(len(ports), 1)
+        self.assertIs(user_led.o, ports[0])
+
+        self.assertEqual(list(self.cm.iter_port_constraints()), [
+            ("user_led_0__o", ["A0"], [])
+        ])
+
+    def test_request_with_dir(self):
+        i2c = self.cm.request("i2c", 0, dir={"sda": "o"})
+        self.assertIsInstance(i2c, Record)
+        self.assertIsInstance(i2c.sda, Pin)
+        self.assertEqual(i2c.sda.dir, "o")
+
+    def test_request_tristate(self):
+        i2c = self.cm.request("i2c", 0)
+        self.assertEqual(i2c.sda.dir, "io")
+
+        ports = list(self.cm.iter_ports())
+        self.assertEqual(len(ports), 2)
+        self.assertIs(i2c.scl.o, ports[0]),
+        self.assertEqual(ports[1].name, "i2c_0__sda_io")
+        self.assertEqual(ports[1].nbits, 1)
+
+        self.assertEqual(self.cm._tristates, [(i2c.sda, ports[1])])
+        self.assertEqual(list(self.cm.iter_port_constraints()), [
+            ("i2c_0__scl__o", ["N10"], []),
+            ("i2c_0__sda_io", ["N11"], [])
+        ])
+
+    def test_request_diffpairs(self):
+        clk100 = self.cm.request("clk100", 0)
+        self.assertIsInstance(clk100, Pin)
+        self.assertEqual(clk100.dir, "i")
+        self.assertEqual(clk100.width, 1)
+
+        ports = list(self.cm.iter_ports())
+        self.assertEqual(len(ports), 2)
+        p, n = ports
+        self.assertEqual(p.name, "clk100_0_p")
+        self.assertEqual(p.nbits, clk100.width)
+        self.assertEqual(n.name, "clk100_0_n")
+        self.assertEqual(n.nbits, clk100.width)
+
+        self.assertEqual(self.cm._diffpairs, [(clk100, p, n)])
+        self.assertEqual(list(self.cm.iter_port_constraints()), [
+            ("clk100_0_p", ["H1"], []),
+            ("clk100_0_n", ["H2"], [])
+        ])
+
+    def test_add_clock(self):
+        self.cm.add_clock("clk100", 0, 10e6)
+        self.assertEqual(self.cm.clocks["clk100", 0], 10e6)
+        self.cm.add_clock("clk50", 0, 5e6)
+
+        clk100 = self.cm.request("clk100", 0)
+        clk50 = self.cm.request("clk50", 0, dir="i")
+        self.assertEqual(list(sorted(self.cm.iter_clock_constraints())), [
+            ("clk100_0_p", 10e6),
+            ("clk50_0__i", 5e6)
+        ])
+
+    def test_wrong_resources(self):
+        with self.assertRaises(TypeError, msg="Object 'wrong' is not a Resource"):
+            self.cm.add_resources(['wrong'])
+
+    def test_wrong_resources_duplicate(self):
+        with self.assertRaises(NameError,
+                msg="Trying to add (resource user_led 0 (pins o A1) ), but "
+                    "(resource user_led 0 (pins o A0) ) has the same name and number"):
+            self.cm.add_resources([Resource("user_led", 0, Pins("A1", dir="o"))])
+
+    def test_wrong_lookup(self):
+        with self.assertRaises(NameError,
+                msg="Resource user_led#1 does not exist"):
+            r = self.cm.lookup("user_led", 1)
+
+    def test_wrong_frequency_subsignals(self):
+        with self.assertRaises(ConstraintError,
+                msg="Cannot constrain frequency of resource i2c#0 because "
+                    "it has subsignals"):
+            self.cm.add_clock("i2c", 0, 10e6)
+
+    def test_wrong_frequency_tristate(self):
+        with self.assertRaises(ConstraintError,
+                msg="Cannot constrain frequency of resource clk50#0 because "
+                    "it has been requested as a tristate buffer"):
+            self.cm.add_clock("clk50", 0, 20e6)
+            clk50 = self.cm.request("clk50", 0)
+            list(self.cm.iter_clock_constraints())
+
+    def test_wrong_frequency_duplicate(self):
+        with self.assertRaises(ConstraintError,
+                msg="Resource clk100#0 is already constrained to a frequency of 10.000000 MHz"):
+            self.cm.add_clock("clk100", 0, 10e6)
+            self.cm.add_clock("clk100", 0, 5e6)
+
+    def test_wrong_request_duplicate(self):
+        with self.assertRaises(ConstraintError,
+                msg="Resource user_led#0 has already been requested"):
+            self.cm.request("user_led", 0)
+            self.cm.request("user_led", 0)
+
+    def test_wrong_request_with_dir(self):
+        with self.assertRaises(TypeError,
+                msg="Direction must be one of \"i\", \"o\" or \"io\", not 'wrong'"):
+            user_led = self.cm.request("user_led", 0, dir="wrong")
+
+    def test_wrong_request_with_dir_io(self):
+        with self.assertRaises(ValueError,
+                msg="Direction of (pins o A0) cannot be changed from \"o\" to \"i\"; direction "
+                    "can be changed from \"io\" to \"i\" or from \"io\"to \"o\""):
+            user_led = self.cm.request("user_led", 0, dir="i")
+
+    def test_wrong_request_with_dir_dict(self):
+        with self.assertRaises(TypeError,
+                msg="Directions must be a dict, not 'i', because (resource i2c 0 (subsignal scl "
+                    "(pins o N10) ) (subsignal sda (pins io N11) ) ) has subsignals"):
+            i2c = self.cm.request("i2c", 0, dir="i")
+
+    def test_wrong_request_with_wrong_xdr(self):
+        with self.assertRaises(ValueError,
+                msg="Data rate of (pins o A0) must be a positive integer, not 0"):
+            user_led = self.cm.request("user_led", 0, xdr=0)
+
+    def test_wrong_request_with_xdr_dict(self):
+        with self.assertRaises(TypeError,
+                msg="Data rate must be a dict, not 2, because (resource i2c 0 (subsignal scl "
+                    "(pins o N10) ) (subsignal sda (pins io N11) ) ) has subsignals"):
+            i2c = self.cm.request("i2c", 0, xdr=2)