raise NotImplementedError # :nocov:
+class _Timeline:
+ def __init__(self):
+ self.now = 0.0
+ self.deadlines = dict()
+
+ def reset(self):
+ self.now = 0.0
+ self.deadlines.clear()
+
+ def at(self, run_at, process):
+ assert process not in self.deadlines
+ self.deadlines[process] = run_at
+
+ def delay(self, delay_by, process):
+ if delay_by is None:
+ run_at = self.now
+ else:
+ run_at = self.now + delay_by
+ self.at(run_at, process)
+
+ def advance(self):
+ nearest_processes = set()
+ nearest_deadline = None
+ for process, deadline in self.deadlines.items():
+ if deadline is None:
+ if nearest_deadline is not None:
+ nearest_processes.clear()
+ nearest_processes.add(process)
+ nearest_deadline = self.now
+ break
+ elif nearest_deadline is None or deadline <= nearest_deadline:
+ assert deadline >= self.now
+ if nearest_deadline is not None and deadline < nearest_deadline:
+ nearest_processes.clear()
+ nearest_processes.add(process)
+ nearest_deadline = deadline
+
+ if not nearest_processes:
+ return False
+
+ for process in nearest_processes:
+ process.runnable = True
+ del self.deadlines[process]
+ self.now = nearest_deadline
+
+ return True
+
+
class _SignalState:
__slots__ = ("signal", "curr", "next", "waiters", "pending")
class _SimulatorState:
def __init__(self):
- self.signals = SignalDict()
- self.slots = []
- self.pending = set()
-
- self.timestamp = 0.0
- self.deadlines = dict()
+ self.timeline = _Timeline()
+ self.signals = SignalDict()
+ self.slots = []
+ self.pending = set()
def reset(self):
for signal, index in self.signals.items():
self.slots[index].curr = self.slots[index].next = signal.reset
self.pending.clear()
- self.timestamp = 0.0
- self.deadlines.clear()
-
def get_signal(self, signal):
try:
return self.signals[signal]
self.pending.clear()
return converged
- def advance(self):
- nearest_processes = set()
- nearest_deadline = None
- for process, deadline in self.deadlines.items():
- if deadline is None:
- if nearest_deadline is not None:
- nearest_processes.clear()
- nearest_processes.add(process)
- nearest_deadline = self.timestamp
- break
- elif nearest_deadline is None or deadline <= nearest_deadline:
- assert deadline >= self.timestamp
- if nearest_deadline is not None and deadline < nearest_deadline:
- nearest_processes.clear()
- nearest_processes.add(process)
- nearest_deadline = deadline
-
- if not nearest_processes:
- return False
-
- for process in nearest_processes:
- process.runnable = True
- del self.deadlines[process]
- self.timestamp = nearest_deadline
-
- return True
-
class _Emitter:
def __init__(self):
return
elif type(command) is Settle:
- self.state.deadlines[self] = None
+ self.state.timeline.delay(None, self)
return
elif type(command) is Delay:
- if command.interval is None:
- self.state.deadlines[self] = None
- else:
- self.state.deadlines[self] = self.state.timestamp + command.interval
+ self.state.timeline.delay(command.interval, self)
return
elif type(command) is Passive:
for waveform_writer in self._waveform_writers:
for signal_state in self._state.pending:
- waveform_writer.update(self._state.timestamp,
+ waveform_writer.update(self._state.timeline.now,
signal_state.signal, signal_state.curr)
# 2. commit: apply every queued signal change, waking up any waiting processes
Returns ``True`` if there are any active processes, ``False`` otherwise.
"""
self._real_step()
- self._state.advance()
+ self._state.timeline.advance()
return any(not process.passive for process in self._processes)
def run(self):
If the simulation stops advancing, this function will never return.
"""
- assert self._state.timestamp <= deadline
- while (self.advance() or run_passive) and self._state.timestamp < deadline:
+ assert self._state.timeline.now <= deadline
+ while (self.advance() or run_passive) and self._state.timeline.now < deadline:
pass
@contextmanager
traces : iterable of Signal
Signals to display traces for.
"""
- if self._state.timestamp != 0.0:
+ if self._state.timeline.now != 0.0:
raise ValueError("Cannot start writing waveforms after advancing simulation time")
waveform_writer = _VCDWaveformWriter(self._signal_names,
vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
self._waveform_writers.append(waveform_writer)
yield
- waveform_writer.close(self._state.timestamp)
+ waveform_writer.close(self._state.timeline.now)
self._waveform_writers.remove(waveform_writer)