--- /dev/null
+from nmigen import *
+from nmigen.back import rtlil, verilog
+
+
+class System:
+ def __init__(self):
+ self.adr = Signal(16)
+ self.dat_r = Signal(8)
+ self.dat_w = Signal(8)
+ self.we = Signal()
+
+ def get_fragment(self, platform):
+ m = Module()
+ m.submodules += Instance("CPU",
+ p_RESET_ADDR=0xfff0,
+ i_d_adr =self.adr,
+ i_d_dat_r=self.dat_r,
+ o_d_dat_w=self.dat_w,
+ )
+ return m.lower(platform)
+
+
+sys = System()
+frag = sys.get_fragment(platform=None)
+# print(rtlil.convert(frag, ports=[sys.adr, sys.dat_r, sys.dat_w, sys.we]))
+print(verilog.convert(frag, ports=[sys.adr, sys.dat_r, sys.dat_w, sys.we]))
from .hdl.ast import Value, Const, Mux, Cat, Repl, Array, Signal, ClockSignal, ResetSignal
from .hdl.dsl import Module
from .hdl.cd import ClockDomain
-from .hdl.ir import Fragment
+from .hdl.ir import Fragment, Instance
from .hdl.xfrm import ResetInserter, CEInserter
from .lib.cdc import MultiReg
def convert_fragment(builder, fragment, name, top):
+ if fragment.black_box is not None:
+ port_map = OrderedDict()
+ for signal in fragment.ports:
+ port_map["\\{}".format(fragment.port_names[signal])] = signal
+
+ return "\\{}".format(fragment.black_box), port_map
+ else:
+ assert not fragment.port_names
+ assert not fragment.parameters
+
with builder.module(name or "anonymous", attrs={"top": 1} if top else {}) as module:
compiler_state = _ValueCompilerState(module)
rhs_compiler = _RHSValueCompiler(compiler_state)
module.cell(sub_name, name=sub_name, ports={
port: compiler_state.resolve_curr(signal, prefix=sub_name)
for port, signal in sub_port_map.items()
- })
+ }, params=subfragment.parameters)
with module.process() as process:
with process.case() as case:
class _MappedKeyCollection(metaclass=ABCMeta):
@abstractmethod
def _map_key(self, key):
- pass
+ pass # :nocov:
@abstractmethod
def _unmap_key(self, key):
- pass
+ pass # :nocov:
class _MappedKeyDict(MutableMapping, _MappedKeyCollection):
from .cd import *
-__all__ = ["Fragment", "DriverConflict"]
+__all__ = ["Fragment", "Instance", "DriverConflict"]
class DriverConflict(UserWarning):
class Fragment:
def __init__(self):
+ self.black_box = None
+ self.port_names = SignalDict()
+ self.parameters = OrderedDict()
+
self.ports = SignalDict()
self.drivers = OrderedDict()
self.statements = []
self.domains = OrderedDict()
self.subfragments = []
- def add_ports(self, *ports, kind):
- assert kind in ("i", "o", "io")
+ def add_ports(self, *ports, dir):
+ assert dir in ("i", "o", "io")
for port in flatten(ports):
- self.ports[port] = kind
+ self.ports[port] = dir
- def iter_ports(self):
- yield from self.ports.keys()
+ def iter_ports(self, dir=None):
+ if dir is None:
+ yield from self.ports
+ else:
+ for port, port_dir in self.ports.items():
+ if port_dir == dir:
+ yield port
def add_driver(self, signal, domain=None):
if domain not in self.drivers:
assert isinstance(subfragment, Fragment)
self.subfragments.append((subfragment, name))
+ def get_fragment(self, platform):
+ return self
+
def _resolve_driver_conflicts(self, hierarchy=("top",), mode="warn"):
assert mode in ("silent", "warn", "error")
for subfrag, name in self.subfragments:
# Always ask subfragments to provide all signals we're using and signals we're asked
# to provide. If the subfragment is not driving it, it will silently ignore it.
- sub_ins, sub_outs = subfrag._propagate_ports(ports=self_used | ports)
+ sub_ins, sub_outs, sub_inouts = subfrag._propagate_ports(ports=self_used | ports)
# Refine the input port approximation: if a subfragment is driving a signal,
# it is definitely not our input. But, if a subfragment requires a signal as an input,
# and we aren't driving it, it has to be our input as well.
# Refine the output port approximation: if a subfragment is driving a signal,
# and we're asked to provide it, we can provide it now.
outs |= ports & sub_outs
+ # All of our subfragments' bidirectional ports are also our bidirectional ports,
+ # since these are only used for pins.
+ self.add_ports(sub_inouts, dir="io")
# We've computed the precise set of input and output ports.
- self.add_ports(ins, kind="i")
- self.add_ports(outs, kind="o")
+ self.add_ports(ins, dir="i")
+ self.add_ports(outs, dir="o")
- return ins, outs
+ return (SignalSet(self.iter_ports("i")),
+ SignalSet(self.iter_ports("o")),
+ SignalSet(self.iter_ports("io")))
def prepare(self, ports=(), ensure_sync_exists=True):
from .xfrm import FragmentTransformer
fragment = fragment._lower_domain_signals()
fragment._propagate_ports(ports)
return fragment
+
+
+class Instance(Fragment):
+ def __init__(self, type, **kwargs):
+ super().__init__()
+ self.black_box = type
+ for kw, arg in kwargs.items():
+ if kw.startswith("p_"):
+ self.parameters[kw[2:]] = arg
+ elif kw.startswith("i_"):
+ self.port_names[arg] = kw[2:]
+ self.add_ports(arg, dir="i")
+ elif kw.startswith("o_"):
+ self.port_names[arg] = kw[2:]
+ self.add_ports(arg, dir="o")
+ elif kw.startswith("io_"):
+ self.port_names[arg] = kw[3:]
+ self.add_ports(arg, dir="io")
+ else:
+ raise NameError("Instance argument '{}' does not start with p_, i_, o_, or io_"
+ .format(arg))
class AbstractValueTransformer(metaclass=ABCMeta):
@abstractmethod
def on_Const(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_Signal(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_ClockSignal(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_ResetSignal(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_Operator(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_Slice(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_Part(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_Cat(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_Repl(self, value):
- pass
+ pass # :nocov:
@abstractmethod
def on_ArrayProxy(self, value):
- pass
+ pass # :nocov:
def on_unknown_value(self, value):
raise TypeError("Cannot transform value '{!r}'".format(value)) # :nocov:
class AbstractStatementTransformer(metaclass=ABCMeta):
@abstractmethod
def on_Assign(self, stmt):
- pass
+ pass # :nocov:
@abstractmethod
def on_Switch(self, stmt):
- pass
+ pass # :nocov:
@abstractmethod
def on_statements(self, stmt):
- pass
+ pass # :nocov:
def on_unknown_statement(self, stmt):
raise TypeError("Cannot transform statement '{!r}'".format(stmt)) # :nocov:
for subfragment, name in fragment.subfragments:
new_fragment.add_subfragment(self(subfragment), name)
+ def map_ports(self, fragment, new_fragment):
+ for port, dir in fragment.ports.items():
+ new_fragment.add_ports(port, dir=dir)
+
def map_domains(self, fragment, new_fragment):
for domain in fragment.iter_domains():
new_fragment.add_domains(fragment.domains[domain])
def on_fragment(self, fragment):
new_fragment = Fragment()
+ new_fragment.black_box = fragment.black_box
+ new_fragment.parameters = OrderedDict(fragment.parameters)
+ new_fragment.port_names = SignalDict(fragment.port_names.items())
+ self.map_ports(fragment, new_fragment)
self.map_subfragments(fragment, new_fragment)
self.map_domains(fragment, new_fragment)
self.map_statements(fragment, new_fragment)
+from collections import OrderedDict
+
from ..hdl.ast import *
from ..hdl.cd import *
from ..hdl.ir import *
def test_iter_signals(self):
f = Fragment()
- f.add_ports(self.s1, self.s2, kind="io")
+ f.add_ports(self.s1, self.s2, dir="io")
self.assertEqual(SignalSet((self.s1, self.s2)), f.iter_signals())
def test_self_contained(self):
(sync.clk, "i"),
]))
+ def test_inout(self):
+ s = Signal()
+ f1 = Fragment()
+ f2 = Fragment()
+ f2.add_ports(s, dir="io")
+ f1.add_subfragment(f2)
+
+ f1._propagate_ports(ports=())
+ self.assertEqual(f1.ports, SignalDict([
+ (s, "io")
+ ]))
+
class FragmentDomainsTestCase(FHDLTestCase):
def test_iter_signals(self):
(eq (sig c2) (const 1'd1))
)
""")
+
+
+class InstanceTestCase(FHDLTestCase):
+ def test_init(self):
+ rst = Signal()
+ stb = Signal()
+ pins = Signal(8)
+ inst = Instance("cpu", p_RESET=0x1234, i_rst=rst, o_stb=stb, io_pins=pins)
+ self.assertEqual(inst.black_box, "cpu")
+ self.assertEqual(inst.parameters, OrderedDict([("RESET", 0x1234)]))
+ self.assertEqual(inst.ports, SignalDict([
+ (rst, "i"),
+ (stb, "o"),
+ (pins, "io"),
+ ]))