if __name__ == "__main__":
dut = GrayCounter(3)
- Simulator(dut, tb(dut)).run()
+ run_simulation(dut, tb(dut), vcd_name="graycounter.vcd")
if __name__ == "__main__":
dut = Counter()
- Simulator(dut, counter_test(dut)).run()
+ run_simulation(dut, counter_test(dut), vcd_name="basic1.vcd")
if __name__ == "__main__":
dut = Counter()
- Simulator(dut, counter_test(dut)).run()
+ run_simulation(dut, counter_test(dut), vcd_name="basic2.vcd")
for frequency in [0.05, 0.1, 0.25]:
dut = FIR(coef)
tb = fir_tb(dut, frequency, in_signals, out_signals)
- Simulator(dut, tb).run()
+ run_simulation(dut, tb)
# Plot data from the input and output waveforms.
plt.plot(in_signals)
if __name__ == "__main__":
dut = Mem()
- Simulator(dut, memory_test(dut)).run()
+ run_simulation(dut, memory_test(dut))
+++ /dev/null
-import operator
-
-from migen.fhdl.structure import *
-from migen.fhdl.structure import (_Value, _Statement,
- _Operator, _Slice, _ArrayProxy,
- _Assign, _Fragment)
-from migen.fhdl.bitcontainer import flen
-from migen.fhdl.tools import list_targets
-from migen.fhdl.simplify import MemoryToArray
-from migen.fhdl.specials import _MemoryLocation
-
-
-__all__ = ["Simulator"]
-
-
-class ClockState:
- def __init__(self, period, times_before_tick):
- self.period = period
- self.times_before_tick = times_before_tick
-
-
-class TimeManager:
- def __init__(self, description):
- self.clocks = dict()
-
- for k, v in description.items():
- if not isinstance(v, tuple):
- v = v, 0
- self.clocks[k] = ClockState(v[0], v[0] - v[1])
-
- def tick(self):
- r = set()
- dt = min(cs.times_before_tick for cs in self.clocks.values())
- for k, cs in self.clocks.items():
- if cs.times_before_tick == dt:
- r.add(k)
- cs.times_before_tick -= dt
- if not cs.times_before_tick:
- cs.times_before_tick += cs.period
- return r
-
-
-str2op = {
- "~": operator.invert,
- "+": operator.add,
- "-": operator.sub,
- "*": operator.mul,
-
- ">>>": operator.rshift,
- "<<<": operator.lshift,
-
- "&": operator.and_,
- "^": operator.xor,
- "|": operator.or_,
-
- "<": operator.lt,
- "<=": operator.le,
- "==": operator.eq,
- "!=": operator.ne,
- ">": operator.gt,
- ">=": operator.ge,
-}
-
-
-class Evaluator:
- def __init__(self, replaced_memories):
- self.replaced_memories = replaced_memories
- self.signal_values = dict()
- self.modifications = dict()
-
- def commit(self):
- r = set()
- for k, v in self.modifications.items():
- if k not in self.signal_values or self.signal_values[k] != v:
- self.signal_values[k] = v
- r.add(k)
- self.modifications.clear()
- return r
-
- def eval(self, node, postcommit=False):
- if isinstance(node, Constant):
- return node.value
- elif isinstance(node, Signal):
- if postcommit:
- try:
- return self.modifications[node]
- except KeyError:
- pass
- try:
- return self.signal_values[node]
- except KeyError:
- return node.reset.value
- elif isinstance(node, _Operator):
- operands = [self.eval(o, postcommit) for o in node.operands]
- if node.op == "-":
- if len(operands) == 1:
- return -operands[0]
- else:
- return operands[0] - operands[1]
- elif node.op == "m":
- return operands[1] if operands[0] else operands[2]
- else:
- return str2op[node.op](*operands)
- elif isinstance(node, _Slice):
- v = self.eval(node.value, postcommit)
- idx = range(node.start, node.stop)
- return sum(((v >> i) & 1) << j for j, i in enumerate(idx))
- elif isinstance(node, Cat):
- shift = 0
- r = 0
- for element in node.l:
- nbits = flen(element)
- # make value always positive
- r |= (self.eval(element, postcommit) & (2**nbits-1)) << shift
- shift += nbits
- return r
- elif isinstance(node, _ArrayProxy):
- return self.eval(node.choices[self.eval(node.key, postcommit)],
- postcommit)
- elif isinstance(node, _MemoryLocation):
- array = self.replaced_memories[node.memory]
- return self.eval(array[self.eval(node.index, postcommit)], postcommit)
- else:
- # TODO: ClockSignal, ResetSignal
- raise NotImplementedError
-
- def assign(self, node, value):
- if isinstance(node, Signal):
- assert not node.variable
- value = value & (2**node.nbits - 1)
- if node.signed and (value & 2**(node.nbits - 1)):
- value -= 2**node.nbits
- self.modifications[node] = value
- elif isinstance(node, Cat):
- for element in node.l:
- nbits = flen(element)
- self.assign(element, value & (2**nbits-1))
- value >>= nbits
- elif isinstance(node, _Slice):
- full_value = self.eval(node, True)
- # clear bits assigned to by the slice
- full_value &= ~((2**node.stop-1) - (2**node.start-1))
- # set them to the new value
- value &= 2**(node.stop - node.start)-1
- full_value |= value << node.start
- self.assign(node, full_value)
- elif isinstance(node, _ArrayProxy):
- self.assign(node.choices[self.eval(node.key)], value)
- elif isinstance(node, _MemoryLocation):
- array = self.replaced_memories[node.memory]
- self.assign(array[self.eval(node.index)], value)
- else:
- # TODO: ClockSignal, ResetSignal
- raise NotImplementedError
-
- def execute(self, statements):
- for s in statements:
- if isinstance(s, _Assign):
- self.assign(s.l, self.eval(s.r))
- elif isinstance(s, If):
- if self.eval(s.cond):
- self.execute(s.t)
- else:
- self.execute(s.f)
- elif isinstance(s, Case):
- test = self.eval(s.test)
- for k, v in s.cases.items():
- if isinstance(k, Constant) and k.value == test:
- self.execute(v)
- return
- if "default" in s.cases:
- self.execute(s.cases["default"])
- else:
- raise NotImplementedError
-
-
-# TODO: instances via Iverilog/VPI
-# TODO: VCD output
-class Simulator:
- def __init__(self, fragment_or_module, generators, clocks={"sys": 100}):
- if isinstance(fragment_or_module, _Fragment):
- self.fragment = fragment_or_module
- else:
- self.fragment = fragment_or_module.get_fragment()
- if not isinstance(generators, dict):
- generators = {"sys": generators}
- self.generators = dict()
- for k, v in generators.items():
- if isinstance(v, list):
- self.generators[k] = v
- else:
- self.generators[k] = [v]
-
- mta = MemoryToArray()
- mta.transform_fragment(None, self.fragment)
- # TODO: insert_resets on sync
- # comb signals return to their reset value if nothing assigns them
- self.fragment.comb[0:0] = [s.eq(s.reset)
- for s in list_targets(self.fragment.comb)]
-
- self.time = TimeManager(clocks)
- self.evaluator = Evaluator(mta.replacements)
-
- def _commit_and_comb_propagate(self):
- # TODO: optimize
- modified = self.evaluator.commit()
- while modified:
- self.evaluator.execute(self.fragment.comb)
- modified = self.evaluator.commit()
-
- def _evalexec_nested_lists(self, x):
- if isinstance(x, list):
- return [self._evalexec_nested_lists(e) for e in x]
- elif isinstance(x, _Value):
- return self.evaluator.eval(x)
- elif isinstance(x, _Statement):
- self.evaluator.execute([x])
- return None
- else:
- raise ValueError
-
- def _process_generators(self, cd):
- exhausted = []
- for generator in self.generators[cd]:
- reply = None
- while True:
- try:
- request = generator.send(reply)
- if request is None:
- break # next cycle
- else:
- reply = self._evalexec_nested_lists(request)
- except StopIteration:
- exhausted.append(generator)
- break
- for generator in exhausted:
- self.generators[cd].remove(generator)
-
- def _continue_simulation(self):
- # TODO: passive generators
- return any(self.generators.values())
-
- def run(self):
- self.evaluator.execute(self.fragment.comb)
- self._commit_and_comb_propagate()
-
- while True:
- cds = self.time.tick()
- for cd in cds:
- if cd in self.fragment.sync:
- self.evaluator.execute(self.fragment.sync[cd])
- if cd in self.generators:
- self._process_generators(cd)
- self._commit_and_comb_propagate()
-
- if not self._continue_simulation():
- break
--- /dev/null
+from migen.sim.core import Simulator, run_simulation
--- /dev/null
+import operator
+
+from migen.fhdl.structure import *
+from migen.fhdl.structure import (_Value, _Statement,
+ _Operator, _Slice, _ArrayProxy,
+ _Assign, _Fragment)
+from migen.fhdl.bitcontainer import flen
+from migen.fhdl.tools import list_signals, list_targets
+from migen.fhdl.simplify import MemoryToArray
+from migen.fhdl.specials import _MemoryLocation
+from migen.sim.vcd import VCDWriter, DummyVCDWriter
+
+
+class ClockState:
+ def __init__(self, period, times_before_tick):
+ self.period = period
+ self.times_before_tick = times_before_tick
+
+
+class TimeManager:
+ def __init__(self, description):
+ self.clocks = dict()
+
+ for k, v in description.items():
+ if not isinstance(v, tuple):
+ v = v, 0
+ self.clocks[k] = ClockState(v[0], v[0] - v[1])
+
+ def tick(self):
+ r = set()
+ dt = min(cs.times_before_tick for cs in self.clocks.values())
+ for k, cs in self.clocks.items():
+ if cs.times_before_tick == dt:
+ r.add(k)
+ cs.times_before_tick -= dt
+ if not cs.times_before_tick:
+ cs.times_before_tick += cs.period
+ return dt, r
+
+
+str2op = {
+ "~": operator.invert,
+ "+": operator.add,
+ "-": operator.sub,
+ "*": operator.mul,
+
+ ">>>": operator.rshift,
+ "<<<": operator.lshift,
+
+ "&": operator.and_,
+ "^": operator.xor,
+ "|": operator.or_,
+
+ "<": operator.lt,
+ "<=": operator.le,
+ "==": operator.eq,
+ "!=": operator.ne,
+ ">": operator.gt,
+ ">=": operator.ge,
+}
+
+
+class Evaluator:
+ def __init__(self, replaced_memories):
+ self.replaced_memories = replaced_memories
+ self.signal_values = dict()
+ self.modifications = dict()
+
+ def commit(self):
+ r = set()
+ for k, v in self.modifications.items():
+ if k not in self.signal_values or self.signal_values[k] != v:
+ self.signal_values[k] = v
+ r.add(k)
+ self.modifications.clear()
+ return r
+
+ def eval(self, node, postcommit=False):
+ if isinstance(node, Constant):
+ return node.value
+ elif isinstance(node, Signal):
+ if postcommit:
+ try:
+ return self.modifications[node]
+ except KeyError:
+ pass
+ try:
+ return self.signal_values[node]
+ except KeyError:
+ return node.reset.value
+ elif isinstance(node, _Operator):
+ operands = [self.eval(o, postcommit) for o in node.operands]
+ if node.op == "-":
+ if len(operands) == 1:
+ return -operands[0]
+ else:
+ return operands[0] - operands[1]
+ elif node.op == "m":
+ return operands[1] if operands[0] else operands[2]
+ else:
+ return str2op[node.op](*operands)
+ elif isinstance(node, _Slice):
+ v = self.eval(node.value, postcommit)
+ idx = range(node.start, node.stop)
+ return sum(((v >> i) & 1) << j for j, i in enumerate(idx))
+ elif isinstance(node, Cat):
+ shift = 0
+ r = 0
+ for element in node.l:
+ nbits = flen(element)
+ # make value always positive
+ r |= (self.eval(element, postcommit) & (2**nbits-1)) << shift
+ shift += nbits
+ return r
+ elif isinstance(node, _ArrayProxy):
+ return self.eval(node.choices[self.eval(node.key, postcommit)],
+ postcommit)
+ elif isinstance(node, _MemoryLocation):
+ array = self.replaced_memories[node.memory]
+ return self.eval(array[self.eval(node.index, postcommit)], postcommit)
+ else:
+ # TODO: ClockSignal, ResetSignal
+ raise NotImplementedError
+
+ def assign(self, node, value):
+ if isinstance(node, Signal):
+ assert not node.variable
+ value = value & (2**node.nbits - 1)
+ if node.signed and (value & 2**(node.nbits - 1)):
+ value -= 2**node.nbits
+ self.modifications[node] = value
+ elif isinstance(node, Cat):
+ for element in node.l:
+ nbits = flen(element)
+ self.assign(element, value & (2**nbits-1))
+ value >>= nbits
+ elif isinstance(node, _Slice):
+ full_value = self.eval(node, True)
+ # clear bits assigned to by the slice
+ full_value &= ~((2**node.stop-1) - (2**node.start-1))
+ # set them to the new value
+ value &= 2**(node.stop - node.start)-1
+ full_value |= value << node.start
+ self.assign(node, full_value)
+ elif isinstance(node, _ArrayProxy):
+ self.assign(node.choices[self.eval(node.key)], value)
+ elif isinstance(node, _MemoryLocation):
+ array = self.replaced_memories[node.memory]
+ self.assign(array[self.eval(node.index)], value)
+ else:
+ # TODO: ClockSignal, ResetSignal
+ raise NotImplementedError
+
+ def execute(self, statements):
+ for s in statements:
+ if isinstance(s, _Assign):
+ self.assign(s.l, self.eval(s.r))
+ elif isinstance(s, If):
+ if self.eval(s.cond):
+ self.execute(s.t)
+ else:
+ self.execute(s.f)
+ elif isinstance(s, Case):
+ test = self.eval(s.test)
+ for k, v in s.cases.items():
+ if isinstance(k, Constant) and k.value == test:
+ self.execute(v)
+ return
+ if "default" in s.cases:
+ self.execute(s.cases["default"])
+ else:
+ raise NotImplementedError
+
+
+# TODO: instances via Iverilog/VPI
+class Simulator:
+ def __init__(self, fragment_or_module, generators, clocks={"sys": 10}, vcd_name=None):
+ if isinstance(fragment_or_module, _Fragment):
+ self.fragment = fragment_or_module
+ else:
+ self.fragment = fragment_or_module.get_fragment()
+ if not isinstance(generators, dict):
+ generators = {"sys": generators}
+ self.generators = dict()
+ for k, v in generators.items():
+ if isinstance(v, list):
+ self.generators[k] = v
+ else:
+ self.generators[k] = [v]
+
+ mta = MemoryToArray()
+ mta.transform_fragment(None, self.fragment)
+ # TODO: insert_resets on sync
+ # comb signals return to their reset value if nothing assigns them
+ self.fragment.comb[0:0] = [s.eq(s.reset)
+ for s in list_targets(self.fragment.comb)]
+
+ if vcd_name is None:
+ self.vcd = DummyVCDWriter()
+ else:
+ signals = sorted(list_signals(self.fragment),
+ key=lambda x: x.duid)
+ self.vcd = VCDWriter(vcd_name, signals)
+
+ self.time = TimeManager(clocks)
+ self.evaluator = Evaluator(mta.replacements)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
+ def close(self):
+ self.vcd.close()
+
+ def _commit_and_comb_propagate(self):
+ # TODO: optimize
+ all_modified = set()
+ modified = self.evaluator.commit()
+ all_modified |= modified
+ while modified:
+ self.evaluator.execute(self.fragment.comb)
+ modified = self.evaluator.commit()
+ all_modified |= modified
+ for signal in all_modified:
+ self.vcd.set(signal, self.evaluator.signal_values[signal])
+
+ def _evalexec_nested_lists(self, x):
+ if isinstance(x, list):
+ return [self._evalexec_nested_lists(e) for e in x]
+ elif isinstance(x, _Value):
+ return self.evaluator.eval(x)
+ elif isinstance(x, _Statement):
+ self.evaluator.execute([x])
+ return None
+ else:
+ raise ValueError
+
+ def _process_generators(self, cd):
+ exhausted = []
+ for generator in self.generators[cd]:
+ reply = None
+ while True:
+ try:
+ request = generator.send(reply)
+ if request is None:
+ break # next cycle
+ else:
+ reply = self._evalexec_nested_lists(request)
+ except StopIteration:
+ exhausted.append(generator)
+ break
+ for generator in exhausted:
+ self.generators[cd].remove(generator)
+
+ def _continue_simulation(self):
+ # TODO: passive generators
+ return any(self.generators.values())
+
+ def run(self):
+ self.evaluator.execute(self.fragment.comb)
+ self._commit_and_comb_propagate()
+
+ while True:
+ dt, cds = self.time.tick()
+ self.vcd.delay(dt)
+ for cd in cds:
+ if cd in self.fragment.sync:
+ self.evaluator.execute(self.fragment.sync[cd])
+ if cd in self.generators:
+ self._process_generators(cd)
+ self._commit_and_comb_propagate()
+
+ if not self._continue_simulation():
+ break
+
+
+def run_simulation(*args, **kwargs):
+ with Simulator(*args, **kwargs) as s:
+ s.run()
--- /dev/null
+from itertools import count
+
+from migen.fhdl.bitcontainer import flen
+from migen.fhdl.namer import build_namespace
+
+
+def vcd_codes():
+ codechars = [chr(i) for i in range(33, 127)]
+ for n in count():
+ q, r = divmod(n, len(codechars))
+ code = codechars[r]
+ while q > 0:
+ q, r = divmod(q, len(codechars))
+ code = codechars[r] + code
+ yield code
+
+
+class VCDWriter:
+ def __init__(self, filename, signals):
+ self.fo = open(filename, "w")
+ self.codes = dict()
+ self.signal_values = dict()
+ self.t = 0
+
+ try:
+ ns = build_namespace(signals)
+ codes = vcd_codes()
+ for signal in signals:
+ name = ns.get_name(signal)
+ code = next(codes)
+ self.codes[signal] = code
+ self.fo.write("$var wire {len} {code} {name} $end\n"
+ .format(name=name, code=code, len=flen(signal)))
+ self.fo.write("$dumpvars\n")
+ for signal in signals:
+ value = signal.reset.value
+ self._write_value(signal, value)
+ self.signal_values[signal] = value
+ self.fo.write("$end\n")
+ self.fo.write("#0\n")
+ except:
+ self.close()
+ raise
+
+ def _write_value(self, signal, value):
+ l = flen(signal)
+ if value < 0:
+ value += 2**l
+ if l > 1:
+ fmtstr = "b{:0" + str(l) + "b} {}\n"
+ else:
+ fmtstr = "{}{}\n"
+ self.fo.write(fmtstr.format(value, self.codes[signal]))
+
+ def set(self, signal, value):
+ if self.signal_values[signal] != value:
+ self._write_value(signal, value)
+ self.signal_values[signal] = value
+
+ def delay(self, delay):
+ self.t += delay
+ self.fo.write("#{}\n".format(self.t))
+
+ def close(self):
+ self.fo.close()
+
+
+class DummyVCDWriter:
+ def set(self, signal, value):
+ pass
+
+ def delay(self, delay):
+ pass
+
+ def close(self):
+ pass
verilog.convert(self.tb)
def run_with(self, generator):
- Simulator(self.tb, generator).run()
+ run_simulation(self.tb, generator)