from nmigen.compat import Case
from nmigen.back.pysim import *
-__ALL__ = ["delayed_enter", "RoundRobin", "Timeline", "CSRPrefixProxy"]
+__ALL__ = ["delayed_enter", "Timeline", "CSRPrefixProxy"]
def delayed_enter(m, src, dst, delay):
with m.State(statename):
m.next = deststate
-class RoundRobin(Elaboratable):
- """A round-robin scheduler. (HarryHo90sHK)
- Parameters
- ----------
- n : int
- Maximum number of requests to handle.
- Attributes
- ----------
- request : Signal(n)
- Signal where a '1' on the i-th bit represents an incoming request from the i-th device.
- grant : Signal(range(n))
- Signal that equals to the index of the device which is currently granted access.
- stb : Signal()
- Strobe signal to enable granting access to the next device requesting. Externally driven.
- """
-
- def __init__(self, n):
- self.n = n
- self.request = Signal(n)
- self.grant = Signal(range(n))
- self.stb = Signal()
-
- def elaborate(self, platform):
- m = Module()
-
- if self.n == 1:
- m.d.comb += self.grant.eq(0)
- else:
- with m.If(self.stb):
- with m.Switch(self.grant):
- for i in range(self.n):
- with m.Case(i):
- for j in reversed(range(i+1, i+self.n)):
- # If i+1 <= j < n, then t == j; (after i)
- # If n <= j < i+n, then t == j - n (before i)
- t = j % self.n
- with m.If(self.request[t]):
- m.d.sync += self.grant.eq(t)
-
- return m
-
class Timeline(Elaboratable):
def __init__(self, events):
# License: BSD
from nmigen import *
+from nmigen.lib.scheduler import RoundRobin
from gram.common import *
from gram.core.controller import *
-from gram.compat import RoundRobin
import gram.stream as stream
__ALL__ = ["gramCrossbar"]
master_wdata_readys = [0]*nmasters
master_rdata_valids = [0]*nmasters
- arbiters = [RoundRobin(nmasters) for n in range(self.nbanks)]
+ arbiters_en = Signal(self.nbanks)
+ arbiters = [EnableInserter(arbiters_en[n])(RoundRobin(count=nmasters)) for n in range(self.nbanks)]
m.submodules += arbiters
for nb, arbiter in enumerate(arbiters):
bank_requested = [bs & master.cmd.valid for bs,
master in zip(bank_selected, self.masters)]
m.d.comb += [
- arbiter.request.eq(Cat(*bank_requested)),
- arbiter.stb.eq(~bank.valid & ~bank.lock)
+ arbiter.requests.eq(Cat(*bank_requested)),
+ arbiters_en[nb].eq(~bank.valid & ~bank.lock)
]
# Route requests -----------------------------------------------------------------------
from nmigen import *
from nmigen.asserts import Assert, Assume
+from nmigen.lib.scheduler import RoundRobin
from gram.common import *
import gram.stream as stream
-from gram.compat import RoundRobin, delayed_enter
+from gram.compat import delayed_enter
__ALL__ = ["Multiplexer"]
write = request.is_write == self.want_writes
m.d.comb += valids[i].eq(request.valid & (command | (read & write)))
- m.submodules.arbiter = arbiter = RoundRobin(n)
+ # Arbitrate if a command is being accepted or if the command is not valid to ensure a valid
+ # command is selected when cmd.ready goes high.
+ m.submodules.arbiter = arbiter = EnableInserter(self.cmd.ready | ~self.cmd.valid)(RoundRobin(count=n))
choices = Array(valids[i] for i in range(n))
m.d.comb += [
- arbiter.request.eq(valids),
+ arbiter.requests.eq(valids),
self.cmd.valid.eq(choices[arbiter.grant])
]
with m.If(self.cmd.valid & self.cmd.ready & (arbiter.grant == i)):
m.d.comb += self.ready[i].eq(1)
- # Arbitrate if a command is being accepted or if the command is not valid to ensure a valid
- # command is selected when cmd.ready goes high.
- m.d.comb += arbiter.stb.eq(self.cmd.ready | ~self.cmd.valid)
-
return m
# helpers
self.assertFalse((yield sigB))
runSimulation(timeline, process, "test_timeline.vcd")
-
-class RoundRobinOutputMatchSpec(Elaboratable):
- def __init__(self, dut):
- self.dut = dut
-
- def elaborate(self, platform):
- m = Module()
-
- m.d.comb += Assume(Rose(self.dut.stb).implies(self.dut.request == Past(self.dut.request)))
-
- m.d.sync += Assert(((Past(self.dut.request) != 0) & Past(self.dut.stb)).implies(Past(self.dut.request) & (1 << self.dut.grant)))
-
- return m
-
-class RoundRobinTestCase(FHDLTestCase):
- def test_sequence(self):
- m = Module()
- m.submodules.rb = roundrobin = RoundRobin(8)
-
- def process():
- yield roundrobin.request.eq(0b10001000)
- yield roundrobin.stb.eq(1)
- yield
- yield
-
- self.assertEqual((yield roundrobin.grant), 3)
- yield
-
- self.assertEqual((yield roundrobin.grant), 7)
- yield
-
- self.assertEqual((yield roundrobin.grant), 3)
- yield roundrobin.request.eq(0b00000001)
- yield
- yield
-
- self.assertEqual((yield roundrobin.grant), 0)
-
- runSimulation(m, process, "test_roundrobin.vcd")
-
- # def test_output_match(self):
- # roundrobin = RoundRobin(32)
- # spec = RoundRobinOutputMatchSpec(roundrobin)
- # self.assertFormal(spec, mode="bmc", depth=10)
\ No newline at end of file