from nmigen.compat import Case
from nmigen.back.pysim import *
-__ALL__ = ["delayed_enter", "RoundRobin", "Timeline", "CSRPrefixProxy"]
+__ALL__ = ["delayed_enter", "Timeline", "CSRPrefixProxy"]
def delayed_enter(m, src, dst, delay):
- assert delay > 0
+ if not isinstance(m, Module):
+ raise ValueError("m must be a module object, not {!r}".format(m))
+ if not isinstance(delay, int):
+ raise ValueError("Delay must be an integer, not {!r}".format(delay))
+ if delay < 1:
+ raise ValueError("Delay must be at least one cycle, not {!r}".format(delay))
for i in range(delay):
if i == 0:
with m.State(statename):
m.next = deststate
-# Original nMigen implementation by HarryHo90sHK
-
-
-class RoundRobin(Elaboratable):
- """A round-robin scheduler.
- Parameters
- ----------
- n : int
- Maximum number of requests to handle.
- Attributes
- ----------
- request : Signal(n)
- Signal where a '1' on the i-th bit represents an incoming request from the i-th device.
- grant : Signal(range(n))
- Signal that equals to the index of the device which is currently granted access.
- stb : Signal()
- Strobe signal to enable granting access to the next device requesting. Externally driven.
- """
-
- def __init__(self, n):
- self.n = n
- self.request = Signal(n)
- self.grant = Signal(range(n))
- self.stb = Signal()
-
- def elaborate(self, platform):
- m = Module()
-
- with m.If(self.stb):
- with m.Switch(self.grant):
- for i in range(self.n):
- with m.Case(i):
- for j in reversed(range(i+1, i+self.n)):
- # If i+1 <= j < n, then t == j; (after i)
- # If n <= j < i+n, then t == j - n (before i)
- t = j % self.n
- with m.If(self.request[t]):
- m.d.sync += self.grant.eq(t)
-
- return m
-
class Timeline(Elaboratable):
def __init__(self, events):
return m
-class TimelineTestCase(unittest.TestCase):
- def test_sequence(self):
- sigA = Signal()
- sigB = Signal()
- sigC = Signal()
- timeline = Timeline([
- (1, sigA.eq(1)),
- (5, sigA.eq(1)),
- (7, sigA.eq(0)),
- (10, sigB.eq(1)),
- (11, sigB.eq(0)),
- ])
- m = Module()
- m.submodules.timeline = timeline
-
- def process():
- # Test default value for unset signals
- self.assertFalse((yield sigA))
- self.assertFalse((yield sigB))
-
- # Ensure that the sequence isn't triggered without the trigger signal
- for i in range(100):
- yield
- self.assertFalse((yield sigA))
- self.assertFalse((yield sigB))
-
- yield timeline.trigger.eq(1)
- yield
- yield timeline.trigger.eq(0)
-
- for i in range(11+1):
- yield
-
- if i == 1:
- self.assertTrue((yield sigA))
- self.assertFalse((yield sigB))
- elif i == 5:
- self.assertTrue((yield sigA))
- self.assertFalse((yield sigB))
- elif i == 7:
- self.assertFalse((yield sigA))
- self.assertFalse((yield sigB))
- elif i == 10:
- self.assertFalse((yield sigA))
- self.assertTrue((yield sigB))
- elif i == 11:
- self.assertFalse((yield sigA))
- self.assertFalse((yield sigB))
-
- # Ensure no changes happen once the sequence is done
- for i in range(100):
- yield
- self.assertFalse((yield sigA))
- self.assertFalse((yield sigB))
-
- sim = Simulator(m)
- with sim.write_vcd("test_compat.vcd"):
- sim.add_clock(1e-6)
- sim.add_sync_process(process)
- sim.run()
-
class CSRPrefixProxy:
def __init__(self, bank, prefix):