From: Jean-François Nguyen Date: Fri, 26 Apr 2019 12:37:08 +0000 (+0200) Subject: build.res: add ConstraintManager. X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=1dad8c6e380dddf941120b1dde399f167cb15299;p=nmigen.git build.res: add ConstraintManager. --- diff --git a/nmigen/build/res.py b/nmigen/build/res.py new file mode 100644 index 0000000..f447ff7 --- /dev/null +++ b/nmigen/build/res.py @@ -0,0 +1,172 @@ +from collections import OrderedDict + +from .. import * +from ..hdl.rec import * +from ..lib.io import * + +from .dsl import * + + +__all__ = ["ConstraintError", "ConstraintManager"] + + +class ConstraintError(Exception): + pass + + +class ConstraintManager: + def __init__(self, resources): + self.resources = OrderedDict() + self.requested = OrderedDict() + self.clocks = OrderedDict() + + self._ports = [] + self._tristates = [] + self._diffpairs = [] + + self.add_resources(resources) + + def add_resources(self, resources): + for r in resources: + if not isinstance(r, Resource): + raise TypeError("Object {!r} is not a Resource".format(r)) + if (r.name, r.number) in self.resources: + raise NameError("Trying to add {!r}, but {!r} has the same name and number" + .format(r, self.resources[r.name, r.number])) + self.resources[r.name, r.number] = r + + def add_clock(self, name, number, frequency): + resource = self.lookup(name, number) + if isinstance(resource.io[0], Subsignal): + raise ConstraintError("Cannot constrain frequency of resource {}#{} because it has " + "subsignals" + .format(resource.name, resource.number, frequency)) + if (resource.name, resource.number) in self.clocks: + other = self.clocks[resource.name, resource.number] + raise ConstraintError("Resource {}#{} is already constrained to a frequency of " + "{:f} MHz" + .format(resource.name, resource.number, other / 1e6)) + self.clocks[resource.name, resource.number] = frequency + + def lookup(self, name, number): + if (name, number) not in self.resources: + raise NameError("Resource {}#{} does not exist" + .format(name, number)) + return self.resources[name, number] + + def request(self, name, number, dir=None, xdr=None): + resource = self.lookup(name, number) + if (resource.name, resource.number) in self.requested: + raise ConstraintError("Resource {}#{} has already been requested" + .format(name, number)) + + def resolve_dir_xdr(subsignal, dir, xdr): + if isinstance(subsignal.io[0], Subsignal): + if dir is None: + dir = dict() + if xdr is None: + xdr = dict() + if not isinstance(dir, dict): + raise TypeError("Directions must be a dict, not {!r}, because {!r} " + "has subsignals" + .format(dir, subsignal)) + if not isinstance(xdr, dict): + raise TypeError("Data rate must be a dict, not {!r}, because {!r} " + "has subsignals" + .format(xdr, subsignal)) + for sub in subsignal.io: + sub_dir = dir.get(sub.name, None) + sub_xdr = xdr.get(sub.name, None) + dir[sub.name], xdr[sub.name] = resolve_dir_xdr(sub, sub_dir, sub_xdr) + else: + if dir is None: + dir = subsignal.io[0].dir + if xdr is None: + xdr = 1 + if dir not in ("i", "o", "io"): + raise TypeError("Direction must be one of \"i\", \"o\" or \"io\", not {!r}" + .format(dir)) + if subsignal.io[0].dir != "io" and dir != subsignal.io[0].dir: + raise ValueError("Direction of {!r} cannot be changed from \"{}\" to \"{}\"; " + "direction can be changed from \"io\" to \"i\" or from \"io\"" + "to \"o\"" + .format(subsignal.io[0], subsignal.io[0].dir, dir)) + if not isinstance(xdr, int) or xdr < 1: + raise ValueError("Data rate of {!r} must be a positive integer, not {!r}" + .format(subsignal.io[0], xdr)) + return dir, xdr + + dir, xdr = resolve_dir_xdr(resource, dir, xdr) + + def get_value(subsignal, dir, xdr, name): + if isinstance(subsignal.io[0], Subsignal): + fields = OrderedDict() + for sub in subsignal.io: + fields[sub.name] = get_value(sub, dir[sub.name], xdr[sub.name], + "{}__{}".format(name, sub.name)) + rec = Record([ + (f_name, f.layout) for (f_name, f) in fields.items() + ], fields=fields, name=name) + return rec + elif isinstance(subsignal.io[0], DiffPairs): + pairs = subsignal.io[0] + return Pin(len(pairs), dir, xdr, name=name) + elif isinstance(subsignal.io[0], Pins): + pins = subsignal.io[0] + return Pin(len(pins), dir, xdr, name=name) + else: + assert False # :nocov: + + value_name = "{}_{}".format(resource.name, resource.number) + value = get_value(resource, dir, xdr, value_name) + + def match_constraints(value, subsignal): + if isinstance(subsignal.io[0], Subsignal): + for sub in subsignal.io: + yield from match_constraints(value[sub.name], sub) + else: + assert isinstance(value, Pin) + yield (value, subsignal.io[0], subsignal.extras) + + for (pin, io, extras) in match_constraints(value, resource): + if isinstance(io, DiffPairs): + p = Signal(pin.width, name="{}_p".format(pin.name)) + n = Signal(pin.width, name="{}_n".format(pin.name)) + self._diffpairs.append((pin, p, n)) + self._ports.append((p, io.p.names, extras)) + self._ports.append((n, io.n.names, extras)) + elif isinstance(io, Pins): + if pin.dir == "io": + port = Signal(pin.width, name="{}_io".format(pin.name)) + self._tristates.append((pin, port)) + else: + port = getattr(pin, pin.dir) + self._ports.append((port, io.names, extras)) + else: + assert False # :nocov: + + self.requested[resource.name, resource.number] = value + return value + + def iter_ports(self): + for port, pins, extras in self._ports: + yield port + + def iter_port_constraints(self): + for port, pins, extras in self._ports: + yield (port.name, pins, extras) + + def iter_clock_constraints(self): + for name, number in self.clocks.keys() & self.requested.keys(): + resource = self.resources[name, number] + pin = self.requested[name, number] + period = self.clocks[name, number] + if pin.dir == "io": + raise ConstraintError("Cannot constrain frequency of resource {}#{} because " + "it has been requested as a tristate buffer" + .format(name, number)) + if isinstance(resource.io[0], DiffPairs): + port_name = "{}_p".format(pin.name) + else: + port_name = getattr(pin, pin.dir).name + yield (port_name, period) diff --git a/nmigen/test/test_build_res.py b/nmigen/test/test_build_res.py new file mode 100644 index 0000000..adc25bf --- /dev/null +++ b/nmigen/test/test_build_res.py @@ -0,0 +1,185 @@ +from .. import * +from ..hdl.rec import * +from ..lib.io import * +from ..build.dsl import * +from ..build.res import * +from .tools import * + + +class ConstraintManagerTestCase(FHDLTestCase): + def setUp(self): + self.resources = [ + Resource("clk100", 0, DiffPairs("H1", "H2", dir="i")), + Resource("clk50", 0, Pins("K1")), + Resource("user_led", 0, Pins("A0", dir="o")), + Resource("i2c", 0, + Subsignal("scl", Pins("N10", dir="o")), + Subsignal("sda", Pins("N11")) + ) + ] + self.cm = ConstraintManager(self.resources) + + def test_basic(self): + self.assertEqual(self.cm.resources, { + ("clk100", 0): self.resources[0], + ("clk50", 0): self.resources[1], + ("user_led", 0): self.resources[2], + ("i2c", 0): self.resources[3] + }) + + def test_add_resources(self): + new_resources = [ + Resource("user_led", 1, Pins("A1", dir="o")) + ] + self.cm.add_resources(new_resources) + self.assertEqual(self.cm.resources, { + ("clk100", 0): self.resources[0], + ("clk50", 0): self.resources[1], + ("user_led", 0): self.resources[2], + ("i2c", 0): self.resources[3], + ("user_led", 1): new_resources[0] + }) + + def test_lookup(self): + r = self.cm.lookup("user_led", 0) + self.assertIs(r, self.cm.resources["user_led", 0]) + + def test_request_basic(self): + r = self.cm.lookup("user_led", 0) + user_led = self.cm.request("user_led", 0) + + self.assertIsInstance(user_led, Pin) + self.assertEqual(user_led.name, "user_led_0") + self.assertEqual(user_led.width, 1) + self.assertEqual(user_led.dir, "o") + + ports = list(self.cm.iter_ports()) + self.assertEqual(len(ports), 1) + self.assertIs(user_led.o, ports[0]) + + self.assertEqual(list(self.cm.iter_port_constraints()), [ + ("user_led_0__o", ["A0"], []) + ]) + + def test_request_with_dir(self): + i2c = self.cm.request("i2c", 0, dir={"sda": "o"}) + self.assertIsInstance(i2c, Record) + self.assertIsInstance(i2c.sda, Pin) + self.assertEqual(i2c.sda.dir, "o") + + def test_request_tristate(self): + i2c = self.cm.request("i2c", 0) + self.assertEqual(i2c.sda.dir, "io") + + ports = list(self.cm.iter_ports()) + self.assertEqual(len(ports), 2) + self.assertIs(i2c.scl.o, ports[0]), + self.assertEqual(ports[1].name, "i2c_0__sda_io") + self.assertEqual(ports[1].nbits, 1) + + self.assertEqual(self.cm._tristates, [(i2c.sda, ports[1])]) + self.assertEqual(list(self.cm.iter_port_constraints()), [ + ("i2c_0__scl__o", ["N10"], []), + ("i2c_0__sda_io", ["N11"], []) + ]) + + def test_request_diffpairs(self): + clk100 = self.cm.request("clk100", 0) + self.assertIsInstance(clk100, Pin) + self.assertEqual(clk100.dir, "i") + self.assertEqual(clk100.width, 1) + + ports = list(self.cm.iter_ports()) + self.assertEqual(len(ports), 2) + p, n = ports + self.assertEqual(p.name, "clk100_0_p") + self.assertEqual(p.nbits, clk100.width) + self.assertEqual(n.name, "clk100_0_n") + self.assertEqual(n.nbits, clk100.width) + + self.assertEqual(self.cm._diffpairs, [(clk100, p, n)]) + self.assertEqual(list(self.cm.iter_port_constraints()), [ + ("clk100_0_p", ["H1"], []), + ("clk100_0_n", ["H2"], []) + ]) + + def test_add_clock(self): + self.cm.add_clock("clk100", 0, 10e6) + self.assertEqual(self.cm.clocks["clk100", 0], 10e6) + self.cm.add_clock("clk50", 0, 5e6) + + clk100 = self.cm.request("clk100", 0) + clk50 = self.cm.request("clk50", 0, dir="i") + self.assertEqual(list(sorted(self.cm.iter_clock_constraints())), [ + ("clk100_0_p", 10e6), + ("clk50_0__i", 5e6) + ]) + + def test_wrong_resources(self): + with self.assertRaises(TypeError, msg="Object 'wrong' is not a Resource"): + self.cm.add_resources(['wrong']) + + def test_wrong_resources_duplicate(self): + with self.assertRaises(NameError, + msg="Trying to add (resource user_led 0 (pins o A1) ), but " + "(resource user_led 0 (pins o A0) ) has the same name and number"): + self.cm.add_resources([Resource("user_led", 0, Pins("A1", dir="o"))]) + + def test_wrong_lookup(self): + with self.assertRaises(NameError, + msg="Resource user_led#1 does not exist"): + r = self.cm.lookup("user_led", 1) + + def test_wrong_frequency_subsignals(self): + with self.assertRaises(ConstraintError, + msg="Cannot constrain frequency of resource i2c#0 because " + "it has subsignals"): + self.cm.add_clock("i2c", 0, 10e6) + + def test_wrong_frequency_tristate(self): + with self.assertRaises(ConstraintError, + msg="Cannot constrain frequency of resource clk50#0 because " + "it has been requested as a tristate buffer"): + self.cm.add_clock("clk50", 0, 20e6) + clk50 = self.cm.request("clk50", 0) + list(self.cm.iter_clock_constraints()) + + def test_wrong_frequency_duplicate(self): + with self.assertRaises(ConstraintError, + msg="Resource clk100#0 is already constrained to a frequency of 10.000000 MHz"): + self.cm.add_clock("clk100", 0, 10e6) + self.cm.add_clock("clk100", 0, 5e6) + + def test_wrong_request_duplicate(self): + with self.assertRaises(ConstraintError, + msg="Resource user_led#0 has already been requested"): + self.cm.request("user_led", 0) + self.cm.request("user_led", 0) + + def test_wrong_request_with_dir(self): + with self.assertRaises(TypeError, + msg="Direction must be one of \"i\", \"o\" or \"io\", not 'wrong'"): + user_led = self.cm.request("user_led", 0, dir="wrong") + + def test_wrong_request_with_dir_io(self): + with self.assertRaises(ValueError, + msg="Direction of (pins o A0) cannot be changed from \"o\" to \"i\"; direction " + "can be changed from \"io\" to \"i\" or from \"io\"to \"o\""): + user_led = self.cm.request("user_led", 0, dir="i") + + def test_wrong_request_with_dir_dict(self): + with self.assertRaises(TypeError, + msg="Directions must be a dict, not 'i', because (resource i2c 0 (subsignal scl " + "(pins o N10) ) (subsignal sda (pins io N11) ) ) has subsignals"): + i2c = self.cm.request("i2c", 0, dir="i") + + def test_wrong_request_with_wrong_xdr(self): + with self.assertRaises(ValueError, + msg="Data rate of (pins o A0) must be a positive integer, not 0"): + user_led = self.cm.request("user_led", 0, xdr=0) + + def test_wrong_request_with_xdr_dict(self): + with self.assertRaises(TypeError, + msg="Data rate must be a dict, not 2, because (resource i2c 0 (subsignal scl " + "(pins o N10) ) (subsignal sda (pins io N11) ) ) has subsignals"): + i2c = self.cm.request("i2c", 0, xdr=2)