1 # This file is Copyright (c) 2020 LambdaConcept <contact@lambdaconcept.com>
6 from nmigen
import tracer
7 from nmigen
.compat
import Case
8 from nmigen
.back
.pysim
import *
# Names this module intends to expose.
# NOTE(review): Python only honours the lower-case ``__all__`` when resolving
# ``from module import *``; this upper-case name is an ordinary module attribute
# and does not restrict star-imports. Renaming it would change what existing
# star-importers receive, so confirm with callers before doing so.
__ALL__ = ["delayed_enter", "RoundRobin", "Timeline", "CSRPrefixProxy"]
def delayed_enter(m, src, dst, delay):
    """Declare a chain of FSM states that leads from ``src`` to ``dst``.

    One state is declared per cycle of delay. The first one is ``src`` itself,
    the intermediate ones are named ``"<src>-<i>"``, and the last one hands
    over to ``dst``. Each declared state does nothing except select its
    successor.

    Parameters
    ----------
    m
        Object providing ``with m.State(name):`` and assignment to ``m.next``
        (presumably an nMigen ``Module`` inside an open ``with m.FSM():``
        block -- confirm against callers).
    src : str
        Name of the state the chain starts from; also the prefix of the
        intermediate state names.
    dst : str
        Name of the state entered once the chain has been traversed.
    delay : int
        Number of states in the chain; must be at least 1.

    Raises
    ------
    ValueError
        If ``delay`` is not positive: an empty chain could never reach ``dst``.
    """
    if delay <= 0:
        raise ValueError(
            "delay must be a positive number of cycles, not {!r}".format(delay))

    for i in range(delay):
        # The chain is anchored on `src`; only later links get a numbered name.
        statename = src if i == 0 else "{}-{}".format(src, i)
        # The final link transitions to `dst` instead of another numbered state.
        deststate = dst if i == delay - 1 else "{}-{}".format(src, i + 1)

        with m.State(statename):
            m.next = deststate
30 # Original nMigen implementation by HarryHo90sHK
class RoundRobin(Elaboratable):
    """A round-robin scheduler.

    Parameters
    ----------
    n : int
        Maximum number of requests to handle.

    Attributes
    ----------
    request : Signal(n)
        Signal where a '1' on the i-th bit represents an incoming request from the i-th device.
    grant : Signal(range(n))
        Signal that equals to the index of the device which is currently granted access.
    stb : Signal()
        Strobe signal to enable granting access to the next device requesting. Externally driven.
    """

    def __init__(self, n):
        # elaborate() iterates over range(self.n), so the count must be kept.
        self.n = n
        self.request = Signal(n)
        self.grant = Signal(range(n))
        # NOTE(review): the strobe is documented above, but its attribute name
        # is assumed to follow the Migen ``stb`` convention -- confirm against
        # the users of this class.
        self.stb = Signal()

    def elaborate(self, platform):
        """Build the grant-update logic; ``platform`` is not used."""
        m = Module()

        # The grant only moves while the external strobe is asserted.
        with m.If(self.stb):
            with m.Switch(self.grant):
                for i in range(self.n):
                    with m.Case(i):
                        # Candidates are visited farthest-first: the assignment
                        # emitted last takes precedence, so the requester
                        # closest after `i` wins.
                        for j in reversed(range(i + 1, i + self.n)):
                            # If i+1 <= j < n, then t == j;     (after i)
                            # If n <= j < i+n, then t == j - n  (before i)
                            t = j % self.n
                            with m.If(self.request[t]):
                                m.d.sync += self.grant.eq(t)

        return m
