from migen.fhdl.std import *
from migen.genlib.cdc import GrayCounter
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
class TB(Module):
def __init__(self, width=3):
self.submodules.gc = GrayCounter(self.width)
self.prng = Random(7345)
- def do_simulation(self, s):
- print("{0:0{1}b} CE={2} bin={3}".format(s.rd(self.gc.q),
- self.width, s.rd(self.gc.ce), s.rd(self.gc.q_binary)))
- s.wr(self.gc.ce, self.prng.getrandbits(1))
+ def do_simulation(self, selfp):
+ print("{0:0{1}b} CE={2} bin={3}".format(selfp.gc.q,
+ self.width, selfp.gc.ce, selfp.gc.q_binary))
+ selfp.gc.ce = self.prng.getrandbits(1)
-sim = Simulator(TB())
-sim.run(35)
+if __name__ == "__main__":
+ run_simulation(TB(), ncycles=35)
from migen.fhdl.std import *
from migen.flow.network import *
from migen.flow.transactions import *
-from migen.actorlib import dma_wishbone, dma_asmi
+from migen.actorlib import dma_wishbone
from migen.actorlib.sim import *
-from migen.bus import wishbone, asmibus
-from migen.sim.generic import Simulator
+from migen.bus import wishbone
+from migen.sim.generic import run_simulation
class MyModel:
def read(self, address):
def can_ack(self, bus):
return self.prng.randrange(0, 2)
-class MyModelASMI(MyModel, asmibus.TargetModel):
- pass
-
def adrgen_gen():
for i in range(10):
print("Address: " + hex(i))
self.submodules.comp = CompositeActor(g)
TBWishbone.__init__(self, self.reader)
- def do_simulation(self, s):
- s.interrupt = self.adrgen.token_exchanger.done and not s.rd(self.comp.busy)
-
class TBWishboneWriter(TBWishbone):
def __init__(self):
self.trgen = SimTrGen(30)
self.submodules.comp = CompositeActor(g)
TBWishbone.__init__(self, self.writer)
- def do_simulation(self, s):
- s.interrupt = self.trgen.token_exchanger.done and not s.rd(self.comp.busy)
-
-class TBAsmi(Module):
- def __init__(self, nslots):
- self.submodules.hub = asmibus.Hub(32, 32)
- self.port = self.hub.get_port(nslots)
- self.hub.finalize()
-
- self.submodules.peripheral = asmibus.Target(MyModelASMI(), self.hub)
- self.submodules.tap = asmibus.Tap(self.hub)
-
-class TBAsmiReader(TBAsmi):
- def __init__(self, nslots):
- TBAsmi.__init__(self, nslots)
-
- self.adrgen = SimAdrGen(32)
- self.reader = dma_asmi.Reader(self.port)
- self.dumper = SimDumper()
- g = DataFlowGraph()
- g.add_connection(self.adrgen, self.reader)
- g.add_connection(self.reader, self.dumper)
- self.submodules.comp = CompositeActor(g)
-
- def do_simulation(self, s):
- s.interrupt = self.adrgen.token_exchanger.done and not s.rd(self.comp.busy)
-
-class TBAsmiWriter(TBAsmi):
- def __init__(self, nslots):
- TBAsmi.__init__(self, nslots)
-
- self.trgen = SimTrGen(32)
- self.writer = dma_asmi.Writer(self.port)
- g = DataFlowGraph()
- g.add_connection(self.trgen, self.writer)
- self.submodules.comp = CompositeActor(g)
-
- def do_simulation(self, s):
- s.interrupt = self.trgen.token_exchanger.done and not s.rd(self.comp.busy)
-
def test_wb_reader():
print("*** Testing Wishbone reader")
- Simulator(TBWishboneReader()).run()
+ run_simulation(TBWishboneReader())
def test_wb_writer():
print("*** Testing Wishbone writer")
- Simulator(TBWishboneWriter()).run()
-
-def test_asmi_reader(nslots):
- print("*** Testing ASMI reader (nslots={})".format(nslots))
- Simulator(TBAsmiReader(nslots)).run()
-
-def test_asmi_writer(nslots):
- print("*** Testing ASMI writer (nslots={})".format(nslots))
- Simulator(TBAsmiWriter(nslots)).run()
-
-test_wb_reader()
-test_wb_writer()
-test_asmi_reader(1)
-test_asmi_reader(2)
-test_asmi_writer(1)
-test_asmi_writer(2)
+ run_simulation(TBWishboneWriter())
+
+if __name__ == "__main__":
+ test_wb_reader()
+ test_wb_writer()
from migen.flow.transactions import *
from migen.actorlib import misc
from migen.actorlib.sim import *
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
def source_gen():
for i in range(10):
self.sink = Sink([("value", 32)])
SimActor.__init__(self, sink_gen())
-def main():
+if __name__ == "__main__":
source = SimSource()
loop = misc.IntSequence(32)
sink = SimSink()
g.add_connection(source, loop)
g.add_connection(loop, sink)
comp = CompositeActor(g)
- sim = Simulator(comp)
- sim.run(500)
-
-main()
+ run_simulation(comp, ncycles=500)
from migen.flow.transactions import *
from migen.actorlib import structuring
from migen.actorlib.sim import *
-from migen.sim.generic import Simulator
from migen.flow import perftools
+from migen.sim.generic import run_simulation
pack_factor = 5
base_layout = [("value", 32)]
self.submodules.comp = CompositeActor(self.g)
self.submodules.reporter = perftools.DFGReporter(self.g)
-def main():
+if __name__ == "__main__":
tb = TB()
- sim = Simulator(tb).run(1000)
+ run_simulation(tb, ncycles=1000)
g_layout = nx.spectral_layout(tb.g)
nx.draw(tb.g, g_layout)
nx.draw_networkx_edge_labels(tb.g, g_layout, tb.reporter.get_edge_labels())
plt.show()
-
-main()
from migen.flow.transactions import *
from migen.actorlib.sim import *
from migen.pytholite.compiler import Pytholite
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
from migen.fhdl import verilog
layout = [("r", 32)]
self.result = Source(layout)
SimActor.__init__(self, number_gen(5))
-def run_sim(ng):
+def run_ng_sim(ng):
g = DataFlowGraph()
d = Dumper(layout)
g.add_connection(ng, d)
c = CompositeActor(g)
- sim = Simulator(c)
- sim.run(20)
- del sim
+ run_simulation(c, ncycles=20)
def make_ng_pytholite():
ng_pytholite = Pytholite(number_gen, 5)
def main():
print("Simulating native Python:")
ng_native = SimNumberGen()
- run_sim(ng_native)
+ run_ng_sim(ng_native)
print("Simulating Pytholite:")
ng_pytholite = make_ng_pytholite()
- run_sim(ng_pytholite)
+ run_ng_sim(ng_pytholite)
print("Converting Pytholite to Verilog:")
ng_pytholite = make_ng_pytholite()
from migen.genlib.ioo import UnifiedIOSimulation
from migen.pytholite.transel import Register
from migen.pytholite.compiler import Pytholite
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
from migen.fhdl.std import *
from migen.fhdl import verilog
self.submodules.intercon = wishbone.InterconnectPointToPoint(ng.wb, self.slave.bus)
self.submodules.ca = CompositeActor(g)
-def run_sim(ng):
- sim = Simulator(TestBench(ng))
- sim.run(50)
- del sim
+def run_ng_sim(ng):
+ run_simulation(TestBench(ng), ncycles=50)
def add_interfaces(obj):
obj.result = Source(layout)
print("Simulating native Python:")
ng_native = UnifiedIOSimulation(gen())
add_interfaces(ng_native)
- run_sim(ng_native)
+ run_ng_sim(ng_native)
print("Simulating Pytholite:")
ng_pytholite = Pytholite(gen)
add_interfaces(ng_pytholite)
- run_sim(ng_pytholite)
+ run_ng_sim(ng_pytholite)
print("Converting Pytholite to Verilog:")
ng_pytholite = Pytholite(gen)
from migen.fhdl.std import *
from migen.bus.transactions import *
from migen.bus import lasmibus
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
def my_generator(n):
bank = n % 4
def __init__(self):
self.submodules.controller = lasmibus.Target(MyModel(), aw=4, dw=32, nbanks=4, req_queue_size=4,
read_latency=4, write_latency=1)
- self.submodules.xbar = lasmibus.Crossbar([self.controller.bus], 4, 2)
- self.initiators = [lasmibus.Initiator(my_generator(n), bus) for n, bus in enumerate(self.xbar.masters)]
+ self.submodules.xbar = lasmibus.Crossbar([self.controller.bus], 2)
+ self.initiators = [lasmibus.Initiator(my_generator(n), self.xbar.get_master()) for n in range(4)]
self.submodules += self.initiators
- def do_simulation(self, s):
- s.interrupt = all(m.done for m in self.initiators)
-
-def main():
- tb = TB()
- sim = Simulator(tb)
- sim.run()
-
-main()
+if __name__ == "__main__":
+ run_simulation(TB())
from migen.fhdl.std import *
from migen.bus.transactions import *
from migen.bus import wishbone
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
# Our bus master.
# Python generators let us program bus transactions in an elegant sequential style.
# Connect the master to the slave.
self.submodules.intercon = wishbone.InterconnectPointToPoint(self.master.bus, self.slave.bus)
- def do_simulation(self, s):
- # Terminate the simulation when the initiator is done (i.e. our generator is exhausted).
- s.interrupt = self.master.done
-
-def main():
- tb = TB()
- sim = Simulator(tb)
- sim.run()
-
-main()
+if __name__ == "__main__":
+ run_simulation(TB())
# Output:
# <TWrite adr:0x0 dat:0x0>
-# Copyright (C) 2012 Vermeer Manufacturing Co.
-# License: GPLv3 with additional permissions (see README).
-
from migen.fhdl.std import *
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
# Our simple counter, which increments at every cycle
# and prints its current value in simulation.
self.sync += self.count.eq(self.count + 1)
# This function will be called at every cycle.
- def do_simulation(self, s):
+ def do_simulation(self, selfp):
# Simply read the count signal and print it.
# The output is:
# Count: 0
# Count: 1
# Count: 2
# ...
- print("Count: " + str(s.rd(self.count)))
+ print("Count: " + str(selfp.count))
-def main():
+if __name__ == "__main__":
dut = Counter()
- # We do not specify a top-level nor runner object, and use the defaults.
- sim = Simulator(dut)
- # Since we do not use sim.interrupt, limit the simulation
+ # Since we do not use StopSimulation, limit the simulation
# to some number of cycles.
- sim.run(20)
-
-main()
+ run_simulation(dut, ncycles=20)
-# Copyright (C) 2012 Vermeer Manufacturing Co.
-# License: GPLv3 with additional permissions (see README).
-
from migen.fhdl.std import *
-from migen.sim.generic import Simulator, TopLevel
+from migen.sim.generic import run_simulation
-# A slightly improved counter.
+# A slightly more elaborate counter.
# Has a clock enable (CE) signal, counts on more bits
# and resets with a negative number.
class Counter(Module):
self.sync += If(self.ce, self.count.eq(self.count + 1))
- def do_simulation(self, s):
+ def do_simulation(self, selfp):
# Only assert CE every second cycle.
# => each counter value is held for two cycles.
- if s.cycle_counter % 2:
- s.wr(self.ce, 0) # This is how you write to a signal.
+ if selfp.simulator.cycle_counter % 2:
+ selfp.ce = 0 # This is how you write to a signal.
else:
- s.wr(self.ce, 1)
- print("Cycle: " + str(s.cycle_counter) + " Count: " + \
- str(s.rd(self.count)))
- # Set the "initialize" property on our simulation function.
- # The simulator will call it during the reset cycle,
- # with s.cycle_counter == -1.
- do_simulation.initialize = True
+ selfp.ce = 1
+ print("Cycle: " + str(selfp.simulator.cycle_counter) + " Count: " + \
+ str(selfp.count))
- # Output is:
- # Cycle: -1 Count: 0
- # Cycle: 0 Count: -5
- # Cycle: 1 Count: -5
- # Cycle: 2 Count: -4
- # Cycle: 3 Count: -4
- # Cycle: 4 Count: -3
- # ...
+# Output is:
+# Cycle: 0 Count: -5
+# Cycle: 1 Count: -5
+# Cycle: 2 Count: -4
+# Cycle: 3 Count: -4
+# Cycle: 4 Count: -3
+# ...
-def main():
+if __name__ == "__main__":
dut = Counter()
- # Instantiating the generic top-level ourselves lets us
- # specify a VCD output file.
- sim = Simulator(dut, TopLevel("my.vcd"))
- sim.run(20)
-
-main()
+ # Demonstrate VCD output
+ run_simulation(dut, vcd_name="my.vcd", ncycles=20)
from migen.fhdl.std import *
from migen.fhdl import verilog
from migen.genlib.cordic import Cordic
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
class TestBench(Module):
def __init__(self, n=None, xmax=.98, i=None, **kwargs):
self.ii = iter(self.i)
self.o = []
- def do_simulation(self, s):
- if s.rd(self.cordic.new_in):
+ def do_simulation(self, selfp):
+ if selfp.cordic.new_in:
try:
- xi, yi, zi = next(self.ii)
+ selfp.cordic.xi, selfp.cordic.yi, selfp.cordic.zi = next(self.ii)
except StopIteration:
- s.interrupt = True
- return
- s.wr(self.cordic.xi, xi)
- s.wr(self.cordic.yi, yi)
- s.wr(self.cordic.zi, zi)
- if s.rd(self.cordic.new_out):
- xo = s.rd(self.cordic.xo)
- yo = s.rd(self.cordic.yo)
- zo = s.rd(self.cordic.zo)
- self.o.append((xo, yo, zo))
+ raise StopSimulation
+ if selfp.cordic.new_out:
+ self.o.append((selfp.cordic.xo, selfp.cordic.yo, selfp.cordic.zo))
- def run_io(self):
- with Simulator(self) as sim:
- sim.run()
+ def run_io(self):
+ run_simulation(self)
del self.i[-1], self.o[0]
if self.i[0] != (0, 0, 0):
assert self.o[0] != (0, 0, 0)
from migen.flow.transactions import *
from migen.flow.network import *
from migen.actorlib.sim import *
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
def source_gen():
for i in range(10):
g.add_connection(self.source, self.sink)
self.submodules.comp = CompositeActor(g)
- def do_simulation(self, s):
- s.interrupt = self.source.token_exchanger.done
-
-Simulator(TB()).run()
+if __name__ == "__main__":
+ run_simulation(TB())
-# Copyright (C) 2012 Vermeer Manufacturing Co.
-# License: GPLv3 with additional permissions (see README).
-
from math import cos, pi
from scipy import signal
import matplotlib.pyplot as plt
from migen.fhdl.std import *
from migen.fhdl import verilog
from migen.genlib.misc import optree
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
# A synthesizable FIR filter.
class FIR(Module):
self.inputs = []
self.outputs = []
- def do_simulation(self, s):
+ def do_simulation(self, selfp):
f = 2**(self.fir.wsize - 1)
- v = 0.1*cos(2*pi*self.frequency*s.cycle_counter)
- s.wr(self.fir.i, int(f*v))
+ v = 0.1*cos(2*pi*self.frequency*selfp.simulator.cycle_counter)
+ selfp.fir.i = int(f*v)
self.inputs.append(v)
- self.outputs.append(s.rd(self.fir.o)/f)
+ self.outputs.append(selfp.fir.o/f)
-def main():
+if __name__ == "__main__":
# Compute filter coefficients with SciPy.
coef = signal.remez(30, [0, 0.1, 0.2, 0.4, 0.45, 0.5], [0, 1, 0])
out_signals = []
for frequency in [0.05, 0.1, 0.25]:
tb = TB(coef, frequency)
- sim = Simulator(tb)
- sim.run(200)
- del sim
+ run_simulation(tb, ncycles=200)
in_signals += tb.inputs
out_signals += tb.outputs
# Print the Verilog source for the filter.
fir = FIR(coef)
print(verilog.convert(fir, ios={fir.i, fir.o}))
-
-main()
-# Copyright (C) 2012 Vermeer Manufacturing Co.
-# License: GPLv3 with additional permissions (see README).
-
from migen.fhdl.std import *
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
class Mem(Module):
def __init__(self):
# from 0 to 19.
self.specials.mem = Memory(16, 2**12, init=list(range(20)))
- def do_simulation(self, s):
+ def do_simulation(self, selfp):
# Read the memory. Use the cycle counter as address.
- value = s.rd(self.mem, s.cycle_counter)
+ value = selfp.mem[selfp.simulator.cycle_counter]
# Print the result. Output is:
# 0
# 1
# 2
# ...
print(value)
- # Demonstrate how to interrupt the simulator.
+ # Raising StopSimulation disables the current (and here, only one)
+ # simulation function. Simulator stops when all functions are disabled.
if value == 10:
- s.interrupt = True
-
-def main():
- dut = Mem()
- sim = Simulator(dut)
- # No need for a cycle limit here, we use sim.interrupt instead.
- sim.run()
+ raise StopSimulation
-main()
+if __name__ == "__main__":
+ run_simulation(Mem())
from migen.fhdl.std import *
from migen.flow.actor import *
from migen.flow.transactions import *
+from migen.util.misc import xdir
+
+def _sim_multiread(sim, obj):
+ if isinstance(obj, Signal):
+ return sim.rd(obj)
+ else:
+ r = {}
+ for k, v in xdir(obj, True):
+ rd = _sim_multiread(sim, v)
+ if isinstance(rd, int) or rd:
+ r[k] = rd
+ return r
+
+def _sim_multiwrite(sim, obj, value):
+ if isinstance(obj, Signal):
+ sim.wr(obj, value)
+ else:
+ for k, v in value.items():
+ _sim_multiwrite(sim, getattr(obj, k), v)
# Generators yield None or a tuple of Tokens.
# Tokens for Sink endpoints are pulled and the "value" field filled in.
self.actor = actor
self.active = set()
self.busy = True
- self.done = False
- def _process_transactions(self, s):
+ def _process_transactions(self, selfp):
completed = set()
for token in self.active:
ep = getattr(self.actor, token.endpoint)
if isinstance(ep, Sink):
- if s.rd(ep.ack) and s.rd(ep.stb):
- token.value = s.multiread(ep.payload)
+ if selfp.simulator.rd(ep.ack) and selfp.simulator.rd(ep.stb):
+ token.value = _sim_multiread(selfp.simulator, ep.payload)
completed.add(token)
- s.wr(ep.ack, 0)
+ selfp.simulator.wr(ep.ack, 0)
elif isinstance(ep, Source):
- if s.rd(ep.ack) and s.rd(ep.stb):
+ if selfp.simulator.rd(ep.ack) and selfp.simulator.rd(ep.stb):
completed.add(token)
- s.wr(ep.stb, 0)
+ selfp.simulator.wr(ep.stb, 0)
else:
raise TypeError
self.active -= completed
if not self.active:
self.busy = True
- def _update_control_signals(self, s):
+ def _update_control_signals(self, selfp):
for token in self.active:
ep = getattr(self.actor, token.endpoint)
if isinstance(ep, Sink):
- s.wr(ep.ack, 1)
+ selfp.simulator.wr(ep.ack, 1)
elif isinstance(ep, Source):
- s.multiwrite(ep.payload, token.value)
- s.wr(ep.stb, 1)
+ _sim_multiwrite(selfp.simulator, ep.payload, token.value)
+ selfp.simulator.wr(ep.stb, 1)
else:
raise TypeError
try:
transactions = next(self.generator)
except StopIteration:
- self.done = True
self.busy = False
- transactions = None
+ raise StopSimulation
if isinstance(transactions, Token):
self.active = {transactions}
elif isinstance(transactions, (tuple, list, set)):
if self.active and all(transaction.idle_wait for transaction in self.active):
self.busy = False
- def do_simulation(self, s):
- if not self.done:
- if self.active:
- self._process_transactions(s)
- if not self.active:
- self._next_transactions()
- self._update_control_signals(s)
-
- do_simulation.initialize = True
+ def do_simulation(self, selfp):
+ if self.active:
+ self._process_transactions(selfp)
+ if not self.active:
+ self._next_transactions()
+ self._update_control_signals(selfp)
class SimActor(Module):
def __init__(self, generator):
self.busy = Signal()
self.submodules.token_exchanger = TokenExchanger(generator, self)
- def do_simulation(self, s):
- s.wr(self.busy, self.token_exchanger.busy)
+ def do_simulation(self, selfp):
+ selfp.busy = self.token_exchanger.busy
def _dumper_gen(prefix):
while True:
self.transaction_start = 0
self.transaction = None
self.transaction_end = None
- self.done = False
- def do_simulation(self, s):
- s.wr(self.bus.dat_w, 0)
- s.wr(self.bus.dat_we, 0)
- if not self.done:
- if self.transaction is not None:
- if s.rd(self.bus.req_ack):
- s.wr(self.bus.stb, 0)
- if s.rd(self.bus.dat_ack):
- if isinstance(self.transaction, TRead):
- self.transaction_end = s.cycle_counter + self.bus.read_latency
- else:
- self.transaction_end = s.cycle_counter + self.bus.write_latency - 1
+ def do_simulation(self, selfp):
+ selfp.bus.dat_w = 0
+ selfp.bus.dat_we = 0
+
+ if self.transaction is not None:
+ if selfp.bus.req_ack:
+ selfp.bus.stb = 0
+ if selfp.bus.dat_ack:
+ if isinstance(self.transaction, TRead):
+ self.transaction_end = selfp.simulator.cycle_counter + self.bus.read_latency
+ else:
+ self.transaction_end = selfp.simulator.cycle_counter + self.bus.write_latency - 1
- if self.transaction is None or s.cycle_counter == self.transaction_end:
- if self.transaction is not None:
- self.transaction.latency = s.cycle_counter - self.transaction_start - 1
- if isinstance(self.transaction, TRead):
- self.transaction.data = s.rd(self.bus.dat_r)
- else:
- s.wr(self.bus.dat_w, self.transaction.data)
- s.wr(self.bus.dat_we, self.transaction.sel)
- try:
- self.transaction = next(self.generator)
- except StopIteration:
- self.done = True
- self.transaction = None
- if self.transaction is not None:
- self.transaction_start = s.cycle_counter
- s.wr(self.bus.stb, 1)
- s.wr(self.bus.adr, self.transaction.address)
- if isinstance(self.transaction, TRead):
- s.wr(self.bus.we, 0)
- else:
- s.wr(self.bus.we, 1)
+ if self.transaction is None or selfp.simulator.cycle_counter == self.transaction_end:
+ if self.transaction is not None:
+ self.transaction.latency = selfp.simulator.cycle_counter - self.transaction_start - 1
+ if isinstance(self.transaction, TRead):
+ self.transaction.data = selfp.bus.dat_r
+ else:
+ selfp.bus.dat_w = self.transaction.data
+ selfp.bus.dat_we = self.transaction.sel
+ try:
+ self.transaction = next(self.generator)
+ except StopIteration:
+ raise StopSimulation
+ if self.transaction is not None:
+ self.transaction_start = selfp.simulator.cycle_counter
+ selfp.bus.stb = 1
+ selfp.bus.adr = self.transaction.address
+ if isinstance(self.transaction, TRead):
+ selfp.bus.we = 0
+ else:
+ selfp.bus.we = 1
class TargetModel:
def __init__(self):
self.bank = bank
self.contents = []
- def do_simulation(self, s):
+ def do_simulation(self, selfp):
if len(self.contents) < self.req_queue_size:
- if s.rd(self.bank.stb):
- self.contents.append((s.rd(self.bank.we), s.rd(self.bank.adr)))
- s.wr(self.bank.req_ack, 1)
+ if selfp.bank.stb:
+ self.contents.append((selfp.bank.we, selfp.bank.adr))
+ selfp.bank.req_ack = 1
else:
- s.wr(self.bank.req_ack, 0)
- s.wr(self.bank.lock, bool(self.contents))
+ selfp.bank.req_ack = 0
+ selfp.bank.lock = bool(self.contents)
class Target(Module):
def __init__(self, model, *ifargs, **ifkwargs):
self.rd_pipeline = [None]*self.bus.read_latency
self.wr_pipeline = [None]*(self.bus.write_latency + 1)
- def do_simulation(self, s):
+ def do_simulation(self, selfp):
# determine banks with pending requests
pending_banks = set(nb for nb, rf in enumerate(self.req_fifos) if rf.contents)
selected_bank_n = self.model.select_bank(pending_banks)
selected_transaction = None
for nb in range(self.bus.nbanks):
- bank = getattr(self.bus, "bank"+str(nb))
+ bank = getattr(selfp.bus, "bank"+str(nb))
if nb == selected_bank_n:
- s.wr(bank.dat_ack, 1)
+ bank.dat_ack = 1
selected_transaction = self.req_fifos[nb].contents.pop(0)
else:
- s.wr(bank.dat_ack, 0)
+ bank.dat_ack = 0
rd_transaction = None
wr_transaction = None
done_rd_transaction = self.rd_pipeline.pop(0)
done_wr_transaction = self.wr_pipeline.pop(0)
if done_rd_transaction is not None:
- s.wr(self.bus.dat_r, self.model.read(done_rd_transaction[0], done_rd_transaction[1]))
+ selfp.bus.dat_r = self.model.read(done_rd_transaction[0], done_rd_transaction[1])
if done_wr_transaction is not None:
self.model.write(done_wr_transaction[0], done_wr_transaction[1],
- s.rd(self.bus.dat_w), s.rd(self.bus.dat_we))
+ selfp.bus.dat_w, selfp.bus.dat_we)
def __init__(self, generator, mem):
self.generator = generator
self.mem = mem
- self.done = False
- def do_simulation(self, s):
- if not self.done:
- try:
- transaction = next(self.generator)
- except StopIteration:
- self.done = True
- transaction = None
- if isinstance(transaction, TRead):
- transaction.data = s.rd(self.mem, transaction.address)
- elif isinstance(transaction, TWrite):
- d = s.rd(self.mem, transaction.address)
- d_mask = _byte_mask(d, transaction.data, transaction.sel)
- s.wr(s.mem, d_mask, transaction.address)
+ def do_simulation(self, selfp):
+ try:
+ transaction = next(self.generator)
+ except StopIteration:
+ transaction = None
+ raise StopSimulation
+ if isinstance(transaction, TRead):
+ transaction.data = selfp.mem[transaction.address]
+ elif isinstance(transaction, TWrite):
+ d = selfp.mem[transaction.address]
+ d_mask = _byte_mask(d, transaction.data, transaction.sel)
+ selfp.mem[transaction.address] = d_mask
from migen.genlib.misc import optree, chooser
from migen.genlib.fsm import FSM, NextState
from migen.bus.transactions import *
-from migen.sim.generic import Proxy
_layout = [
("adr", 30, DIR_M_TO_S),
self.bus = bus
self.handler = handler
- def do_simulation(self, s):
- if s.rd(self.bus.ack):
- assert(s.rd(self.bus.cyc) and s.rd(self.bus.stb))
- if s.rd(self.bus.we):
- transaction = TWrite(s.rd(self.bus.adr),
- s.rd(self.bus.dat_w),
- s.rd(self.bus.sel))
+ def do_simulation(self, selfp):
+ if selfp.bus.ack:
+ assert(selfp.bus.cyc and selfp.bus.stb)
+ if selfp.bus.we:
+ transaction = TWrite(selfp.bus.adr,
+ selfp.bus.dat_w,
+ selfp.bus.sel)
else:
- transaction = TRead(s.rd(self.bus.adr),
- s.rd(self.bus.dat_r))
+ transaction = TRead(selfp.bus.adr,
+ selfp.bus.dat_r)
self.handler(transaction)
class Initiator(Module):
self.bus = bus
self.transaction_start = 0
self.transaction = None
- self.done = False
- def do_simulation(self, s):
- if not self.done:
- if self.transaction is None or s.rd(self.bus.ack):
- if self.transaction is not None:
- self.transaction.latency = s.cycle_counter - self.transaction_start - 1
- if isinstance(self.transaction, TRead):
- self.transaction.data = s.rd(self.bus.dat_r)
- try:
- self.transaction = next(self.generator)
- except StopIteration:
- self.done = True
- self.transaction = None
- if self.transaction is not None:
- self.transaction_start = s.cycle_counter
- s.wr(self.bus.cyc, 1)
- s.wr(self.bus.stb, 1)
- s.wr(self.bus.adr, self.transaction.address)
- if isinstance(self.transaction, TWrite):
- s.wr(self.bus.we, 1)
- s.wr(self.bus.sel, self.transaction.sel)
- s.wr(self.bus.dat_w, self.transaction.data)
- else:
- s.wr(self.bus.we, 0)
+ def do_simulation(self, selfp):
+ if self.transaction is None or selfp.bus.ack:
+ if self.transaction is not None:
+ self.transaction.latency = selfp.simulator.cycle_counter - self.transaction_start - 1
+ if isinstance(self.transaction, TRead):
+ self.transaction.data = selfp.bus.dat_r
+ try:
+ self.transaction = next(self.generator)
+ except StopIteration:
+ selfp.bus.cyc = 0
+ selfp.bus.stb = 0
+ raise StopSimulation
+ if self.transaction is not None:
+ self.transaction_start = selfp.simulator.cycle_counter
+ selfp.bus.cyc = 1
+ selfp.bus.stb = 1
+ selfp.bus.adr = self.transaction.address
+ if isinstance(self.transaction, TWrite):
+ selfp.bus.we = 1
+ selfp.bus.sel = self.transaction.sel
+ selfp.bus.dat_w = self.transaction.data
else:
- s.wr(self.bus.cyc, 0)
- s.wr(self.bus.stb, 0)
+ selfp.bus.we = 0
+ else:
+ selfp.bus.cyc = 0
+ selfp.bus.stb = 0
class TargetModel:
def read(self, address):
self.bus = bus
self.model = model
- def do_simulation(self, s):
- bus = Proxy(s, self.bus)
+ def do_simulation(self, selfp):
+ bus = selfp.bus
if not bus.ack:
if self.model.can_ack(bus) and bus.cyc and bus.stb:
if bus.we:
import collections
from itertools import combinations
+from migen.util.misc import flat_iteration
from migen.fhdl.structure import *
from migen.fhdl.structure import _Fragment
-from migen.fhdl.specials import Special
from migen.fhdl.tools import rename_clock_domain
-from migen.util.misc import flat_iteration
+from migen.sim.upper import GenSim, ProxySim
class FinalizeError(Exception):
pass
self.finalized = False
return self.finalized
elif name == "_fragment":
+ simf = None
try:
- sim = [self.do_simulation]
+ simf = self.do_simulation
except AttributeError:
- sim = []
+ try:
+ simg = self.gen_simulation
+ except AttributeError:
+ pass
+ else:
+ gs = GenSim(simg)
+ simf = gs.do_simulation
+ if simf is not None:
+ ps = ProxySim(self, simf)
+ simf = ps.do_simulation
+ sim = [] if simf is None else [simf]
self._fragment = _Fragment(sim=sim)
return self._fragment
elif name == "_submodules":
(SPECIAL_INPUT, SPECIAL_OUTPUT, SPECIAL_INOUT) = range(3)
+class StopSimulation(Exception):
+ pass
+
class _Fragment:
def __init__(self, comb=None, sync=None, specials=None, clock_domains=None, sim=None):
if comb is None: comb = []
def on_inactive(self):
pass
- def do_simulation(self, s):
- if s.rd(self.endpoint.stb):
- if s.rd(self.endpoint.ack):
+ def do_simulation(self, selfp):
+ if selfp.endpoint.stb:
+ if selfp.endpoint.ack:
self.on_ack()
else:
self.on_nack()
return r
def _call_sim(fragment, simulator):
+ del_list = []
for s in fragment.sim:
- if simulator.cycle_counter >= 0 or (hasattr(s, "initialize") and s.initialize):
+ try:
s(simulator)
+ except StopSimulation:
+ del_list.append(s)
+ for s in del_list:
+ fragment.sim.remove(s)
class Simulator:
def __init__(self, fragment, top_level=None, sim_runner=None, sockaddr="simsocket", **vopts):
**vopts)
self.cycle_counter = -1
- self.interrupt = False
self.sim_runner = sim_runner
self.sim_runner.start(c_top, c_fragment)
self.ipc.accept()
reply = self.ipc.recv()
assert(isinstance(reply, MessageTick))
- _call_sim(self.fragment, self)
- def run(self, ncycles=-1):
- self.interrupt = False
+ def run(self, ncycles=None):
counter = 0
- while not self.interrupt and (ncycles < 0 or counter < ncycles):
+ while self.fragment.sim and (ncycles is None or counter < ncycles):
self.cycle_counter += 1
counter += 1
self.ipc.send(MessageGo())
assert(value >= 0 and value < 2**nbits)
self.ipc.send(MessageWrite(name, Int32(index), value))
- def multiread(self, obj):
- if isinstance(obj, Signal):
- return self.rd(obj)
- elif isinstance(obj, list):
- r = []
- for item in obj:
- rd = self.multiread(item)
- if isinstance(item, Signal) or rd:
- r.append(rd)
- return r
- elif hasattr(obj, "__dict__"):
- r = {}
- for k, v in obj.__dict__.items():
- rd = self.multiread(v)
- if isinstance(v, Signal) or rd:
- r[k] = rd
- return r
-
- def multiwrite(self, obj, value):
- if isinstance(obj, Signal):
- self.wr(obj, value)
- elif isinstance(obj, list):
- for target, source in zip(obj, value):
- self.multiwrite(target, source)
- else:
- for k, v in value.items():
- self.multiwrite(getattr(obj, k), v)
-
def __del__(self):
if hasattr(self, "ipc"):
warnings.warn("call Simulator.close() to clean up "
def __enter__(self):
return self
- def __exit__(self, type, value, traceback):
+ def __exit__(self, type, value, traceback):
self.close()
-# Contrary to multiread/multiwrite, Proxy fetches the necessary signals only and
-# immediately forwards writes into the simulation.
-class Proxy:
- def __init__(self, sim, obj):
- self.__dict__["_sim"] = sim
- self.__dict__["_obj"] = obj
-
- def __getattr__(self, name):
- item = getattr(self._obj, name)
- if isinstance(item, Signal):
- return self._sim.rd(item)
- elif isinstance(item, list):
- return [Proxy(self._sim, si) for si in item]
- else:
- return Proxy(self._sim, item)
-
- def __setattr__(self, name, value):
- item = getattr(self._obj, name)
- assert(isinstance(item, Signal))
- self._sim.wr(item, value)
+def run_simulation(fragment, ncycles=None, vcd_name=None, keep_files=False):
+ with Simulator(fragment, TopLevel(vcd_name), icarus.Runner(keep_files=keep_files)) as s:
+ s.run(ncycles)
--- /dev/null
+from migen.fhdl.structure import Signal, StopSimulation
+from migen.fhdl.specials import Memory
+
+class MemoryProxy:
+ def __init__(self, simulator, obj):
+ self.simulator = simulator
+ self._simproxy_obj = obj
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return self.simulator.rd(self._simproxy_obj, key)
+ else:
+ start, stop, step = key.indices(self._simproxy_obj.depth)
+ return [self.simulator.rd(self._simproxy_obj, i) for i in range(start, stop, step)]
+
+ def __setitem__(self, key, value):
+ if isinstance(key, int):
+ self.simulator.wr(self._simproxy_obj, key, value)
+ else:
+ start, stop, step = key.indices(self.__obj.depth)
+ if len(value) != (stop - start)//step:
+ raise ValueError
+ for i, v in zip(range(start, stop, step), value):
+ self.simulator.wr(self._simproxy_obj, i, v)
+
+class Proxy:
+ def __init__(self, simulator, obj):
+ object.__setattr__(self, "simulator", simulator)
+ object.__setattr__(self, "_simproxy_obj", obj)
+
+ def __process_get(self, item):
+ if isinstance(item, Signal):
+ return self.simulator.rd(item)
+ elif isinstance(item, Memory):
+ return MemoryProxy(self.simulator, item)
+ else:
+ return Proxy(self.simulator, item)
+
+ def __getattr__(self, name):
+ return self.__process_get(getattr(self._simproxy_obj, name))
+
+ def __setattr__(self, name, value):
+ item = getattr(self._simproxy_obj, name)
+ assert(isinstance(item, Signal))
+ self.simulator.wr(item, value)
+
+ def __getitem__(self, key):
+ return self.__process_get(self._simproxy_obj[key])
+
+ def __setitem__(self, key, value):
+ item = self._simproxy_obj[key]
+ assert(isinstance(item, Signal))
+ self.simulator.wr(item, value)
+
+class GenSim:
+ def __init__(self, simg):
+ self.simg = simg
+ self.gens = dict()
+ self.resume_cycle = 0
+
+ def do_simulation(self, s):
+ if isinstance(s, Proxy):
+ simulator = s.simulator
+ else:
+ simulator = s
+
+ if simulator.cycle_counter >= self.resume_cycle:
+ try:
+ gen = self.gens[simulator]
+ except KeyError:
+ gen = self.simg(s)
+ self.gens[simulator] = gen
+ try:
+ n = next(gen)
+ except StopIteration:
+ del self.gens[simulator]
+ raise StopSimulation
+ else:
+ if n is None:
+ n = 1
+ self.resume_cycle = simulator.cycle_counter + n
+
+class ProxySim:
+ def __init__(self, target, simf):
+ self.target = target
+ self.simf = simf
+ self.proxies = dict()
+
+ def do_simulation(self, simulator):
+ try:
+ proxy = self.proxies[simulator]
+ except KeyError:
+ proxy = Proxy(simulator, self.target)
+ self.proxies[simulator] = proxy
+ try:
+ self.simf(proxy)
+ except StopSimulation:
+ del self.proxies[simulator]
+ raise
-import unittest
from migen.fhdl.std import *
-from migen.sim.generic import Simulator
+from migen.sim.generic import run_simulation
from migen.fhdl import verilog
class SimBench(Module):
callback = None
- def do_simulation(self, s):
+ def do_simulation(self, selfp):
if self.callback is not None:
- return self.callback(self, s)
+ return self.callback(self, selfp)
class SimCase:
TestBench = SimBench
def test_to_verilog(self):
verilog.convert(self.tb)
- def run_with(self, cb, cycles=-1):
+ def run_with(self, cb, ncycles=-1):
self.tb.callback = cb
- with Simulator(self.tb) as s:
- s.run(cycles)
+ run_simulation(self.tb, ncycles=ncycles)
def test_run_sequence(self):
seq = list(range(1<<8))
- cur = None
- def cb(tb, s):
+ def cb(tb, tbp):
if seq:
- s.wr(tb.dut.i, seq.pop(0))
- i = s.rd(tb.dut.i)
- if s.rd(tb.dut.n):
- self.assertNotIn(i, [1<<i for i in range(8)])
+ tbp.dut.i = seq.pop(0)
+ if tbp.dut.n:
+ self.assertNotIn(tbp.dut.i, [1<<i for i in range(8)])
else:
- o = s.rd(tb.dut.o)
- self.assertEqual(i, 1<<o)
+ self.assertEqual(tbp.dut.i, 1<<tbp.dut.o)
self.run_with(cb, 256)
class PrioEncCase(SimCase, unittest.TestCase):
def test_run_sequence(self):
seq = list(range(1<<8))
- cur = None
- def cb(tb, s):
+ def cb(tb, tbp):
if seq:
- s.wr(tb.dut.i, seq.pop(0))
- i = s.rd(tb.dut.i)
- if s.rd(tb.dut.n):
+ tbp.dut.i = seq.pop(0)
+ i = tbp.dut.i
+ if tbp.dut.n:
self.assertEqual(i, 0)
else:
- o = s.rd(tb.dut.o)
+ o = tbp.dut.o
if o > 0:
self.assertEqual(i & 1<<(o - 1), 0)
self.assertGreaterEqual(i, 1<<o)
def test_run_sequence(self):
seq = list(range(8*2))
- cur = None
- def cb(tb, s):
+ def cb(tb, tbp):
if seq:
i = seq.pop()
- s.wr(tb.dut.i, i//2)
- s.wr(tb.dut.n, i%2)
- i = s.rd(tb.dut.i)
- o = s.rd(tb.dut.o)
- if s.rd(tb.dut.n):
+ tbp.dut.i = i//2
+ tbp.dut.n = i%2
+ i = tbp.dut.i
+ o = tbp.dut.o
+ if tbp.dut.n:
self.assertEqual(o, 0)
else:
self.assertEqual(o, 1<<i)
from migen.test.support import SimCase, SimBench
-
class CordicCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self, **kwargs):
zm = self.tb.dut.zmax
pipe = {}
genn = [gen() for i in range(n)]
- def cb(tb, s):
- if s.rd(tb.dut.new_in):
+ def cb(tb, tbp):
+ if tbp.dut.new_in:
if genn:
xi, yi, zi = genn.pop(0)
else:
- s.interrupt = True
- return
+ raise StopSimulation
xi = floor(xi*c/g)
yi = floor(yi*c/g)
zi = floor(zi*c/zm)
- s.wr(tb.dut.xi, xi)
- s.wr(tb.dut.yi, yi)
- s.wr(tb.dut.zi, zi)
- pipe[s.cycle_counter] = xi, yi, zi
- if s.rd(tb.dut.new_out):
- t = s.cycle_counter - tb.dut.latency - 1
+ tbp.dut.xi = xi
+ tbp.dut.yi = yi
+ tbp.dut.zi = zi
+ pipe[tbp.simulator.cycle_counter] = xi, yi, zi
+ if tbp.dut.new_out:
+ t = tbp.simulator.cycle_counter - tb.dut.latency - 1
if t < 1:
return
xi, yi, zi = pipe.pop(t)
import unittest
from migen.fhdl.std import *
-from migen.genlib.fifo import SyncFIFO, AsyncFIFO
+from migen.genlib.fifo import SyncFIFO
from migen.test.support import SimCase, SimBench
def test_run_sequence(self):
seq = list(range(20))
- def cb(tb, s):
+ def cb(tb, tbp):
# fire re and we at "random"
- s.wr(tb.dut.we, s.cycle_counter % 2 == 0)
- s.wr(tb.dut.re, s.cycle_counter % 3 == 0)
+ tbp.dut.we = tbp.simulator.cycle_counter % 2 == 0
+ tbp.dut.re = tbp.simulator.cycle_counter % 3 == 0
# the output if valid must be correct
- if s.rd(tb.dut.readable) and s.rd(tb.dut.re):
+ if tbp.dut.readable and tbp.dut.re:
i = seq.pop(0)
- self.assertEqual(s.rd(tb.dut.dout.a), i)
- self.assertEqual(s.rd(tb.dut.dout.b), i*2)
+ self.assertEqual(tbp.dut.dout.a, i)
+ self.assertEqual(tbp.dut.dout.b, i*2)
self.run_with(cb, 20)
self.a = Signal((3, True))
self.b = Signal((4, True))
comps = [
- lambda p, q: p > q,
- lambda p, q: p >= q,
- lambda p, q: p < q,
- lambda p, q: p <= q,
- lambda p, q: p == q,
- lambda p, q: p != q,
- ]
+ lambda p, q: p > q,
+ lambda p, q: p >= q,
+ lambda p, q: p < q,
+ lambda p, q: p <= q,
+ lambda p, q: p == q,
+ lambda p, q: p != q,
+ ]
self.vals = []
for asign in 1, -1:
for bsign in 1, -1:
values = range(-4, 4)
agen = iter(values)
bgen = iter(values)
- def cb(tb, s):
+ def cb(tb, tbp):
try:
- s.wr(self.tb.a, next(agen))
- s.wr(self.tb.b, next(bgen))
+ tbp.a = next(agen)
+ tbp.b = next(bgen)
except StopIteration:
- s.interrupt = True
- a = s.rd(self.tb.a)
- b = s.rd(self.tb.b)
+ raise StopSimulation
+ a = tbp.a
+ b = tbp.b
for asign, bsign, f, r, op in self.tb.vals:
- r, r0 = s.rd(r), f(asign*a, bsign*b)
+ r, r0 = tbp.simulator.rd(r), f(asign*a, bsign*b)
self.assertEqual(r, int(r0),
"got {}, want {}*{} {} {}*{} = {}".format(
r, asign, a, op, bsign, b, r0))
self.assertEqual(flen(self.tb.dut.o[i]), 4)
def test_sort(self):
- def cb(tb, s):
+ def cb(tb, tbp):
for i in tb.dut.i:
- s.wr(i, randrange(1<<flen(i)))
- i = [s.rd(i) for i in tb.dut.i]
- o = [s.rd(o) for o in tb.dut.o]
- self.assertEqual(sorted(i), o)
+ tbp.simulator.wr(i, randrange(1<<flen(i)))
+ self.assertEqual(sorted(list(tbp.dut.i)), list(tbp.dut.o))
self.run_with(cb, 20)