examples/dataflow/dma: refactor
authorSebastien Bourdeauducq <sebastien@milkymist.org>
Tue, 12 Jun 2012 17:55:57 +0000 (19:55 +0200)
committerSebastien Bourdeauducq <sebastien@milkymist.org>
Tue, 12 Jun 2012 17:55:57 +0000 (19:55 +0200)
examples/dataflow/dma.py

index 9b2491ec29075315fdd550afa81b9ad09d8fe09e..100055f3387a9009d23bd2f88d86cb47f4cb8431 100644 (file)
@@ -5,20 +5,24 @@ from migen.flow.ala import *
 from migen.flow.network import *
 from migen.actorlib import dma_wishbone
 from migen.actorlib.sim import *
-from migen.bus import wishbone
+from migen.bus import wishbone, asmibus
 from migen.sim.generic import Simulator
 from migen.sim.icarus import Runner
 
-class MyModel(wishbone.TargetModel):
-       def __init__(self):
-               self.prng = Random(763627)
-       
+class MyModel:
        def read(self, address):
                return address + 4
-       
+
+class MyModelWB(MyModel, wishbone.TargetModel):
+       def __init__(self):
+               self.prng = Random(763627)
+
        def can_ack(self, bus):
                return self.prng.randrange(0, 2)
 
+class MyModelASMI(MyModel, asmibus.TargetModel):
+       pass
+
 def adrgen_gen():
        for i in range(10):
                print("Address:  " + str(i))
@@ -30,8 +34,29 @@ def dumper_gen():
                yield t
                print("Received: " + str(t.value["d"]))
 
-def test_reader():
-       print("*** Testing reader")
+def trgen_gen():
+       for i in range(10):
+               a = i
+               d = i+10
+               print("Address: " + str(a) + " Data: " + str(d))
+               yield Token("address_data", {"a": a, "d": d})
+
+def wishbone_sim(efragment, master, end_simulation):
+       peripheral = wishbone.Target(MyModelWB())
+       tap = wishbone.Tap(peripheral.bus)
+       interconnect = wishbone.InterconnectPointToPoint(master.bus, peripheral.bus)
+
+       fragment = efragment \
+               + peripheral.get_fragment() \
+               + tap.get_fragment() \
+               + interconnect.get_fragment() \
+               + Fragment(sim=[end_simulation])
+       
+       sim = Simulator(fragment, Runner())
+       sim.run()
+
+def test_wb_reader():
+       print("*** Testing Wishbone reader")
        adrgen = SimActor(adrgen_gen(), ("address", Source, [("a", BV(30))]))
        reader = dma_wishbone.Reader()
        dumper = SimActor(dumper_gen(), ("data", Sink, [("d", BV(32))]))
@@ -40,50 +65,21 @@ def test_reader():
        g.add_connection(reader, dumper)
        comp = CompositeActor(g)
        
-       peripheral = wishbone.Target(MyModel())
-       interconnect = wishbone.InterconnectPointToPoint(reader.bus, peripheral.bus)
-       
        def end_simulation(s):
                s.interrupt = adrgen.done and not s.rd(comp.busy)
-       
-       fragment = comp.get_fragment() \
-               + peripheral.get_fragment() \
-               + interconnect.get_fragment() \
-               + Fragment(sim=[end_simulation])
-       
-       sim = Simulator(fragment, Runner())
-       sim.run()
+       wishbone_sim(comp.get_fragment(), reader, end_simulation)
 
-def trgen_gen():
-       for i in range(10):
-               a = i
-               d = i+10
-               print("Address: " + str(a) + " Data: " + str(d))
-               yield Token("address_data", {"a": a, "d": d})
-       
-def test_writer():
-       print("*** Testing writer")
+def test_wb_writer():
+       print("*** Testing Wishbone writer")
        trgen = SimActor(trgen_gen(), ("address_data", Source, [("a", BV(30)), ("d", BV(32))]))
        writer = dma_wishbone.Writer()
        g = DataFlowGraph()
        g.add_connection(trgen, writer)
        comp = CompositeActor(g)
        
-       peripheral = wishbone.Target(MyModel())
-       tap = wishbone.Tap(peripheral.bus)
-       interconnect = wishbone.InterconnectPointToPoint(writer.bus, peripheral.bus)
-       
        def end_simulation(s):
                s.interrupt = trgen.done and not s.rd(comp.busy)
-       
-       fragment = comp.get_fragment() \
-               + peripheral.get_fragment() \
-               + tap.get_fragment() \
-               + interconnect.get_fragment() \
-               + Fragment(sim=[end_simulation])
-       
-       sim = Simulator(fragment, Runner())
-       sim.run()
+       wishbone_sim(comp.get_fragment(), writer, end_simulation)
 
-test_reader()
-test_writer()
+test_wb_reader()
+test_wb_writer()