-import math
import inspect
import warnings
from contextlib import contextmanager
-from bitarray import bitarray
from vcd import VCDWriter
from vcd.gtkw import GTKWSave
-from .._utils import flatten
+from .._utils import deprecated
from ..hdl.ast import *
+from ..hdl.cd import *
from ..hdl.ir import *
-from ..hdl.xfrm import ValueVisitor, StatementVisitor
+from ..hdl.xfrm import ValueVisitor, StatementVisitor, LHSGroupFilter
-__all__ = ["Simulator", "Delay", "Tick", "Passive", "DeadlineError"]
+class Command:
+ pass
-class DeadlineError(Exception):
- pass
+class Settle(Command):
+ def __repr__(self):
+ return "(settle)"
+
+
+class Delay(Command):
+ def __init__(self, interval=None):
+ self.interval = None if interval is None else float(interval)
+
+ def __repr__(self):
+ if self.interval is None:
+ return "(delay ε)"
+ else:
+ return "(delay {:.3}us)".format(self.interval * 1e6)
+
+
+class Tick(Command):
+ def __init__(self, domain="sync"):
+ if not isinstance(domain, (str, ClockDomain)):
+ raise TypeError("Domain must be a string or a ClockDomain instance, not {!r}"
+ .format(domain))
+ assert domain != "comb"
+ self.domain = domain
+
+ def __repr__(self):
+ return "(tick {})".format(self.domain)
+
+
+class Passive(Command):
+ def __repr__(self):
+ return "(passive)"
+
+
+class Active(Command):
+ def __repr__(self):
+ return "(active)"
+
+
+class _WaveformWriter:
+ def update(self, timestamp, signal, value):
+ raise NotImplementedError # :nocov:
+
+ def close(self, timestamp):
+ raise NotImplementedError # :nocov:
+
+class _VCDWaveformWriter(_WaveformWriter):
+ @staticmethod
+ def timestamp_to_vcd(timestamp):
+ return timestamp * (10 ** 10) # 1/(100 ps)
+
+ @staticmethod
+ def decode_to_vcd(signal, value):
+ return signal.decoder(value).expandtabs().replace(" ", "_")
+
+ def __init__(self, signal_names, *, vcd_file, gtkw_file=None, traces=()):
+ if isinstance(vcd_file, str):
+ vcd_file = open(vcd_file, "wt")
+ if isinstance(gtkw_file, str):
+ gtkw_file = open(gtkw_file, "wt")
+
+ self.vcd_vars = SignalDict()
+ self.vcd_file = vcd_file
+ self.vcd_writer = vcd_file and VCDWriter(self.vcd_file,
+ timescale="100 ps", comment="Generated by nMigen")
+
+ self.gtkw_names = SignalDict()
+ self.gtkw_file = gtkw_file
+ self.gtkw_save = gtkw_file and GTKWSave(self.gtkw_file)
+
+ for signal, names in signal_names.items():
+ if signal.decoder:
+ var_type = "string"
+ var_size = 1
+ var_init = self.decode_to_vcd(signal, signal.reset)
+ else:
+ var_type = "wire"
+ var_size = signal.width
+ var_init = signal.reset
-class _State:
- __slots__ = ("curr", "curr_dirty", "next", "next_dirty")
+ for (*var_scope, var_name) in names:
+ suffix = None
+ while True:
+ try:
+ if suffix is None:
+ var_name_suffix = var_name
+ else:
+ var_name_suffix = "{}${}".format(var_name, suffix)
+ vcd_var = self.vcd_writer.register_var(
+ scope=var_scope, name=var_name_suffix,
+ var_type=var_type, size=var_size, init=var_init)
+ break
+ except KeyError:
+ suffix = (suffix or 0) + 1
+
+ if signal not in self.vcd_vars:
+ self.vcd_vars[signal] = set()
+ self.vcd_vars[signal].add(vcd_var)
+
+ if signal not in self.gtkw_names:
+ self.gtkw_names[signal] = (*var_scope, var_name_suffix)
+
+ def update(self, timestamp, signal, value):
+ if signal not in self.vcd_vars:
+ return
+
+ vcd_timestamp = self.timestamp_to_vcd(timestamp)
+ if signal.decoder:
+ var_value = self.decode_to_vcd(signal, value)
+ else:
+ var_value = value
+ for vcd_var in self.vcd_vars[signal]:
+ self.vcd_writer.change(vcd_var, vcd_timestamp, var_value)
+
+ def close(self, timestamp):
+ self.vcd_writer.close(self.timestamp_to_vcd(timestamp))
+
+ if self.gtkw_save is not None:
+ self.gtkw_save.dumpfile(self.vcd_file.name)
+ self.gtkw_save.dumpfile_size(self.vcd_file.tell())
+
+ self.gtkw_save.treeopen("top")
+ for signal, hierarchy in self.gtkw_names.items():
+ if len(signal) > 1 and not signal.decoder:
+ suffix = "[{}:0]".format(len(signal) - 1)
+ else:
+ suffix = ""
+ self.gtkw_save.trace(".".join(hierarchy) + suffix)
+
+ self.vcd_file.close()
+ if self.gtkw_file is not None:
+ self.gtkw_file.close()
+
+
+class _Process:
+ __slots__ = ("runnable", "passive")
+
+ def reset(self):
+ raise NotImplementedError # :nocov:
+
+ def run(self):
+ raise NotImplementedError # :nocov:
+
+ @property
+ def name(self):
+ raise NotImplementedError # :nocov:
+
+
+class _SignalState:
+ __slots__ = ("signal", "curr", "next", "waiters", "pending")
+
+ def __init__(self, signal, pending):
+ self.signal = signal
+ self.pending = pending
+ self.waiters = dict()
+ self.reset()
+
+ def reset(self):
+ self.curr = self.next = self.signal.reset
+
+ def set(self, value):
+ if self.next == value:
+ return
+ self.next = value
+ self.pending.add(self)
+ def wait(self, task, *, trigger=None):
+ assert task not in self.waiters
+ self.waiters[task] = trigger
+
+ def commit(self):
+ if self.curr == self.next:
+ return False
+ self.curr = self.next
+ return True
+
+ def wakeup(self):
+ awoken_any = False
+ for process, trigger in self.waiters.items():
+ if trigger is None or trigger == self.curr:
+ process.runnable = awoken_any = True
+ return awoken_any
+
+
+class _SimulatorState:
def __init__(self):
- self.curr = []
- self.next = []
- self.curr_dirty = bitarray()
- self.next_dirty = bitarray()
-
- def add(self, value):
- slot = len(self.curr)
- self.curr.append(value)
- self.next.append(value)
- self.curr_dirty.append(True)
- self.next_dirty.append(False)
- return slot
-
- def set(self, slot, value):
- if self.next[slot] != value:
- self.next_dirty[slot] = True
- self.next[slot] = value
-
- def commit(self, slot):
- old_value = self.curr[slot]
- new_value = self.next[slot]
- if old_value != new_value:
- self.next_dirty[slot] = False
- self.curr_dirty[slot] = True
- self.curr[slot] = new_value
- return old_value, new_value
-
- def flush_curr_dirty(self):
- while True:
- try:
- slot = self.curr_dirty.index(True)
- except ValueError:
- break
- self.curr_dirty[slot] = False
- yield slot
+ self.signals = SignalDict()
+ self.pending = set()
- def iter_next_dirty(self):
- start = 0
- while True:
- try:
- slot = self.next_dirty.index(True, start)
- start = slot + 1
- except ValueError:
+ self.timestamp = 0.0
+ self.deadlines = dict()
+
+ self.waveform_writer = None
+
+ def reset(self):
+ for signal_state in self.signals.values():
+ signal_state.reset()
+ self.pending.clear()
+
+ self.timestamp = 0.0
+ self.deadlines.clear()
+
+ def for_signal(self, signal):
+ try:
+ return self.signals[signal]
+ except KeyError:
+ signal_state = _SignalState(signal, self.pending)
+ self.signals[signal] = signal_state
+ return signal_state
+
+ def commit(self):
+ awoken_any = False
+ for signal_state in self.pending:
+ if signal_state.commit():
+ if signal_state.wakeup():
+ awoken_any = True
+ if self.waveform_writer is not None:
+ self.waveform_writer.update(self.timestamp,
+ signal_state.signal, signal_state.curr)
+ return awoken_any
+
+ def advance(self):
+ nearest_processes = set()
+ nearest_deadline = None
+ for process, deadline in self.deadlines.items():
+ if deadline is None:
+ if nearest_deadline is not None:
+ nearest_processes.clear()
+ nearest_processes.add(process)
+ nearest_deadline = self.timestamp
break
- yield slot
+ elif nearest_deadline is None or deadline <= nearest_deadline:
+ assert deadline >= self.timestamp
+ if nearest_deadline is not None and deadline < nearest_deadline:
+ nearest_processes.clear()
+ nearest_processes.add(process)
+ nearest_deadline = deadline
+
+ if not nearest_processes:
+ return False
+
+ for process in nearest_processes:
+ process.runnable = True
+ del self.deadlines[process]
+ self.timestamp = nearest_deadline
+
+ return True
+
+ def start_waveform(self, waveform_writer):
+ if self.timestamp != 0.0:
+ raise ValueError("Cannot start writing waveforms after advancing simulation time")
+ if self.waveform_writer is not None:
+ raise ValueError("Already writing waveforms to {!r}"
+ .format(self.waveform_writer))
+ self.waveform_writer = waveform_writer
+
+ def finish_waveform(self):
+ if self.waveform_writer is None:
+ return
+ self.waveform_writer.close(self.timestamp)
+ self.waveform_writer = None
+
+
+class _EvalContext:
+ __slots__ = ("state", "indexes", "slots")
+
+ def __init__(self, state):
+ self.state = state
+ self.indexes = SignalDict()
+ self.slots = []
+
+ def get_signal(self, signal):
+ try:
+ return self.indexes[signal]
+ except KeyError:
+ index = len(self.slots)
+ self.slots.append(self.state.for_signal(signal))
+ self.indexes[signal] = index
+ return index
+
+ def get_in_signal(self, signal, *, trigger=None):
+ index = self.get_signal(signal)
+ self.slots[index].waiters[self] = trigger
+ return index
+
+ def get_out_signal(self, signal):
+ return self.get_signal(signal)
-normalize = Const.normalize
+class _Emitter:
+ def __init__(self):
+ self._buffer = []
+ self._suffix = 0
+ self._level = 0
+
+ def append(self, code):
+ self._buffer.append(" " * self._level)
+ self._buffer.append(code)
+ self._buffer.append("\n")
+
+ @contextmanager
+ def indent(self):
+ self._level += 1
+ yield
+ self._level -= 1
+
+ def flush(self, indent=""):
+ code = "".join(self._buffer)
+ self._buffer.clear()
+ return code
+
+ def gen_var(self, prefix):
+ name = f"{prefix}_{self._suffix}"
+ self._suffix += 1
+ return name
+
+ def def_var(self, prefix, value):
+ name = self.gen_var(prefix)
+ self.append(f"{name} = {value}")
+ return name
+
+
+class _Compiler:
+ def __init__(self, context, emitter):
+ self.context = context
+ self.emitter = emitter
+
+
+class _ValueCompiler(ValueVisitor, _Compiler):
+ helpers = {
+ "sign": lambda value, sign: value | sign if value & sign else value,
+ "zdiv": lambda lhs, rhs: 0 if rhs == 0 else lhs // rhs,
+ "sshl": lambda lhs, rhs: lhs << rhs if rhs >= 0 else lhs >> -rhs,
+ "sshr": lambda lhs, rhs: lhs >> rhs if rhs >= 0 else lhs << -rhs,
+ }
+
+ def on_ClockSignal(self, value):
+ raise NotImplementedError # :nocov:
+ def on_ResetSignal(self, value):
+ raise NotImplementedError # :nocov:
+
+ def on_Record(self, value):
+ return self(Cat(value.fields.values()))
-class _ValueCompiler(ValueVisitor):
def on_AnyConst(self, value):
raise NotImplementedError # :nocov:
def on_Initial(self, value):
raise NotImplementedError # :nocov:
- def on_Record(self, value):
- return self(Cat(value.fields.values()))
-
class _RHSValueCompiler(_ValueCompiler):
- def __init__(self, signal_slots, sensitivity=None, mode="rhs"):
- self.signal_slots = signal_slots
- self.sensitivity = sensitivity
- self.signal_mode = mode
+ def __init__(self, context, emitter, *, mode, inputs=None):
+ super().__init__(context, emitter)
+ assert mode in ("curr", "next")
+ self.mode = mode
+ # If not None, `inputs` gets populated with RHS signals.
+ self.inputs = inputs
def on_Const(self, value):
- return lambda state: value.value
+ return f"{value.value}"
def on_Signal(self, value):
- if self.sensitivity is not None:
- self.sensitivity.add(value)
- if value not in self.signal_slots:
- # A signal that is neither driven nor a port always remains at its reset state.
- return lambda state: value.reset
- value_slot = self.signal_slots[value]
- if self.signal_mode == "rhs":
- return lambda state: state.curr[value_slot]
- elif self.signal_mode == "lhs":
- return lambda state: state.next[value_slot]
+ if self.inputs is not None:
+ self.inputs.add(value)
+
+ if self.mode == "curr":
+ return f"slots[{self.context.get_signal(value)}].{self.mode}"
else:
- raise ValueError # :nocov:
+ return f"next_{self.context.get_signal(value)}"
- def on_ClockSignal(self, value):
- raise NotImplementedError # :nocov:
+ def on_Operator(self, value):
+ def mask(value):
+ value_mask = (1 << len(value)) - 1
+ return f"({self(value)} & {value_mask})"
- def on_ResetSignal(self, value):
- raise NotImplementedError # :nocov:
+ def sign(value):
+ if value.shape().signed:
+ return f"sign({mask(value)}, {-1 << (len(value) - 1)})"
+ else: # unsigned
+ return mask(value)
- def on_Operator(self, value):
- shape = value.shape()
if len(value.operands) == 1:
- arg, = map(self, value.operands)
+ arg, = value.operands
if value.operator == "~":
- return lambda state: normalize(~arg(state), shape)
+ return f"(~{self(arg)})"
if value.operator == "-":
- return lambda state: normalize(-arg(state), shape)
+ return f"(-{self(arg)})"
if value.operator == "b":
- return lambda state: normalize(bool(arg(state)), shape)
+ return f"bool({mask(arg)})"
if value.operator == "r|":
- return lambda state: normalize(arg(state) != 0, shape)
+ return f"({mask(arg)} != 0)"
if value.operator == "r&":
- val, = value.operands
- mask = (1 << len(val)) - 1
- return lambda state: normalize(arg(state) == mask, shape)
+ return f"({mask(arg)} == {(1 << len(arg)) - 1})"
if value.operator == "r^":
# Believe it or not, this is the fastest way to compute a sideways XOR in Python.
- return lambda state: normalize(format(arg(state), "b").count("1") % 2, shape)
+ return f"(format({mask(arg)}, 'b').count('1') % 2)"
elif len(value.operands) == 2:
- lhs, rhs = map(self, value.operands)
+ lhs, rhs = value.operands
+ lhs_mask = (1 << len(lhs)) - 1
+ rhs_mask = (1 << len(rhs)) - 1
if value.operator == "+":
- return lambda state: normalize(lhs(state) + rhs(state), shape)
+ return f"({mask(lhs)} + {mask(rhs)})"
if value.operator == "-":
- return lambda state: normalize(lhs(state) - rhs(state), shape)
+ return f"({mask(lhs)} - {mask(rhs)})"
if value.operator == "*":
- return lambda state: normalize(lhs(state) * rhs(state), shape)
+ return f"({sign(lhs)} * {sign(rhs)})"
if value.operator == "//":
- def floordiv(lhs, rhs):
- return 0 if rhs == 0 else lhs // rhs
- return lambda state: normalize(floordiv(lhs(state), rhs(state)), shape)
+ return f"zdiv({sign(lhs)}, {sign(rhs)})"
if value.operator == "&":
- return lambda state: normalize(lhs(state) & rhs(state), shape)
+ return f"({self(lhs)} & {self(rhs)})"
if value.operator == "|":
- return lambda state: normalize(lhs(state) | rhs(state), shape)
+ return f"({self(lhs)} | {self(rhs)})"
if value.operator == "^":
- return lambda state: normalize(lhs(state) ^ rhs(state), shape)
+ return f"({self(lhs)} ^ {self(rhs)})"
if value.operator == "<<":
- def sshl(lhs, rhs):
- return lhs << rhs if rhs >= 0 else lhs >> -rhs
- return lambda state: normalize(sshl(lhs(state), rhs(state)), shape)
+ return f"sshl({sign(lhs)}, {sign(rhs)})"
if value.operator == ">>":
- def sshr(lhs, rhs):
- return lhs >> rhs if rhs >= 0 else lhs << -rhs
- return lambda state: normalize(sshr(lhs(state), rhs(state)), shape)
+ return f"sshr({sign(lhs)}, {sign(rhs)})"
if value.operator == "==":
- return lambda state: normalize(lhs(state) == rhs(state), shape)
+ return f"({sign(lhs)} == {sign(rhs)})"
if value.operator == "!=":
- return lambda state: normalize(lhs(state) != rhs(state), shape)
+ return f"({sign(lhs)} != {sign(rhs)})"
if value.operator == "<":
- return lambda state: normalize(lhs(state) < rhs(state), shape)
+ return f"({sign(lhs)} < {sign(rhs)})"
if value.operator == "<=":
- return lambda state: normalize(lhs(state) <= rhs(state), shape)
+ return f"({sign(lhs)} <= {sign(rhs)})"
if value.operator == ">":
- return lambda state: normalize(lhs(state) > rhs(state), shape)
+ return f"({sign(lhs)} > {sign(rhs)})"
if value.operator == ">=":
- return lambda state: normalize(lhs(state) >= rhs(state), shape)
+ return f"({sign(lhs)} >= {sign(rhs)})"
elif len(value.operands) == 3:
if value.operator == "m":
- sel, val1, val0 = map(self, value.operands)
- return lambda state: val1(state) if sel(state) else val0(state)
+ sel, val1, val0 = value.operands
+ return f"({self(val1)} if {self(sel)} else {self(val0)})"
raise NotImplementedError("Operator '{}' not implemented".format(value.operator)) # :nocov:
def on_Slice(self, value):
- shape = value.shape()
- arg = self(value.value)
- shift = value.start
- mask = (1 << (value.stop - value.start)) - 1
- return lambda state: normalize((arg(state) >> shift) & mask, shape)
+ return f"(({self(value.value)} >> {value.start}) & {(1 << len(value)) - 1})"
def on_Part(self, value):
- shape = value.shape()
- arg = self(value.value)
- shift = self(value.offset)
- mask = (1 << value.width) - 1
- stride = value.stride
- return lambda state: normalize((arg(state) >> shift(state) * stride) & mask, shape)
+ offset_mask = (1 << len(value.offset)) - 1
+ offset = f"(({self(value.offset)} & {offset_mask}) * {value.stride})"
+ return f"({self(value.value)} >> {offset} & " \
+ f"{(1 << value.width) - 1})"
def on_Cat(self, value):
- shape = value.shape()
- parts = []
+ gen_parts = []
offset = 0
- for opnd in value.parts:
- parts.append((offset, (1 << len(opnd)) - 1, self(opnd)))
- offset += len(opnd)
- def eval(state):
- result = 0
- for offset, mask, opnd in parts:
- result |= (opnd(state) & mask) << offset
- return normalize(result, shape)
- return eval
+ for part in value.parts:
+ part_mask = (1 << len(part)) - 1
+ gen_parts.append(f"(({self(part)} & {part_mask}) << {offset})")
+ offset += len(part)
+ return f"({' | '.join(gen_parts)})"
def on_Repl(self, value):
- shape = value.shape()
- offset = len(value.value)
- mask = (1 << len(value.value)) - 1
- count = value.count
- opnd = self(value.value)
- def eval(state):
- result = 0
- for _ in range(count):
- result <<= offset
- result |= opnd(state)
- return normalize(result, shape)
- return eval
+ part_mask = (1 << len(value.value)) - 1
+ gen_part = self.emitter.def_var("repl", f"{self(value.value)} & {part_mask}")
+ gen_parts = []
+ offset = 0
+ for _ in range(value.count):
+ gen_parts.append(f"({gen_part} << {offset})")
+ offset += len(value.value)
+ return f"({' | '.join(gen_parts)})"
def on_ArrayProxy(self, value):
- shape = value.shape()
- elems = list(map(self, value.elems))
- index = self(value.index)
- def eval(state):
- index_value = index(state)
- if index_value >= len(elems):
- index_value = len(elems) - 1
- return normalize(elems[index_value](state), shape)
- return eval
+ index_mask = (1 << len(value.index)) - 1
+ gen_index = self.emitter.def_var("rhs_index", f"{self(value.index)} & {index_mask}")
+ gen_value = self.emitter.gen_var("rhs_proxy")
+ if value.elems:
+ gen_elems = []
+ for index, elem in enumerate(value.elems):
+ if index == 0:
+ self.emitter.append(f"if {gen_index} == {index}:")
+ else:
+ self.emitter.append(f"elif {gen_index} == {index}:")
+ with self.emitter.indent():
+ self.emitter.append(f"{gen_value} = {self(elem)}")
+ self.emitter.append(f"else:")
+ with self.emitter.indent():
+ self.emitter.append(f"{gen_value} = {self(value.elems[-1])}")
+ return gen_value
+ else:
+ return f"0"
+
+ @classmethod
+ def compile(cls, context, value, *, mode, inputs=None):
+ emitter = _Emitter()
+ compiler = cls(context, emitter, mode=mode, inputs=inputs)
+ emitter.append(f"result = {compiler(value)}")
+ return emitter.flush()
class _LHSValueCompiler(_ValueCompiler):
- def __init__(self, signal_slots, rhs_compiler):
- self.signal_slots = signal_slots
- self.rhs_compiler = rhs_compiler
+ def __init__(self, context, emitter, *, rhs, outputs=None):
+ super().__init__(context, emitter)
+ # `rrhs` is used to translate rvalues that are syntactically a part of an lvalue, e.g.
+ # the offset of a Part.
+ self.rrhs = rhs
+ # `lrhs` is used to translate the read part of a read-modify-write cycle during partial
+ # update of an lvalue.
+ self.lrhs = _RHSValueCompiler(context, emitter, mode="next", inputs=None)
+ # If not None, `outputs` gets populated with signals on LHS.
+ self.outputs = outputs
def on_Const(self, value):
raise TypeError # :nocov:
def on_Signal(self, value):
- shape = value.shape()
- value_slot = self.signal_slots[value]
- def eval(state, rhs):
- state.set(value_slot, normalize(rhs, shape))
- return eval
-
- def on_ClockSignal(self, value):
- raise NotImplementedError # :nocov:
-
- def on_ResetSignal(self, value):
- raise NotImplementedError # :nocov:
+ if self.outputs is not None:
+ self.outputs.add(value)
+
+ def gen(arg):
+ value_mask = (1 << len(value)) - 1
+ if value.shape().signed:
+ value_sign = f"sign({arg} & {value_mask}, {-1 << (len(value) - 1)})"
+ else: # unsigned
+ value_sign = f"{arg} & {value_mask}"
+ self.emitter.append(f"next_{self.context.get_out_signal(value)} = {value_sign}")
+ return gen
def on_Operator(self, value):
raise TypeError # :nocov:
def on_Slice(self, value):
- lhs_r = self.rhs_compiler(value.value)
- lhs_l = self(value.value)
- shift = value.start
- mask = (1 << (value.stop - value.start)) - 1
- def eval(state, rhs):
- lhs_value = lhs_r(state)
- lhs_value &= ~(mask << shift)
- lhs_value |= (rhs & mask) << shift
- lhs_l(state, lhs_value)
- return eval
+ def gen(arg):
+ width_mask = (1 << (value.stop - value.start)) - 1
+ self(value.value)(f"({self.lrhs(value.value)} & " \
+ f"{~(width_mask << value.start)} | " \
+ f"(({arg} & {width_mask}) << {value.start}))")
+ return gen
def on_Part(self, value):
- lhs_r = self.rhs_compiler(value.value)
- lhs_l = self(value.value)
- shift = self.rhs_compiler(value.offset)
- mask = (1 << value.width) - 1
- stride = value.stride
- def eval(state, rhs):
- lhs_value = lhs_r(state)
- shift_value = shift(state) * stride
- lhs_value &= ~(mask << shift_value)
- lhs_value |= (rhs & mask) << shift_value
- lhs_l(state, lhs_value)
- return eval
+ def gen(arg):
+ width_mask = (1 << value.width) - 1
+ offset_mask = (1 << len(value.offset)) - 1
+ offset = f"(({self.rrhs(value.offset)} & {offset_mask}) * {value.stride})"
+ self(value.value)(f"({self.lrhs(value.value)} & " \
+ f"~({width_mask} << {offset}) | " \
+ f"(({arg} & {width_mask}) << {offset}))")
+ return gen
def on_Cat(self, value):
- parts = []
- offset = 0
- for opnd in value.parts:
- parts.append((offset, (1 << len(opnd)) - 1, self(opnd)))
- offset += len(opnd)
- def eval(state, rhs):
- for offset, mask, opnd in parts:
- opnd(state, (rhs >> offset) & mask)
- return eval
+ def gen(arg):
+ gen_arg = self.emitter.def_var("cat", arg)
+ gen_parts = []
+ offset = 0
+ for part in value.parts:
+ part_mask = (1 << len(part)) - 1
+ self(part)(f"(({gen_arg} >> {offset}) & {part_mask})")
+ offset += len(part)
+ return gen
def on_Repl(self, value):
raise TypeError # :nocov:
def on_ArrayProxy(self, value):
- elems = list(map(self, value.elems))
- index = self.rhs_compiler(value.index)
- def eval(state, rhs):
- index_value = index(state)
- if index_value >= len(elems):
- index_value = len(elems) - 1
- elems[index_value](state, rhs)
- return eval
-
-
-class _StatementCompiler(StatementVisitor):
- def __init__(self, signal_slots):
- self.sensitivity = SignalSet()
- self.rrhs_compiler = _RHSValueCompiler(signal_slots, self.sensitivity, mode="rhs")
- self.lrhs_compiler = _RHSValueCompiler(signal_slots, self.sensitivity, mode="lhs")
- self.lhs_compiler = _LHSValueCompiler(signal_slots, self.lrhs_compiler)
+ def gen(arg):
+ index_mask = (1 << len(value.index)) - 1
+ gen_index = self.emitter.def_var("index", f"{self.rrhs(value.index)} & {index_mask}")
+ if value.elems:
+ gen_elems = []
+ for index, elem in enumerate(value.elems):
+ if index == 0:
+ self.emitter.append(f"if {gen_index} == {index}:")
+ else:
+ self.emitter.append(f"elif {gen_index} == {index}:")
+ with self.emitter.indent():
+ self(elem)(arg)
+ self.emitter.append(f"else:")
+ with self.emitter.indent():
+ self(value.elems[-1])(arg)
+ else:
+ self.emitter.append(f"pass")
+ return gen
+
+ @classmethod
+ def compile(cls, context, stmt, *, inputs=None, outputs=None):
+ emitter = _Emitter()
+ compiler = cls(context, emitter, inputs=inputs, outputs=outputs)
+ compiler(stmt)
+ return emitter.flush()
+
+
+class _StatementCompiler(StatementVisitor, _Compiler):
+ def __init__(self, context, emitter, *, inputs=None, outputs=None):
+ super().__init__(context, emitter)
+ self.rhs = _RHSValueCompiler(context, emitter, mode="curr", inputs=inputs)
+ self.lhs = _LHSValueCompiler(context, emitter, rhs=self.rhs, outputs=outputs)
+
+ def on_statements(self, stmts):
+ for stmt in stmts:
+ self(stmt)
+ if not stmts:
+ self.emitter.append("pass")
def on_Assign(self, stmt):
- shape = stmt.lhs.shape()
- lhs = self.lhs_compiler(stmt.lhs)
- rhs = self.rrhs_compiler(stmt.rhs)
- def run(state):
- lhs(state, normalize(rhs(state), shape))
- return run
+ return self.lhs(stmt.lhs)(self.rhs(stmt.rhs))
+
+ def on_Switch(self, stmt):
+ gen_test = self.emitter.def_var("test",
+ f"{self.rhs(stmt.test)} & {(1 << len(stmt.test)) - 1}")
+ for index, (patterns, stmts) in enumerate(stmt.cases.items()):
+ gen_checks = []
+ if not patterns:
+ gen_checks.append(f"True")
+ else:
+ for pattern in patterns:
+ if "-" in pattern:
+ mask = int("".join("0" if b == "-" else "1" for b in pattern), 2)
+ value = int("".join("0" if b == "-" else b for b in pattern), 2)
+ gen_checks.append(f"({gen_test} & {mask}) == {value}")
+ else:
+ value = int(pattern, 2)
+ gen_checks.append(f"{gen_test} == {value}")
+ if index == 0:
+ self.emitter.append(f"if {' or '.join(gen_checks)}:")
+ else:
+ self.emitter.append(f"elif {' or '.join(gen_checks)}:")
+ with self.emitter.indent():
+ self(stmts)
def on_Assert(self, stmt):
- raise NotImplementedError("Asserts not yet implemented for Simulator backend.") # :nocov:
+ raise NotImplementedError # :nocov:
def on_Assume(self, stmt):
- pass # :nocov:
+ raise NotImplementedError # :nocov:
def on_Cover(self, stmt):
- raise NotImplementedError("Covers not yet implemented for Simulator backend.") # :nocov:
+ raise NotImplementedError # :nocov:
- def on_Switch(self, stmt):
- test = self.rrhs_compiler(stmt.test)
- cases = []
- for values, stmts in stmt.cases.items():
- if values == ():
- check = lambda test: True
+ @classmethod
+ def compile(cls, context, stmt, *, inputs=None, outputs=None):
+ output_indexes = [context.get_signal(signal) for signal in stmt._lhs_signals()]
+ emitter = _Emitter()
+ for signal_index in output_indexes:
+ emitter.append(f"next_{signal_index} = slots[{signal_index}].next")
+ compiler = cls(context, emitter, inputs=inputs, outputs=outputs)
+ compiler(stmt)
+ for signal_index in output_indexes:
+ emitter.append(f"slots[{signal_index}].set(next_{signal_index})")
+ return emitter.flush()
+
+
+class _CompiledProcess(_Process):
+ __slots__ = ("context", "comb", "name", "run")
+
+ def __init__(self, state, *, comb, name):
+ self.context = _EvalContext(state)
+ self.comb = comb
+ self.name = name
+ self.run = None # set by _FragmentCompiler
+ self.reset()
+
+ def reset(self):
+ self.runnable = self.comb
+ self.passive = True
+
+
+class _FragmentCompiler:
+ def __init__(self, state, signal_names):
+ self.state = state
+ self.signal_names = signal_names
+
+ def __call__(self, fragment, *, hierarchy=("top",)):
+ processes = set()
+
+ def add_signal_name(signal):
+ hierarchical_signal_name = (*hierarchy, signal.name)
+ if signal not in self.signal_names:
+ self.signal_names[signal] = {hierarchical_signal_name}
else:
- check = lambda test: False
- def make_check(mask, value, prev_check):
- return lambda test: prev_check(test) or test & mask == value
- for value in values:
- if "-" in value:
- mask = "".join("0" if b == "-" else "1" for b in value)
- value = "".join("0" if b == "-" else b for b in value)
- else:
- mask = "1" * len(value)
- mask = int(mask, 2)
- value = int(value, 2)
- check = make_check(mask, value, check)
- cases.append((check, self.on_statements(stmts)))
- def run(state):
- test_value = test(state)
- for check, body in cases:
- if check(test_value):
- body(state)
- return
- return run
+ self.signal_names[signal].add(hierarchical_signal_name)
- def on_statements(self, stmts):
- stmts = [self.on_statement(stmt) for stmt in stmts]
- def run(state):
- for stmt in stmts:
- stmt(state)
- return run
+ for domain_name, domain_signals in fragment.drivers.items():
+ for domain_signal in domain_signals:
+ add_signal_name(domain_signal)
+ domain_stmts = LHSGroupFilter(domain_signals)(fragment.statements)
+ domain_process = _CompiledProcess(self.state, comb=domain_name is None,
+ name=".".join((*hierarchy, "<{}>".format(domain_name or "comb"))))
-class Simulator:
- def __init__(self, fragment, vcd_file=None, gtkw_file=None, traces=()):
- self._fragment = Fragment.get(fragment, platform=None)
-
- self._signal_slots = SignalDict() # Signal -> int/slot
- self._slot_signals = list() # int/slot -> Signal
-
- self._domains = list() # [ClockDomain]
- self._clk_edges = dict() # ClockDomain -> int/edge
- self._domain_triggers = list() # int/slot -> ClockDomain
-
- self._signals = SignalSet() # {Signal}
- self._comb_signals = bitarray() # {Signal}
- self._sync_signals = bitarray() # {Signal}
- self._user_signals = bitarray() # {Signal}
- self._domain_signals = dict() # ClockDomain -> {Signal}
-
- self._started = False
- self._timestamp = 0.
- self._delta = 0.
- self._epsilon = 1e-10
- self._fastest_clock = self._epsilon
- self._all_clocks = set() # {str/domain}
- self._state = _State()
-
- self._processes = set() # {process}
- self._process_loc = dict() # process -> str/loc
- self._passive = set() # {process}
- self._suspended = set() # {process}
- self._wait_deadline = dict() # process -> float/timestamp
- self._wait_tick = dict() # process -> str/domain
-
- self._funclets = list() # int/slot -> set(lambda)
-
- self._vcd_file = vcd_file
- self._vcd_writer = None
- self._vcd_signals = list() # int/slot -> set(vcd_signal)
- self._vcd_names = list() # int/slot -> str/name
- self._gtkw_file = gtkw_file
- self._traces = traces
-
- self._run_called = False
+ emitter = _Emitter()
+ emitter.append(f"def run():")
+ emitter._level += 1
- @staticmethod
- def _check_process(process):
- if inspect.isgeneratorfunction(process):
- process = process()
- if not (inspect.isgenerator(process) or inspect.iscoroutine(process)):
- raise TypeError("Cannot add a process {!r} because it is not a generator or "
- "a generator function"
- .format(process))
- return process
+ if domain_name is None:
+ for signal in domain_signals:
+ signal_index = domain_process.context.get_signal(signal)
+ emitter.append(f"next_{signal_index} = {signal.reset}")
- def _name_process(self, process):
- if process in self._process_loc:
- return self._process_loc[process]
- else:
- if inspect.isgenerator(process):
- frame = process.gi_frame
- if inspect.iscoroutine(process):
- frame = process.cr_frame
- return "{}:{}".format(inspect.getfile(frame), inspect.getlineno(frame))
+ inputs = SignalSet()
+ _StatementCompiler(domain_process.context, emitter, inputs=inputs)(domain_stmts)
- def add_process(self, process):
- process = self._check_process(process)
- self._processes.add(process)
+ for input in inputs:
+ self.state.for_signal(input).wait(domain_process)
- def add_sync_process(self, process, domain="sync"):
- process = self._check_process(process)
- def sync_process():
+ else:
+ domain = fragment.domains[domain_name]
+ add_signal_name(domain.clk)
+ if domain.rst is not None:
+ add_signal_name(domain.rst)
+
+ clk_trigger = 1 if domain.clk_edge == "pos" else 0
+ self.state.for_signal(domain.clk).wait(domain_process, trigger=clk_trigger)
+ if domain.rst is not None and domain.async_reset:
+ rst_trigger = 1
+ self.state.for_signal(domain.rst).wait(domain_process, trigger=rst_trigger)
+
+ gen_asserts = []
+ clk_index = domain_process.context.get_signal(domain.clk)
+ gen_asserts.append(f"slots[{clk_index}].curr == {clk_trigger}")
+ if domain.rst is not None and domain.async_reset:
+ rst_index = domain_process.context.get_signal(domain.rst)
+ gen_asserts.append(f"slots[{rst_index}].curr == {rst_trigger}")
+ emitter.append(f"assert {' or '.join(gen_asserts)}")
+
+ for signal in domain_signals:
+ signal_index = domain_process.context.get_signal(signal)
+ emitter.append(f"next_{signal_index} = slots[{signal_index}].next")
+
+ _StatementCompiler(domain_process.context, emitter)(domain_stmts)
+
+ for signal in domain_signals:
+ signal_index = domain_process.context.get_signal(signal)
+ emitter.append(f"slots[{signal_index}].set(next_{signal_index})")
+
+ exec_locals = {"slots": domain_process.context.slots, **_ValueCompiler.helpers}
+ exec(emitter.flush(), exec_locals)
+ domain_process.run = exec_locals["run"]
+
+ processes.add(domain_process)
+
+ for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
+ if subfragment_name is None:
+ subfragment_name = "U${}".format(subfragment_index)
+ processes.update(self(subfragment, hierarchy=(*hierarchy, subfragment_name)))
+
+ return processes
+
+
+class _CoroutineProcess(_Process):
+ def __init__(self, state, domains, constructor, *, default_cmd=None):
+ self.state = state
+ self.domains = domains
+ self.constructor = constructor
+ self.default_cmd = default_cmd
+ self.reset()
+
+ def reset(self):
+ self.runnable = True
+ self.passive = False
+ self.coroutine = self.constructor()
+ self.eval_context = _EvalContext(self.state)
+ self.exec_locals = {
+ "slots": self.eval_context.slots,
+ "result": None,
+ **_ValueCompiler.helpers
+ }
+ self.waits_on = set()
+
+ @property
+ def name(self):
+ coroutine = self.coroutine
+ while coroutine.gi_yieldfrom is not None:
+ coroutine = coroutine.gi_yieldfrom
+ if inspect.isgenerator(coroutine):
+ frame = coroutine.gi_frame
+ if inspect.iscoroutine(coroutine):
+ frame = coroutine.cr_frame
+ return "{}:{}".format(inspect.getfile(frame), inspect.getlineno(frame))
+
+ def get_in_signal(self, signal, *, trigger=None):
+ signal_state = self.state.for_signal(signal)
+ assert self not in signal_state.waiters
+ signal_state.waiters[self] = trigger
+ self.waits_on.add(signal_state)
+ return signal_state
+
+ def run(self):
+ if self.coroutine is None:
+ return
+
+ if self.waits_on:
+ for signal_state in self.waits_on:
+ del signal_state.waiters[self]
+ self.waits_on.clear()
+
+ response = None
+ while True:
try:
- cmd = None
- while True:
- if cmd is None:
- cmd = Tick(domain)
- result = yield cmd
- self._process_loc[sync_process] = self._name_process(process)
- cmd = process.send(result)
- except StopIteration:
- pass
- sync_process = sync_process()
- self.add_process(sync_process)
+ command = self.coroutine.send(response)
+ if command is None:
+ command = self.default_cmd
+ response = None
+
+ if isinstance(command, Value):
+ exec(_RHSValueCompiler.compile(self.eval_context, command, mode="curr"),
+ self.exec_locals)
+ response = Const.normalize(self.exec_locals["result"], command.shape())
+
+ elif isinstance(command, Statement):
+ exec(_StatementCompiler.compile(self.eval_context, command),
+ self.exec_locals)
+
+ elif type(command) is Tick:
+ domain = command.domain
+ if isinstance(domain, ClockDomain):
+ pass
+ elif domain in self.domains:
+ domain = self.domains[domain]
+ else:
+ raise NameError("Received command {!r} that refers to a nonexistent "
+ "domain {!r} from process {!r}"
+ .format(command, command.domain, self.name))
+ self.get_in_signal(domain.clk, trigger=1 if domain.clk_edge == "pos" else 0)
+ if domain.rst is not None and domain.async_reset:
+ self.get_in_signal(domain.rst, trigger=1)
+ return
- def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
- if self._fastest_clock == self._epsilon or period < self._fastest_clock:
- self._fastest_clock = period
- if domain in self._all_clocks:
- raise ValueError("Domain '{}' already has a clock driving it"
- .format(domain))
+ elif type(command) is Settle:
+ self.state.deadlines[self] = None
+ return
- half_period = period / 2
- if phase is None:
- phase = half_period
- for domain_obj in self._domains:
- if not domain_obj.local and domain_obj.name == domain:
- clk = domain_obj.clk
- break
- else:
- if if_exists:
- return
- else:
- raise ValueError("Domain '{}' is not present in simulation"
- .format(domain))
- def clk_process():
- yield Passive()
- yield Delay(phase)
- while True:
- yield clk.eq(1)
- yield Delay(half_period)
- yield clk.eq(0)
- yield Delay(half_period)
- self.add_process(clk_process)
- self._all_clocks.add(domain)
+ elif type(command) is Delay:
+ if command.interval is None:
+ self.state.deadlines[self] = None
+ else:
+ self.state.deadlines[self] = self.state.timestamp + command.interval
+ return
- def __enter__(self):
- if self._vcd_file:
- self._vcd_writer = VCDWriter(self._vcd_file, timescale="100 ps",
- comment="Generated by nMigen")
-
- root_fragment = self._fragment.prepare()
-
- hierarchy = {}
- domains = set()
- def add_fragment(fragment, scope=()):
- hierarchy[fragment] = scope
- domains.update(fragment.domains.values())
- for index, (subfragment, name) in enumerate(fragment.subfragments):
- if name is None:
- add_fragment(subfragment, (*scope, "U{}".format(index)))
- else:
- add_fragment(subfragment, (*scope, name))
- add_fragment(root_fragment, scope=("top",))
- self._domains = list(domains)
- self._clk_edges = {domain: 1 if domain.clk_edge == "pos" else 0 for domain in domains}
-
- def add_signal(signal):
- if signal not in self._signals:
- self._signals.add(signal)
-
- signal_slot = self._state.add(normalize(signal.reset, signal.shape()))
- self._signal_slots[signal] = signal_slot
- self._slot_signals.append(signal)
-
- self._comb_signals.append(False)
- self._sync_signals.append(False)
- self._user_signals.append(False)
- for domain in self._domains:
- if domain not in self._domain_signals:
- self._domain_signals[domain] = bitarray()
- self._domain_signals[domain].append(False)
-
- self._funclets.append(set())
-
- self._domain_triggers.append(None)
- if self._vcd_writer:
- self._vcd_signals.append(set())
- self._vcd_names.append(None)
-
- return self._signal_slots[signal]
-
- def add_domain_signal(signal, domain):
- signal_slot = add_signal(signal)
- self._domain_triggers[signal_slot] = domain
-
- for fragment, fragment_scope in hierarchy.items():
- for signal in fragment.iter_signals():
- add_signal(signal)
-
- for domain_name, domain in fragment.domains.items():
- add_domain_signal(domain.clk, domain)
- if domain.rst is not None:
- add_domain_signal(domain.rst, domain)
+ elif type(command) is Passive:
+ self.passive = True
- for fragment, fragment_scope in hierarchy.items():
- for signal in fragment.iter_signals():
- if not self._vcd_writer:
- continue
+ elif type(command) is Active:
+ self.passive = False
- signal_slot = self._signal_slots[signal]
+ elif command is None: # only possible if self.default_cmd is None
+ raise TypeError("Received default command from process {!r} that was added "
+ "with add_process(); did you mean to add this process with "
+ "add_sync_process() instead?"
+ .format(self.name))
- for i, (subfragment, name) in enumerate(fragment.subfragments):
- if signal in subfragment.ports:
- var_name = "{}_{}".format(name or "U{}".format(i), signal.name)
- break
else:
- var_name = signal.name
+ raise TypeError("Received unsupported command {!r} from process {!r}"
+ .format(command, self.name))
- if signal.decoder:
- var_type = "string"
- var_size = 1
- var_init = signal.decoder(signal.reset).expandtabs().replace(" ", "_")
- else:
- var_type = "wire"
- var_size = signal.width
- var_init = signal.reset
+ except StopIteration:
+ self.passive = True
+ self.coroutine = None
+ return
- suffix = None
- while True:
- try:
- if suffix is None:
- var_name_suffix = var_name
- else:
- var_name_suffix = "{}${}".format(var_name, suffix)
- self._vcd_signals[signal_slot].add(self._vcd_writer.register_var(
- scope=".".join(fragment_scope), name=var_name_suffix,
- var_type=var_type, size=var_size, init=var_init))
- if self._vcd_names[signal_slot] is None:
- self._vcd_names[signal_slot] = \
- ".".join(fragment_scope + (var_name_suffix,))
- break
- except KeyError:
- suffix = (suffix or 0) + 1
+ except Exception as exn:
+ self.coroutine.throw(exn)
- for domain_name, signals in fragment.drivers.items():
- signals_bits = bitarray(len(self._signals))
- signals_bits.setall(False)
- for signal in signals:
- signals_bits[self._signal_slots[signal]] = True
- if domain_name is None:
- self._comb_signals |= signals_bits
- else:
- self._sync_signals |= signals_bits
- self._domain_signals[fragment.domains[domain_name]] |= signals_bits
-
- statements = []
- for domain_name, signals in fragment.drivers.items():
- reset_stmts = []
- hold_stmts = []
- for signal in signals:
- reset_stmts.append(signal.eq(signal.reset))
- hold_stmts .append(signal.eq(signal))
-
- if domain_name is None:
- statements += reset_stmts
- else:
- if fragment.domains[domain_name].async_reset:
- statements.append(Switch(fragment.domains[domain_name].rst,
- {0: hold_stmts, 1: reset_stmts}))
- else:
- statements += hold_stmts
- statements += fragment.statements
+class _WaveformContextManager:
+ def __init__(self, state, waveform_writer):
+ self._state = state
+ self._waveform_writer = waveform_writer
+
+ def __enter__(self):
+ try:
+ self._state.start_waveform(self._waveform_writer)
+ except:
+ self._waveform_writer.close(0)
+ raise
- compiler = _StatementCompiler(self._signal_slots)
- funclet = compiler(statements)
+ def __exit__(self, *args):
+ self._state.finish_waveform()
- def add_funclet(signal, funclet):
- if signal in self._signal_slots:
- self._funclets[self._signal_slots[signal]].add(funclet)
- for signal in compiler.sensitivity:
- add_funclet(signal, funclet)
- for domain in fragment.domains.values():
- add_funclet(domain.clk, funclet)
- if domain.rst is not None:
- add_funclet(domain.rst, funclet)
+class Simulator:
+ def __init__(self, fragment, **kwargs):
+ self._state = _SimulatorState()
+ self._signal_names = SignalDict()
+ self._fragment = Fragment.get(fragment, platform=None).prepare()
+ self._processes = _FragmentCompiler(self._state, self._signal_names)(self._fragment)
+ if kwargs: # :nocov:
+ # TODO(nmigen-0.3): remove
+ self._state.start_waveform(_VCDWaveformWriter(self._signal_names, **kwargs))
+ self._clocked = set()
+
+ def _check_process(self, process):
+ if not (inspect.isgeneratorfunction(process) or inspect.iscoroutinefunction(process)):
+ if inspect.isgenerator(process) or inspect.iscoroutine(process):
+ warnings.warn("instead of generators, use generator functions as processes; "
+ "this allows the simulator to be repeatedly reset",
+ DeprecationWarning, stacklevel=3)
+ def wrapper():
+ yield from process
+ return wrapper
+ else:
+ raise TypeError("Cannot add a process {!r} because it is not a generator function"
+ .format(process))
+ return process
- self._user_signals = bitarray(len(self._signals))
- self._user_signals.setall(True)
- self._user_signals &= ~self._comb_signals
- self._user_signals &= ~self._sync_signals
+ def _add_coroutine_process(self, process, *, default_cmd):
+ self._processes.add(_CoroutineProcess(self._state, self._fragment.domains, process,
+ default_cmd=default_cmd))
- return self
+ def add_process(self, process):
+ process = self._check_process(process)
+ def wrapper():
+ # Only start a bench process after comb settling, so that the reset values are correct.
+ yield Settle()
+ yield from process()
+ self._add_coroutine_process(wrapper, default_cmd=None)
+
+ def add_sync_process(self, process, *, domain="sync"):
+ process = self._check_process(process)
+ def wrapper():
+ # Only start a sync process after the first clock edge (or reset edge, if the domain
+ # uses an asynchronous reset). This matches the behavior of synchronous FFs.
+ yield Tick(domain)
+ yield from process()
+ return self._add_coroutine_process(wrapper, default_cmd=Tick(domain))
- def _update_dirty_signals(self):
- """Perform the statement part of IR processes (aka RTLIL case)."""
- # First, for all dirty signals, use sensitivity lists to determine the set of fragments
- # that need their statements to be reevaluated because the signals changed at the previous
- # delta cycle.
- funclets = set()
- for signal_slot in self._state.flush_curr_dirty():
- funclets.update(self._funclets[signal_slot])
-
- # Second, compute the values of all signals at the start of the next delta cycle, by
- # running precompiled statements.
- for funclet in funclets:
- funclet(self._state)
-
- def _commit_signal(self, signal_slot, domains):
- """Perform the driver part of IR processes (aka RTLIL sync), for individual signals."""
- # Take the computed value (at the start of this delta cycle) of a signal (that could have
- # come from an IR process that ran earlier, or modified by a simulator process) and update
- # the value for this delta cycle.
- old, new = self._state.commit(signal_slot)
- if old == new:
+ def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
+ """Add a clock process.
+
+ Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.
+
+ Arguments
+ ---------
+ period : float
+ Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
+ seconds.
+ phase : None or float
+ Clock phase. The process will wait ``phase`` seconds before the first clock transition.
+ If not specified, defaults to ``period / 2``.
+ domain : str or ClockDomain
+ Driven clock domain. If specified as a string, the domain with that name is looked up
+ in the root fragment of the simulation.
+ if_exists : bool
+ If ``False`` (the default), raise an error if the driven domain is specified as
+ a string and the root fragment does not have such a domain. If ``True``, do nothing
+ in this case.
+ """
+ if isinstance(domain, ClockDomain):
+ pass
+ elif domain in self._fragment.domains:
+ domain = self._fragment.domains[domain]
+ elif if_exists:
return
+ else:
+ raise ValueError("Domain {!r} is not present in simulation"
+ .format(domain))
+ if domain in self._clocked:
+ raise ValueError("Domain {!r} already has a clock driving it"
+ .format(domain.name))
- # If the signal is a clock that triggers synchronous logic, record that fact.
- if (self._domain_triggers[signal_slot] is not None and
- self._clk_edges[self._domain_triggers[signal_slot]] == new):
- domains.add(self._domain_triggers[signal_slot])
-
- if self._vcd_writer:
- # Finally, dump the new value to the VCD file.
- for vcd_signal in self._vcd_signals[signal_slot]:
- signal = self._slot_signals[signal_slot]
- if signal.decoder:
- var_value = signal.decoder(new).expandtabs().replace(" ", "_")
- else:
- var_value = new
- vcd_timestamp = (self._timestamp + self._delta) / self._epsilon
- self._vcd_writer.change(vcd_signal, vcd_timestamp, var_value)
-
- def _commit_comb_signals(self, domains):
- """Perform the comb part of IR processes (aka RTLIL always)."""
- # Take the computed value (at the start of this delta cycle) of every comb signal and
- # update the value for this delta cycle.
- for signal_slot in self._state.iter_next_dirty():
- if self._comb_signals[signal_slot]:
- self._commit_signal(signal_slot, domains)
-
- def _commit_sync_signals(self, domains):
- """Perform the sync part of IR processes (aka RTLIL posedge)."""
- # At entry, `domains` contains a set of every simultaneously triggered sync update.
- while domains:
- # Advance the timeline a bit (purely for observational purposes) and commit all of them
- # at the same timestamp.
- self._delta += self._epsilon
- curr_domains, domains = domains, set()
-
- while curr_domains:
- domain = curr_domains.pop()
-
- # Wake up any simulator processes that wait for a domain tick.
- for process, wait_domain_name in list(self._wait_tick.items()):
- if domain.name == wait_domain_name:
- del self._wait_tick[process]
- self._suspended.remove(process)
-
- # Immediately run the process. It is important that this happens here,
- # and not on the next step, when all the processes will run anyway,
- # because Tick() simulates an edge triggered process. Like DFFs that latch
- # a value from the previous clock cycle, simulator processes observe signal
- # values from the previous clock cycle on a tick, too.
- self._run_process(process)
-
- # Take the computed value (at the start of this delta cycle) of every sync signal
- # in this domain and update the value for this delta cycle. This can trigger more
- # synchronous logic, so record that.
- for signal_slot in self._state.iter_next_dirty():
- if self._domain_signals[domain][signal_slot]:
- self._commit_signal(signal_slot, domains)
-
- # Unless handling synchronous logic above has triggered more synchronous logic (which
- # can happen e.g. if a domain is clocked off a clock divisor in fabric), we're done.
- # Otherwise, do one more round of updates.
-
- def _run_process(self, process):
- try:
- cmd = process.send(None)
+ half_period = period / 2
+ if phase is None:
+ # By default, delay the first edge by half period. This causes any synchronous activity
+ # to happen at a non-zero time, distinguishing it from the reset values in the waveform
+ # viewer.
+ phase = half_period
+ def clk_process():
+ yield Passive()
+ yield Delay(phase)
+ # Behave correctly if the process is added after the clock signal is manipulated, or if
+ # its reset state is high.
+ initial = (yield domain.clk)
while True:
- if type(cmd) is Delay:
- if cmd.interval is None:
- interval = self._epsilon
- else:
- interval = cmd.interval
- self._wait_deadline[process] = self._timestamp + interval
- self._suspended.add(process)
- break
-
- elif type(cmd) is Tick:
- self._wait_tick[process] = cmd.domain
- self._suspended.add(process)
- break
-
- elif type(cmd) is Passive:
- self._passive.add(process)
-
- elif type(cmd) is Assign:
- lhs_signals = cmd.lhs._lhs_signals()
- for signal in lhs_signals:
- if not signal in self._signals:
- raise ValueError("Process '{}' sent a request to set signal {!r}, "
- "which is not a part of simulation"
- .format(self._name_process(process), signal))
- signal_slot = self._signal_slots[signal]
- if self._comb_signals[signal_slot]:
- raise ValueError("Process '{}' sent a request to set signal {!r}, "
- "which is a part of combinatorial assignment in "
- "simulation"
- .format(self._name_process(process), signal))
-
- if type(cmd.lhs) is Signal and type(cmd.rhs) is Const:
- # Fast path.
- self._state.set(self._signal_slots[cmd.lhs],
- normalize(cmd.rhs.value, cmd.lhs.shape()))
- else:
- compiler = _StatementCompiler(self._signal_slots)
- funclet = compiler(cmd)
- funclet(self._state)
-
- domains = set()
- for signal in lhs_signals:
- self._commit_signal(self._signal_slots[signal], domains)
- self._commit_sync_signals(domains)
-
- elif type(cmd) is Signal:
- # Fast path.
- cmd = process.send(self._state.curr[self._signal_slots[cmd]])
- continue
-
- elif isinstance(cmd, Value):
- compiler = _RHSValueCompiler(self._signal_slots)
- funclet = compiler(cmd)
- cmd = process.send(funclet(self._state))
- continue
+ yield domain.clk.eq(~initial)
+ yield Delay(half_period)
+ yield domain.clk.eq(initial)
+ yield Delay(half_period)
+ self._add_coroutine_process(clk_process, default_cmd=None)
+ self._clocked.add(domain)
+
+ def reset(self):
+ """Reset the simulation.
+
+ Assign the reset value to every signal in the simulation, and restart every user process.
+ """
+ self._state.reset()
+ for process in self._processes:
+ process.reset()
+
+ def _delta(self):
+ """Perform a delta cycle.
+
+ Performs the two phases of a delta cycle:
+ 1. run and suspend every non-waiting process once, queueing signal changes;
+ 2. commit every queued signal change, waking up any waiting process.
+ """
+ for process in self._processes:
+ if process.runnable:
+ process.runnable = False
+ process.run()
+
+ return self._state.commit()
+
+ def _settle(self):
+ """Settle the simulation.
+
+ Run every process and commit changes until a fixed point is reached. If there is
+ an unstable combinatorial loop, this function will never return.
+ """
+ while self._delta():
+ pass
- else:
- raise TypeError("Received unsupported command {!r} from process '{}'"
- .format(cmd, self._name_process(process)))
-
- cmd = process.send(None)
-
- except StopIteration:
- self._processes.remove(process)
- self._passive.discard(process)
-
- except Exception as e:
- process.throw(e)
-
- def step(self, run_passive=False):
- # Are there any delta cycles we should run?
- if self._state.curr_dirty.any():
- # We might run some delta cycles, and we have simulator processes waiting on
- # a deadline. Take care to not exceed the closest deadline.
- if self._wait_deadline and \
- (self._timestamp + self._delta) >= min(self._wait_deadline.values()):
- # Oops, we blew the deadline. We *could* run the processes now, but this is
- # virtually certainly a logic loop and a design bug, so bail out instead.d
- raise DeadlineError("Delta cycles exceeded process deadline; combinatorial loop?")
-
- domains = set()
- while self._state.curr_dirty.any():
- self._update_dirty_signals()
- self._commit_comb_signals(domains)
- self._commit_sync_signals(domains)
- return True
-
- # Are there any processes that haven't had a chance to run yet?
- if len(self._processes) > len(self._suspended):
- # Schedule an arbitrary one.
- process = (self._processes - set(self._suspended)).pop()
- self._run_process(process)
- return True
-
- # All processes are suspended. Are any of them active?
- if len(self._processes) > len(self._passive) or run_passive:
- # Are any of them suspended before a deadline?
- if self._wait_deadline:
- # Schedule the one with the lowest deadline.
- process, deadline = min(self._wait_deadline.items(), key=lambda x: x[1])
- del self._wait_deadline[process]
- self._suspended.remove(process)
- self._timestamp = deadline
- self._delta = 0.
- self._run_process(process)
- return True
-
- # No processes, or all processes are passive. Nothing to do!
- return False
+ def step(self):
+ """Step the simulation.
+
+ Run every process and commit changes until a fixed point is reached, then advance time
+ to the closest deadline (if any). If there is an unstable combinatorial loop,
+ this function will never return.
+
+ Returns ``True`` if there are any active processes, ``False`` otherwise.
+ """
+ self._settle()
+ self._state.advance()
+ return any(not process.passive for process in self._processes)
def run(self):
- self._run_called = True
+ """Run the simulation while any processes are active.
+ Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
+ and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
+ Processes compiled from HDL and added with :meth:`add_clock` are always passive.
+ """
while self.step():
pass
- def run_until(self, deadline, run_passive=False):
- self._run_called = True
+ def run_until(self, deadline, *, run_passive=False):
+ """Run the simulation until it advances to ``deadline``.
- while self._timestamp < deadline:
- if not self.step(run_passive):
- return False
+ If ``run_passive`` is ``False``, the simulation also stops when there are no active
+ processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
+ advances to or past ``deadline``.
- return True
+ If the simulation stops advancing, this function will never return.
+ """
+ assert self._state.timestamp <= deadline
+ while (self.step() or run_passive) and self._state.timestamp < deadline:
+ pass
- def __exit__(self, *args):
- if not self._run_called:
- warnings.warn("Simulation created, but not run", UserWarning)
-
- if self._vcd_writer:
- vcd_timestamp = (self._timestamp + self._delta) / self._epsilon
- self._vcd_writer.close(vcd_timestamp)
-
- if self._vcd_file and self._gtkw_file:
- gtkw_save = GTKWSave(self._gtkw_file)
- if hasattr(self._vcd_file, "name"):
- gtkw_save.dumpfile(self._vcd_file.name)
- if hasattr(self._vcd_file, "tell"):
- gtkw_save.dumpfile_size(self._vcd_file.tell())
-
- gtkw_save.treeopen("top")
- gtkw_save.zoom_markers(math.log(self._epsilon / self._fastest_clock) - 14)
-
- def add_trace(signal, **kwargs):
- signal_slot = self._signal_slots[signal]
- if self._vcd_names[signal_slot] is not None:
- if len(signal) > 1 and not signal.decoder:
- suffix = "[{}:0]".format(len(signal) - 1)
- else:
- suffix = ""
- gtkw_save.trace(self._vcd_names[signal_slot] + suffix, **kwargs)
-
- for domain in self._domains:
- with gtkw_save.group("d.{}".format(domain.name)):
- if domain.rst is not None:
- add_trace(domain.rst)
- add_trace(domain.clk)
-
- for signal in self._traces:
- add_trace(signal)
-
- if self._vcd_file:
- self._vcd_file.close()
- if self._gtkw_file:
- self._gtkw_file.close()
+ def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()):
+ """Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.
+
+ This method returns a context manager. It can be used as: ::
+
+ sim = Simulator(frag)
+ sim.add_clock(1e-6)
+ with sim.write_vcd("dump.vcd", "dump.gtkw"):
+ sim.run_until(1e-3)
+
+ Arguments
+ ---------
+ vcd_file : str or file-like object
+ Verilog Value Change Dump file or filename.
+ gtkw_file : str or file-like object
+ GTKWave save file or filename.
+ traces : iterable of Signal
+ Signals to display traces for.
+ """
+ waveform_writer = _VCDWaveformWriter(self._signal_names,
+ vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)
+ return _WaveformContextManager(self._state, waveform_writer)
+
+ # TODO(nmigen-0.3): remove
+ @deprecated("instead of `with Simulator(fragment, ...) as sim:`, use "
+ "`sim = Simulator(fragment); with sim.write_vcd(...):`")
+ def __enter__(self): # :nocov:
+ return self
+
+ # TODO(nmigen-0.3): remove
+ def __exit__(self, *args): # :nocov:
+ self._state.finish_waveform()