from collections.abc import Iterable
from contextlib import contextmanager
-from ..tools import flatten
+from ..tools import flatten, bits_for
from .ast import *
from .ir import *
from .xfrm import *
self._ctrl_context = "Switch"
self._statements = _outer_case
+ @contextmanager
+ def FSM(self, reset=None, domain="sync", name="fsm"):
+ self._check_context("FSM", context=None)
+ fsm_data = self._set_ctrl("FSM", {
+ "signal": Signal(name="{}_state".format(name)),
+ "domain": domain,
+ "encoding": OrderedDict(),
+ "states": OrderedDict(),
+ })
+ if reset is not None:
+ fsm_data["encoding"][reset] = 0
+ try:
+ self._ctrl_context = "FSM"
+ self.domain._depth += 1
+ yield
+ finally:
+ self.domain._depth -= 1
+ self._ctrl_context = None
+ self._pop_ctrl()
+
+ @contextmanager
+ def State(self, name):
+ self._check_context("FSM State", context="FSM")
+ fsm_data = self._get_ctrl("FSM")
+ if name in fsm_data["states"]:
+ raise SyntaxError("FSM state '{}' is already defined".format(name))
+ if name not in fsm_data["encoding"]:
+ fsm_data["encoding"][name] = len(fsm_data["encoding"])
+ try:
+ _outer_case, self._statements = self._statements, []
+ self._ctrl_context = None
+ yield
+ self._flush_ctrl()
+ fsm_data["states"][name] = self._statements
+ finally:
+ self._ctrl_context = "FSM"
+ self._statements = _outer_case
+
+ @property
+ def next(self):
+ raise SyntaxError("Only assignment to `m.next` is permitted")
+
+ @next.setter
+ def next(self, name):
+ for ctrl_name, ctrl_data in reversed(self._ctrl_stack):
+ if ctrl_name == "FSM":
+ if name not in ctrl_data["encoding"]:
+ ctrl_data["encoding"][name] = len(ctrl_data["encoding"])
+ self._add_statement(
+ assigns=[ctrl_data["signal"].eq(ctrl_data["encoding"][name])],
+ domain=ctrl_data["domain"],
+ depth=len(self._ctrl_stack))
+ break
+ else:
+ raise SyntaxError("`m.next = <...>` is only permitted inside an FSM")
+
def _pop_ctrl(self):
name, data = self._ctrl_stack.pop()
self._statements.append(Switch(switch_test, switch_cases))
+ if name == "FSM":
+ fsm_signal, fsm_encoding, fsm_states = data["signal"], data["encoding"], data["states"]
+ fsm_signal.nbits = bits_for(len(fsm_encoding) - 1)
+ # The FSM is encoded such that the state with encoding 0 is always the reset state.
+ self._statements.append(Switch(fsm_signal,
+ OrderedDict((fsm_encoding[name], stmts) for name, stmts in fsm_states.items())))
+
def _add_statement(self, assigns, domain, depth, compat_mode=False):
def domain_name(domain):
if domain is None:
with m.If(self.s2):
pass
+ def test_FSM_basic(self):
+ a = Signal()
+ b = Signal()
+ c = Signal()
+ m = Module()
+ with m.FSM():
+ with m.State("FIRST"):
+ m.d.comb += a.eq(1)
+ m.next = "SECOND"
+ with m.State("SECOND"):
+ m.d.sync += b.eq(~b)
+ with m.If(c):
+ m.next = "FIRST"
+ m._flush()
+ self.assertRepr(m._statements, """
+ (
+ (switch (sig fsm_state)
+ (case 0
+ (eq (sig a) (const 1'd1))
+ (eq (sig fsm_state) (const 1'd1))
+ )
+ (case 1
+ (eq (sig b) (~ (sig b)))
+ (switch (cat (sig c))
+ (case 1
+ (eq (sig fsm_state) (const 1'd0)))
+ )
+ )
+ )
+ )
+ """)
+ self.assertEqual({repr(k): v for k, v in m._driving.items()}, {
+ "(sig a)": None,
+ "(sig fsm_state)": "sync",
+ "(sig b)": "sync",
+ })
+
+ def test_FSM_reset(self):
+ a = Signal()
+ m = Module()
+ with m.FSM(reset="SECOND"):
+ with m.State("FIRST"):
+ m.d.comb += a.eq(0)
+ m.next = "SECOND"
+ with m.State("SECOND"):
+ m.next = "FIRST"
+ m._flush()
+ self.assertRepr(m._statements, """
+ (
+ (switch (sig fsm_state)
+ (case 1
+ (eq (sig a) (const 1'd0))
+ (eq (sig fsm_state) (const 1'd0))
+ )
+ (case 0
+ (eq (sig fsm_state) (const 1'd1))
+ )
+ )
+ )
+ """)
+
+ def test_FSM_wrong_redefined(self):
+ m = Module()
+ with m.FSM():
+ with m.State("FOO"):
+ pass
+ with self.assertRaises(SyntaxError,
+ msg="FSM state 'FOO' is already defined"):
+ with m.State("FOO"):
+ pass
+
+ def test_FSM_wrong_next(self):
+ m = Module()
+ with self.assertRaises(SyntaxError,
+ msg="Only assignment to `m.next` is permitted"):
+ m.next
+ with self.assertRaises(SyntaxError,
+ msg="`m.next = <...>` is only permitted inside an FSM"):
+ m.next = "FOO"
+
def test_auto_pop_ctrl(self):
m = Module()
with m.If(self.w1):