class Timeline(Elaboratable):
    """Execute a fixed schedule of statements after a trigger.

    Parameters
    ----------
    events : iterable
        Schedule entries. ``entry[0]`` is the integer counter value at which
        the entry fires.
        NOTE(review): ``entry[1]`` is assumed to be the statement (or list of
        statements) added to the ``sync`` domain at that time -- confirm
        against callers.

    Attributes
    ----------
    trigger : Signal()
        Starts the counter when asserted while the counter is at zero.
    """

    def __init__(self, events):
        self.trigger = Signal()
        # Materialised once: elaborate() walks the schedule twice, which a
        # one-shot iterator would not survive.
        self._events = list(events)

    def elaborate(self, platform):
        """Build the counter and the per-event logic; ``platform`` is not used.

        Raises
        ------
        ValueError
            If the schedule is empty.
        """
        if not self._events:
            raise ValueError("Timeline requires at least one event")

        m = Module()

        lastevent = max(e[0] for e in self._events)
        counter = Signal(range(lastevent + 1))

        def advance():
            # Keep counting once started; otherwise wait for the trigger.
            with m.If(counter != 0):
                m.d.sync += counter.eq(counter + 1)
            with m.Elif(self.trigger):
                m.d.sync += counter.eq(1)

        # Counter incrementation
        # (with overflow handling)
        if (lastevent & (lastevent + 1)) != 0:
            # lastevent + 1 is not a power of two, so the counter would not
            # wrap to zero by itself after the last event. The reset sits in
            # an If/Else pair so the increment cannot override it.
            with m.If(counter == lastevent):
                m.d.sync += counter.eq(0)
            with m.Else():
                advance()
        else:
            # The counter overflows back to zero right after the last event.
            advance()

        for e in self._events:
            if e[0] == 0:
                # A time-zero event fires on the trigger itself, and only
                # while no sequence is already running.
                with m.If(self.trigger & (counter == 0)):
                    m.d.sync += e[1]
            else:
                with m.If(counter == e[0]):
                    m.d.sync += e[1]

        return m
109 class TimelineTestCase(unittest
.TestCase
):
110 def test_sequence(self
):
114 timeline
= Timeline([
122 m
.submodules
.timeline
= timeline
125 # Test default value for unset signals
126 self
.assertFalse((yield sigA
))
127 self
.assertFalse((yield sigB
))
129 # Ensure that the sequence isn't triggered without the trigger signal
132 self
.assertFalse((yield sigA
))
133 self
.assertFalse((yield sigB
))
135 yield timeline
.trigger
.eq(1)
138 for i
in range(11+1):
142 self
.assertTrue((yield sigA
))
143 self
.assertFalse((yield sigB
))
145 self
.assertTrue((yield sigA
))
146 self
.assertFalse((yield sigB
))
148 self
.assertFalse((yield sigA
))
149 self
.assertFalse((yield sigB
))
151 self
.assertFalse((yield sigA
))
152 self
.assertTrue((yield sigB
))
154 self
.assertFalse((yield sigA
))
155 self
.assertFalse((yield sigB
))
158 with sim
.write_vcd("test_compat.vcd"):
160 sim
.add_sync_process(process
)
class CSRPrefixProxy:
    """Forward CSR requests to a bank, prepending a fixed prefix to each name.

    Parameters
    ----------
    bank
        Object whose ``csr(width=, access=, addr=, alignment=, name=)`` method
        performs the actual allocation.
    prefix : str
        Text joined to every register name with an underscore.
    """

    def __init__(self, bank, prefix):
        # csr() delegates to the bank, so it has to be kept on the instance.
        self._bank = bank
        self._prefix = prefix

    def csr(self, width, access, *, addr=None, alignment=None, name=None,
            src_loc_at=0):
        """Request a CSR from the wrapped bank under a prefixed name.

        Parameters
        ----------
        width, access, addr, alignment
            Passed to the bank's ``csr`` unchanged.
        name : str or None
            Register name. When omitted (or empty), it is taken from
            ``tracer.get_var_name`` with leading underscores stripped.
        src_loc_at : int
            Extra call-stack depth added to the tracer lookup, for callers
            that wrap this method.

        Returns
        -------
        Whatever the bank's ``csr`` returns.

        Raises
        ------
        TypeError
            If ``name`` is given but is not a string.
        """
        if name is not None and not isinstance(name, str):
            raise TypeError("Name must be a string, not {!r}".format(name))
        name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_")

        prefixed_name = "{}_{}".format(self._prefix, name)
        return self._bank.csr(width=width, access=access, addr=addr,
                              alignment=alignment, name=prefixed_name)