From 7df70059d1483677b1c4d24eaed3bac323b32a11 Mon Sep 17 00:00:00 2001 From: whitequark Date: Fri, 22 Nov 2019 08:32:41 +0000 Subject: [PATCH] back.pysim: redesign the simulator. The redesign introduces no fundamental incompatibilities, but it does involve minor breaking changes: * The simulator commands were moved from hdl.ast to back.pysim (instead of only being reexported from back.pysim). * back.pysim.DeadlineError was removed. Summary of changes: * The new simulator compiles HDL to Python code and is >6x faster. (The old one compiled HDL to lots of Python lambdas.) * The new simulator is a straightforward, rigorous implementation of the Synchronous Reactive Programming paradigm, instead of a pile of ad-hoc code with no particular design driving it. * The new simulator never raises DeadlineError, and there is no limit on the amount of delta cycles. * The new simulator robustly handles multiclock designs. * The new simulator can be reset, such that the compiled design can be reused, which can save significant runtime with large designs. * Generators can no longer be added as processes, since that would break reset(); only generator functions may be. If necessary, they may be added by wrapping them into a generator function; a deprecated fallback does just that. This workaround will raise an exception if the simulator is reset and restarted. * The new simulator does not depend on Python extensions. (The old one required bitarray, which did not provide wheels.) Fixes #28. Fixes #34. Fixes #160. Fixes #161. Fixes #215. Fixes #242. Fixes #262. --- examples/basic/ctr_en.py | 24 +- nmigen/back/pysim.py | 1682 ++++++++++++++++++-------------- nmigen/compat/sim/__init__.py | 27 +- nmigen/test/test_lib_cdc.py | 107 +- nmigen/test/test_lib_coding.py | 100 +- nmigen/test/test_sim.py | 136 ++- setup.py | 6 +- 7 files changed, 1170 insertions(+), 912 deletions(-) diff --git a/examples/basic/ctr_en.py b/examples/basic/ctr_en.py index 1b1aec5..becc7c9 100644 --- a/examples/basic/ctr_en.py +++ b/examples/basic/ctr_en.py @@ -19,17 +19,15 @@ ctr = Counter(width=16) print(verilog.convert(ctr, ports=[ctr.o, ctr.en])) -with pysim.Simulator(ctr, - vcd_file=open("ctrl.vcd", "w"), - gtkw_file=open("ctrl.gtkw", "w"), - traces=[ctr.en, ctr.v, ctr.o]) as sim: - sim.add_clock(1e-6) - def ce_proc(): - yield; yield; yield - yield ctr.en.eq(1) - yield; yield; yield - yield ctr.en.eq(0) - yield; yield; yield - yield ctr.en.eq(1) - sim.add_sync_process(ce_proc()) +sim = pysim.Simulator(ctr) +sim.add_clock(1e-6) +def ce_proc(): + yield; yield; yield + yield ctr.en.eq(1) + yield; yield; yield + yield ctr.en.eq(0) + yield; yield; yield + yield ctr.en.eq(1) +sim.add_sync_process(ce_proc) +with sim.write_vcd("ctrl.vcd", "ctrl.gtkw", traces=[ctr.en, ctr.v, ctr.o]): sim.run_until(100e-6, run_passive=True) diff --git a/nmigen/back/pysim.py b/nmigen/back/pysim.py index dc51e7c..6f905ff 100644 --- a/nmigen/back/pysim.py +++ b/nmigen/back/pysim.py @@ -1,79 +1,365 @@ -import math import inspect import warnings from contextlib import contextmanager -from bitarray import bitarray from vcd import VCDWriter from vcd.gtkw import GTKWSave -from .._utils import flatten +from .._utils import deprecated from ..hdl.ast import * +from ..hdl.cd import * from ..hdl.ir import * -from ..hdl.xfrm import ValueVisitor, StatementVisitor +from ..hdl.xfrm import ValueVisitor, StatementVisitor, LHSGroupFilter -__all__ = ["Simulator", "Delay", "Tick", "Passive", "DeadlineError"] +class Command: + pass -class DeadlineError(Exception): - pass +class Settle(Command): + def __repr__(self): + return "(settle)" + + +class Delay(Command): + def __init__(self, interval=None): + self.interval = None if interval is None else float(interval) + + def __repr__(self): + if self.interval is None: + return "(delay ε)" + else: + return "(delay {:.3}us)".format(self.interval * 1e6) + + +class Tick(Command): + def __init__(self, domain="sync"): + if not isinstance(domain, (str, ClockDomain)): + raise TypeError("Domain must be a string or a ClockDomain instance, not {!r}" + .format(domain)) + assert domain != "comb" + self.domain = domain + + def __repr__(self): + return "(tick {})".format(self.domain) + + +class Passive(Command): + def __repr__(self): + return "(passive)" + + +class Active(Command): + def __repr__(self): + return "(active)" + + +class _WaveformWriter: + def update(self, timestamp, signal, value): + raise NotImplementedError # :nocov: + + def close(self, timestamp): + raise NotImplementedError # :nocov: + +class _VCDWaveformWriter(_WaveformWriter): + @staticmethod + def timestamp_to_vcd(timestamp): + return timestamp * (10 ** 10) # 1/(100 ps) + + @staticmethod + def decode_to_vcd(signal, value): + return signal.decoder(value).expandtabs().replace(" ", "_") + + def __init__(self, signal_names, *, vcd_file, gtkw_file=None, traces=()): + if isinstance(vcd_file, str): + vcd_file = open(vcd_file, "wt") + if isinstance(gtkw_file, str): + gtkw_file = open(gtkw_file, "wt") + + self.vcd_vars = SignalDict() + self.vcd_file = vcd_file + self.vcd_writer = vcd_file and VCDWriter(self.vcd_file, + timescale="100 ps", comment="Generated by nMigen") + + self.gtkw_names = SignalDict() + self.gtkw_file = gtkw_file + self.gtkw_save = gtkw_file and GTKWSave(self.gtkw_file) + + for signal, names in signal_names.items(): + if signal.decoder: + var_type = "string" + var_size = 1 + var_init = self.decode_to_vcd(signal, signal.reset) + else: + var_type = "wire" + var_size = signal.width + var_init = signal.reset -class _State: - __slots__ = ("curr", "curr_dirty", "next", "next_dirty") + for (*var_scope, var_name) in names: + suffix = None + while True: + try: + if suffix is None: + var_name_suffix = var_name + else: + var_name_suffix = "{}${}".format(var_name, suffix) + vcd_var = self.vcd_writer.register_var( + scope=var_scope, name=var_name_suffix, + var_type=var_type, size=var_size, init=var_init) + break + except KeyError: + suffix = (suffix or 0) + 1 + + if signal not in self.vcd_vars: + self.vcd_vars[signal] = set() + self.vcd_vars[signal].add(vcd_var) + + if signal not in self.gtkw_names: + self.gtkw_names[signal] = (*var_scope, var_name_suffix) + + def update(self, timestamp, signal, value): + if signal not in self.vcd_vars: + return + + vcd_timestamp = self.timestamp_to_vcd(timestamp) + if signal.decoder: + var_value = self.decode_to_vcd(signal, value) + else: + var_value = value + for vcd_var in self.vcd_vars[signal]: + self.vcd_writer.change(vcd_var, vcd_timestamp, var_value) + + def close(self, timestamp): + self.vcd_writer.close(self.timestamp_to_vcd(timestamp)) + + if self.gtkw_save is not None: + self.gtkw_save.dumpfile(self.vcd_file.name) + self.gtkw_save.dumpfile_size(self.vcd_file.tell()) + + self.gtkw_save.treeopen("top") + for signal, hierarchy in self.gtkw_names.items(): + if len(signal) > 1 and not signal.decoder: + suffix = "[{}:0]".format(len(signal) - 1) + else: + suffix = "" + self.gtkw_save.trace(".".join(hierarchy) + suffix) + + self.vcd_file.close() + if self.gtkw_file is not None: + self.gtkw_file.close() + + +class _Process: + __slots__ = ("runnable", "passive") + + def reset(self): + raise NotImplementedError # :nocov: + + def run(self): + raise NotImplementedError # :nocov: + + @property + def name(self): + raise NotImplementedError # :nocov: + + +class _SignalState: + __slots__ = ("signal", "curr", "next", "waiters", "pending") + + def __init__(self, signal, pending): + self.signal = signal + self.pending = pending + self.waiters = dict() + self.reset() + + def reset(self): + self.curr = self.next = self.signal.reset + + def set(self, value): + if self.next == value: + return + self.next = value + self.pending.add(self) + def wait(self, task, *, trigger=None): + assert task not in self.waiters + self.waiters[task] = trigger + + def commit(self): + if self.curr == self.next: + return False + self.curr = self.next + return True + + def wakeup(self): + awoken_any = False + for process, trigger in self.waiters.items(): + if trigger is None or trigger == self.curr: + process.runnable = awoken_any = True + return awoken_any + + +class _SimulatorState: def __init__(self): - self.curr = [] - self.next = [] - self.curr_dirty = bitarray() - self.next_dirty = bitarray() - - def add(self, value): - slot = len(self.curr) - self.curr.append(value) - self.next.append(value) - self.curr_dirty.append(True) - self.next_dirty.append(False) - return slot - - def set(self, slot, value): - if self.next[slot] != value: - self.next_dirty[slot] = True - self.next[slot] = value - - def commit(self, slot): - old_value = self.curr[slot] - new_value = self.next[slot] - if old_value != new_value: - self.next_dirty[slot] = False - self.curr_dirty[slot] = True - self.curr[slot] = new_value - return old_value, new_value - - def flush_curr_dirty(self): - while True: - try: - slot = self.curr_dirty.index(True) - except ValueError: - break - self.curr_dirty[slot] = False - yield slot + self.signals = SignalDict() + self.pending = set() - def iter_next_dirty(self): - start = 0 - while True: - try: - slot = self.next_dirty.index(True, start) - start = slot + 1 - except ValueError: + self.timestamp = 0.0 + self.deadlines = dict() + + self.waveform_writer = None + + def reset(self): + for signal_state in self.signals.values(): + signal_state.reset() + self.pending.clear() + + self.timestamp = 0.0 + self.deadlines.clear() + + def for_signal(self, signal): + try: + return self.signals[signal] + except KeyError: + signal_state = _SignalState(signal, self.pending) + self.signals[signal] = signal_state + return signal_state + + def commit(self): + awoken_any = False + for signal_state in self.pending: + if signal_state.commit(): + if signal_state.wakeup(): + awoken_any = True + if self.waveform_writer is not None: + self.waveform_writer.update(self.timestamp, + signal_state.signal, signal_state.curr) + return awoken_any + + def advance(self): + nearest_processes = set() + nearest_deadline = None + for process, deadline in self.deadlines.items(): + if deadline is None: + if nearest_deadline is not None: + nearest_processes.clear() + nearest_processes.add(process) + nearest_deadline = self.timestamp break - yield slot + elif nearest_deadline is None or deadline <= nearest_deadline: + assert deadline >= self.timestamp + if nearest_deadline is not None and deadline < nearest_deadline: + nearest_processes.clear() + nearest_processes.add(process) + nearest_deadline = deadline + + if not nearest_processes: + return False + + for process in nearest_processes: + process.runnable = True + del self.deadlines[process] + self.timestamp = nearest_deadline + + return True + + def start_waveform(self, waveform_writer): + if self.timestamp != 0.0: + raise ValueError("Cannot start writing waveforms after advancing simulation time") + if self.waveform_writer is not None: + raise ValueError("Already writing waveforms to {!r}" + .format(self.waveform_writer)) + self.waveform_writer = waveform_writer + + def finish_waveform(self): + if self.waveform_writer is None: + return + self.waveform_writer.close(self.timestamp) + self.waveform_writer = None + + +class _EvalContext: + __slots__ = ("state", "indexes", "slots") + + def __init__(self, state): + self.state = state + self.indexes = SignalDict() + self.slots = [] + + def get_signal(self, signal): + try: + return self.indexes[signal] + except KeyError: + index = len(self.slots) + self.slots.append(self.state.for_signal(signal)) + self.indexes[signal] = index + return index + + def get_in_signal(self, signal, *, trigger=None): + index = self.get_signal(signal) + self.slots[index].waiters[self] = trigger + return index + + def get_out_signal(self, signal): + return self.get_signal(signal) -normalize = Const.normalize +class _Emitter: + def __init__(self): + self._buffer = [] + self._suffix = 0 + self._level = 0 + + def append(self, code): + self._buffer.append(" " * self._level) + self._buffer.append(code) + self._buffer.append("\n") + + @contextmanager + def indent(self): + self._level += 1 + yield + self._level -= 1 + + def flush(self, indent=""): + code = "".join(self._buffer) + self._buffer.clear() + return code + + def gen_var(self, prefix): + name = f"{prefix}_{self._suffix}" + self._suffix += 1 + return name + + def def_var(self, prefix, value): + name = self.gen_var(prefix) + self.append(f"{name} = {value}") + return name + + +class _Compiler: + def __init__(self, context, emitter): + self.context = context + self.emitter = emitter + + +class _ValueCompiler(ValueVisitor, _Compiler): + helpers = { + "sign": lambda value, sign: value | sign if value & sign else value, + "zdiv": lambda lhs, rhs: 0 if rhs == 0 else lhs // rhs, + "sshl": lambda lhs, rhs: lhs << rhs if rhs >= 0 else lhs >> -rhs, + "sshr": lambda lhs, rhs: lhs >> rhs if rhs >= 0 else lhs << -rhs, + } + + def on_ClockSignal(self, value): + raise NotImplementedError # :nocov: + def on_ResetSignal(self, value): + raise NotImplementedError # :nocov: + + def on_Record(self, value): + return self(Cat(value.fields.values())) -class _ValueCompiler(ValueVisitor): def on_AnyConst(self, value): raise NotImplementedError # :nocov: @@ -86,805 +372,725 @@ class _ValueCompiler(ValueVisitor): def on_Initial(self, value): raise NotImplementedError # :nocov: - def on_Record(self, value): - return self(Cat(value.fields.values())) - class _RHSValueCompiler(_ValueCompiler): - def __init__(self, signal_slots, sensitivity=None, mode="rhs"): - self.signal_slots = signal_slots - self.sensitivity = sensitivity - self.signal_mode = mode + def __init__(self, context, emitter, *, mode, inputs=None): + super().__init__(context, emitter) + assert mode in ("curr", "next") + self.mode = mode + # If not None, `inputs` gets populated with RHS signals. + self.inputs = inputs def on_Const(self, value): - return lambda state: value.value + return f"{value.value}" def on_Signal(self, value): - if self.sensitivity is not None: - self.sensitivity.add(value) - if value not in self.signal_slots: - # A signal that is neither driven nor a port always remains at its reset state. - return lambda state: value.reset - value_slot = self.signal_slots[value] - if self.signal_mode == "rhs": - return lambda state: state.curr[value_slot] - elif self.signal_mode == "lhs": - return lambda state: state.next[value_slot] + if self.inputs is not None: + self.inputs.add(value) + + if self.mode == "curr": + return f"slots[{self.context.get_signal(value)}].{self.mode}" else: - raise ValueError # :nocov: + return f"next_{self.context.get_signal(value)}" - def on_ClockSignal(self, value): - raise NotImplementedError # :nocov: + def on_Operator(self, value): + def mask(value): + value_mask = (1 << len(value)) - 1 + return f"({self(value)} & {value_mask})" - def on_ResetSignal(self, value): - raise NotImplementedError # :nocov: + def sign(value): + if value.shape().signed: + return f"sign({mask(value)}, {-1 << (len(value) - 1)})" + else: # unsigned + return mask(value) - def on_Operator(self, value): - shape = value.shape() if len(value.operands) == 1: - arg, = map(self, value.operands) + arg, = value.operands if value.operator == "~": - return lambda state: normalize(~arg(state), shape) + return f"(~{self(arg)})" if value.operator == "-": - return lambda state: normalize(-arg(state), shape) + return f"(-{self(arg)})" if value.operator == "b": - return lambda state: normalize(bool(arg(state)), shape) + return f"bool({mask(arg)})" if value.operator == "r|": - return lambda state: normalize(arg(state) != 0, shape) + return f"({mask(arg)} != 0)" if value.operator == "r&": - val, = value.operands - mask = (1 << len(val)) - 1 - return lambda state: normalize(arg(state) == mask, shape) + return f"({mask(arg)} == {(1 << len(arg)) - 1})" if value.operator == "r^": # Believe it or not, this is the fastest way to compute a sideways XOR in Python. - return lambda state: normalize(format(arg(state), "b").count("1") % 2, shape) + return f"(format({mask(arg)}, 'b').count('1') % 2)" elif len(value.operands) == 2: - lhs, rhs = map(self, value.operands) + lhs, rhs = value.operands + lhs_mask = (1 << len(lhs)) - 1 + rhs_mask = (1 << len(rhs)) - 1 if value.operator == "+": - return lambda state: normalize(lhs(state) + rhs(state), shape) + return f"({mask(lhs)} + {mask(rhs)})" if value.operator == "-": - return lambda state: normalize(lhs(state) - rhs(state), shape) + return f"({mask(lhs)} - {mask(rhs)})" if value.operator == "*": - return lambda state: normalize(lhs(state) * rhs(state), shape) + return f"({sign(lhs)} * {sign(rhs)})" if value.operator == "//": - def floordiv(lhs, rhs): - return 0 if rhs == 0 else lhs // rhs - return lambda state: normalize(floordiv(lhs(state), rhs(state)), shape) + return f"zdiv({sign(lhs)}, {sign(rhs)})" if value.operator == "&": - return lambda state: normalize(lhs(state) & rhs(state), shape) + return f"({self(lhs)} & {self(rhs)})" if value.operator == "|": - return lambda state: normalize(lhs(state) | rhs(state), shape) + return f"({self(lhs)} | {self(rhs)})" if value.operator == "^": - return lambda state: normalize(lhs(state) ^ rhs(state), shape) + return f"({self(lhs)} ^ {self(rhs)})" if value.operator == "<<": - def sshl(lhs, rhs): - return lhs << rhs if rhs >= 0 else lhs >> -rhs - return lambda state: normalize(sshl(lhs(state), rhs(state)), shape) + return f"sshl({sign(lhs)}, {sign(rhs)})" if value.operator == ">>": - def sshr(lhs, rhs): - return lhs >> rhs if rhs >= 0 else lhs << -rhs - return lambda state: normalize(sshr(lhs(state), rhs(state)), shape) + return f"sshr({sign(lhs)}, {sign(rhs)})" if value.operator == "==": - return lambda state: normalize(lhs(state) == rhs(state), shape) + return f"({sign(lhs)} == {sign(rhs)})" if value.operator == "!=": - return lambda state: normalize(lhs(state) != rhs(state), shape) + return f"({sign(lhs)} != {sign(rhs)})" if value.operator == "<": - return lambda state: normalize(lhs(state) < rhs(state), shape) + return f"({sign(lhs)} < {sign(rhs)})" if value.operator == "<=": - return lambda state: normalize(lhs(state) <= rhs(state), shape) + return f"({sign(lhs)} <= {sign(rhs)})" if value.operator == ">": - return lambda state: normalize(lhs(state) > rhs(state), shape) + return f"({sign(lhs)} > {sign(rhs)})" if value.operator == ">=": - return lambda state: normalize(lhs(state) >= rhs(state), shape) + return f"({sign(lhs)} >= {sign(rhs)})" elif len(value.operands) == 3: if value.operator == "m": - sel, val1, val0 = map(self, value.operands) - return lambda state: val1(state) if sel(state) else val0(state) + sel, val1, val0 = value.operands + return f"({self(val1)} if {self(sel)} else {self(val0)})" raise NotImplementedError("Operator '{}' not implemented".format(value.operator)) # :nocov: def on_Slice(self, value): - shape = value.shape() - arg = self(value.value) - shift = value.start - mask = (1 << (value.stop - value.start)) - 1 - return lambda state: normalize((arg(state) >> shift) & mask, shape) + return f"(({self(value.value)} >> {value.start}) & {(1 << len(value)) - 1})" def on_Part(self, value): - shape = value.shape() - arg = self(value.value) - shift = self(value.offset) - mask = (1 << value.width) - 1 - stride = value.stride - return lambda state: normalize((arg(state) >> shift(state) * stride) & mask, shape) + offset_mask = (1 << len(value.offset)) - 1 + offset = f"(({self(value.offset)} & {offset_mask}) * {value.stride})" + return f"({self(value.value)} >> {offset} & " \ + f"{(1 << value.width) - 1})" def on_Cat(self, value): - shape = value.shape() - parts = [] + gen_parts = [] offset = 0 - for opnd in value.parts: - parts.append((offset, (1 << len(opnd)) - 1, self(opnd))) - offset += len(opnd) - def eval(state): - result = 0 - for offset, mask, opnd in parts: - result |= (opnd(state) & mask) << offset - return normalize(result, shape) - return eval + for part in value.parts: + part_mask = (1 << len(part)) - 1 + gen_parts.append(f"(({self(part)} & {part_mask}) << {offset})") + offset += len(part) + return f"({' | '.join(gen_parts)})" def on_Repl(self, value): - shape = value.shape() - offset = len(value.value) - mask = (1 << len(value.value)) - 1 - count = value.count - opnd = self(value.value) - def eval(state): - result = 0 - for _ in range(count): - result <<= offset - result |= opnd(state) - return normalize(result, shape) - return eval + part_mask = (1 << len(value.value)) - 1 + gen_part = self.emitter.def_var("repl", f"{self(value.value)} & {part_mask}") + gen_parts = [] + offset = 0 + for _ in range(value.count): + gen_parts.append(f"({gen_part} << {offset})") + offset += len(value.value) + return f"({' | '.join(gen_parts)})" def on_ArrayProxy(self, value): - shape = value.shape() - elems = list(map(self, value.elems)) - index = self(value.index) - def eval(state): - index_value = index(state) - if index_value >= len(elems): - index_value = len(elems) - 1 - return normalize(elems[index_value](state), shape) - return eval + index_mask = (1 << len(value.index)) - 1 + gen_index = self.emitter.def_var("rhs_index", f"{self(value.index)} & {index_mask}") + gen_value = self.emitter.gen_var("rhs_proxy") + if value.elems: + gen_elems = [] + for index, elem in enumerate(value.elems): + if index == 0: + self.emitter.append(f"if {gen_index} == {index}:") + else: + self.emitter.append(f"elif {gen_index} == {index}:") + with self.emitter.indent(): + self.emitter.append(f"{gen_value} = {self(elem)}") + self.emitter.append(f"else:") + with self.emitter.indent(): + self.emitter.append(f"{gen_value} = {self(value.elems[-1])}") + return gen_value + else: + return f"0" + + @classmethod + def compile(cls, context, value, *, mode, inputs=None): + emitter = _Emitter() + compiler = cls(context, emitter, mode=mode, inputs=inputs) + emitter.append(f"result = {compiler(value)}") + return emitter.flush() class _LHSValueCompiler(_ValueCompiler): - def __init__(self, signal_slots, rhs_compiler): - self.signal_slots = signal_slots - self.rhs_compiler = rhs_compiler + def __init__(self, context, emitter, *, rhs, outputs=None): + super().__init__(context, emitter) + # `rrhs` is used to translate rvalues that are syntactically a part of an lvalue, e.g. + # the offset of a Part. + self.rrhs = rhs + # `lrhs` is used to translate the read part of a read-modify-write cycle during partial + # update of an lvalue. + self.lrhs = _RHSValueCompiler(context, emitter, mode="next", inputs=None) + # If not None, `outputs` gets populated with signals on LHS. + self.outputs = outputs def on_Const(self, value): raise TypeError # :nocov: def on_Signal(self, value): - shape = value.shape() - value_slot = self.signal_slots[value] - def eval(state, rhs): - state.set(value_slot, normalize(rhs, shape)) - return eval - - def on_ClockSignal(self, value): - raise NotImplementedError # :nocov: - - def on_ResetSignal(self, value): - raise NotImplementedError # :nocov: + if self.outputs is not None: + self.outputs.add(value) + + def gen(arg): + value_mask = (1 << len(value)) - 1 + if value.shape().signed: + value_sign = f"sign({arg} & {value_mask}, {-1 << (len(value) - 1)})" + else: # unsigned + value_sign = f"{arg} & {value_mask}" + self.emitter.append(f"next_{self.context.get_out_signal(value)} = {value_sign}") + return gen def on_Operator(self, value): raise TypeError # :nocov: def on_Slice(self, value): - lhs_r = self.rhs_compiler(value.value) - lhs_l = self(value.value) - shift = value.start - mask = (1 << (value.stop - value.start)) - 1 - def eval(state, rhs): - lhs_value = lhs_r(state) - lhs_value &= ~(mask << shift) - lhs_value |= (rhs & mask) << shift - lhs_l(state, lhs_value) - return eval + def gen(arg): + width_mask = (1 << (value.stop - value.start)) - 1 + self(value.value)(f"({self.lrhs(value.value)} & " \ + f"{~(width_mask << value.start)} | " \ + f"(({arg} & {width_mask}) << {value.start}))") + return gen def on_Part(self, value): - lhs_r = self.rhs_compiler(value.value) - lhs_l = self(value.value) - shift = self.rhs_compiler(value.offset) - mask = (1 << value.width) - 1 - stride = value.stride - def eval(state, rhs): - lhs_value = lhs_r(state) - shift_value = shift(state) * stride - lhs_value &= ~(mask << shift_value) - lhs_value |= (rhs & mask) << shift_value - lhs_l(state, lhs_value) - return eval + def gen(arg): + width_mask = (1 << value.width) - 1 + offset_mask = (1 << len(value.offset)) - 1 + offset = f"(({self.rrhs(value.offset)} & {offset_mask}) * {value.stride})" + self(value.value)(f"({self.lrhs(value.value)} & " \ + f"~({width_mask} << {offset}) | " \ + f"(({arg} & {width_mask}) << {offset}))") + return gen def on_Cat(self, value): - parts = [] - offset = 0 - for opnd in value.parts: - parts.append((offset, (1 << len(opnd)) - 1, self(opnd))) - offset += len(opnd) - def eval(state, rhs): - for offset, mask, opnd in parts: - opnd(state, (rhs >> offset) & mask) - return eval + def gen(arg): + gen_arg = self.emitter.def_var("cat", arg) + gen_parts = [] + offset = 0 + for part in value.parts: + part_mask = (1 << len(part)) - 1 + self(part)(f"(({gen_arg} >> {offset}) & {part_mask})") + offset += len(part) + return gen def on_Repl(self, value): raise TypeError # :nocov: def on_ArrayProxy(self, value): - elems = list(map(self, value.elems)) - index = self.rhs_compiler(value.index) - def eval(state, rhs): - index_value = index(state) - if index_value >= len(elems): - index_value = len(elems) - 1 - elems[index_value](state, rhs) - return eval - - -class _StatementCompiler(StatementVisitor): - def __init__(self, signal_slots): - self.sensitivity = SignalSet() - self.rrhs_compiler = _RHSValueCompiler(signal_slots, self.sensitivity, mode="rhs") - self.lrhs_compiler = _RHSValueCompiler(signal_slots, self.sensitivity, mode="lhs") - self.lhs_compiler = _LHSValueCompiler(signal_slots, self.lrhs_compiler) + def gen(arg): + index_mask = (1 << len(value.index)) - 1 + gen_index = self.emitter.def_var("index", f"{self.rrhs(value.index)} & {index_mask}") + if value.elems: + gen_elems = [] + for index, elem in enumerate(value.elems): + if index == 0: + self.emitter.append(f"if {gen_index} == {index}:") + else: + self.emitter.append(f"elif {gen_index} == {index}:") + with self.emitter.indent(): + self(elem)(arg) + self.emitter.append(f"else:") + with self.emitter.indent(): + self(value.elems[-1])(arg) + else: + self.emitter.append(f"pass") + return gen + + @classmethod + def compile(cls, context, stmt, *, inputs=None, outputs=None): + emitter = _Emitter() + compiler = cls(context, emitter, inputs=inputs, outputs=outputs) + compiler(stmt) + return emitter.flush() + + +class _StatementCompiler(StatementVisitor, _Compiler): + def __init__(self, context, emitter, *, inputs=None, outputs=None): + super().__init__(context, emitter) + self.rhs = _RHSValueCompiler(context, emitter, mode="curr", inputs=inputs) + self.lhs = _LHSValueCompiler(context, emitter, rhs=self.rhs, outputs=outputs) + + def on_statements(self, stmts): + for stmt in stmts: + self(stmt) + if not stmts: + self.emitter.append("pass") def on_Assign(self, stmt): - shape = stmt.lhs.shape() - lhs = self.lhs_compiler(stmt.lhs) - rhs = self.rrhs_compiler(stmt.rhs) - def run(state): - lhs(state, normalize(rhs(state), shape)) - return run + return self.lhs(stmt.lhs)(self.rhs(stmt.rhs)) + + def on_Switch(self, stmt): + gen_test = self.emitter.def_var("test", + f"{self.rhs(stmt.test)} & {(1 << len(stmt.test)) - 1}") + for index, (patterns, stmts) in enumerate(stmt.cases.items()): + gen_checks = [] + if not patterns: + gen_checks.append(f"True") + else: + for pattern in patterns: + if "-" in pattern: + mask = int("".join("0" if b == "-" else "1" for b in pattern), 2) + value = int("".join("0" if b == "-" else b for b in pattern), 2) + gen_checks.append(f"({gen_test} & {mask}) == {value}") + else: + value = int(pattern, 2) + gen_checks.append(f"{gen_test} == {value}") + if index == 0: + self.emitter.append(f"if {' or '.join(gen_checks)}:") + else: + self.emitter.append(f"elif {' or '.join(gen_checks)}:") + with self.emitter.indent(): + self(stmts) def on_Assert(self, stmt): - raise NotImplementedError("Asserts not yet implemented for Simulator backend.") # :nocov: + raise NotImplementedError # :nocov: def on_Assume(self, stmt): - pass # :nocov: + raise NotImplementedError # :nocov: def on_Cover(self, stmt): - raise NotImplementedError("Covers not yet implemented for Simulator backend.") # :nocov: + raise NotImplementedError # :nocov: - def on_Switch(self, stmt): - test = self.rrhs_compiler(stmt.test) - cases = [] - for values, stmts in stmt.cases.items(): - if values == (): - check = lambda test: True + @classmethod + def compile(cls, context, stmt, *, inputs=None, outputs=None): + output_indexes = [context.get_signal(signal) for signal in stmt._lhs_signals()] + emitter = _Emitter() + for signal_index in output_indexes: + emitter.append(f"next_{signal_index} = slots[{signal_index}].next") + compiler = cls(context, emitter, inputs=inputs, outputs=outputs) + compiler(stmt) + for signal_index in output_indexes: + emitter.append(f"slots[{signal_index}].set(next_{signal_index})") + return emitter.flush() + + +class _CompiledProcess(_Process): + __slots__ = ("context", "comb", "name", "run") + + def __init__(self, state, *, comb, name): + self.context = _EvalContext(state) + self.comb = comb + self.name = name + self.run = None # set by _FragmentCompiler + self.reset() + + def reset(self): + self.runnable = self.comb + self.passive = True + + +class _FragmentCompiler: + def __init__(self, state, signal_names): + self.state = state + self.signal_names = signal_names + + def __call__(self, fragment, *, hierarchy=("top",)): + processes = set() + + def add_signal_name(signal): + hierarchical_signal_name = (*hierarchy, signal.name) + if signal not in self.signal_names: + self.signal_names[signal] = {hierarchical_signal_name} else: - check = lambda test: False - def make_check(mask, value, prev_check): - return lambda test: prev_check(test) or test & mask == value - for value in values: - if "-" in value: - mask = "".join("0" if b == "-" else "1" for b in value) - value = "".join("0" if b == "-" else b for b in value) - else: - mask = "1" * len(value) - mask = int(mask, 2) - value = int(value, 2) - check = make_check(mask, value, check) - cases.append((check, self.on_statements(stmts))) - def run(state): - test_value = test(state) - for check, body in cases: - if check(test_value): - body(state) - return - return run + self.signal_names[signal].add(hierarchical_signal_name) - def on_statements(self, stmts): - stmts = [self.on_statement(stmt) for stmt in stmts] - def run(state): - for stmt in stmts: - stmt(state) - return run + for domain_name, domain_signals in fragment.drivers.items(): + for domain_signal in domain_signals: + add_signal_name(domain_signal) + domain_stmts = LHSGroupFilter(domain_signals)(fragment.statements) + domain_process = _CompiledProcess(self.state, comb=domain_name is None, + name=".".join((*hierarchy, "<{}>".format(domain_name or "comb")))) -class Simulator: - def __init__(self, fragment, vcd_file=None, gtkw_file=None, traces=()): - self._fragment = Fragment.get(fragment, platform=None) - - self._signal_slots = SignalDict() # Signal -> int/slot - self._slot_signals = list() # int/slot -> Signal - - self._domains = list() # [ClockDomain] - self._clk_edges = dict() # ClockDomain -> int/edge - self._domain_triggers = list() # int/slot -> ClockDomain - - self._signals = SignalSet() # {Signal} - self._comb_signals = bitarray() # {Signal} - self._sync_signals = bitarray() # {Signal} - self._user_signals = bitarray() # {Signal} - self._domain_signals = dict() # ClockDomain -> {Signal} - - self._started = False - self._timestamp = 0. - self._delta = 0. - self._epsilon = 1e-10 - self._fastest_clock = self._epsilon - self._all_clocks = set() # {str/domain} - self._state = _State() - - self._processes = set() # {process} - self._process_loc = dict() # process -> str/loc - self._passive = set() # {process} - self._suspended = set() # {process} - self._wait_deadline = dict() # process -> float/timestamp - self._wait_tick = dict() # process -> str/domain - - self._funclets = list() # int/slot -> set(lambda) - - self._vcd_file = vcd_file - self._vcd_writer = None - self._vcd_signals = list() # int/slot -> set(vcd_signal) - self._vcd_names = list() # int/slot -> str/name - self._gtkw_file = gtkw_file - self._traces = traces - - self._run_called = False + emitter = _Emitter() + emitter.append(f"def run():") + emitter._level += 1 - @staticmethod - def _check_process(process): - if inspect.isgeneratorfunction(process): - process = process() - if not (inspect.isgenerator(process) or inspect.iscoroutine(process)): - raise TypeError("Cannot add a process {!r} because it is not a generator or " - "a generator function" - .format(process)) - return process + if domain_name is None: + for signal in domain_signals: + signal_index = domain_process.context.get_signal(signal) + emitter.append(f"next_{signal_index} = {signal.reset}") - def _name_process(self, process): - if process in self._process_loc: - return self._process_loc[process] - else: - if inspect.isgenerator(process): - frame = process.gi_frame - if inspect.iscoroutine(process): - frame = process.cr_frame - return "{}:{}".format(inspect.getfile(frame), inspect.getlineno(frame)) + inputs = SignalSet() + _StatementCompiler(domain_process.context, emitter, inputs=inputs)(domain_stmts) - def add_process(self, process): - process = self._check_process(process) - self._processes.add(process) + for input in inputs: + self.state.for_signal(input).wait(domain_process) - def add_sync_process(self, process, domain="sync"): - process = self._check_process(process) - def sync_process(): + else: + domain = fragment.domains[domain_name] + add_signal_name(domain.clk) + if domain.rst is not None: + add_signal_name(domain.rst) + + clk_trigger = 1 if domain.clk_edge == "pos" else 0 + self.state.for_signal(domain.clk).wait(domain_process, trigger=clk_trigger) + if domain.rst is not None and domain.async_reset: + rst_trigger = 1 + self.state.for_signal(domain.rst).wait(domain_process, trigger=rst_trigger) + + gen_asserts = [] + clk_index = domain_process.context.get_signal(domain.clk) + gen_asserts.append(f"slots[{clk_index}].curr == {clk_trigger}") + if domain.rst is not None and domain.async_reset: + rst_index = domain_process.context.get_signal(domain.rst) + gen_asserts.append(f"slots[{rst_index}].curr == {rst_trigger}") + emitter.append(f"assert {' or '.join(gen_asserts)}") + + for signal in domain_signals: + signal_index = domain_process.context.get_signal(signal) + emitter.append(f"next_{signal_index} = slots[{signal_index}].next") + + _StatementCompiler(domain_process.context, emitter)(domain_stmts) + + for signal in domain_signals: + signal_index = domain_process.context.get_signal(signal) + emitter.append(f"slots[{signal_index}].set(next_{signal_index})") + + exec_locals = {"slots": domain_process.context.slots, **_ValueCompiler.helpers} + exec(emitter.flush(), exec_locals) + domain_process.run = exec_locals["run"] + + processes.add(domain_process) + + for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments): + if subfragment_name is None: + subfragment_name = "U${}".format(subfragment_index) + processes.update(self(subfragment, hierarchy=(*hierarchy, subfragment_name))) + + return processes + + +class _CoroutineProcess(_Process): + def __init__(self, state, domains, constructor, *, default_cmd=None): + self.state = state + self.domains = domains + self.constructor = constructor + self.default_cmd = default_cmd + self.reset() + + def reset(self): + self.runnable = True + self.passive = False + self.coroutine = self.constructor() + self.eval_context = _EvalContext(self.state) + self.exec_locals = { + "slots": self.eval_context.slots, + "result": None, + **_ValueCompiler.helpers + } + self.waits_on = set() + + @property + def name(self): + coroutine = self.coroutine + while coroutine.gi_yieldfrom is not None: + coroutine = coroutine.gi_yieldfrom + if inspect.isgenerator(coroutine): + frame = coroutine.gi_frame + if inspect.iscoroutine(coroutine): + frame = coroutine.cr_frame + return "{}:{}".format(inspect.getfile(frame), inspect.getlineno(frame)) + + def get_in_signal(self, signal, *, trigger=None): + signal_state = self.state.for_signal(signal) + assert self not in signal_state.waiters + signal_state.waiters[self] = trigger + self.waits_on.add(signal_state) + return signal_state + + def run(self): + if self.coroutine is None: + return + + if self.waits_on: + for signal_state in self.waits_on: + del signal_state.waiters[self] + self.waits_on.clear() + + response = None + while True: try: - cmd = None - while True: - if cmd is None: - cmd = Tick(domain) - result = yield cmd - self._process_loc[sync_process] = self._name_process(process) - cmd = process.send(result) - except StopIteration: - pass - sync_process = sync_process() - self.add_process(sync_process) + command = self.coroutine.send(response) + if command is None: + command = self.default_cmd + response = None + + if isinstance(command, Value): + exec(_RHSValueCompiler.compile(self.eval_context, command, mode="curr"), + self.exec_locals) + response = Const.normalize(self.exec_locals["result"], command.shape()) + + elif isinstance(command, Statement): + exec(_StatementCompiler.compile(self.eval_context, command), + self.exec_locals) + + elif type(command) is Tick: + domain = command.domain + if isinstance(domain, ClockDomain): + pass + elif domain in self.domains: + domain = self.domains[domain] + else: + raise NameError("Received command {!r} that refers to a nonexistent " + "domain {!r} from process {!r}" + .format(command, command.domain, self.name)) + self.get_in_signal(domain.clk, trigger=1 if domain.clk_edge == "pos" else 0) + if domain.rst is not None and domain.async_reset: + self.get_in_signal(domain.rst, trigger=1) + return - def add_clock(self, period, *, phase=None, domain="sync", if_exists=False): - if self._fastest_clock == self._epsilon or period < self._fastest_clock: - self._fastest_clock = period - if domain in self._all_clocks: - raise ValueError("Domain '{}' already has a clock driving it" - .format(domain)) + elif type(command) is Settle: + self.state.deadlines[self] = None + return - half_period = period / 2 - if phase is None: - phase = half_period - for domain_obj in self._domains: - if not domain_obj.local and domain_obj.name == domain: - clk = domain_obj.clk - break - else: - if if_exists: - return - else: - raise ValueError("Domain '{}' is not present in simulation" - .format(domain)) - def clk_process(): - yield Passive() - yield Delay(phase) - while True: - yield clk.eq(1) - yield Delay(half_period) - yield clk.eq(0) - yield Delay(half_period) - self.add_process(clk_process) - self._all_clocks.add(domain) + elif type(command) is Delay: + if command.interval is None: + self.state.deadlines[self] = None + else: + self.state.deadlines[self] = self.state.timestamp + command.interval + return - def __enter__(self): - if self._vcd_file: - self._vcd_writer = VCDWriter(self._vcd_file, timescale="100 ps", - comment="Generated by nMigen") - - root_fragment = self._fragment.prepare() - - hierarchy = {} - domains = set() - def add_fragment(fragment, scope=()): - hierarchy[fragment] = scope - domains.update(fragment.domains.values()) - for index, (subfragment, name) in enumerate(fragment.subfragments): - if name is None: - add_fragment(subfragment, (*scope, "U{}".format(index))) - else: - add_fragment(subfragment, (*scope, name)) - add_fragment(root_fragment, scope=("top",)) - self._domains = list(domains) - self._clk_edges = {domain: 1 if domain.clk_edge == "pos" else 0 for domain in domains} - - def add_signal(signal): - if signal not in self._signals: - self._signals.add(signal) - - signal_slot = self._state.add(normalize(signal.reset, signal.shape())) - self._signal_slots[signal] = signal_slot - self._slot_signals.append(signal) - - self._comb_signals.append(False) - self._sync_signals.append(False) - self._user_signals.append(False) - for domain in self._domains: - if domain not in self._domain_signals: - self._domain_signals[domain] = bitarray() - self._domain_signals[domain].append(False) - - self._funclets.append(set()) - - self._domain_triggers.append(None) - if self._vcd_writer: - self._vcd_signals.append(set()) - self._vcd_names.append(None) - - return self._signal_slots[signal] - - def add_domain_signal(signal, domain): - signal_slot = add_signal(signal) - self._domain_triggers[signal_slot] = domain - - for fragment, fragment_scope in hierarchy.items(): - for signal in fragment.iter_signals(): - add_signal(signal) - - for domain_name, domain in fragment.domains.items(): - add_domain_signal(domain.clk, domain) - if domain.rst is not None: - add_domain_signal(domain.rst, domain) + elif type(command) is Passive: + self.passive = True - for fragment, fragment_scope in hierarchy.items(): - for signal in fragment.iter_signals(): - if not self._vcd_writer: - continue + elif type(command) is Active: + self.passive = False - signal_slot = self._signal_slots[signal] + elif command is None: # only possible if self.default_cmd is None + raise TypeError("Received default command from process {!r} that was added " + "with add_process(); did you mean to add this process with " + "add_sync_process() instead?" + .format(self.name)) - for i, (subfragment, name) in enumerate(fragment.subfragments): - if signal in subfragment.ports: - var_name = "{}_{}".format(name or "U{}".format(i), signal.name) - break else: - var_name = signal.name + raise TypeError("Received unsupported command {!r} from process {!r}" + .format(command, self.name)) - if signal.decoder: - var_type = "string" - var_size = 1 - var_init = signal.decoder(signal.reset).expandtabs().replace(" ", "_") - else: - var_type = "wire" - var_size = signal.width - var_init = signal.reset + except StopIteration: + self.passive = True + self.coroutine = None + return - suffix = None - while True: - try: - if suffix is None: - var_name_suffix = var_name - else: - var_name_suffix = "{}${}".format(var_name, suffix) - self._vcd_signals[signal_slot].add(self._vcd_writer.register_var( - scope=".".join(fragment_scope), name=var_name_suffix, - var_type=var_type, size=var_size, init=var_init)) - if self._vcd_names[signal_slot] is None: - self._vcd_names[signal_slot] = \ - ".".join(fragment_scope + (var_name_suffix,)) - break - except KeyError: - suffix = (suffix or 0) + 1 + except Exception as exn: + self.coroutine.throw(exn) - for domain_name, signals in fragment.drivers.items(): - signals_bits = bitarray(len(self._signals)) - signals_bits.setall(False) - for signal in signals: - signals_bits[self._signal_slots[signal]] = True - if domain_name is None: - self._comb_signals |= signals_bits - else: - self._sync_signals |= signals_bits - self._domain_signals[fragment.domains[domain_name]] |= signals_bits - - statements = [] - for domain_name, signals in fragment.drivers.items(): - reset_stmts = [] - hold_stmts = [] - for signal in signals: - reset_stmts.append(signal.eq(signal.reset)) - hold_stmts .append(signal.eq(signal)) - - if domain_name is None: - statements += reset_stmts - else: - if fragment.domains[domain_name].async_reset: - statements.append(Switch(fragment.domains[domain_name].rst, - {0: hold_stmts, 1: reset_stmts})) - else: - statements += hold_stmts - statements += fragment.statements +class _WaveformContextManager: + def __init__(self, state, waveform_writer): + self._state = state + self._waveform_writer = waveform_writer + + def __enter__(self): + try: + self._state.start_waveform(self._waveform_writer) + except: + self._waveform_writer.close(0) + raise - compiler = _StatementCompiler(self._signal_slots) - funclet = compiler(statements) + def __exit__(self, *args): + self._state.finish_waveform() - def add_funclet(signal, funclet): - if signal in self._signal_slots: - self._funclets[self._signal_slots[signal]].add(funclet) - for signal in compiler.sensitivity: - add_funclet(signal, funclet) - for domain in fragment.domains.values(): - add_funclet(domain.clk, funclet) - if domain.rst is not None: - add_funclet(domain.rst, funclet) +class Simulator: + def __init__(self, fragment, **kwargs): + self._state = _SimulatorState() + self._signal_names = SignalDict() + self._fragment = Fragment.get(fragment, platform=None).prepare() + self._processes = _FragmentCompiler(self._state, self._signal_names)(self._fragment) + if kwargs: # :nocov: + # TODO(nmigen-0.3): remove + self._state.start_waveform(_VCDWaveformWriter(self._signal_names, **kwargs)) + self._clocked = set() + + def _check_process(self, process): + if not (inspect.isgeneratorfunction(process) or inspect.iscoroutinefunction(process)): + if inspect.isgenerator(process) or inspect.iscoroutine(process): + warnings.warn("instead of generators, use generator functions as processes; " + "this allows the simulator to be repeatedly reset", + DeprecationWarning, stacklevel=3) + def wrapper(): + yield from process + return wrapper + else: + raise TypeError("Cannot add a process {!r} because it is not a generator function" + .format(process)) + return process - self._user_signals = bitarray(len(self._signals)) - self._user_signals.setall(True) - self._user_signals &= ~self._comb_signals - self._user_signals &= ~self._sync_signals + def _add_coroutine_process(self, process, *, default_cmd): + self._processes.add(_CoroutineProcess(self._state, self._fragment.domains, process, + default_cmd=default_cmd)) - return self + def add_process(self, process): + process = self._check_process(process) + def wrapper(): + # Only start a bench process after comb settling, so that the reset values are correct. + yield Settle() + yield from process() + self._add_coroutine_process(wrapper, default_cmd=None) + + def add_sync_process(self, process, *, domain="sync"): + process = self._check_process(process) + def wrapper(): + # Only start a sync process after the first clock edge (or reset edge, if the domain + # uses an asynchronous reset). This matches the behavior of synchronous FFs. + yield Tick(domain) + yield from process() + return self._add_coroutine_process(wrapper, default_cmd=Tick(domain)) - def _update_dirty_signals(self): - """Perform the statement part of IR processes (aka RTLIL case).""" - # First, for all dirty signals, use sensitivity lists to determine the set of fragments - # that need their statements to be reevaluated because the signals changed at the previous - # delta cycle. - funclets = set() - for signal_slot in self._state.flush_curr_dirty(): - funclets.update(self._funclets[signal_slot]) - - # Second, compute the values of all signals at the start of the next delta cycle, by - # running precompiled statements. - for funclet in funclets: - funclet(self._state) - - def _commit_signal(self, signal_slot, domains): - """Perform the driver part of IR processes (aka RTLIL sync), for individual signals.""" - # Take the computed value (at the start of this delta cycle) of a signal (that could have - # come from an IR process that ran earlier, or modified by a simulator process) and update - # the value for this delta cycle. - old, new = self._state.commit(signal_slot) - if old == new: + def add_clock(self, period, *, phase=None, domain="sync", if_exists=False): + """Add a clock process. + + Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle. + + Arguments + --------- + period : float + Clock period. The process will toggle the ``domain`` clock signal every ``period / 2`` + seconds. + phase : None or float + Clock phase. The process will wait ``phase`` seconds before the first clock transition. + If not specified, defaults to ``period / 2``. + domain : str or ClockDomain + Driven clock domain. If specified as a string, the domain with that name is looked up + in the root fragment of the simulation. + if_exists : bool + If ``False`` (the default), raise an error if the driven domain is specified as + a string and the root fragment does not have such a domain. If ``True``, do nothing + in this case. + """ + if isinstance(domain, ClockDomain): + pass + elif domain in self._fragment.domains: + domain = self._fragment.domains[domain] + elif if_exists: return + else: + raise ValueError("Domain {!r} is not present in simulation" + .format(domain)) + if domain in self._clocked: + raise ValueError("Domain {!r} already has a clock driving it" + .format(domain.name)) - # If the signal is a clock that triggers synchronous logic, record that fact. - if (self._domain_triggers[signal_slot] is not None and - self._clk_edges[self._domain_triggers[signal_slot]] == new): - domains.add(self._domain_triggers[signal_slot]) - - if self._vcd_writer: - # Finally, dump the new value to the VCD file. - for vcd_signal in self._vcd_signals[signal_slot]: - signal = self._slot_signals[signal_slot] - if signal.decoder: - var_value = signal.decoder(new).expandtabs().replace(" ", "_") - else: - var_value = new - vcd_timestamp = (self._timestamp + self._delta) / self._epsilon - self._vcd_writer.change(vcd_signal, vcd_timestamp, var_value) - - def _commit_comb_signals(self, domains): - """Perform the comb part of IR processes (aka RTLIL always).""" - # Take the computed value (at the start of this delta cycle) of every comb signal and - # update the value for this delta cycle. - for signal_slot in self._state.iter_next_dirty(): - if self._comb_signals[signal_slot]: - self._commit_signal(signal_slot, domains) - - def _commit_sync_signals(self, domains): - """Perform the sync part of IR processes (aka RTLIL posedge).""" - # At entry, `domains` contains a set of every simultaneously triggered sync update. - while domains: - # Advance the timeline a bit (purely for observational purposes) and commit all of them - # at the same timestamp. - self._delta += self._epsilon - curr_domains, domains = domains, set() - - while curr_domains: - domain = curr_domains.pop() - - # Wake up any simulator processes that wait for a domain tick. - for process, wait_domain_name in list(self._wait_tick.items()): - if domain.name == wait_domain_name: - del self._wait_tick[process] - self._suspended.remove(process) - - # Immediately run the process. It is important that this happens here, - # and not on the next step, when all the processes will run anyway, - # because Tick() simulates an edge triggered process. Like DFFs that latch - # a value from the previous clock cycle, simulator processes observe signal - # values from the previous clock cycle on a tick, too. - self._run_process(process) - - # Take the computed value (at the start of this delta cycle) of every sync signal - # in this domain and update the value for this delta cycle. This can trigger more - # synchronous logic, so record that. - for signal_slot in self._state.iter_next_dirty(): - if self._domain_signals[domain][signal_slot]: - self._commit_signal(signal_slot, domains) - - # Unless handling synchronous logic above has triggered more synchronous logic (which - # can happen e.g. if a domain is clocked off a clock divisor in fabric), we're done. - # Otherwise, do one more round of updates. - - def _run_process(self, process): - try: - cmd = process.send(None) + half_period = period / 2 + if phase is None: + # By default, delay the first edge by half period. This causes any synchronous activity + # to happen at a non-zero time, distinguishing it from the reset values in the waveform + # viewer. + phase = half_period + def clk_process(): + yield Passive() + yield Delay(phase) + # Behave correctly if the process is added after the clock signal is manipulated, or if + # its reset state is high. + initial = (yield domain.clk) while True: - if type(cmd) is Delay: - if cmd.interval is None: - interval = self._epsilon - else: - interval = cmd.interval - self._wait_deadline[process] = self._timestamp + interval - self._suspended.add(process) - break - - elif type(cmd) is Tick: - self._wait_tick[process] = cmd.domain - self._suspended.add(process) - break - - elif type(cmd) is Passive: - self._passive.add(process) - - elif type(cmd) is Assign: - lhs_signals = cmd.lhs._lhs_signals() - for signal in lhs_signals: - if not signal in self._signals: - raise ValueError("Process '{}' sent a request to set signal {!r}, " - "which is not a part of simulation" - .format(self._name_process(process), signal)) - signal_slot = self._signal_slots[signal] - if self._comb_signals[signal_slot]: - raise ValueError("Process '{}' sent a request to set signal {!r}, " - "which is a part of combinatorial assignment in " - "simulation" - .format(self._name_process(process), signal)) - - if type(cmd.lhs) is Signal and type(cmd.rhs) is Const: - # Fast path. - self._state.set(self._signal_slots[cmd.lhs], - normalize(cmd.rhs.value, cmd.lhs.shape())) - else: - compiler = _StatementCompiler(self._signal_slots) - funclet = compiler(cmd) - funclet(self._state) - - domains = set() - for signal in lhs_signals: - self._commit_signal(self._signal_slots[signal], domains) - self._commit_sync_signals(domains) - - elif type(cmd) is Signal: - # Fast path. - cmd = process.send(self._state.curr[self._signal_slots[cmd]]) - continue - - elif isinstance(cmd, Value): - compiler = _RHSValueCompiler(self._signal_slots) - funclet = compiler(cmd) - cmd = process.send(funclet(self._state)) - continue + yield domain.clk.eq(~initial) + yield Delay(half_period) + yield domain.clk.eq(initial) + yield Delay(half_period) + self._add_coroutine_process(clk_process, default_cmd=None) + self._clocked.add(domain) + + def reset(self): + """Reset the simulation. + + Assign the reset value to every signal in the simulation, and restart every user process. + """ + self._state.reset() + for process in self._processes: + process.reset() + + def _delta(self): + """Perform a delta cycle. + + Performs the two phases of a delta cycle: + 1. run and suspend every non-waiting process once, queueing signal changes; + 2. commit every queued signal change, waking up any waiting process. + """ + for process in self._processes: + if process.runnable: + process.runnable = False + process.run() + + return self._state.commit() + + def _settle(self): + """Settle the simulation. + + Run every process and commit changes until a fixed point is reached. If there is + an unstable combinatorial loop, this function will never return. + """ + while self._delta(): + pass - else: - raise TypeError("Received unsupported command {!r} from process '{}'" - .format(cmd, self._name_process(process))) - - cmd = process.send(None) - - except StopIteration: - self._processes.remove(process) - self._passive.discard(process) - - except Exception as e: - process.throw(e) - - def step(self, run_passive=False): - # Are there any delta cycles we should run? - if self._state.curr_dirty.any(): - # We might run some delta cycles, and we have simulator processes waiting on - # a deadline. Take care to not exceed the closest deadline. - if self._wait_deadline and \ - (self._timestamp + self._delta) >= min(self._wait_deadline.values()): - # Oops, we blew the deadline. We *could* run the processes now, but this is - # virtually certainly a logic loop and a design bug, so bail out instead.d - raise DeadlineError("Delta cycles exceeded process deadline; combinatorial loop?") - - domains = set() - while self._state.curr_dirty.any(): - self._update_dirty_signals() - self._commit_comb_signals(domains) - self._commit_sync_signals(domains) - return True - - # Are there any processes that haven't had a chance to run yet? - if len(self._processes) > len(self._suspended): - # Schedule an arbitrary one. - process = (self._processes - set(self._suspended)).pop() - self._run_process(process) - return True - - # All processes are suspended. Are any of them active? - if len(self._processes) > len(self._passive) or run_passive: - # Are any of them suspended before a deadline? - if self._wait_deadline: - # Schedule the one with the lowest deadline. - process, deadline = min(self._wait_deadline.items(), key=lambda x: x[1]) - del self._wait_deadline[process] - self._suspended.remove(process) - self._timestamp = deadline - self._delta = 0. - self._run_process(process) - return True - - # No processes, or all processes are passive. Nothing to do! - return False + def step(self): + """Step the simulation. + + Run every process and commit changes until a fixed point is reached, then advance time + to the closest deadline (if any). If there is an unstable combinatorial loop, + this function will never return. + + Returns ``True`` if there are any active processes, ``False`` otherwise. + """ + self._settle() + self._state.advance() + return any(not process.passive for process in self._processes) def run(self): - self._run_called = True + """Run the simulation while any processes are active. + Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active, + and may change their status using the ``yield Passive()`` and ``yield Active()`` commands. + Processes compiled from HDL and added with :meth:`add_clock` are always passive. + """ while self.step(): pass - def run_until(self, deadline, run_passive=False): - self._run_called = True + def run_until(self, deadline, *, run_passive=False): + """Run the simulation until it advances to ``deadline``. - while self._timestamp < deadline: - if not self.step(run_passive): - return False + If ``run_passive`` is ``False``, the simulation also stops when there are no active + processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it + advances to or past ``deadline``. - return True + If the simulation stops advancing, this function will never return. + """ + assert self._state.timestamp <= deadline + while (self.step() or run_passive) and self._state.timestamp < deadline: + pass - def __exit__(self, *args): - if not self._run_called: - warnings.warn("Simulation created, but not run", UserWarning) - - if self._vcd_writer: - vcd_timestamp = (self._timestamp + self._delta) / self._epsilon - self._vcd_writer.close(vcd_timestamp) - - if self._vcd_file and self._gtkw_file: - gtkw_save = GTKWSave(self._gtkw_file) - if hasattr(self._vcd_file, "name"): - gtkw_save.dumpfile(self._vcd_file.name) - if hasattr(self._vcd_file, "tell"): - gtkw_save.dumpfile_size(self._vcd_file.tell()) - - gtkw_save.treeopen("top") - gtkw_save.zoom_markers(math.log(self._epsilon / self._fastest_clock) - 14) - - def add_trace(signal, **kwargs): - signal_slot = self._signal_slots[signal] - if self._vcd_names[signal_slot] is not None: - if len(signal) > 1 and not signal.decoder: - suffix = "[{}:0]".format(len(signal) - 1) - else: - suffix = "" - gtkw_save.trace(self._vcd_names[signal_slot] + suffix, **kwargs) - - for domain in self._domains: - with gtkw_save.group("d.{}".format(domain.name)): - if domain.rst is not None: - add_trace(domain.rst) - add_trace(domain.clk) - - for signal in self._traces: - add_trace(signal) - - if self._vcd_file: - self._vcd_file.close() - if self._gtkw_file: - self._gtkw_file.close() + def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()): + """Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file. + + This method returns a context manager. It can be used as: :: + + sim = Simulator(frag) + sim.add_clock(1e-6) + with sim.write_vcd("dump.vcd", "dump.gtkw"): + sim.run_until(1e-3) + + Arguments + --------- + vcd_file : str or file-like object + Verilog Value Change Dump file or filename. + gtkw_file : str or file-like object + GTKWave save file or filename. + traces : iterable of Signal + Signals to display traces for. + """ + waveform_writer = _VCDWaveformWriter(self._signal_names, + vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces) + return _WaveformContextManager(self._state, waveform_writer) + + # TODO(nmigen-0.3): remove + @deprecated("instead of `with Simulator(fragment, ...) as sim:`, use " + "`sim = Simulator(fragment); with sim.write_vcd(...):`") + def __enter__(self): # :nocov: + return self + + # TODO(nmigen-0.3): remove + def __exit__(self, *args): # :nocov: + self._state.finish_waveform() diff --git a/nmigen/compat/sim/__init__.py b/nmigen/compat/sim/__init__.py index 9a3a5aa..c6a89e3 100644 --- a/nmigen/compat/sim/__init__.py +++ b/nmigen/compat/sim/__init__.py @@ -21,15 +21,24 @@ def run_simulation(fragment_or_module, generators, clocks={"sync": 10}, vcd_name generators = {"sync": generators} fragment.domains += ClockDomain("sync") - with Simulator(fragment, vcd_file=open(vcd_name, "w") if vcd_name else None) as sim: - for domain, period in clocks.items(): - sim.add_clock(period / 1e9, domain=domain) - for domain, processes in generators.items(): - if isinstance(processes, Iterable) and not inspect.isgenerator(processes): - for process in processes: - sim.add_sync_process(process, domain=domain) - else: - sim.add_sync_process(processes, domain=domain) + sim = Simulator(fragment) + for domain, period in clocks.items(): + sim.add_clock(period / 1e9, domain=domain) + for domain, processes in generators.items(): + def wrap(process): + def wrapper(): + yield from process + return wrapper + if isinstance(processes, Iterable) and not inspect.isgenerator(processes): + for process in processes: + sim.add_sync_process(wrap(process), domain=domain) + else: + sim.add_sync_process(wrap(processes), domain=domain) + + if vcd_name is not None: + with sim.write_vcd(vcd_name): + sim.run() + else: sim.run() diff --git a/nmigen/test/test_lib_cdc.py b/nmigen/test/test_lib_cdc.py index 02403b6..8c83241 100644 --- a/nmigen/test/test_lib_cdc.py +++ b/nmigen/test/test_lib_cdc.py @@ -19,37 +19,39 @@ class FFSynchronizerTestCase(FHDLTestCase): i = Signal() o = Signal() frag = FFSynchronizer(i, o) - with Simulator(frag) as sim: - sim.add_clock(1e-6) - def process(): - self.assertEqual((yield o), 0) - yield i.eq(1) - yield Tick() - self.assertEqual((yield o), 0) - yield Tick() - self.assertEqual((yield o), 0) - yield Tick() - self.assertEqual((yield o), 1) - sim.add_process(process) - sim.run() + + sim = Simulator(frag) + sim.add_clock(1e-6) + def process(): + self.assertEqual((yield o), 0) + yield i.eq(1) + yield Tick() + self.assertEqual((yield o), 0) + yield Tick() + self.assertEqual((yield o), 0) + yield Tick() + self.assertEqual((yield o), 1) + sim.add_process(process) + sim.run() def test_reset_value(self): i = Signal(reset=1) o = Signal() frag = FFSynchronizer(i, o, reset=1) - with Simulator(frag) as sim: - sim.add_clock(1e-6) - def process(): - self.assertEqual((yield o), 1) - yield i.eq(0) - yield Tick() - self.assertEqual((yield o), 1) - yield Tick() - self.assertEqual((yield o), 1) - yield Tick() - self.assertEqual((yield o), 0) - sim.add_process(process) - sim.run() + + sim = Simulator(frag) + sim.add_clock(1e-6) + def process(): + self.assertEqual((yield o), 1) + yield i.eq(0) + yield Tick() + self.assertEqual((yield o), 1) + yield Tick() + self.assertEqual((yield o), 1) + yield Tick() + self.assertEqual((yield o), 0) + sim.add_process(process) + sim.run() class ResetSynchronizerTestCase(FHDLTestCase): @@ -69,31 +71,32 @@ class ResetSynchronizerTestCase(FHDLTestCase): s = Signal(reset=1) m.d.sync += s.eq(0) - with Simulator(m, vcd_file=open("test.vcd", "w")) as sim: - sim.add_clock(1e-6) - def process(): - # initial reset - self.assertEqual((yield s), 1) - yield Tick(); yield Delay(1e-8) - self.assertEqual((yield s), 1) - yield Tick(); yield Delay(1e-8) - self.assertEqual((yield s), 1) - yield Tick(); yield Delay(1e-8) - self.assertEqual((yield s), 0) - yield Tick(); yield Delay(1e-8) + sim = Simulator(m) + sim.add_clock(1e-6) + def process(): + # initial reset + self.assertEqual((yield s), 1) + yield Tick(); yield Delay(1e-8) + self.assertEqual((yield s), 1) + yield Tick(); yield Delay(1e-8) + self.assertEqual((yield s), 1) + yield Tick(); yield Delay(1e-8) + self.assertEqual((yield s), 0) + yield Tick(); yield Delay(1e-8) - yield arst.eq(1) - yield Delay(1e-8) - self.assertEqual((yield s), 1) - yield Tick(); yield Delay(1e-8) - self.assertEqual((yield s), 1) - yield arst.eq(0) - yield Tick(); yield Delay(1e-8) - self.assertEqual((yield s), 1) - yield Tick(); yield Delay(1e-8) - self.assertEqual((yield s), 1) - yield Tick(); yield Delay(1e-8) - self.assertEqual((yield s), 0) - yield Tick(); yield Delay(1e-8) - sim.add_process(process) + yield arst.eq(1) + yield Delay(1e-8) + self.assertEqual((yield s), 0) + yield Tick(); yield Delay(1e-8) + self.assertEqual((yield s), 1) + yield arst.eq(0) + yield Tick(); yield Delay(1e-8) + self.assertEqual((yield s), 1) + yield Tick(); yield Delay(1e-8) + self.assertEqual((yield s), 1) + yield Tick(); yield Delay(1e-8) + self.assertEqual((yield s), 0) + yield Tick(); yield Delay(1e-8) + sim.add_process(process) + with sim.write_vcd("test.vcd"): sim.run() diff --git a/nmigen/test/test_lib_coding.py b/nmigen/test/test_lib_coding.py index 874b87c..cf582e3 100644 --- a/nmigen/test/test_lib_coding.py +++ b/nmigen/test/test_lib_coding.py @@ -8,78 +8,78 @@ from ..lib.coding import * class EncoderTestCase(FHDLTestCase): def test_basic(self): enc = Encoder(4) - with Simulator(enc) as sim: - def process(): - self.assertEqual((yield enc.n), 1) - self.assertEqual((yield enc.o), 0) + def process(): + self.assertEqual((yield enc.n), 1) + self.assertEqual((yield enc.o), 0) - yield enc.i.eq(0b0001) - yield Delay() - self.assertEqual((yield enc.n), 0) - self.assertEqual((yield enc.o), 0) + yield enc.i.eq(0b0001) + yield Settle() + self.assertEqual((yield enc.n), 0) + self.assertEqual((yield enc.o), 0) - yield enc.i.eq(0b0100) - yield Delay() - self.assertEqual((yield enc.n), 0) - self.assertEqual((yield enc.o), 2) + yield enc.i.eq(0b0100) + yield Settle() + self.assertEqual((yield enc.n), 0) + self.assertEqual((yield enc.o), 2) - yield enc.i.eq(0b0110) - yield Delay() - self.assertEqual((yield enc.n), 1) - self.assertEqual((yield enc.o), 0) + yield enc.i.eq(0b0110) + yield Settle() + self.assertEqual((yield enc.n), 1) + self.assertEqual((yield enc.o), 0) - sim.add_process(process) - sim.run() + sim = Simulator(enc) + sim.add_process(process) + sim.run() class PriorityEncoderTestCase(FHDLTestCase): def test_basic(self): enc = PriorityEncoder(4) - with Simulator(enc) as sim: - def process(): - self.assertEqual((yield enc.n), 1) - self.assertEqual((yield enc.o), 0) + def process(): + self.assertEqual((yield enc.n), 1) + self.assertEqual((yield enc.o), 0) - yield enc.i.eq(0b0001) - yield Delay() - self.assertEqual((yield enc.n), 0) - self.assertEqual((yield enc.o), 0) + yield enc.i.eq(0b0001) + yield Settle() + self.assertEqual((yield enc.n), 0) + self.assertEqual((yield enc.o), 0) - yield enc.i.eq(0b0100) - yield Delay() - self.assertEqual((yield enc.n), 0) - self.assertEqual((yield enc.o), 2) + yield enc.i.eq(0b0100) + yield Settle() + self.assertEqual((yield enc.n), 0) + self.assertEqual((yield enc.o), 2) - yield enc.i.eq(0b0110) - yield Delay() - self.assertEqual((yield enc.n), 0) - self.assertEqual((yield enc.o), 1) + yield enc.i.eq(0b0110) + yield Settle() + self.assertEqual((yield enc.n), 0) + self.assertEqual((yield enc.o), 1) - sim.add_process(process) - sim.run() + sim = Simulator(enc) + sim.add_process(process) + sim.run() class DecoderTestCase(FHDLTestCase): def test_basic(self): dec = Decoder(4) - with Simulator(dec) as sim: - def process(): - self.assertEqual((yield dec.o), 0b0001) + def process(): + self.assertEqual((yield dec.o), 0b0001) - yield dec.i.eq(1) - yield Delay() - self.assertEqual((yield dec.o), 0b0010) + yield dec.i.eq(1) + yield Settle() + self.assertEqual((yield dec.o), 0b0010) - yield dec.i.eq(3) - yield Delay() - self.assertEqual((yield dec.o), 0b1000) + yield dec.i.eq(3) + yield Settle() + self.assertEqual((yield dec.o), 0b1000) - yield dec.n.eq(1) - yield Delay() - self.assertEqual((yield dec.o), 0b0000) + yield dec.n.eq(1) + yield Settle() + self.assertEqual((yield dec.o), 0b0000) - sim.add_process(process) - sim.run() + sim = Simulator(dec) + sim.add_process(process) + sim.run() class ReversibleSpec(Elaboratable): diff --git a/nmigen/test/test_sim.py b/nmigen/test/test_sim.py index 7ae6d97..93b76c3 100644 --- a/nmigen/test/test_sim.py +++ b/nmigen/test/test_sim.py @@ -1,3 +1,4 @@ +import os from contextlib import contextmanager from .utils import * @@ -25,16 +26,14 @@ class SimulatorUnitTestCase(FHDLTestCase): for signal in flatten(s._lhs_signals() for s in Statement.cast(stmt)): frag.add_driver(signal) - with Simulator(frag, - vcd_file =open("test.vcd", "w"), - gtkw_file=open("test.gtkw", "w"), - traces=[*isigs, osig]) as sim: - def process(): - for isig, input in zip(isigs, inputs): - yield isig.eq(input) - yield Delay() - self.assertEqual((yield osig), output.value) - sim.add_process(process) + sim = Simulator(frag) + def process(): + for isig, input in zip(isigs, inputs): + yield isig.eq(input) + yield Settle() + self.assertEqual((yield osig), output.value) + sim.add_process(process) + with sim.write_vcd("test.vcd", "test.gtkw", traces=[*isigs, osig]): sim.run() def test_invert(self): @@ -213,6 +212,13 @@ class SimulatorUnitTestCase(FHDLTestCase): stmt = lambda y, a: [Cat(l, m, n).eq(a), y.eq(Cat(n, m, l))] self.assertStatement(stmt, [C(0b100101110, 9)], C(0b110101100, 9)) + def test_nested_cat_lhs(self): + l = Signal(3) + m = Signal(3) + n = Signal(3) + stmt = lambda y, a: [Cat(Cat(l, Cat(m)), n).eq(a), y.eq(Cat(n, m, l))] + self.assertStatement(stmt, [C(0b100101110, 9)], C(0b110101100, 9)) + def test_record(self): rec = Record([ ("l", 1), @@ -277,8 +283,9 @@ class SimulatorUnitTestCase(FHDLTestCase): class SimulatorIntegrationTestCase(FHDLTestCase): @contextmanager def assertSimulation(self, module, deadline=None): - with Simulator(module) as sim: - yield sim + sim = Simulator(module) + yield sim + with sim.write_vcd("test.vcd", "test.gtkw"): if deadline is None: sim.run() else: @@ -300,11 +307,15 @@ class SimulatorIntegrationTestCase(FHDLTestCase): yield Delay(1e-6) self.assertEqual((yield self.count), 4) yield self.sync.clk.eq(1) + self.assertEqual((yield self.count), 4) + yield Settle() self.assertEqual((yield self.count), 5) yield Delay(1e-6) self.assertEqual((yield self.count), 5) yield self.sync.clk.eq(0) self.assertEqual((yield self.count), 5) + yield Settle() + self.assertEqual((yield self.count), 5) for _ in range(3): yield Delay(1e-6) yield self.sync.clk.eq(1) @@ -328,6 +339,26 @@ class SimulatorIntegrationTestCase(FHDLTestCase): self.assertEqual((yield self.count), 0) sim.add_sync_process(process) + def test_reset(self): + self.setUp_counter() + sim = Simulator(self.m) + sim.add_clock(1e-6) + times = 0 + def process(): + nonlocal times + self.assertEqual((yield self.count), 4) + yield + self.assertEqual((yield self.count), 5) + yield + self.assertEqual((yield self.count), 6) + yield + times += 1 + sim.add_sync_process(process) + sim.run() + sim.reset() + sim.run() + self.assertEqual(times, 2) + def setUp_alu(self): self.a = Signal(8) self.b = Signal(8) @@ -406,7 +437,7 @@ class SimulatorIntegrationTestCase(FHDLTestCase): def process(): yield self.i.eq(0b10101010) yield self.i[:4].eq(-1) - yield Delay() + yield Settle() self.assertEqual((yield self.i[:4]), 0b1111) self.assertEqual((yield self.i), 0b10101111) sim.add_process(process) @@ -426,10 +457,18 @@ class SimulatorIntegrationTestCase(FHDLTestCase): def test_add_process_wrong(self): with self.assertSimulation(Module()) as sim: with self.assertRaises(TypeError, - msg="Cannot add a process 1 because it is not a generator or " - "a generator function"): + msg="Cannot add a process 1 because it is not a generator function"): sim.add_process(1) + def test_add_process_wrong_generator(self): + with self.assertSimulation(Module()) as sim: + with self.assertWarns(DeprecationWarning, + msg="instead of generators, use generator functions as processes; " + "this allows the simulator to be repeatedly reset"): + def process(): + yield Delay() + sim.add_process(process()) + def test_add_clock_wrong_twice(self): m = Module() s = Signal() @@ -452,37 +491,18 @@ class SimulatorIntegrationTestCase(FHDLTestCase): with self.assertSimulation(m) as sim: sim.add_clock(1, if_exists=True) - def test_eq_signal_unused_wrong(self): - self.setUp_lhs_rhs() - self.s = Signal() - with self.assertSimulation(self.m) as sim: - def process(): - with self.assertRaisesRegex(ValueError, - regex=r"Process .+? sent a request to set signal \(sig s\), " - r"which is not a part of simulation"): - yield self.s.eq(0) - yield Delay() - sim.add_process(process) - - def test_eq_signal_comb_wrong(self): - self.setUp_lhs_rhs() - with self.assertSimulation(self.m) as sim: - def process(): - with self.assertRaisesRegex(ValueError, - regex=r"Process .+? sent a request to set signal \(sig o\), " - r"which is a part of combinatorial assignment in simulation"): - yield self.o.eq(0) - yield Delay() - sim.add_process(process) - def test_command_wrong(self): + survived = False with self.assertSimulation(Module()) as sim: def process(): + nonlocal survived with self.assertRaisesRegex(TypeError, regex=r"Received unsupported command 1 from process .+?"): yield 1 - yield Delay() + yield Settle() + survived = True sim.add_process(process) + self.assertTrue(survived) def setUp_memory(self, rd_synchronous=True, rd_transparent=True, wr_granularity=None): self.m = Module() @@ -558,7 +578,7 @@ class SimulatorIntegrationTestCase(FHDLTestCase): self.assertEqual((yield self.rdport.data), 0xaa) yield self.assertEqual((yield self.rdport.data), 0xaa) - yield Delay(1e-6) # let comb propagate + yield Settle() self.assertEqual((yield self.rdport.data), 0x33) sim.add_clock(1e-6) sim.add_sync_process(process) @@ -571,11 +591,11 @@ class SimulatorIntegrationTestCase(FHDLTestCase): yield self.wrport.en.eq(1) yield self.assertEqual((yield self.rdport.data), 0xaa) - yield Delay(1e-6) # let comb propagate + yield Settle() self.assertEqual((yield self.rdport.data), 0x33) yield yield self.rdport.addr.eq(1) - yield Delay(1e-6) # let comb propagate + yield Settle() self.assertEqual((yield self.rdport.data), 0x33) sim.add_clock(1e-6) sim.add_sync_process(process) @@ -585,10 +605,10 @@ class SimulatorIntegrationTestCase(FHDLTestCase): with self.assertSimulation(self.m) as sim: def process(): yield self.rdport.addr.eq(0) - yield Delay() + yield Settle() self.assertEqual((yield self.rdport.data), 0xaa) yield self.rdport.addr.eq(1) - yield Delay() + yield Settle() self.assertEqual((yield self.rdport.data), 0x55) yield self.rdport.addr.eq(0) yield self.wrport.addr.eq(0) @@ -596,7 +616,7 @@ class SimulatorIntegrationTestCase(FHDLTestCase): yield self.wrport.en.eq(1) yield Tick("sync") self.assertEqual((yield self.rdport.data), 0xaa) - yield Delay(1e-6) # let comb propagate + yield Settle() self.assertEqual((yield self.rdport.data), 0x33) sim.add_clock(1e-6) sim.add_process(process) @@ -661,8 +681,26 @@ class SimulatorIntegrationTestCase(FHDLTestCase): sim.add_sync_process(process_gen) sim.add_sync_process(process_check) - def test_wrong_not_run(self): - with self.assertWarns(UserWarning, - msg="Simulation created, but not run"): - with Simulator(Fragment()) as sim: + def test_vcd_wrong_nonzero_time(self): + s = Signal() + m = Module() + m.d.sync += s.eq(s) + sim = Simulator(m) + sim.add_clock(1e-6) + sim.run_until(1e-5) + with self.assertRaisesRegex(ValueError, + regex=r"^Cannot start writing waveforms after advancing simulation time$"): + with sim.write_vcd(open(os.path.devnull, "wt")): pass + + def test_vcd_wrong_twice(self): + s = Signal() + m = Module() + m.d.sync += s.eq(s) + sim = Simulator(m) + sim.add_clock(1e-6) + with self.assertRaisesRegex(ValueError, + regex=r"^Already writing waveforms to .+$"): + with sim.write_vcd(open(os.path.devnull, "wt")): + with sim.write_vcd(open(os.path.devnull, "wt")): + pass diff --git a/setup.py b/setup.py index e904911..4dfbb83 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,11 @@ setup( license="BSD", python_requires="~=3.6", setup_requires=["setuptools_scm"], - install_requires=["setuptools", "pyvcd>=0.1.4", "bitarray", "Jinja2"], + install_requires=[ + "setuptools", + "pyvcd~=0.1.4", # for nmigen.pysim + "Jinja2", # for nmigen.build + ], packages=find_packages(), entry_points={ "console_scripts": [ -- 2.30.2