+++ /dev/null
-from random import Random
-
-from migen.fhdl.std import *
-from migen.bus.transactions import *
-from migen.bus import wishbone
-from migen.sim.generic import run_simulation
-
-
-# Our bus master.
-# Python generators let us program bus transactions in an elegant sequential style.
-def my_generator():
- prng = Random(92837)
-
- # Write to the first addresses.
- for x in range(10):
- t = TWrite(x, 2*x)
- yield t
- print("Wrote in " + str(t.latency) + " cycle(s)")
- # Insert some dead cycles to simulate bus inactivity.
- for delay in range(prng.randrange(0, 3)):
- yield None
-
- # Read from the first addresses.
- for x in range(10):
- t = TRead(x)
- yield t
- print("Read " + str(t.data) + " in " + str(t.latency) + " cycle(s)")
- for delay in range(prng.randrange(0, 3)):
- yield None
-
-
-# Our bus slave.
-class MyModelWB(wishbone.TargetModel):
- def __init__(self):
- self.prng = Random(763627)
-
- def read(self, address):
- return address + 4
-
- def can_ack(self, bus):
- # Simulate variable latency.
- return self.prng.randrange(0, 2)
-
-
-class TB(Module):
- def __init__(self):
- # The "wishbone.Initiator" library component runs our generator
- # and manipulates the bus signals accordingly.
- self.submodules.master = wishbone.Initiator(my_generator())
- # The "wishbone.Target" library component examines the bus signals
- # and calls into our model object.
- self.submodules.slave = wishbone.Target(MyModelWB())
- # The "wishbone.Tap" library component examines the bus at the slave port
- # and displays the transactions on the console (<TRead...>/<TWrite...>).
- self.submodules.tap = wishbone.Tap(self.slave.bus)
- # Connect the master to the slave.
- self.submodules.intercon = wishbone.InterconnectPointToPoint(self.master.bus, self.slave.bus)
-
-if __name__ == "__main__":
- run_simulation(TB())
-
-# Output:
-# <TWrite adr:0x0 dat:0x0>
-# Wrote in 0 cycle(s)
-# <TWrite adr:0x1 dat:0x2>
-# Wrote in 0 cycle(s)
-# <TWrite adr:0x2 dat:0x4>
-# Wrote in 0 cycle(s)
-# <TWrite adr:0x3 dat:0x6>
-# Wrote in 1 cycle(s)
-# <TWrite adr:0x4 dat:0x8>
-# Wrote in 1 cycle(s)
-# <TWrite adr:0x5 dat:0xa>
-# Wrote in 2 cycle(s)
-# ...
-# <TRead adr:0x0 dat:0x4>
-# Read 4 in 2 cycle(s)
-# <TRead adr:0x1 dat:0x5>
-# Read 5 in 2 cycle(s)
-# <TRead adr:0x2 dat:0x6>
-# Read 6 in 1 cycle(s)
-# <TRead adr:0x3 dat:0x7>
-# Read 7 in 1 cycle(s)
-# ...
from migen.fhdl.std import *
-from migen.sim.generic import run_simulation
+from migen.sim import Simulator
-# Our simple counter, which increments at every cycle
-# and prints its current value in simulation.
+# Our simple counter, which increments at every cycle.
class Counter(Module):
def __init__(self):
self.count = Signal(4)
# We do it with convertible/synthesizable FHDL code.
self.sync += self.count.eq(self.count + 1)
- # This function will be called at every cycle.
- def do_simulation(self, selfp):
- # Simply read the count signal and print it.
- # The output is:
- # Count: 0
- # Count: 1
- # Count: 2
- # ...
- print("Count: " + str(selfp.count))
+
+# Simply read the count signal and print it.
+# The output is:
+# Count: 0
+# Count: 1
+# Count: 2
+# ...
+def counter_test(dut):
+ for i in range(20):
+ print((yield dut.count)) # read and print
+ yield # next clock cycle
+ # simulation ends with this generator
+
if __name__ == "__main__":
dut = Counter()
- # Since we do not use StopSimulation, limit the simulation
- # to some number of cycles.
- run_simulation(dut, ncycles=20)
+ Simulator(dut, counter_test(dut)).run()
from migen.fhdl.std import *
-from migen.sim.generic import run_simulation
+from migen.sim import Simulator
# A slightly more elaborate counter.
self.sync += If(self.ce, self.count.eq(self.count + 1))
- def do_simulation(self, selfp):
+
+def counter_test(dut):
+ for cycle in range(20):
# Only assert CE every second cycle.
# => each counter value is held for two cycles.
- if selfp.simulator.cycle_counter % 2:
- selfp.ce = 0 # This is how you write to a signal.
+ if cycle % 2:
+ yield dut.ce, 0 # This is how you write to a signal.
else:
- selfp.ce = 1
- print("Cycle: " + str(selfp.simulator.cycle_counter) + " Count: " + \
- str(selfp.count))
+ yield dut.ce, 1
+ print("Cycle: {} Count: {}".format(cycle, (yield dut.count)))
+ yield
# Output is:
# Cycle: 0 Count: -5
if __name__ == "__main__":
dut = Counter()
- # Demonstrate VCD output
- run_simulation(dut, vcd_name="my.vcd", ncycles=20)
+ Simulator(dut, counter_test(dut)).run()
+++ /dev/null
-from migen.fhdl.std import *
-from migen.flow.actor import *
-from migen.flow.transactions import *
-from migen.flow.network import *
-from migen.actorlib.sim import *
-from migen.sim.generic import run_simulation
-
-
-def source_gen():
- for i in range(10):
- print("Sending: " + str(i))
- yield Token("source", {"value": i})
-
-
-class SimSource(SimActor):
- def __init__(self):
- self.source = Source([("value", 32)])
- SimActor.__init__(self, source_gen())
-
-
-def sink_gen():
- while True:
- t = Token("sink")
- yield t
- print("Received: " + str(t.value["value"]))
-
-
-class SimSink(SimActor):
- def __init__(self):
- self.sink = Sink([("value", 32)])
- SimActor.__init__(self, sink_gen())
-
-
-class TB(Module):
- def __init__(self):
- self.source = SimSource()
- self.sink = SimSink()
- g = DataFlowGraph()
- g.add_connection(self.source, self.sink)
- self.submodules.comp = CompositeActor(g)
-
- def do_simulation(self, selfp):
- if self.source.token_exchanger.done:
- raise StopSimulation
-
-if __name__ == "__main__":
- run_simulation(TB())
from migen.fhdl.std import *
from migen.fhdl import verilog
-from migen.sim.generic import run_simulation
+from migen.sim import Simulator
from functools import reduce
from operator import add
muls.append(c_fp*sreg)
sum_full = Signal((2*self.wsize-1, True))
self.sync += sum_full.eq(reduce(add, muls))
- self.comb += self.o.eq(sum_full[self.wsize-1:])
+ self.comb += self.o.eq(sum_full >> self.wsize-1)
# A test bench for our FIR filter.
# Generates a sine wave at the input and records the output.
-class TB(Module):
- def __init__(self, coef, frequency):
- self.submodules.fir = FIR(coef)
- self.frequency = frequency
- self.inputs = []
- self.outputs = []
+def fir_tb(dut, frequency, inputs, outputs):
+ f = 2**(dut.wsize - 1)
+ for cycle in range(200):
+ v = 0.1*cos(2*pi*frequency*cycle)
+ yield dut.i, int(f*v)
+ inputs.append(v)
+ outputs.append((yield dut.o)/f)
+ yield
- def do_simulation(self, selfp):
- f = 2**(self.fir.wsize - 1)
- v = 0.1*cos(2*pi*self.frequency*selfp.simulator.cycle_counter)
- selfp.fir.i = int(f*v)
- self.inputs.append(v)
- self.outputs.append(selfp.fir.o/f)
if __name__ == "__main__":
# Compute filter coefficients with SciPy.
in_signals = []
out_signals = []
for frequency in [0.05, 0.1, 0.25]:
- tb = TB(coef, frequency)
- run_simulation(tb, ncycles=200)
- in_signals += tb.inputs
- out_signals += tb.outputs
+ dut = FIR(coef)
+ tb = fir_tb(dut, frequency, in_signals, out_signals)
+ Simulator(dut, tb).run()
# Plot data from the input and output waveforms.
plt.plot(in_signals)
self.modifications.clear()
return r
- def _eval(self, node):
+ def eval(self, node):
if isinstance(node, (int, bool)):
return node
elif isinstance(node, Signal):
except KeyError:
return node.reset
elif isinstance(node, _Operator):
- operands = [self._eval(o) for o in node.operands]
+ operands = [self.eval(o) for o in node.operands]
if node.op == "-":
if len(operands) == 1:
return -operands[0]
# TODO: Cat, Slice, Array, ClockSignal, ResetSignal, Memory
raise NotImplementedError
+ def assign(self, signal, value):
+ value = value & (2**signal.nbits - 1)
+ if signal.signed and (value & 2**(signal.nbits - 1)):
+ value -= 2**signal.nbits
+ self.modifications[signal] = value
+
def execute(self, statements):
for s in statements:
if isinstance(s, _Assign):
- value = self._eval(s.r)
+ value = self.eval(s.r)
if isinstance(s.l, Signal):
- value = value & (2**s.l.nbits - 1)
- if s.l.signed and (value & 2**(s.l.nbits - 1)):
- value -= 2**s.l.nbits
- self.modifications[s.l] = value
+ self.assign(s.l, value)
else:
# TODO: Cat, Slice, Array, ClockSignal, ResetSignal, Memory
raise NotImplementedError
elif isinstance(s, If):
- if self._eval(s.cond):
+ if self.eval(s.cond):
self.execute(s.t)
else:
self.execute(s.f)
self.evaluator.execute(self.comb_dependent_statements[signal])
modified = self.evaluator.commit()
+ def _process_generators(self, cd):
+ if cd in self.generators:
+ exhausted = []
+ for generator in self.generators[cd]:
+ reply = None
+ while True:
+ try:
+ request = generator.send(reply)
+ if request is None:
+ break # next cycle
+ elif isinstance(request, tuple):
+ self.evaluator.assign(*request)
+ else:
+ reply = self.evaluator.eval(request)
+ except StopIteration:
+ exhausted.append(generator)
+ break
+ for generator in exhausted:
+ self.generators[cd].remove(generator)
+
def _continue_simulation(self):
# TODO: passive generators
return any(self.generators.values())
self._comb_propagate(self.evaluator.commit())
while True:
- print(self.evaluator.signal_values)
cds = self.time.tick()
for cd in cds:
self.evaluator.execute(self.fragment.sync[cd])
- self._comb_propagate(self.evaluator.commit())
+ self._process_generators(cd)
+ self._comb_propagate(self.evaluator.commit())
+
if not self._continue_simulation():
break