See #57.
def emit_design(backend):
return {"rtlil": rtlil, "verilog": verilog}[backend].convert(
fragment, name=name, platform=self, ports=list(self.iter_ports()),
- ensure_sync_exists=False)
+ missing_domain=lambda name: None)
def emit_commands(format):
commands = []
import warnings
from ...hdl.ir import Fragment
+from ...hdl.cd import ClockDomain
from ...back import verilog
from .conv_output import ConvOutput
DeprecationWarning, stacklevel=1)
# TODO: attr_translate
+ def missing_domain(name):
+ if create_clock_domains:
+ return ClockDomain(name)
v_output = verilog.convert(
fragment=Fragment.get(fi.get_fragment(), platform=None),
name=name,
ports=ios or (),
- ensure_sync_exists=create_clock_domains
+ missing_domain=missing_domain
)
output = ConvOutput()
output.set_main_source(v_output)
import functools
import inspect
from collections.abc import Iterable
+from ...hdl.cd import ClockDomain
from ...back.pysim import *
if not isinstance(generators, dict):
generators = {"sync": generators}
+ fragment.domains += ClockDomain("sync")
with Simulator(fragment, vcd_file=open(vcd_name, "w") if vcd_name else None) as sim:
for domain, period in clocks.items():
subfrag._propagate_domains_down()
- def _propagate_domains(self, ensure_sync_exists):
+ def _propagate_domains(self, missing_domain):
+ from .xfrm import DomainCollector
+
self._propagate_domains_up()
- if ensure_sync_exists and not self.domains:
- cd_sync = ClockDomain()
- self.add_domains(cd_sync)
- new_domains = (cd_sync,)
- else:
- new_domains = ()
+ new_domains = []
+ for domain_name in DomainCollector()(self):
+ if domain_name is None:
+ continue
+ if domain_name not in self.domains:
+ domain = missing_domain(domain_name)
+ if domain is not None:
+ self.add_domains(domain)
+ new_domains.append(domain)
self._propagate_domains_down()
return new_domains
else:
self.add_ports(sig, dir="i")
- def prepare(self, ports=None, ensure_sync_exists=True):
+ def prepare(self, ports=None, missing_domain=lambda name: ClockDomain(name)):
from .xfrm import SampleLowerer
fragment = SampleLowerer()(self)
- new_domains = fragment._propagate_domains(ensure_sync_exists)
+ new_domains = fragment._propagate_domains(missing_domain)
fragment._resolve_hierarchy_conflicts()
fragment = fragment._insert_domain_resets()
fragment = fragment._lower_domain_signals()
"StatementVisitor", "StatementTransformer",
"FragmentTransformer",
"TransformedElaboratable",
- "DomainRenamer", "DomainLowerer",
+ "DomainCollector", "DomainRenamer", "DomainLowerer",
"SampleDomainInjector", "SampleLowerer",
"SwitchCleaner", "LHSGroupAnalyzer", "LHSGroupFilter",
"ResetInserter", "CEInserter"]
return fragment
+class DomainCollector(ValueVisitor, StatementVisitor):
+ def __init__(self):
+ self.domains = set()
+
+ def on_ignore(self, value):
+ pass
+
+ on_Const = on_ignore
+ on_AnyConst = on_ignore
+ on_AnySeq = on_ignore
+ on_Signal = on_ignore
+
+ def on_ClockSignal(self, value):
+ self.domains.add(value.domain)
+
+ def on_ResetSignal(self, value):
+ self.domains.add(value.domain)
+
+ on_Record = on_ignore
+
+ def on_Operator(self, value):
+ for o in value.operands:
+ self.on_value(o)
+
+ def on_Slice(self, value):
+ self.on_value(value.value)
+
+ def on_Part(self, value):
+ self.on_value(value.value)
+ self.on_value(value.offset)
+
+ def on_Cat(self, value):
+ for o in value.parts:
+ self.on_value(o)
+
+ def on_Repl(self, value):
+ self.on_value(value.value)
+
+ def on_ArrayProxy(self, value):
+ for elem in value._iter_as_values():
+ self.on_value(elem)
+ self.on_value(value.index)
+
+ def on_Sample(self, value):
+ self.on_value(value.value)
+
+ def on_Assign(self, stmt):
+ self.on_value(stmt.lhs)
+ self.on_value(stmt.rhs)
+
+ def on_Assert(self, stmt):
+ self.on_value(stmt.test)
+
+ def on_Assume(self, stmt):
+ self.on_value(stmt.test)
+
+ def on_Switch(self, stmt):
+ self.on_value(stmt.test)
+ for stmts in stmt.cases.values():
+ self.on_statement(stmts)
+
+ def on_statements(self, stmts):
+ for stmt in stmts:
+ self.on_statement(stmt)
+
+ def on_fragment(self, fragment):
+ if isinstance(fragment, Instance):
+ for name, (value, dir) in fragment.named_ports.items():
+ self.on_value(value)
+ self.on_statements(fragment.statements)
+ self.domains.update(fragment.drivers.keys())
+ for subfragment, name in fragment.subfragments:
+ self.on_fragment(subfragment)
+
+ def __call__(self, fragment):
+ self.on_fragment(fragment)
+ return self.domains
+
+
class DomainRenamer(FragmentTransformer, ValueTransformer, StatementTransformer):
def __init__(self, domain_map):
if isinstance(domain_map, str):
f1.add_domains(cd)
f1.add_subfragment(f2)
- f1._propagate_domains(ensure_sync_exists=False)
+ f1._propagate_domains(missing_domain=lambda name: None)
self.assertEqual(f1.domains, {"cd": cd})
self.assertEqual(f2.domains, {"cd": cd})
- def test_propagate_ensure_sync(self):
+ def test_propagate_create_missing(self):
+ s1 = Signal()
f1 = Fragment()
+ f1.add_driver(s1, "sync")
f2 = Fragment()
f1.add_subfragment(f2)
- f1._propagate_domains(ensure_sync_exists=True)
+ f1._propagate_domains(missing_domain=lambda name: ClockDomain(name))
self.assertEqual(f1.domains.keys(), {"sync"})
self.assertEqual(f2.domains.keys(), {"sync"})
self.assertEqual(f1.domains["sync"], f2.domains["sync"])
f = Fragment()
f.add_subfragment(Instance("foo", o_O=s[0]))
f.add_subfragment(Instance("foo", o_O=s[1]))
- fp = f.prepare(ports=[s], ensure_sync_exists=False)
+ fp = f.prepare(ports=[s], missing_domain=lambda name: None)
self.assertEqual(fp.ports, SignalDict([
(s, "o"),
]))
sim.add_process(process)
def test_run_until(self):
- with self.assertSimulation(Module(), deadline=100e-6) as sim:
+ m = Module()
+ s = Signal()
+ m.d.sync += s.eq(0)
+ with self.assertSimulation(m, deadline=100e-6) as sim:
sim.add_clock(1e-6)
def process():
for _ in range(101):
sim.add_process(1)
def test_add_clock_wrong(self):
- with self.assertSimulation(Module()) as sim:
+ m = Module()
+ s = Signal()
+ m.d.sync += s.eq(0)
+ with self.assertSimulation(m) as sim:
sim.add_clock(1)
with self.assertRaises(ValueError,
msg="Domain 'sync' already has a clock driving it"):