+++ /dev/null
-Unless otherwise noted, Migen is copyright (C) 2011-2013 Sebastien Bourdeauducq.
-The simulation extension (as mentioned in the comments at the beginning of the
-corresponding source files) is copyright (C) 2012 Vermeer Manufacturing Co. All
-rights reserved.
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
-2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
-ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
-ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
-ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
-Other authors retain ownership of their contributions. If a submission can
-reasonably be considered independently copyrightable, it's yours and we
-encourage you to claim it with appropriate copyright notices. This submission
-then falls under the "otherwise noted" category. All submissions are strongly
-encouraged to use the two-clause BSD license reproduced above.
+++ /dev/null
-from litex.gen.fhdl import structure as f
-
-
-__all__ = ["log2_int", "bits_for", "value_bits_sign"]
-
-
-def log2_int(n, need_pow2=True):
- if n == 0:
- return 0
- r = (n - 1).bit_length()
- if need_pow2 and (1 << r) != n:
- raise ValueError("Not a power of 2")
- return r
-
-
-def bits_for(n, require_sign_bit=False):
- if n > 0:
- r = log2_int(n + 1, False)
- else:
- require_sign_bit = True
- r = log2_int(-n, False)
- if require_sign_bit:
- r += 1
- return r
-
-
-def _bitwise_binary_bits_sign(a, b):
- if not a[1] and not b[1]:
- # both operands unsigned
- return max(a[0], b[0]), False
- elif a[1] and b[1]:
- # both operands signed
- return max(a[0], b[0]), True
- elif not a[1] and b[1]:
- # first operand unsigned (add sign bit), second operand signed
- return max(a[0] + 1, b[0]), True
- else:
- # first signed, second operand unsigned (add sign bit)
- return max(a[0], b[0] + 1), True
-
-
-def value_bits_sign(v):
- """Bit length and signedness of a value.
-
- Parameters
- ----------
- v : Value
-
- Returns
- -------
- int, bool
- Number of bits required to store `v` or available in `v`, followed by
- whether `v` has a sign bit (included in the bit count).
-
- Examples
- --------
- >>> value_bits_sign(f.Signal(8))
- 8, False
- >>> value_bits_sign(C(0xaa))
- 8, False
- """
- if isinstance(v, (f.Constant, f.Signal)):
- return v.nbits, v.signed
- elif isinstance(v, (f.ClockSignal, f.ResetSignal)):
- return 1, False
- elif isinstance(v, f._Operator):
- obs = list(map(value_bits_sign, v.operands))
- if v.op == "+" or v.op == "-":
- if len(obs) == 1:
- if v.op == "-" and not obs[0][1]:
- return obs[0][0] + 1, True
- else:
- return obs[0]
- n, s = _bitwise_binary_bits_sign(*obs)
- return n + 1, s
- elif v.op == "*":
- if not obs[0][1] and not obs[1][1]:
- # both operands unsigned
- return obs[0][0] + obs[1][0], False
- elif obs[0][1] and obs[1][1]:
- # both operands signed
- return obs[0][0] + obs[1][0] - 1, True
- else:
- # one operand signed, the other unsigned (add sign bit)
- return obs[0][0] + obs[1][0] + 1 - 1, True
- elif v.op == "<<<":
- if obs[1][1]:
- extra = 2**(obs[1][0] - 1) - 1
- else:
- extra = 2**obs[1][0] - 1
- return obs[0][0] + extra, obs[0][1]
- elif v.op == ">>>":
- if obs[1][1]:
- extra = 2**(obs[1][0] - 1)
- else:
- extra = 0
- return obs[0][0] + extra, obs[0][1]
- elif v.op == "&" or v.op == "^" or v.op == "|":
- return _bitwise_binary_bits_sign(*obs)
- elif (v.op == "<" or v.op == "<=" or v.op == "==" or v.op == "!=" or
- v.op == ">" or v.op == ">="):
- return 1, False
- elif v.op == "~":
- return obs[0]
- elif v.op == "m":
- return _bitwise_binary_bits_sign(obs[1], obs[2])
- else:
- raise TypeError
- elif isinstance(v, f._Slice):
- return v.stop - v.start, value_bits_sign(v.value)[1]
- elif isinstance(v, f.Cat):
- return sum(value_bits_sign(sv)[0] for sv in v.l), False
- elif isinstance(v, f.Replicate):
- return (value_bits_sign(v.v)[0])*v.n, False
- elif isinstance(v, f._ArrayProxy):
- bsc = list(map(value_bits_sign, v.choices))
- return max(bs[0] for bs in bsc), any(bs[1] for bs in bsc)
- else:
- raise TypeError("Can not calculate bit length of {} {}".format(
- type(v), v))
+++ /dev/null
-from operator import itemgetter
-
-
-class ConvOutput:
- def __init__(self):
- self.main_source = ""
- self.data_files = dict()
-
- def set_main_source(self, src):
- self.main_source = src
-
- def add_data_file(self, filename_base, content):
- filename = filename_base
- i = 1
- while filename in self.data_files:
- parts = filename_base.split(".", maxsplit=1)
- parts[0] += "_" + str(i)
- filename = ".".join(parts)
- i += 1
- self.data_files[filename] = content
- return filename
-
- def __str__(self):
- r = self.main_source + "\n"
- for filename, content in sorted(self.data_files.items(),
- key=itemgetter(0)):
- r += filename + ":\n" + content
- return r
-
- def write(self, main_filename):
- with open(main_filename, "w") as f:
- f.write(self.main_source)
- for filename, content in self.data_files.items():
- with open(filename, "w") as f:
- f.write(content)
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-from litex.gen.fhdl.tools import insert_reset, rename_clock_domain
-
-
-__all__ = ["CEInserter", "ResetInserter", "ClockDomainsRenamer",
- "ModuleTransformer"]
-
-
-class ModuleTransformer:
- # overload this in derived classes
- def transform_instance(self, i):
- pass
-
- # overload this in derived classes
- def transform_fragment(self, i, f):
- pass
-
- def wrap_class(self, victim):
- class Wrapped(victim):
- def __init__(i, *args, **kwargs):
- victim.__init__(i, *args, **kwargs)
- self.transform_instance(i)
-
- def get_fragment(i):
- f = victim.get_fragment(i)
- self.transform_fragment(i, f)
- return f
-
- Wrapped.__name__ = victim.__name__
- Wrapped.__doc__ = victim.__doc__
- Wrapped.__module__ = victim.__module__
- return Wrapped
-
- def wrap_instance(self, victim):
- self.transform_instance(victim)
- orig_get_fragment = victim.get_fragment
-
- def get_fragment():
- f = orig_get_fragment()
- self.transform_fragment(victim, f)
- return f
-
- victim.get_fragment = get_fragment
- return victim
-
- def __call__(self, victim):
- if isinstance(victim, Module):
- return self.wrap_instance(victim)
- else:
- return self.wrap_class(victim)
-
-
-class ControlInserter(ModuleTransformer):
- control_name = None # override this
-
- def __init__(self, clock_domains=None):
- self.clock_domains = clock_domains
-
- def transform_instance(self, i):
- if self.clock_domains is None:
- ctl = Signal(name=self.control_name)
- assert not hasattr(i, self.control_name)
- setattr(i, self.control_name, ctl)
- else:
- for cd in self.clock_domains:
- name = self.control_name + "_" + cd
- ctl = Signal(name=name)
- assert not hasattr(i, name)
- setattr(i, name, ctl)
-
- def transform_fragment(self, i, f):
- if self.clock_domains is None:
- if not f.sync:
- return
- if len(f.sync) > 1:
- raise ValueError("Control signal clock domains must be specified when module has more than one domain")
- cdn = list(f.sync.keys())[0]
- to_insert = [(getattr(i, self.control_name), cdn)]
- else:
- to_insert = [(getattr(i, self.control_name + "_" + cdn), cdn)
- for cdn in self.clock_domains]
- self.transform_fragment_insert(i, f, to_insert)
-
-
-class CEInserter(ControlInserter):
- control_name = "ce"
-
- def transform_fragment_insert(self, i, f, to_insert):
- for ce, cdn in to_insert:
- f.sync[cdn] = [If(ce, *f.sync[cdn])]
-
-
-class ResetInserter(ControlInserter):
- control_name = "reset"
-
- def transform_fragment_insert(self, i, f, to_insert):
- for reset, cdn in to_insert:
- f.sync[cdn] = insert_reset(reset, f.sync[cdn])
-
-
-class ClockDomainsRenamer(ModuleTransformer):
- def __init__(self, cd_remapping):
- if isinstance(cd_remapping, str):
- cd_remapping = {"sys": cd_remapping}
- self.cd_remapping = cd_remapping
-
- def transform_fragment(self, i, f):
- for old, new in self.cd_remapping.items():
- rename_clock_domain(f, old, new)
+++ /dev/null
-import collections
-from itertools import combinations
-
-from litex.gen.util.misc import flat_iteration
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.structure import _Fragment
-from litex.gen.fhdl.tools import rename_clock_domain
-
-
-__all__ = ["Module", "FinalizeError"]
-
-
-class FinalizeError(Exception):
- pass
-
-
-def _flat_list(e):
- if isinstance(e, collections.Iterable):
- return flat_iteration(e)
- else:
- return [e]
-
-
-class _ModuleProxy:
- def __init__(self, fm):
- object.__setattr__(self, "_fm", fm)
-
-
-class _ModuleComb(_ModuleProxy):
- def __iadd__(self, other):
- self._fm._fragment.comb += _flat_list(other)
- return self
-
-
-def _cd_append(d, key, statements):
- try:
- l = d[key]
- except KeyError:
- l = []
- d[key] = l
- l += _flat_list(statements)
-
-
-class _ModuleSyncCD:
- def __init__(self, fm, cd):
- self._fm = fm
- self._cd = cd
-
- def __iadd__(self, other):
- _cd_append(self._fm._fragment.sync, self._cd, other)
- return self
-
-
-class _ModuleSync(_ModuleProxy):
- def __iadd__(self, other):
- _cd_append(self._fm._fragment.sync, "sys", other)
- return self
-
- def __getattr__(self, name):
- return _ModuleSyncCD(self._fm, name)
-
- def __setattr__(self, name, value):
- if not isinstance(value, _ModuleSyncCD):
- raise AttributeError("Attempted to assign sync property - use += instead")
-
-
-# _ModuleForwardAttr enables user classes to do e.g.:
-# self.subm.foobar = SomeModule()
-# and then access the submodule with self.foobar.
-class _ModuleForwardAttr:
- def __setattr__(self, name, value):
- self.__iadd__(value)
- setattr(self._fm, name, value)
-
-
-class _ModuleSpecials(_ModuleProxy, _ModuleForwardAttr):
- def __iadd__(self, other):
- self._fm._fragment.specials |= set(_flat_list(other))
- return self
-
-
-class _ModuleSubmodules(_ModuleProxy):
- def __setattr__(self, name, value):
- self._fm._submodules += [(name, e) for e in _flat_list(value)]
- setattr(self._fm, name, value)
-
- def __iadd__(self, other):
- self._fm._submodules += [(None, e) for e in _flat_list(other)]
- return self
-
-
-class _ModuleClockDomains(_ModuleProxy, _ModuleForwardAttr):
- def __iadd__(self, other):
- self._fm._fragment.clock_domains += _flat_list(other)
- return self
-
-
-class Module:
- def get_fragment(self):
- assert(not self.get_fragment_called)
- self.get_fragment_called = True
- self.finalize()
- return self._fragment
-
- def __getattr__(self, name):
- if name == "comb":
- return _ModuleComb(self)
- elif name == "sync":
- return _ModuleSync(self)
- elif name == "specials":
- return _ModuleSpecials(self)
- elif name == "submodules":
- return _ModuleSubmodules(self)
- elif name == "clock_domains":
- return _ModuleClockDomains(self)
-
- # hack to have initialized regular attributes without using __init__
- # (which would require derived classes to call it)
- elif name == "finalized":
- self.finalized = False
- return self.finalized
- elif name == "_fragment":
- self._fragment = _Fragment()
- return self._fragment
- elif name == "_submodules":
- self._submodules = []
- return self._submodules
- elif name == "_clock_domains":
- self._clock_domains = []
- return self._clock_domains
- elif name == "get_fragment_called":
- self.get_fragment_called = False
- return self.get_fragment_called
-
- else:
- raise AttributeError("'"+self.__class__.__name__+"' object has no attribute '"+name+"'")
-
- def __setattr__(self, name, value):
- if name in ["comb", "sync", "specials", "submodules", "clock_domains"]:
- if not isinstance(value, _ModuleProxy):
- raise AttributeError("Attempted to assign special Module property - use += instead")
- else:
- object.__setattr__(self, name, value)
-
- def _collect_submodules(self):
- r = []
- for name, submodule in self._submodules:
- if not submodule.get_fragment_called:
- r.append((name, submodule.get_fragment()))
- return r
-
- def finalize(self, *args, **kwargs):
- if not self.finalized:
- self.finalized = True
- # finalize existing submodules before finalizing us
- subfragments = self._collect_submodules()
- self.do_finalize(*args, **kwargs)
- # finalize submodules created by do_finalize
- subfragments += self._collect_submodules()
- # resolve clock domain name conflicts
- needs_renaming = set()
- for (mod_name1, f1), (mod_name2, f2) in combinations(subfragments, 2):
- f1_names = set(cd.name for cd in f1.clock_domains)
- f2_names = set(cd.name for cd in f2.clock_domains)
- common_names = f1_names & f2_names
- if common_names:
- if mod_name1 is None or mod_name2 is None:
- raise ValueError("Multiple submodules with local clock domains cannot be anonymous")
- if mod_name1 == mod_name2:
- raise ValueError("Multiple submodules with local clock domains cannot have the same name")
- needs_renaming |= common_names
- for mod_name, f in subfragments:
- for cd in f.clock_domains:
- if cd.name in needs_renaming:
- rename_clock_domain(f, cd.name, mod_name + "_" + cd.name)
- # sum subfragments
- for mod_name, f in subfragments:
- self._fragment += f
-
- def do_finalize(self):
- pass
-
- def do_exit(self, *args, **kwargs):
- for name, submodule in self._submodules:
- submodule.do_exit(*args, **kwargs)
+++ /dev/null
-from collections import OrderedDict
-from itertools import combinations
-
-from litex.gen.fhdl.structure import *
-
-
-class _Node:
- def __init__(self):
- self.signal_count = 0
- self.numbers = set()
- self.use_name = False
- self.use_number = False
- self.children = OrderedDict()
-
-
-def _display_tree(filename, tree):
- from litex.gen.util.treeviz import RenderNode
-
- def _to_render_node(name, node):
- children = [_to_render_node(k, v) for k, v in node.children.items()]
- if node.use_name:
- if node.use_number:
- color = (0.5, 0.9, 0.8)
- else:
- color = (0.8, 0.5, 0.9)
- else:
- if node.use_number:
- color = (0.9, 0.8, 0.5)
- else:
- color = (0.8, 0.8, 0.8)
- label = "{0}\n{1} signals\n{2}".format(name, node.signal_count, node.numbers)
- return RenderNode(label, children, color=color)
-
- top = _to_render_node("top", tree)
- top.to_svg(filename)
-
-
-def _build_tree(signals, basic_tree=None):
- root = _Node()
- for signal in signals:
- current_b = basic_tree
- current = root
- current.signal_count += 1
- for name, number in signal.backtrace:
- if basic_tree is None:
- use_number = False
- else:
- current_b = current_b.children[name]
- use_number = current_b.use_number
- if use_number:
- key = (name, number)
- else:
- key = name
- try:
- current = current.children[key]
- except KeyError:
- new = _Node()
- current.children[key] = new
- current = new
- current.numbers.add(number)
- if use_number:
- current.all_numbers = sorted(current_b.numbers)
- current.signal_count += 1
- return root
-
-
-def _set_use_name(node, node_name=""):
- cnames = [(k, _set_use_name(v, k)) for k, v in node.children.items()]
- for (c1_prefix, c1_names), (c2_prefix, c2_names) in combinations(cnames, 2):
- if not c1_names.isdisjoint(c2_names):
- node.children[c1_prefix].use_name = True
- node.children[c2_prefix].use_name = True
- r = set()
- for c_prefix, c_names in cnames:
- if node.children[c_prefix].use_name:
- for c_name in c_names:
- r.add((c_prefix, ) + c_name)
- else:
- r |= c_names
-
- if node.signal_count > sum(c.signal_count for c in node.children.values()):
- node.use_name = True
- r.add((node_name, ))
-
- return r
-
-
-def _name_signal(tree, signal):
- elements = []
- treepos = tree
- for step_name, step_n in signal.backtrace:
- try:
- treepos = treepos.children[(step_name, step_n)]
- use_number = True
- except KeyError:
- treepos = treepos.children[step_name]
- use_number = False
- if treepos.use_name:
- elname = step_name
- if use_number:
- elname += str(treepos.all_numbers.index(step_n))
- elements.append(elname)
- return "_".join(elements)
-
-
-def _build_pnd_from_tree(tree, signals):
- return dict((signal, _name_signal(tree, signal)) for signal in signals)
-
-
-def _invert_pnd(pnd):
- inv_pnd = dict()
- for k, v in pnd.items():
- inv_pnd[v] = inv_pnd.get(v, [])
- inv_pnd[v].append(k)
- return inv_pnd
-
-
-def _list_conflicting_signals(pnd):
- inv_pnd = _invert_pnd(pnd)
- r = set()
- for k, v in inv_pnd.items():
- if len(v) > 1:
- r.update(v)
- return r
-
-
-def _set_use_number(tree, signals):
- for signal in signals:
- current = tree
- for step_name, step_n in signal.backtrace:
- current = current.children[step_name]
- current.use_number = current.signal_count > len(current.numbers) and len(current.numbers) > 1
-
-_debug = False
-
-
-def _build_pnd_for_group(group_n, signals):
- basic_tree = _build_tree(signals)
- _set_use_name(basic_tree)
- if _debug:
- _display_tree("tree{0}_basic.svg".format(group_n), basic_tree)
- pnd = _build_pnd_from_tree(basic_tree, signals)
-
- # If there are conflicts, try splitting the tree by numbers
- # on paths taken by conflicting signals.
- conflicting_signals = _list_conflicting_signals(pnd)
- if conflicting_signals:
- _set_use_number(basic_tree, conflicting_signals)
- if _debug:
- print("namer: using split-by-number strategy (group {0})".format(group_n))
- _display_tree("tree{0}_marked.svg".format(group_n), basic_tree)
- numbered_tree = _build_tree(signals, basic_tree)
- _set_use_name(numbered_tree)
- if _debug:
- _display_tree("tree{0}_numbered.svg".format(group_n), numbered_tree)
- pnd = _build_pnd_from_tree(numbered_tree, signals)
- else:
- if _debug:
- print("namer: using basic strategy (group {0})".format(group_n))
-
- # ...then add number suffixes by DUID
- inv_pnd = _invert_pnd(pnd)
- duid_suffixed = False
- for name, signals in inv_pnd.items():
- if len(signals) > 1:
- duid_suffixed = True
- for n, signal in enumerate(sorted(signals, key=lambda x: x.duid)):
- pnd[signal] += str(n)
- if _debug and duid_suffixed:
- print("namer: using DUID suffixes (group {0})".format(group_n))
-
- return pnd
-
-
-def _build_signal_groups(signals):
- r = []
- for signal in signals:
- # build chain of related signals
- related_list = []
- cur_signal = signal
- while cur_signal is not None:
- related_list.insert(0, cur_signal)
- cur_signal = cur_signal.related
- # add to groups
- for _ in range(len(related_list) - len(r)):
- r.append(set())
- for target_set, source_signal in zip(r, related_list):
- target_set.add(source_signal)
- # with the algorithm above and a list of all signals,
- # a signal appears in all groups of a lower number than its.
- # make signals appear only in their group of highest number.
- for s1, s2 in zip(r, r[1:]):
- s1 -= s2
- return r
-
-
-def _build_pnd(signals):
- groups = _build_signal_groups(signals)
- gpnds = [_build_pnd_for_group(n, gsignals) for n, gsignals in enumerate(groups)]
-
- pnd = dict()
- for gn, gpnd in enumerate(gpnds):
- for signal, name in gpnd.items():
- result = name
- cur_gn = gn
- cur_signal = signal
- while cur_signal.related is not None:
- cur_signal = cur_signal.related
- cur_gn -= 1
- result = gpnds[cur_gn][cur_signal] + "_" + result
- pnd[signal] = result
-
- return pnd
-
-
-def build_namespace(signals, reserved_keywords=set()):
- pnd = _build_pnd(signals)
- ns = Namespace(pnd, reserved_keywords)
- # register signals with name_override
- swno = {signal for signal in signals if signal.name_override is not None}
- for signal in sorted(swno, key=lambda x: x.duid):
- ns.get_name(signal)
- return ns
-
-
-class Namespace:
- def __init__(self, pnd, reserved_keywords=set()):
- self.counts = {k: 1 for k in reserved_keywords}
- self.sigs = {}
- self.pnd = pnd
- self.clock_domains = dict()
-
- def get_name(self, sig):
- if isinstance(sig, ClockSignal):
- sig = self.clock_domains[sig.cd].clk
- if isinstance(sig, ResetSignal):
- sig = self.clock_domains[sig.cd].rst
- if sig is None:
- raise ValueError("Attempted to obtain name of non-existent "
- "reset signal of domain "+sig.cd)
-
- if sig.name_override is not None:
- sig_name = sig.name_override
- else:
- sig_name = self.pnd[sig]
- try:
- n = self.sigs[sig]
- except KeyError:
- try:
- n = self.counts[sig_name]
- except KeyError:
- n = 0
- self.sigs[sig] = n
- self.counts[sig_name] = n + 1
- if n:
- return sig_name + "_" + str(n)
- else:
- return sig_name
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.specials import Memory, _MemoryPort, WRITE_FIRST, NO_CHANGE
-from litex.gen.fhdl.decorators import ModuleTransformer
-from litex.gen.util.misc import gcd_multiple
-from litex.gen.fhdl.bitcontainer import log2_int
-
-
-class FullMemoryWE(ModuleTransformer):
- def __init__(self):
- self.replacements = dict()
-
- def transform_fragment(self, i, f):
- newspecials = set()
-
- for orig in f.specials:
- if not isinstance(orig, Memory):
- newspecials.add(orig)
- continue
- global_granularity = gcd_multiple([p.we_granularity if p.we_granularity else orig.width for p in orig.ports])
- if global_granularity == orig.width:
- newspecials.add(orig) # nothing to do
- else:
- newmems = []
- for i in range(orig.width//global_granularity):
- if orig.init is None:
- newinit = None
- else:
- newinit = [(v >> i*global_granularity) & (2**global_granularity - 1) for v in orig.init]
- newmem = Memory(global_granularity, orig.depth, newinit, orig.name_override + "_grain" + str(i))
- newspecials.add(newmem)
- newmems.append(newmem)
- for port in orig.ports:
- port_granularity = port.we_granularity if port.we_granularity else orig.width
- newport = _MemoryPort(
- adr=port.adr,
-
- dat_r=port.dat_r[i*global_granularity:(i+1)*global_granularity] if port.dat_r is not None else None,
- we=port.we[i*global_granularity//port_granularity] if port.we is not None else None,
- dat_w=port.dat_w[i*global_granularity:(i+1)*global_granularity] if port.dat_w is not None else None,
-
- async_read=port.async_read,
- re=port.re,
- we_granularity=0,
- mode=port.mode,
- clock_domain=port.clock.cd)
- newmem.ports.append(newport)
- newspecials.add(newport)
- self.replacements[orig] = newmems
-
- f.specials = newspecials
- for oldmem in self.replacements.keys():
- f.specials -= set(oldmem.ports)
-
-
-class MemoryToArray(ModuleTransformer):
- def __init__(self):
- self.replacements = dict()
-
- def transform_fragment(self, i, f):
- newspecials = set()
- processed_ports = set()
-
- for mem in f.specials:
- if not isinstance(mem, Memory):
- newspecials.add(mem)
- continue
-
- storage = Array()
- self.replacements[mem] = storage
- init = []
- if mem.init is not None:
- init = mem.init
- for d in init:
- mem_storage = Signal(mem.width, reset=d)
- storage.append(mem_storage)
- for _ in range(mem.depth-len(init)):
- mem_storage = Signal(mem.width)
- storage.append(mem_storage)
-
- for port in mem.ports:
- try:
- sync = f.sync[port.clock.cd]
- except KeyError:
- sync = f.sync[port.clock.cd] = []
-
- # read
- if port.async_read:
- f.comb.append(port.dat_r.eq(storage[port.adr]))
- else:
- if port.mode == WRITE_FIRST and port.we is not None:
- adr_reg = Signal.like(port.adr)
- rd_stmt = adr_reg.eq(port.adr)
- f.comb.append(port.dat_r.eq(storage[adr_reg]))
- elif port.mode == NO_CHANGE and port.we is not None:
- rd_stmt = If(~port.we, port.dat_r.eq(storage[port.adr]))
- else: # READ_FIRST or port.we is None, simplest case
- rd_stmt = port.dat_r.eq(storage[port.adr])
- if port.re is None:
- sync.append(rd_stmt)
- else:
- sync.append(If(port.re, rd_stmt))
-
- # write
- if port.we is not None:
- if port.we_granularity:
- n = mem.width//port.we_granularity
- for i in range(n):
- m = i*port.we_granularity
- M = (i+1)*port.we_granularity
- sync.append(If(port.we[i],
- storage[port.adr][m:M].eq(port.dat_w[m:M])))
- else:
- sync.append(If(port.we,
- storage[port.adr].eq(port.dat_w)))
-
- processed_ports.add(port)
-
- newspecials -= processed_ports
- f.specials = newspecials
-
-
-class SplitMemory(ModuleTransformer):
- """Split memories with depths that are not powers of two into smaller
- power-of-two memories.
-
- This prevents toolchains from rounding up and wasting resources."""
-
- def transform_fragment(self, i, f):
- old_specials, f.specials = f.specials, set()
- old_ports = set()
-
- for old in old_specials:
- if not isinstance(old, Memory):
- f.specials.add(old)
- continue
- try:
- log2_int(old.depth, need_pow2=True)
- f.specials.add(old)
- except ValueError:
- new, comb, sync = self._split_mem(old)
- old_ports |= set(old.ports)
- f.specials.update(new)
- f.comb += comb
- for cd, sy in sync.items():
- s = f.sync.setdefault(cd, [])
- s += sy
- f.specials -= old_ports
-
- def _split_mem(self, mem):
- depths = [1 << i for i in range(log2_int(mem.depth, need_pow2=False))
- if mem.depth & (1 << i)]
- depths.reverse()
- inits = None
- if mem.init is not None:
- inits = list(mem.init)
- mems = []
- for i, depth in enumerate(depths):
- init = None
- if inits is not None:
- init = inits[:depth]
- del inits[:depth]
- name = "{}_part{}".format(mem.name_override, i)
- mems.append(Memory(width=mem.width, depth=depth,
- init=init, name=name))
- ports = []
- comb = []
- sync = {}
- for port in mem.ports:
- p, c, s = self._split_port(port, mems)
- ports += p
- comb += c
- sy = sync.setdefault(port.clock.cd, [])
- sy += s
- return mems + ports, comb, sync
-
- def _split_port(self, port, mems):
- ports = [mem.get_port(write_capable=port.we is not None,
- async_read=port.async_read,
- has_re=port.re is not None,
- we_granularity=port.we_granularity,
- mode=port.mode,
- clock_domain=port.clock.cd)
- for mem in mems]
-
- sel = Signal(max=len(ports), reset=len(ports) - 1)
- sel_r = Signal.like(sel)
- eq = sel_r.eq(sel)
- if port.re is not None:
- eq = If(port.re, eq)
- comb, sync = [], []
- if port.async_read:
- comb += [eq]
- else:
- sync += [eq]
- comb += reversed([If(~port.adr[len(p.adr)], sel.eq(i))
- for i, p in enumerate(ports)])
- comb += [p.adr.eq(port.adr) for p in ports]
- comb.append(port.dat_r.eq(Array([p.dat_r for p in ports])[sel_r]))
- if port.we is not None:
- comb.append(Array([p.we for p in ports])[sel].eq(port.we))
- comb += [p.dat_w.eq(port.dat_w) for p in ports]
- if port.re is not None:
- comb += [p.re.eq(port.re) for p in ports]
- return ports, comb, sync
+++ /dev/null
-from operator import itemgetter
-
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.structure import _Value
-from litex.gen.fhdl.bitcontainer import bits_for, value_bits_sign
-from litex.gen.fhdl.tools import *
-from litex.gen.fhdl.tracer import get_obj_var_name
-from litex.gen.fhdl.verilog import _printexpr as verilog_printexpr
-
-
-__all__ = ["TSTriple", "Instance", "Memory",
- "READ_FIRST", "WRITE_FIRST", "NO_CHANGE"]
-
-
-class Special(DUID):
- def iter_expressions(self):
- for x in []:
- yield x
-
- def rename_clock_domain(self, old, new):
- for obj, attr, direction in self.iter_expressions():
- rename_clock_domain_expr(getattr(obj, attr), old, new)
-
- def list_clock_domains(self):
- r = set()
- for obj, attr, direction in self.iter_expressions():
- r |= list_clock_domains_expr(getattr(obj, attr))
- return r
-
- def list_ios(self, ins, outs, inouts):
- r = set()
- for obj, attr, direction in self.iter_expressions():
- if (direction == SPECIAL_INPUT and ins) \
- or (direction == SPECIAL_OUTPUT and outs) \
- or (direction == SPECIAL_INOUT and inouts):
- signals = list_signals(getattr(obj, attr))
- r.update(signals)
- return r
-
-
-class Tristate(Special):
- def __init__(self, target, o, oe, i=None):
- Special.__init__(self)
- self.target = wrap(target)
- self.o = wrap(o)
- self.oe = wrap(oe)
- self.i = wrap(i) if i is not None else None
-
- def iter_expressions(self):
- for attr, target_context in [
- ("target", SPECIAL_INOUT),
- ("o", SPECIAL_INPUT),
- ("oe", SPECIAL_INPUT),
- ("i", SPECIAL_OUTPUT)]:
- if getattr(self, attr) is not None:
- yield self, attr, target_context
-
- @staticmethod
- def emit_verilog(tristate, ns, add_data_file):
- def pe(e):
- return verilog_printexpr(ns, e)[0]
- w, s = value_bits_sign(tristate.target)
- r = "assign " + pe(tristate.target) + " = " \
- + pe(tristate.oe) + " ? " + pe(tristate.o) \
- + " : " + str(w) + "'bz;\n"
- if tristate.i is not None:
- r += "assign " + pe(tristate.i) + " = " + pe(tristate.target) + ";\n"
- r += "\n"
- return r
-
-
-class TSTriple:
- def __init__(self, bits_sign=None, min=None, max=None, reset_o=0, reset_oe=0):
- self.o = Signal(bits_sign, min=min, max=max, reset=reset_o)
- self.oe = Signal(reset=reset_oe)
- self.i = Signal(bits_sign, min=min, max=max)
-
- def get_tristate(self, target):
- return Tristate(target, self.o, self.oe, self.i)
-
-
-class Instance(Special):
- class _IO:
- def __init__(self, name, expr=None):
- self.name = name
- if expr is None:
- expr = Signal()
- self.expr = wrap(expr)
- class Input(_IO):
- pass
- class Output(_IO):
- pass
- class InOut(_IO):
- pass
- class Parameter:
- def __init__(self, name, value):
- self.name = name
- if isinstance(value, (int, bool)):
- value = Constant(value)
- self.value = value
- class PreformattedParam(str):
- pass
-
- def __init__(self, of, *items, name="", synthesis_directive=None,
- attr=None, **kwargs):
- Special.__init__(self)
- self.of = of
- if name:
- self.name_override = name
- else:
- self.name_override = of
- self.items = list(items)
- self.synthesis_directive = synthesis_directive
- if attr is None:
- attr = set()
- self.attr = attr
- for k, v in sorted(kwargs.items(), key=itemgetter(0)):
- try:
- item_type, item_name = k.split("_", maxsplit=1)
- except ValueError:
- raise TypeError("Wrong format for value '" + str(k) +
- "', format should be 'type_name'")
-
- item_class = {
- "i": Instance.Input,
- "o": Instance.Output,
- "io": Instance.InOut,
- "p": Instance.Parameter
- }[item_type]
- self.items.append(item_class(item_name, v))
-
- def get_io(self, name):
- for item in self.items:
- if isinstance(item, Instance._IO) and item.name == name:
- return item.expr
-
- def iter_expressions(self):
- for item in self.items:
- if isinstance(item, Instance.Input):
- yield item, "expr", SPECIAL_INPUT
- elif isinstance(item, Instance.Output):
- yield item, "expr", SPECIAL_OUTPUT
- elif isinstance(item, Instance.InOut):
- yield item, "expr", SPECIAL_INOUT
-
- @staticmethod
- def emit_verilog(instance, ns, add_data_file):
- r = instance.of + " "
- parameters = list(filter(lambda i: isinstance(i, Instance.Parameter), instance.items))
- if parameters:
- r += "#(\n"
- firstp = True
- for p in parameters:
- if not firstp:
- r += ",\n"
- firstp = False
- r += "\t." + p.name + "("
- if isinstance(p.value, Constant):
- r += verilog_printexpr(ns, p.value)[0]
- elif isinstance(p.value, float):
- r += str(p.value)
- elif isinstance(p.value, Instance.PreformattedParam):
- r += p.value
- elif isinstance(p.value, str):
- r += "\"" + p.value + "\""
- else:
- raise TypeError
- r += ")"
- r += "\n) "
- r += ns.get_name(instance)
- if parameters: r += " "
- r += "(\n"
- firstp = True
- for p in instance.items:
- if isinstance(p, Instance._IO):
- name_inst = p.name
- name_design = verilog_printexpr(ns, p.expr)[0]
- if not firstp:
- r += ",\n"
- firstp = False
- r += "\t." + name_inst + "(" + name_design + ")"
- if not firstp:
- r += "\n"
-
- directives = instance.synthesis_directive
- if directives is None:
- directives = []
- elif type(directives) == str :
- directives = [directives,]
-
- r += ")";
- for directive in directives:
- r += "\n\t/* synthesis {} */".format(directive)
- r += ";\n\n"
-
- return r
-
-
-(READ_FIRST, WRITE_FIRST, NO_CHANGE) = range(3)
-
-
-class _MemoryPort(Special):
- def __init__(self, adr, dat_r, we=None, dat_w=None,
- async_read=False, re=None, we_granularity=0, mode=WRITE_FIRST,
- clock_domain="sys"):
- Special.__init__(self)
- self.adr = adr
- self.dat_r = dat_r
- self.we = we
- self.dat_w = dat_w
- self.async_read = async_read
- self.re = re
- self.we_granularity = we_granularity
- self.mode = mode
- self.clock = ClockSignal(clock_domain)
-
- def iter_expressions(self):
- for attr, target_context in [
- ("adr", SPECIAL_INPUT),
- ("we", SPECIAL_INPUT),
- ("dat_w", SPECIAL_INPUT),
- ("re", SPECIAL_INPUT),
- ("dat_r", SPECIAL_OUTPUT),
- ("clock", SPECIAL_INPUT)]:
- yield self, attr, target_context
-
- @staticmethod
- def emit_verilog(port, ns, add_data_file):
- return "" # done by parent Memory object
-
-
-class _MemoryLocation(_Value):
- def __init__(self, memory, index):
- _Value.__init__(self)
- self.memory = memory
- self.index = wrap(index)
-
-
-class Memory(Special):
- def __init__(self, width, depth, init=None, name=None):
- Special.__init__(self)
- self.width = width
- self.depth = depth
- self.ports = []
- self.init = init
- self.name_override = get_obj_var_name(name, "mem")
-
- def __getitem__(self, index):
- # simulation only
- return _MemoryLocation(self, index)
-
- def get_port(self, write_capable=False, async_read=False,
- has_re=False, we_granularity=0, mode=WRITE_FIRST,
- clock_domain="sys"):
- if we_granularity >= self.width:
- we_granularity = 0
- adr = Signal(max=self.depth)
- dat_r = Signal(self.width)
- if write_capable:
- if we_granularity:
- we = Signal(self.width//we_granularity)
- else:
- we = Signal()
- dat_w = Signal(self.width)
- else:
- we = None
- dat_w = None
- if has_re:
- re = Signal()
- else:
- re = None
- mp = _MemoryPort(adr, dat_r, we, dat_w,
- async_read, re, we_granularity, mode,
- clock_domain)
- self.ports.append(mp)
- return mp
-
- @staticmethod
- def emit_verilog(memory, ns, add_data_file):
- r = ""
- def gn(e):
- if isinstance(e, Memory):
- return ns.get_name(e)
- else:
- return verilog_printexpr(ns, e)[0]
- adrbits = bits_for(memory.depth-1)
-
- r += "reg [" + str(memory.width-1) + ":0] " \
- + gn(memory) \
- + "[0:" + str(memory.depth-1) + "];\n"
-
- adr_regs = {}
- data_regs = {}
- for port in memory.ports:
- if not port.async_read:
- if port.mode == WRITE_FIRST and port.we is not None:
- adr_reg = Signal(name_override="memadr")
- r += "reg [" + str(adrbits-1) + ":0] " \
- + gn(adr_reg) + ";\n"
- adr_regs[id(port)] = adr_reg
- else:
- data_reg = Signal(name_override="memdat")
- r += "reg [" + str(memory.width-1) + ":0] " \
- + gn(data_reg) + ";\n"
- data_regs[id(port)] = data_reg
-
- for port in memory.ports:
- r += "always @(posedge " + gn(port.clock) + ") begin\n"
- if port.we is not None:
- if port.we_granularity:
- n = memory.width//port.we_granularity
- for i in range(n):
- m = i*port.we_granularity
- M = (i+1)*port.we_granularity-1
- sl = "[" + str(M) + ":" + str(m) + "]"
- r += "\tif (" + gn(port.we) + "[" + str(i) + "])\n"
- r += "\t\t" + gn(memory) + "[" + gn(port.adr) + "]" + sl + " <= " + gn(port.dat_w) + sl + ";\n"
- else:
- r += "\tif (" + gn(port.we) + ")\n"
- r += "\t\t" + gn(memory) + "[" + gn(port.adr) + "] <= " + gn(port.dat_w) + ";\n"
- if not port.async_read:
- if port.mode == WRITE_FIRST and port.we is not None:
- rd = "\t" + gn(adr_regs[id(port)]) + " <= " + gn(port.adr) + ";\n"
- else:
- bassign = gn(data_regs[id(port)]) + " <= " + gn(memory) + "[" + gn(port.adr) + "];\n"
- if port.mode == READ_FIRST or port.we is None:
- rd = "\t" + bassign
- elif port.mode == NO_CHANGE:
- rd = "\tif (!" + gn(port.we) + ")\n" \
- + "\t\t" + bassign
- if port.re is None:
- r += rd
- else:
- r += "\tif (" + gn(port.re) + ")\n"
- r += "\t" + rd.replace("\n\t", "\n\t\t")
- r += "end\n\n"
-
- for port in memory.ports:
- if port.async_read:
- r += "assign " + gn(port.dat_r) + " = " + gn(memory) + "[" + gn(port.adr) + "];\n"
- else:
- if port.mode == WRITE_FIRST and port.we is not None:
- r += "assign " + gn(port.dat_r) + " = " + gn(memory) + "[" + gn(adr_regs[id(port)]) + "];\n"
- else:
- r += "assign " + gn(port.dat_r) + " = " + gn(data_regs[id(port)]) + ";\n"
- r += "\n"
-
- if memory.init is not None:
- content = ""
- for d in memory.init:
- content += "{:x}\n".format(d)
- memory_filename = add_data_file(gn(memory) + ".init", content)
-
- r += "initial begin\n"
- r += "\t$readmemh(\"" + memory_filename + "\", " + gn(memory) + ");\n"
- r += "end\n\n"
-
- return r
+++ /dev/null
-import builtins as _builtins
-import collections as _collections
-import re as _re
-
-from litex.gen.fhdl import tracer as _tracer
-from litex.gen.util.misc import flat_iteration as _flat_iteration
-
-
-class DUID:
- """Deterministic Unique IDentifier"""
- __next_uid = 0
- def __init__(self):
- self.duid = DUID.__next_uid
- DUID.__next_uid += 1
-
-
-class _Value(DUID):
- """Base class for operands
-
- Instances of `_Value` or its subclasses can be operands to
- arithmetic, comparison, bitwise, and logic operators.
- They can be assigned (:meth:`eq`) or indexed/sliced (using the usual
- Python indexing and slicing notation).
-
- Values created from integers have the minimum bit width to necessary to
- represent the integer.
- """
- def __bool__(self):
- # Special case: Constants and Signals are part of a set or used as
- # dictionary keys, and Python needs to check for equality.
- if isinstance(self, _Operator) and self.op == "==":
- a, b = self.operands
- if isinstance(a, Constant) and isinstance(b, Constant):
- return a.value == b.value
- if isinstance(a, Signal) and isinstance(b, Signal):
- return a is b
- if (isinstance(a, Constant) and isinstance(b, Signal)
- or isinstance(a, Signal) and isinstance(b, Constant)):
- return False
- raise TypeError("Attempted to convert Migen value to boolean")
-
- def __invert__(self):
- return _Operator("~", [self])
- def __neg__(self):
- return _Operator("-", [self])
-
- def __add__(self, other):
- return _Operator("+", [self, other])
- def __radd__(self, other):
- return _Operator("+", [other, self])
- def __sub__(self, other):
- return _Operator("-", [self, other])
- def __rsub__(self, other):
- return _Operator("-", [other, self])
- def __mul__(self, other):
- return _Operator("*", [self, other])
- def __rmul__(self, other):
- return _Operator("*", [other, self])
- def __lshift__(self, other):
- return _Operator("<<<", [self, other])
- def __rlshift__(self, other):
- return _Operator("<<<", [other, self])
- def __rshift__(self, other):
- return _Operator(">>>", [self, other])
- def __rrshift__(self, other):
- return _Operator(">>>", [other, self])
- def __and__(self, other):
- return _Operator("&", [self, other])
- def __rand__(self, other):
- return _Operator("&", [other, self])
- def __xor__(self, other):
- return _Operator("^", [self, other])
- def __rxor__(self, other):
- return _Operator("^", [other, self])
- def __or__(self, other):
- return _Operator("|", [self, other])
- def __ror__(self, other):
- return _Operator("|", [other, self])
-
- def __lt__(self, other):
- return _Operator("<", [self, other])
- def __le__(self, other):
- return _Operator("<=", [self, other])
- def __eq__(self, other):
- return _Operator("==", [self, other])
- def __ne__(self, other):
- return _Operator("!=", [self, other])
- def __gt__(self, other):
- return _Operator(">", [self, other])
- def __ge__(self, other):
- return _Operator(">=", [self, other])
-
- def __len__(self):
- from litex.gen.fhdl.bitcontainer import value_bits_sign
- return value_bits_sign(self)[0]
-
- def __getitem__(self, key):
- n = len(self)
- if isinstance(key, int):
- if key >= n:
- raise IndexError
- if key < 0:
- key += n
- return _Slice(self, key, key+1)
- elif isinstance(key, slice):
- start, stop, step = key.indices(n)
- if step != 1:
- return Cat(self[i] for i in range(start, stop, step))
- return _Slice(self, start, stop)
- else:
- raise TypeError("Cannot use type {} ({}) as key".format(
- type(key), repr(key)))
-
- def eq(self, r):
- """Assignment
-
- Parameters
- ----------
- r : _Value, in
- Value to be assigned.
-
- Returns
- -------
- _Assign
- Assignment statement that can be used in combinatorial or
- synchronous context.
- """
- return _Assign(self, r)
-
- def __hash__(self):
- raise TypeError("unhashable type: '{}'".format(type(self).__name__))
-
-
-def wrap(value):
- """Ensures that the passed object is a Migen value. Booleans and integers
- are automatically wrapped into ``Constant``."""
- if isinstance(value, (bool, int)):
- value = Constant(value)
- if not isinstance(value, _Value):
- raise TypeError("Object '{}' of type {} is not a Migen value"
- .format(value, type(value)))
- return value
-
-
-class _Operator(_Value):
- def __init__(self, op, operands):
- _Value.__init__(self)
- self.op = op
- self.operands = [wrap(o) for o in operands]
-
-
-def Mux(sel, val1, val0):
- """Multiplex between two values
-
- Parameters
- ----------
- sel : _Value(1), in
- Selector.
- val1 : _Value(N), in
- val0 : _Value(N), in
- Input values.
-
- Returns
- -------
- _Value(N), out
- Output `_Value`. If `sel` is asserted, the Mux returns
- `val1`, else `val0`.
- """
- return _Operator("m", [sel, val1, val0])
-
-
-class _Slice(_Value):
- def __init__(self, value, start, stop):
- _Value.__init__(self)
- if not isinstance(start, int) or not isinstance(stop, int):
- raise TypeError("Slice boundaries must be integers")
- self.value = wrap(value)
- self.start = start
- self.stop = stop
-
-
-class Cat(_Value):
- """Concatenate values
-
- Form a compound `_Value` from several smaller ones by concatenation.
- The first argument occupies the lower bits of the result.
- The return value can be used on either side of an assignment, that
- is, the concatenated value can be used as an argument on the RHS or
- as a target on the LHS. If it is used on the LHS, it must solely
- consist of `Signal` s, slices of `Signal` s, and other concatenations
- meeting these properties. The bit length of the return value is the sum of
- the bit lengths of the arguments::
-
- len(Cat(args)) == sum(len(arg) for arg in args)
-
- Parameters
- ----------
- *args : _Values or iterables of _Values, inout
- `_Value` s to be concatenated.
-
- Returns
- -------
- Cat, inout
- Resulting `_Value` obtained by concatentation.
- """
- def __init__(self, *args):
- _Value.__init__(self)
- self.l = [wrap(v) for v in _flat_iteration(args)]
-
-
-class Replicate(_Value):
- """Replicate a value
-
- An input value is replicated (repeated) several times
- to be used on the RHS of assignments::
-
- len(Replicate(s, n)) == len(s)*n
-
- Parameters
- ----------
- v : _Value, in
- Input value to be replicated.
- n : int
- Number of replications.
-
- Returns
- -------
- Replicate, out
- Replicated value.
- """
- def __init__(self, v, n):
- _Value.__init__(self)
- if not isinstance(n, int) or n < 0:
- raise TypeError("Replication count must be a positive integer")
- self.v = wrap(v)
- self.n = n
-
-
-class Constant(_Value):
- """A constant, HDL-literal integer `_Value`
-
- Parameters
- ----------
- value : int
- bits_sign : int or tuple or None
- Either an integer `bits` or a tuple `(bits, signed)`
- specifying the number of bits in this `Constant` and whether it is
- signed (can represent negative values). `bits_sign` defaults
- to the minimum width and signedness of `value`.
- """
- def __init__(self, value, bits_sign=None):
- from litex.gen.fhdl.bitcontainer import bits_for
-
- _Value.__init__(self)
-
- self.value = int(value)
- if bits_sign is None:
- bits_sign = bits_for(self.value), self.value < 0
- elif isinstance(bits_sign, int):
- bits_sign = bits_sign, self.value < 0
- self.nbits, self.signed = bits_sign
- if not isinstance(self.nbits, int) or self.nbits <= 0:
- raise TypeError("Width must be a strictly positive integer")
-
- def __hash__(self):
- return self.value
-
-
-C = Constant # shorthand
-
-
-class Signal(_Value):
- """A `_Value` that can change
-
- The `Signal` object represents a value that is expected to change
- in the circuit. It does exactly what Verilog's `wire` and
- `reg` and VHDL's `signal` do.
-
- A `Signal` can be indexed to access a subset of its bits. Negative
- indices (`signal[-1]`) and the extended Python slicing notation
- (`signal[start:stop:step]`) are supported.
- The indices 0 and -1 are the least and most significant bits
- respectively.
-
- Parameters
- ----------
- bits_sign : int or tuple
- Either an integer `bits` or a tuple `(bits, signed)`
- specifying the number of bits in this `Signal` and whether it is
- signed (can represent negative values). `signed` defaults to
- `False`.
- name : str or None
- Name hint for this signal. If `None` (default) the name is
- inferred from the variable name this `Signal` is assigned to.
- Name collisions are automatically resolved by prepending
- names of objects that contain this `Signal` and by
- appending integer sequences.
- variable : bool
- Deprecated.
- reset : int
- Reset (synchronous) or default (combinatorial) value.
- When this `Signal` is assigned to in synchronous context and the
- corresponding clock domain is reset, the `Signal` assumes the
- given value. When this `Signal` is unassigned in combinatorial
- context (due to conditional assignments not being taken),
- the `Signal` assumes its `reset` value. Defaults to 0.
- reset_less : bool
- If `True`, do not generate reset logic for this `Signal` in
- synchronous statements. The `reset` value is only used as a
- combinatorial default or as the initial value. Defaults to `False`.
- name_override : str or None
- Do not use the inferred name but the given one.
- min : int or None
- max : int or None
- If `bits_sign` is `None`, the signal bit width and signedness are
- determined by the integer range given by `min` (inclusive,
- defaults to 0) and `max` (exclusive, defaults to 2).
- related : Signal or None
- attr : set of synthesis attributes
- """
- _name_re = _re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
-
- def __init__(self, bits_sign=None, name=None, variable=False, reset=0,
- reset_less=False, name_override=None, min=None, max=None,
- related=None, attr=None):
- from litex.gen.fhdl.bitcontainer import bits_for
-
- _Value.__init__(self)
-
- for n in [name, name_override]:
- if n is not None and not self._name_re.match(n):
- raise ValueError("Signal name {} is not a valid Python identifier"
- .format(repr(n)))
-
- # determine number of bits and signedness
- if bits_sign is None:
- if min is None:
- min = 0
- if max is None:
- max = 2
- max -= 1 # make both bounds inclusive
- assert(min < max)
- self.signed = min < 0 or max < 0
- self.nbits = _builtins.max(bits_for(min, self.signed), bits_for(max, self.signed))
- else:
- assert(min is None and max is None)
- if isinstance(bits_sign, tuple):
- self.nbits, self.signed = bits_sign
- else:
- self.nbits, self.signed = bits_sign, False
- if isinstance(reset, (bool, int)):
- reset = Constant(reset, (self.nbits, self.signed))
- if not isinstance(self.nbits, int) or self.nbits <= 0:
- raise ValueError("Signal width must be a strictly positive integer")
- if attr is None:
- attr = set()
-
- self.variable = variable # deprecated
- self.reset = reset
- self.reset_less = reset_less
- self.name_override = name_override
- self.backtrace = _tracer.trace_back(name)
- self.related = related
- self.attr = attr
-
- def __setattr__(self, k, v):
- if k == "reset":
- v = wrap(v)
- _Value.__setattr__(self, k, v)
-
- def __repr__(self):
- return "<Signal " + (self.backtrace[-1][0] or "anonymous") + " at " + hex(id(self)) + ">"
-
- @classmethod
- def like(cls, other, **kwargs):
- """Create Signal based on another.
-
- Parameters
- ----------
- other : _Value
- Object to base this Signal on.
-
- See `migen.fhdl.bitcontainer.value_bits_sign` for details.
- """
- from litex.gen.fhdl.bitcontainer import value_bits_sign
- kw = dict(bits_sign=value_bits_sign(other))
- if isinstance(other, cls):
- kw.update(variable=other.variable,
- reset=other.reset.value, reset_less=other.reset_less,
- related=other.related, attr=set(other.attr))
- kw.update(kwargs)
- return cls(**kw)
-
- def __hash__(self):
- return self.duid
-
-
-class ClockSignal(_Value):
- """Clock signal for a given clock domain
-
- `ClockSignal` s for a given clock domain can be retrieved multiple
- times. They all ultimately refer to the same signal.
-
- Parameters
- ----------
- cd : str
- Clock domain to obtain a clock signal for. Defaults to `"sys"`.
- """
- def __init__(self, cd="sys"):
- _Value.__init__(self)
- if not isinstance(cd, str):
- raise TypeError("Argument of ClockSignal must be a string")
- self.cd = cd
-
-
-class ResetSignal(_Value):
- """Reset signal for a given clock domain
-
- `ResetSignal` s for a given clock domain can be retrieved multiple
- times. They all ultimately refer to the same signal.
-
- Parameters
- ----------
- cd : str
- Clock domain to obtain a reset signal for. Defaults to `"sys"`.
- allow_reset_less : bool
- If the clock domain is resetless, return 0 instead of reporting an
- error.
- """
- def __init__(self, cd="sys", allow_reset_less=False):
- _Value.__init__(self)
- if not isinstance(cd, str):
- raise TypeError("Argument of ResetSignal must be a string")
- self.cd = cd
- self.allow_reset_less = allow_reset_less
-
-
-# statements
-
-
-class _Statement:
- pass
-
-
-class _Assign(_Statement):
- def __init__(self, l, r):
- self.l = wrap(l)
- self.r = wrap(r)
-
-
-def _check_statement(s):
- if isinstance(s, _collections.Iterable):
- return all(_check_statement(ss) for ss in s)
- else:
- return isinstance(s, _Statement)
-
-
-class If(_Statement):
- """Conditional execution of statements
-
- Parameters
- ----------
- cond : _Value(1), in
- Condition
- *t : Statements
- Statements to execute if `cond` is asserted.
-
- Examples
- --------
- >>> a = Signal()
- >>> b = Signal()
- >>> c = Signal()
- >>> d = Signal()
- >>> If(a,
- ... b.eq(1)
- ... ).Elif(c,
- ... b.eq(0)
- ... ).Else(
- ... b.eq(d)
- ... )
- """
- def __init__(self, cond, *t):
- if not _check_statement(t):
- raise TypeError("Not all test body objects are Migen statements")
- self.cond = wrap(cond)
- self.t = list(t)
- self.f = []
-
- def Else(self, *f):
- """Add an `else` conditional block
-
- Parameters
- ----------
- *f : Statements
- Statements to execute if all previous conditions fail.
- """
- if not _check_statement(f):
- raise TypeError("Not all test body objects are Migen statements")
- _insert_else(self, list(f))
- return self
-
- def Elif(self, cond, *t):
- """Add an `else if` conditional block
-
- Parameters
- ----------
- cond : _Value(1), in
- Condition
- *t : Statements
- Statements to execute if previous conditions fail and `cond`
- is asserted.
- """
- _insert_else(self, [If(cond, *t)])
- return self
-
-
-def _insert_else(obj, clause):
- o = obj
- while o.f:
- assert(len(o.f) == 1)
- assert(isinstance(o.f[0], If))
- o = o.f[0]
- o.f = clause
-
-
-class Case(_Statement):
- """Case/Switch statement
-
- Parameters
- ----------
- test : _Value, in
- Selector value used to decide which block to execute
- cases : dict
- Dictionary of cases. The keys are numeric constants to compare
- with `test`. The values are statements to be executed the
- corresponding key matches `test`. The dictionary may contain a
- string key `"default"` to mark a fall-through case that is
- executed if no other key matches.
-
- Examples
- --------
- >>> a = Signal()
- >>> b = Signal()
- >>> Case(a, {
- ... 0: b.eq(1),
- ... 1: b.eq(0),
- ... "default": b.eq(0),
- ... })
- """
- def __init__(self, test, cases):
- self.test = wrap(test)
- self.cases = dict()
- for k, v in cases.items():
- if isinstance(k, (bool, int)):
- k = Constant(k)
- if (not isinstance(k, Constant)
- and not (isinstance(k, str) and k == "default")):
- raise TypeError("Case object is not a Migen constant")
- if not isinstance(v, _collections.Iterable):
- v = [v]
- if not _check_statement(v):
- raise TypeError("Not all objects for case {} "
- "are Migen statements".format(k))
- self.cases[k] = v
-
- def makedefault(self, key=None):
- """Mark a key as the default case
-
- Deletes/substitutes any previously existing default case.
-
- Parameters
- ----------
- key : int, Constant or None
- Key to use as default case if no other key matches.
- By default, the largest key is the default key.
- """
- if key is None:
- for choice in self.cases.keys():
- if (key is None
- or (isinstance(choice, str) and choice == "default")
- or choice.value > key.value):
- key = choice
- if not isinstance(key, str) or key != "default":
- key = wrap(key)
- stmts = self.cases[key]
- del self.cases[key]
- self.cases["default"] = stmts
- return self
-
-
-# arrays
-
-
-class _ArrayProxy(_Value):
- def __init__(self, choices, key):
- _Value.__init__(self)
- self.choices = []
- for c in choices:
- if isinstance(c, (bool, int)):
- c = Constant(c)
- self.choices.append(c)
- self.key = key
-
- def __getattr__(self, attr):
- return _ArrayProxy([getattr(choice, attr) for choice in self.choices],
- self.key)
-
- def __getitem__(self, key):
- return _ArrayProxy([choice.__getitem__(key) for choice in self.choices],
- self.key)
-
-
-class Array(list):
- """Addressable multiplexer
-
- An array is created from an iterable of values and indexed using the
- usual Python simple indexing notation (no negative indices or
- slices). It can be indexed by numeric constants, `_Value` s, or
- `Signal` s.
-
- The result of indexing the array is a proxy for the entry at the
- given index that can be used on either RHS or LHS of assignments.
-
- An array can be indexed multiple times.
-
- Multidimensional arrays are supported by packing inner arrays into
- outer arrays.
-
- Parameters
- ----------
- values : iterable of ints, _Values, Signals
- Entries of the array. Each entry can be a numeric constant, a
- `Signal` or a `Record`.
-
- Examples
- --------
- >>> a = Array(range(10))
- >>> b = Signal(max=10)
- >>> c = Signal(max=10)
- >>> b.eq(a[9 - c])
- """
- def __getitem__(self, key):
- if isinstance(key, Constant):
- return list.__getitem__(self, key.value)
- elif isinstance(key, _Value):
- return _ArrayProxy(self, key)
- else:
- return list.__getitem__(self, key)
-
-
-class ClockDomain:
- """Synchronous domain
-
- Parameters
- ----------
- name : str or None
- Domain name. If None (the default) the name is inferred from the
- variable name this `ClockDomain` is assigned to (stripping any
- `"cd_"` prefix).
- reset_less : bool
- The domain does not use a reset signal. Registers within this
- domain are still all initialized to their reset state once, e.g.
- through Verilog `"initial"` statements.
-
- Attributes
- ----------
- clk : Signal, inout
- The clock for this domain. Can be driven or used to drive other
- signals (preferably in combinatorial context).
- rst : Signal or None, inout
- Reset signal for this domain. Can be driven or used to drive.
- """
- def __init__(self, name=None, reset_less=False):
- self.name = _tracer.get_obj_var_name(name)
- if self.name is None:
- raise ValueError("Cannot extract clock domain name from code, need to specify.")
- if self.name.startswith("cd_"):
- self.name = self.name[3:]
- if self.name[0].isdigit():
- raise ValueError("Clock domain name cannot start with a number.")
- self.clk = Signal(name_override=self.name + "_clk")
- if reset_less:
- self.rst = None
- else:
- self.rst = Signal(name_override=self.name + "_rst")
-
- def rename(self, new_name):
- """Rename the clock domain
-
- Parameters
- ----------
- new_name : str
- New name
- """
- self.name = new_name
- self.clk.name_override = new_name + "_clk"
- if self.rst is not None:
- self.rst.name_override = new_name + "_rst"
-
-
-class _ClockDomainList(list):
- def __getitem__(self, key):
- if isinstance(key, str):
- for cd in self:
- if cd.name == key:
- return cd
- raise KeyError(key)
- else:
- return list.__getitem__(self, key)
-
- def __contains__(self, cd_or_name):
- if isinstance(cd_or_name, str):
- for cd in self:
- if cd.name == cd_or_name:
- return True
- return False
- else:
- return list.__contains__(self, cd_or_name)
-
-
-(SPECIAL_INPUT, SPECIAL_OUTPUT, SPECIAL_INOUT) = range(3)
-
-
-class _Fragment:
- def __init__(self, comb=None, sync=None, specials=None, clock_domains=None):
- if comb is None: comb = []
- if sync is None: sync = dict()
- if specials is None: specials = set()
- if clock_domains is None: clock_domains = _ClockDomainList()
-
- self.comb = comb
- self.sync = sync
- self.specials = specials
- self.clock_domains = _ClockDomainList(clock_domains)
-
- def __add__(self, other):
- newsync = _collections.defaultdict(list)
- for k, v in self.sync.items():
- newsync[k] = v[:]
- for k, v in other.sync.items():
- newsync[k].extend(v)
- return _Fragment(self.comb + other.comb, newsync,
- self.specials | other.specials,
- self.clock_domains + other.clock_domains)
-
- def __iadd__(self, other):
- newsync = _collections.defaultdict(list)
- for k, v in self.sync.items():
- newsync[k] = v[:]
- for k, v in other.sync.items():
- newsync[k].extend(v)
- self.comb += other.comb
- self.sync = newsync
- self.specials |= other.specials
- self.clock_domains += other.clock_domains
- return self
-
-
-class Display(_Statement):
- def __init__(self, s, *args):
- self.s = s
- self.args = args
-
-class Finish(_Statement):
- pass
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.structure import _Slice, _Assign, _Fragment
-from litex.gen.fhdl.visit import NodeVisitor, NodeTransformer
-from litex.gen.fhdl.bitcontainer import value_bits_sign
-from litex.gen.util.misc import flat_iteration
-
-
-class _SignalLister(NodeVisitor):
- def __init__(self):
- self.output_list = set()
-
- def visit_Signal(self, node):
- self.output_list.add(node)
-
-
-class _TargetLister(NodeVisitor):
- def __init__(self):
- self.output_list = set()
- self.target_context = False
-
- def visit_Signal(self, node):
- if self.target_context:
- self.output_list.add(node)
-
- def visit_Assign(self, node):
- self.target_context = True
- self.visit(node.l)
- self.target_context = False
-
- def visit_ArrayProxy(self, node):
- for choice in node.choices:
- self.visit(choice)
-
-
-class _InputLister(NodeVisitor):
- def __init__(self):
- self.output_list = set()
-
- def visit_Signal(self, node):
- self.output_list.add(node)
-
- def visit_Assign(self, node):
- self.visit(node.r)
-
-
-def list_signals(node):
- lister = _SignalLister()
- lister.visit(node)
- return lister.output_list
-
-
-def list_targets(node):
- lister = _TargetLister()
- lister.visit(node)
- return lister.output_list
-
-
-def list_inputs(node):
- lister = _InputLister()
- lister.visit(node)
- return lister.output_list
-
-
-def _resort_statements(ol):
- return [statement for i, statement in
- sorted(ol, key=lambda x: x[0])]
-
-
-def group_by_targets(sl):
- groups = []
- seen = set()
- for order, stmt in enumerate(flat_iteration(sl)):
- targets = set(list_targets(stmt))
- group = [(order, stmt)]
- disjoint = targets.isdisjoint(seen)
- seen |= targets
- if not disjoint:
- groups, old_groups = [], groups
- for old_targets, old_group in old_groups:
- if targets.isdisjoint(old_targets):
- groups.append((old_targets, old_group))
- else:
- targets |= old_targets
- group += old_group
- groups.append((targets, group))
- return [(targets, _resort_statements(stmts))
- for targets, stmts in groups]
-
-
-def list_special_ios(f, ins, outs, inouts):
- r = set()
- for special in f.specials:
- r |= special.list_ios(ins, outs, inouts)
- return r
-
-
-class _ClockDomainLister(NodeVisitor):
- def __init__(self):
- self.clock_domains = set()
-
- def visit_ClockSignal(self, node):
- self.clock_domains.add(node.cd)
-
- def visit_ResetSignal(self, node):
- self.clock_domains.add(node.cd)
-
- def visit_clock_domains(self, node):
- for clockname, statements in node.items():
- self.clock_domains.add(clockname)
- self.visit(statements)
-
-
-def list_clock_domains_expr(f):
- cdl = _ClockDomainLister()
- cdl.visit(f)
- return cdl.clock_domains
-
-
-def list_clock_domains(f):
- r = list_clock_domains_expr(f)
- for special in f.specials:
- r |= special.list_clock_domains()
- for cd in f.clock_domains:
- r.add(cd.name)
- return r
-
-
-def is_variable(node):
- if isinstance(node, Signal):
- return node.variable
- elif isinstance(node, _Slice):
- return is_variable(node.value)
- elif isinstance(node, Cat):
- arevars = list(map(is_variable, node.l))
- r = arevars[0]
- for x in arevars:
- if x != r:
- raise TypeError
- return r
- else:
- raise TypeError
-
-
-def generate_reset(rst, sl):
- targets = list_targets(sl)
- return [t.eq(t.reset) for t in sorted(targets, key=lambda x: x.duid)
- if not t.reset_less]
-
-
-def insert_reset(rst, sl):
- return sl + [If(rst, *generate_reset(rst, sl))]
-
-
-def insert_resets(f):
- newsync = dict()
- for k, v in f.sync.items():
- if f.clock_domains[k].rst is not None:
- newsync[k] = insert_reset(ResetSignal(k), v)
- else:
- newsync[k] = v
- f.sync = newsync
-
-
-class _Lowerer(NodeTransformer):
- def __init__(self):
- self.target_context = False
- self.extra_stmts = []
- self.comb = []
-
- def visit_Assign(self, node):
- old_target_context, old_extra_stmts = self.target_context, self.extra_stmts
- self.extra_stmts = []
-
- self.target_context = True
- lhs = self.visit(node.l)
- self.target_context = False
- rhs = self.visit(node.r)
- r = _Assign(lhs, rhs)
- if self.extra_stmts:
- r = [r] + self.extra_stmts
-
- self.target_context, self.extra_stmts = old_target_context, old_extra_stmts
- return r
-
-
-# Basics are FHDL structure elements that back-ends are not required to support
-# but can be expressed in terms of other elements (lowered) before conversion.
-class _BasicLowerer(_Lowerer):
- def __init__(self, clock_domains):
- self.clock_domains = clock_domains
- _Lowerer.__init__(self)
-
- def visit_ArrayProxy(self, node):
- # TODO: rewrite without variables
- array_muxed = Signal(value_bits_sign(node), variable=True)
- if self.target_context:
- k = self.visit(node.key)
- cases = {}
- for n, choice in enumerate(node.choices):
- cases[n] = [self.visit_Assign(_Assign(choice, array_muxed))]
- self.extra_stmts.append(Case(k, cases).makedefault())
- else:
- cases = dict((n, _Assign(array_muxed, self.visit(choice)))
- for n, choice in enumerate(node.choices))
- self.comb.append(Case(self.visit(node.key), cases).makedefault())
- return array_muxed
-
- def visit_ClockSignal(self, node):
- return self.clock_domains[node.cd].clk
-
- def visit_ResetSignal(self, node):
- rst = self.clock_domains[node.cd].rst
- if rst is None:
- if node.allow_reset_less:
- return 0
- else:
- raise ValueError("Attempted to get reset signal of resetless"
- " domain '{}'".format(node.cd))
- else:
- return rst
-
-
-class _ComplexSliceLowerer(_Lowerer):
- def visit_Slice(self, node):
- if not isinstance(node.value, Signal):
- slice_proxy = Signal(value_bits_sign(node.value))
- if self.target_context:
- a = _Assign(node.value, slice_proxy)
- else:
- a = _Assign(slice_proxy, node.value)
- self.comb.append(self.visit_Assign(a))
- node = _Slice(slice_proxy, node.start, node.stop)
- return NodeTransformer.visit_Slice(self, node)
-
-
-def _apply_lowerer(l, f):
- f = l.visit(f)
- f.comb += l.comb
-
- for special in sorted(f.specials, key=lambda s: s.duid):
- for obj, attr, direction in special.iter_expressions():
- if direction != SPECIAL_INOUT:
- # inouts are only supported by Migen when connected directly to top-level
- # in this case, they are Signal and never need lowering
- l.comb = []
- l.target_context = direction != SPECIAL_INPUT
- l.extra_stmts = []
- expr = getattr(obj, attr)
- expr = l.visit(expr)
- setattr(obj, attr, expr)
- f.comb += l.comb + l.extra_stmts
-
- return f
-
-
-def lower_basics(f):
- return _apply_lowerer(_BasicLowerer(f.clock_domains), f)
-
-
-def lower_complex_slices(f):
- return _apply_lowerer(_ComplexSliceLowerer(), f)
-
-
-class _ClockDomainRenamer(NodeVisitor):
- def __init__(self, old, new):
- self.old = old
- self.new = new
-
- def visit_ClockSignal(self, node):
- if node.cd == self.old:
- node.cd = self.new
-
- def visit_ResetSignal(self, node):
- if node.cd == self.old:
- node.cd = self.new
-
-
-def rename_clock_domain_expr(f, old, new):
- cdr = _ClockDomainRenamer(old, new)
- cdr.visit(f)
-
-
-def rename_clock_domain(f, old, new):
- rename_clock_domain_expr(f, old, new)
- if new != old:
- if old in f.sync:
- if new in f.sync:
- f.sync[new].extend(f.sync[old])
- else:
- f.sync[new] = f.sync[old]
- del f.sync[old]
- for special in f.specials:
- special.rename_clock_domain(old, new)
- try:
- cd = f.clock_domains[old]
- except KeyError:
- pass
- else:
- cd.rename(new)
-
-
-def call_special_classmethod(overrides, obj, method, *args, **kwargs):
- cl = obj.__class__
- if cl in overrides:
- cl = overrides[cl]
- if hasattr(cl, method):
- return getattr(cl, method)(obj, *args, **kwargs)
- else:
- return None
-
-
-def _lower_specials_step(overrides, specials):
- f = _Fragment()
- lowered_specials = set()
- for special in sorted(specials, key=lambda x: x.duid):
- impl = call_special_classmethod(overrides, special, "lower")
- if impl is not None:
- f += impl.get_fragment()
- lowered_specials.add(special)
- return f, lowered_specials
-
-
-def _can_lower(overrides, specials):
- for special in specials:
- cl = special.__class__
- if cl in overrides:
- cl = overrides[cl]
- if hasattr(cl, "lower"):
- return True
- return False
-
-
-def lower_specials(overrides, specials):
- f, lowered_specials = _lower_specials_step(overrides, specials)
- while _can_lower(overrides, f.specials):
- f2, lowered_specials2 = _lower_specials_step(overrides, f.specials)
- f += f2
- lowered_specials |= lowered_specials2
- f.specials -= lowered_specials2
- return f, lowered_specials
+++ /dev/null
-import inspect
-from sys import version_info
-from opcode import opname
-from collections import defaultdict
-
-# All opcodes are 2 bytes in length in Python 3.6
-def _bytecode_length_version_guard(old_len):
- return old_len if version_info[1] < 6 else 2
-
-_call_opcodes = {
- "CALL_FUNCTION" : _bytecode_length_version_guard(3),
- "CALL_FUNCTION_KW" : _bytecode_length_version_guard(3),
-}
-
-if version_info[1] < 6:
- _call_opcodes["CALL_FUNCTION_VAR"] = 3
- _call_opcodes["CALL_FUNCTION_VAR_KW"] = 3
-else:
- _call_opcodes["CALL_FUNCTION_VAR_KW"] = 2
-
-_load_build_opcodes = {
- "LOAD_GLOBAL" : _bytecode_length_version_guard(3),
- "LOAD_ATTR" : _bytecode_length_version_guard(3),
- "LOAD_FAST" : _bytecode_length_version_guard(3),
- "LOAD_DEREF" : _bytecode_length_version_guard(3),
- "DUP_TOP" : _bytecode_length_version_guard(1),
- "BUILD_LIST" : _bytecode_length_version_guard(3),
-}
-
-
-def get_var_name(frame):
- code = frame.f_code
- call_index = frame.f_lasti
- call_opc = opname[code.co_code[call_index]]
- if call_opc not in _call_opcodes:
- return None
- index = call_index+_call_opcodes[call_opc]
- while True:
- opc = opname[code.co_code[index]]
- if opc == "STORE_NAME" or opc == "STORE_ATTR":
- name_index = int(code.co_code[index+1])
- return code.co_names[name_index]
- elif opc == "STORE_FAST":
- name_index = int(code.co_code[index+1])
- return code.co_varnames[name_index]
- elif opc == "STORE_DEREF":
- name_index = int(code.co_code[index+1])
- return code.co_cellvars[name_index]
- elif opc in _load_build_opcodes:
- index += _load_build_opcodes[opc]
- else:
- return None
-
-
-def remove_underscore(s):
- if len(s) > 2 and s[0] == "_" and s[1] != "_":
- s = s[1:]
- return s
-
-
-def get_obj_var_name(override=None, default=None):
- if override:
- return override
-
- frame = inspect.currentframe().f_back
- # We can be called via derived classes. Go back the stack frames
- # until we reach the first class that does not inherit from us.
- ourclass = frame.f_locals["self"].__class__
- while "self" in frame.f_locals and isinstance(frame.f_locals["self"], ourclass):
- frame = frame.f_back
-
- vn = get_var_name(frame)
- if vn is None:
- vn = default
- else:
- vn = remove_underscore(vn)
- return vn
-
-name_to_idx = defaultdict(int)
-classname_to_objs = dict()
-
-
-def index_id(l, obj):
- for n, e in enumerate(l):
- if id(e) == id(obj):
- return n
- raise ValueError
-
-
-def trace_back(varname=None):
- l = []
- frame = inspect.currentframe().f_back.f_back
- while frame is not None:
- if varname is None:
- varname = get_var_name(frame)
- if varname is not None:
- varname = remove_underscore(varname)
- l.insert(0, (varname, name_to_idx[varname]))
- name_to_idx[varname] += 1
-
- try:
- obj = frame.f_locals["self"]
- except KeyError:
- obj = None
- if hasattr(obj, "__del__"):
- obj = None
-
- if obj is None:
- if varname is not None:
- coname = frame.f_code.co_name
- if coname == "<module>":
- modules = frame.f_globals["__name__"]
- modules = modules.split(".")
- coname = modules[len(modules)-1]
- coname = remove_underscore(coname)
- l.insert(0, (coname, name_to_idx[coname]))
- name_to_idx[coname] += 1
- else:
- classname = obj.__class__.__name__.lower()
- try:
- objs = classname_to_objs[classname]
- except KeyError:
- classname_to_objs[classname] = [obj]
- idx = 0
- else:
- try:
- idx = index_id(objs, obj)
- except ValueError:
- idx = len(objs)
- objs.append(obj)
- classname = remove_underscore(classname)
- l.insert(0, (classname, idx))
-
- varname = None
- frame = frame.f_back
- return l
+++ /dev/null
-from copy import copy
-from operator import itemgetter
-
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.structure import (_Operator, _Slice, _Assign, _ArrayProxy,
- _Fragment)
-
-
-class NodeVisitor:
- def visit(self, node):
- if isinstance(node, Constant):
- self.visit_Constant(node)
- elif isinstance(node, Signal):
- self.visit_Signal(node)
- elif isinstance(node, ClockSignal):
- self.visit_ClockSignal(node)
- elif isinstance(node, ResetSignal):
- self.visit_ResetSignal(node)
- elif isinstance(node, _Operator):
- self.visit_Operator(node)
- elif isinstance(node, _Slice):
- self.visit_Slice(node)
- elif isinstance(node, Cat):
- self.visit_Cat(node)
- elif isinstance(node, Replicate):
- self.visit_Replicate(node)
- elif isinstance(node, _Assign):
- self.visit_Assign(node)
- elif isinstance(node, If):
- self.visit_If(node)
- elif isinstance(node, Case):
- self.visit_Case(node)
- elif isinstance(node, _Fragment):
- self.visit_Fragment(node)
- elif isinstance(node, (list, tuple)):
- self.visit_statements(node)
- elif isinstance(node, dict):
- self.visit_clock_domains(node)
- elif isinstance(node, _ArrayProxy):
- self.visit_ArrayProxy(node)
- else:
- self.visit_unknown(node)
-
- def visit_Constant(self, node):
- pass
-
- def visit_Signal(self, node):
- pass
-
- def visit_ClockSignal(self, node):
- pass
-
- def visit_ResetSignal(self, node):
- pass
-
- def visit_Operator(self, node):
- for o in node.operands:
- self.visit(o)
-
- def visit_Slice(self, node):
- self.visit(node.value)
-
- def visit_Cat(self, node):
- for e in node.l:
- self.visit(e)
-
- def visit_Replicate(self, node):
- self.visit(node.v)
-
- def visit_Assign(self, node):
- self.visit(node.l)
- self.visit(node.r)
-
- def visit_If(self, node):
- self.visit(node.cond)
- self.visit(node.t)
- self.visit(node.f)
-
- def visit_Case(self, node):
- self.visit(node.test)
- for v, statements in sorted(node.cases.items(),
- key=lambda x: str(x[0])):
- self.visit(statements)
-
- def visit_Fragment(self, node):
- self.visit(node.comb)
- self.visit(node.sync)
-
- def visit_statements(self, node):
- for statement in node:
- self.visit(statement)
-
- def visit_clock_domains(self, node):
- for clockname, statements in sorted(node.items(), key=itemgetter(0)):
- self.visit(statements)
-
- def visit_ArrayProxy(self, node):
- for choice in node.choices:
- self.visit(choice)
- self.visit(node.key)
-
- def visit_unknown(self, node):
- pass
-
-
-# Default methods always copy the node, except for:
-# - Signals, ClockSignals and ResetSignals
-# - Unknown objects
-# - All fragment fields except comb and sync
-# In those cases, the original node is returned unchanged.
-class NodeTransformer:
- def visit(self, node):
- if isinstance(node, Constant):
- return self.visit_Constant(node)
- elif isinstance(node, Signal):
- return self.visit_Signal(node)
- elif isinstance(node, ClockSignal):
- return self.visit_ClockSignal(node)
- elif isinstance(node, ResetSignal):
- return self.visit_ResetSignal(node)
- elif isinstance(node, _Operator):
- return self.visit_Operator(node)
- elif isinstance(node, _Slice):
- return self.visit_Slice(node)
- elif isinstance(node, Cat):
- return self.visit_Cat(node)
- elif isinstance(node, Replicate):
- return self.visit_Replicate(node)
- elif isinstance(node, _Assign):
- return self.visit_Assign(node)
- elif isinstance(node, If):
- return self.visit_If(node)
- elif isinstance(node, Case):
- return self.visit_Case(node)
- elif isinstance(node, _Fragment):
- return self.visit_Fragment(node)
- elif isinstance(node, (list, tuple)):
- return self.visit_statements(node)
- elif isinstance(node, dict):
- return self.visit_clock_domains(node)
- elif isinstance(node, _ArrayProxy):
- return self.visit_ArrayProxy(node)
- else:
- return self.visit_unknown(node)
-
- def visit_Constant(self, node):
- return node
-
- def visit_Signal(self, node):
- return node
-
- def visit_ClockSignal(self, node):
- return node
-
- def visit_ResetSignal(self, node):
- return node
-
- def visit_Operator(self, node):
- return _Operator(node.op, [self.visit(o) for o in node.operands])
-
- def visit_Slice(self, node):
- return _Slice(self.visit(node.value), node.start, node.stop)
-
- def visit_Cat(self, node):
- return Cat(*[self.visit(e) for e in node.l])
-
- def visit_Replicate(self, node):
- return Replicate(self.visit(node.v), node.n)
-
- def visit_Assign(self, node):
- return _Assign(self.visit(node.l), self.visit(node.r))
-
- def visit_If(self, node):
- r = If(self.visit(node.cond))
- r.t = self.visit(node.t)
- r.f = self.visit(node.f)
- return r
-
- def visit_Case(self, node):
- cases = {v: self.visit(statements)
- for v, statements in sorted(node.cases.items(),
- key=lambda x: str(x[0]))}
- r = Case(self.visit(node.test), cases)
- return r
-
- def visit_Fragment(self, node):
- r = copy(node)
- r.comb = self.visit(node.comb)
- r.sync = self.visit(node.sync)
- return r
-
- # NOTE: this will always return a list, even if node is a tuple
- def visit_statements(self, node):
- return [self.visit(statement) for statement in node]
-
- def visit_clock_domains(self, node):
- return {clockname: self.visit(statements)
- for clockname, statements in sorted(node.items(),
- key=itemgetter(0))}
-
- def visit_ArrayProxy(self, node):
- return _ArrayProxy([self.visit(choice) for choice in node.choices],
- self.visit(node.key))
-
- def visit_unknown(self, node):
- return node
+++ /dev/null
-"""
-Clock domain crossing module
-"""
-from math import gcd
-
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-from litex.gen.fhdl.specials import Special, Memory
-from litex.gen.fhdl.bitcontainer import value_bits_sign
-from litex.gen.fhdl.decorators import ClockDomainsRenamer
-from litex.gen.genlib.misc import WaitTimer
-from litex.gen.genlib.resetsync import AsyncResetSynchronizer
-
-
-class MultiRegImpl(Module):
- def __init__(self, i, o, odomain, n):
- self.i = i
- self.o = o
- self.odomain = odomain
-
- w, signed = value_bits_sign(self.i)
- self.regs = [Signal((w, signed), reset_less=True)
- for i in range(n)]
-
- ###
-
- src = self.i
- for reg in self.regs:
- sd = getattr(self.sync, self.odomain)
- sd += reg.eq(src)
- src = reg
- self.comb += self.o.eq(src)
- for reg in self.regs:
- reg.attr.add("no_retiming")
-
-
-class MultiReg(Special):
- def __init__(self, i, o, odomain="sys", n=2):
- Special.__init__(self)
- self.i = wrap(i)
- self.o = wrap(o)
- self.odomain = odomain
- self.n = n
-
- def iter_expressions(self):
- yield self, "i", SPECIAL_INPUT
- yield self, "o", SPECIAL_OUTPUT
-
- def rename_clock_domain(self, old, new):
- Special.rename_clock_domain(self, old, new)
- if self.odomain == old:
- self.odomain = new
-
- def list_clock_domains(self):
- r = Special.list_clock_domains(self)
- r.add(self.odomain)
- return r
-
- @staticmethod
- def lower(dr):
- return MultiRegImpl(dr.i, dr.o, dr.odomain, dr.n)
-
-
-class PulseSynchronizer(Module):
- def __init__(self, idomain, odomain):
- self.i = Signal()
- self.o = Signal()
-
- ###
-
- toggle_i = Signal(reset_less=True)
- toggle_o = Signal() # registered reset_less by MultiReg
- toggle_o_r = Signal(reset_less=True)
-
- sync_i = getattr(self.sync, idomain)
- sync_o = getattr(self.sync, odomain)
-
- sync_i += If(self.i, toggle_i.eq(~toggle_i))
- self.specials += MultiReg(toggle_i, toggle_o, odomain)
- sync_o += toggle_o_r.eq(toggle_o)
- self.comb += self.o.eq(toggle_o ^ toggle_o_r)
-
-
-class BusSynchronizer(Module):
- """Clock domain transfer of several bits at once.
-
- Ensures that all the bits form a single word that was present
- synchronously in the input clock domain (unlike direct use of
- ``MultiReg``)."""
- def __init__(self, width, idomain, odomain, timeout=128):
- self.i = Signal(width)
- self.o = Signal(width, reset_less=True)
-
- if width == 1:
- self.specials += MultiReg(self.i, self.o, odomain)
- else:
- sync_i = getattr(self.sync, idomain)
- sync_o = getattr(self.sync, odomain)
-
- starter = Signal(reset=1)
- sync_i += starter.eq(0)
- self.submodules._ping = PulseSynchronizer(idomain, odomain)
- self.submodules._pong = PulseSynchronizer(odomain, idomain)
- self.submodules._timeout = ClockDomainsRenamer(idomain)(
- WaitTimer(timeout))
- self.comb += [
- self._timeout.wait.eq(~self._ping.i),
- self._ping.i.eq(starter | self._pong.o | self._timeout.done),
- self._pong.i.eq(self._ping.i)
- ]
-
- ibuffer = Signal(width, reset_less=True)
- obuffer = Signal(width) # registered reset_less by MultiReg
- sync_i += If(self._pong.o, ibuffer.eq(self.i))
- ibuffer.attr.add("no_retiming")
- self.specials += MultiReg(ibuffer, obuffer, odomain)
- sync_o += If(self._ping.o, self.o.eq(obuffer))
-
-
-class GrayCounter(Module):
- def __init__(self, width):
- self.ce = Signal()
- self.q = Signal(width)
- self.q_next = Signal(width)
- self.q_binary = Signal(width)
- self.q_next_binary = Signal(width)
-
- ###
-
- self.comb += [
- If(self.ce,
- self.q_next_binary.eq(self.q_binary + 1)
- ).Else(
- self.q_next_binary.eq(self.q_binary)
- ),
- self.q_next.eq(self.q_next_binary ^ self.q_next_binary[1:])
- ]
- self.sync += [
- self.q_binary.eq(self.q_next_binary),
- self.q.eq(self.q_next)
- ]
-
-
-class GrayDecoder(Module):
- def __init__(self, width):
- self.i = Signal(width)
- self.o = Signal(width, reset_less=True)
-
- # # #
-
- o_comb = Signal(width)
- self.comb += o_comb[-1].eq(self.i[-1])
- for i in reversed(range(width-1)):
- self.comb += o_comb[i].eq(o_comb[i+1] ^ self.i[i])
- self.sync += self.o.eq(o_comb)
-
-
-class ElasticBuffer(Module):
- def __init__(self, width, depth, idomain, odomain):
- self.din = Signal(width)
- self.dout = Signal(width)
-
- # # #
-
- reset = Signal()
- cd_write = ClockDomain()
- cd_read = ClockDomain()
- self.comb += [
- cd_write.clk.eq(ClockSignal(idomain)),
- cd_read.clk.eq(ClockSignal(odomain)),
- reset.eq(ResetSignal(idomain) | ResetSignal(odomain))
- ]
- self.specials += [
- AsyncResetSynchronizer(cd_write, reset),
- AsyncResetSynchronizer(cd_read, reset)
- ]
- self.clock_domains += cd_write, cd_read
-
- wrpointer = Signal(max=depth, reset=depth//2)
- rdpointer = Signal(max=depth)
-
- storage = Memory(width, depth)
- self.specials += storage
-
- wrport = storage.get_port(write_capable=True, clock_domain="write")
- rdport = storage.get_port(clock_domain="read")
- self.specials += wrport, rdport
-
- self.sync.write += wrpointer.eq(wrpointer + 1)
- self.sync.read += rdpointer.eq(rdpointer + 1)
-
- self.comb += [
- wrport.we.eq(1),
- wrport.adr.eq(wrpointer),
- wrport.dat_w.eq(self.din),
-
- rdport.adr.eq(rdpointer),
- self.dout.eq(rdport.dat_r)
- ]
-
-
-def lcm(a, b):
- """Compute the lowest common multiple of a and b"""
- return (a*b)//gcd(a, b)
-
-
-class Gearbox(Module):
- def __init__(self, iwidth, idomain, owidth, odomain):
- self.i = Signal(iwidth)
- self.o = Signal(owidth, reset_less=True)
-
- # # #
-
- rst = Signal()
- cd_write = ClockDomain()
- cd_read = ClockDomain()
- self.comb += [
- rst.eq(ResetSignal(idomain) | ResetSignal(odomain)),
- cd_write.clk.eq(ClockSignal(idomain)),
- cd_read.clk.eq(ClockSignal(odomain)),
- cd_write.rst.eq(rst),
- cd_read.rst.eq(rst)
- ]
- self.clock_domains += cd_write, cd_read
-
- storage = Signal(2*lcm(iwidth, owidth), reset_less=True)
- wrchunks = len(storage)//iwidth
- rdchunks = len(storage)//owidth
- wrpointer = Signal(max=wrchunks, reset=0 if iwidth > owidth else wrchunks//2)
- rdpointer = Signal(max=rdchunks, reset=rdchunks//2 if iwidth > owidth else 0)
-
- self.sync.write += \
- If(wrpointer == wrchunks-1,
- wrpointer.eq(0)
- ).Else(
- wrpointer.eq(wrpointer + 1)
- )
- cases = {}
- for i in range(wrchunks):
- cases[i] = [storage[iwidth*i:iwidth*(i+1)].eq(self.i)]
- self.sync.write += Case(wrpointer, cases)
-
-
- self.sync.read += \
- If(rdpointer == rdchunks-1,
- rdpointer.eq(0)
- ).Else(
- rdpointer.eq(rdpointer + 1)
- )
- cases = {}
- for i in range(rdchunks):
- cases[i] = [self.o.eq(storage[owidth*i:owidth*(i+1)])]
- self.sync.read += Case(rdpointer, cases)
\ No newline at end of file
+++ /dev/null
-"""
-Encoders and decoders between binary and one-hot representation
-"""
-
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-
-
-class Encoder(Module):
- """Encode one-hot to binary
-
- If `n` is low, the `o` th bit in `i` is asserted, else none or
- multiple bits are asserted.
-
- Parameters
- ----------
- width : int
- Bit width of the input
-
- Attributes
- ----------
- i : Signal(width), in
- One-hot input
- o : Signal(max=width), out
- Encoded binary
- n : Signal(1), out
- Invalid, either none or multiple input bits are asserted
- """
- def __init__(self, width):
- self.i = Signal(width) # one-hot
- self.o = Signal(max=max(2, width)) # binary
- self.n = Signal() # invalid: none or multiple
- act = dict((1<<j, self.o.eq(j)) for j in range(width))
- act["default"] = self.n.eq(1)
- self.comb += Case(self.i, act)
-
-
-class PriorityEncoder(Module):
- """Priority encode requests to binary
-
- If `n` is low, the `o` th bit in `i` is asserted and the bits below
- `o` are unasserted, else `o == 0`. The LSB has priority.
-
- Parameters
- ----------
- width : int
- Bit width of the input
-
- Attributes
- ----------
- i : Signal(width), in
- Input requests
- o : Signal(max=width), out
- Encoded binary
- n : Signal(1), out
- Invalid, no input bits are asserted
- """
- def __init__(self, width):
- self.i = Signal(width) # one-hot, lsb has priority
- self.o = Signal(max=max(2, width)) # binary
- self.n = Signal() # none
- for j in range(width)[::-1]: # last has priority
- self.comb += If(self.i[j], self.o.eq(j))
- self.comb += self.n.eq(self.i == 0)
-
-
-class Decoder(Module):
- """Decode binary to one-hot
-
- If `n` is low, the `i` th bit in `o` is asserted, the others are
- not, else `o == 0`.
-
- Parameters
- ----------
- width : int
- Bit width of the output
-
- Attributes
- ----------
- i : Signal(max=width), in
- Input binary
- o : Signal(width), out
- Decoded one-hot
- n : Signal(1), in
- Invalid, no output bits are to be asserted
- """
-
- def __init__(self, width):
- self.i = Signal(max=max(2, width)) # binary
- self.n = Signal() # none/invalid
- self.o = Signal(width) # one-hot
- act = dict((j, self.o.eq(1<<j)) for j in range(width))
- self.comb += Case(self.i, act)
- self.comb += If(self.n, self.o.eq(0))
-
-
-class PriorityDecoder(Decoder):
- pass # same
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-
-
-class Divider(Module):
- def __init__(self, w):
- self.start_i = Signal()
- self.dividend_i = Signal(w)
- self.divisor_i = Signal(w)
- self.ready_o = Signal()
- self.quotient_o = Signal(w)
- self.remainder_o = Signal(w)
-
- ###
-
- qr = Signal(2*w)
- counter = Signal(max=w+1)
- divisor_r = Signal(w)
- diff = Signal(w+1)
-
- self.comb += [
- self.quotient_o.eq(qr[:w]),
- self.remainder_o.eq(qr[w:]),
- self.ready_o.eq(counter == 0),
- diff.eq(qr[w-1:] - divisor_r)
- ]
- self.sync += [
- If(self.start_i,
- counter.eq(w),
- qr.eq(self.dividend_i),
- divisor_r.eq(self.divisor_i)
- ).Elif(~self.ready_o,
- If(diff[w],
- qr.eq(Cat(0, qr[:2*w-1]))
- ).Else(
- qr.eq(Cat(1, qr[:w-1], diff[:w]))
- ),
- counter.eq(counter - 1)
- )
- ]
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-from litex.gen.fhdl.specials import Memory
-from litex.gen.fhdl.bitcontainer import log2_int
-from litex.gen.fhdl.decorators import ClockDomainsRenamer
-from litex.gen.genlib.cdc import MultiReg, GrayCounter
-
-
-def _inc(signal, modulo):
- if modulo == 2**len(signal):
- return signal.eq(signal + 1)
- else:
- return If(signal == (modulo - 1),
- signal.eq(0)
- ).Else(
- signal.eq(signal + 1)
- )
-
-
-class _FIFOInterface:
- """
- Data written to the input interface (`din`, `we`, `writable`) is
- buffered and can be read at the output interface (`dout`, `re`,
- `readable`). The data entry written first to the input
- also appears first on the output.
-
- Parameters
- ----------
- width : int
- Bit width for the data.
- depth : int
- Depth of the FIFO.
-
- Attributes
- ----------
- din : in, width
- Input data
- writable : out
- There is space in the FIFO and `we` can be asserted to load new data.
- we : in
- Write enable signal to latch `din` into the FIFO. Does nothing if
- `writable` is not asserted.
- dout : out, width
- Output data. Only valid if `readable` is asserted.
- readable : out
- Output data `dout` valid, FIFO not empty.
- re : in
- Acknowledge `dout`. If asserted, the next entry will be
- available on the next cycle (if `readable` is high then).
- """
- def __init__(self, width, depth):
- self.we = Signal()
- self.writable = Signal() # not full
- self.re = Signal()
- self.readable = Signal() # not empty
-
- self.din = Signal(width)
- self.dout = Signal(width)
- self.width = width
- self.depth = depth
-
-
-class SyncFIFO(Module, _FIFOInterface):
- """Synchronous FIFO (first in, first out)
-
- Read and write interfaces are accessed from the same clock domain.
- If different clock domains are needed, use :class:`AsyncFIFO`.
-
- {interface}
- level : out
- Number of unread entries.
- replace : in
- Replaces the last entry written into the FIFO with `din`. Does nothing
- if that entry has already been read (i.e. the FIFO is empty).
- Assert in conjunction with `we`.
- """
- __doc__ = __doc__.format(interface=_FIFOInterface.__doc__)
-
- def __init__(self, width, depth, fwft=True):
- _FIFOInterface.__init__(self, width, depth)
-
- self.level = Signal(max=depth+1)
- self.replace = Signal()
-
- ###
-
- produce = Signal(max=depth)
- consume = Signal(max=depth)
- storage = Memory(self.width, depth)
- self.specials += storage
-
- wrport = storage.get_port(write_capable=True)
- self.specials += wrport
- self.comb += [
- If(self.replace,
- wrport.adr.eq(produce-1)
- ).Else(
- wrport.adr.eq(produce)
- ),
- wrport.dat_w.eq(self.din),
- wrport.we.eq(self.we & (self.writable | self.replace))
- ]
- self.sync += If(self.we & self.writable & ~self.replace,
- _inc(produce, depth))
-
- do_read = Signal()
- self.comb += do_read.eq(self.readable & self.re)
-
- rdport = storage.get_port(async_read=fwft, has_re=not fwft)
- self.specials += rdport
- self.comb += [
- rdport.adr.eq(consume),
- self.dout.eq(rdport.dat_r)
- ]
- if not fwft:
- self.comb += rdport.re.eq(do_read)
- self.sync += If(do_read, _inc(consume, depth))
-
- self.sync += \
- If(self.we & self.writable & ~self.replace,
- If(~do_read, self.level.eq(self.level + 1))
- ).Elif(do_read,
- self.level.eq(self.level - 1)
- )
- self.comb += [
- self.writable.eq(self.level != depth),
- self.readable.eq(self.level != 0)
- ]
-
-
-class SyncFIFOBuffered(Module, _FIFOInterface):
- def __init__(self, width, depth):
- _FIFOInterface.__init__(self, width, depth)
- self.submodules.fifo = fifo = SyncFIFO(width, depth, False)
-
- self.writable = fifo.writable
- self.din = fifo.din
- self.we = fifo.we
- self.dout = fifo.dout
- self.level = Signal(max=depth+2)
-
- ###
-
- self.comb += fifo.re.eq(fifo.readable & (~self.readable | self.re))
- self.sync += \
- If(fifo.re,
- self.readable.eq(1),
- ).Elif(self.re,
- self.readable.eq(0),
- )
- self.comb += self.level.eq(fifo.level + self.readable)
-
-
-class AsyncFIFO(Module, _FIFOInterface):
- """Asynchronous FIFO (first in, first out)
-
- Read and write interfaces are accessed from different clock domains,
- named `read` and `write`. Use `ClockDomainsRenamer` to rename to
- other names.
-
- {interface}
- """
- __doc__ = __doc__.format(interface=_FIFOInterface.__doc__)
-
- def __init__(self, width, depth):
- _FIFOInterface.__init__(self, width, depth)
-
- ###
-
- depth_bits = log2_int(depth, True)
-
- produce = ClockDomainsRenamer("write")(GrayCounter(depth_bits+1))
- consume = ClockDomainsRenamer("read")(GrayCounter(depth_bits+1))
- self.submodules += produce, consume
- self.comb += [
- produce.ce.eq(self.writable & self.we),
- consume.ce.eq(self.readable & self.re)
- ]
-
- produce_rdomain = Signal(depth_bits+1)
- produce.q.attr.add("no_retiming")
- self.specials += MultiReg(produce.q, produce_rdomain, "read")
- consume_wdomain = Signal(depth_bits+1)
- consume.q.attr.add("no_retiming")
- self.specials += MultiReg(consume.q, consume_wdomain, "write")
- if depth_bits == 1:
- self.comb += self.writable.eq((produce.q[-1] == consume_wdomain[-1])
- | (produce.q[-2] == consume_wdomain[-2]))
- else:
- self.comb += [
- self.writable.eq((produce.q[-1] == consume_wdomain[-1])
- | (produce.q[-2] == consume_wdomain[-2])
- | (produce.q[:-2] != consume_wdomain[:-2]))
- ]
- self.comb += self.readable.eq(consume.q != produce_rdomain)
-
- storage = Memory(self.width, depth)
- self.specials += storage
- wrport = storage.get_port(write_capable=True, clock_domain="write")
- self.specials += wrport
- self.comb += [
- wrport.adr.eq(produce.q_binary[:-1]),
- wrport.dat_w.eq(self.din),
- wrport.we.eq(produce.ce)
- ]
- rdport = storage.get_port(clock_domain="read")
- self.specials += rdport
- self.comb += [
- rdport.adr.eq(consume.q_next_binary[:-1]),
- self.dout.eq(rdport.dat_r)
- ]
+++ /dev/null
-from collections import OrderedDict
-
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.structure import _Statement, _Slice, _ArrayProxy
-from litex.gen.fhdl.module import Module, FinalizeError
-from litex.gen.fhdl.visit import NodeTransformer
-from litex.gen.fhdl.bitcontainer import value_bits_sign
-
-
-__all__ = ["AnonymousState", "NextState", "NextValue", "FSM"]
-
-
-class AnonymousState:
- pass
-
-
-# do not use namedtuple here as it inherits tuple
-# and the latter is used elsewhere in FHDL
-class NextState(_Statement):
- def __init__(self, state):
- self.state = state
-
-
-class NextValue(_Statement):
- def __init__(self, target, value):
- self.target = target
- self.value = value
-
-
-def _target_eq(a, b):
- if type(a) != type(b):
- return False
- ty = type(a)
- if ty == Constant:
- return a.value == b.value
- elif ty == Signal:
- return a is b
- elif ty == Cat:
- return all(_target_eq(x, y) for x, y in zip(a.l, b.l))
- elif ty == _Slice:
- return (_target_eq(a.value, b.value)
- and a.start == b.start
- and a.stop == b.stop)
- elif ty == _ArrayProxy:
- return (all(_target_eq(x, y) for x, y in zip(a.choices, b.choices))
- and _target_eq(a.key, b.key))
- else:
- raise ValueError("NextValue cannot be used with target type '{}'"
- .format(ty))
-
-
-class _LowerNext(NodeTransformer):
- def __init__(self, next_state_signal, encoding, aliases):
- self.next_state_signal = next_state_signal
- self.encoding = encoding
- self.aliases = aliases
- # (target, next_value_ce, next_value)
- self.registers = []
-
- def _get_register_control(self, target):
- for x in self.registers:
- if _target_eq(target, x[0]):
- return x[1], x[2]
- raise KeyError
-
- def visit_unknown(self, node):
- if isinstance(node, NextState):
- try:
- actual_state = self.aliases[node.state]
- except KeyError:
- actual_state = node.state
- return self.next_state_signal.eq(self.encoding[actual_state])
- elif isinstance(node, NextValue):
- try:
- next_value_ce, next_value = self._get_register_control(node.target)
- except KeyError:
- related = node.target if isinstance(node.target, Signal) else None
- next_value = Signal(bits_sign=value_bits_sign(node.target), related=related)
- next_value_ce = Signal(related=related)
- self.registers.append((node.target, next_value_ce, next_value))
- return next_value.eq(node.value), next_value_ce.eq(1)
- else:
- return node
-
-class FSM(Module):
- """
- Finite state machine
-
- Any Python objects can be used as states, e.g. strings.
-
- Parameters
- ----------
- reset_state
- Reset state. Defaults to the first added state.
-
- Examples
- --------
-
- >>> self.active = Signal()
- >>> self.bitno = Signal(3)
- >>>
- >>> fsm = FSM(reset_state="START")
- >>> self.submodules += fsm
- >>>
- >>> fsm.act("START",
- ... self.active.eq(1),
- ... If(strobe,
- ... NextState("DATA")
- ... )
- ... )
- >>> fsm.act("DATA",
- ... self.active.eq(1),
- ... If(strobe,
- ... NextValue(self.bitno, self.bitno + 1)
- ... If(self.bitno == 7,
- ... NextState("END")
- ... )
- ... )
- ... )
- >>> fsm.act("END",
- ... self.active.eq(0),
- ... NextState("STOP")
- ... )
-
- """
- def __init__(self, reset_state=None):
- self.actions = OrderedDict()
- self.state_aliases = dict()
- self.reset_state = reset_state
-
- self.before_entering_signals = OrderedDict()
- self.before_leaving_signals = OrderedDict()
- self.after_entering_signals = OrderedDict()
- self.after_leaving_signals = OrderedDict()
-
- def act(self, state, *statements):
- """
- Schedules `statements` to be executed in `state`. Statements may include:
-
- * combinatorial statements of form `a.eq(b)`, equivalent to
- `self.comb += a.eq(b)` when the FSM is in the given `state`;
- * synchronous statements of form `NextValue(a, b)`, equivalent to
- `self.sync += a.eq(b)` when the FSM is in the given `state`;
- * a statement of form `NextState(new_state)`, selecting the next state;
- * `If`, `Case`, etc.
- """
- if self.finalized:
- raise FinalizeError
- if self.reset_state is None:
- self.reset_state = state
- if state not in self.actions:
- self.actions[state] = []
- self.actions[state] += statements
-
- def delayed_enter(self, name, target, delay):
- if self.finalized:
- raise FinalizeError
- if delay > 0:
- state = name
- for i in range(delay):
- if i == delay - 1:
- next_state = target
- else:
- next_state = AnonymousState()
- self.act(state, NextState(next_state))
- state = next_state
- else:
- self.state_aliases[name] = target
-
- def ongoing(self, state):
- """
- Returns a signal that has the value 1 when the FSM is in the given `state`,
- and 0 otherwise.
- """
- is_ongoing = Signal()
- self.act(state, is_ongoing.eq(1))
- return is_ongoing
-
- def _get_signal(self, d, state):
- if state not in self.actions:
- self.actions[state] = []
- try:
- return d[state]
- except KeyError:
- is_el = Signal()
- d[state] = is_el
- return is_el
-
- def before_entering(self, state):
- return self._get_signal(self.before_entering_signals, state)
-
- def before_leaving(self, state):
- return self._get_signal(self.before_leaving_signals, state)
-
- def after_entering(self, state):
- signal = self._get_signal(self.after_entering_signals, state)
- self.sync += signal.eq(self.before_entering(state))
- return signal
-
- def after_leaving(self, state):
- signal = self._get_signal(self.after_leaving_signals, state)
- self.sync += signal.eq(self.before_leaving(state))
- return signal
-
- def do_finalize(self):
- nstates = len(self.actions)
- self.encoding = dict((s, n) for n, s in enumerate(self.actions.keys()))
- self.state = Signal(max=nstates, reset=self.encoding[self.reset_state])
- self.next_state = Signal(max=nstates)
-
- ln = _LowerNext(self.next_state, self.encoding, self.state_aliases)
- cases = dict((self.encoding[k], ln.visit(v)) for k, v in self.actions.items() if v)
- self.comb += [
- self.next_state.eq(self.state),
- Case(self.state, cases).makedefault(self.encoding[self.reset_state])
- ]
- self.sync += self.state.eq(self.next_state)
- for register, next_value_ce, next_value in ln.registers:
- self.sync += If(next_value_ce, register.eq(next_value))
-
- # drive entering/leaving signals
- for state, signal in self.before_leaving_signals.items():
- encoded = self.encoding[state]
- self.comb += signal.eq((self.state == encoded) & ~(self.next_state == encoded))
- if self.reset_state in self.after_entering_signals:
- self.after_entering_signals[self.reset_state].reset = 1
- for state, signal in self.before_entering_signals.items():
- encoded = self.encoding[state]
- self.comb += signal.eq(~(self.state == encoded) & (self.next_state == encoded))
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-from litex.gen.fhdl.specials import Special
-
-
-class DifferentialInput(Special):
- def __init__(self, i_p, i_n, o):
- Special.__init__(self)
- self.i_p = wrap(i_p)
- self.i_n = wrap(i_n)
- self.o = wrap(o)
-
- def iter_expressions(self):
- yield self, "i_p", SPECIAL_INPUT
- yield self, "i_n", SPECIAL_INPUT
- yield self, "o", SPECIAL_OUTPUT
-
- @staticmethod
- def lower(dr):
- raise NotImplementedError("Attempted to use a differential input, but platform does not support them")
-
-
-class DifferentialOutput(Special):
- def __init__(self, i, o_p, o_n):
- Special.__init__(self)
- self.i = wrap(i)
- self.o_p = wrap(o_p)
- self.o_n = wrap(o_n)
-
- def iter_expressions(self):
- yield self, "i", SPECIAL_INPUT
- yield self, "o_p", SPECIAL_OUTPUT
- yield self, "o_n", SPECIAL_OUTPUT
-
- @staticmethod
- def lower(dr):
- raise NotImplementedError("Attempted to use a differential output, but platform does not support them")
-
-
-class CRG(Module):
- """ Clock and Reset Generator """
-
- def __init__(self, clk, rst=0):
- self.clock_domains.cd_sys = ClockDomain()
- self.clock_domains.cd_por = ClockDomain(reset_less=True)
-
- if hasattr(clk, "p"):
- clk_se = Signal()
- self.specials += DifferentialInput(clk.p, clk.n, clk_se)
- clk = clk_se
-
- # Power on Reset (vendor agnostic)
- int_rst = Signal(reset=1)
- self.sync.por += int_rst.eq(rst)
- self.comb += [
- self.cd_sys.clk.eq(clk),
- self.cd_por.clk.eq(clk),
- self.cd_sys.rst.eq(int_rst)
- ]
-
-
-class DDRInput(Special):
- def __init__(self, i, o1, o2, clk=ClockSignal()):
- Special.__init__(self)
- self.i = wrap(i)
- self.o1 = wrap(o1)
- self.o2 = wrap(o2)
- self.clk = wrap(clk)
-
- def iter_expressions(self):
- yield self, "i", SPECIAL_INPUT
- yield self, "o1", SPECIAL_OUTPUT
- yield self, "o2", SPECIAL_OUTPUT
- yield self, "clk", SPECIAL_INPUT
-
- @staticmethod
- def lower(dr):
- raise NotImplementedError("Attempted to use a DDR input, but platform does not support them")
-
-
-class DDROutput(Special):
- def __init__(self, i1, i2, o, clk=ClockSignal()):
- Special.__init__(self)
- self.i1 = i1
- self.i2 = i2
- self.o = o
- self.clk = clk
-
- def iter_expressions(self):
- yield self, "i1", SPECIAL_INPUT
- yield self, "i2", SPECIAL_INPUT
- yield self, "o", SPECIAL_OUTPUT
- yield self, "clk", SPECIAL_INPUT
-
- @staticmethod
- def lower(dr):
- raise NotImplementedError("Attempted to use a DDR output, but platform does not support them")
-
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-from litex.gen.fhdl.bitcontainer import bits_for
-
-
-def split(v, *counts):
- r = []
- offset = 0
- for n in counts:
- if n != 0:
- r.append(v[offset:offset+n])
- else:
- r.append(None)
- offset += n
- return tuple(r)
-
-
-def displacer(signal, shift, output, n=None, reverse=False):
- if shift is None:
- return output.eq(signal)
- if n is None:
- n = 2**len(shift)
- w = len(signal)
- if reverse:
- r = reversed(range(n))
- else:
- r = range(n)
- l = [Replicate(shift == i, w) & signal for i in r]
- return output.eq(Cat(*l))
-
-
-def chooser(signal, shift, output, n=None, reverse=False):
- if shift is None:
- return output.eq(signal)
- if n is None:
- n = 2**len(shift)
- w = len(output)
- cases = {}
- for i in range(n):
- if reverse:
- s = n - i - 1
- else:
- s = i
- cases[i] = [output.eq(signal[s*w:(s+1)*w])]
- return Case(shift, cases).makedefault()
-
-
-def timeline(trigger, events):
- lastevent = max([e[0] for e in events])
- counter = Signal(max=lastevent+1)
-
- counterlogic = If(counter != 0,
- counter.eq(counter + 1)
- ).Elif(trigger,
- counter.eq(1)
- )
- # insert counter reset if it doesn't naturally overflow
- # (test if lastevent+1 is a power of 2)
- if (lastevent & (lastevent + 1)) != 0:
- counterlogic = If(counter == lastevent,
- counter.eq(0)
- ).Else(
- counterlogic
- )
-
- def get_cond(e):
- if e[0] == 0:
- return trigger & (counter == 0)
- else:
- return counter == e[0]
- sync = [If(get_cond(e), *e[1]) for e in events]
- sync.append(counterlogic)
- return sync
-
-
-class WaitTimer(Module):
- def __init__(self, t):
- self.wait = Signal()
- self.done = Signal()
-
- # # #
-
- count = Signal(bits_for(t), reset=t)
- self.comb += self.done.eq(count == 0)
- self.sync += \
- If(self.wait,
- If(~self.done, count.eq(count - 1))
- ).Else(count.eq(count.reset))
-
-
-class BitSlip(Module):
- def __init__(self, dw):
- self.i = Signal(dw)
- self.o = Signal(dw)
- self.value = Signal(max=dw)
-
- # # #
-
- r = Signal(2*dw)
- self.sync += r.eq(Cat(r[dw:], self.i))
- cases = {}
- for i in range(dw):
- cases[i] = self.o.eq(r[i:dw+i])
- self.sync += Case(self.value, cases)
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.tracer import get_obj_var_name
-
-from functools import reduce
-from operator import or_
-
-
-(DIR_NONE, DIR_S_TO_M, DIR_M_TO_S) = range(3)
-
-# Possible layout elements:
-# 1. (name, size)
-# 2. (name, size, direction)
-# 3. (name, sublayout)
-# size can be an int, or a (int, bool) tuple for signed numbers
-# sublayout must be a list
-
-
-def set_layout_parameters(layout, **layout_dict):
- def resolve(p):
- if isinstance(p, str):
- try:
- return layout_dict[p]
- except KeyError:
- return p
- else:
- return p
-
- r = []
- for f in layout:
- if isinstance(f[1], (int, tuple, str)): # cases 1/2
- if len(f) == 3:
- r.append((f[0], resolve(f[1]), f[2]))
- else:
- r.append((f[0], resolve(f[1])))
- elif isinstance(f[1], list): # case 3
- r.append((f[0], set_layout_parameters(f[1], **layout_dict)))
- else:
- raise TypeError
- return r
-
-
-def layout_len(layout):
- r = 0
- for f in layout:
- if isinstance(f[1], (int, tuple)): # cases 1/2
- if len(f) == 3:
- fname, fsize, fdirection = f
- else:
- fname, fsize = f
- elif isinstance(f[1], list): # case 3
- fname, fsublayout = f
- fsize = layout_len(fsublayout)
- else:
- raise TypeError
- if isinstance(fsize, tuple):
- r += fsize[0]
- else:
- r += fsize
- return r
-
-
-def layout_get(layout, name):
- for f in layout:
- if f[0] == name:
- return f
- raise KeyError(name)
-
-
-def layout_partial(layout, *elements):
- r = []
- for path in elements:
- path_s = path.split("/")
- last = path_s.pop()
- copy_ref = layout
- insert_ref = r
- for hop in path_s:
- name, copy_ref = layout_get(copy_ref, hop)
- try:
- name, insert_ref = layout_get(insert_ref, hop)
- except KeyError:
- new_insert_ref = []
- insert_ref.append((hop, new_insert_ref))
- insert_ref = new_insert_ref
- insert_ref.append(layout_get(copy_ref, last))
- return r
-
-
-class Record:
- def __init__(self, layout, name=None):
- self.name = get_obj_var_name(name, "")
- self.layout = layout
-
- if self.name:
- prefix = self.name + "_"
- else:
- prefix = ""
- for f in self.layout:
- if isinstance(f[1], (int, tuple)): # cases 1/2
- freset_less = False
- if(len(f) == 4):
- fname, fsize, fdirection, freset_less = f
- elif(len(f) == 3):
- fname, fsize, fdirection = f
- else:
- fname, fsize = f
- finst = Signal(fsize, name=prefix + fname, reset_less=freset_less)
- elif isinstance(f[1], list): # case 3
- fname, fsublayout = f
- finst = Record(fsublayout, prefix + fname)
- else:
- raise TypeError
- setattr(self, fname, finst)
-
- def eq(self, other):
- return [getattr(self, f[0]).eq(getattr(other, f[0]))
- for f in self.layout if hasattr(other, f[0])]
-
- def iter_flat(self):
- for f in self.layout:
- e = getattr(self, f[0])
- if isinstance(e, Signal):
- if len(f) == 3:
- yield e, f[2]
- else:
- yield e, DIR_NONE
- elif isinstance(e, Record):
- yield from e.iter_flat()
- else:
- raise TypeError
-
- def flatten(self):
- return [signal for signal, direction in self.iter_flat()]
-
- def raw_bits(self):
- return Cat(*self.flatten())
-
- def connect(self, *slaves, keep=None, omit=None):
- if keep is None:
- _keep = set([f[0] for f in self.layout])
- elif isinstance(keep, list):
- _keep = set(keep)
- else:
- _keep = keep
- if omit is None:
- _omit = set()
- elif isinstance(omit, list):
- _omit = set(omit)
- else:
- _omit = omit
-
- _keep = _keep - _omit
-
- r = []
- for f in self.layout:
- field = f[0]
- self_e = getattr(self, field)
- if isinstance(self_e, Signal):
- if field in _keep:
- direction = f[2]
- if direction == DIR_M_TO_S:
- r += [getattr(slave, field).eq(self_e) for slave in slaves]
- elif direction == DIR_S_TO_M:
- r.append(self_e.eq(reduce(or_, [getattr(slave, field) for slave in slaves])))
- else:
- raise TypeError
- else:
- for slave in slaves:
- r += self_e.connect(getattr(slave, field), keep=keep, omit=omit)
- return r
-
- def connect_flat(self, *slaves):
- r = []
- iter_slaves = [slave.iter_flat() for slave in slaves]
- for m_signal, m_direction in self.iter_flat():
- if m_direction == DIR_M_TO_S:
- for iter_slave in iter_slaves:
- s_signal, s_direction = next(iter_slave)
- assert(s_direction == DIR_M_TO_S)
- r.append(s_signal.eq(m_signal))
- elif m_direction == DIR_S_TO_M:
- s_signals = []
- for iter_slave in iter_slaves:
- s_signal, s_direction = next(iter_slave)
- assert(s_direction == DIR_S_TO_M)
- s_signals.append(s_signal)
- r.append(m_signal.eq(reduce(or_, s_signals)))
- else:
- raise TypeError
- return r
-
- def __len__(self):
- return layout_len(self.layout)
-
- def __repr__(self):
- return "<Record " + ":".join(f[0] for f in self.layout) + " at " + hex(id(self)) + ">"
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.specials import Special
-
-
-class AsyncResetSynchronizer(Special):
- def __init__(self, cd, async_reset):
- Special.__init__(self)
- self.cd = cd
- self.async_reset = wrap(async_reset)
-
- def iter_expressions(self):
- yield self.cd, "clk", SPECIAL_INPUT
- yield self.cd, "rst", SPECIAL_OUTPUT
- yield self, "async_reset", SPECIAL_INPUT
-
- @staticmethod
- def lower(dr):
- raise NotImplementedError("Attempted to use a reset synchronizer, but platform does not support them")
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-
-
-(SP_WITHDRAW, SP_CE) = range(2)
-
-
-class RoundRobin(Module):
- def __init__(self, n, switch_policy=SP_WITHDRAW):
- self.request = Signal(n)
- self.grant = Signal(max=max(2, n))
- self.switch_policy = switch_policy
- if self.switch_policy == SP_CE:
- self.ce = Signal()
-
- ###
-
- if n > 1:
- cases = {}
- for i in range(n):
- switch = []
- for j in reversed(range(i+1, i+n)):
- t = j % n
- switch = [
- If(self.request[t],
- self.grant.eq(t)
- ).Else(
- *switch
- )
- ]
- if self.switch_policy == SP_WITHDRAW:
- case = [If(~self.request[i], *switch)]
- else:
- case = switch
- cases[i] = case
- statement = Case(self.grant, cases)
- if self.switch_policy == SP_CE:
- statement = If(self.ce, statement)
- self.sync += statement
- else:
- self.comb += self.grant.eq(0)
+++ /dev/null
-from litex.gen.fhdl.structure import *
-from litex.gen.fhdl.module import Module
-
-
-class BitonicSort(Module):
- """Combinatorial sorting network
-
- The Bitonic sort is implemented as a combinatorial sort using
- comparators and multiplexers. Its asymptotic complexity (in terms of
- number of comparators/muxes) is O(n log(n)**2), like mergesort or
- shellsort.
-
- http://www.dps.uibk.ac.at/~cosenza/teaching/gpu/sort-batcher.pdf
-
- http://www.inf.fh-flensburg.de/lang/algorithmen/sortieren/bitonic/bitonicen.htm
-
- http://www.myhdl.org/doku.php/cookbook:bitonic
-
- Parameters
- ----------
- n : int
- Number of inputs and output signals.
- m : int
- Bit width of inputs and outputs. Or a tuple of `(m, signed)`.
- ascending : bool
- Sort direction. `True` if input is to be sorted ascending,
- `False` for descending. Defaults to ascending.
-
- Attributes
- ----------
- i : list of Signals, in
- Input values, each `m` wide.
- o : list of Signals, out
- Output values, sorted, each `m` bits wide.
- """
- def __init__(self, n, m, ascending=True):
- self.i = [Signal(m) for i in range(n)]
- self.o = [Signal(m) for i in range(n)]
- self._sort(self.i, self.o, int(ascending), m)
-
- def _sort_two(self, i0, i1, o0, o1, dir):
- self.comb += [
- o0.eq(i0),
- o1.eq(i1),
- If(dir == (i0 > i1),
- o0.eq(i1),
- o1.eq(i0),
- )]
-
- def _merge(self, i, o, dir, m):
- n = len(i)
- k = n//2
- if n > 1:
- t = [Signal(m) for j in range(n)]
- for j in range(k):
- self._sort_two(i[j], i[j + k], t[j], t[j + k], dir)
- self._merge(t[:k], o[:k], dir, m)
- self._merge(t[k:], o[k:], dir, m)
- else:
- self.comb += o[0].eq(i[0])
-
- def _sort(self, i, o, dir, m):
- n = len(i)
- k = n//2
- if n > 1:
- t = [Signal(m) for j in range(n)]
- self._sort(i[:k], t[:k], 1, m) # ascending
- self._sort(i[k:], t[k:], 0, m) # descending
- self._merge(t, o, dir, m)
- else:
- self.comb += o[0].eq(i[0])
+++ /dev/null
-from math import gcd
-import collections
-
-
-def flat_iteration(l):
- for element in l:
- if isinstance(element, collections.Iterable):
- for element2 in flat_iteration(element):
- yield element2
- else:
- yield element
-
-
-def xdir(obj, return_values=False):
- for attr in dir(obj):
- if attr[:2] != "__" and attr[-2:] != "__":
- if return_values:
- yield attr, getattr(obj, attr)
- else:
- yield attr
-
-
-def gcd_multiple(numbers):
- l = len(numbers)
- if l == 1:
- return numbers[0]
- else:
- s = l//2
- return gcd(gcd_multiple(numbers[:s]), gcd_multiple(numbers[s:]))