.. include:: ../examples/fir.py
:code: python
-Wishbone
-========
-.. include:: ../examples/wb_initiator.py
+Abstract bus transactions
+=========================
+.. include:: ../examples/abstract_transactions.py
:code: python
--- /dev/null
+# Copyright (C) 2012 Vermeer Manufacturing Co.
+# License: GPLv3 with additional permissions (see README).
+
+from random import Random
+
+from migen.fhdl.structure import *
+from migen.fhdl import autofragment
+from migen.bus.transactions import *
+from migen.bus import wishbone, asmibus
+from migen.sim.generic import Simulator
+from migen.sim.icarus import Runner
+
+# Our bus master.
+# Python generators let us program bus transactions in an elegant sequential style.
+def my_generator():
+ prng = Random(92837)
+
+ # Write to the first addresses.
+ for x in range(10):
+ t = TWrite(x, 2*x)
+ yield t
+ print("Wrote in " + str(t.latency) + " cycle(s)")
+ # Insert some dead cycles to simulate bus inactivity.
+ for delay in range(prng.randrange(0, 3)):
+ yield None
+
+ # Read from the first addresses.
+ for x in range(10):
+ t = TRead(x)
+ yield t
+ print("Read " + str(t.data) + " in " + str(t.latency) + " cycle(s)")
+ for delay in range(prng.randrange(0, 3)):
+ yield None
+
+# Our bus slave.
+class MyModel:
+ def read(self, address):
+ return address + 4
+
+class MyModelWB(MyModel, wishbone.TargetModel):
+ def __init__(self):
+ self.prng = Random(763627)
+
+ def can_ack(self, bus):
+ # Simulate variable latency.
+ return self.prng.randrange(0, 2)
+
+class MyModelASMI(MyModel, asmibus.TargetModel):
+ pass
+
+def test_wishbone():
+ print("*** Wishbone test")
+
+ # The "wishbone.Initiator" library component runs our generator
+ # and manipulates the bus signals accordingly.
+ master = wishbone.Initiator(my_generator())
+ # The "wishbone.Target" library component examines the bus signals
+ # and calls into our model object.
+ slave = wishbone.Target(MyModelWB())
+ # The "wishbone.Tap" library component examines the bus at the slave port
+ # and displays the transactions on the console (<TRead...>/<TWrite...>).
+ tap = wishbone.Tap(slave.bus)
+ # Connect the master to the slave.
+ intercon = wishbone.InterconnectPointToPoint(master.bus, slave.bus)
+ # A small extra simulation function to terminate the process when
+ # the initiator is done (i.e. our generator is exhausted).
+ def end_simulation(s):
+ s.interrupt = master.done
+ fragment = autofragment.from_local() + Fragment(sim=[end_simulation])
+ sim = Simulator(fragment, Runner())
+ sim.run()
+
+def test_asmi():
+ print("*** ASMI test")
+
+ # Create a hub with one port for our initiator.
+ hub = asmibus.Hub(32, 32)
+ port = hub.get_port()
+ hub.finalize()
+ # Create the initiator, target and tap (similar to the Wishbone case).
+ master = asmibus.Initiator(port, my_generator())
+ slave = asmibus.Target(hub, MyModelASMI())
+ tap = asmibus.Tap(hub)
+ # Run the simulation (same as the Wishbone case).
+ def end_simulation(s):
+ s.interrupt = master.done
+ fragment = autofragment.from_local() + Fragment(sim=[end_simulation])
+ sim = Simulator(fragment, Runner())
+ sim.run()
+
+test_wishbone()
+test_asmi()
+
+# Output:
+# <TWrite adr:0x0 dat:0x0>
+# Wrote in 0 cycle(s)
+# <TWrite adr:0x1 dat:0x2>
+# Wrote in 0 cycle(s)
+# <TWrite adr:0x2 dat:0x4>
+# Wrote in 0 cycle(s)
+# <TWrite adr:0x3 dat:0x6>
+# Wrote in 1 cycle(s)
+# <TWrite adr:0x4 dat:0x8>
+# Wrote in 1 cycle(s)
+# <TWrite adr:0x5 dat:0xa>
+# Wrote in 2 cycle(s)
+# ...
+# <TRead adr:0x0 dat:0x4>
+# Read 4 in 2 cycle(s)
+# <TRead adr:0x1 dat:0x5>
+# Read 5 in 2 cycle(s)
+# <TRead adr:0x2 dat:0x6>
+# Read 6 in 1 cycle(s)
+# <TRead adr:0x3 dat:0x7>
+# Read 7 in 1 cycle(s)
+# ...
+++ /dev/null
-# Copyright (C) 2012 Vermeer Manufacturing Co.
-# License: GPLv3 with additional permissions (see README).
-
-from random import Random
-
-from migen.fhdl.structure import *
-from migen.fhdl import autofragment
-from migen.bus.transactions import *
-from migen.bus import wishbone
-from migen.sim.generic import Simulator
-from migen.sim.icarus import Runner
-
-# Our bus master.
-# Python generators let us program bus transactions in an elegant sequential style.
-def my_generator():
- prng = Random(92837)
-
- # Write to the first addresses.
- for x in range(10):
- t = TWrite(x, 2*x)
- yield t
- print("Wrote in " + str(t.latency) + " cycle(s)")
- # Insert some dead cycles to simulate bus inactivity.
- for delay in range(prng.randrange(0, 3)):
- yield None
-
- # Read from the first addresses.
- for x in range(10):
- t = TRead(x)
- yield t
- print("Read " + str(t.data) + " in " + str(t.latency) + " cycle(s)")
- for delay in range(prng.randrange(0, 3)):
- yield None
-
-# Our bus slave.
-class MyModel(wishbone.TargetModel):
- def __init__(self):
- self.prng = Random(763627)
-
- def read(self, address):
- return address + 4
-
- def can_ack(self, bus):
- return self.prng.randrange(0, 2)
-
-def main():
- # The "wishbone.Initiator" library component runs our generator
- # and manipulates the bus signals accordingly.
- master = wishbone.Initiator(my_generator())
- # The "wishbone.Target" library component examines the bus signals
- # and calls into our model object.
- slave = wishbone.Target(MyModel())
- # The "wishbone.Tap" library component examines the bus at the slave port
- # and displays the transactions on the console (<TRead...>/<TWrite...>).
- tap = wishbone.Tap(slave.bus)
- # Connect the master to the slave.
- intercon = wishbone.InterconnectPointToPoint(master.bus, slave.bus)
- # A small extra simulation function to terminate the process when
- # the initiator is done (i.e. our generator is exhausted).
- def end_simulation(s):
- s.interrupt = master.done
- fragment = autofragment.from_local() + Fragment(sim=[end_simulation])
- sim = Simulator(fragment, Runner())
- sim.run()
-
-main()
-
-# Output:
-# <TWrite adr:0x0 dat:0x0>
-# Wrote in 0 cycle(s)
-# <TWrite adr:0x1 dat:0x2>
-# Wrote in 0 cycle(s)
-# <TWrite adr:0x2 dat:0x4>
-# Wrote in 0 cycle(s)
-# <TWrite adr:0x3 dat:0x6>
-# Wrote in 1 cycle(s)
-# <TWrite adr:0x4 dat:0x8>
-# Wrote in 1 cycle(s)
-# <TWrite adr:0x5 dat:0xa>
-# Wrote in 2 cycle(s)
-# ...
-# <TRead adr:0x0 dat:0x4>
-# Read 4 in 2 cycle(s)
-# <TRead adr:0x1 dat:0x5>
-# Read 5 in 2 cycle(s)
-# <TRead adr:0x2 dat:0x6>
-# Read 6 in 1 cycle(s)
-# <TRead adr:0x3 dat:0x7>
-# Read 7 in 1 cycle(s)
-# ...
from migen.fhdl.structure import *
from migen.corelogic.misc import optree
from migen.bus.transactions import *
+from migen.sim.generic import Proxy
class FinalizeError(Exception):
pass
]
return ports + Fragment(comb)
+class Tap:
+ def __init__(self, hub, handler=print):
+ self.hub = hub
+ self.handler = handler
+ self.tag_to_transaction = dict()
+ self.transaction = None
+
+ def do_simulation(self, s):
+ hub = Proxy(s, self.hub)
+
+ # Pull any data announced in the previous cycle.
+ if isinstance(self.transaction, TWrite):
+ self.transaction.data = hub.dat_w
+ self.transaction.sel = ~hub.dat_wm
+ self.handler(self.transaction)
+ self.transaction = None
+ if isinstance(self.transaction, TRead):
+ self.transaction.data = hub.dat_r
+ self.handler(self.transaction)
+ self.transaction = None
+
+ # Tag issue. Transaction objects are created here
+ # and placed into the tag_to_transaction dictionary.
+ for tag, slot in enumerate(self.hub.get_slots()):
+ if s.rd(slot.allocate):
+ adr = s.rd(slot.allocate_adr)
+ we = s.rd(slot.allocate_we)
+ if we:
+ transaction = TWrite(adr)
+ else:
+ transaction = TRead(adr)
+ transaction.latency = s.cycle_counter
+ self.tag_to_transaction[tag] = transaction
+
+ # Tag call.
+ if hub.call:
+ transaction = self.tag_to_transaction[hub.tag_call]
+ transaction.latency = s.cycle_counter - transaction.latency + 1
+ self.transaction = transaction
+
+ def get_fragment(self):
+ return Fragment(sim=[self.do_simulation])
+
class Initiator:
def __init__(self, port, generator):
self.port = port
def get_fragment(self):
return Fragment(sim=[self.do_simulation])
+
+class TargetModel:
+ def __init__(self):
+ self.last_slot = 0
+
+ def read(self, address):
+ return 0
+
+ def write(self, address, data, mask):
+ pass
+
+ # Round-robin scheduling.
+ def select_slot(self, pending_slots):
+ if not pending_slots:
+ return -1
+ self.last_slot += 1
+ if self.last_slot > max(pending_slots):
+ self.last_slot = 0
+ while self.last_slot not in pending_slots:
+ self.last_slot += 1
+ return self.last_slot
+
+class Target:
+ def __init__(self, hub, model):
+ self.hub = hub
+ self.model = model
+ self._calling_tag = -1
+ self._write_request_d = -1
+ self._write_request = -1
+ self._read_request = -1
+
+ def do_simulation(self, s):
+ slots = self.hub.get_slots()
+
+ # Data I/O
+ if self._write_request >= 0:
+ self.model.write(self._write_request,
+ s.rd(self.hub.dat_w), s.rd(self.hub.dat_wm))
+ if self._read_request >= 0:
+ s.wr(self.hub.dat_r, self.model.read(self._read_request))
+
+ # Request pipeline
+ self._read_request = -1
+ self._write_request = self._write_request_d
+ self._write_request_d = -1
+
+ # Examine pending slots and possibly choose one.
+ # Note that we do not use the SLOT_PROCESSING state here.
+ # Selected slots are immediately called.
+ pending_slots = set()
+ for tag, slot in enumerate(slots):
+ if tag != self._calling_tag and s.rd(slot.state) == SLOT_PENDING:
+ pending_slots.add(tag)
+ slot_to_call = self.model.select_slot(pending_slots)
+
+ # Call slot.
+ if slot_to_call >= 0:
+ slot = slots[slot_to_call]
+ s.wr(self.hub.call, 1)
+ s.wr(self.hub.tag_call, slot_to_call)
+ self._calling_tag = slot_to_call
+ if s.rd(slot.we):
+ self._write_request_d = s.rd(slot.adr)
+ else:
+ self._read_request = s.rd(slot.adr)
+ else:
+ s.wr(self.hub.call, 0)
+ self._calling_tag = -1
+
+ def get_fragment(self):
+ return Fragment(sim=[self.do_simulation])