1 # This file is Copyright (c) 2020 LambdaConcept <contact@lambdaconcept.com>
6 from nmigen
import tracer
7 from nmigen
.compat
import Case
8 from nmigen
.back
.pysim
import *
# Names exported by ``from <module> import *``. Python only honours the
# lowercase spelling ``__all__``; the previous ``__ALL__`` was an ordinary
# module variable and had no effect on star-imports.
__all__ = ["delayed_enter", "RoundRobin", "Timeline", "CSRPrefixProxy"]
# Backward-compatible alias for any code that read the old, misspelled name.
__ALL__ = __all__
13 def delayed_enter(m
, src
, dst
, delay
):
16 for i
in range(delay
):
20 statename
= "{}-{}".format(src
, i
)
25 deststate
= "{}-{}".format(src
, i
+1)
27 with m
.State(statename
):
30 # Original nMigen implementation by HarryHo90sHK
33 class RoundRobin(Elaboratable
):
34 """A round-robin scheduler.
38 Maximum number of requests to handle.
42 Signal where a '1' on the i-th bit represents an incoming request from the i-th device.
43 grant : Signal(range(n))
44 Signal that is equal to the index of the device which is currently granted access.
46 Strobe signal to enable granting access to the next device requesting. Externally driven.
49 def __init__(self
, n
):
51 self
.request
= Signal(n
)
52 self
.grant
= Signal(range(n
))
55 def elaborate(self
, platform
):
59 with m
.Switch(self
.grant
):
60 for i
in range(self
.n
):
62 for j
in reversed(range(i
+1, i
+self
.n
)):
63 # If i+1 <= j < n, then t == j; (after i)
64 # If n <= j < i+n, then t == j - n (before i)
66 with m
.If(self
.request
[t
]):
67 m
.d
.sync
+= self
.grant
.eq(t
)
72 class Timeline(Elaboratable
):
73 def __init__(self
, events
):
74 self
.trigger
= Signal()
77 def elaborate(self
, platform
):
80 lastevent
= max([e
[0] for e
in self
._events
])
81 counter
= Signal(range(lastevent
+1))
83 # Counter incrementation
84 # (with overflow handling)
85 if (lastevent
& (lastevent
+ 1)) != 0:
86 with m
.If(counter
== lastevent
):
87 m
.d
.sync
+= counter
.eq(0)
89 with m
.If(counter
!= 0):
90 m
.d
.sync
+= counter
.eq(counter
+1)
91 with m
.Elif(self
.trigger
):
92 m
.d
.sync
+= counter
.eq(1)
94 with m
.If(counter
!= 0):
95 m
.d
.sync
+= counter
.eq(counter
+1)
96 with m
.Elif(self
.trigger
):
97 m
.d
.sync
+= counter
.eq(1)
99 for e
in self
._events
:
101 with m
.If(self
.trigger
& (counter
== 0)):
104 with m
.If(counter
== e
[0]):
109 class TimelineTestCase(unittest
.TestCase
):
110 def test_sequence(self
):
114 timeline
= Timeline([
122 m
.submodules
.timeline
= timeline
125 # Test default value for unset signals
126 self
.assertFalse((yield sigA
))
127 self
.assertFalse((yield sigB
))
129 # Ensure that the sequence isn't triggered without the trigger signal
132 self
.assertFalse((yield sigA
))
133 self
.assertFalse((yield sigB
))
135 yield timeline
.trigger
.eq(1)
137 yield timeline
.trigger
.eq(0)
139 for i
in range(11+1):
143 self
.assertTrue((yield sigA
))
144 self
.assertFalse((yield sigB
))
146 self
.assertTrue((yield sigA
))
147 self
.assertFalse((yield sigB
))
149 self
.assertFalse((yield sigA
))
150 self
.assertFalse((yield sigB
))
152 self
.assertFalse((yield sigA
))
153 self
.assertTrue((yield sigB
))
155 self
.assertFalse((yield sigA
))
156 self
.assertFalse((yield sigB
))
158 # Ensure no changes happen once the sequence is done
161 self
.assertFalse((yield sigA
))
162 self
.assertFalse((yield sigB
))
165 with sim
.write_vcd("test_compat.vcd"):
167 sim
.add_sync_process(process
)
171 class CSRPrefixProxy
:
172 def __init__(self
, bank
, prefix
):
174 self
._prefix
= prefix
176 def csr(self
, width
, access
, *, addr
=None, alignment
=None, name
=None,
178 if name
is not None and not isinstance(name
, str):
179 raise TypeError("Name must be a string, not {!r}".format(name
))
180 name
= name
or tracer
.get_var_name(depth
=2 + src_loc_at
).lstrip("_")
182 prefixed_name
= "{}_{}".format(self
._prefix
, name
)
183 return self
._bank
.csr(width
=width
, access
=access
, addr
=addr
,
184 alignment
=alignment
, name
=prefixed_name
)