--- /dev/null
+from ..._utils import deprecated
+from ..fhdl.module import CompatModule
+
+
+__all__ = ["RoundRobin", "SP_WITHDRAW", "SP_CE"]
+
+(SP_WITHDRAW, SP_CE) = range(2)
+
+class CompatRoundRobin(CompatModule):
+ def __init__(self, n, switch_policy=SP_WITHDRAW):
+ self.request = Signal(n)
+ self.grant = Signal(max=max(2, n))
+ self.switch_policy = switch_policy
+ if self.switch_policy == SP_CE:
+ warnings.warn("instead of `migen.genlib.roundrobin.RoundRobin`, "
+ "use `nmigen.lib.scheduler.RoundRobin`; note that RoundRobin does not "
+ "require a policy anymore but to get the same behavior as SP_CE you"
+ "should use an EnableInserter",
+ DeprecationWarning, stacklevel=1)
+ self.ce = Signal()
+ else:
+ warnings.warn("instead of `migen.genlib.roundrobin.RoundRobin`, "
+ "use `nmigen.lib.scheduler.RoundRobin`; note that RoundRobin does not "
+ "require a policy anymore",
+ DeprecationWarning, stacklevel=1)
+
+ ###
+
+ if n > 1:
+ cases = {}
+ for i in range(n):
+ switch = []
+ for j in reversed(range(i+1, i+n)):
+ t = j % n
+ switch = [
+ If(self.request[t],
+ self.grant.eq(t)
+ ).Else(
+ *switch
+ )
+ ]
+ if self.switch_policy == SP_WITHDRAW:
+ case = [If(~self.request[i], *switch)]
+ else:
+ case = switch
+ cases[i] = case
+ statement = Case(self.grant, cases)
+ if self.switch_policy == SP_CE:
+ statement = If(self.ce, statement)
+ self.sync += statement
+ else:
+ self.comb += self.grant.eq(0)
+
+
+
+RoundRobin = CompatRoundRobin
--- /dev/null
+from .. import *
+
+
+__all__ = ["RoundRobin"]
+
+
+class RoundRobin(Elaboratable):
+ """Round-robin scheduler.
+
+ For a given set of requests, the round-robin scheduler will
+ grant one request. Once it grants a request, if any other
+ requests are active, it grants the next active request with
+ a greater number, restarting from zero once it reaches the
+ highest one.
+
+ Use :class:`EnableInserter` to control when the scheduler
+ is updated.
+
+ Parameters
+ ----------
+ count : int
+ Number of requests.
+
+ Attributes
+ ----------
+ requests : Signal(count), in
+ Set of requests.
+ grant : Signal(range(count)), out
+ Number of the granted request. Does not change if there are no
+ active requests.
+ valid : Signal(), out
+ Asserted if grant corresponds to an active request. Deasserted
+ otherwise, i.e. if no requests are active.
+ """
+ def __init__(self, *, count):
+ if not isinstance(count, int) or count < 0:
+ raise ValueError("Count must be a non-negative integer, not {!r}"
+ .format(count))
+ self.count = count
+
+ self.requests = Signal(count)
+ self.grant = Signal(range(count))
+ self.valid = Signal()
+
+ def elaborate(self, platform):
+ m = Module()
+
+ with m.Switch(self.grant):
+ for i in range(self.count):
+ with m.Case(i):
+ for pred in reversed(range(i)):
+ with m.If(self.requests[pred]):
+ m.d.sync += self.grant.eq(pred)
+ for succ in reversed(range(i + 1, self.count)):
+ with m.If(self.requests[succ]):
+ m.d.sync += self.grant.eq(succ)
+
+ m.d.sync += self.valid.eq(self.requests.any())
+
+ return m
--- /dev/null
+# nmigen: UnusedElaboratable=no
+import unittest
+from .utils import *
+from ..hdl import *
+from ..asserts import *
+from ..sim.pysim import *
+from ..lib.scheduler import *
+
+
+class RoundRobinTestCase(unittest.TestCase):
+ def test_count(self):
+ dut = RoundRobin(count=32)
+ self.assertEqual(dut.count, 32)
+ self.assertEqual(len(dut.requests), 32)
+ self.assertEqual(len(dut.grant), 5)
+
+ def test_wrong_count(self):
+ with self.assertRaisesRegex(ValueError, r"Count must be a non-negative integer, not 'foo'"):
+ dut = RoundRobin(count="foo")
+ with self.assertRaisesRegex(ValueError, r"Count must be a non-negative integer, not -1"):
+ dut = RoundRobin(count=-1)
+
+
+class RoundRobinSimulationTestCase(unittest.TestCase):
+ def test_count_one(self):
+ dut = RoundRobin(count=1)
+ sim = Simulator(dut)
+ def process():
+ yield dut.requests.eq(0)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 0)
+ self.assertFalse((yield dut.valid))
+
+ yield dut.requests.eq(1)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 0)
+ self.assertTrue((yield dut.valid))
+ sim.add_sync_process(process)
+ sim.add_clock(1e-6)
+ with sim.write_vcd("test.vcd"):
+ sim.run()
+
+ def test_transitions(self):
+ dut = RoundRobin(count=3)
+ sim = Simulator(dut)
+ def process():
+ yield dut.requests.eq(0b111)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 1)
+ self.assertTrue((yield dut.valid))
+
+ yield dut.requests.eq(0b110)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 2)
+ self.assertTrue((yield dut.valid))
+
+ yield dut.requests.eq(0b010)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 1)
+ self.assertTrue((yield dut.valid))
+
+ yield dut.requests.eq(0b011)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 0)
+ self.assertTrue((yield dut.valid))
+
+ yield dut.requests.eq(0b001)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 0)
+ self.assertTrue((yield dut.valid))
+
+ yield dut.requests.eq(0b101)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 2)
+ self.assertTrue((yield dut.valid))
+
+ yield dut.requests.eq(0b100)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 2)
+ self.assertTrue((yield dut.valid))
+
+ yield dut.requests.eq(0b000)
+ yield; yield Delay(1e-8)
+ self.assertFalse((yield dut.valid))
+
+ yield dut.requests.eq(0b001)
+ yield; yield Delay(1e-8)
+ self.assertEqual((yield dut.grant), 0)
+ self.assertTrue((yield dut.valid))
+ sim.add_sync_process(process)
+ sim.add_clock(1e-6)
+ with sim.write_vcd("test.vcd"):
+ sim.run()