--- /dev/null
+import pkg_resources
+try:
+ __version__ = pkg_resources.get_distribution(__name__).version
+except pkg_resources.DistributionNotFound:
+ pass
--- /dev/null
+from .base import *
--- /dev/null
+from nmigen import *
+from nmigen import tracer
+
+from nmigen_soc import csr
+
+
+__all__ = ["EventSource", "IRQLine", "InterruptSource"]
+
+
+class EventSource:
+ """Event source.
+
+ Parameters
+ ----------
+ mode : ``"level"``, ``"rise"``, ``"fall"``
+ Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is high.
+ If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or falling) edge
+ of ``stb``.
+ name : str
+ Name of the event. If ``None`` (default) the name is inferred from the variable
+ name this event source is assigned to.
+
+ Attributes
+ ----------
+ name : str
+ Name of the event
+ mode : ``"level"``, ``"rise"``, ``"fall"``
+ Trigger mode.
+ stb : Signal, in
+ Event strobe.
+ """
+ def __init__(self, *, mode="level", name=None, src_loc_at=0):
+ if name is not None and not isinstance(name, str):
+ raise TypeError("Name must be a string, not {!r}".format(name))
+
+ choices = ("level", "rise", "fall")
+ if mode not in choices:
+ raise ValueError("Invalid trigger mode {!r}; must be one of {}"
+ .format(mode, ", ".join(choices)))
+
+ self.name = name or tracer.get_var_name(depth=2 + src_loc_at)
+ self.mode = mode
+ self.stb = Signal(name="{}_stb".format(self.name))
+
+
+class IRQLine(Signal):
+ """Interrupt request line."""
+ def __init__(self, *, name=None, src_loc_at=0):
+ super().__init__(name=name, src_loc_at=1 + src_loc_at)
+
+ __hash__ = object.__hash__
+
+
+class InterruptSource(Elaboratable):
+ """Interrupt source.
+
+ A mean of gathering multiple event sources into a single interrupt request line.
+
+ Parameters
+ ----------
+ events : iter(:class:`EventSource`)
+ Event sources.
+ name : str
+ Name of the interrupt source. If ``None`` (default) the name is inferred from the
+ variable name this interrupt source is assigned to.
+
+ Attributes
+ ----------
+ name : str
+ Name of the interrupt source.
+ status : :class:`csr.Element`, read-only
+ Event status register. Each bit displays the level of the strobe of an event source.
+ Events are ordered by position in the `events` parameter.
+ pending : :class:`csr.Element`, read/write
+ Event pending register. If a bit is 1, the associated event source has a pending
+ notification. Writing 1 to a bit clears it.
+ Events are ordered by position in the `events` parameter.
+ enable : :class:`csr.Element`, read/write
+ Event enable register. Writing 1 to a bit enables its associated event source.
+ Writing 0 disables it.
+ Events are ordered by position in the `events` parameter.
+ irq : :class:`IRQLine`, out
+ Interrupt request. It is raised if any event source is enabled and has a pending
+ notification.
+ """
+ def __init__(self, events, *, name=None, src_loc_at=0):
+ if name is not None and not isinstance(name, str):
+ raise TypeError("Name must be a string, not {!r}".format(name))
+ self.name = name or tracer.get_var_name(depth=2 + src_loc_at)
+
+ for event in events:
+ if not isinstance(event, EventSource):
+ raise TypeError("Event source must be an instance of EventSource, not {!r}"
+ .format(event))
+ self._events = list(events)
+
+ width = len(events)
+ self.status = csr.Element(width, "r", name="{}_status".format(self.name))
+ self.pending = csr.Element(width, "rw", name="{}_pending".format(self.name))
+ self.enable = csr.Element(width, "rw", name="{}_enable".format(self.name))
+
+ self.irq = IRQLine(name="{}_irq".format(self.name))
+
+ def elaborate(self, platform):
+ m = Module()
+
+ with m.If(self.pending.w_stb):
+ m.d.sync += self.pending.r_data.eq(self.pending.r_data & ~self.pending.w_data)
+
+ with m.If(self.enable.w_stb):
+ m.d.sync += self.enable.r_data.eq(self.enable.w_data)
+
+ for i, event in enumerate(self._events):
+ m.d.sync += self.status.r_data[i].eq(event.stb)
+
+ if event.mode in ("rise", "fall"):
+ event_stb_r = Signal.like(event.stb, name_suffix="_r")
+ m.d.sync += event_stb_r.eq(event.stb)
+
+ event_trigger = Signal(name="{}_trigger".format(event.name))
+ if event.mode == "level":
+ m.d.comb += event_trigger.eq(event.stb)
+ elif event.mode == "rise":
+ m.d.comb += event_trigger.eq(~event_stb_r & event.stb)
+ elif event.mode == "fall":
+ m.d.comb += event_trigger.eq(event_stb_r & ~event.stb)
+ else:
+ assert False # :nocov:
+
+ with m.If(event_trigger):
+ m.d.sync += self.pending.r_data[i].eq(1)
+
+ m.d.comb += self.irq.eq((self.pending.r_data & self.enable.r_data).any())
+
+ return m
--- /dev/null
+from nmigen import *
+from nmigen import tracer
+from nmigen.utils import log2_int
+
+from nmigen_soc import csr, wishbone
+from nmigen_soc.memory import MemoryMap
+from nmigen_soc.csr.wishbone import WishboneCSRBridge
+
+
+from ._event import *
+
+
+__all__ = ["Peripheral", "CSRBank", "PeripheralBridge"]
+
+
+class Peripheral:
+ """Wishbone peripheral.
+
+ A helper class to reduce the boilerplate needed to control a peripheral with a Wishbone interface.
+ It provides facilities for instantiating CSR registers, requesting windows to subordinate busses
+ and sending interrupt requests to the CPU.
+
+ The ``Peripheral`` class is not meant to be instantiated as-is, but rather as a base class for
+ actual peripherals.
+
+ Usage example
+ -------------
+
+ ```
+ class ExamplePeripheral(Peripheral, Elaboratable):
+ def __init__(self):
+ super().__init__()
+ bank = self.csr_bank()
+ self._foo = bank.csr(8, "r")
+ self._bar = bank.csr(8, "w")
+
+ self._rdy = self.event(mode="rise")
+
+ self._bridge = self.bridge(data_width=32, granularity=8, alignment=2)
+ self.bus = self._bridge.bus
+ self.irq = self._bridge.irq
+
+ def elaborate(self, platform):
+ m = Module()
+ m.submodules.bridge = self._bridge
+ # ...
+ return m
+ ```
+
+ Arguments
+ ---------
+ name : str
+ Name of this peripheral. If ``None`` (default) the name is inferred from the variable
+ name this peripheral is assigned to.
+
+ Properties
+ ----------
+ name : str
+ Name of the peripheral.
+ """
+ def __init__(self, name=None, src_loc_at=1):
+ if name is not None and not isinstance(name, str):
+ raise TypeError("Name must be a string, not {!r}".format(name))
+ self.name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_")
+
+ self._csr_banks = []
+ self._windows = []
+ self._events = []
+
+ self._bus = None
+ self._irq = None
+
+ @property
+ def bus(self):
+ """Wishbone bus interface.
+
+ Return value
+ ------------
+ An instance of :class:`Interface`.
+
+ Exceptions
+ ----------
+ Raises :exn:`NotImplementedError` if the peripheral does not have a Wishbone bus.
+ """
+ if self._bus is None:
+ raise NotImplementedError("Peripheral {!r} does not have a bus interface"
+ .format(self))
+ return self._bus
+
+ @bus.setter
+ def bus(self, bus):
+ if not isinstance(bus, wishbone.Interface):
+ raise TypeError("Bus interface must be an instance of wishbone.Interface, not {!r}"
+ .format(bus))
+ self._bus = bus
+
+ @property
+ def irq(self):
+ """Interrupt request line.
+
+ Return value
+ ------------
+ An instance of :class:`IRQLine`.
+
+ Exceptions
+ ----------
+ Raises :exn:`NotImplementedError` if the peripheral does not have an IRQ line.
+ """
+ if self._irq is None:
+ raise NotImplementedError("Peripheral {!r} does not have an IRQ line"
+ .format(self))
+ return self._irq
+
+ @irq.setter
+ def irq(self, irq):
+ if not isinstance(irq, IRQLine):
+ raise TypeError("IRQ line must be an instance of IRQLine, not {!r}"
+ .format(irq))
+ self._irq = irq
+
+ def csr_bank(self, *, addr=None, alignment=None):
+ """Request a CSR bank.
+
+ Arguments
+ ---------
+ addr : int or None
+ Address of the bank. If ``None``, the implicit next address will be used.
+ Otherwise, the exact specified address (which must be a multiple of
+ ``2 ** max(alignment, bridge_alignment)``) will be used.
+ alignment : int or None
+ Alignment of the bank. If not specified, the bridge alignment is used.
+ See :class:`nmigen_soc.csr.Multiplexer` for details.
+
+ Return value
+ ------------
+ An instance of :class:`CSRBank`.
+ """
+ bank = CSRBank(name_prefix=self.name)
+ self._csr_banks.append((bank, addr, alignment))
+ return bank
+
+ def window(self, *, addr_width, data_width, granularity=None, features=frozenset(),
+ alignment=0, addr=None, sparse=None):
+ """Request a window to a subordinate bus.
+
+ See :meth:`nmigen_soc.wishbone.Decoder.add` for details.
+
+ Return value
+ ------------
+ An instance of :class:`nmigen_soc.wishbone.Interface`.
+ """
+ window = wishbone.Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features)
+ granularity_bits = log2_int(data_width // window.granularity)
+ window.memory_map = MemoryMap(addr_width=addr_width + granularity_bits,
+ data_width=window.granularity, alignment=alignment)
+ self._windows.append((window, addr, sparse))
+ return window
+
+ def event(self, *, mode="level", name=None, src_loc_at=0):
+ """Request an event source.
+
+ See :class:`EventSource` for details.
+
+ Return value
+ ------------
+ An instance of :class:`EventSource`.
+ """
+ event = EventSource(mode=mode, name=name, src_loc_at=1 + src_loc_at)
+ self._events.append(event)
+ return event
+
+ def bridge(self, *, data_width=8, granularity=None, features=frozenset(), alignment=0):
+ """Request a bridge to the resources of the peripheral.
+
+ See :class:`PeripheralBridge` for details.
+
+ Return value
+ ------------
+ A :class:`PeripheralBridge` providing access to local resources.
+ """
+ return PeripheralBridge(self, data_width=data_width, granularity=granularity,
+ features=features, alignment=alignment)
+
+ def iter_csr_banks(self):
+ """Iterate requested CSR banks and their parameters.
+
+ Yield values
+ ------------
+ A tuple ``bank, addr, alignment`` describing the bank and its parameters.
+ """
+ for bank, addr, alignment in self._csr_banks:
+ yield bank, addr, alignment
+
+ def iter_windows(self):
+ """Iterate requested windows and their parameters.
+
+ Yield values
+ ------------
+ A tuple ``window, addr, sparse`` descr
+ given to :meth:`Peripheral.window`.
+ """
+ for window, addr, sparse in self._windows:
+ yield window, addr, sparse
+
+ def iter_events(self):
+ """Iterate requested event sources.
+
+ Yield values
+ ------------
+ An instance of :class:`EventSource`.
+ """
+ for event in self._events:
+ yield event
+
+
+class CSRBank:
+ """CSR register bank.
+
+ Parameters
+ ----------
+ name_prefix : str
+ Name prefix of the bank registers.
+ """
+ def __init__(self, *, name_prefix=""):
+ self._name_prefix = name_prefix
+ self._csr_regs = []
+
+ def csr(self, width, access, *, addr=None, alignment=None, name=None,
+ src_loc_at=0):
+ """Request a CSR register.
+
+ Parameters
+ ----------
+ width : int
+ Width of the register. See :class:`nmigen_soc.csr.Element`.
+ access : :class:`Access`
+ Register access mode. See :class:`nmigen_soc.csr.Element`.
+ addr : int
+ Address of the register. See :meth:`nmigen_soc.csr.Multiplexer.add`.
+ alignment : int
+ Register alignment. See :class:`nmigen_soc.csr.Multiplexer`.
+ name : str
+ Name of the register. If ``None`` (default) the name is inferred from the variable
+ name this register is assigned to.
+
+ Return value
+ ------------
+ An instance of :class:`nmigen_soc.csr.Element`.
+ """
+ if name is not None and not isinstance(name, str):
+ raise TypeError("Name must be a string, not {!r}".format(name))
+ name = name or tracer.get_var_name(depth=2 + src_loc_at).lstrip("_")
+
+ elem_name = "{}_{}".format(self._name_prefix, name)
+ elem = csr.Element(width, access, name=elem_name)
+ self._csr_regs.append((elem, addr, alignment))
+ return elem
+
+ def iter_csr_regs(self):
+ """Iterate requested CSR registers and their parameters.
+
+ Yield values
+ ------------
+ A tuple ``elem, addr, alignment`` describing the register and its parameters.
+ """
+ for elem, addr, alignment in self._csr_regs:
+ yield elem, addr, alignment
+
+
+class PeripheralBridge(Elaboratable):
+ """Peripheral bridge.
+
+ A bridge providing access to the registers and windows of a peripheral, and support for
+ interrupt requests from its event sources.
+
+ Event managment is performed by an :class:`InterruptSource` submodule.
+
+ Parameters
+ ---------
+ periph : :class:`Peripheral`
+ The peripheral whose resources are exposed by this bridge.
+ data_width : int
+ Data width. See :class:`nmigen_soc.wishbone.Interface`.
+ granularity : int or None
+ Granularity. See :class:`nmigen_soc.wishbone.Interface`.
+ features : iter(str)
+ Optional signal set. See :class:`nmigen_soc.wishbone.Interface`.
+ alignment : int
+ Resource alignment. See :class:`nmigen_soc.memory.MemoryMap`.
+
+ Attributes
+ ----------
+ bus : :class:`nmigen_soc.wishbone.Interface`
+ Wishbone bus providing access to the resources of the peripheral.
+ irq : :class:`IRQLine`, out
+ Interrupt request. It is raised if any event source is enabled and has a pending
+ notification.
+ """
+ def __init__(self, periph, *, data_width, granularity, features, alignment):
+ if not isinstance(periph, Peripheral):
+ raise TypeError("Peripheral must be an instance of Peripheral, not {!r}"
+ .format(periph))
+
+ self._wb_decoder = wishbone.Decoder(addr_width=1, data_width=data_width,
+ granularity=granularity,
+ features=features, alignment=alignment)
+
+ self._csr_subs = []
+
+ for bank, bank_addr, bank_alignment in periph.iter_csr_banks():
+ if bank_alignment is None:
+ bank_alignment = alignment
+ csr_mux = csr.Multiplexer(addr_width=1, data_width=8, alignment=bank_alignment)
+ for elem, elem_addr, elem_alignment in bank.iter_csr_regs():
+ if elem_alignment is None:
+ elem_alignment = alignment
+ csr_mux.add(elem, addr=elem_addr, alignment=elem_alignment, extend=True)
+
+ csr_bridge = WishboneCSRBridge(csr_mux.bus, data_width=data_width)
+ self._wb_decoder.add(csr_bridge.wb_bus, addr=bank_addr, extend=True)
+ self._csr_subs.append((csr_mux, csr_bridge))
+
+ for window, window_addr, window_sparse in periph.iter_windows():
+ self._wb_decoder.add(window, addr=window_addr, sparse=window_sparse, extend=True)
+
+ events = list(periph.iter_events())
+ if len(events) > 0:
+ self._int_src = InterruptSource(events, name="{}_ev".format(periph.name))
+ self.irq = self._int_src.irq
+
+ csr_mux = csr.Multiplexer(addr_width=1, data_width=8, alignment=alignment)
+ csr_mux.add(self._int_src.status, extend=True)
+ csr_mux.add(self._int_src.pending, extend=True)
+ csr_mux.add(self._int_src.enable, extend=True)
+
+ csr_bridge = WishboneCSRBridge(csr_mux.bus, data_width=data_width)
+ self._wb_decoder.add(csr_bridge.wb_bus, extend=True)
+ self._csr_subs.append((csr_mux, csr_bridge))
+ else:
+ self._int_src = None
+ self.irq = None
+
+ self.bus = self._wb_decoder.bus
+
+ def elaborate(self, platform):
+ m = Module()
+
+ for i, (csr_mux, csr_bridge) in enumerate(self._csr_subs):
+ m.submodules[ "csr_mux_{}".format(i)] = csr_mux
+ m.submodules["csr_bridge_{}".format(i)] = csr_bridge
+
+ if self._int_src is not None:
+ m.submodules._int_src = self._int_src
+
+ m.submodules.wb_decoder = self._wb_decoder
+
+ return m
--- /dev/null
+def wb_read(bus, addr, sel, timeout=32):
+ yield bus.cyc.eq(1)
+ yield bus.stb.eq(1)
+ yield bus.adr.eq(addr)
+ yield bus.sel.eq(sel)
+ yield
+ cycles = 0
+ while not (yield bus.ack):
+ yield
+ if cycles >= timeout:
+ raise RuntimeError("Wishbone transaction timed out")
+ cycles += 1
+ data = (yield bus.dat_r)
+ yield bus.cyc.eq(0)
+ yield bus.stb.eq(0)
+ return data
+
+def wb_write(bus, addr, data, sel, timeout=32):
+ yield bus.cyc.eq(1)
+ yield bus.stb.eq(1)
+ yield bus.adr.eq(addr)
+ yield bus.we.eq(1)
+ yield bus.sel.eq(sel)
+ yield bus.dat_w.eq(data)
+ yield
+ cycles = 0
+ while not (yield bus.ack):
+ yield
+ if cycles >= timeout:
+ raise RuntimeError("Wishbone transaction timed out")
+ cycles += 1
+ yield bus.cyc.eq(0)
+ yield bus.stb.eq(0)
+ yield bus.we.eq(0)
--- /dev/null
+# nmigen: UnusedElaboratable=no
+
+import unittest
+from nmigen import *
+from nmigen.back.pysim import *
+
+from ._wishbone import *
+from ..periph.base import Peripheral, CSRBank, PeripheralBridge
+
+
+def simulation_test(dut, process):
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(process)
+ sim.run()
+
+
+class PeripheralTestCase(unittest.TestCase):
+ def test_name(self):
+ class Wrapper(Peripheral):
+ def __init__(self):
+ super().__init__()
+ periph_0 = Wrapper()
+ periph_1 = Peripheral(name="periph_1")
+ self.assertEqual(periph_0.name, "periph_0")
+ self.assertEqual(periph_1.name, "periph_1")
+
+ def test_periph_name_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Name must be a string, not 2"):
+ periph = Peripheral(name=2)
+
+ def test_set_bus_wrong(self):
+ periph = Peripheral(src_loc_at=0)
+ with self.assertRaisesRegex(TypeError,
+ r"Bus interface must be an instance of wishbone.Interface, not 'foo'"):
+ periph.bus = "foo"
+
+ def test_get_bus_wrong(self):
+ periph = Peripheral(src_loc_at=0)
+ with self.assertRaisesRegex(NotImplementedError,
+ r"Peripheral <.*> does not have a bus interface"):
+ periph.bus
+
+ def test_set_irq_wrong(self):
+ periph = Peripheral(src_loc_at=0)
+ with self.assertRaisesRegex(TypeError,
+ r"IRQ line must be an instance of IRQLine, not 'foo'"):
+ periph.irq = "foo"
+
+ def test_get_irq_wrong(self):
+ periph = Peripheral(src_loc_at=0)
+ with self.assertRaisesRegex(NotImplementedError,
+ r"Peripheral <.*> does not have an IRQ line"):
+ periph.irq
+
+ def test_iter_csr_banks(self):
+ periph = Peripheral(src_loc_at=0)
+ bank_0 = periph.csr_bank()
+ bank_1 = periph.csr_bank(addr=0x4, alignment=2)
+ self.assertEqual(list(periph.iter_csr_banks()), [
+ (bank_0, None, None),
+ (bank_1, 0x4, 2),
+ ])
+
+ def test_iter_windows(self):
+ periph = Peripheral(src_loc_at=0)
+ win_0 = periph.window(addr_width=2, data_width=8)
+ win_1 = periph.window(addr_width=4, data_width=8, addr=0x4, sparse=True)
+ self.assertEqual(list(periph.iter_windows()), [
+ (win_0, None, None),
+ (win_1, 0x4, True),
+ ])
+
+ def test_iter_events(self):
+ periph = Peripheral(src_loc_at=0)
+ ev_0 = periph.event()
+ ev_1 = periph.event(mode="rise")
+ self.assertEqual((ev_0.name, ev_0.mode), ("ev_0", "level"))
+ self.assertEqual((ev_1.name, ev_1.mode), ("ev_1", "rise"))
+ self.assertEqual(list(periph.iter_events()), [
+ ev_0,
+ ev_1,
+ ])
+
+
+class CSRBankTestCase(unittest.TestCase):
+ def test_csr_name(self):
+ bank = CSRBank(name_prefix="foo")
+ bar = bank.csr(1, "r")
+ self.assertEqual(bar.name, "foo_bar")
+
+ def test_csr_name_wrong(self):
+ bank = CSRBank()
+ with self.assertRaisesRegex(TypeError,
+ r"Name must be a string, not 2"):
+ bank.csr(1, "r", name=2)
+
+ def test_iter_csr_regs(self):
+ bank = CSRBank()
+ csr_0 = bank.csr(1, "r")
+ csr_1 = bank.csr(8, "rw", addr=0x4, alignment=2)
+ self.assertEqual(list(bank.iter_csr_regs()), [
+ (csr_0, None, None),
+ (csr_1, 0x4, 2),
+ ])
+
+
+class PeripheralBridgeTestCase(unittest.TestCase):
+ def test_periph_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Peripheral must be an instance of Peripheral, not 'foo'"):
+ PeripheralBridge('foo', data_width=8, granularity=8, features=(), alignment=0)
+
+
+class PeripheralSimulationTestCase(unittest.TestCase):
+ def test_csrs(self):
+ class DummyPeripheral(Peripheral, Elaboratable):
+ def __init__(self):
+ super().__init__()
+ bank = self.csr_bank(addr=0x100)
+ self._csr_0 = bank.csr(8, "r")
+ self._csr_1 = bank.csr(8, "r", addr=0x8, alignment=4)
+ self._csr_2 = bank.csr(8, "rw")
+
+ self.win_0 = self.window(addr_width=1, data_width=8, sparse=True, addr=0x000)
+ self.win_1 = self.window(addr_width=1, data_width=32, granularity=8, addr=0x200)
+
+ self._bridge = self.bridge(data_width=32, granularity=8, alignment=2)
+ self.bus = self._bridge.bus
+
+ def elaborate(self, platform):
+ m = Module()
+ m.submodules.bridge = self._bridge
+ m.d.comb += [
+ self._csr_0.r_data.eq(0xa),
+ self._csr_1.r_data.eq(0xb),
+ ]
+ with m.If(self._csr_2.w_stb):
+ m.d.sync += self._csr_2.r_data.eq(self._csr_2.w_data)
+ return m
+
+ dut = DummyPeripheral()
+
+ def process():
+ self.assertEqual((yield from wb_read(dut.bus, addr=0x100 >> 2, sel=0xf)), 0xa)
+ yield
+ self.assertEqual((yield from wb_read(dut.bus, addr=0x108 >> 2, sel=0xf)), 0xb)
+ yield
+ yield from wb_write(dut.bus, addr=0x118 >> 2, data=0xc, sel=0xf)
+ yield
+ self.assertEqual((yield from wb_read(dut.bus, addr=0x118 >> 2, sel=0xf)), 0xc)
+ yield
+
+ yield dut.bus.cyc.eq(1)
+ yield dut.bus.adr.eq(0x000 >> 2)
+ yield Delay(1e-7)
+ self.assertEqual((yield dut.win_0.cyc), 1)
+
+ yield dut.bus.adr.eq(0x200 >> 2)
+ yield Delay(1e-7)
+ self.assertEqual((yield dut.win_1.cyc), 1)
+
+ simulation_test(dut, process)
+
+ def test_events(self):
+ class DummyPeripheral(Peripheral, Elaboratable):
+ def __init__(self):
+ super().__init__()
+ self.ev_0 = self.event()
+ self.ev_1 = self.event(mode="rise")
+ self.ev_2 = self.event(mode="fall")
+ self._bridge = self.bridge(data_width=8)
+ self.bus = self._bridge.bus
+ self.irq = self._bridge.irq
+
+ def elaborate(self, platform):
+ m = Module()
+ m.submodules.bridge = self._bridge
+ return m
+
+ dut = DummyPeripheral()
+
+ ev_status_addr = 0x0
+ ev_pending_addr = 0x1
+ ev_enable_addr = 0x2
+
+ def process():
+ yield dut.ev_0.stb.eq(1)
+ yield dut.ev_1.stb.eq(0)
+ yield dut.ev_2.stb.eq(1)
+ yield
+ self.assertEqual((yield dut.irq), 0)
+ self.assertEqual((yield from wb_read(dut.bus, ev_status_addr, sel=0xf)), 0b101)
+ yield
+
+ yield from wb_write(dut.bus, ev_enable_addr, data=0b111, sel=0xf)
+ yield
+ self.assertEqual((yield dut.irq), 1)
+
+ yield from wb_write(dut.bus, ev_pending_addr, data=0b001, sel=0xf)
+ yield
+ self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b001)
+ yield
+ self.assertEqual((yield dut.irq), 1)
+ yield dut.ev_0.stb.eq(0)
+ yield from wb_write(dut.bus, ev_pending_addr, data=0b001, sel=0xf)
+ yield
+ self.assertEqual((yield dut.irq), 0)
+
+ yield dut.ev_1.stb.eq(1)
+ yield
+ self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b010)
+ yield
+ self.assertEqual((yield dut.irq), 1)
+ yield from wb_write(dut.bus, ev_pending_addr, data=0b010, sel=0xf)
+ yield
+ self.assertEqual((yield dut.irq), 0)
+
+ yield dut.ev_2.stb.eq(0)
+ yield
+ self.assertEqual((yield from wb_read(dut.bus, ev_pending_addr, sel=0xf)), 0b100)
+ yield
+ self.assertEqual((yield dut.irq), 1)
+ yield from wb_write(dut.bus, ev_pending_addr, data=0b100, sel=0xf)
+ yield
+ self.assertEqual((yield dut.irq), 0)
+
+ simulation_test(dut, process)
--- /dev/null
+# nmigen: UnusedElaboratable=no
+
+import unittest
+from nmigen import *
+from nmigen.back.pysim import *
+
+from ..periph._event import *
+
+
+def simulation_test(dut, process):
+ with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim:
+ sim.add_clock(1e-6)
+ sim.add_sync_process(process)
+ sim.run()
+
+
+class EventSourceTestCase(unittest.TestCase):
+ def test_simple(self):
+ ev = EventSource()
+ self.assertEqual(ev.name, "ev")
+ self.assertEqual(ev.mode, "level")
+
+ def test_name_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Name must be a string, not 2"):
+ EventSource(name=2)
+
+ def test_mode_wrong(self):
+ with self.assertRaisesRegex(ValueError,
+ r"Invalid trigger mode 'foo'; must be one of level, rise, fall"):
+ ev = EventSource(mode="foo")
+
+
+class InterruptSourceTestCase(unittest.TestCase):
+ def test_simple(self):
+ ev_0 = EventSource()
+ ev_1 = EventSource()
+ dut = InterruptSource((ev_0, ev_1))
+ self.assertEqual(dut.name, "dut")
+ self.assertEqual(dut.status.width, 2)
+ self.assertEqual(dut.pending.width, 2)
+ self.assertEqual(dut.enable.width, 2)
+
+ def test_name_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Name must be a string, not 2"):
+ InterruptSource((), name=2)
+
+ def test_event_wrong(self):
+ with self.assertRaisesRegex(TypeError,
+ r"Event source must be an instance of EventSource, not 'foo'"):
+ dut = InterruptSource(("foo",))
+
+ def test_events(self):
+ ev_0 = EventSource(mode="level")
+ ev_1 = EventSource(mode="rise")
+ ev_2 = EventSource(mode="fall")
+ dut = InterruptSource((ev_0, ev_1, ev_2))
+
+ def process():
+ yield ev_0.stb.eq(1)
+ yield ev_1.stb.eq(0)
+ yield ev_2.stb.eq(1)
+ yield
+ self.assertEqual((yield dut.irq), 0)
+
+ yield dut.status.r_stb.eq(1)
+ yield
+ yield dut.status.r_stb.eq(0)
+ yield
+ self.assertEqual((yield dut.status.r_data), 0b101)
+ yield
+
+ yield dut.enable.w_stb.eq(1)
+ yield dut.enable.w_data.eq(0b111)
+ yield
+ yield dut.enable.w_stb.eq(0)
+ yield
+ yield
+ self.assertEqual((yield dut.irq), 1)
+
+ yield dut.pending.w_stb.eq(1)
+ yield dut.pending.w_data.eq(0b001)
+ yield
+ yield dut.pending.w_stb.eq(0)
+ yield
+
+ yield dut.pending.r_stb.eq(1)
+ yield
+ yield dut.pending.r_stb.eq(0)
+ yield
+ self.assertEqual((yield dut.pending.r_data), 0b001)
+ self.assertEqual((yield dut.irq), 1)
+ yield
+
+ yield ev_0.stb.eq(0)
+ yield dut.pending.w_stb.eq(1)
+ yield dut.pending.w_data.eq(0b001)
+ yield
+ yield dut.pending.w_stb.eq(0)
+ yield
+ yield
+ self.assertEqual((yield dut.irq), 0)
+
+ yield ev_1.stb.eq(1)
+ yield dut.pending.r_stb.eq(1)
+ yield
+ yield dut.pending.r_stb.eq(0)
+ yield
+ self.assertEqual((yield dut.pending.r_data), 0b010)
+ self.assertEqual((yield dut.irq), 1)
+
+ yield dut.pending.w_stb.eq(1)
+ yield dut.pending.w_data.eq(0b010)
+ yield
+ yield dut.pending.w_stb.eq(0)
+ yield
+ yield
+ self.assertEqual((yield dut.irq), 0)
+
+ yield ev_2.stb.eq(0)
+ yield
+ yield dut.pending.r_stb.eq(1)
+ yield
+ yield dut.pending.r_stb.eq(0)
+ yield
+ self.assertEqual((yield dut.pending.r_data), 0b100)
+ self.assertEqual((yield dut.irq), 1)
+
+ yield dut.pending.w_stb.eq(1)
+ yield dut.pending.w_data.eq(0b100)
+ yield
+ yield dut.pending.w_stb.eq(0)
+ yield
+ yield
+ self.assertEqual((yield dut.irq), 0)
+
+ simulation_test(dut, process)