__all__ = ["Settle", "Delay", "Tick", "Passive", "Active", "Simulator"]
+class _NameExtractor:
+ def __init__(self):
+ self.names = SignalDict()
+
+ def __call__(self, fragment, *, hierarchy=("top",)):
+ def add_signal_name(signal):
+ hierarchical_signal_name = (*hierarchy, signal.name)
+ if signal not in self.names:
+ self.names[signal] = {hierarchical_signal_name}
+ else:
+ self.names[signal].add(hierarchical_signal_name)
+
+ for domain_name, domain_signals in fragment.drivers.items():
+ if domain_name is not None:
+ domain = fragment.domains[domain_name]
+ add_signal_name(domain.clk)
+ if domain.rst is not None:
+ add_signal_name(domain.rst)
+
+ for statement in fragment.statements:
+ for signal in statement._lhs_signals() | statement._rhs_signals():
+ if not isinstance(signal, (ClockSignal, ResetSignal)):
+ add_signal_name(signal)
+
+ for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
+ if subfragment_name is None:
+ subfragment_name = "U${}".format(subfragment_index)
+ self(subfragment, hierarchy=(*hierarchy, subfragment_name))
+
+ return self.names
+
+
class _WaveformWriter:
def update(self, timestamp, signal, value):
raise NotImplementedError # :nocov:
def decode_to_vcd(signal, value):
return signal.decoder(value).expandtabs().replace(" ", "_")
- def __init__(self, signal_names, *, vcd_file, gtkw_file=None, traces=()):
+ def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):
if isinstance(vcd_file, str):
vcd_file = open(vcd_file, "wt")
if isinstance(gtkw_file, str):
self.traces = []
+ signal_names = _NameExtractor()(fragment)
+
trace_names = SignalDict()
for trace in traces:
if trace not in signal_names:
class _FragmentCompiler:
- def __init__(self, state, signal_names):
+ def __init__(self, state):
self.state = state
- self.signal_names = signal_names
def __call__(self, fragment, *, hierarchy=("top",)):
processes = set()
- def add_signal_name(signal):
- hierarchical_signal_name = (*hierarchy, signal.name)
- if signal not in self.signal_names:
- self.signal_names[signal] = {hierarchical_signal_name}
- else:
- self.signal_names[signal].add(hierarchical_signal_name)
-
for domain_name, domain_signals in fragment.drivers.items():
domain_stmts = LHSGroupFilter(domain_signals)(fragment.statements)
domain_process = _CompiledProcess(self.state, comb=domain_name is None)
else:
domain = fragment.domains[domain_name]
- add_signal_name(domain.clk)
- if domain.rst is not None:
- add_signal_name(domain.rst)
-
clk_trigger = 1 if domain.clk_edge == "pos" else 0
self.state.add_trigger(domain_process, domain.clk, trigger=clk_trigger)
if domain.rst is not None and domain.async_reset:
processes.add(domain_process)
- for used_signal in domain_process.state.signals:
- add_signal_name(used_signal)
-
for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
if subfragment_name is None:
subfragment_name = "U${}".format(subfragment_index)
self._state = _SimulatorState()
self._signal_names = SignalDict()
self._fragment = Fragment.get(fragment, platform=None).prepare()
- self._processes = _FragmentCompiler(self._state, self._signal_names)(self._fragment)
+ self._processes = _FragmentCompiler(self._state)(self._fragment)
self._clocked = set()
self._waveform_writers = []
"""
if self._state.timeline.now != 0.0:
raise ValueError("Cannot start writing waveforms after advancing simulation time")
- waveform_writer = _VCDWaveformWriter(self._signal_names,
+ waveform_writer = _VCDWaveformWriter(self._fragment,
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
self._waveform_writers.append(waveform_writer)
yield