def add_driven(self, signal, sync):
self.driven[signal] = sync
- def add_port(self, signal, kind=None):
- if signal in self.driven:
- self.ports[signal] = (len(self.ports), "output")
- else:
- self.ports[signal] = (len(self.ports), "input")
+ def add_port(self, signal, kind):
+ assert kind in ("i", "o", "io")
+ if kind == "i":
+ kind = "input"
+ elif kind == "o":
+ kind = "output"
+ elif kind == "io":
+ kind = "inout"
+ self.ports[signal] = (len(self.ports), kind)
@contextmanager
def lhs(self):
for domain, signal in fragment.iter_drivers():
xformer.add_driven(signal, sync=domain is not None)
- # Register all signals used as ports in the current fragment. The wires are lazily
- # generated, so registering ports eagerly ensures they get correct direction qualifiers.
+ # Transform all signals used as ports in the current fragment eagerly and outside of
+ # any hierarchy, to make sure they get sensible (non-prefixed) names.
for signal in fragment.ports:
- xformer.add_port(signal)
+ xformer.add_port(signal, fragment.ports[signal])
+ xformer(signal)
# Transform all clocks clocks and resets eagerly and outside of any hierarchy, to make
# sure they get sensible (non-prefixed) names. This does not affect semantics.
if name is None:
try:
- name = tracer.get_var_name()
+ name = tracer.get_var_name(depth=2 + src_loc_at)
except tracer.NameNotFound:
name = "$signal"
self.name = name
other : Value
Object to base this Signal on.
"""
- kw = dict(shape=cls.wrap(other).shape(), name=tracer.get_var_name())
+ kw = dict(shape=cls.wrap(other).shape(),
+ name=tracer.get_var_name(depth=2 + src_loc_at))
if isinstance(other, cls):
kw.update(reset=other.reset, reset_less=other.reset_less, attrs=other.attrs)
kw.update(kwargs)
def __iter__(self):
return map(lambda x: None if x is None else x.value, sorted(self._inner))
+ def __eq__(self, other):
+ if not isinstance(other, ValueDict):
+ return False
+ if len(self) != len(other):
+ return False
+ for ak, bk in zip(self, other):
+ if ValueKey(ak) != ValueKey(bk):
+ return False
+ if self[ak] != other[bk]:
+ return False
+ return True
+
def __len__(self):
return len(self._inner)
+ def __repr__(self):
+ pairs = ["({!r}, {!r})".format(k, v) for k, v in self.items()]
+ return "ValueDict([{}])".format(", ".join(pairs))
+
class ValueSet(MutableSet):
def __init__(self, elements=()):
class Fragment:
def __init__(self):
- self.ports = ValueSet()
+ self.ports = ValueDict()
self.drivers = OrderedDict()
self.statements = []
self.domains = OrderedDict()
self.subfragments = []
- def add_ports(self, *ports):
- self.ports.update(flatten(ports))
+ def add_ports(self, *ports, kind):
+ assert kind in ("i", "o", "io")
+ for port in flatten(ports):
+ self.ports[port] = kind
def iter_ports(self):
- yield from self.ports
+ yield from self.ports.keys()
def drive(self, signal, domain=None):
if domain not in self.drivers:
outs |= ports & sub_outs
# We've computed the precise set of input and output ports.
- self.add_ports(ins, outs)
+ self.add_ports(ins, kind="i")
+ self.add_ports(outs, kind="o")
return ins, outs
def test_empty(self):
f = Fragment()
- ins, outs = f._propagate_ports(ports=())
- self.assertEqual(ins, ValueSet())
- self.assertEqual(outs, ValueSet())
- self.assertEqual(f.ports, ValueSet())
+ f._propagate_ports(ports=())
+ self.assertEqual(f.ports, ValueDict([]))
def test_self_contained(self):
f = Fragment()
self.s1.eq(self.c1)
)
- ins, outs = f._propagate_ports(ports=())
- self.assertEqual(ins, ValueSet())
- self.assertEqual(outs, ValueSet())
- self.assertEqual(f.ports, ValueSet())
+ f._propagate_ports(ports=())
+ self.assertEqual(f.ports, ValueDict([]))
def test_infer_input(self):
f = Fragment()
self.c1.eq(self.s1)
)
- ins, outs = f._propagate_ports(ports=())
- self.assertEqual(ins, ValueSet((self.s1,)))
- self.assertEqual(outs, ValueSet())
- self.assertEqual(f.ports, ValueSet((self.s1,)))
+ f._propagate_ports(ports=())
+ self.assertEqual(f.ports, ValueDict([
+ (self.s1, "i")
+ ]))
def test_request_output(self):
f = Fragment()
self.c1.eq(self.s1)
)
- ins, outs = f._propagate_ports(ports=(self.c1,))
- self.assertEqual(ins, ValueSet((self.s1,)))
- self.assertEqual(outs, ValueSet((self.c1,)))
- self.assertEqual(f.ports, ValueSet((self.s1, self.c1)))
+ f._propagate_ports(ports=(self.c1,))
+ self.assertEqual(f.ports, ValueDict([
+ (self.s1, "i"),
+ (self.c1, "o")
+ ]))
def test_input_in_subfragment(self):
f1 = Fragment()
self.s1.eq(0)
)
f1.add_subfragment(f2)
- ins, outs = f1._propagate_ports(ports=())
- self.assertEqual(ins, ValueSet())
- self.assertEqual(outs, ValueSet())
- self.assertEqual(f1.ports, ValueSet())
- self.assertEqual(f2.ports, ValueSet((self.s1,)))
+ f1._propagate_ports(ports=())
+ self.assertEqual(f1.ports, ValueDict())
+ self.assertEqual(f2.ports, ValueDict([
+ (self.s1, "o"),
+ ]))
def test_input_only_in_subfragment(self):
f1 = Fragment()
self.c1.eq(self.s1)
)
f1.add_subfragment(f2)
- ins, outs = f1._propagate_ports(ports=())
- self.assertEqual(ins, ValueSet((self.s1,)))
- self.assertEqual(outs, ValueSet())
- self.assertEqual(f1.ports, ValueSet((self.s1,)))
- self.assertEqual(f2.ports, ValueSet((self.s1,)))
+ f1._propagate_ports(ports=())
+ self.assertEqual(f1.ports, ValueDict([
+ (self.s1, "i"),
+ ]))
+ self.assertEqual(f2.ports, ValueDict([
+ (self.s1, "i"),
+ ]))
def test_output_from_subfragment(self):
f1 = Fragment()
)
f1.add_subfragment(f2)
- ins, outs = f1._propagate_ports(ports=(self.c2,))
- self.assertEqual(ins, ValueSet())
- self.assertEqual(outs, ValueSet((self.c2,)))
- self.assertEqual(f1.ports, ValueSet((self.c2,)))
- self.assertEqual(f2.ports, ValueSet((self.c2,)))
+ f1._propagate_ports(ports=(self.c2,))
+ self.assertEqual(f1.ports, ValueDict([
+ (self.c2, "o"),
+ ]))
+ self.assertEqual(f2.ports, ValueDict([
+ (self.c2, "o"),
+ ]))
def test_input_cd(self):
sync = ClockDomain()
f.add_domains(sync)
f.drive(self.c1, "sync")
- ins, outs = f._propagate_ports(ports=())
- self.assertEqual(ins, ValueSet((self.s1, sync.clk, sync.rst)))
- self.assertEqual(outs, ValueSet(()))
- self.assertEqual(f.ports, ValueSet((self.s1, sync.clk, sync.rst)))
+ f._propagate_ports(ports=())
+ self.assertEqual(f.ports, ValueDict([
+ (self.s1, "i"),
+ (sync.clk, "i"),
+ (sync.rst, "i"),
+ ]))
def test_input_cd_reset_less(self):
sync = ClockDomain(reset_less=True)
f.add_domains(sync)
f.drive(self.c1, "sync")
- ins, outs = f._propagate_ports(ports=())
- self.assertEqual(ins, ValueSet((self.s1, sync.clk)))
- self.assertEqual(outs, ValueSet(()))
- self.assertEqual(f.ports, ValueSet((self.s1, sync.clk)))
+ f._propagate_ports(ports=())
+ self.assertEqual(f.ports, ValueDict([
+ (self.s1, "i"),
+ (sync.clk, "i"),
+ ]))
class FragmentDomainsTestCase(FHDLTestCase):