--- /dev/null
+import os
+import tempfile
+from contextlib import contextmanager
+
+from nmigen.hdl import *
+from nmigen.hdl.ast import SignalSet
+from nmigen.hdl.xfrm import ValueVisitor, StatementVisitor, LHSGroupFilter
+from nmigen.sim._base import BaseProcess
+
+
+__all__ = ["PyRTLProcess"]
+
+
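+# A simulation process whose behaviour is a compiled Python function (see _FragmentCompiler
+# below, which generates and installs `run`). Combinatorial processes start out runnable so
+# that their outputs are evaluated on the very first delta cycle.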
+class PyRTLProcess(BaseProcess):
+ __slots__ = ("is_comb", "runnable", "passive", "run")
+
+ def __init__(self, *, is_comb):
+ self.is_comb = is_comb
+
+ self.reset()
+
+ def reset(self):
+ self.runnable = self.is_comb
+ self.passive = True
+
+
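+# Accumulates generated Python source line by line, tracking the current indentation level
+# and handing out unique names for generated temporaries.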
+class _PythonEmitter:
+ def __init__(self):
+ self._buffer = []
+ self._suffix = 0
+ self._level = 0
+
+ def append(self, code):
+ self._buffer.append(" " * self._level)
+ self._buffer.append(code)
+ self._buffer.append("\n")
+
+ @contextmanager
+ def indent(self):
+ self._level += 1
+ yield
+ self._level -= 1
+
+ def flush(self, indent=""):
+ code = "".join(self._buffer)
+ self._buffer.clear()
+ return code
+
+ def gen_var(self, prefix):
+ name = f"{prefix}_{self._suffix}"
+ self._suffix += 1
+ return name
+
+ def def_var(self, prefix, value):
+ name = self.gen_var(prefix)
+ self.append(f"{name} = {value}")
+ return name
+
+
+class _Compiler:
+ def __init__(self, state, emitter):
+ self.state = state
+ self.emitter = emitter
+
+
+class _ValueCompiler(ValueVisitor, _Compiler):
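+    # Helper functions exposed to the generated code: `sign` sign-extends a value using the
+    # (negative) mask of its sign bit; `zdiv`/`zmod` are division and modulo that return 0
+    # instead of raising when the divisor is zero.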
+ helpers = {
+ "sign": lambda value, sign: value | sign if value & sign else value,
+ "zdiv": lambda lhs, rhs: 0 if rhs == 0 else lhs // rhs,
+ "zmod": lambda lhs, rhs: 0 if rhs == 0 else lhs % rhs,
+ }
+
+ def on_ClockSignal(self, value):
+ raise NotImplementedError # :nocov:
+
+ def on_ResetSignal(self, value):
+ raise NotImplementedError # :nocov:
+
+ def on_AnyConst(self, value):
+ raise NotImplementedError # :nocov:
+
+ def on_AnySeq(self, value):
+ raise NotImplementedError # :nocov:
+
+ def on_Sample(self, value):
+ raise NotImplementedError # :nocov:
+
+ def on_Initial(self, value):
+ raise NotImplementedError # :nocov:
+
+
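+# Translates an nMigen value used as an rvalue into a Python expression string. In "curr"
+# mode, signals read the committed value from `slots`; in "next" mode, they read the staged
+# `next_<index>` local, which is what read-modify-write updates of lvalues operate on.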
+class _RHSValueCompiler(_ValueCompiler):
+ def __init__(self, state, emitter, *, mode, inputs=None):
+ super().__init__(state, emitter)
+ assert mode in ("curr", "next")
+ self.mode = mode
+ # If not None, `inputs` gets populated with RHS signals.
+ self.inputs = inputs
+
+ def on_Const(self, value):
+ return f"{value.value}"
+
+ def on_Signal(self, value):
+ if self.inputs is not None:
+ self.inputs.add(value)
+
+ if self.mode == "curr":
+ return f"slots[{self.state.get_signal(value)}].{self.mode}"
+ else:
+ return f"next_{self.state.get_signal(value)}"
+
+ def on_Operator(self, value):
+ def mask(value):
+ value_mask = (1 << len(value)) - 1
+ return f"({value_mask} & {self(value)})"
+
+ def sign(value):
+ if value.shape().signed:
+ return f"sign({mask(value)}, {-1 << (len(value) - 1)})"
+ else: # unsigned
+ return mask(value)
+
+ if len(value.operands) == 1:
+ arg, = value.operands
+ if value.operator == "~":
+ return f"(~{self(arg)})"
+ if value.operator == "-":
+ return f"(-{sign(arg)})"
+ if value.operator == "b":
+ return f"bool({mask(arg)})"
+ if value.operator == "r|":
+ return f"(0 != {mask(arg)})"
+ if value.operator == "r&":
+ return f"({(1 << len(arg)) - 1} == {mask(arg)})"
+ if value.operator == "r^":
+ # Believe it or not, this is the fastest way to compute a sideways XOR in Python.
+ return f"(format({mask(arg)}, 'b').count('1') % 2)"
+ if value.operator in ("u", "s"):
+ # These operators don't change the bit pattern, only its interpretation.
+ return self(arg)
+ elif len(value.operands) == 2:
+ lhs, rhs = value.operands
+ if value.operator == "+":
+ return f"({sign(lhs)} + {sign(rhs)})"
+ if value.operator == "-":
+ return f"({sign(lhs)} - {sign(rhs)})"
+ if value.operator == "*":
+ return f"({sign(lhs)} * {sign(rhs)})"
+ if value.operator == "//":
+ return f"zdiv({sign(lhs)}, {sign(rhs)})"
+ if value.operator == "%":
+ return f"zmod({sign(lhs)}, {sign(rhs)})"
+ if value.operator == "&":
+ return f"({self(lhs)} & {self(rhs)})"
+ if value.operator == "|":
+ return f"({self(lhs)} | {self(rhs)})"
+ if value.operator == "^":
+ return f"({self(lhs)} ^ {self(rhs)})"
+ if value.operator == "<<":
+ return f"({sign(lhs)} << {sign(rhs)})"
+ if value.operator == ">>":
+ return f"({sign(lhs)} >> {sign(rhs)})"
+ if value.operator == "==":
+ return f"({sign(lhs)} == {sign(rhs)})"
+ if value.operator == "!=":
+ return f"({sign(lhs)} != {sign(rhs)})"
+ if value.operator == "<":
+ return f"({sign(lhs)} < {sign(rhs)})"
+ if value.operator == "<=":
+ return f"({sign(lhs)} <= {sign(rhs)})"
+ if value.operator == ">":
+ return f"({sign(lhs)} > {sign(rhs)})"
+ if value.operator == ">=":
+ return f"({sign(lhs)} >= {sign(rhs)})"
+ elif len(value.operands) == 3:
+ if value.operator == "m":
+ sel, val1, val0 = value.operands
+ return f"({self(val1)} if {mask(sel)} else {self(val0)})"
+ raise NotImplementedError("Operator '{}' not implemented".format(value.operator)) # :nocov:
+
+ def on_Slice(self, value):
+ return f"({(1 << len(value)) - 1} & ({self(value.value)} >> {value.start}))"
+
+ def on_Part(self, value):
+ offset_mask = (1 << len(value.offset)) - 1
+ offset = f"({value.stride} * ({offset_mask} & {self(value.offset)}))"
+ return f"({(1 << value.width) - 1} & " \
+ f"{self(value.value)} >> {offset})"
+
+ def on_Cat(self, value):
+ gen_parts = []
+ offset = 0
+ for part in value.parts:
+ part_mask = (1 << len(part)) - 1
+ gen_parts.append(f"(({part_mask} & {self(part)}) << {offset})")
+ offset += len(part)
+ if gen_parts:
+ return f"({' | '.join(gen_parts)})"
+ return f"0"
+
+ def on_Repl(self, value):
+ part_mask = (1 << len(value.value)) - 1
+ gen_part = self.emitter.def_var("repl", f"{part_mask} & {self(value.value)}")
+ gen_parts = []
+ offset = 0
+ for _ in range(value.count):
+ gen_parts.append(f"({gen_part} << {offset})")
+ offset += len(value.value)
+ if gen_parts:
+ return f"({' | '.join(gen_parts)})"
+ return f"0"
+
+ def on_ArrayProxy(self, value):
+ index_mask = (1 << len(value.index)) - 1
+ gen_index = self.emitter.def_var("rhs_index", f"{index_mask} & {self(value.index)}")
+ gen_value = self.emitter.gen_var("rhs_proxy")
+ if value.elems:
+ for index, elem in enumerate(value.elems):
+ if index == 0:
+ self.emitter.append(f"if {index} == {gen_index}:")
+ else:
+ self.emitter.append(f"elif {index} == {gen_index}:")
+ with self.emitter.indent():
+ self.emitter.append(f"{gen_value} = {self(elem)}")
+ self.emitter.append(f"else:")
+ with self.emitter.indent():
+ self.emitter.append(f"{gen_value} = {self(value.elems[-1])}")
+ return gen_value
+ else:
+ return f"0"
+
+ @classmethod
+ def compile(cls, state, value, *, mode):
+ emitter = _PythonEmitter()
+ compiler = cls(state, emitter, mode=mode)
+ emitter.append(f"result = {compiler(value)}")
+ return emitter.flush()
+
+
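+# Translates an nMigen value used as an lvalue. Visiting a value returns a closure that takes
+# a Python expression string for the assigned value and emits the statements performing the
+# assignment; partial updates (slices, parts, array elements) are emitted as read-modify-write
+# of the staged `next_<index>` value.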
+class _LHSValueCompiler(_ValueCompiler):
+ def __init__(self, state, emitter, *, rhs, outputs=None):
+ super().__init__(state, emitter)
+ # `rrhs` is used to translate rvalues that are syntactically a part of an lvalue, e.g.
+ # the offset of a Part.
+ self.rrhs = rhs
+ # `lrhs` is used to translate the read part of a read-modify-write cycle during partial
+ # update of an lvalue.
+ self.lrhs = _RHSValueCompiler(state, emitter, mode="next", inputs=None)
+ # If not None, `outputs` gets populated with signals on LHS.
+ self.outputs = outputs
+
+ def on_Const(self, value):
+ raise TypeError # :nocov:
+
+ def on_Signal(self, value):
+ if self.outputs is not None:
+ self.outputs.add(value)
+
+ def gen(arg):
+ value_mask = (1 << len(value)) - 1
+ if value.shape().signed:
+ value_sign = f"sign({value_mask} & {arg}, {-1 << (len(value) - 1)})"
+ else: # unsigned
+ value_sign = f"{value_mask} & {arg}"
+ self.emitter.append(f"next_{self.state.get_signal(value)} = {value_sign}")
+ return gen
+
+ def on_Operator(self, value):
+ raise TypeError # :nocov:
+
+ def on_Slice(self, value):
+ def gen(arg):
+ width_mask = (1 << (value.stop - value.start)) - 1
+ self(value.value)(f"({self.lrhs(value.value)} & " \
+ f"{~(width_mask << value.start)} | " \
+ f"(({width_mask} & {arg}) << {value.start}))")
+ return gen
+
+ def on_Part(self, value):
+ def gen(arg):
+ width_mask = (1 << value.width) - 1
+ offset_mask = (1 << len(value.offset)) - 1
+ offset = f"({value.stride} * ({offset_mask} & {self.rrhs(value.offset)}))"
+ self(value.value)(f"({self.lrhs(value.value)} & " \
+ f"~({width_mask} << {offset}) | " \
+ f"(({width_mask} & {arg}) << {offset}))")
+ return gen
+
+ def on_Cat(self, value):
+ def gen(arg):
+ gen_arg = self.emitter.def_var("cat", arg)
+ offset = 0
+ for part in value.parts:
+ part_mask = (1 << len(part)) - 1
+ self(part)(f"({part_mask} & ({gen_arg} >> {offset}))")
+ offset += len(part)
+ return gen
+
+ def on_Repl(self, value):
+ raise TypeError # :nocov:
+
+ def on_ArrayProxy(self, value):
+ def gen(arg):
+ index_mask = (1 << len(value.index)) - 1
+ gen_index = self.emitter.def_var("index", f"{self.rrhs(value.index)} & {index_mask}")
+ if value.elems:
+ for index, elem in enumerate(value.elems):
+ if index == 0:
+ self.emitter.append(f"if {index} == {gen_index}:")
+ else:
+ self.emitter.append(f"elif {index} == {gen_index}:")
+ with self.emitter.indent():
+ self(elem)(arg)
+ self.emitter.append(f"else:")
+ with self.emitter.indent():
+ self(value.elems[-1])(arg)
+ else:
+ self.emitter.append(f"pass")
+ return gen
+
+
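+# Emits Python statements implementing an nMigen statement tree, using the RHS compiler for
+# tests and right-hand sides and the LHS compiler for assignment targets.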
+class _StatementCompiler(StatementVisitor, _Compiler):
+ def __init__(self, state, emitter, *, inputs=None, outputs=None):
+ super().__init__(state, emitter)
+ self.rhs = _RHSValueCompiler(state, emitter, mode="curr", inputs=inputs)
+ self.lhs = _LHSValueCompiler(state, emitter, rhs=self.rhs, outputs=outputs)
+
+ def on_statements(self, stmts):
+ for stmt in stmts:
+ self(stmt)
+ if not stmts:
+ self.emitter.append("pass")
+
+ def on_Assign(self, stmt):
+ gen_rhs = f"({(1 << len(stmt.rhs)) - 1} & {self.rhs(stmt.rhs)})"
+ if stmt.rhs.shape().signed:
+ gen_rhs = f"sign({gen_rhs}, {-1 << (len(stmt.rhs) - 1)})"
+ return self.lhs(stmt.lhs)(gen_rhs)
+
+ def on_Switch(self, stmt):
+ gen_test = self.emitter.def_var("test",
+ f"{(1 << len(stmt.test)) - 1} & {self.rhs(stmt.test)}")
+ for index, (patterns, stmts) in enumerate(stmt.cases.items()):
+ gen_checks = []
+ if not patterns:
+ gen_checks.append(f"True")
+ else:
+ for pattern in patterns:
+ if "-" in pattern:
+ mask = int("".join("0" if b == "-" else "1" for b in pattern), 2)
+ value = int("".join("0" if b == "-" else b for b in pattern), 2)
+ gen_checks.append(f"{value} == ({mask} & {gen_test})")
+ else:
+ value = int(pattern, 2)
+ gen_checks.append(f"{value} == {gen_test}")
+ if index == 0:
+ self.emitter.append(f"if {' or '.join(gen_checks)}:")
+ else:
+ self.emitter.append(f"elif {' or '.join(gen_checks)}:")
+ with self.emitter.indent():
+ self(stmts)
+
+ def on_Assert(self, stmt):
+ raise NotImplementedError # :nocov:
+
+ def on_Assume(self, stmt):
+ raise NotImplementedError # :nocov:
+
+ def on_Cover(self, stmt):
+ raise NotImplementedError # :nocov:
+
+ @classmethod
+ def compile(cls, state, stmt):
+ output_indexes = [state.get_signal(signal) for signal in stmt._lhs_signals()]
+ emitter = _PythonEmitter()
+ for signal_index in output_indexes:
+ emitter.append(f"next_{signal_index} = slots[{signal_index}].next")
+ compiler = cls(state, emitter)
+ compiler(stmt)
+ for signal_index in output_indexes:
+ emitter.append(f"slots[{signal_index}].set(next_{signal_index})")
+ return emitter.flush()
+
+
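+# Compiles each driver domain of a fragment (and, recursively, of its subfragments) into a
+# generated `run()` function wrapped in a PyRTLProcess.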
+class _FragmentCompiler:
+ def __init__(self, state):
+ self.state = state
+
+ def __call__(self, fragment):
+ processes = set()
+
+ for domain_name, domain_signals in fragment.drivers.items():
+ domain_stmts = LHSGroupFilter(domain_signals)(fragment.statements)
+ domain_process = PyRTLProcess(is_comb=domain_name is None)
+
+ emitter = _PythonEmitter()
+ emitter.append(f"def run():")
+ emitter._level += 1
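+            # Everything emitted from here on becomes the body of the generated run().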
+
+ if domain_name is None:
+ for signal in domain_signals:
+ signal_index = self.state.get_signal(signal)
+ emitter.append(f"next_{signal_index} = {signal.reset}")
+
+ inputs = SignalSet()
+ _StatementCompiler(self.state, emitter, inputs=inputs)(domain_stmts)
+
+ for input in inputs:
+ self.state.add_trigger(domain_process, input)
+
+ else:
+ domain = fragment.domains[domain_name]
+ clk_trigger = 1 if domain.clk_edge == "pos" else 0
+ self.state.add_trigger(domain_process, domain.clk, trigger=clk_trigger)
+ if domain.rst is not None and domain.async_reset:
+ rst_trigger = 1
+ self.state.add_trigger(domain_process, domain.rst, trigger=rst_trigger)
+
+ for signal in domain_signals:
+ signal_index = self.state.get_signal(signal)
+ emitter.append(f"next_{signal_index} = slots[{signal_index}].next")
+
+ _StatementCompiler(self.state, emitter)(domain_stmts)
+
+ for signal in domain_signals:
+ signal_index = self.state.get_signal(signal)
+ emitter.append(f"slots[{signal_index}].set(next_{signal_index})")
+
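+            # The generated code has roughly the following shape (signal index 0 is
+            # illustrative only):
+            #     def run():
+            #         next_0 = slots[0].next
+            #         ... compiled statements updating next_0 ...
+            #         slots[0].set(next_0)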
+ # There shouldn't be any exceptions raised by the generated code, but if there are
+ # (almost certainly due to a bug in the code generator), use this environment variable
+ # to make backtraces useful.
+ code = emitter.flush()
+ if os.getenv("NMIGEN_pysim_dump"):
+ file = tempfile.NamedTemporaryFile("w", prefix="nmigen_pysim_", delete=False)
+ file.write(code)
+ filename = file.name
+ else:
+ filename = "<string>"
+
+ exec_locals = {"slots": self.state.slots, **_ValueCompiler.helpers}
+ exec(compile(code, filename, "exec"), exec_locals)
+ domain_process.run = exec_locals["run"]
+
+ processes.add(domain_process)
+
+ for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
+ if subfragment_name is None:
+ subfragment_name = "U${}".format(subfragment_index)
+ processes.update(self(subfragment))
+
+ return processes
--- /dev/null
+from contextlib import contextmanager
+import itertools
+from vcd import VCDWriter
+from vcd.gtkw import GTKWSave
+
+from nmigen.hdl import *
+from nmigen.hdl.ast import SignalDict
+from nmigen.sim._base import *
+from nmigen.sim._pyrtl import _FragmentCompiler
+from nmigen.sim._pycoro import PyCoroProcess
+from nmigen.sim._pyclock import PyClockProcess
+
+
+__all__ = ["PySimEngine"]
+
+
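+# Walks a fragment hierarchy and collects, for every signal, the set of hierarchical names
+# under which it is visible (the same signal may be used in several places).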
+class _NameExtractor:
+ def __init__(self):
+ self.names = SignalDict()
+
+ def __call__(self, fragment, *, hierarchy=("top",)):
+ def add_signal_name(signal):
+ hierarchical_signal_name = (*hierarchy, signal.name)
+ if signal not in self.names:
+ self.names[signal] = {hierarchical_signal_name}
+ else:
+ self.names[signal].add(hierarchical_signal_name)
+
+ for domain_name, domain_signals in fragment.drivers.items():
+ if domain_name is not None:
+ domain = fragment.domains[domain_name]
+ add_signal_name(domain.clk)
+ if domain.rst is not None:
+ add_signal_name(domain.rst)
+
+ for statement in fragment.statements:
+ for signal in statement._lhs_signals() | statement._rhs_signals():
+ if not isinstance(signal, (ClockSignal, ResetSignal)):
+ add_signal_name(signal)
+
+ for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
+ if subfragment_name is None:
+ subfragment_name = "U${}".format(subfragment_index)
+ self(subfragment, hierarchy=(*hierarchy, subfragment_name))
+
+ return self.names
+
+
+class _VCDWriter:
+ @staticmethod
+ def timestamp_to_vcd(timestamp):
+        return timestamp * (10 ** 10) # timestamps are in seconds; one VCD tick is 100 ps
+
+ @staticmethod
+ def decode_to_vcd(signal, value):
+ return signal.decoder(value).expandtabs().replace(" ", "_")
+
+ def __init__(self, fragment, *, vcd_file, gtkw_file=None, traces=()):
+ if isinstance(vcd_file, str):
+ vcd_file = open(vcd_file, "wt")
+ if isinstance(gtkw_file, str):
+ gtkw_file = open(gtkw_file, "wt")
+
+ self.vcd_vars = SignalDict()
+ self.vcd_file = vcd_file
+ self.vcd_writer = vcd_file and VCDWriter(self.vcd_file,
+ timescale="100 ps", comment="Generated by nMigen")
+
+ self.gtkw_names = SignalDict()
+ self.gtkw_file = gtkw_file
+ self.gtkw_save = gtkw_file and GTKWSave(self.gtkw_file)
+
+ self.traces = []
+
+ signal_names = _NameExtractor()(fragment)
+
+ trace_names = SignalDict()
+ for trace in traces:
+ if trace not in signal_names:
+ trace_names[trace] = {("top", trace.name)}
+ self.traces.append(trace)
+
+ if self.vcd_writer is None:
+ return
+
+ for signal, names in itertools.chain(signal_names.items(), trace_names.items()):
+ if signal.decoder:
+ var_type = "string"
+ var_size = 1
+ var_init = self.decode_to_vcd(signal, signal.reset)
+ else:
+ var_type = "wire"
+ var_size = signal.width
+ var_init = signal.reset
+
+ for (*var_scope, var_name) in names:
+ suffix = None
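+                # The VCD writer raises KeyError if the name is already registered in this
+                # scope; keep appending a `$n` suffix until registration succeeds.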
+ while True:
+ try:
+ if suffix is None:
+ var_name_suffix = var_name
+ else:
+ var_name_suffix = "{}${}".format(var_name, suffix)
+ if signal not in self.vcd_vars:
+ vcd_var = self.vcd_writer.register_var(
+ scope=var_scope, name=var_name_suffix,
+ var_type=var_type, size=var_size, init=var_init)
+ self.vcd_vars[signal] = vcd_var
+ else:
+ self.vcd_writer.register_alias(
+ scope=var_scope, name=var_name_suffix,
+ var=self.vcd_vars[signal])
+ break
+ except KeyError:
+ suffix = (suffix or 0) + 1
+
+ if signal not in self.gtkw_names:
+ self.gtkw_names[signal] = (*var_scope, var_name_suffix)
+
+ def update(self, timestamp, signal, value):
+ vcd_var = self.vcd_vars.get(signal)
+ if vcd_var is None:
+ return
+
+ vcd_timestamp = self.timestamp_to_vcd(timestamp)
+ if signal.decoder:
+ var_value = self.decode_to_vcd(signal, value)
+ else:
+ var_value = value
+ self.vcd_writer.change(vcd_var, vcd_timestamp, var_value)
+
+ def close(self, timestamp):
+ if self.vcd_writer is not None:
+ self.vcd_writer.close(self.timestamp_to_vcd(timestamp))
+
+ if self.gtkw_save is not None:
+ self.gtkw_save.dumpfile(self.vcd_file.name)
+ self.gtkw_save.dumpfile_size(self.vcd_file.tell())
+
+ self.gtkw_save.treeopen("top")
+ for signal in self.traces:
+ if len(signal) > 1 and not signal.decoder:
+ suffix = "[{}:0]".format(len(signal) - 1)
+ else:
+ suffix = ""
+ self.gtkw_save.trace(".".join(self.gtkw_names[signal]) + suffix)
+
+ if self.vcd_file is not None:
+ self.vcd_file.close()
+ if self.gtkw_file is not None:
+ self.gtkw_file.close()
+
+
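+# Tracks simulation time and the processes waiting on a deadline. advance() picks the nearest
+# deadline (None meaning "as soon as possible"), wakes the processes waiting on it, and moves
+# `now` forward to it.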
+class _Timeline:
+ def __init__(self):
+ self.now = 0.0
+ self.deadlines = dict()
+
+ def reset(self):
+ self.now = 0.0
+ self.deadlines.clear()
+
+ def at(self, run_at, process):
+ assert process not in self.deadlines
+ self.deadlines[process] = run_at
+
+ def delay(self, delay_by, process):
+ if delay_by is None:
+ run_at = self.now
+ else:
+ run_at = self.now + delay_by
+ self.at(run_at, process)
+
+ def advance(self):
+ nearest_processes = set()
+ nearest_deadline = None
+ for process, deadline in self.deadlines.items():
+ if deadline is None:
+ if nearest_deadline is not None:
+ nearest_processes.clear()
+ nearest_processes.add(process)
+ nearest_deadline = self.now
+ break
+ elif nearest_deadline is None or deadline <= nearest_deadline:
+ assert deadline >= self.now
+ if nearest_deadline is not None and deadline < nearest_deadline:
+ nearest_processes.clear()
+ nearest_processes.add(process)
+ nearest_deadline = deadline
+
+ if not nearest_processes:
+ return False
+
+ for process in nearest_processes:
+ process.runnable = True
+ del self.deadlines[process]
+ self.now = nearest_deadline
+
+ return True
+
+
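+# The two-phase (curr/next) value of a single signal. set() stages a new value and marks the
+# state as pending; commit() makes the staged value visible and wakes any process waiting on
+# a matching edge or on any change.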
+class _PySignalState(BaseSignalState):
+ __slots__ = ("signal", "curr", "next", "waiters", "pending")
+
+ def __init__(self, signal, pending):
+ self.signal = signal
+ self.pending = pending
+ self.waiters = dict()
+ self.curr = self.next = signal.reset
+
+ def set(self, value):
+ if self.next == value:
+ return
+ self.next = value
+ self.pending.add(self)
+
+ def commit(self):
+ if self.curr == self.next:
+ return False
+ self.curr = self.next
+
+ awoken_any = False
+ for process, trigger in self.waiters.items():
+ if trigger is None or trigger == self.curr:
+ process.runnable = awoken_any = True
+ return awoken_any
+
+
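+# Mutable state shared by all processes: the timeline, the signal-to-slot index mapping, the
+# per-signal states, and the set of signals with staged but uncommitted changes.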
+class _PySimulation(BaseSimulation):
+ def __init__(self):
+ self.timeline = _Timeline()
+ self.signals = SignalDict()
+ self.slots = []
+ self.pending = set()
+
+ def reset(self):
+ self.timeline.reset()
+ for signal, index in self.signals.items():
+ self.slots[index].curr = self.slots[index].next = signal.reset
+ self.pending.clear()
+
+ def get_signal(self, signal):
+ try:
+ return self.signals[signal]
+ except KeyError:
+ index = len(self.slots)
+ self.slots.append(_PySignalState(signal, self.pending))
+ self.signals[signal] = index
+ return index
+
+ def add_trigger(self, process, signal, *, trigger=None):
+ index = self.get_signal(signal)
+ assert (process not in self.slots[index].waiters or
+ self.slots[index].waiters[process] == trigger)
+ self.slots[index].waiters[process] = trigger
+
+ def remove_trigger(self, process, signal):
+ index = self.get_signal(signal)
+ assert process in self.slots[index].waiters
+ del self.slots[index].waiters[process]
+
+ def wait_interval(self, process, interval):
+ self.timeline.delay(interval, process)
+
+ def commit(self, changed=None):
+ converged = True
+ for signal_state in self.pending:
+ if signal_state.commit():
+ converged = False
+ if changed is not None:
+ changed.update(self.pending)
+ self.pending.clear()
+ return converged
+
+
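+# The pure-Python simulation engine. It owns the compiled RTL processes as well as user
+# coroutine and clock processes, and drives delta cycles (eval/commit) and time advancement.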
+class PySimEngine(BaseEngine):
+ def __init__(self, fragment):
+ self._state = _PySimulation()
+ self._timeline = self._state.timeline
+
+ self._fragment = fragment
+ self._processes = _FragmentCompiler(self._state)(self._fragment)
+ self._vcd_writers = []
+
+ def add_coroutine_process(self, process, *, default_cmd):
+ self._processes.add(PyCoroProcess(self._state, self._fragment.domains, process,
+ default_cmd=default_cmd))
+
+ def add_clock_process(self, clock, *, phase, period):
+ self._processes.add(PyClockProcess(self._state, clock,
+ phase=phase, period=period))
+
+ def reset(self):
+ self._state.reset()
+ for process in self._processes:
+ process.reset()
+
+ def _step(self):
+ changed = set() if self._vcd_writers else None
+
+ # Performs the two phases of a delta cycle in a loop:
+ converged = False
+ while not converged:
+ # 1. eval: run and suspend every non-waiting process once, queueing signal changes
+ for process in self._processes:
+ if process.runnable:
+ process.runnable = False
+ process.run()
+
+ # 2. commit: apply every queued signal change, waking up any waiting processes
+ converged = self._state.commit(changed)
+
+ for vcd_writer in self._vcd_writers:
+ for signal_state in changed:
+ vcd_writer.update(self._timeline.now,
+ signal_state.signal, signal_state.curr)
+
+ def advance(self):
+ self._step()
+ self._timeline.advance()
+ return any(not process.passive for process in self._processes)
+
+ @property
+ def now(self):
+ return self._timeline.now
+
+ @contextmanager
+ def write_vcd(self, *, vcd_file, gtkw_file, traces):
+ vcd_writer = _VCDWriter(self._fragment,
+ vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
+ try:
+ self._vcd_writers.append(vcd_writer)
+ yield
+ finally:
+ vcd_writer.close(self._timeline.now)
+ self._vcd_writers.remove(vcd_writer)