+# This file is Copyright (c) 2020 LambdaConcept <contact@lambdaconcept.com>
+
+import unittest
+
from nmigen import *
+from nmigen import tracer
+from nmigen.compat import Case
+from nmigen.back.pysim import *
+
+__ALL__ = ["delayed_enter", "RoundRobin", "Timeline", "CSRPrefixProxy"]
-__ALL__ = ["delayed_enter", "RoundRobin", "Timeline"]
def delayed_enter(m, src, dst, delay):
assert delay > 0
with m.State(statename):
m.next = deststate
-(SP_WITHDRAW, SP_CE) = range(2)
-
class RoundRobin(Elaboratable):
- def __init__(self, n, switch_policy=SP_WITHDRAW):
+ """A round-robin scheduler. (HarryHo90sHK)
+ Parameters
+ ----------
+ n : int
+ Maximum number of requests to handle.
+ Attributes
+ ----------
+ request : Signal(n)
+ Signal where a '1' on the i-th bit represents an incoming request from the i-th device.
+ grant : Signal(range(n))
+ Signal that equals to the index of the device which is currently granted access.
+ stb : Signal()
+ Strobe signal to enable granting access to the next device requesting. Externally driven.
+ """
+
+ def __init__(self, n):
+ self.n = n
self.request = Signal(n)
- self.grant = Signal(max=max(2, n))
- self.switch_policy = switch_policy
- if self.switch_policy == SP_CE:
- self.ce = Signal()
+ self.grant = Signal(range(n))
+ self.stb = Signal()
def elaborate(self, platform):
m = Module()
- # TODO: fix
-
- if n > 1:
- cases = {}
- for i in range(n):
- switch = []
- for j in reversed(range(i+1, i+n)):
- t = j % n
- switch = [
- If(self.request[t],
- self.grant.eq(t)
- ).Else(
- *switch
- )
- ]
- if self.switch_policy == SP_WITHDRAW:
- case = [If(~self.request[i], *switch)]
- else:
- case = switch
- cases[i] = case
- statement = Case(self.grant, cases)
- if self.switch_policy == SP_CE:
- with m.If(self.ce):
- m.d.sync += statement
- else:
- m.d.sync += statement
- else:
+ if self.n == 1:
m.d.comb += self.grant.eq(0)
+ else:
+ with m.If(self.stb):
+ with m.Switch(self.grant):
+ for i in range(self.n):
+ with m.Case(i):
+ for j in reversed(range(i+1, i+self.n)):
+ # If i+1 <= j < n, then t == j; (after i)
+ # If n <= j < i+n, then t == j - n (before i)
+ t = j % self.n
+ with m.If(self.request[t]):
+ m.d.sync += self.grant.eq(t)
return m
+
class Timeline(Elaboratable):
def __init__(self, events):
self.trigger = Signal()
m.d.sync += e[1]
return m
+
+
+class CSRPrefixProxy:
+ def __init__(self, bank, prefix):
+ self._bank = bank
+ self._prefix = prefix
+
+ def csr(self, width, access, *, addr=None, alignment=None, name=None,
+ src_loc_at=0):
+ if name is not None and not isinstance(name, str):
+ raise TypeError("Name must be a string, not {!r}".format(name))
+ name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_")
+
+ prefixed_name = "{}_{}".format(self._prefix, name)
+ return self._bank.csr(width=width, access=access, addr=addr,
+ alignment=alignment, name=prefixed_name)