From: Sebastien Bourdeauducq Date: Tue, 12 Jun 2012 17:55:57 +0000 (+0200) Subject: examples/dataflow/dma: refactor X-Git-Tag: 24jan2021_ls180~2099^2~921 X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=3a58916a4ffc6917a634530e16337699963d9864;p=litex.git examples/dataflow/dma: refactor --- diff --git a/examples/dataflow/dma.py b/examples/dataflow/dma.py index 9b2491ec..100055f3 100644 --- a/examples/dataflow/dma.py +++ b/examples/dataflow/dma.py @@ -5,20 +5,24 @@ from migen.flow.ala import * from migen.flow.network import * from migen.actorlib import dma_wishbone from migen.actorlib.sim import * -from migen.bus import wishbone +from migen.bus import wishbone, asmibus from migen.sim.generic import Simulator from migen.sim.icarus import Runner -class MyModel(wishbone.TargetModel): - def __init__(self): - self.prng = Random(763627) - +class MyModel: def read(self, address): return address + 4 - + +class MyModelWB(MyModel, wishbone.TargetModel): + def __init__(self): + self.prng = Random(763627) + def can_ack(self, bus): return self.prng.randrange(0, 2) +class MyModelASMI(MyModel, asmibus.TargetModel): + pass + def adrgen_gen(): for i in range(10): print("Address: " + str(i)) @@ -30,8 +34,29 @@ def dumper_gen(): yield t print("Received: " + str(t.value["d"])) -def test_reader(): - print("*** Testing reader") +def trgen_gen(): + for i in range(10): + a = i + d = i+10 + print("Address: " + str(a) + " Data: " + str(d)) + yield Token("address_data", {"a": a, "d": d}) + +def wishbone_sim(efragment, master, end_simulation): + peripheral = wishbone.Target(MyModelWB()) + tap = wishbone.Tap(peripheral.bus) + interconnect = wishbone.InterconnectPointToPoint(master.bus, peripheral.bus) + + fragment = efragment \ + + peripheral.get_fragment() \ + + tap.get_fragment() \ + + interconnect.get_fragment() \ + + Fragment(sim=[end_simulation]) + + sim = Simulator(fragment, Runner()) + sim.run() + +def test_wb_reader(): + print("*** Testing Wishbone reader") adrgen = SimActor(adrgen_gen(), ("address", Source, [("a", BV(30))])) reader = dma_wishbone.Reader() dumper = SimActor(dumper_gen(), ("data", Sink, [("d", BV(32))])) @@ -40,50 +65,21 @@ def test_reader(): g.add_connection(reader, dumper) comp = CompositeActor(g) - peripheral = wishbone.Target(MyModel()) - interconnect = wishbone.InterconnectPointToPoint(reader.bus, peripheral.bus) - def end_simulation(s): s.interrupt = adrgen.done and not s.rd(comp.busy) - - fragment = comp.get_fragment() \ - + peripheral.get_fragment() \ - + interconnect.get_fragment() \ - + Fragment(sim=[end_simulation]) - - sim = Simulator(fragment, Runner()) - sim.run() + wishbone_sim(comp.get_fragment(), reader, end_simulation) -def trgen_gen(): - for i in range(10): - a = i - d = i+10 - print("Address: " + str(a) + " Data: " + str(d)) - yield Token("address_data", {"a": a, "d": d}) - -def test_writer(): - print("*** Testing writer") +def test_wb_writer(): + print("*** Testing Wishbone writer") trgen = SimActor(trgen_gen(), ("address_data", Source, [("a", BV(30)), ("d", BV(32))])) writer = dma_wishbone.Writer() g = DataFlowGraph() g.add_connection(trgen, writer) comp = CompositeActor(g) - peripheral = wishbone.Target(MyModel()) - tap = wishbone.Tap(peripheral.bus) - interconnect = wishbone.InterconnectPointToPoint(writer.bus, peripheral.bus) - def end_simulation(s): s.interrupt = trgen.done and not s.rd(comp.busy) - - fragment = comp.get_fragment() \ - + peripheral.get_fragment() \ - + tap.get_fragment() \ - + interconnect.get_fragment() \ - + Fragment(sim=[end_simulation]) - - sim = Simulator(fragment, Runner()) - sim.run() + wishbone_sim(comp.get_fragment(), writer, end_simulation) -test_reader() -test_writer() +test_wb_reader() +test_wb_writer()