Compatibility summary
---------------------
- - (−) `fhdl`
+ - (−) `fhdl` → `.hdl`
- (+) `bitcontainer` ⇒ `.tools`
- (+) `log2_int` id
- (+) `bits_for` id
- (+) `value_bits_sign` → `Value.shape`
- (−) `conv_output` ?
- - (+) `decorators` ⇒ `.fhdl.xfrm`
+ - (+) `decorators` ⇒ `.hdl.xfrm`
<br>Note: `transform_*` methods not considered part of public API.
- (⊙) `ModuleTransformer` **brk**
- (⊙) `ControlInserter` **brk**
- (+) `ResetInserter` id, `clock_domains=`→`controls=`
- (+) `ClockDomainsRenamer` → `DomainRenamer`, `cd_remapping=`→`domain_map=`
- (⊙) `edif` **brk**
- - (+) `module` **obs** → `.fhdl.dsl`
+ - (+) `module` **obs** → `.hdl.dsl`
- (+) `FinalizeError` **obs**
- - (+) `Module` **obs** → `.fhdl.dsl.Module`
+ - (+) `Module` **obs** → `.hdl.dsl.Module`
- (⊙) `namer` **brk**
- (−) `simplify` ?
- (−) `FullMemoryWE` ?
- (−) `specials` **obs**
- (−) `Special` ?
- (−) `Tristate` ?
- - (+) `TSTriple` → `.genlib.io.TSTriple`, `bits_sign=`→`shape=`
+ - (+) `TSTriple` → `.lib.io.TSTriple`, `bits_sign=`→`shape=`
- (−) `Instance` ?
- (−) `READ_FIRST`/`WRITE_FIRST`/`NO_CHANGE` ?
- (−) `_MemoryPort` ?
- (−) `Memory` ?
- - (−) `structure` → `.fhdl.ast`
+ - (−) `structure` → `.hdl.ast`
- (+) `DUID` id
- (+) `_Value` → `Value`
<br>Note: values no longer valid as keys in `dict` and `set`; use `ValueDict` and `ValueSet` instead.
- (+) `_Statement` → `Statement`
- (+) `_Assign` → `Assign`, `l=`→`lhs=`, `r=`→`rhs=`
- (-) `_check_statement` **obs** → `Statement.wrap`
- - (+) `If` **obs** → `.fhdl.dsl.Module.If`
- - (+) `Case` **obs** → `.fhdl.dsl.Module.Switch`
+ - (+) `If` **obs** → `.hdl.dsl.Module.If`
+ - (+) `Case` **obs** → `.hdl.dsl.Module.Switch`
- (−) `_ArrayProxy` ?
- (−) `Array` ?
- - (+) `ClockDomain` → `.fhdl.cd.ClockDomain`
+ - (+) `ClockDomain` → `.hdl.cd.ClockDomain`
- (−) `_ClockDomainList` ?
- (−) `SPECIAL_INPUT`/`SPECIAL_OUTPUT`/`SPECIAL_INOUT` ?
- - (⊙) `_Fragment` **brk** → `.fhdl.ir.Fragment`
+ - (⊙) `_Fragment` **brk** → `.hdl.ir.Fragment`
- (−) `tools` **brk**
- (−) `list_signals` ?
- (−) `list_targets` ?
- (−) `is_variable` ?
- (⊙) `generate_reset` **brk**
- (⊙) `insert_reset` **brk**
- - (⊙) `insert_resets` **brk** → `.fhdl.xfrm.ResetInserter`
+ - (⊙) `insert_resets` **brk** → `.hdl.xfrm.ResetInserter`
- (⊙) `lower_basics` **brk**
- (⊙) `lower_complex_slices` **brk**
- (⊙) `lower_complex_parts` **brk**
- (⊙) `rename_clock_domain_expr` **brk**
- - (⊙) `rename_clock_domain` **brk** → `.fhdl.xfrm.DomainRenamer`
+ - (⊙) `rename_clock_domain` **brk** → `.hdl.xfrm.DomainRenamer`
- (⊙) `call_special_classmethod` **brk**
- (⊙) `lower_specials` **brk**
- (−) `tracer` **brk**
- (−) `verilog`
- (−) `DummyAttrTranslate` ?
- (−) `convert` **obs** → `.back.verilog.convert`
- - (⊙) `visit` **brk** → `.fhdl.xfrm`
+ - (⊙) `visit` **brk** → `.hdl.xfrm`
- (⊙) `NodeVisitor` **brk**
- - (⊙) `NodeTransformer` **brk** → `.fhdl.xfrm.ValueTransformer`/`.fhdl.xfrm.StatementTransformer`
- - (−) `genlib`
+ - (⊙) `NodeTransformer` **brk** → `.hdl.xfrm.ValueTransformer`/`.hdl.xfrm.StatementTransformer`
+ - (−) `genlib` → `.lib`
- (−) `cdc` ?
- (−) `MultiRegImpl` ?
- (+) `MultiReg` id
- (⊙) `vcd` **brk** → `vcd`
- (⊙) `Simulator` **brk**
- (+) `run_simulation` **obs** → `.back.pysim.Simulator`
- - (−) `passive` **obs** → `.fhdl.ast.Passive`
+ - (−) `passive` **obs** → `.hdl.ast.Passive`
- (−) `build` ?
- (+) `util` **obs**
- (+) `misc` ⇒ `.tools`
-from .fhdl.ast import Value, Const, Mux, Cat, Repl, Signal, ClockSignal, ResetSignal
-from .fhdl.dsl import Module
-from .fhdl.cd import ClockDomain
-from .fhdl.ir import Fragment
-from .fhdl.xfrm import ResetInserter, CEInserter
+from .hdl.ast import Value, Const, Mux, Cat, Repl, Signal, ClockSignal, ResetSignal
+from .hdl.dsl import Module
+from .hdl.cd import ClockDomain
+from .hdl.ir import Fragment
+from .hdl.xfrm import ResetInserter, CEInserter
-from .genlib.cdc import MultiReg
+from .lib.cdc import MultiReg
from vcd.gtkw import GTKWSave
from ..tools import flatten
-from ..fhdl.ast import *
-from ..fhdl.xfrm import ValueTransformer, StatementTransformer
+from ..hdl.ast import *
+from ..hdl.xfrm import ValueTransformer, StatementTransformer
__all__ = ["Simulator", "Delay", "Tick", "Passive", "DeadlineError"]
from collections import defaultdict, OrderedDict
from contextlib import contextmanager
-from ..fhdl import ast, ir, xfrm
+from ..hdl import ast, ir, xfrm
class _Namer:
from ... import tools
-from ...fhdl import ast
+from ...hdl import ast
from ...tools import deprecated
from collections.abc import Iterable
from ...tools import flatten, deprecated
-from ...fhdl import dsl
+from ...hdl import dsl
__all__ = ["Module", "FinalizeError"]
-from ...genlib.io import TSTriple as NativeTSTriple
+from ...lib.io import TSTriple as NativeTSTriple
__all__ = ["TSTriple"]
from collections import OrderedDict
from ...tools import deprecated
-from ...fhdl import ast
-from ...fhdl.ast import DUID, Value, Signal, Mux, Cat, Repl, Const, C, ClockSignal, ResetSignal
-from ...fhdl.cd import ClockDomain
+from ...hdl import ast
+from ...hdl.ast import DUID, Value, Signal, Mux, Cat, Repl, Const, C, ClockSignal, ResetSignal
+from ...hdl.cd import ClockDomain
__all__ = ["DUID", "wrap", "Mux", "Cat", "Replicate", "Constant", "C", "Signal", "ClockSignal",
-from ...genlib.cdc import MultiReg
+from ...lib.cdc import MultiReg
__all__ = ["MultiReg"]
import warnings
from collections import OrderedDict
-from ...fhdl.xfrm import ValueTransformer, StatementTransformer
-from ...fhdl.ast import *
+from ...hdl.xfrm import ValueTransformer, StatementTransformer
+from ...hdl.ast import *
from ..fhdl.module import CompatModule, CompatFinalizeError
from ..fhdl.structure import If, Case
+++ /dev/null
-from abc import ABCMeta, abstractmethod
-import builtins
-import traceback
-from collections import OrderedDict
-from collections.abc import Iterable, MutableMapping, MutableSet
-
-from .. import tracer
-from ..tools import *
-
-
-__all__ = [
- "Value", "Const", "C", "Operator", "Mux", "Part", "Slice", "Cat", "Repl",
- "Signal", "ClockSignal", "ResetSignal",
- "Statement", "Assign", "Switch", "Delay", "Tick", "Passive",
- "ValueKey", "ValueDict", "ValueSet",
-]
-
-
-class DUID:
- """Deterministic Unique IDentifier"""
- __next_uid = 0
- def __init__(self):
- self.duid = DUID.__next_uid
- DUID.__next_uid += 1
-
-
-class Value(metaclass=ABCMeta):
- @staticmethod
- def wrap(obj):
- """Ensures that the passed object is a Migen value. Booleans and integers
- are automatically wrapped into ``Const``."""
- if isinstance(obj, Value):
- return obj
- elif isinstance(obj, (bool, int)):
- return Const(obj)
- else:
- raise TypeError("Object '{!r}' is not a Migen value".format(obj))
-
- def __init__(self, src_loc_at=0):
- super().__init__()
-
- src_loc_at += 3
- tb = traceback.extract_stack(limit=src_loc_at)
- if len(tb) < src_loc_at:
- self.src_loc = None
- else:
- self.src_loc = (tb[0].filename, tb[0].lineno)
-
- def __bool__(self):
- raise TypeError("Attempted to convert Migen value to boolean")
-
- def __invert__(self):
- return Operator("~", [self])
- def __neg__(self):
- return Operator("-", [self])
-
- def __add__(self, other):
- return Operator("+", [self, other])
- def __radd__(self, other):
- return Operator("+", [other, self])
- def __sub__(self, other):
- return Operator("-", [self, other])
- def __rsub__(self, other):
- return Operator("-", [other, self])
- def __mul__(self, other):
- return Operator("*", [self, other])
- def __rmul__(self, other):
- return Operator("*", [other, self])
- def __mod__(self, other):
- return Operator("%", [self, other])
- def __rmod__(self, other):
- return Operator("%", [other, self])
- def __div__(self, other):
- return Operator("/", [self, other])
- def __rdiv__(self, other):
- return Operator("/", [other, self])
- def __lshift__(self, other):
- return Operator("<<", [self, other])
- def __rlshift__(self, other):
- return Operator("<<", [other, self])
- def __rshift__(self, other):
- return Operator(">>", [self, other])
- def __rrshift__(self, other):
- return Operator(">>", [other, self])
- def __and__(self, other):
- return Operator("&", [self, other])
- def __rand__(self, other):
- return Operator("&", [other, self])
- def __xor__(self, other):
- return Operator("^", [self, other])
- def __rxor__(self, other):
- return Operator("^", [other, self])
- def __or__(self, other):
- return Operator("|", [self, other])
- def __ror__(self, other):
- return Operator("|", [other, self])
-
- def __eq__(self, other):
- return Operator("==", [self, other])
- def __ne__(self, other):
- return Operator("!=", [self, other])
- def __lt__(self, other):
- return Operator("<", [self, other])
- def __le__(self, other):
- return Operator("<=", [self, other])
- def __gt__(self, other):
- return Operator(">", [self, other])
- def __ge__(self, other):
- return Operator(">=", [self, other])
-
- def __len__(self):
- return self.shape()[0]
-
- def __getitem__(self, key):
- n = len(self)
- if isinstance(key, int):
- if key not in range(-n, n):
- raise IndexError("Cannot index {} bits into {}-bit value".format(key, n))
- if key < 0:
- key += n
- return Slice(self, key, key + 1)
- elif isinstance(key, slice):
- start, stop, step = key.indices(n)
- if step != 1:
- return Cat(self[i] for i in range(start, stop, step))
- return Slice(self, start, stop)
- else:
- raise TypeError("Cannot index value with {}".format(repr(key)))
-
- def bool(self):
- """Conversion to boolean.
-
- Returns
- -------
- Value, out
- Output ``Value``. If any bits are set, returns ``1``, else ``0``.
- """
- return Operator("b", [self])
-
- def part(self, offset, width):
- """Indexed part-select.
-
- Selects a constant width but variable offset part of a ``Value``.
-
- Parameters
- ----------
- offset : Value, in
- start point of the selected bits
- width : int
- number of selected bits
-
- Returns
- -------
- Part, out
- Selected part of the ``Value``
- """
- return Part(self, offset, width)
-
- def eq(self, value):
- """Assignment.
-
- Parameters
- ----------
- value : Value, in
- Value to be assigned.
-
- Returns
- -------
- Assign
- Assignment statement that can be used in combinatorial or synchronous context.
- """
- return Assign(self, value)
-
- @abstractmethod
- def shape(self):
- """Bit length and signedness of a value.
-
- Returns
- -------
- int, bool
- Number of bits required to store `v` or available in `v`, followed by
- whether `v` has a sign bit (included in the bit count).
-
- Examples
- --------
- >>> Value.shape(Signal(8))
- 8, False
- >>> Value.shape(C(0xaa))
- 8, False
- """
- pass # :nocov:
-
- def _lhs_signals(self):
- raise TypeError("Value {!r} cannot be used in assignments".format(self))
-
- @abstractmethod
- def _rhs_signals(self):
- pass # :nocov:
-
- __hash__ = None
-
-
-class Const(Value):
- """A constant, literal integer value.
-
- Parameters
- ----------
- value : int
- shape : int or tuple or None
- Either an integer `bits` or a tuple `(bits, signed)`
- specifying the number of bits in this `Const` and whether it is
- signed (can represent negative values). `shape` defaults
- to the minimum width and signedness of `value`.
-
- Attributes
- ----------
- nbits : int
- signed : bool
- """
- src_loc = None
-
- @staticmethod
- def normalize(value, shape):
- nbits, signed = shape
- mask = (1 << nbits) - 1
- value &= mask
- if signed and value >> (nbits - 1):
- value |= ~mask
- return value
-
- def __init__(self, value, shape=None):
- self.value = int(value)
- if shape is None:
- shape = bits_for(self.value), self.value < 0
- if isinstance(shape, int):
- shape = shape, self.value < 0
- self.nbits, self.signed = shape
- if not isinstance(self.nbits, int) or self.nbits < 0:
- raise TypeError("Width must be a non-negative integer")
- self.value = self.normalize(self.value, shape)
-
- def shape(self):
- return self.nbits, self.signed
-
- def _rhs_signals(self):
- return ValueSet()
-
- def __repr__(self):
- return "(const {}'{}d{})".format(self.nbits, "s" if self.signed else "", self.value)
-
-
-C = Const # shorthand
-
-
-class Operator(Value):
- def __init__(self, op, operands, src_loc_at=0):
- super().__init__(src_loc_at=1 + src_loc_at)
- self.op = op
- self.operands = [Value.wrap(o) for o in operands]
-
- @staticmethod
- def _bitwise_binary_shape(a_shape, b_shape):
- a_bits, a_sign = a_shape
- b_bits, b_sign = b_shape
- if not a_sign and not b_sign:
- # both operands unsigned
- return max(a_bits, b_bits), False
- elif a_sign and b_sign:
- # both operands signed
- return max(a_bits, b_bits), True
- elif not a_sign and b_sign:
- # first operand unsigned (add sign bit), second operand signed
- return max(a_bits + 1, b_bits), True
- else:
- # first signed, second operand unsigned (add sign bit)
- return max(a_bits, b_bits + 1), True
-
- def shape(self):
- op_shapes = list(map(lambda x: x.shape(), self.operands))
- if len(op_shapes) == 1:
- (a_bits, a_sign), = op_shapes
- if self.op in ("+", "~"):
- return a_bits, a_sign
- if self.op == "-":
- if not a_sign:
- return a_bits + 1, True
- else:
- return a_bits, a_sign
- if self.op == "b":
- return 1, False
- elif len(op_shapes) == 2:
- (a_bits, a_sign), (b_bits, b_sign) = op_shapes
- if self.op == "+" or self.op == "-":
- bits, sign = self._bitwise_binary_shape(*op_shapes)
- return bits + 1, sign
- if self.op == "*":
- if not a_sign and not b_sign:
- # both operands unsigned
- return a_bits + b_bits, False
- if a_sign and b_sign:
- # both operands signed
- return a_bits + b_bits - 1, True
- # one operand signed, the other unsigned (add sign bit)
- return a_bits + b_bits + 1 - 1, True
- if self.op in ("<", "<=", "==", "!=", ">", ">=", "b"):
- return 1, False
- if self.op in ("&", "^", "|"):
- return self._bitwise_binary_shape(*op_shapes)
- if self.op == "<<":
- if b_sign:
- extra = 2 ** (b_bits - 1) - 1
- else:
- extra = 2 ** (b_bits) - 1
- return a_bits + extra, a_sign
- if self.op == ">>":
- if b_sign:
- extra = 2 ** (b_bits - 1)
- else:
- extra = 0
- return a_bits + extra, a_sign
- elif len(op_shapes) == 3:
- if self.op == "m":
- s_shape, a_shape, b_shape = op_shapes
- return self._bitwise_binary_shape(a_shape, b_shape)
- raise NotImplementedError("Operator {}/{} not implemented"
- .format(self.op, len(op_shapes))) # :nocov:
-
- def _rhs_signals(self):
- return union(op._rhs_signals() for op in self.operands)
-
- def __repr__(self):
- return "({} {})".format(self.op, " ".join(map(repr, self.operands)))
-
-
-def Mux(sel, val1, val0):
- """Choose between two values.
-
- Parameters
- ----------
- sel : Value, in
- Selector.
- val1 : Value, in
- val0 : Value, in
- Input values.
-
- Returns
- -------
- Value, out
- Output ``Value``. If ``sel`` is asserted, the Mux returns ``val1``, else ``val0``.
- """
- return Operator("m", [sel, val1, val0], src_loc_at=1)
-
-
-class Slice(Value):
- def __init__(self, value, start, end):
- if not isinstance(start, int):
- raise TypeError("Slice start must be an integer, not '{!r}'".format(start))
- if not isinstance(end, int):
- raise TypeError("Slice end must be an integer, not '{!r}'".format(end))
-
- n = len(value)
- if start not in range(-n, n):
- raise IndexError("Cannot start slice {} bits into {}-bit value".format(start, n))
- if start < 0:
- start += n
- if end not in range(-(n+1), n+1):
- raise IndexError("Cannot end slice {} bits into {}-bit value".format(end, n))
- if end < 0:
- end += n
- if start > end:
- raise IndexError("Slice start {} must be less than slice end {}".format(start, end))
-
- super().__init__()
- self.value = Value.wrap(value)
- self.start = start
- self.end = end
-
- def shape(self):
- return self.end - self.start, False
-
- def _lhs_signals(self):
- return self.value._lhs_signals()
-
- def _rhs_signals(self):
- return self.value._rhs_signals()
-
- def __repr__(self):
- return "(slice {} {}:{})".format(repr(self.value), self.start, self.end)
-
-
-class Part(Value):
- def __init__(self, value, offset, width):
- if not isinstance(width, int) or width < 0:
- raise TypeError("Part width must be a non-negative integer, not '{!r}'".format(width))
-
- super().__init__()
- self.value = value
- self.offset = Value.wrap(offset)
- self.width = width
-
- def shape(self):
- return self.width, False
-
- def _lhs_signals(self):
- return self.value._lhs_signals()
-
- def _rhs_signals(self):
- return self.value._rhs_signals()
-
- def __repr__(self):
- return "(part {} {} {})".format(repr(self.value), repr(self.offset), self.width)
-
-
-class Cat(Value):
- """Concatenate values.
-
- Form a compound ``Value`` from several smaller ones by concatenation.
- The first argument occupies the lower bits of the result.
- The return value can be used on either side of an assignment, that
- is, the concatenated value can be used as an argument on the RHS or
- as a target on the LHS. If it is used on the LHS, it must solely
- consist of ``Signal`` s, slices of ``Signal`` s, and other concatenations
- meeting these properties. The bit length of the return value is the sum of
- the bit lengths of the arguments::
-
- len(Cat(args)) == sum(len(arg) for arg in args)
-
- Parameters
- ----------
- *args : Values or iterables of Values, inout
- ``Value`` s to be concatenated.
-
- Returns
- -------
- Value, inout
- Resulting ``Value`` obtained by concatentation.
- """
- def __init__(self, *args):
- super().__init__()
- self.operands = [Value.wrap(v) for v in flatten(args)]
-
- def shape(self):
- return sum(len(op) for op in self.operands), False
-
- def _lhs_signals(self):
- return union(op._lhs_signals() for op in self.operands)
-
- def _rhs_signals(self):
- return union(op._rhs_signals() for op in self.operands)
-
- def __repr__(self):
- return "(cat {})".format(" ".join(map(repr, self.operands)))
-
-
-class Repl(Value):
- """Replicate a value
-
- An input value is replicated (repeated) several times
- to be used on the RHS of assignments::
-
- len(Repl(s, n)) == len(s) * n
-
- Parameters
- ----------
- value : Value, in
- Input value to be replicated.
- count : int
- Number of replications.
-
- Returns
- -------
- Repl, out
- Replicated value.
- """
- def __init__(self, value, count):
- if not isinstance(count, int) or count < 0:
- raise TypeError("Replication count must be a non-negative integer, not '{!r}'"
- .format(count))
-
- super().__init__()
- self.value = Value.wrap(value)
- self.count = count
-
- def shape(self):
- return len(self.value) * self.count, False
-
- def _rhs_signals(self):
- return self.value._rhs_signals()
-
- def __repr__(self):
- return "(repl {!r} {})".format(self.value, self.count)
-
-
-class Signal(Value, DUID):
- """A varying integer value.
-
- Parameters
- ----------
- shape : int or tuple or None
- Either an integer ``bits`` or a tuple ``(bits, signed)`` specifying the number of bits
- in this ``Signal`` and whether it is signed (can represent negative values).
- ``shape`` defaults to 1-bit and non-signed.
- name : str
- Name hint for this signal. If ``None`` (default) the name is inferred from the variable
- name this ``Signal`` is assigned to. Name collisions are automatically resolved by
- prepending names of objects that contain this ``Signal`` and by appending integer
- sequences.
- reset : int
- Reset (synchronous) or default (combinatorial) value.
- When this ``Signal`` is assigned to in synchronous context and the corresponding clock
- domain is reset, the ``Signal`` assumes the given value. When this ``Signal`` is unassigned
- in combinatorial context (due to conditional assignments not being taken), the ``Signal``
- assumes its ``reset`` value. Defaults to 0.
- reset_less : bool
- If ``True``, do not generate reset logic for this ``Signal`` in synchronous statements.
- The ``reset`` value is only used as a combinatorial default or as the initial value.
- Defaults to ``False``.
- min : int or None
- max : int or None
- If ``shape`` is ``None``, the signal bit width and signedness are
- determined by the integer range given by ``min`` (inclusive,
- defaults to 0) and ``max`` (exclusive, defaults to 2).
- attrs : dict
- Dictionary of synthesis attributes.
- decoder : function
- A function converting integer signal values to human-readable strings (e.g. FSM state
- names).
-
- Attributes
- ----------
- nbits : int
- signed : bool
- name : str
- reset : int
- reset_less : bool
- attrs : dict
- """
-
- def __init__(self, shape=None, name=None, reset=0, reset_less=False, min=None, max=None,
- attrs=None, decoder=None, src_loc_at=0):
- super().__init__(src_loc_at=src_loc_at)
-
- if name is None:
- try:
- name = tracer.get_var_name(depth=2 + src_loc_at)
- except tracer.NameNotFound:
- name = "$signal"
- self.name = name
-
- if shape is None:
- if min is None:
- min = 0
- if max is None:
- max = 2
- max -= 1 # make both bounds inclusive
- if not min < max:
- raise ValueError("Lower bound {} should be less than higher bound {}"
- .format(min, max))
- self.signed = min < 0 or max < 0
- self.nbits = builtins.max(bits_for(min, self.signed), bits_for(max, self.signed))
-
- else:
- if not (min is None and max is None):
- raise ValueError("Only one of bits/signedness or bounds may be specified")
- if isinstance(shape, int):
- self.nbits, self.signed = shape, False
- else:
- self.nbits, self.signed = shape
-
- if not isinstance(self.nbits, int) or self.nbits < 0:
- raise TypeError("Width must be a non-negative integer, not '{!r}'".format(self.nbits))
- self.reset = int(reset)
- self.reset_less = bool(reset_less)
-
- self.attrs = OrderedDict(() if attrs is None else attrs)
- self.decoder = decoder
-
- @classmethod
- def like(cls, other, src_loc_at=0, **kwargs):
- """Create Signal based on another.
-
- Parameters
- ----------
- other : Value
- Object to base this Signal on.
- """
- kw = dict(shape=cls.wrap(other).shape(),
- name=tracer.get_var_name(depth=2 + src_loc_at))
- if isinstance(other, cls):
- kw.update(reset=other.reset, reset_less=other.reset_less,
- attrs=other.attrs, decoder=other.decoder)
- kw.update(kwargs)
- return cls(**kw, src_loc_at=1 + src_loc_at)
-
- def shape(self):
- return self.nbits, self.signed
-
- def _lhs_signals(self):
- return ValueSet((self,))
-
- def _rhs_signals(self):
- return ValueSet((self,))
-
- def __repr__(self):
- return "(sig {})".format(self.name)
-
-
-class ClockSignal(Value):
- """Clock signal for a given clock domain.
-
- ``ClockSignal`` s for a given clock domain can be retrieved multiple
- times. They all ultimately refer to the same signal.
-
- Parameters
- ----------
- domain : str
- Clock domain to obtain a clock signal for. Defaults to ``"sync"``.
- """
- def __init__(self, domain="sync"):
- super().__init__()
- if not isinstance(domain, str):
- raise TypeError("Clock domain name must be a string, not '{!r}'".format(domain))
- self.domain = domain
-
- def shape(self):
- return 1, False
-
- def _rhs_signals(self):
- raise NotImplementedError("ClockSignal must be lowered to a concrete signal") # :nocov:
-
- def __repr__(self):
- return "(clk {})".format(self.domain)
-
-
-class ResetSignal(Value):
- """Reset signal for a given clock domain
-
- ``ResetSignal`` s for a given clock domain can be retrieved multiple
- times. They all ultimately refer to the same signal.
-
- Parameters
- ----------
- domain : str
- Clock domain to obtain a reset signal for. Defaults to ``"sync"``.
- allow_reset_less : bool
- If the clock domain is reset-less, act as a constant ``0`` instead of reporting an error.
- """
- def __init__(self, domain="sync", allow_reset_less=False):
- super().__init__()
- if not isinstance(domain, str):
- raise TypeError("Clock domain name must be a string, not '{!r}'".format(domain))
- self.domain = domain
- self.allow_reset_less = allow_reset_less
-
- def shape(self):
- return 1, False
-
- def _rhs_signals(self):
- raise NotImplementedError("ResetSignal must be lowered to a concrete signal") # :nocov:
-
- def __repr__(self):
- return "(rst {})".format(self.domain)
-
-
-class _StatementList(list):
- def __repr__(self):
- return "({})".format(" ".join(map(repr, self)))
-
-
-class Statement:
- @staticmethod
- def wrap(obj):
- if isinstance(obj, Iterable):
- return _StatementList(sum((Statement.wrap(e) for e in obj), []))
- else:
- if isinstance(obj, Statement):
- return _StatementList([obj])
- else:
- raise TypeError("Object '{!r}' is not a Migen statement".format(obj))
-
-
-class Assign(Statement):
- def __init__(self, lhs, rhs):
- self.lhs = Value.wrap(lhs)
- self.rhs = Value.wrap(rhs)
-
- def _lhs_signals(self):
- return self.lhs._lhs_signals()
-
- def _rhs_signals(self):
- return self.rhs._rhs_signals()
-
- def __repr__(self):
- return "(eq {!r} {!r})".format(self.lhs, self.rhs)
-
-
-class Switch(Statement):
- def __init__(self, test, cases):
- self.test = Value.wrap(test)
- self.cases = OrderedDict()
- for key, stmts in cases.items():
- if isinstance(key, (bool, int)):
- key = "{:0{}b}".format(key, len(self.test))
- elif isinstance(key, str):
- assert len(key) == len(self.test)
- else:
- raise TypeError("Object '{!r}' cannot be used as a switch key"
- .format(key))
- if not isinstance(stmts, Iterable):
- stmts = [stmts]
- self.cases[key] = Statement.wrap(stmts)
-
- def _lhs_signals(self):
- signals = union(s._lhs_signals() for ss in self.cases.values() for s in ss) or ValueSet()
- return signals
-
- def _rhs_signals(self):
- signals = union(s._rhs_signals() for ss in self.cases.values() for s in ss) or ValueSet()
- return self.test._rhs_signals() | signals
-
- def __repr__(self):
- cases = ["(case {} {})".format(key, " ".join(map(repr, stmts)))
- for key, stmts in self.cases.items()]
- return "(switch {!r} {})".format(self.test, " ".join(cases))
-
-
-class Delay(Statement):
- def __init__(self, interval=None):
- self.interval = None if interval is None else float(interval)
-
- def _rhs_signals(self):
- return ValueSet()
-
- def __repr__(self):
- if self.interval is None:
- return "(delay ε)"
- else:
- return "(delay {:.3}us)".format(self.interval * 10e6)
-
-
-class Tick(Statement):
- def __init__(self, domain):
- self.domain = str(domain)
-
- def _rhs_signals(self):
- return ValueSet()
-
- def __repr__(self):
- return "(tick {})".format(self.domain)
-
-
-class Passive(Statement):
- def _rhs_signals(self):
- return ValueSet()
-
- def __repr__(self):
- return "(passive)"
-
-
-class ValueKey:
- def __init__(self, value):
- self.value = Value.wrap(value)
-
- def __hash__(self):
- if isinstance(self.value, Const):
- return hash(self.value)
- elif isinstance(self.value, Signal):
- return hash(id(self.value))
- elif isinstance(self.value, Slice):
- return hash((ValueKey(self.value.value), self.value.start, self.value.end))
- else: # :nocov:
- raise TypeError("Object '{!r}' cannot be used as a key in value collections")
-
- def __eq__(self, other):
- if not isinstance(other, ValueKey):
- return False
- if type(self.value) != type(other.value):
- return False
-
- if isinstance(self.value, Const):
- return self.value == other.value
- elif isinstance(self.value, Signal):
- return id(self.value) == id(other.value)
- elif isinstance(self.value, Slice):
- return (ValueKey(self.value.value) == ValueKey(other.value.value) and
- self.value.start == other.value.start and
- self.value.end == other.value.end)
- else: # :nocov:
- raise TypeError("Object '{!r}' cannot be used as a key in value collections")
-
- def __lt__(self, other):
- if not isinstance(other, ValueKey):
- return False
- if type(self.value) != type(other.value):
- return False
-
- if isinstance(self.value, Const):
- return self.value < other.value
- elif isinstance(self.value, Signal):
- return self.value.duid < other.value.duid
- elif isinstance(self.value, Slice):
- return (ValueKey(self.value.value) < ValueKey(other.value.value) and
- self.value.start < other.value.start and
- self.value.end < other.value.end)
- else: # :nocov:
- raise TypeError("Object '{!r}' cannot be used as a key in value collections")
-
- def __repr__(self):
- return "<{}.ValueKey {!r}>".format(__name__, self.value)
-
-
-class ValueDict(MutableMapping):
- def __init__(self, pairs=()):
- self._inner = dict()
- for key, value in pairs:
- self[key] = value
-
- def __getitem__(self, key):
- key = None if key is None else ValueKey(key)
- return self._inner[key]
-
- def __setitem__(self, key, value):
- key = None if key is None else ValueKey(key)
- self._inner[key] = value
-
- def __delitem__(self, key):
- key = None if key is None else ValueKey(key)
- del self._inner[key]
-
- def __iter__(self):
- return map(lambda x: None if x is None else x.value, sorted(self._inner))
-
- def __eq__(self, other):
- if not isinstance(other, ValueDict):
- return False
- if len(self) != len(other):
- return False
- for ak, bk in zip(self, other):
- if ValueKey(ak) != ValueKey(bk):
- return False
- if self[ak] != other[bk]:
- return False
- return True
-
- def __len__(self):
- return len(self._inner)
-
- def __repr__(self):
- pairs = ["({!r}, {!r})".format(k, v) for k, v in self.items()]
- return "ValueDict([{}])".format(", ".join(pairs))
-
-
-class ValueSet(MutableSet):
- def __init__(self, elements=()):
- self._inner = set()
- for elem in elements:
- self.add(elem)
-
- def add(self, value):
- self._inner.add(ValueKey(value))
-
- def update(self, values):
- for value in values:
- self.add(value)
-
- def discard(self, value):
- self._inner.discard(ValueKey(value))
-
- def __contains__(self, value):
- return ValueKey(value) in self._inner
-
- def __iter__(self):
- return map(lambda x: x.value, sorted(self._inner))
-
- def __len__(self):
- return len(self._inner)
-
- def __repr__(self):
- return "ValueSet({})".format(", ".join(repr(x) for x in self))
+++ /dev/null
-from .. import tracer
-from .ast import Signal
-
-
-__all__ = ["ClockDomain", "DomainError"]
-
-
-class DomainError(Exception):
- pass
-
-
-class ClockDomain:
- """Synchronous domain.
-
- Parameters
- ----------
- name : str or None
- Domain name. If ``None`` (the default) the name is inferred from the variable name this
- ``ClockDomain`` is assigned to (stripping any `"cd_"` prefix).
- reset_less : bool
- If ``True``, the domain does not use a reset signal. Registers within this domain are
- still all initialized to their reset state once, e.g. through Verilog `"initial"`
- statements.
- async_reset : bool
- If ``True``, the domain uses an asynchronous reset, and registers within this domain
- are initialized to their reset state when reset level changes. Otherwise, registers
- are initialized to reset state at the next clock cycle when reset is asserted.
-
- Attributes
- ----------
- clk : Signal, inout
- The clock for this domain. Can be driven or used to drive other signals (preferably
- in combinatorial context).
- rst : Signal or None, inout
- Reset signal for this domain. Can be driven or used to drive.
- """
-
- @staticmethod
- def _name_for(domain_name, signal_name):
- if domain_name == "sync":
- return signal_name
- else:
- return "{}_{}".format(domain_name, signal_name)
-
- def __init__(self, name=None, reset_less=False, async_reset=False):
- if name is None:
- try:
- name = tracer.get_var_name()
- except tracer.NameNotFound:
- raise ValueError("Clock domain name must be specified explicitly")
- if name.startswith("cd_"):
- name = name[3:]
- self.name = name
-
- self.clk = Signal(name=self._name_for(name, "clk"), src_loc_at=1)
- if reset_less:
- self.rst = None
- else:
- self.rst = Signal(name=self._name_for(name, "rst"), src_loc_at=1)
-
- self.async_reset = async_reset
-
- def rename(self, new_name):
- self.name = new_name
- self.clk.name = self._name_for(new_name, "clk")
- if self.rst is not None:
- self.rst.name = self._name_for(new_name, "rst")
+++ /dev/null
-from collections import OrderedDict
-from collections.abc import Iterable
-from contextlib import contextmanager
-
-from .ast import *
-from .ir import *
-from .xfrm import *
-
-
-__all__ = ["Module", "SyntaxError"]
-
-
-class SyntaxError(Exception):
- pass
-
-
-class _ModuleBuilderProxy:
- def __init__(self, builder, depth):
- object.__setattr__(self, "_builder", builder)
- object.__setattr__(self, "_depth", depth)
-
-
-class _ModuleBuilderDomain(_ModuleBuilderProxy):
- def __init__(self, builder, depth, domain):
- super().__init__(builder, depth)
- self._domain = domain
-
- def __iadd__(self, assigns):
- self._builder._add_statement(assigns, domain=self._domain, depth=self._depth)
- return self
-
-
-class _ModuleBuilderDomains(_ModuleBuilderProxy):
- def __getattr__(self, name):
- if name == "comb":
- domain = None
- else:
- domain = name
- return _ModuleBuilderDomain(self._builder, self._depth, domain)
-
- def __getitem__(self, name):
- return self.__getattr__(name)
-
- def __setattr__(self, name, value):
- if name == "_depth":
- object.__setattr__(self, name, value)
- elif not isinstance(value, _ModuleBuilderDomain):
- raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
- .format(name, name))
-
- def __setitem__(self, name, value):
- return self.__setattr__(name, value)
-
-
-class _ModuleBuilderRoot:
- def __init__(self, builder, depth):
- self._builder = builder
- self.domain = self.d = _ModuleBuilderDomains(builder, depth)
-
- def __getattr__(self, name):
- if name in ("comb", "sync"):
- raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
- .format(type(self).__name__, name, name))
- raise AttributeError("'{}' object has no attribute '{}'"
- .format(type(self).__name__, name))
-
-
-class _ModuleBuilderSubmodules:
- def __init__(self, builder):
- object.__setattr__(self, "_builder", builder)
-
- def __iadd__(self, modules):
- if isinstance(modules, Iterable):
- for module in modules:
- self._builder._add_submodule(module)
- else:
- module = modules
- self._builder._add_submodule(module)
- return self
-
- def __setattr__(self, name, submodule):
- self._builder._add_submodule(submodule, name)
-
-
-class Module(_ModuleBuilderRoot):
- def __init__(self):
- _ModuleBuilderRoot.__init__(self, self, depth=0)
- self.submodules = _ModuleBuilderSubmodules(self)
-
- self._submodules = []
- self._driving = ValueDict()
- self._statements = Statement.wrap([])
- self._ctrl_context = None
- self._ctrl_stack = []
- self._stmt_if_cond = []
- self._stmt_if_bodies = []
- self._stmt_switch_test = None
- self._stmt_switch_cases = OrderedDict()
-
- def _check_context(self, construct, context):
- if self._ctrl_context != context:
- if self._ctrl_context is None:
- raise SyntaxError("{} is not permitted outside of {}"
- .format(construct, context))
- else:
- raise SyntaxError("{} is not permitted inside of {}"
- .format(construct, self._ctrl_context))
-
- def _get_ctrl(self, name):
- if self._ctrl_stack:
- top_name, top_data = self._ctrl_stack[-1]
- if top_name == name:
- return top_data
-
- def _flush_ctrl(self):
- while len(self._ctrl_stack) > self.domain._depth:
- self._pop_ctrl()
-
- def _set_ctrl(self, name, data):
- self._flush_ctrl()
- self._ctrl_stack.append((name, data))
- return data
-
- @contextmanager
- def If(self, cond):
- self._check_context("If", context=None)
- if_data = self._set_ctrl("If", {"tests": [], "bodies": []})
- try:
- _outer_case, self._statements = self._statements, []
- self.domain._depth += 1
- yield
- self._flush_ctrl()
- if_data["tests"].append(cond)
- if_data["bodies"].append(self._statements)
- finally:
- self.domain._depth -= 1
- self._statements = _outer_case
-
- @contextmanager
- def Elif(self, cond):
- self._check_context("Elif", context=None)
- if_data = self._get_ctrl("If")
- if if_data is None:
- raise SyntaxError("Elif without preceding If")
- try:
- _outer_case, self._statements = self._statements, []
- self.domain._depth += 1
- yield
- self._flush_ctrl()
- if_data["tests"].append(cond)
- if_data["bodies"].append(self._statements)
- finally:
- self.domain._depth -= 1
- self._statements = _outer_case
-
- @contextmanager
- def Else(self):
- self._check_context("Else", context=None)
- if_data = self._get_ctrl("If")
- if if_data is None:
- raise SyntaxError("Else without preceding If/Elif")
- try:
- _outer_case, self._statements = self._statements, []
- self.domain._depth += 1
- yield
- self._flush_ctrl()
- if_data["bodies"].append(self._statements)
- finally:
- self.domain._depth -= 1
- self._statements = _outer_case
- self._pop_ctrl()
-
- @contextmanager
- def Switch(self, test):
- self._check_context("Switch", context=None)
- switch_data = self._set_ctrl("Switch", {"test": test, "cases": OrderedDict()})
- try:
- self._ctrl_context = "Switch"
- self.domain._depth += 1
- yield
- finally:
- self.domain._depth -= 1
- self._ctrl_context = None
- self._pop_ctrl()
-
- @contextmanager
- def Case(self, value=None):
- self._check_context("Case", context="Switch")
- switch_data = self._get_ctrl("Switch")
- if value is None:
- value = "-" * len(switch_data["test"])
- if isinstance(value, str) and len(switch_data["test"]) != len(value):
- raise SyntaxError("Case value '{}' must have the same width as test (which is {})"
- .format(value, len(switch_data["test"])))
- try:
- _outer_case, self._statements = self._statements, []
- self._ctrl_context = None
- yield
- self._flush_ctrl()
- switch_data["cases"][value] = self._statements
- finally:
- self._ctrl_context = "Switch"
- self._statements = _outer_case
-
- def _pop_ctrl(self):
- name, data = self._ctrl_stack.pop()
-
- if name == "If":
- if_tests, if_bodies = data["tests"], data["bodies"]
-
- tests, cases = [], OrderedDict()
- for if_test, if_case in zip(if_tests + [None], if_bodies):
- if if_test is not None:
- if_test = Value.wrap(if_test)
- if len(if_test) != 1:
- if_test = if_test.bool()
- tests.append(if_test)
-
- if if_test is not None:
- match = ("1" + "-" * (len(tests) - 1)).rjust(len(if_tests), "-")
- else:
- match = "-" * len(tests)
- cases[match] = if_case
-
- self._statements.append(Switch(Cat(tests), cases))
-
- if name == "Switch":
- switch_test, switch_cases = data["test"], data["cases"]
-
- self._statements.append(Switch(switch_test, switch_cases))
-
- def _add_statement(self, assigns, domain, depth, compat_mode=False):
- def domain_name(domain):
- if domain is None:
- return "comb"
- else:
- return domain
-
- while len(self._ctrl_stack) > self.domain._depth:
- self._pop_ctrl()
-
- for assign in Statement.wrap(assigns):
- if not compat_mode and not isinstance(assign, Assign):
- raise SyntaxError(
- "Only assignments may be appended to d.{}"
- .format(domain_name(domain)))
-
- for signal in assign._lhs_signals():
- if signal not in self._driving:
- self._driving[signal] = domain
- elif self._driving[signal] != domain:
- cd_curr = self._driving[signal]
- raise SyntaxError(
- "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
- "already driven from d.{}"
- .format(signal, domain_name(domain), domain_name(cd_curr)))
-
- self._statements.append(assign)
-
- def _add_submodule(self, submodule, name=None):
- if not hasattr(submodule, "get_fragment"):
- raise TypeError("Trying to add '{!r}', which does not implement .get_fragment(), as "
- "a submodule".format(submodule))
- self._submodules.append((submodule, name))
-
- def _flush(self):
- while self._ctrl_stack:
- self._pop_ctrl()
-
- def lower(self, platform):
- self._flush()
-
- fragment = Fragment()
- for submodule, name in self._submodules:
- fragment.add_subfragment(submodule.get_fragment(platform), name)
- fragment.add_statements(self._statements)
- for signal, domain in self._driving.items():
- fragment.add_driver(signal, domain)
- return fragment
-
- get_fragment = lower
+++ /dev/null
-import warnings
-from collections import defaultdict, OrderedDict
-
-from ..tools import *
-from .ast import *
-from .cd import *
-
-
-__all__ = ["Fragment", "DriverConflict"]
-
-
-class DriverConflict(UserWarning):
- pass
-
-
-class Fragment:
- def __init__(self):
- self.ports = ValueDict()
- self.drivers = OrderedDict()
- self.statements = []
- self.domains = OrderedDict()
- self.subfragments = []
-
- def add_ports(self, *ports, kind):
- assert kind in ("i", "o", "io")
- for port in flatten(ports):
- self.ports[port] = kind
-
- def iter_ports(self):
- yield from self.ports.keys()
-
- def add_driver(self, signal, domain=None):
- if domain not in self.drivers:
- self.drivers[domain] = ValueSet()
- self.drivers[domain].add(signal)
-
- def iter_drivers(self):
- for domain, signals in self.drivers.items():
- for signal in signals:
- yield domain, signal
-
- def iter_comb(self):
- if None in self.drivers:
- yield from self.drivers[None]
-
- def iter_sync(self):
- for domain, signals in self.drivers.items():
- if domain is None:
- continue
- for signal in signals:
- yield domain, signal
-
- def iter_signals(self):
- signals = ValueSet()
- signals |= self.ports.keys()
- for domain, domain_signals in self.drivers.items():
- if domain is not None:
- cd = self.domains[domain]
- signals.add(cd.clk)
- if cd.rst is not None:
- signals.add(cd.rst)
- signals |= domain_signals
- return signals
-
- def add_domains(self, *domains):
- for domain in domains:
- assert isinstance(domain, ClockDomain)
- assert domain.name not in self.domains
- self.domains[domain.name] = domain
-
- def iter_domains(self):
- yield from self.domains
-
- def add_statements(self, *stmts):
- self.statements += Statement.wrap(stmts)
-
- def add_subfragment(self, subfragment, name=None):
- assert isinstance(subfragment, Fragment)
- self.subfragments.append((subfragment, name))
-
- def _resolve_driver_conflicts(self, hierarchy=("top",), mode="warn"):
- assert mode in ("silent", "warn", "error")
-
- driver_subfrags = ValueDict()
-
- # For each signal driven by this fragment and/or its subfragments, determine which
- # subfragments also drive it.
- for domain, signal in self.iter_drivers():
- if signal not in driver_subfrags:
- driver_subfrags[signal] = set()
- driver_subfrags[signal].add((None, hierarchy))
-
- for i, (subfrag, name) in enumerate(self.subfragments):
- # First, recurse into subfragments and let them detect driver conflicts as well.
- if name is None:
- name = "<unnamed #{}>".format(i)
- subfrag_hierarchy = hierarchy + (name,)
- subfrag_drivers = subfrag._resolve_driver_conflicts(subfrag_hierarchy, mode)
-
- # Second, classify subfragments by domains they define.
- for signal in subfrag_drivers:
- if signal not in driver_subfrags:
- driver_subfrags[signal] = set()
- driver_subfrags[signal].add((subfrag, subfrag_hierarchy))
-
- # Find out the set of subfragments that needs to be flattened into this fragment
- # to resolve driver-driver conflicts.
- flatten_subfrags = set()
- for signal, subfrags in driver_subfrags.items():
- if len(subfrags) > 1:
- flatten_subfrags.update((f, h) for f, h in subfrags if f is not None)
-
- # While we're at it, show a message.
- subfrag_names = ", ".join(sorted(".".join(h) for f, h in subfrags))
- message = ("Signal '{}' is driven from multiple fragments: {}"
- .format(signal, subfrag_names))
- if mode == "error":
- raise DriverConflict(message)
- elif mode == "warn":
- message += "; hierarchy will be flattened"
- warnings.warn_explicit(message, DriverConflict, *signal.src_loc)
-
- for subfrag, subfrag_hierarchy in sorted(flatten_subfrags, key=lambda x: x[1]):
- # Merge subfragment's everything except clock domains into this fragment.
- # Flattening is done after clock domain propagation, so we can assume the domains
- # are already the same in every involved fragment in the first place.
- self.ports.update(subfrag.ports)
- for domain, signal in subfrag.iter_drivers():
- self.add_driver(signal, domain)
- self.statements += subfrag.statements
- self.subfragments += subfrag.subfragments
-
- # Remove the merged subfragment.
- for i, (check_subfrag, check_name) in enumerate(self.subfragments): # :nobr:
- if subfrag == check_subfrag:
- del self.subfragments[i]
- break
-
- # If we flattened anything, we might be in a situation where we have a driver conflict
- # again, e.g. if we had a tree of fragments like A --- B --- C where only fragments
- # A and C were driving a signal S. In that case, since B is not driving S itself,
- # processing B will not result in any flattening, but since B is transitively driving S,
- # processing A will flatten B into it. Afterwards, we have a tree like AB --- C, which
- # has another conflict.
- if any(flatten_subfrags):
- # Try flattening again.
- return self._resolve_driver_conflicts(hierarchy, mode)
-
- # Nothing was flattened, we're done!
- return ValueSet(driver_subfrags.keys())
-
- def _propagate_domains_up(self, hierarchy=("top",)):
- from .xfrm import DomainRenamer
-
- domain_subfrags = defaultdict(lambda: set())
-
- # For each domain defined by a subfragment, determine which subfragments define it.
- for i, (subfrag, name) in enumerate(self.subfragments):
- # First, recurse into subfragments and let them propagate domains up as well.
- hier_name = name
- if hier_name is None:
- hier_name = "<unnamed #{}>".format(i)
- subfrag._propagate_domains_up(hierarchy + (hier_name,))
-
- # Second, classify subfragments by domains they define.
- for domain in subfrag.iter_domains():
- domain_subfrags[domain].add((subfrag, name, i))
-
- # For each domain defined by more than one subfragment, rename the domain in each
- # of the subfragments such that they no longer conflict.
- for domain, subfrags in domain_subfrags.items():
- if len(subfrags) == 1:
- continue
-
- names = [n for f, n, i in subfrags]
- if not all(names):
- names = sorted("<unnamed #{}>".format(i) if n is None else "'{}'".format(n)
- for f, n, i in subfrags)
- raise DomainError("Domain '{}' is defined by subfragments {} of fragment '{}'; "
- "it is necessary to either rename subfragment domains "
- "explicitly, or give names to subfragments"
- .format(domain, ", ".join(names), ".".join(hierarchy)))
-
- if len(names) != len(set(names)):
- names = sorted("#{}".format(i) for f, n, i in subfrags)
- raise DomainError("Domain '{}' is defined by subfragments {} of fragment '{}', "
- "some of which have identical names; it is necessary to either "
- "rename subfragment domains explicitly, or give distinct names "
- "to subfragments"
- .format(domain, ", ".join(names), ".".join(hierarchy)))
-
- for subfrag, name, i in subfrags:
- self.subfragments[i] = \
- (DomainRenamer({domain: "{}_{}".format(name, domain)})(subfrag), name)
-
- # Finally, collect the (now unique) subfragment domains, and merge them into our domains.
- for subfrag, name in self.subfragments:
- for domain in subfrag.iter_domains():
- self.add_domains(subfrag.domains[domain])
-
- def _propagate_domains_down(self):
- # For each domain defined in this fragment, ensure it also exists in all subfragments.
- for subfrag, name in self.subfragments:
- for domain in self.iter_domains():
- if domain in subfrag.domains:
- assert self.domains[domain] is subfrag.domains[domain]
- else:
- subfrag.add_domains(self.domains[domain])
-
- subfrag._propagate_domains_down()
-
- def _propagate_domains(self, ensure_sync_exists):
- self._propagate_domains_up()
- if ensure_sync_exists and not self.domains:
- self.add_domains(ClockDomain("sync"))
- self._propagate_domains_down()
-
- def _insert_domain_resets(self):
- from .xfrm import ResetInserter
-
- resets = {cd.name: cd.rst for cd in self.domains.values() if cd.rst is not None}
- return ResetInserter(resets)(self)
-
- def _lower_domain_signals(self):
- from .xfrm import DomainLowerer
-
- return DomainLowerer(self.domains)(self)
-
- def _propagate_ports(self, ports):
- # Collect all signals we're driving (on LHS of statements), and signals we're using
- # (on RHS of statements, or in clock domains).
- self_driven = union(s._lhs_signals() for s in self.statements) or ValueSet()
- self_used = union(s._rhs_signals() for s in self.statements) or ValueSet()
- for domain, _ in self.iter_sync():
- cd = self.domains[domain]
- self_used.add(cd.clk)
- if cd.rst is not None:
- self_used.add(cd.rst)
-
- # Our input ports are all the signals we're using but not driving. This is an over-
- # approximation: some of these signals may be driven by our subfragments.
- ins = self_used - self_driven
- # Our output ports are all the signals we're asked to provide that we're driving. This is
- # an underapproximation: some of these signals may be driven by subfragments.
- outs = ports & self_driven
-
- # Go through subfragments and refine our approximation for ports.
- for subfrag, name in self.subfragments:
- # Always ask subfragments to provide all signals we're using and signals we're asked
- # to provide. If the subfragment is not driving it, it will silently ignore it.
- sub_ins, sub_outs = subfrag._propagate_ports(ports=self_used | ports)
- # Refine the input port approximation: if a subfragment is driving a signal,
- # it is definitely not our input. But, if a subfragment requires a signal as an input,
- # and we aren't driving it, it has to be our input as well.
- ins -= sub_outs
- ins |= sub_ins - self_driven
- # Refine the output port approximation: if a subfragment is driving a signal,
- # and we're asked to provide it, we can provide it now.
- outs |= ports & sub_outs
-
- # We've computed the precise set of input and output ports.
- self.add_ports(ins, kind="i")
- self.add_ports(outs, kind="o")
-
- return ins, outs
-
- def prepare(self, ports=(), ensure_sync_exists=True):
- from .xfrm import FragmentTransformer
-
- fragment = FragmentTransformer()(self)
- fragment._propagate_domains(ensure_sync_exists)
- fragment._resolve_driver_conflicts()
- fragment = fragment._insert_domain_resets()
- fragment = fragment._lower_domain_signals()
- fragment._propagate_ports(ports)
- return fragment
+++ /dev/null
-from collections import OrderedDict
-from collections.abc import Iterable
-
-from ..tools import flatten
-from .ast import *
-from .ast import _StatementList
-from .cd import *
-from .ir import *
-
-
-__all__ = ["ValueTransformer", "StatementTransformer", "FragmentTransformer",
- "DomainRenamer", "DomainLowerer", "ResetInserter", "CEInserter"]
-
-
-class ValueTransformer:
- def on_Const(self, value):
- return value
-
- def on_Signal(self, value):
- return value
-
- def on_ClockSignal(self, value):
- return value
-
- def on_ResetSignal(self, value):
- return value
-
- def on_Operator(self, value):
- return Operator(value.op, [self.on_value(o) for o in value.operands])
-
- def on_Slice(self, value):
- return Slice(self.on_value(value.value), value.start, value.end)
-
- def on_Part(self, value):
- return Part(self.on_value(value.value), self.on_value(value.offset), value.width)
-
- def on_Cat(self, value):
- return Cat(self.on_value(o) for o in value.operands)
-
- def on_Repl(self, value):
- return Repl(self.on_value(value.value), value.count)
-
- def on_unknown_value(self, value):
- raise TypeError("Cannot transform value '{!r}'".format(value)) # :nocov:
-
- def on_value(self, value):
- if isinstance(value, Const):
- new_value = self.on_Const(value)
- elif isinstance(value, Signal):
- new_value = self.on_Signal(value)
- elif isinstance(value, ClockSignal):
- new_value = self.on_ClockSignal(value)
- elif isinstance(value, ResetSignal):
- new_value = self.on_ResetSignal(value)
- elif isinstance(value, Operator):
- new_value = self.on_Operator(value)
- elif isinstance(value, Slice):
- new_value = self.on_Slice(value)
- elif isinstance(value, Part):
- new_value = self.on_Part(value)
- elif isinstance(value, Cat):
- new_value = self.on_Cat(value)
- elif isinstance(value, Repl):
- new_value = self.on_Repl(value)
- else:
- new_value = self.on_unknown_value(value)
- if isinstance(new_value, Value):
- new_value.src_loc = value.src_loc
- return new_value
-
- def __call__(self, value):
- return self.on_value(value)
-
-
-class StatementTransformer:
- def on_value(self, value):
- return value
-
- def on_Assign(self, stmt):
- return Assign(self.on_value(stmt.lhs), self.on_value(stmt.rhs))
-
- def on_Switch(self, stmt):
- cases = OrderedDict((k, self.on_statement(v)) for k, v in stmt.cases.items())
- return Switch(self.on_value(stmt.test), cases)
-
- def on_statements(self, stmt):
- return _StatementList(flatten(self.on_statement(stmt) for stmt in stmt))
-
- def on_unknown_statement(self, stmt):
- raise TypeError("Cannot transform statement '{!r}'".format(stmt)) # :nocov:
-
- def on_statement(self, stmt):
- if isinstance(stmt, Assign):
- return self.on_Assign(stmt)
- elif isinstance(stmt, Switch):
- return self.on_Switch(stmt)
- elif isinstance(stmt, Iterable):
- return self.on_statements(stmt)
- else:
- return self.on_unknown_statement(stmt)
-
- def __call__(self, value):
- return self.on_statement(value)
-
-
-class FragmentTransformer:
- def map_subfragments(self, fragment, new_fragment):
- for subfragment, name in fragment.subfragments:
- new_fragment.add_subfragment(self(subfragment), name)
-
- def map_domains(self, fragment, new_fragment):
- for domain in fragment.iter_domains():
- new_fragment.add_domains(fragment.domains[domain])
-
- def map_statements(self, fragment, new_fragment):
- if hasattr(self, "on_statement"):
- new_fragment.add_statements(map(self.on_statement, fragment.statements))
- else:
- new_fragment.add_statements(fragment.statements)
-
- def map_drivers(self, fragment, new_fragment):
- for domain, signal in fragment.iter_drivers():
- new_fragment.add_driver(signal, domain)
-
- def on_fragment(self, fragment):
- new_fragment = Fragment()
- self.map_subfragments(fragment, new_fragment)
- self.map_domains(fragment, new_fragment)
- self.map_statements(fragment, new_fragment)
- self.map_drivers(fragment, new_fragment)
- return new_fragment
-
- def __call__(self, value):
- return self.on_fragment(value)
-
-
-class DomainRenamer(FragmentTransformer, ValueTransformer, StatementTransformer):
- def __init__(self, domain_map):
- if isinstance(domain_map, str):
- domain_map = {"sync": domain_map}
- self.domain_map = OrderedDict(domain_map)
-
- def on_ClockSignal(self, value):
- if value.domain in self.domain_map:
- return ClockSignal(self.domain_map[value.domain])
- return value
-
- def on_ResetSignal(self, value):
- if value.domain in self.domain_map:
- return ResetSignal(self.domain_map[value.domain])
- return value
-
- def map_domains(self, fragment, new_fragment):
- for domain in fragment.iter_domains():
- cd = fragment.domains[domain]
- if domain in self.domain_map:
- if cd.name == domain:
- # Rename the actual ClockDomain object.
- cd.rename(self.domain_map[domain])
- else:
- assert cd.name == self.domain_map[domain]
- new_fragment.add_domains(cd)
-
- def map_drivers(self, fragment, new_fragment):
- for domain, signals in fragment.drivers.items():
- if domain in self.domain_map:
- domain = self.domain_map[domain]
- for signal in signals:
- new_fragment.add_driver(signal, domain)
-
-
-class DomainLowerer(FragmentTransformer, ValueTransformer, StatementTransformer):
- def __init__(self, domains):
- self.domains = domains
-
- def _resolve(self, domain, context):
- if domain not in self.domains:
- raise DomainError("Signal {!r} refers to nonexistent domain '{}'"
- .format(context, domain))
- return self.domains[domain]
-
- def on_ClockSignal(self, value):
- cd = self._resolve(value.domain, value)
- return cd.clk
-
- def on_ResetSignal(self, value):
- cd = self._resolve(value.domain, value)
- if cd.rst is None:
- if value.allow_reset_less:
- return Const(0)
- else:
- raise DomainError("Signal {!r} refers to reset of reset-less domain '{}'"
- .format(value, value.domain))
- return cd.rst
-
-
-class _ControlInserter(FragmentTransformer):
- def __init__(self, controls):
- if isinstance(controls, Value):
- controls = {"sync": controls}
- self.controls = OrderedDict(controls)
-
- def on_fragment(self, fragment):
- new_fragment = super().on_fragment(fragment)
- for domain, signals in fragment.drivers.items():
- if domain is None or domain not in self.controls:
- continue
- self._insert_control(new_fragment, domain, signals)
- return new_fragment
-
- def _insert_control(self, fragment, domain, signals):
- raise NotImplementedError # :nocov:
-
-
-class ResetInserter(_ControlInserter):
- def _insert_control(self, fragment, domain, signals):
- stmts = [s.eq(Const(s.reset, s.nbits)) for s in signals if not s.reset_less]
- fragment.add_statements(Switch(self.controls[domain], {1: stmts}))
-
-
-class CEInserter(_ControlInserter):
- def _insert_control(self, fragment, domain, signals):
- stmts = [s.eq(s) for s in signals]
- fragment.add_statements(Switch(self.controls[domain], {0: stmts}))
+++ /dev/null
-from ..fhdl import *
-
-
-__all__ = ["MultiReg"]
-
-
-class MultiReg:
- def __init__(self, i, o, odomain="sync", n=2, reset=0):
- self.i = i
- self.o = o
- self.odomain = odomain
-
- self._regs = [Signal(self.i.shape(), name="cdc{}".format(i),
- reset=reset, reset_less=True, attrs={"no_retiming": True})
- for i in range(n)]
-
- def get_fragment(self, platform):
- if hasattr(platform, "get_multi_reg"):
- return platform.get_multi_reg(self)
-
- m = Module()
- for i, o in zip((self.i, *self._regs), self._regs):
- m.d[self.odomain] += o.eq(i)
- m.d.comb += self.o.eq(self._regs[-1])
- return m.lower(platform)
+++ /dev/null
-from ..fhdl import *
-
-
-__all__ = ["TSTriple"]
-
-
-class TSTriple:
- def __init__(self, shape=None, min=None, max=None, reset_o=0, reset_oe=0, reset_i=0,
- name=None):
- self.o = Signal(shape, min=min, max=max, reset=reset_o,
- name=None if name is None else name + "_o")
- self.oe = Signal(reset=reset_oe,
- name=None if name is None else name + "_oe")
- self.i = Signal(shape, min=min, max=max, reset=reset_i,
- name=None if name is None else name + "_i")
-
- def __len__(self):
- return len(self.o)
-
- def get_fragment(self, platform):
- return Fragment()
--- /dev/null
+from abc import ABCMeta, abstractmethod
+import builtins
+import traceback
+from collections import OrderedDict
+from collections.abc import Iterable, MutableMapping, MutableSet
+
+from .. import tracer
+from ..tools import *
+
+
+__all__ = [
+ "Value", "Const", "C", "Operator", "Mux", "Part", "Slice", "Cat", "Repl",
+ "Signal", "ClockSignal", "ResetSignal",
+ "Statement", "Assign", "Switch", "Delay", "Tick", "Passive",
+ "ValueKey", "ValueDict", "ValueSet",
+]
+
+
+class DUID:
+ """Deterministic Unique IDentifier"""
+ __next_uid = 0
+ def __init__(self):
+ self.duid = DUID.__next_uid
+ DUID.__next_uid += 1
+
+
+class Value(metaclass=ABCMeta):
+ @staticmethod
+ def wrap(obj):
+ """Ensures that the passed object is a Migen value. Booleans and integers
+ are automatically wrapped into ``Const``."""
+ if isinstance(obj, Value):
+ return obj
+ elif isinstance(obj, (bool, int)):
+ return Const(obj)
+ else:
+ raise TypeError("Object '{!r}' is not a Migen value".format(obj))
+
+ def __init__(self, src_loc_at=0):
+ super().__init__()
+
+ src_loc_at += 3
+ tb = traceback.extract_stack(limit=src_loc_at)
+ if len(tb) < src_loc_at:
+ self.src_loc = None
+ else:
+ self.src_loc = (tb[0].filename, tb[0].lineno)
+
+ def __bool__(self):
+ raise TypeError("Attempted to convert Migen value to boolean")
+
+ def __invert__(self):
+ return Operator("~", [self])
+ def __neg__(self):
+ return Operator("-", [self])
+
+ def __add__(self, other):
+ return Operator("+", [self, other])
+ def __radd__(self, other):
+ return Operator("+", [other, self])
+ def __sub__(self, other):
+ return Operator("-", [self, other])
+ def __rsub__(self, other):
+ return Operator("-", [other, self])
+ def __mul__(self, other):
+ return Operator("*", [self, other])
+ def __rmul__(self, other):
+ return Operator("*", [other, self])
+ def __mod__(self, other):
+ return Operator("%", [self, other])
+ def __rmod__(self, other):
+ return Operator("%", [other, self])
+ def __div__(self, other):
+ return Operator("/", [self, other])
+ def __rdiv__(self, other):
+ return Operator("/", [other, self])
+ def __lshift__(self, other):
+ return Operator("<<", [self, other])
+ def __rlshift__(self, other):
+ return Operator("<<", [other, self])
+ def __rshift__(self, other):
+ return Operator(">>", [self, other])
+ def __rrshift__(self, other):
+ return Operator(">>", [other, self])
+ def __and__(self, other):
+ return Operator("&", [self, other])
+ def __rand__(self, other):
+ return Operator("&", [other, self])
+ def __xor__(self, other):
+ return Operator("^", [self, other])
+ def __rxor__(self, other):
+ return Operator("^", [other, self])
+ def __or__(self, other):
+ return Operator("|", [self, other])
+ def __ror__(self, other):
+ return Operator("|", [other, self])
+
+ def __eq__(self, other):
+ return Operator("==", [self, other])
+ def __ne__(self, other):
+ return Operator("!=", [self, other])
+ def __lt__(self, other):
+ return Operator("<", [self, other])
+ def __le__(self, other):
+ return Operator("<=", [self, other])
+ def __gt__(self, other):
+ return Operator(">", [self, other])
+ def __ge__(self, other):
+ return Operator(">=", [self, other])
+
+ def __len__(self):
+ return self.shape()[0]
+
+ def __getitem__(self, key):
+ n = len(self)
+ if isinstance(key, int):
+ if key not in range(-n, n):
+ raise IndexError("Cannot index {} bits into {}-bit value".format(key, n))
+ if key < 0:
+ key += n
+ return Slice(self, key, key + 1)
+ elif isinstance(key, slice):
+ start, stop, step = key.indices(n)
+ if step != 1:
+ return Cat(self[i] for i in range(start, stop, step))
+ return Slice(self, start, stop)
+ else:
+ raise TypeError("Cannot index value with {}".format(repr(key)))
+
+ def bool(self):
+ """Conversion to boolean.
+
+ Returns
+ -------
+ Value, out
+ Output ``Value``. If any bits are set, returns ``1``, else ``0``.
+ """
+ return Operator("b", [self])
+
+ def part(self, offset, width):
+ """Indexed part-select.
+
+ Selects a constant width but variable offset part of a ``Value``.
+
+ Parameters
+ ----------
+ offset : Value, in
+ start point of the selected bits
+ width : int
+ number of selected bits
+
+ Returns
+ -------
+ Part, out
+ Selected part of the ``Value``
+ """
+ return Part(self, offset, width)
+
+ def eq(self, value):
+ """Assignment.
+
+ Parameters
+ ----------
+ value : Value, in
+ Value to be assigned.
+
+ Returns
+ -------
+ Assign
+ Assignment statement that can be used in combinatorial or synchronous context.
+ """
+ return Assign(self, value)
+
+ @abstractmethod
+ def shape(self):
+ """Bit length and signedness of a value.
+
+ Returns
+ -------
+ int, bool
+ Number of bits required to store `v` or available in `v`, followed by
+ whether `v` has a sign bit (included in the bit count).
+
+ Examples
+ --------
+ >>> Value.shape(Signal(8))
+ 8, False
+ >>> Value.shape(C(0xaa))
+ 8, False
+ """
+ pass # :nocov:
+
+ def _lhs_signals(self):
+ raise TypeError("Value {!r} cannot be used in assignments".format(self))
+
+ @abstractmethod
+ def _rhs_signals(self):
+ pass # :nocov:
+
+ __hash__ = None
+
+
+class Const(Value):
+ """A constant, literal integer value.
+
+ Parameters
+ ----------
+ value : int
+ shape : int or tuple or None
+ Either an integer `bits` or a tuple `(bits, signed)`
+ specifying the number of bits in this `Const` and whether it is
+ signed (can represent negative values). `shape` defaults
+ to the minimum width and signedness of `value`.
+
+ Attributes
+ ----------
+ nbits : int
+ signed : bool
+ """
+ src_loc = None
+
+ @staticmethod
+ def normalize(value, shape):
+ nbits, signed = shape
+ mask = (1 << nbits) - 1
+ value &= mask
+ if signed and value >> (nbits - 1):
+ value |= ~mask
+ return value
+
+ def __init__(self, value, shape=None):
+ self.value = int(value)
+ if shape is None:
+ shape = bits_for(self.value), self.value < 0
+ if isinstance(shape, int):
+ shape = shape, self.value < 0
+ self.nbits, self.signed = shape
+ if not isinstance(self.nbits, int) or self.nbits < 0:
+ raise TypeError("Width must be a non-negative integer")
+ self.value = self.normalize(self.value, shape)
+
+ def shape(self):
+ return self.nbits, self.signed
+
+ def _rhs_signals(self):
+ return ValueSet()
+
+ def __repr__(self):
+ return "(const {}'{}d{})".format(self.nbits, "s" if self.signed else "", self.value)
+
+
+C = Const # shorthand
+
+
+class Operator(Value):
+ def __init__(self, op, operands, src_loc_at=0):
+ super().__init__(src_loc_at=1 + src_loc_at)
+ self.op = op
+ self.operands = [Value.wrap(o) for o in operands]
+
+ @staticmethod
+ def _bitwise_binary_shape(a_shape, b_shape):
+ a_bits, a_sign = a_shape
+ b_bits, b_sign = b_shape
+ if not a_sign and not b_sign:
+ # both operands unsigned
+ return max(a_bits, b_bits), False
+ elif a_sign and b_sign:
+ # both operands signed
+ return max(a_bits, b_bits), True
+ elif not a_sign and b_sign:
+ # first operand unsigned (add sign bit), second operand signed
+ return max(a_bits + 1, b_bits), True
+ else:
+ # first signed, second operand unsigned (add sign bit)
+ return max(a_bits, b_bits + 1), True
+
+ def shape(self):
+ op_shapes = list(map(lambda x: x.shape(), self.operands))
+ if len(op_shapes) == 1:
+ (a_bits, a_sign), = op_shapes
+ if self.op in ("+", "~"):
+ return a_bits, a_sign
+ if self.op == "-":
+ if not a_sign:
+ return a_bits + 1, True
+ else:
+ return a_bits, a_sign
+ if self.op == "b":
+ return 1, False
+ elif len(op_shapes) == 2:
+ (a_bits, a_sign), (b_bits, b_sign) = op_shapes
+ if self.op == "+" or self.op == "-":
+ bits, sign = self._bitwise_binary_shape(*op_shapes)
+ return bits + 1, sign
+ if self.op == "*":
+ if not a_sign and not b_sign:
+ # both operands unsigned
+ return a_bits + b_bits, False
+ if a_sign and b_sign:
+ # both operands signed
+ return a_bits + b_bits - 1, True
+ # one operand signed, the other unsigned (add sign bit)
+ return a_bits + b_bits + 1 - 1, True
+ if self.op in ("<", "<=", "==", "!=", ">", ">=", "b"):
+ return 1, False
+ if self.op in ("&", "^", "|"):
+ return self._bitwise_binary_shape(*op_shapes)
+ if self.op == "<<":
+ if b_sign:
+ extra = 2 ** (b_bits - 1) - 1
+ else:
+ extra = 2 ** (b_bits) - 1
+ return a_bits + extra, a_sign
+ if self.op == ">>":
+ if b_sign:
+ extra = 2 ** (b_bits - 1)
+ else:
+ extra = 0
+ return a_bits + extra, a_sign
+ elif len(op_shapes) == 3:
+ if self.op == "m":
+ s_shape, a_shape, b_shape = op_shapes
+ return self._bitwise_binary_shape(a_shape, b_shape)
+ raise NotImplementedError("Operator {}/{} not implemented"
+ .format(self.op, len(op_shapes))) # :nocov:
+
+ def _rhs_signals(self):
+ return union(op._rhs_signals() for op in self.operands)
+
+ def __repr__(self):
+ return "({} {})".format(self.op, " ".join(map(repr, self.operands)))
+
+
+def Mux(sel, val1, val0):
+ """Choose between two values.
+
+ Parameters
+ ----------
+ sel : Value, in
+ Selector.
+ val1 : Value, in
+ val0 : Value, in
+ Input values.
+
+ Returns
+ -------
+ Value, out
+ Output ``Value``. If ``sel`` is asserted, the Mux returns ``val1``, else ``val0``.
+ """
+ return Operator("m", [sel, val1, val0], src_loc_at=1)
+
+
+class Slice(Value):
+ def __init__(self, value, start, end):
+ if not isinstance(start, int):
+ raise TypeError("Slice start must be an integer, not '{!r}'".format(start))
+ if not isinstance(end, int):
+ raise TypeError("Slice end must be an integer, not '{!r}'".format(end))
+
+ n = len(value)
+ if start not in range(-n, n):
+ raise IndexError("Cannot start slice {} bits into {}-bit value".format(start, n))
+ if start < 0:
+ start += n
+ if end not in range(-(n+1), n+1):
+ raise IndexError("Cannot end slice {} bits into {}-bit value".format(end, n))
+ if end < 0:
+ end += n
+ if start > end:
+ raise IndexError("Slice start {} must be less than slice end {}".format(start, end))
+
+ super().__init__()
+ self.value = Value.wrap(value)
+ self.start = start
+ self.end = end
+
+ def shape(self):
+ return self.end - self.start, False
+
+ def _lhs_signals(self):
+ return self.value._lhs_signals()
+
+ def _rhs_signals(self):
+ return self.value._rhs_signals()
+
+ def __repr__(self):
+ return "(slice {} {}:{})".format(repr(self.value), self.start, self.end)
+
+
+class Part(Value):
+ def __init__(self, value, offset, width):
+ if not isinstance(width, int) or width < 0:
+ raise TypeError("Part width must be a non-negative integer, not '{!r}'".format(width))
+
+ super().__init__()
+ self.value = value
+ self.offset = Value.wrap(offset)
+ self.width = width
+
+ def shape(self):
+ return self.width, False
+
+ def _lhs_signals(self):
+ return self.value._lhs_signals()
+
+ def _rhs_signals(self):
+ return self.value._rhs_signals()
+
+ def __repr__(self):
+ return "(part {} {} {})".format(repr(self.value), repr(self.offset), self.width)
+
+
+class Cat(Value):
+ """Concatenate values.
+
+ Form a compound ``Value`` from several smaller ones by concatenation.
+ The first argument occupies the lower bits of the result.
+ The return value can be used on either side of an assignment, that
+ is, the concatenated value can be used as an argument on the RHS or
+ as a target on the LHS. If it is used on the LHS, it must solely
+ consist of ``Signal`` s, slices of ``Signal`` s, and other concatenations
+ meeting these properties. The bit length of the return value is the sum of
+ the bit lengths of the arguments::
+
+ len(Cat(args)) == sum(len(arg) for arg in args)
+
+ Parameters
+ ----------
+ *args : Values or iterables of Values, inout
+ ``Value`` s to be concatenated.
+
+ Returns
+ -------
+ Value, inout
+ Resulting ``Value`` obtained by concatentation.
+ """
+ def __init__(self, *args):
+ super().__init__()
+ self.operands = [Value.wrap(v) for v in flatten(args)]
+
+ def shape(self):
+ return sum(len(op) for op in self.operands), False
+
+ def _lhs_signals(self):
+ return union(op._lhs_signals() for op in self.operands)
+
+ def _rhs_signals(self):
+ return union(op._rhs_signals() for op in self.operands)
+
+ def __repr__(self):
+ return "(cat {})".format(" ".join(map(repr, self.operands)))
+
+
+class Repl(Value):
+ """Replicate a value
+
+ An input value is replicated (repeated) several times
+ to be used on the RHS of assignments::
+
+ len(Repl(s, n)) == len(s) * n
+
+ Parameters
+ ----------
+ value : Value, in
+ Input value to be replicated.
+ count : int
+ Number of replications.
+
+ Returns
+ -------
+ Repl, out
+ Replicated value.
+ """
+ def __init__(self, value, count):
+ if not isinstance(count, int) or count < 0:
+ raise TypeError("Replication count must be a non-negative integer, not '{!r}'"
+ .format(count))
+
+ super().__init__()
+ self.value = Value.wrap(value)
+ self.count = count
+
+ def shape(self):
+ return len(self.value) * self.count, False
+
+ def _rhs_signals(self):
+ return self.value._rhs_signals()
+
+ def __repr__(self):
+ return "(repl {!r} {})".format(self.value, self.count)
+
+
+class Signal(Value, DUID):
+ """A varying integer value.
+
+ Parameters
+ ----------
+ shape : int or tuple or None
+ Either an integer ``bits`` or a tuple ``(bits, signed)`` specifying the number of bits
+ in this ``Signal`` and whether it is signed (can represent negative values).
+ ``shape`` defaults to 1-bit and non-signed.
+ name : str
+ Name hint for this signal. If ``None`` (default) the name is inferred from the variable
+ name this ``Signal`` is assigned to. Name collisions are automatically resolved by
+ prepending names of objects that contain this ``Signal`` and by appending integer
+ sequences.
+ reset : int
+ Reset (synchronous) or default (combinatorial) value.
+ When this ``Signal`` is assigned to in synchronous context and the corresponding clock
+ domain is reset, the ``Signal`` assumes the given value. When this ``Signal`` is unassigned
+ in combinatorial context (due to conditional assignments not being taken), the ``Signal``
+ assumes its ``reset`` value. Defaults to 0.
+ reset_less : bool
+ If ``True``, do not generate reset logic for this ``Signal`` in synchronous statements.
+ The ``reset`` value is only used as a combinatorial default or as the initial value.
+ Defaults to ``False``.
+ min : int or None
+ max : int or None
+ If ``shape`` is ``None``, the signal bit width and signedness are
+ determined by the integer range given by ``min`` (inclusive,
+ defaults to 0) and ``max`` (exclusive, defaults to 2).
+ attrs : dict
+ Dictionary of synthesis attributes.
+ decoder : function
+ A function converting integer signal values to human-readable strings (e.g. FSM state
+ names).
+
+ Attributes
+ ----------
+ nbits : int
+ signed : bool
+ name : str
+ reset : int
+ reset_less : bool
+ attrs : dict
+ """
+
+ def __init__(self, shape=None, name=None, reset=0, reset_less=False, min=None, max=None,
+ attrs=None, decoder=None, src_loc_at=0):
+ super().__init__(src_loc_at=src_loc_at)
+
+ if name is None:
+ try:
+ name = tracer.get_var_name(depth=2 + src_loc_at)
+ except tracer.NameNotFound:
+ name = "$signal"
+ self.name = name
+
+ if shape is None:
+ if min is None:
+ min = 0
+ if max is None:
+ max = 2
+ max -= 1 # make both bounds inclusive
+ if not min < max:
+ raise ValueError("Lower bound {} should be less than higher bound {}"
+ .format(min, max))
+ self.signed = min < 0 or max < 0
+ self.nbits = builtins.max(bits_for(min, self.signed), bits_for(max, self.signed))
+
+ else:
+ if not (min is None and max is None):
+ raise ValueError("Only one of bits/signedness or bounds may be specified")
+ if isinstance(shape, int):
+ self.nbits, self.signed = shape, False
+ else:
+ self.nbits, self.signed = shape
+
+ if not isinstance(self.nbits, int) or self.nbits < 0:
+ raise TypeError("Width must be a non-negative integer, not '{!r}'".format(self.nbits))
+ self.reset = int(reset)
+ self.reset_less = bool(reset_less)
+
+ self.attrs = OrderedDict(() if attrs is None else attrs)
+ self.decoder = decoder
+
+ @classmethod
+ def like(cls, other, src_loc_at=0, **kwargs):
+ """Create Signal based on another.
+
+ Parameters
+ ----------
+ other : Value
+ Object to base this Signal on.
+ """
+ kw = dict(shape=cls.wrap(other).shape(),
+ name=tracer.get_var_name(depth=2 + src_loc_at))
+ if isinstance(other, cls):
+ kw.update(reset=other.reset, reset_less=other.reset_less,
+ attrs=other.attrs, decoder=other.decoder)
+ kw.update(kwargs)
+ return cls(**kw, src_loc_at=1 + src_loc_at)
+
+ def shape(self):
+ return self.nbits, self.signed
+
+ def _lhs_signals(self):
+ return ValueSet((self,))
+
+ def _rhs_signals(self):
+ return ValueSet((self,))
+
+ def __repr__(self):
+ return "(sig {})".format(self.name)
+
+
+class ClockSignal(Value):
+ """Clock signal for a given clock domain.
+
+ ``ClockSignal`` s for a given clock domain can be retrieved multiple
+ times. They all ultimately refer to the same signal.
+
+ Parameters
+ ----------
+ domain : str
+ Clock domain to obtain a clock signal for. Defaults to ``"sync"``.
+ """
+ def __init__(self, domain="sync"):
+ super().__init__()
+ if not isinstance(domain, str):
+ raise TypeError("Clock domain name must be a string, not '{!r}'".format(domain))
+ self.domain = domain
+
+ def shape(self):
+ return 1, False
+
+ def _rhs_signals(self):
+ raise NotImplementedError("ClockSignal must be lowered to a concrete signal") # :nocov:
+
+ def __repr__(self):
+ return "(clk {})".format(self.domain)
+
+
+class ResetSignal(Value):
+ """Reset signal for a given clock domain
+
+ ``ResetSignal`` s for a given clock domain can be retrieved multiple
+ times. They all ultimately refer to the same signal.
+
+ Parameters
+ ----------
+ domain : str
+ Clock domain to obtain a reset signal for. Defaults to ``"sync"``.
+ allow_reset_less : bool
+ If the clock domain is reset-less, act as a constant ``0`` instead of reporting an error.
+ """
+ def __init__(self, domain="sync", allow_reset_less=False):
+ super().__init__()
+ if not isinstance(domain, str):
+ raise TypeError("Clock domain name must be a string, not '{!r}'".format(domain))
+ self.domain = domain
+ self.allow_reset_less = allow_reset_less
+
+ def shape(self):
+ return 1, False
+
+ def _rhs_signals(self):
+ raise NotImplementedError("ResetSignal must be lowered to a concrete signal") # :nocov:
+
+ def __repr__(self):
+ return "(rst {})".format(self.domain)
+
+
+class _StatementList(list):
+ def __repr__(self):
+ return "({})".format(" ".join(map(repr, self)))
+
+
+class Statement:
+ @staticmethod
+ def wrap(obj):
+ if isinstance(obj, Iterable):
+ return _StatementList(sum((Statement.wrap(e) for e in obj), []))
+ else:
+ if isinstance(obj, Statement):
+ return _StatementList([obj])
+ else:
+ raise TypeError("Object '{!r}' is not a Migen statement".format(obj))
+
+
+class Assign(Statement):
+ def __init__(self, lhs, rhs):
+ self.lhs = Value.wrap(lhs)
+ self.rhs = Value.wrap(rhs)
+
+ def _lhs_signals(self):
+ return self.lhs._lhs_signals()
+
+ def _rhs_signals(self):
+ return self.rhs._rhs_signals()
+
+ def __repr__(self):
+ return "(eq {!r} {!r})".format(self.lhs, self.rhs)
+
+
+class Switch(Statement):
+ def __init__(self, test, cases):
+ self.test = Value.wrap(test)
+ self.cases = OrderedDict()
+ for key, stmts in cases.items():
+ if isinstance(key, (bool, int)):
+ key = "{:0{}b}".format(key, len(self.test))
+ elif isinstance(key, str):
+ assert len(key) == len(self.test)
+ else:
+ raise TypeError("Object '{!r}' cannot be used as a switch key"
+ .format(key))
+ if not isinstance(stmts, Iterable):
+ stmts = [stmts]
+ self.cases[key] = Statement.wrap(stmts)
+
+ def _lhs_signals(self):
+ signals = union(s._lhs_signals() for ss in self.cases.values() for s in ss) or ValueSet()
+ return signals
+
+ def _rhs_signals(self):
+ signals = union(s._rhs_signals() for ss in self.cases.values() for s in ss) or ValueSet()
+ return self.test._rhs_signals() | signals
+
+ def __repr__(self):
+ cases = ["(case {} {})".format(key, " ".join(map(repr, stmts)))
+ for key, stmts in self.cases.items()]
+ return "(switch {!r} {})".format(self.test, " ".join(cases))
+
+
+class Delay(Statement):
+ def __init__(self, interval=None):
+ self.interval = None if interval is None else float(interval)
+
+ def _rhs_signals(self):
+ return ValueSet()
+
+ def __repr__(self):
+ if self.interval is None:
+ return "(delay ε)"
+ else:
+ return "(delay {:.3}us)".format(self.interval * 10e6)
+
+
+class Tick(Statement):
+ def __init__(self, domain):
+ self.domain = str(domain)
+
+ def _rhs_signals(self):
+ return ValueSet()
+
+ def __repr__(self):
+ return "(tick {})".format(self.domain)
+
+
+class Passive(Statement):
+ def _rhs_signals(self):
+ return ValueSet()
+
+ def __repr__(self):
+ return "(passive)"
+
+
+class ValueKey:
+ def __init__(self, value):
+ self.value = Value.wrap(value)
+
+ def __hash__(self):
+ if isinstance(self.value, Const):
+ return hash(self.value)
+ elif isinstance(self.value, Signal):
+ return hash(id(self.value))
+ elif isinstance(self.value, Slice):
+ return hash((ValueKey(self.value.value), self.value.start, self.value.end))
+ else: # :nocov:
+ raise TypeError("Object '{!r}' cannot be used as a key in value collections")
+
+ def __eq__(self, other):
+ if not isinstance(other, ValueKey):
+ return False
+ if type(self.value) != type(other.value):
+ return False
+
+ if isinstance(self.value, Const):
+ return self.value == other.value
+ elif isinstance(self.value, Signal):
+ return id(self.value) == id(other.value)
+ elif isinstance(self.value, Slice):
+ return (ValueKey(self.value.value) == ValueKey(other.value.value) and
+ self.value.start == other.value.start and
+ self.value.end == other.value.end)
+ else: # :nocov:
+ raise TypeError("Object '{!r}' cannot be used as a key in value collections")
+
+ def __lt__(self, other):
+ if not isinstance(other, ValueKey):
+ return False
+ if type(self.value) != type(other.value):
+ return False
+
+ if isinstance(self.value, Const):
+ return self.value < other.value
+ elif isinstance(self.value, Signal):
+ return self.value.duid < other.value.duid
+ elif isinstance(self.value, Slice):
+ return (ValueKey(self.value.value) < ValueKey(other.value.value) and
+ self.value.start < other.value.start and
+ self.value.end < other.value.end)
+ else: # :nocov:
+ raise TypeError("Object '{!r}' cannot be used as a key in value collections")
+
+ def __repr__(self):
+ return "<{}.ValueKey {!r}>".format(__name__, self.value)
+
+
+class ValueDict(MutableMapping):
+ def __init__(self, pairs=()):
+ self._inner = dict()
+ for key, value in pairs:
+ self[key] = value
+
+ def __getitem__(self, key):
+ key = None if key is None else ValueKey(key)
+ return self._inner[key]
+
+ def __setitem__(self, key, value):
+ key = None if key is None else ValueKey(key)
+ self._inner[key] = value
+
+ def __delitem__(self, key):
+ key = None if key is None else ValueKey(key)
+ del self._inner[key]
+
+ def __iter__(self):
+ return map(lambda x: None if x is None else x.value, sorted(self._inner))
+
+ def __eq__(self, other):
+ if not isinstance(other, ValueDict):
+ return False
+ if len(self) != len(other):
+ return False
+ for ak, bk in zip(self, other):
+ if ValueKey(ak) != ValueKey(bk):
+ return False
+ if self[ak] != other[bk]:
+ return False
+ return True
+
+ def __len__(self):
+ return len(self._inner)
+
+ def __repr__(self):
+ pairs = ["({!r}, {!r})".format(k, v) for k, v in self.items()]
+ return "ValueDict([{}])".format(", ".join(pairs))
+
+
+class ValueSet(MutableSet):
+ def __init__(self, elements=()):
+ self._inner = set()
+ for elem in elements:
+ self.add(elem)
+
+ def add(self, value):
+ self._inner.add(ValueKey(value))
+
+ def update(self, values):
+ for value in values:
+ self.add(value)
+
+ def discard(self, value):
+ self._inner.discard(ValueKey(value))
+
+ def __contains__(self, value):
+ return ValueKey(value) in self._inner
+
+ def __iter__(self):
+ return map(lambda x: x.value, sorted(self._inner))
+
+ def __len__(self):
+ return len(self._inner)
+
+ def __repr__(self):
+ return "ValueSet({})".format(", ".join(repr(x) for x in self))
--- /dev/null
+from .. import tracer
+from .ast import Signal
+
+
+__all__ = ["ClockDomain", "DomainError"]
+
+
+class DomainError(Exception):
+ pass
+
+
+class ClockDomain:
+ """Synchronous domain.
+
+ Parameters
+ ----------
+ name : str or None
+ Domain name. If ``None`` (the default) the name is inferred from the variable name this
+ ``ClockDomain`` is assigned to (stripping any `"cd_"` prefix).
+ reset_less : bool
+ If ``True``, the domain does not use a reset signal. Registers within this domain are
+ still all initialized to their reset state once, e.g. through Verilog `"initial"`
+ statements.
+ async_reset : bool
+ If ``True``, the domain uses an asynchronous reset, and registers within this domain
+ are initialized to their reset state when reset level changes. Otherwise, registers
+ are initialized to reset state at the next clock cycle when reset is asserted.
+
+ Attributes
+ ----------
+ clk : Signal, inout
+ The clock for this domain. Can be driven or used to drive other signals (preferably
+ in combinatorial context).
+ rst : Signal or None, inout
+ Reset signal for this domain. Can be driven or used to drive.
+ """
+
+ @staticmethod
+ def _name_for(domain_name, signal_name):
+ if domain_name == "sync":
+ return signal_name
+ else:
+ return "{}_{}".format(domain_name, signal_name)
+
+ def __init__(self, name=None, reset_less=False, async_reset=False):
+ if name is None:
+ try:
+ name = tracer.get_var_name()
+ except tracer.NameNotFound:
+ raise ValueError("Clock domain name must be specified explicitly")
+ if name.startswith("cd_"):
+ name = name[3:]
+ self.name = name
+
+ self.clk = Signal(name=self._name_for(name, "clk"), src_loc_at=1)
+ if reset_less:
+ self.rst = None
+ else:
+ self.rst = Signal(name=self._name_for(name, "rst"), src_loc_at=1)
+
+ self.async_reset = async_reset
+
+ def rename(self, new_name):
+ self.name = new_name
+ self.clk.name = self._name_for(new_name, "clk")
+ if self.rst is not None:
+ self.rst.name = self._name_for(new_name, "rst")
--- /dev/null
+from collections import OrderedDict
+from collections.abc import Iterable
+from contextlib import contextmanager
+
+from .ast import *
+from .ir import *
+from .xfrm import *
+
+
+__all__ = ["Module", "SyntaxError"]
+
+
+class SyntaxError(Exception):
+ pass
+
+
+class _ModuleBuilderProxy:
+ def __init__(self, builder, depth):
+ object.__setattr__(self, "_builder", builder)
+ object.__setattr__(self, "_depth", depth)
+
+
+class _ModuleBuilderDomain(_ModuleBuilderProxy):
+ def __init__(self, builder, depth, domain):
+ super().__init__(builder, depth)
+ self._domain = domain
+
+ def __iadd__(self, assigns):
+ self._builder._add_statement(assigns, domain=self._domain, depth=self._depth)
+ return self
+
+
+class _ModuleBuilderDomains(_ModuleBuilderProxy):
+ def __getattr__(self, name):
+ if name == "comb":
+ domain = None
+ else:
+ domain = name
+ return _ModuleBuilderDomain(self._builder, self._depth, domain)
+
+ def __getitem__(self, name):
+ return self.__getattr__(name)
+
+ def __setattr__(self, name, value):
+ if name == "_depth":
+ object.__setattr__(self, name, value)
+ elif not isinstance(value, _ModuleBuilderDomain):
+ raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
+ .format(name, name))
+
+ def __setitem__(self, name, value):
+ return self.__setattr__(name, value)
+
+
+class _ModuleBuilderRoot:
+ def __init__(self, builder, depth):
+ self._builder = builder
+ self.domain = self.d = _ModuleBuilderDomains(builder, depth)
+
+ def __getattr__(self, name):
+ if name in ("comb", "sync"):
+ raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
+ .format(type(self).__name__, name, name))
+ raise AttributeError("'{}' object has no attribute '{}'"
+ .format(type(self).__name__, name))
+
+
+class _ModuleBuilderSubmodules:
+ def __init__(self, builder):
+ object.__setattr__(self, "_builder", builder)
+
+ def __iadd__(self, modules):
+ if isinstance(modules, Iterable):
+ for module in modules:
+ self._builder._add_submodule(module)
+ else:
+ module = modules
+ self._builder._add_submodule(module)
+ return self
+
+ def __setattr__(self, name, submodule):
+ self._builder._add_submodule(submodule, name)
+
+
+class Module(_ModuleBuilderRoot):
+ def __init__(self):
+ _ModuleBuilderRoot.__init__(self, self, depth=0)
+ self.submodules = _ModuleBuilderSubmodules(self)
+
+ self._submodules = []
+ self._driving = ValueDict()
+ self._statements = Statement.wrap([])
+ self._ctrl_context = None
+ self._ctrl_stack = []
+ self._stmt_if_cond = []
+ self._stmt_if_bodies = []
+ self._stmt_switch_test = None
+ self._stmt_switch_cases = OrderedDict()
+
+ def _check_context(self, construct, context):
+ if self._ctrl_context != context:
+ if self._ctrl_context is None:
+ raise SyntaxError("{} is not permitted outside of {}"
+ .format(construct, context))
+ else:
+ raise SyntaxError("{} is not permitted inside of {}"
+ .format(construct, self._ctrl_context))
+
+ def _get_ctrl(self, name):
+ if self._ctrl_stack:
+ top_name, top_data = self._ctrl_stack[-1]
+ if top_name == name:
+ return top_data
+
+ def _flush_ctrl(self):
+ while len(self._ctrl_stack) > self.domain._depth:
+ self._pop_ctrl()
+
+ def _set_ctrl(self, name, data):
+ self._flush_ctrl()
+ self._ctrl_stack.append((name, data))
+ return data
+
+ @contextmanager
+ def If(self, cond):
+ self._check_context("If", context=None)
+ if_data = self._set_ctrl("If", {"tests": [], "bodies": []})
+ try:
+ _outer_case, self._statements = self._statements, []
+ self.domain._depth += 1
+ yield
+ self._flush_ctrl()
+ if_data["tests"].append(cond)
+ if_data["bodies"].append(self._statements)
+ finally:
+ self.domain._depth -= 1
+ self._statements = _outer_case
+
+ @contextmanager
+ def Elif(self, cond):
+ self._check_context("Elif", context=None)
+ if_data = self._get_ctrl("If")
+ if if_data is None:
+ raise SyntaxError("Elif without preceding If")
+ try:
+ _outer_case, self._statements = self._statements, []
+ self.domain._depth += 1
+ yield
+ self._flush_ctrl()
+ if_data["tests"].append(cond)
+ if_data["bodies"].append(self._statements)
+ finally:
+ self.domain._depth -= 1
+ self._statements = _outer_case
+
+ @contextmanager
+ def Else(self):
+ self._check_context("Else", context=None)
+ if_data = self._get_ctrl("If")
+ if if_data is None:
+ raise SyntaxError("Else without preceding If/Elif")
+ try:
+ _outer_case, self._statements = self._statements, []
+ self.domain._depth += 1
+ yield
+ self._flush_ctrl()
+ if_data["bodies"].append(self._statements)
+ finally:
+ self.domain._depth -= 1
+ self._statements = _outer_case
+ self._pop_ctrl()
+
+ @contextmanager
+ def Switch(self, test):
+ self._check_context("Switch", context=None)
+ switch_data = self._set_ctrl("Switch", {"test": test, "cases": OrderedDict()})
+ try:
+ self._ctrl_context = "Switch"
+ self.domain._depth += 1
+ yield
+ finally:
+ self.domain._depth -= 1
+ self._ctrl_context = None
+ self._pop_ctrl()
+
+ @contextmanager
+ def Case(self, value=None):
+ self._check_context("Case", context="Switch")
+ switch_data = self._get_ctrl("Switch")
+ if value is None:
+ value = "-" * len(switch_data["test"])
+ if isinstance(value, str) and len(switch_data["test"]) != len(value):
+ raise SyntaxError("Case value '{}' must have the same width as test (which is {})"
+ .format(value, len(switch_data["test"])))
+ try:
+ _outer_case, self._statements = self._statements, []
+ self._ctrl_context = None
+ yield
+ self._flush_ctrl()
+ switch_data["cases"][value] = self._statements
+ finally:
+ self._ctrl_context = "Switch"
+ self._statements = _outer_case
+
+ def _pop_ctrl(self):
+ name, data = self._ctrl_stack.pop()
+
+ if name == "If":
+ if_tests, if_bodies = data["tests"], data["bodies"]
+
+ tests, cases = [], OrderedDict()
+ for if_test, if_case in zip(if_tests + [None], if_bodies):
+ if if_test is not None:
+ if_test = Value.wrap(if_test)
+ if len(if_test) != 1:
+ if_test = if_test.bool()
+ tests.append(if_test)
+
+ if if_test is not None:
+ match = ("1" + "-" * (len(tests) - 1)).rjust(len(if_tests), "-")
+ else:
+ match = "-" * len(tests)
+ cases[match] = if_case
+
+ self._statements.append(Switch(Cat(tests), cases))
+
+ if name == "Switch":
+ switch_test, switch_cases = data["test"], data["cases"]
+
+ self._statements.append(Switch(switch_test, switch_cases))
+
+ def _add_statement(self, assigns, domain, depth, compat_mode=False):
+ def domain_name(domain):
+ if domain is None:
+ return "comb"
+ else:
+ return domain
+
+ while len(self._ctrl_stack) > self.domain._depth:
+ self._pop_ctrl()
+
+ for assign in Statement.wrap(assigns):
+ if not compat_mode and not isinstance(assign, Assign):
+ raise SyntaxError(
+ "Only assignments may be appended to d.{}"
+ .format(domain_name(domain)))
+
+ for signal in assign._lhs_signals():
+ if signal not in self._driving:
+ self._driving[signal] = domain
+ elif self._driving[signal] != domain:
+ cd_curr = self._driving[signal]
+ raise SyntaxError(
+ "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
+ "already driven from d.{}"
+ .format(signal, domain_name(domain), domain_name(cd_curr)))
+
+ self._statements.append(assign)
+
+ def _add_submodule(self, submodule, name=None):
+ if not hasattr(submodule, "get_fragment"):
+ raise TypeError("Trying to add '{!r}', which does not implement .get_fragment(), as "
+ "a submodule".format(submodule))
+ self._submodules.append((submodule, name))
+
+ def _flush(self):
+ while self._ctrl_stack:
+ self._pop_ctrl()
+
+ def lower(self, platform):
+ self._flush()
+
+ fragment = Fragment()
+ for submodule, name in self._submodules:
+ fragment.add_subfragment(submodule.get_fragment(platform), name)
+ fragment.add_statements(self._statements)
+ for signal, domain in self._driving.items():
+ fragment.add_driver(signal, domain)
+ return fragment
+
+ get_fragment = lower
--- /dev/null
+import warnings
+from collections import defaultdict, OrderedDict
+
+from ..tools import *
+from .ast import *
+from .cd import *
+
+
+__all__ = ["Fragment", "DriverConflict"]
+
+
+class DriverConflict(UserWarning):
+ pass
+
+
+class Fragment:
+ def __init__(self):
+ self.ports = ValueDict()
+ self.drivers = OrderedDict()
+ self.statements = []
+ self.domains = OrderedDict()
+ self.subfragments = []
+
+ def add_ports(self, *ports, kind):
+ assert kind in ("i", "o", "io")
+ for port in flatten(ports):
+ self.ports[port] = kind
+
+ def iter_ports(self):
+ yield from self.ports.keys()
+
+ def add_driver(self, signal, domain=None):
+ if domain not in self.drivers:
+ self.drivers[domain] = ValueSet()
+ self.drivers[domain].add(signal)
+
+ def iter_drivers(self):
+ for domain, signals in self.drivers.items():
+ for signal in signals:
+ yield domain, signal
+
+ def iter_comb(self):
+ if None in self.drivers:
+ yield from self.drivers[None]
+
+ def iter_sync(self):
+ for domain, signals in self.drivers.items():
+ if domain is None:
+ continue
+ for signal in signals:
+ yield domain, signal
+
+ def iter_signals(self):
+ signals = ValueSet()
+ signals |= self.ports.keys()
+ for domain, domain_signals in self.drivers.items():
+ if domain is not None:
+ cd = self.domains[domain]
+ signals.add(cd.clk)
+ if cd.rst is not None:
+ signals.add(cd.rst)
+ signals |= domain_signals
+ return signals
+
+ def add_domains(self, *domains):
+ for domain in domains:
+ assert isinstance(domain, ClockDomain)
+ assert domain.name not in self.domains
+ self.domains[domain.name] = domain
+
+ def iter_domains(self):
+ yield from self.domains
+
+ def add_statements(self, *stmts):
+ self.statements += Statement.wrap(stmts)
+
+ def add_subfragment(self, subfragment, name=None):
+ assert isinstance(subfragment, Fragment)
+ self.subfragments.append((subfragment, name))
+
+ def _resolve_driver_conflicts(self, hierarchy=("top",), mode="warn"):
+ assert mode in ("silent", "warn", "error")
+
+ driver_subfrags = ValueDict()
+
+ # For each signal driven by this fragment and/or its subfragments, determine which
+ # subfragments also drive it.
+ for domain, signal in self.iter_drivers():
+ if signal not in driver_subfrags:
+ driver_subfrags[signal] = set()
+ driver_subfrags[signal].add((None, hierarchy))
+
+ for i, (subfrag, name) in enumerate(self.subfragments):
+ # First, recurse into subfragments and let them detect driver conflicts as well.
+ if name is None:
+ name = "<unnamed #{}>".format(i)
+ subfrag_hierarchy = hierarchy + (name,)
+ subfrag_drivers = subfrag._resolve_driver_conflicts(subfrag_hierarchy, mode)
+
+ # Second, classify subfragments by domains they define.
+ for signal in subfrag_drivers:
+ if signal not in driver_subfrags:
+ driver_subfrags[signal] = set()
+ driver_subfrags[signal].add((subfrag, subfrag_hierarchy))
+
+ # Find out the set of subfragments that needs to be flattened into this fragment
+ # to resolve driver-driver conflicts.
+ flatten_subfrags = set()
+ for signal, subfrags in driver_subfrags.items():
+ if len(subfrags) > 1:
+ flatten_subfrags.update((f, h) for f, h in subfrags if f is not None)
+
+ # While we're at it, show a message.
+ subfrag_names = ", ".join(sorted(".".join(h) for f, h in subfrags))
+ message = ("Signal '{}' is driven from multiple fragments: {}"
+ .format(signal, subfrag_names))
+ if mode == "error":
+ raise DriverConflict(message)
+ elif mode == "warn":
+ message += "; hierarchy will be flattened"
+ warnings.warn_explicit(message, DriverConflict, *signal.src_loc)
+
+ for subfrag, subfrag_hierarchy in sorted(flatten_subfrags, key=lambda x: x[1]):
+ # Merge subfragment's everything except clock domains into this fragment.
+ # Flattening is done after clock domain propagation, so we can assume the domains
+ # are already the same in every involved fragment in the first place.
+ self.ports.update(subfrag.ports)
+ for domain, signal in subfrag.iter_drivers():
+ self.add_driver(signal, domain)
+ self.statements += subfrag.statements
+ self.subfragments += subfrag.subfragments
+
+ # Remove the merged subfragment.
+ for i, (check_subfrag, check_name) in enumerate(self.subfragments): # :nobr:
+ if subfrag == check_subfrag:
+ del self.subfragments[i]
+ break
+
+ # If we flattened anything, we might be in a situation where we have a driver conflict
+ # again, e.g. if we had a tree of fragments like A --- B --- C where only fragments
+ # A and C were driving a signal S. In that case, since B is not driving S itself,
+ # processing B will not result in any flattening, but since B is transitively driving S,
+ # processing A will flatten B into it. Afterwards, we have a tree like AB --- C, which
+ # has another conflict.
+ if any(flatten_subfrags):
+ # Try flattening again.
+ return self._resolve_driver_conflicts(hierarchy, mode)
+
+ # Nothing was flattened, we're done!
+ return ValueSet(driver_subfrags.keys())
+
+ def _propagate_domains_up(self, hierarchy=("top",)):
+ from .xfrm import DomainRenamer
+
+ domain_subfrags = defaultdict(lambda: set())
+
+ # For each domain defined by a subfragment, determine which subfragments define it.
+ for i, (subfrag, name) in enumerate(self.subfragments):
+ # First, recurse into subfragments and let them propagate domains up as well.
+ hier_name = name
+ if hier_name is None:
+ hier_name = "<unnamed #{}>".format(i)
+ subfrag._propagate_domains_up(hierarchy + (hier_name,))
+
+ # Second, classify subfragments by domains they define.
+ for domain in subfrag.iter_domains():
+ domain_subfrags[domain].add((subfrag, name, i))
+
+ # For each domain defined by more than one subfragment, rename the domain in each
+ # of the subfragments such that they no longer conflict.
+ for domain, subfrags in domain_subfrags.items():
+ if len(subfrags) == 1:
+ continue
+
+ names = [n for f, n, i in subfrags]
+ if not all(names):
+ names = sorted("<unnamed #{}>".format(i) if n is None else "'{}'".format(n)
+ for f, n, i in subfrags)
+ raise DomainError("Domain '{}' is defined by subfragments {} of fragment '{}'; "
+ "it is necessary to either rename subfragment domains "
+ "explicitly, or give names to subfragments"
+ .format(domain, ", ".join(names), ".".join(hierarchy)))
+
+ if len(names) != len(set(names)):
+ names = sorted("#{}".format(i) for f, n, i in subfrags)
+ raise DomainError("Domain '{}' is defined by subfragments {} of fragment '{}', "
+ "some of which have identical names; it is necessary to either "
+ "rename subfragment domains explicitly, or give distinct names "
+ "to subfragments"
+ .format(domain, ", ".join(names), ".".join(hierarchy)))
+
+ for subfrag, name, i in subfrags:
+ self.subfragments[i] = \
+ (DomainRenamer({domain: "{}_{}".format(name, domain)})(subfrag), name)
+
+ # Finally, collect the (now unique) subfragment domains, and merge them into our domains.
+ for subfrag, name in self.subfragments:
+ for domain in subfrag.iter_domains():
+ self.add_domains(subfrag.domains[domain])
+
+ def _propagate_domains_down(self):
+ # For each domain defined in this fragment, ensure it also exists in all subfragments.
+ for subfrag, name in self.subfragments:
+ for domain in self.iter_domains():
+ if domain in subfrag.domains:
+ assert self.domains[domain] is subfrag.domains[domain]
+ else:
+ subfrag.add_domains(self.domains[domain])
+
+ subfrag._propagate_domains_down()
+
+ def _propagate_domains(self, ensure_sync_exists):
+ self._propagate_domains_up()
+ if ensure_sync_exists and not self.domains:
+ self.add_domains(ClockDomain("sync"))
+ self._propagate_domains_down()
+
+ def _insert_domain_resets(self):
+ from .xfrm import ResetInserter
+
+ resets = {cd.name: cd.rst for cd in self.domains.values() if cd.rst is not None}
+ return ResetInserter(resets)(self)
+
+ def _lower_domain_signals(self):
+ from .xfrm import DomainLowerer
+
+ return DomainLowerer(self.domains)(self)
+
+ def _propagate_ports(self, ports):
+ # Collect all signals we're driving (on LHS of statements), and signals we're using
+ # (on RHS of statements, or in clock domains).
+ self_driven = union(s._lhs_signals() for s in self.statements) or ValueSet()
+ self_used = union(s._rhs_signals() for s in self.statements) or ValueSet()
+ for domain, _ in self.iter_sync():
+ cd = self.domains[domain]
+ self_used.add(cd.clk)
+ if cd.rst is not None:
+ self_used.add(cd.rst)
+
+ # Our input ports are all the signals we're using but not driving. This is an over-
+ # approximation: some of these signals may be driven by our subfragments.
+ ins = self_used - self_driven
+ # Our output ports are all the signals we're asked to provide that we're driving. This is
+ # an underapproximation: some of these signals may be driven by subfragments.
+ outs = ports & self_driven
+
+ # Go through subfragments and refine our approximation for ports.
+ for subfrag, name in self.subfragments:
+ # Always ask subfragments to provide all signals we're using and signals we're asked
+ # to provide. If the subfragment is not driving it, it will silently ignore it.
+ sub_ins, sub_outs = subfrag._propagate_ports(ports=self_used | ports)
+ # Refine the input port approximation: if a subfragment is driving a signal,
+ # it is definitely not our input. But, if a subfragment requires a signal as an input,
+ # and we aren't driving it, it has to be our input as well.
+ ins -= sub_outs
+ ins |= sub_ins - self_driven
+ # Refine the output port approximation: if a subfragment is driving a signal,
+ # and we're asked to provide it, we can provide it now.
+ outs |= ports & sub_outs
+
+ # We've computed the precise set of input and output ports.
+ self.add_ports(ins, kind="i")
+ self.add_ports(outs, kind="o")
+
+ return ins, outs
+
+ def prepare(self, ports=(), ensure_sync_exists=True):
+ from .xfrm import FragmentTransformer
+
+ fragment = FragmentTransformer()(self)
+ fragment._propagate_domains(ensure_sync_exists)
+ fragment._resolve_driver_conflicts()
+ fragment = fragment._insert_domain_resets()
+ fragment = fragment._lower_domain_signals()
+ fragment._propagate_ports(ports)
+ return fragment
--- /dev/null
+from collections import OrderedDict
+from collections.abc import Iterable
+
+from ..tools import flatten
+from .ast import *
+from .ast import _StatementList
+from .cd import *
+from .ir import *
+
+
+__all__ = ["ValueTransformer", "StatementTransformer", "FragmentTransformer",
+ "DomainRenamer", "DomainLowerer", "ResetInserter", "CEInserter"]
+
+
+class ValueTransformer:
+ def on_Const(self, value):
+ return value
+
+ def on_Signal(self, value):
+ return value
+
+ def on_ClockSignal(self, value):
+ return value
+
+ def on_ResetSignal(self, value):
+ return value
+
+ def on_Operator(self, value):
+ return Operator(value.op, [self.on_value(o) for o in value.operands])
+
+ def on_Slice(self, value):
+ return Slice(self.on_value(value.value), value.start, value.end)
+
+ def on_Part(self, value):
+ return Part(self.on_value(value.value), self.on_value(value.offset), value.width)
+
+ def on_Cat(self, value):
+ return Cat(self.on_value(o) for o in value.operands)
+
+ def on_Repl(self, value):
+ return Repl(self.on_value(value.value), value.count)
+
+ def on_unknown_value(self, value):
+ raise TypeError("Cannot transform value '{!r}'".format(value)) # :nocov:
+
+ def on_value(self, value):
+ if isinstance(value, Const):
+ new_value = self.on_Const(value)
+ elif isinstance(value, Signal):
+ new_value = self.on_Signal(value)
+ elif isinstance(value, ClockSignal):
+ new_value = self.on_ClockSignal(value)
+ elif isinstance(value, ResetSignal):
+ new_value = self.on_ResetSignal(value)
+ elif isinstance(value, Operator):
+ new_value = self.on_Operator(value)
+ elif isinstance(value, Slice):
+ new_value = self.on_Slice(value)
+ elif isinstance(value, Part):
+ new_value = self.on_Part(value)
+ elif isinstance(value, Cat):
+ new_value = self.on_Cat(value)
+ elif isinstance(value, Repl):
+ new_value = self.on_Repl(value)
+ else:
+ new_value = self.on_unknown_value(value)
+ if isinstance(new_value, Value):
+ new_value.src_loc = value.src_loc
+ return new_value
+
+ def __call__(self, value):
+ return self.on_value(value)
+
+
+class StatementTransformer:
+ def on_value(self, value):
+ return value
+
+ def on_Assign(self, stmt):
+ return Assign(self.on_value(stmt.lhs), self.on_value(stmt.rhs))
+
+ def on_Switch(self, stmt):
+ cases = OrderedDict((k, self.on_statement(v)) for k, v in stmt.cases.items())
+ return Switch(self.on_value(stmt.test), cases)
+
+ def on_statements(self, stmt):
+ return _StatementList(flatten(self.on_statement(stmt) for stmt in stmt))
+
+ def on_unknown_statement(self, stmt):
+ raise TypeError("Cannot transform statement '{!r}'".format(stmt)) # :nocov:
+
+ def on_statement(self, stmt):
+ if isinstance(stmt, Assign):
+ return self.on_Assign(stmt)
+ elif isinstance(stmt, Switch):
+ return self.on_Switch(stmt)
+ elif isinstance(stmt, Iterable):
+ return self.on_statements(stmt)
+ else:
+ return self.on_unknown_statement(stmt)
+
+ def __call__(self, value):
+ return self.on_statement(value)
+
+
+class FragmentTransformer:
+ def map_subfragments(self, fragment, new_fragment):
+ for subfragment, name in fragment.subfragments:
+ new_fragment.add_subfragment(self(subfragment), name)
+
+ def map_domains(self, fragment, new_fragment):
+ for domain in fragment.iter_domains():
+ new_fragment.add_domains(fragment.domains[domain])
+
+ def map_statements(self, fragment, new_fragment):
+ if hasattr(self, "on_statement"):
+ new_fragment.add_statements(map(self.on_statement, fragment.statements))
+ else:
+ new_fragment.add_statements(fragment.statements)
+
+ def map_drivers(self, fragment, new_fragment):
+ for domain, signal in fragment.iter_drivers():
+ new_fragment.add_driver(signal, domain)
+
+ def on_fragment(self, fragment):
+ new_fragment = Fragment()
+ self.map_subfragments(fragment, new_fragment)
+ self.map_domains(fragment, new_fragment)
+ self.map_statements(fragment, new_fragment)
+ self.map_drivers(fragment, new_fragment)
+ return new_fragment
+
+ def __call__(self, value):
+ return self.on_fragment(value)
+
+
+class DomainRenamer(FragmentTransformer, ValueTransformer, StatementTransformer):
+ def __init__(self, domain_map):
+ if isinstance(domain_map, str):
+ domain_map = {"sync": domain_map}
+ self.domain_map = OrderedDict(domain_map)
+
+ def on_ClockSignal(self, value):
+ if value.domain in self.domain_map:
+ return ClockSignal(self.domain_map[value.domain])
+ return value
+
+ def on_ResetSignal(self, value):
+ if value.domain in self.domain_map:
+ return ResetSignal(self.domain_map[value.domain])
+ return value
+
+ def map_domains(self, fragment, new_fragment):
+ for domain in fragment.iter_domains():
+ cd = fragment.domains[domain]
+ if domain in self.domain_map:
+ if cd.name == domain:
+ # Rename the actual ClockDomain object.
+ cd.rename(self.domain_map[domain])
+ else:
+ assert cd.name == self.domain_map[domain]
+ new_fragment.add_domains(cd)
+
+ def map_drivers(self, fragment, new_fragment):
+ for domain, signals in fragment.drivers.items():
+ if domain in self.domain_map:
+ domain = self.domain_map[domain]
+ for signal in signals:
+ new_fragment.add_driver(signal, domain)
+
+
+class DomainLowerer(FragmentTransformer, ValueTransformer, StatementTransformer):
+ def __init__(self, domains):
+ self.domains = domains
+
+ def _resolve(self, domain, context):
+ if domain not in self.domains:
+ raise DomainError("Signal {!r} refers to nonexistent domain '{}'"
+ .format(context, domain))
+ return self.domains[domain]
+
+ def on_ClockSignal(self, value):
+ cd = self._resolve(value.domain, value)
+ return cd.clk
+
+ def on_ResetSignal(self, value):
+ cd = self._resolve(value.domain, value)
+ if cd.rst is None:
+ if value.allow_reset_less:
+ return Const(0)
+ else:
+ raise DomainError("Signal {!r} refers to reset of reset-less domain '{}'"
+ .format(value, value.domain))
+ return cd.rst
+
+
+class _ControlInserter(FragmentTransformer):
+ def __init__(self, controls):
+ if isinstance(controls, Value):
+ controls = {"sync": controls}
+ self.controls = OrderedDict(controls)
+
+ def on_fragment(self, fragment):
+ new_fragment = super().on_fragment(fragment)
+ for domain, signals in fragment.drivers.items():
+ if domain is None or domain not in self.controls:
+ continue
+ self._insert_control(new_fragment, domain, signals)
+ return new_fragment
+
+ def _insert_control(self, fragment, domain, signals):
+ raise NotImplementedError # :nocov:
+
+
+class ResetInserter(_ControlInserter):
+ def _insert_control(self, fragment, domain, signals):
+ stmts = [s.eq(Const(s.reset, s.nbits)) for s in signals if not s.reset_less]
+ fragment.add_statements(Switch(self.controls[domain], {1: stmts}))
+
+
+class CEInserter(_ControlInserter):
+ def _insert_control(self, fragment, domain, signals):
+ stmts = [s.eq(s) for s in signals]
+ fragment.add_statements(Switch(self.controls[domain], {0: stmts}))
--- /dev/null
+from .. import *
+
+
+__all__ = ["MultiReg"]
+
+
+class MultiReg:
+ def __init__(self, i, o, odomain="sync", n=2, reset=0):
+ self.i = i
+ self.o = o
+ self.odomain = odomain
+
+ self._regs = [Signal(self.i.shape(), name="cdc{}".format(i),
+ reset=reset, reset_less=True, attrs={"no_retiming": True})
+ for i in range(n)]
+
+ def get_fragment(self, platform):
+ if hasattr(platform, "get_multi_reg"):
+ return platform.get_multi_reg(self)
+
+ m = Module()
+ for i, o in zip((self.i, *self._regs), self._regs):
+ m.d[self.odomain] += o.eq(i)
+ m.d.comb += self.o.eq(self._regs[-1])
+ return m.lower(platform)
--- /dev/null
+from .. import *
+
+
+__all__ = ["TSTriple"]
+
+
+class TSTriple:
+ def __init__(self, shape=None, min=None, max=None, reset_o=0, reset_oe=0, reset_i=0,
+ name=None):
+ self.o = Signal(shape, min=min, max=max, reset=reset_o,
+ name=None if name is None else name + "_o")
+ self.oe = Signal(reset=reset_oe,
+ name=None if name is None else name + "_oe")
+ self.i = Signal(shape, min=min, max=max, reset=reset_i,
+ name=None if name is None else name + "_i")
+
+ def __len__(self):
+ return len(self.o)
+
+ def get_fragment(self, platform):
+ return Fragment()
-from ..fhdl.ast import *
+from ..hdl.ast import *
from .tools import *
-from ..fhdl.cd import *
+from ..hdl.cd import *
from .tools import *
-from ..fhdl.ast import *
-from ..fhdl.dsl import *
+from ..hdl.ast import *
+from ..hdl.dsl import *
from .tools import *
-from ..fhdl.ast import *
-from ..fhdl.cd import *
-from ..fhdl.ir import *
+from ..hdl.ast import *
+from ..hdl.cd import *
+from ..hdl.ir import *
from .tools import *
-from ..fhdl.ast import *
-from ..fhdl.cd import *
-from ..fhdl.ir import *
-from ..fhdl.xfrm import *
+from ..hdl.ast import *
+from ..hdl.cd import *
+from ..hdl.ir import *
+from ..hdl.xfrm import *
from .tools import *
from .tools import *
-from ..fhdl.ast import *
-from ..fhdl.ir import *
+from ..hdl.ast import *
+from ..hdl.ir import *
from ..back.pysim import *
import warnings
from contextlib import contextmanager
-from ..fhdl.ast import *
+from ..hdl.ast import *
__all__ = ["FHDLTestCase"]