From: Jean-François Nguyen Date: Wed, 25 Mar 2020 13:24:04 +0000 (+0100) Subject: periph._event → periph.event X-Git-Url: https://git.libre-soc.org/?a=commitdiff_plain;h=06d55a412e8d67557902f6f447c1eda214d4b65a;p=lambdasoc.git periph._event → periph.event --- diff --git a/lambdasoc/periph/__init__.py b/lambdasoc/periph/__init__.py index 9b5ed21..511f0c1 100644 --- a/lambdasoc/periph/__init__.py +++ b/lambdasoc/periph/__init__.py @@ -1 +1,2 @@ from .base import * +from .event import * diff --git a/lambdasoc/periph/_event.py b/lambdasoc/periph/_event.py deleted file mode 100644 index 53df4aa..0000000 --- a/lambdasoc/periph/_event.py +++ /dev/null @@ -1,135 +0,0 @@ -from nmigen import * -from nmigen import tracer - -from nmigen_soc import csr - - -__all__ = ["EventSource", "IRQLine", "InterruptSource"] - - -class EventSource: - """Event source. - - Parameters - ---------- - mode : ``"level"``, ``"rise"``, ``"fall"`` - Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is high. - If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or falling) edge - of ``stb``. - name : str - Name of the event. If ``None`` (default) the name is inferred from the variable - name this event source is assigned to. - - Attributes - ---------- - name : str - Name of the event - mode : ``"level"``, ``"rise"``, ``"fall"`` - Trigger mode. - stb : Signal, in - Event strobe. - """ - def __init__(self, *, mode="level", name=None, src_loc_at=0): - if name is not None and not isinstance(name, str): - raise TypeError("Name must be a string, not {!r}".format(name)) - - choices = ("level", "rise", "fall") - if mode not in choices: - raise ValueError("Invalid trigger mode {!r}; must be one of {}" - .format(mode, ", ".join(choices))) - - self.name = name or tracer.get_var_name(depth=2 + src_loc_at) - self.mode = mode - self.stb = Signal(name="{}_stb".format(self.name)) - - -class IRQLine(Signal): - """Interrupt request line.""" - def __init__(self, *, name=None, src_loc_at=0): - super().__init__(name=name, src_loc_at=1 + src_loc_at) - - __hash__ = object.__hash__ - - -class InterruptSource(Elaboratable): - """Interrupt source. - - A mean of gathering multiple event sources into a single interrupt request line. - - Parameters - ---------- - events : iter(:class:`EventSource`) - Event sources. - name : str - Name of the interrupt source. If ``None`` (default) the name is inferred from the - variable name this interrupt source is assigned to. - - Attributes - ---------- - name : str - Name of the interrupt source. - status : :class:`csr.Element`, read-only - Event status register. Each bit displays the level of the strobe of an event source. - Events are ordered by position in the `events` parameter. - pending : :class:`csr.Element`, read/write - Event pending register. If a bit is 1, the associated event source has a pending - notification. Writing 1 to a bit clears it. - Events are ordered by position in the `events` parameter. - enable : :class:`csr.Element`, read/write - Event enable register. Writing 1 to a bit enables its associated event source. - Writing 0 disables it. - Events are ordered by position in the `events` parameter. - irq : :class:`IRQLine`, out - Interrupt request. It is raised if any event source is enabled and has a pending - notification. - """ - def __init__(self, events, *, name=None, src_loc_at=0): - if name is not None and not isinstance(name, str): - raise TypeError("Name must be a string, not {!r}".format(name)) - self.name = name or tracer.get_var_name(depth=2 + src_loc_at) - - for event in events: - if not isinstance(event, EventSource): - raise TypeError("Event source must be an instance of EventSource, not {!r}" - .format(event)) - self._events = list(events) - - width = len(events) - self.status = csr.Element(width, "r", name="{}_status".format(self.name)) - self.pending = csr.Element(width, "rw", name="{}_pending".format(self.name)) - self.enable = csr.Element(width, "rw", name="{}_enable".format(self.name)) - - self.irq = IRQLine(name="{}_irq".format(self.name)) - - def elaborate(self, platform): - m = Module() - - with m.If(self.pending.w_stb): - m.d.sync += self.pending.r_data.eq(self.pending.r_data & ~self.pending.w_data) - - with m.If(self.enable.w_stb): - m.d.sync += self.enable.r_data.eq(self.enable.w_data) - - for i, event in enumerate(self._events): - m.d.sync += self.status.r_data[i].eq(event.stb) - - if event.mode in ("rise", "fall"): - event_stb_r = Signal.like(event.stb, name_suffix="_r") - m.d.sync += event_stb_r.eq(event.stb) - - event_trigger = Signal(name="{}_trigger".format(event.name)) - if event.mode == "level": - m.d.comb += event_trigger.eq(event.stb) - elif event.mode == "rise": - m.d.comb += event_trigger.eq(~event_stb_r & event.stb) - elif event.mode == "fall": - m.d.comb += event_trigger.eq(event_stb_r & ~event.stb) - else: - assert False # :nocov: - - with m.If(event_trigger): - m.d.sync += self.pending.r_data[i].eq(1) - - m.d.comb += self.irq.eq((self.pending.r_data & self.enable.r_data).any()) - - return m diff --git a/lambdasoc/periph/base.py b/lambdasoc/periph/base.py index 6101946..8789ee3 100644 --- a/lambdasoc/periph/base.py +++ b/lambdasoc/periph/base.py @@ -7,7 +7,7 @@ from nmigen_soc.memory import MemoryMap from nmigen_soc.csr.wishbone import WishboneCSRBridge -from ._event import * +from .event import * __all__ = ["Peripheral", "CSRBank", "PeripheralBridge"] diff --git a/lambdasoc/periph/event.py b/lambdasoc/periph/event.py new file mode 100644 index 0000000..53df4aa --- /dev/null +++ b/lambdasoc/periph/event.py @@ -0,0 +1,135 @@ +from nmigen import * +from nmigen import tracer + +from nmigen_soc import csr + + +__all__ = ["EventSource", "IRQLine", "InterruptSource"] + + +class EventSource: + """Event source. + + Parameters + ---------- + mode : ``"level"``, ``"rise"``, ``"fall"`` + Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is high. + If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or falling) edge + of ``stb``. + name : str + Name of the event. If ``None`` (default) the name is inferred from the variable + name this event source is assigned to. + + Attributes + ---------- + name : str + Name of the event + mode : ``"level"``, ``"rise"``, ``"fall"`` + Trigger mode. + stb : Signal, in + Event strobe. + """ + def __init__(self, *, mode="level", name=None, src_loc_at=0): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + + choices = ("level", "rise", "fall") + if mode not in choices: + raise ValueError("Invalid trigger mode {!r}; must be one of {}" + .format(mode, ", ".join(choices))) + + self.name = name or tracer.get_var_name(depth=2 + src_loc_at) + self.mode = mode + self.stb = Signal(name="{}_stb".format(self.name)) + + +class IRQLine(Signal): + """Interrupt request line.""" + def __init__(self, *, name=None, src_loc_at=0): + super().__init__(name=name, src_loc_at=1 + src_loc_at) + + __hash__ = object.__hash__ + + +class InterruptSource(Elaboratable): + """Interrupt source. + + A mean of gathering multiple event sources into a single interrupt request line. + + Parameters + ---------- + events : iter(:class:`EventSource`) + Event sources. + name : str + Name of the interrupt source. If ``None`` (default) the name is inferred from the + variable name this interrupt source is assigned to. + + Attributes + ---------- + name : str + Name of the interrupt source. + status : :class:`csr.Element`, read-only + Event status register. Each bit displays the level of the strobe of an event source. + Events are ordered by position in the `events` parameter. + pending : :class:`csr.Element`, read/write + Event pending register. If a bit is 1, the associated event source has a pending + notification. Writing 1 to a bit clears it. + Events are ordered by position in the `events` parameter. + enable : :class:`csr.Element`, read/write + Event enable register. Writing 1 to a bit enables its associated event source. + Writing 0 disables it. + Events are ordered by position in the `events` parameter. + irq : :class:`IRQLine`, out + Interrupt request. It is raised if any event source is enabled and has a pending + notification. + """ + def __init__(self, events, *, name=None, src_loc_at=0): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + self.name = name or tracer.get_var_name(depth=2 + src_loc_at) + + for event in events: + if not isinstance(event, EventSource): + raise TypeError("Event source must be an instance of EventSource, not {!r}" + .format(event)) + self._events = list(events) + + width = len(events) + self.status = csr.Element(width, "r", name="{}_status".format(self.name)) + self.pending = csr.Element(width, "rw", name="{}_pending".format(self.name)) + self.enable = csr.Element(width, "rw", name="{}_enable".format(self.name)) + + self.irq = IRQLine(name="{}_irq".format(self.name)) + + def elaborate(self, platform): + m = Module() + + with m.If(self.pending.w_stb): + m.d.sync += self.pending.r_data.eq(self.pending.r_data & ~self.pending.w_data) + + with m.If(self.enable.w_stb): + m.d.sync += self.enable.r_data.eq(self.enable.w_data) + + for i, event in enumerate(self._events): + m.d.sync += self.status.r_data[i].eq(event.stb) + + if event.mode in ("rise", "fall"): + event_stb_r = Signal.like(event.stb, name_suffix="_r") + m.d.sync += event_stb_r.eq(event.stb) + + event_trigger = Signal(name="{}_trigger".format(event.name)) + if event.mode == "level": + m.d.comb += event_trigger.eq(event.stb) + elif event.mode == "rise": + m.d.comb += event_trigger.eq(~event_stb_r & event.stb) + elif event.mode == "fall": + m.d.comb += event_trigger.eq(event_stb_r & ~event.stb) + else: + assert False # :nocov: + + with m.If(event_trigger): + m.d.sync += self.pending.r_data[i].eq(1) + + m.d.comb += self.irq.eq((self.pending.r_data & self.enable.r_data).any()) + + return m diff --git a/lambdasoc/test/test_periph_event.py b/lambdasoc/test/test_periph_event.py index 4934af7..69aacf2 100644 --- a/lambdasoc/test/test_periph_event.py +++ b/lambdasoc/test/test_periph_event.py @@ -4,7 +4,7 @@ import unittest from nmigen import * from nmigen.back.pysim import * -from ..periph._event import * +from ..periph.event import * def simulation_test(dut, process):