doc: more examples and comments
[litex.git] / examples / wb_initiator.py
1 from random import Random
2
3 from migen.fhdl.structure import *
4 from migen.fhdl import autofragment
5 from migen.bus.transactions import *
6 from migen.bus import wishbone
7 from migen.sim.generic import Simulator
8 from migen.sim.icarus import Runner
9
# Bus master.
# Written as a Python generator so that the sequence of bus transactions
# reads like ordinary sequential code: each yielded transaction object is
# one transfer, and each yielded None is one dead (idle) bus cycle.
def my_generator():
    prng = Random(92837)

    # Phase 1: write 2*address to each of the first ten addresses.
    for addr in range(10):
        write = TWrite(addr, 2*addr)
        yield write
        # The latency is read only after the generator has been resumed,
        # i.e. once the transaction has been carried out on the bus.
        print("Wrote in " + str(write.latency) + " cycle(s)")
        # Leave the bus idle for 0, 1 or 2 cycles to simulate inactivity.
        idle_cycles = prng.randrange(0, 3)
        for _ in range(idle_cycles):
            yield None

    # Phase 2: read the same ten addresses back.
    for addr in range(10):
        read = TRead(addr)
        yield read
        print("Read " + str(read.data) + " in " + str(read.latency) + " cycle(s)")
        idle_cycles = prng.randrange(0, 3)
        for _ in range(idle_cycles):
            yield None
31
# Bus slave.
# Every transaction is acknowledged after a random delay.
# A read returns the accessed address plus 4; a write is acknowledged
# and nothing else is done with it.
class MyPeripheral:
    def __init__(self):
        self.bus = wishbone.Interface()
        # Gate for the acknowledge signal, driven from do_simulation().
        self.ack_en = Signal()
        # Fixed seed so that the sequence of delays is reproducible.
        self.prng = Random(763627)

    def do_simulation(self, s):
        # Draw 0 or 1 at random and write it to ack_en: acks are only
        # allowed while it is 1, which gives the bus a variable latency.
        ack_allowed = self.prng.randrange(0, 2)
        s.wr(self.ack_en, ack_allowed)

    def get_fragment(self):
        bus = self.bus
        # A transfer is being requested when both cyc and stb are asserted.
        requested = bus.cyc & bus.stb
        comb = [
            # Acknowledge a requested transfer only when acks are enabled.
            bus.ack.eq(requested & self.ack_en),
            # Read data is always the address plus 4.
            bus.dat_r.eq(bus.adr + 4),
        ]
        return Fragment(comb, sim=[self.do_simulation])
51
def main():
    """Build the master/slave test bench and simulate it until the master is done."""
    # The "wishbone.Initiator" library component runs our generator
    # and manipulates the bus signals accordingly.
    master = wishbone.Initiator(my_generator())
    # Our slave.
    slave = MyPeripheral()
    # The "wishbone.Tap" library component examines the bus at the slave port
    # and displays the transactions on the console (<TRead...>/<TWrite...>).
    tap = wishbone.Tap(slave.bus)
    # Connect the master to the slave.
    intercon = wishbone.InterconnectPointToPoint(master.bus, slave.bus)
    # A small extra simulation function to terminate the process when
    # the initiator is done (i.e. our generator is exhausted).
    def end_simulation(s):
        # Request the simulator to stop as soon as master.done is set.
        s.interrupt = master.done
    # NOTE(review): "tap" and "intercon" are not referenced again, so
    # autofragment.from_local() presumably gathers the components held in
    # this function's local variables -- confirm before renaming or
    # removing any of the locals above.
    fragment = autofragment.from_local() + Fragment(sim=[end_simulation])
    # Run the simulation with the Icarus runner imported at the top of the file.
    sim = Simulator(fragment, Runner())
    sim.run()
70
# Run the example; this call is unconditional, so it also runs on import.
main()
72
73 # Output:
74 # <TWrite adr:0x0 dat:0x0>
75 # Wrote in 0 cycle(s)
76 # <TWrite adr:0x1 dat:0x2>
77 # Wrote in 0 cycle(s)
78 # <TWrite adr:0x2 dat:0x4>
79 # Wrote in 0 cycle(s)
80 # <TWrite adr:0x3 dat:0x6>
81 # Wrote in 1 cycle(s)
82 # <TWrite adr:0x4 dat:0x8>
83 # Wrote in 1 cycle(s)
84 # <TWrite adr:0x5 dat:0xa>
85 # Wrote in 2 cycle(s)
86 # ...
87 # <TRead adr:0x0 dat:0x4>
88 # Read 4 in 2 cycle(s)
89 # <TRead adr:0x1 dat:0x5>
90 # Read 5 in 2 cycle(s)
91 # <TRead adr:0x2 dat:0x6>
92 # Read 6 in 1 cycle(s)
93 # <TRead adr:0x3 dat:0x7>
94 # Read 7 in 1 cycle(s)
95 # ...