class Simulator:
def __init__(self, fragment=None, vcd_file=None):
self._fragments = {} # fragment -> hierarchy
+
self._domains = {} # str/domain -> ClockDomain
self._domain_triggers = ValueDict() # Signal -> str/domain
self._domain_signals = {} # str/domain -> {Signal}
+
self._signals = ValueSet() # {Signal}
self._comb_signals = ValueSet() # {Signal}
self._sync_signals = ValueSet() # {Signal}
self._started = False
self._timestamp = 0.
+ self._epsilon = 1e-10
self._state = _State()
self._processes = set() # {process}
self._wait_deadline = {} # process -> float/timestamp
self._wait_tick = {} # process -> str/domain
- self._handlers = ValueDict() # Signal -> set(lambda)
+ self._funclets = ValueDict() # Signal -> set(lambda)
self._vcd_file = vcd_file
self._vcd_writer = None
return "{}_{}".format(name, signal.name)
return signal.name
- def _add_handler(self, signal, handler):
- if signal not in self._handlers:
- self._handlers[signal] = set()
- self._handlers[signal].add(handler)
+ def _add_funclet(self, signal, funclet):
+ if signal not in self._funclets:
+ self._funclets[signal] = set()
+ self._funclets[signal].add(funclet)
def __enter__(self):
if self._vcd_file:
self._domain_signals[domain].update(signals)
compiler = _StatementCompiler()
- handler = compiler(fragment.statements)
+ funclet = compiler(fragment.statements)
for signal in compiler.sensitivity:
- self._add_handler(signal, handler)
+ self._add_funclet(signal, funclet)
for domain, cd in fragment.domains.items():
- self._add_handler(cd.clk, handler)
+ self._add_funclet(cd.clk, funclet)
if cd.rst is not None:
- self._add_handler(cd.rst, handler)
+ self._add_funclet(cd.rst, funclet)
self._user_signals = self._signals - self._comb_signals - self._sync_signals
- def _commit_signal(self, signal):
+ def _update_dirty_signals(self):
+ """Perform the statement part of IR processes (aka RTLIL case)."""
+ # First, for all dirty signals, use sensitivity lists to determine the set of fragments
+ # that need their statements to be reevaluated because the signals changed at the previous
+ # delta cycle.
+ funclets = set()
+ while self._state.curr_dirty:
+ signal = self._state.curr_dirty.pop()
+ if signal in self._funclets:
+ funclets.update(self._funclets[signal])
+
+ # Second, compute the values of all signals at the start of the next delta cycle, by
+ # running precompiled statements.
+ for funclet in funclets:
+ funclet(self._state)
+
+ def _commit_signal(self, signal, domains):
+ """Perform the driver part of IR processes (aka RTLIL sync), for individual signals."""
+ # Take the computed value (at the start of this delta cycle) of a signal (that could have
+ # come from an IR process that ran earlier, or modified by a simulator process) and update
+ # the value for this delta cycle.
old, new = self._state.commit(signal)
- if (old, new) == (0, 1) and signal in self._domain_triggers:
- domain = self._domain_triggers[signal]
- for sync_signal in self._state.next_dirty:
- if sync_signal in self._domain_signals[domain]:
- self._commit_signal(sync_signal)
- for proc, wait_domain in list(self._wait_tick.items()):
- if domain == wait_domain:
- del self._wait_tick[proc]
- self._suspended.remove(proc)
+ # If the signal is a clock that triggers synchronous logic, record that fact.
+ if (old, new) == (0, 1) and signal in self._domain_triggers:
+ domains.add(self._domain_triggers[signal])
if self._vcd_writer:
+ # Finally, dump the new value to the VCD file.
for vcd_signal in self._vcd_signals[signal]:
- self._vcd_writer.change(vcd_signal, self._timestamp * 1e10, new)
-
- def _handle_event(self):
- handlers = set()
- while self._state.curr_dirty:
- signal = self._state.curr_dirty.pop()
- if signal in self._handlers:
- handlers.update(self._handlers[signal])
-
- for handler in handlers:
- handler(self._state)
+ self._vcd_writer.change(vcd_signal, self._timestamp / self._epsilon, new)
+ def _commit_comb_signals(self, domains):
+ """Perform the comb part of IR processes (aka RTLIL always)."""
+ # Take the computed value (at the start of this delta cycle) of every comb signal and
+ # update the value for this delta cycle.
for signal in self._state.next_dirty:
if signal in self._comb_signals or signal in self._user_signals:
- self._commit_signal(signal)
+ self._commit_signal(signal, domains)
+
+ def _commit_sync_signals(self, domains):
+ """Perform the sync part of IR processes (aka RTLIL posedge)."""
+ # At entry, `domains` contains a list of every simultaneously triggered sync update.
+ while domains:
+ # Advance the timeline a bit (purely for observational purposes) and commit all of them
+ # at the same timestamp.
+ self._timestamp += self._epsilon
+ curr_domains, domains = domains, set()
+
+ while curr_domains:
+ domain = curr_domains.pop()
+
+ # Take the computed value (at the start of this delta cycle) of every sync signal
+ # in this domain and update the value for this delta cycle. This can trigger more
+ # synchronous logic, so record that.
+ for signal in self._state.next_dirty:
+ if signal in self._domain_signals[domain]:
+ self._commit_signal(signal, domains)
+
+ # Wake up any simulator processes that wait for a domain tick.
+ for proc, wait_domain in list(self._wait_tick.items()):
+ if domain == wait_domain:
+ del self._wait_tick[proc]
+ self._suspended.remove(proc)
+
+ # Unless handling synchronous logic above has triggered more synchronous logic (which
+ # can happen e.g. if a domain is clocked off a clock divisor in fabric), we're done.
+ # Otherwise, do one more round of updates.
def _force_signal(self, signal, value):
assert signal in self._user_signals
self._state.set_next(signal, value)
- self._commit_signal(signal)
+
+ domains = set()
+ self._commit_signal(signal, domains)
+ self._commit_sync_signals(domains)
def _run_process(self, proc):
try:
def step(self, run_passive=False):
# Are there any delta cycles we should run?
while self._state.curr_dirty:
- self._timestamp += 1e-10
- self._handle_event()
+ self._timestamp += self._epsilon
+
+ domains = set()
+ self._update_dirty_signals()
+ self._commit_comb_signals(domains)
+ self._commit_sync_signals(domains)
# Are there any processes that haven't had a chance to run yet?
if len(self._processes) > len(self._suspended):
while self._timestamp < deadline:
if not self.step(run_passive):
return False
+
return True
def __exit__(self, *args):
if self._vcd_writer:
- self._vcd_writer.close(self._timestamp * 1e10)
+ self._vcd_writer.close(self._timestamp / self._epsilon)