Backport modifications from example's CRG
[gram.git] / gram / compat.py
1 # This file is Copyright (c) 2020 LambdaConcept <contact@lambdaconcept.com>
2
3 import unittest
4
5 from nmigen import *
6 from nmigen import tracer
7 from nmigen.compat import Case
8 from nmigen.back.pysim import *
9
10 __ALL__ = ["delayed_enter", "RoundRobin", "Timeline", "CSRPrefixProxy"]
11
12
13 def delayed_enter(m, src, dst, delay):
14 assert delay > 0
15
16 for i in range(delay):
17 if i == 0:
18 statename = src
19 else:
20 statename = "{}-{}".format(src, i)
21
22 if i == delay-1:
23 deststate = dst
24 else:
25 deststate = "{}-{}".format(src, i+1)
26
27 with m.State(statename):
28 m.next = deststate
29
30 class RoundRobin(Elaboratable):
31 """A round-robin scheduler. (HarryHo90sHK)
32 Parameters
33 ----------
34 n : int
35 Maximum number of requests to handle.
36 Attributes
37 ----------
38 request : Signal(n)
39 Signal where a '1' on the i-th bit represents an incoming request from the i-th device.
40 grant : Signal(range(n))
41 Signal that equals to the index of the device which is currently granted access.
42 stb : Signal()
43 Strobe signal to enable granting access to the next device requesting. Externally driven.
44 """
45
46 def __init__(self, n):
47 self.n = n
48 self.request = Signal(n)
49 self.grant = Signal(range(n))
50 self.stb = Signal()
51
52 def elaborate(self, platform):
53 m = Module()
54
55 if self.n == 1:
56 m.d.comb += self.grant.eq(0)
57 else:
58 with m.If(self.stb):
59 with m.Switch(self.grant):
60 for i in range(self.n):
61 with m.Case(i):
62 for j in reversed(range(i+1, i+self.n)):
63 # If i+1 <= j < n, then t == j; (after i)
64 # If n <= j < i+n, then t == j - n (before i)
65 t = j % self.n
66 with m.If(self.request[t]):
67 m.d.sync += self.grant.eq(t)
68
69 return m
70
71
72 class Timeline(Elaboratable):
73 def __init__(self, events):
74 self.trigger = Signal()
75 self._events = events
76
77 def elaborate(self, platform):
78 m = Module()
79
80 lastevent = max([e[0] for e in self._events])
81 counter = Signal(range(lastevent+1))
82
83 # Counter incrementation
84 # (with overflow handling)
85 if (lastevent & (lastevent + 1)) != 0:
86 with m.If(counter == lastevent):
87 m.d.sync += counter.eq(0)
88 with m.Else():
89 with m.If(counter != 0):
90 m.d.sync += counter.eq(counter+1)
91 with m.Elif(self.trigger):
92 m.d.sync += counter.eq(1)
93 else:
94 with m.If(counter != 0):
95 m.d.sync += counter.eq(counter+1)
96 with m.Elif(self.trigger):
97 m.d.sync += counter.eq(1)
98
99 for e in self._events:
100 if e[0] == 0:
101 with m.If(self.trigger & (counter == 0)):
102 m.d.sync += e[1]
103 else:
104 with m.If(counter == e[0]):
105 m.d.sync += e[1]
106
107 return m
108
109
110 class CSRPrefixProxy:
111 def __init__(self, bank, prefix):
112 self._bank = bank
113 self._prefix = prefix
114
115 def csr(self, width, access, *, addr=None, alignment=None, name=None,
116 src_loc_at=0):
117 if name is not None and not isinstance(name, str):
118 raise TypeError("Name must be a string, not {!r}".format(name))
119 name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_")
120
121 prefixed_name = "{}_{}".format(self._prefix, name)
122 return self._bank.csr(width=width, access=access, addr=addr,
123 alignment=alignment, name=prefixed_name)