from migen.fhdl.std import *
from migen.fhdl import verilog
+
class Example(Module):
def __init__(self):
dx = 2
from migen.genlib.complex import *
from migen.fhdl import verilog
+
class Example(Module):
def __init__(self):
w = Complex(32, 42)
from migen.fhdl import verilog
from migen.genlib.fsm import FSM, NextState, NextValue
+
class Example(Module):
def __init__(self):
self.s = Signal()
from migen.genlib.cdc import GrayCounter
from migen.sim.generic import run_simulation
+
class TB(Module):
def __init__(self, width=3):
self.width = width
from migen.fhdl import verilog
from migen.genlib.divider import Divider
+
class CDM(Module):
def __init__(self):
self.submodules.divider = Divider(5)
self.clock_domains.cd_sys = ClockDomain(reset_less=True)
+
class MultiMod(Module):
def __init__(self):
self.submodules.foo = CDM()
from migen.fhdl.std import *
from migen.fhdl import verilog
+
class Example(Module):
def __init__(self):
self.specials.mem = Memory(32, 100, init=[5, 18, 32])
from migen.fhdl import verilog
from migen.genlib.misc import optree
+
def gen_list(n):
s = [Signal() for i in range(n)]
return s
+
def gen_2list(n):
s = [Signal(2) for i in range(n)]
return s
+
class Foo:
def __init__(self):
la = gen_list(3)
lb = gen_2list(2)
self.sigs = la + lb
+
class Bar:
def __init__(self):
self.sigs = gen_list(2)
+
class Example(Module):
def __init__(self):
a = [Bar() for x in range(3)]
from migen.fhdl import verilog
from migen.genlib.cdc import *
+
class XilinxMultiRegImpl(MultiRegImpl):
def __init__(self, *args, **kwargs):
MultiRegImpl.__init__(self, *args, **kwargs)
self.specials += set(SynthesisDirective("attribute shreg_extract of {r} is no", r=r)
for r in self.regs)
+
class XilinxMultiReg:
@staticmethod
def lower(dr):
("ack", 1, DIR_S_TO_M)
]
+
class Test(Module):
def __init__(self):
master = Record(L)
from migen.fhdl.std import *
from migen.fhdl import verilog
+
class Example(Module):
def __init__(self):
a = Signal(3)
from migen.genlib.cdc import MultiReg
from migen.bank import description, csrgen
+
class Example(Module):
def __init__(self, ninputs=32, noutputs=32):
r_o = description.CSRStorage(noutputs, atomic_write=True)
from migen.fhdl.std import *
from migen.fhdl import verilog
+
class Example(Module):
def __init__(self, n=6):
self.pad = Signal(n)
from migen.fhdl import verilog
from migen.genlib import divider
+
@ResetInserter()
@CEInserter()
class Example(Module):
from migen.bus import wishbone
from migen.sim.generic import run_simulation
+
class MyModel:
def read(self, address):
return address + 4
+
class MyModelWB(MyModel, wishbone.TargetModel):
def __init__(self):
self.prng = Random(763627)
def can_ack(self, bus):
return self.prng.randrange(0, 2)
+
def adrgen_gen():
for i in range(10):
print("Address: " + hex(i))
yield Token("address", {"a": i})
+
class SimAdrGen(SimActor):
def __init__(self, nbits):
self.address = Source([("a", nbits)])
SimActor.__init__(self, adrgen_gen())
+
def dumper_gen():
while True:
t = Token("data", idle_wait=True)
yield t
print("Received: " + hex(t.value["d"]))
+
class SimDumper(SimActor):
def __init__(self):
self.data = Sink([("d", 32)])
SimActor.__init__(self, dumper_gen())
+
def trgen_gen():
for i in range(10):
a = i
print("Address: " + hex(a) + " Data: " + hex(d))
yield Token("address_data", {"a": a, "d": d})
+
class SimTrGen(SimActor):
def __init__(self, a_nbits):
self.address_data = Source([("a", a_nbits), ("d", 32)])
SimActor.__init__(self, trgen_gen())
+
class TBWishbone(Module):
def __init__(self, master):
self.submodules.peripheral = wishbone.Target(MyModelWB())
self.submodules.interconnect = wishbone.InterconnectPointToPoint(master.bus,
self.peripheral.bus)
+
class TBWishboneReader(TBWishbone):
def __init__(self):
self.adrgen = SimAdrGen(30)
self.submodules.comp = CompositeActor(g)
TBWishbone.__init__(self, self.reader)
+
class TBWishboneWriter(TBWishbone):
def __init__(self):
self.trgen = SimTrGen(30)
self.submodules.comp = CompositeActor(g)
TBWishbone.__init__(self, self.writer)
+
def test_wb_reader():
print("*** Testing Wishbone reader")
run_simulation(TBWishboneReader(), 100)
+
def test_wb_writer():
print("*** Testing Wishbone writer")
run_simulation(TBWishboneWriter(), 100)
from migen.actorlib.sim import *
from migen.sim.generic import run_simulation
+
def source_gen():
for i in range(10):
v = i + 5
print("==> " + str(v))
yield Token("source", {"maximum": v})
+
class SimSource(SimActor):
def __init__(self):
self.source = Source([("maximum", 32)])
SimActor.__init__(self, source_gen())
+
def sink_gen():
while True:
t = Token("sink")
yield t
print(t.value["value"])
+
class SimSink(SimActor):
def __init__(self):
self.sink = Sink([("value", 32)])
packed_layout = structuring.pack_layout(base_layout, pack_factor)
rawbits_layout = [("value", 32*pack_factor)]
+
def source_gen():
for i in count(0):
yield Token("source", {"value": i})
+
class SimSource(SimActor):
def __init__(self):
self.source = Source(base_layout)
SimActor.__init__(self, source_gen())
+
def sink_gen():
while True:
t = Token("sink")
yield t
print(t.value["value"])
+
class SimSink(SimActor):
def __init__(self):
self.sink = Sink(base_layout)
SimActor.__init__(self, sink_gen())
+
class TB(Module):
def __init__(self):
source = SimSource()
from migen.bus import wishbone
from migen.sim.generic import run_simulation
+
# Our bus master.
# Python generators let us program bus transactions in an elegant sequential style.
def my_generator():
for delay in range(prng.randrange(0, 3)):
yield None
+
# Our bus slave.
class MyModelWB(wishbone.TargetModel):
def __init__(self):
# Simulate variable latency.
return self.prng.randrange(0, 2)
+
class TB(Module):
def __init__(self):
# The "wishbone.Initiator" library component runs our generator
from migen.fhdl.std import *
from migen.sim.generic import run_simulation
+
# Our simple counter, which increments at every cycle
# and prints its current value in simulation.
class Counter(Module):
from migen.fhdl.std import *
from migen.sim.generic import run_simulation
+
# A slightly more elaborate counter.
# Has a clock enable (CE) signal, counts on more bits
# and resets with a negative number.
from migen.actorlib.sim import *
from migen.sim.generic import run_simulation
+
def source_gen():
for i in range(10):
print("Sending: " + str(i))
yield Token("source", {"value": i})
+
class SimSource(SimActor):
def __init__(self):
self.source = Source([("value", 32)])
SimActor.__init__(self, source_gen())
+
def sink_gen():
while True:
t = Token("sink")
yield t
print("Received: " + str(t.value["value"]))
+
class SimSink(SimActor):
def __init__(self):
self.sink = Sink([("value", 32)])
SimActor.__init__(self, sink_gen())
+
class TB(Module):
def __init__(self):
self.source = SimSource()
from migen.genlib.misc import optree
from migen.sim.generic import run_simulation
+
# A synthesizable FIR filter.
class FIR(Module):
def __init__(self, coef, wsize=16):
self.sync += sum_full.eq(optree("+", muls))
self.comb += self.o.eq(sum_full[self.wsize-1:])
+
# A test bench for our FIR filter.
# Generates a sine wave at the input and records the output.
class TB(Module):
from migen.fhdl.std import *
from migen.sim.generic import run_simulation
+
class Mem(Module):
def __init__(self):
# Initialize the beginning of the memory with integers
from mibuild.generic_platform import GenericPlatform
from mibuild.altera import common, quartus
+
class AlteraPlatform(GenericPlatform):
bitstream_ext = ".sof"
from mibuild.generic_programmer import GenericProgrammer
+
class USBBlaster(GenericProgrammer):
needs_bitreverse = False
from mibuild import tools
from mibuild.xilinx import common
+
def _format_constraint(c):
if isinstance(c, Pins):
return "set_location_assignment PIN_" + c.identifiers[0]
elif isinstance(c, Misc):
return c.misc
+
def _format_qsf(signame, pin, others, resname):
fmt_c = [_format_constraint(c) for c in ([Pins(pin)] + others)]
fmt_r = resname[0] + ":" + str(resname[1])
r += c + " -to " + signame + " # " + fmt_r + "\n"
return r
+
def _build_qsf(named_sc, named_pc):
r = ""
for sig, pins, others, resname in named_sc:
r += "set_global_assignment -name top_level_entity top\n"
return r
+
def _build_files(device, sources, vincpaths, named_sc, named_pc, build_name):
qsf_contents = ""
for filename, language in sources:
qsf_contents += "set_global_assignment -name DEVICE " + device
tools.write_to_file(build_name + ".qsf", qsf_contents)
+
def _run_quartus(build_name, quartus_path):
build_script_contents = """# Autogenerated by mibuild
if r != 0:
raise OSError("Subprocess failed")
+
class AlteraQuartusToolchain:
def build(self, platform, fragment, build_dir="build", build_name="top",
quartus_path="/opt/Altera", run=True):
from mibuild import tools
+
class ConstraintError(Exception):
pass
+
class Pins:
def __init__(self, *identifiers):
self.identifiers = []
for i in identifiers:
self.identifiers += i.split()
+
class IOStandard:
def __init__(self, name):
self.name = name
+
class Drive:
def __init__(self, strength):
self.strength = strength
+
class Misc:
def __init__(self, misc):
self.misc = misc
+
class Subsignal:
def __init__(self, name, *constraints):
self.name = name
self.constraints = list(constraints)
+
class PlatformInfo:
def __init__(self, info):
self.info = info
+
def _lookup(description, name, number):
for resource in description:
if resource[0] == name and (number is None or resource[1] == number):
return resource
raise ConstraintError("Resource not found: " + name + ":" + str(number))
+
def _resource_type(resource):
t = None
for element in resource[2:]:
t.append((element.name, n_bits))
return t
+
class ConnectorManager:
def __init__(self, connectors):
self.connector_table = dict()
r.append(identifier)
return r
+
def _separate_pins(constraints):
pins = None
others = []
others.append(c)
return pins, others
+
class ConstraintManager:
def __init__(self, io, connectors):
self.available = list(io)
def get_platform_commands(self):
return self.platform_commands
+
class GenericPlatform:
def __init__(self, device, io, connectors=[], name=None):
self.device = device
import os
+
class GenericProgrammer:
def __init__(self, flash_proxy_basename=None):
self.flash_proxy_basename = flash_proxy_basename
from migen.genlib.resetsync import AsyncResetSynchronizer
+
class LatticeAsyncResetSynchronizerImpl(Module):
def __init__(self, cd, async_reset):
rst1 = Signal()
i_CK=cd.clk, o_Q=cd.rst)
]
+
class LatticeAsyncResetSynchronizer:
@staticmethod
def lower(dr):
return LatticeAsyncResetSynchronizerImpl(dr.cd, dr.async_reset)
+
class LatticeDDROutputImpl(Module):
def __init__(self, i1, i2, o, clk):
self.specials += Instance("ODDRXD1",
i_DA=i1, i_DB=i2, o_Q=o,
)
+
class LatticeDDROutput:
@staticmethod
def lower(dr):
from mibuild import tools
from mibuild.lattice import common
+
def _format_constraint(c):
if isinstance(c, Pins):
return ("LOCATE COMP ", " SITE " + "\"" + c.identifiers[0] + "\"")
elif isinstance(c, Misc):
return c.misc
+
def _format_lpf(signame, pin, others, resname):
fmt_c = [_format_constraint(c) for c in ([Pins(pin)] + others)]
r = ""
r += pre + "\""+ signame +"\"" + suf + ";\n"
return r
+
def _build_lpf(named_sc, named_pc):
r = "BLOCK RESETPATHS;\n"
r += "BLOCK ASYNCPATHS;\n"
r += "\n" + "\n\n".join(named_pc)
return r
+
def _build_files(device, sources, vincpaths, build_name):
tcl = []
tcl.append("prj_project new -name \"%s\" -impl \"implementation\" -dev %s -synthesis \"synplify\"" %(build_name, device))
tcl.append("prj_run Export -impl implementation -task Bitgen")
tools.write_to_file(build_name + ".tcl", "\n".join(tcl))
+
def _run_diamond(build_name, source, ver=None):
if sys.platform == "win32" or sys.platform == "cygwin":
build_script_contents = "REM Autogenerated by mibuild\n"
if r != 0:
raise OSError("Subprocess failed")
+
class LatticeDiamondToolchain:
def build(self, platform, fragment, build_dir="build", build_name="top",
diamond_path="/opt/Diamond", run=True):
from mibuild.generic_platform import GenericPlatform
from mibuild.lattice import common, diamond
+
class LatticePlatform(GenericPlatform):
bitstream_ext = ".bit"
</ispXCF>
"""
+
class LatticeProgrammer(GenericProgrammer):
needs_bitreverse = False
"None") # 116 USBH2_CLK USB_HOST2 +2V5 PA0
]
+
class Platform(XilinxPlatform):
default_clk_name = "clk0"
default_clk_period = 10
"None") # 140 FPGA_BANK3_POWER
]
+
class Platform(XilinxPlatform):
default_clk_name = "clk3"
default_clk_period = 10.526
),
]
+
class Platform(AlteraPlatform):
default_clk_name = "clk50"
default_clk_period = 20
)
]
+
class Platform(XilinxPlatform):
identifier = 0x4B37
default_clk_name = "clk156"
)
]
+
class Platform(XilinxPlatform):
identifier = 0x4D31
default_clk_name = "clk50"
("F", "E2 E1 E4 F4 F5 G3 F3 G1 H3 H1 H2 J1")
]
+
class Platform(XilinxPlatform):
default_clk_name = "clk32"
default_clk_period = 31.25
),
]
+
class Platform(XilinxPlatform):
identifier = 0x4D58
default_clk_name = "clk50"
)
]
+
class Platform(XilinxPlatform):
default_clk_name = "clk200"
default_clk_period = 5
("C", "P114 P115 P116 P117 P118 P119 P120 P121 P123 P124 P126 P127 P131 P132 P133 P134")
]
+
class Platform(XilinxPlatform):
identifier = 0x5050
default_clk_name = "clk32"
("C", "F17 F16 E16 G16 F15 G14 F14 H14 H13 J13 G13 H12 K14 K13 K12 L12"),
]
+
class Platform(XilinxPlatform):
identifier = 0x5049
default_clk_name = "clk50"
)
]
+
class Platform(XilinxPlatform):
default_clk_name = "clk100"
default_clk_period = 10
),
]
+
class Platform(XilinxPlatform):
def __init__(self):
XilinxPlatform.__init__(self, "xc5vsx95t-ff1136-1", _io)
from mibuild.generic_platform import *
from mibuild.sim import SimPlatform
+
class SimPins(Pins):
def __init__(self, n):
Pins.__init__(self, "s "*n)
),
]
+
class Platform(SimPlatform):
is_sim = True
default_clk_name = "sys_clk"
),
]
+
class Platform(LatticePlatform):
default_clk_name = "clk100"
default_clk_period = 10
]
+
class Platform(XilinxPlatform):
default_clk_name = "clk_if"
default_clk_period = 20
from mibuild.generic_platform import GenericPlatform
from mibuild.sim import common, verilator
+
class SimPlatform(GenericPlatform):
def __init__(self, *args, toolchain="verilator", **kwargs):
GenericPlatform.__init__(self, *args, **kwargs)
from mibuild import tools
from mibuild.sim import common
+
def _build_tb(platform, vns, serial, template):
def io_name(ressource, subsignal=None):
f.close()
tools.write_to_file("dut_tb.cpp", content)
+
def _build_sim(platform, vns, build_name, include_paths, sim_path, serial, verbose):
include = ""
for path in include_paths:
if r != 0:
raise OSError("Subprocess failed")
+
def _run_sim(build_name):
run_script_contents = """obj_dir/Vdut
"""
if r != 0:
raise OSError("Subprocess failed")
+
class SimVerilatorToolchain:
# XXX fir sim_path
def build(self, platform, fragment, build_dir="build", build_name="top",
import os, struct
from distutils.version import StrictVersion
+
def mkdir_noerror(d):
try:
os.mkdir(d)
except OSError:
pass
+
def language_by_filename(name):
extension = name.rsplit(".")[-1]
if extension in ["v", "vh", "vo"]:
return "vhdl"
return None
+
def write_to_file(filename, contents, force_unix=False):
newline = None
if force_unix:
with open(filename, "w", newline=newline) as f:
f.write(contents)
+
def arch_bits():
return struct.calcsize("P")*8
+
def versions(path):
for n in os.listdir(path):
full = os.path.join(path, n)
from migen.genlib.io import *
from mibuild import tools
+
def settings(path, ver=None, sub=None):
vers = list(tools.versions(path))
if ver is None:
raise OSError("no settings file found")
+
class XilinxNoRetimingImpl(Module):
def __init__(self, reg):
self.specials += SynthesisDirective("attribute register_balancing of {r} is no", r=reg)
+
class XilinxNoRetiming:
@staticmethod
def lower(dr):
return XilinxNoRetimingImpl(dr.reg)
+
class XilinxMultiRegImpl(MultiRegImpl):
def __init__(self, *args, **kwargs):
MultiRegImpl.__init__(self, *args, **kwargs)
self.specials += [SynthesisDirective("attribute shreg_extract of {r} is no", r=r)
for r in self.regs]
+
class XilinxMultiReg:
@staticmethod
def lower(dr):
return XilinxMultiRegImpl(dr.i, dr.o, dr.odomain, dr.n)
+
class XilinxAsyncResetSynchronizerImpl(Module):
def __init__(self, cd, async_reset):
rst1 = Signal()
i_CE=1, i_C=cd.clk, o_Q=cd.rst)
]
+
class XilinxAsyncResetSynchronizer:
@staticmethod
def lower(dr):
return XilinxAsyncResetSynchronizerImpl(dr.cd, dr.async_reset)
+
class XilinxDifferentialInputImpl(Module):
def __init__(self, i_p, i_n, o):
self.specials += Instance("IBUFDS", i_I=i_p, i_IB=i_n, o_O=o)
+
class XilinxDifferentialInput:
@staticmethod
def lower(dr):
return XilinxDifferentialInputImpl(dr.i_p, dr.i_n, dr.o)
+
class XilinxDifferentialOutputImpl(Module):
def __init__(self, i, o_p, o_n):
self.specials += Instance("OBUFDS", i_I=i, o_O=o_p, o_OB=o_n)
+
class XilinxDifferentialOutput:
@staticmethod
def lower(dr):
return XilinxDifferentialOutputImpl(dr.i, dr.o_p, dr.o_n)
+
class XilinxDDROutputImpl(Module):
def __init__(self, i1, i2, o, clk):
self.specials += Instance("ODDR",
i_D1=i1, i_D2=i2, o_Q=o,
)
+
class XilinxDDROutput:
@staticmethod
def lower(dr):
from mibuild import tools
from mibuild.xilinx import common
+
def _format_constraint(c):
if isinstance(c, Pins):
return "LOC=" + c.identifiers[0]
elif isinstance(c, Misc):
return c.misc
+
def _format_ucf(signame, pin, others, resname):
fmt_c = []
for c in [Pins(pin)] + others:
fmt_r += "." + resname[2]
return "NET \"" + signame + "\" " + " | ".join(fmt_c) + "; # " + fmt_r + "\n"
+
def _build_ucf(named_sc, named_pc):
r = ""
for sig, pins, others, resname in named_sc:
r += "\n" + "\n\n".join(named_pc)
return r
+
def _build_xst_files(device, sources, vincpaths, build_name, xst_opt):
prj_contents = ""
for filename, language in sources:
xst_contents += "-vlgincdir " + path + "\n"
tools.write_to_file(build_name + ".xst", xst_contents)
+
def _run_yosys(device, sources, vincpaths, build_name):
ys_contents = ""
incflags = ""
if r != 0:
raise OSError("Subprocess failed")
+
def _run_ise(build_name, ise_path, source, mode, ngdbuild_opt,
bitgen_opt, ise_commands, map_opt, par_opt, ver=None):
if sys.platform == "win32" or sys.platform == "cygwin":
if r != 0:
raise OSError("Subprocess failed")
+
class XilinxISEToolchain:
def __init__(self):
self.xst_opt = """-ifmt MIXED
from mibuild.generic_platform import GenericPlatform
from mibuild.xilinx import common, vivado, ise
+
class XilinxPlatform(GenericPlatform):
bitstream_ext = ".bit"
from mibuild.generic_programmer import GenericProgrammer
from mibuild.xilinx import common
+
def _run_urjtag(cmds):
with subprocess.Popen("jtag", stdin=subprocess.PIPE) as process:
process.stdin.write(cmds.encode("ASCII"))
process.communicate()
+
class UrJTAG(GenericProgrammer):
needs_bitreverse = True
""".format(flash_proxy=flash_proxy, address=address, data_file=data_file)
_run_urjtag(cmds)
+
class XC3SProg(GenericProgrammer):
needs_bitreverse = False
subprocess.call(["xc3sprog", "-v", "-c", self.cable, "-I"+flash_proxy, "{}:w:0x{:x}:BIN".format(data_file, address)])
+
class FpgaProg(GenericProgrammer):
needs_bitreverse = False
subprocess.call(["fpgaprog", "-v", "-sa", "-r", "-b", flash_proxy,
"-f", data_file])
+
def _run_impact(cmds):
with subprocess.Popen("impact -batch", stdin=subprocess.PIPE) as process:
process.stdin.write(cmds.encode("ASCII"))
process.communicate()
+
class iMPACT(GenericProgrammer):
needs_bitreverse = False
""".format(bitstream=bitstream_file)
_run_impact(cmds)
+
def _run_vivado(path, ver, cmds):
if sys.platform == "win32" or sys.platform == "cygwin":
vivado_cmd = "vivado -mode tcl"
process.stdin.write(cmds.encode("ASCII"))
process.communicate()
+
class VivadoProgrammer(GenericProgrammer):
needs_bitreverse = False
def __init__(self, vivado_path="/opt/Xilinx/Vivado", vivado_ver=None):
from mibuild import tools
from mibuild.xilinx import common
+
def _format_constraint(c):
if isinstance(c, Pins):
return "set_property LOC " + c.identifiers[0]
else:
raise ValueError("unknown constraint %s" % c)
+
def _format_xdc(signame, resname, *constraints):
fmt_c = [_format_constraint(c) for c in constraints]
fmt_r = resname[0] + ":" + str(resname[1])
r += c + " [get_ports " + signame + "]\n"
return r
+
def _build_xdc(named_sc, named_pc):
r = ""
for sig, pins, others, resname in named_sc:
r += "\n" + "\n\n".join(named_pc)
return r
+
def _run_vivado(build_name, vivado_path, source, ver=None):
if sys.platform == "win32" or sys.platform == "cygwin":
build_script_contents = "REM Autogenerated by mibuild\n"
if r != 0:
raise OSError("Subprocess failed")
+
class XilinxVivadoToolchain:
def __init__(self):
self.bitstream_commands = []
from migen.bus import wishbone
from migen.flow.actor import *
+
class Reader(Module):
def __init__(self):
self.bus = wishbone.Interface()
)
]
+
class Writer(Module):
def __init__(self):
self.bus = wishbone.Interface()
from migen.flow.actor import *
from migen.genlib import fifo
+
class _FIFOActor(Module):
def __init__(self, fifo_class, layout, depth):
self.sink = Sink(layout)
self.source.eop.eq(self.fifo.dout.eop)
]
+
class SyncFIFO(_FIFOActor):
def __init__(self, layout, depth, buffered=False):
_FIFOActor.__init__(
fifo.SyncFIFOBuffered if buffered else fifo.SyncFIFO,
layout, depth)
+
class AsyncFIFO(_FIFOActor):
def __init__(self, layout, depth):
_FIFOActor.__init__(self, fifo.AsyncFIFO, layout, depth)
from migen.genlib.fsm import *
from migen.flow.actor import *
+
# Generates integers from start to maximum-1
class IntSequence(Module):
def __init__(self, nbits, offsetbits=0, step=1):
from migen.flow.transactions import *
from migen.util.misc import xdir
+
def _sim_multiread(sim, obj):
if isinstance(obj, Signal):
return sim.rd(obj)
r[k] = rd
return r
+
def _sim_multiwrite(sim, obj, value):
if isinstance(obj, Signal):
sim.wr(obj, value)
for k, v in value.items():
_sim_multiwrite(sim, getattr(obj, k), v)
+
# Generators yield None or a tuple of Tokens.
# Tokens for Sink endpoints are pulled and the "value" field filled in.
# Tokens for Source endpoints are pushed according to their "value" field.
self._update_control_signals(selfp)
do_simulation.passive = True
+
class SimActor(Module):
def __init__(self, generator):
self.busy = Signal()
selfp.busy = self.token_exchanger.busy
do_simulation.passive = True
+
def _dumper_gen(prefix):
while True:
t = Token("result")
s = str(list(t.value.values())[0])
print(prefix + s)
+
class Dumper(SimActor):
def __init__(self, layout, prefix=""):
self.result = Sink(layout)
from migen.flow import plumbing
from migen.actorlib import misc
+
# layout is a list of tuples, either:
# - (name, nbits, [reset value], [alignment bits])
# - (name, sublayout)
(MODE_EXTERNAL, MODE_SINGLE_SHOT, MODE_CONTINUOUS) = range(3)
+
class SingleGenerator(Module, AutoCSR):
def __init__(self, layout, mode):
self.source = Source(_convert_layout(layout))
self.sync += If(self.source.ack | ~self.source.stb,
getattr(target, name).eq(reg.storage))
+
class Collector(Module, AutoCSR):
def __init__(self, layout, depth=1024):
self.sink = Sink(layout)
self._rd.status.eq(rp.dat_r)
]
+
class _DMAController(Module):
def __init__(self, bus_accessor, bus_aw, bus_dw, mode, base_reset=0, length_reset=0):
self.alignment_bits = bits_for(bus_dw//8) - 1
def get_csrs(self):
return self.generator.get_csrs() + [self.r_busy]
+
class DMAReadController(_DMAController):
def __init__(self, bus_accessor, *args, **kwargs):
bus_aw = flen(bus_accessor.address.a)
self.busy = comp_actor.busy
self.comb += self.r_busy.status.eq(self.busy)
+
class DMAWriteController(_DMAController):
def __init__(self, bus_accessor, *args, ack_when_inactive=False, **kwargs):
bus_aw = flen(bus_accessor.address_data.a)
from migen.genlib.record import *
from migen.flow.actor import *
+
def _rawbits_layout(l):
if isinstance(l, int):
return [("rawbits", l)]
else:
return l
+
class Cast(CombinatorialActor):
def __init__(self, layout_from, layout_to, reverse_from=False, reverse_to=False):
self.sink = Sink(_rawbits_layout(layout_from))
raise TypeError
self.comb += Cat(*sigs_to).eq(Cat(*sigs_from))
+
def pack_layout(l, n):
return [("chunk"+str(i), l) for i in range(n)]
+
class Unpack(Module):
def __init__(self, n, layout_to, reverse=False):
self.source = source = Source(layout_to)
source.eop.eq(sink.eop & last)
]
+
class Pack(Module):
def __init__(self, layout_from, n, reverse=False):
self.sink = sink = Sink(layout_from)
)
]
+
class Chunkerize(CombinatorialActor):
def __init__(self, layout_from, layout_to, n, reverse=False):
self.sink = Sink(layout_from)
dst = getattr(self.source, f[0])
self.comb += dst.eq(src)
+
class Unchunkerize(CombinatorialActor):
def __init__(self, layout_from, n, layout_to, reverse=False):
if isinstance(layout_from, EndpointDescription):
dst = getattr(self.source, f[0])
self.comb += dst.eq(src)
+
class Converter(Module):
def __init__(self, layout_from, layout_to, reverse=False):
self.sink = Sink(layout_from)
else:
self.comb += Record.connect(self.sink, self.source)
+
class Pipeline(Module):
def __init__(self, *modules):
self.busy = Signal()
from migen.fhdl.std import Module, bits_for
from migen.bank.description import CSR
+
class GenericBank(Module):
def __init__(self, description, busword):
# Turn description into simple CSRs and claim ownership of compound CSR modules
self.submodules += c
self.decode_bits = bits_for(len(self.simple_csrs)-1)
+
def get_offset(description, name, busword):
offset = 0
for c in description:
from migen.bus import csr
from migen.bank.bank import GenericBank
+
class Bank(GenericBank):
def __init__(self, description, address=0, bus=None):
if bus is None:
If(sel, Case(self.bus.adr[:self.decode_bits], brcases))
]
+
# address_map(name, memory) returns the CSR offset at which to map
# the CSR object (register bank or memory).
# If memory=None, the object is the register bank of object source.name.
from migen.fhdl.std import *
from migen.fhdl.tracer import get_obj_var_name
+
class _CSRBase(HUID):
def __init__(self, size, name):
HUID.__init__(self)
raise ValueError("Cannot extract CSR name from code, need to specify.")
self.size = size
+
class CSR(_CSRBase):
def __init__(self, size=1, name=None):
_CSRBase.__init__(self, size, name)
self.r = Signal(self.size, name=self.name + "_r")
self.w = Signal(self.size, name=self.name + "_w")
+
class _CompoundCSR(_CSRBase, Module):
def __init__(self, size, name):
_CSRBase.__init__(self, size, name)
def do_finalize(self, busword):
raise NotImplementedError
+
class CSRStatus(_CompoundCSR):
def __init__(self, size=1, reset=0, name=None):
_CompoundCSR.__init__(self, size, name)
self.comb += sc.w.eq(self.status[i*busword:i*busword+nbits])
self.simple_csrs.append(sc)
+
class CSRStorage(_CompoundCSR):
def __init__(self, size=1, reset=0, atomic_write=False, write_from_dev=False, alignment_bits=0, name=None):
_CompoundCSR.__init__(self, size, name)
self.sync += If(sc.re, self.storage_full[lo:hi].eq(sc.r))
self.sync += self.re.eq(sc.re)
+
def csrprefix(prefix, csrs, done):
for csr in csrs:
if csr.huid not in done:
csr.name = prefix + csr.name
done.add(csr.huid)
+
def memprefix(prefix, memories, done):
for memory in memories:
if memory.huid not in done:
memory.name_override = prefix + memory.name_override
done.add(memory.huid)
+
class AutoCSR:
def get_memories(self):
try:
from migen.bank.description import *
from migen.genlib.misc import optree
+
class _EventSource(HUID):
def __init__(self):
HUID.__init__(self)
self.trigger = Signal() # trigger signal interface to the user design
self.clear = Signal() # clearing attempt by W1C to pending register, ignored by some event sources
+
# set on a positive trigger pulse
class EventSourcePulse(Module, _EventSource):
def __init__(self):
If(self.trigger, self.pending.eq(1))
]
+
# set on the falling edge of the trigger, status = trigger
class EventSourceProcess(Module, _EventSource):
def __init__(self):
If(~self.trigger & old_trigger, self.pending.eq(1))
]
+
# all status set by external trigger
class EventSourceLevel(Module, _EventSource):
def __init__(self):
self.pending.eq(self.trigger)
]
+
class EventManager(Module, AutoCSR):
def __init__(self):
self.irq = Signal()
raise FinalizeError
self.submodules += value
+
class SharedIRQ(Module):
def __init__(self, *event_managers):
self.irq = Signal()
from migen.bus import wishbone
from migen.bank.bank import GenericBank
+
class Bank(GenericBank):
def __init__(self, description, bus=None):
if bus is None:
("dat_r", "data_width", DIR_S_TO_M)
]
+
class Interface(Record):
def __init__(self, data_width=8, address_width=14):
Record.__init__(self, set_layout_parameters(_layout,
data_width=data_width, address_width=address_width))
+
class Interconnect(Module):
def __init__(self, master, slaves):
self.comb += master.connect(*slaves)
+
class Initiator(Module):
def __init__(self, generator, bus=None):
self.generator = generator
selfp.bus.we = 1
selfp.bus.dat_w = self.transaction.data
+
class SRAM(Module):
def __init__(self, mem_or_size, address, read_only=None, init=None, bus=None):
if bus is None:
from migen.fhdl.std import *
from migen.bus.transactions import *
+
def _byte_mask(orig, dat_w, sel):
r = 0
shift = 0
shift += 8
return r
+
class Initiator(Module):
def __init__(self, generator, mem):
self.generator = generator
from migen.fhdl.std import *
+
class Transaction:
def __init__(self, address, data=0, sel=None, busname=None):
self.address = address
def __str__(self):
return "<" + self.__class__.__name__ + " adr:" + hex(self.address) + " dat:" + hex(self.data) + ">"
+
class TRead(Transaction):
pass
+
class TWrite(Transaction):
pass
("err", 1, DIR_S_TO_M)
]
+
class Interface(Record):
def __init__(self, data_width=32):
Record.__init__(self, set_layout_parameters(_layout,
data_width=data_width,
sel_width=data_width//8))
+
class InterconnectPointToPoint(Module):
def __init__(self, master, slave):
self.comb += master.connect(slave)
+
class Arbiter(Module):
def __init__(self, masters, target):
self.submodules.rr = roundrobin.RoundRobin(len(masters))
reqs = [m.cyc for m in masters]
self.comb += self.rr.request.eq(Cat(*reqs))
+
class Decoder(Module):
# slaves is a list of pairs:
# 0) function that takes the address signal and returns a FHDL expression
masked = [Replicate(slave_sel_r[i], flen(master.dat_r)) & slaves[i][1].dat_r for i in range(ns)]
self.comb += master.dat_r.eq(optree("|", masked))
+
class InterconnectShared(Module):
def __init__(self, masters, slaves, register=False):
shared = Interface()
self.submodules += Arbiter(masters, shared)
self.submodules += Decoder(shared, slaves, register)
+
class Crossbar(Module):
def __init__(self, masters, slaves, register=False):
matches, busses = zip(*slaves)
for column, bus in zip(zip(*access), busses):
self.submodules += Arbiter(column, bus)
+
class DownConverter(Module):
# DownConverter splits Wishbone accesses of N bits in M accesses of L bits where:
# N is the original data-width
)
)
+
class Tap(Module):
def __init__(self, bus, handler=print):
self.bus = bus
self.handler(transaction)
do_simulation.passive = True
+
class Initiator(Module):
def __init__(self, generator, bus=None):
self.generator = generator
selfp.bus.cyc = 0
selfp.bus.stb = 0
+
class TargetModel:
def read(self, address):
return 0
def can_ack(self, bus):
return True
+
class Target(Module):
def __init__(self, model, bus=None):
if bus is None:
bus.ack = 0
do_simulation.passive = True
+
class SRAM(Module):
def __init__(self, mem_or_size, read_only=None, init=None, bus=None):
if bus is None:
from migen.bus import csr
from migen.genlib.misc import timeline
+
class WB2CSR(Module):
def __init__(self, bus_wishbone=None, bus_csr=None):
if bus_wishbone is None:
from migen.fhdl import structure as f
+
def log2_int(n, need_pow2=True):
l = 1
r = 0
raise ValueError("Not a power of 2")
return r
+
def bits_for(n, require_sign_bit=False):
if n > 0:
r = log2_int(n + 1, False)
r += 1
return r
+
def value_bits_sign(v):
if isinstance(v, bool):
return 1, False
raise TypeError("Can not calculate bit length of {} {}".format(
type(v), v))
+
def flen(v):
"""Bit length of an expression
"""
return value_bits_sign(v)[0]
+
def fiter(v):
"""Bit iterator
else:
raise TypeError("Can not bit-iterate {} {}".format(type(v), v))
+
def fslice(v, s):
"""Bit slice
else:
raise TypeError("Can not bit-slice {} {}".format(type(v), v))
+
def freversed(v):
"""Bit reverse
from migen.fhdl.module import Module
from migen.fhdl.tools import insert_reset, rename_clock_domain
+
class ModuleTransformer:
# overload this in derived classes
def transform_instance(self, i):
warnings.warn("deprecated, use the plain transformer", DeprecationWarning, 2)
return cls(*args, **kwargs)(i)
+
def DecorateModule(transformer, *args, **kwargs):
warnings.warn("deprecated, use the plain transformer", DeprecationWarning, 2)
return transformer.__self__(*args, **kwargs)
+
class ControlInserter(ModuleTransformer):
control_name = None # override this
for cdn in self.clock_domains]
self.transform_fragment_insert(i, f, to_insert)
+
class CEInserter(ControlInserter):
control_name = "ce"
InsertCE = CEInserter.adhoc
+
class ResetInserter(ControlInserter):
control_name = "reset"
InsertReset = ResetInserter.adhoc
+
class ClockDomainsRenamer(ModuleTransformer):
def __init__(self, cd_remapping):
if isinstance(cd_remapping, str):
_Instance = namedtuple("_Instance", "name cell properties")
_NetBranch = namedtuple("_NetBranch", "portname instancename")
+
def _write_cells(cells):
r = ""
for cell in cells:
)"""
return r
+
def _write_io(ios):
r = ""
for s in ios:
(port {0.name} (direction {0.direction}))""".format(s)
return r
+
def _write_instantiations(instances, cell_library):
instantiations = ""
for instance in instances:
)"""
return instantiations
+
def _write_connections(connections):
r = ""
for netname, branches in connections.items():
)"""
return r
+
def _write_edif(cells, ios, instances, connections, cell_library, design_name, part, vendor):
r = """(edif {0}
(edifVersion 2 0 0)
return r
+
def _generate_cells(f):
cell_dict = OrderedDict()
for special in f.specials:
raise ValueError("EDIF conversion can only handle synthesized fragments")
return [_Cell(k, v) for k, v in cell_dict.items()]
+
def _generate_instances(f,ns):
instances = []
for special in f.specials:
raise ValueError("EDIF conversion can only handle synthesized fragments")
return instances
+
def _generate_ios(f, ios, ns):
outs = list_special_ios(f, False, True, False)
inouts = list_special_ios(f, False, False, True)
r.append(_Port(name=ns.get_name(io), direction=direction))
return r
+
def _generate_connections(f, ios, ns):
r = OrderedDict()
for special in f.specials:
r[io].append(_NetBranch(portname=io, instancename=""))
return r
+
def convert(f, ios, cell_library, vendor, device, name="top"):
if not isinstance(f, _Fragment):
f = f.get_fragment()
from migen.fhdl.tools import rename_clock_domain
from migen.sim.upper import gen_sim, proxy_sim
+
class FinalizeError(Exception):
pass
+
def _flat_list(e):
if isinstance(e, collections.Iterable):
return flat_iteration(e)
else:
return [e]
+
class _ModuleProxy:
def __init__(self, fm):
object.__setattr__(self, "_fm", fm)
+
class _ModuleComb(_ModuleProxy):
def __iadd__(self, other):
self._fm._fragment.comb += _flat_list(other)
return self
+
def _cd_append(d, key, statements):
try:
l = d[key]
d[key] = l
l += _flat_list(statements)
+
class _ModuleSyncCD:
def __init__(self, fm, cd):
self._fm = fm
_cd_append(self._fm._fragment.sync, self._cd, other)
return self
+
class _ModuleSync(_ModuleProxy):
def __iadd__(self, other):
_cd_append(self._fm._fragment.sync, "sys", other)
if not isinstance(value, _ModuleSyncCD):
raise AttributeError("Attempted to assign sync property - use += instead")
+
# _ModuleForwardAttr enables user classes to do e.g.:
# self.subm.foobar = SomeModule()
# and then access the submodule with self.foobar.
self.__iadd__(value)
setattr(self._fm, name, value)
+
class _ModuleSpecials(_ModuleProxy, _ModuleForwardAttr):
def __iadd__(self, other):
self._fm._fragment.specials |= set(_flat_list(other))
return self
+
class _ModuleSubmodules(_ModuleProxy):
def __setattr__(self, name, value):
self._fm._submodules += [(name, e) for e in _flat_list(value)]
self._fm._submodules += [(None, e) for e in _flat_list(other)]
return self
+
class _ModuleClockDomains(_ModuleProxy, _ModuleForwardAttr):
def __iadd__(self, other):
self._fm._fragment.clock_domains += _flat_list(other)
return self
+
class Module:
def get_fragment(self):
assert(not self.get_fragment_called)
from migen.fhdl.structure import *
+
class _Node:
def __init__(self):
self.signal_count = 0
self.use_number = False
self.children = OrderedDict()
+
def _display_tree(filename, tree):
from migen.util.treeviz import RenderNode
top = _to_render_node("top", tree)
top.to_svg(filename)
+
def _build_tree(signals, basic_tree=None):
root = _Node()
for signal in signals:
current.signal_count += 1
return root
+
def _set_use_name(node, node_name=""):
cnames = [(k, _set_use_name(v, k)) for k, v in node.children.items()]
for (c1_prefix, c1_names), (c2_prefix, c2_names) in combinations(cnames, 2):
return r
+
def _name_signal(tree, signal):
elements = []
treepos = tree
elements.append(elname)
return "_".join(elements)
+
def _build_pnd_from_tree(tree, signals):
return dict((signal, _name_signal(tree, signal)) for signal in signals)
+
def _invert_pnd(pnd):
inv_pnd = dict()
for k, v in pnd.items():
inv_pnd[v].append(k)
return inv_pnd
+
def _list_conflicting_signals(pnd):
inv_pnd = _invert_pnd(pnd)
r = set()
r.update(v)
return r
+
def _set_use_number(tree, signals):
for signal in signals:
current = tree
_debug = False
+
def _build_pnd_for_group(group_n, signals):
basic_tree = _build_tree(signals)
_set_use_name(basic_tree)
return pnd
+
def _build_signal_groups(signals):
r = []
for signal in signals:
s1 -= s2
return r
+
def _build_pnd(signals):
groups = _build_signal_groups(signals)
gpnds = [_build_pnd_for_group(n, gsignals) for n, gsignals in enumerate(groups)]
return pnd
+
def build_namespace(signals):
pnd = _build_pnd(signals)
ns = Namespace(pnd)
ns.get_name(signal)
return ns
+
class Namespace:
def __init__(self, pnd):
self.counts = {}
from migen.fhdl.decorators import ModuleTransformer
from migen.util.misc import gcd_multiple
+
class FullMemoryWE(ModuleTransformer):
def transform_fragment(self, i, f):
newspecials = set()
from migen.fhdl.tracer import get_obj_var_name
from migen.fhdl.verilog import _printexpr as verilog_printexpr
+
class Special(HUID):
def iter_expressions(self):
for x in []:
r.update(signals)
return r
+
class Tristate(Special):
def __init__(self, target, o, oe, i=None):
Special.__init__(self)
r += "\n"
return r
+
class TSTriple:
def __init__(self, bits_sign=None, min=None, max=None, reset_o=0, reset_oe=0):
self.o = Signal(bits_sign, min=min, max=max, reset=reset_o)
def get_tristate(self, target):
return Tristate(target, self.o, self.oe, self.i)
+
class Instance(Special):
class _IO:
def __init__(self, name, expr=None):
(READ_FIRST, WRITE_FIRST, NO_CHANGE) = range(3)
+
class _MemoryPort(Special):
def __init__(self, adr, dat_r, we=None, dat_w=None,
async_read=False, re=None, we_granularity=0, mode=WRITE_FIRST,
def emit_verilog(port, ns, add_data_file):
return "" # done by parent Memory object
+
class Memory(Special):
def __init__(self, width, depth, init=None, name=None):
Special.__init__(self)
return r
+
class SynthesisDirective(Special):
def __init__(self, template, **signals):
Special.__init__(self)
from migen.fhdl import tracer
from migen.util.misc import flat_iteration
+
class HUID:
__next_uid = 0
def __init__(self):
def __hash__(self):
return self.huid
+
class Value(HUID):
"""Base class for operands
def __hash__(self):
return HUID.__hash__(self)
+
class _Operator(Value):
def __init__(self, op, operands):
Value.__init__(self)
self.op = op
self.operands = operands
+
def Mux(sel, val1, val0):
"""Multiplex between two values
"""
return _Operator("m", [sel, val1, val0])
+
class _Slice(Value):
def __init__(self, value, start, stop):
Value.__init__(self)
self.start = start
self.stop = stop
+
class Cat(Value):
"""Concatenate values
Value.__init__(self)
self.l = list(flat_iteration(args))
+
class Replicate(Value):
"""Replicate a value
self.v = v
self.n = n
+
class Signal(Value):
"""A `Value` that can change
from migen.fhdl.bitcontainer import value_bits_sign
return cls(bits_sign=value_bits_sign(other), **kwargs)
+
class ClockSignal(Value):
"""Clock signal for a given clock domain
Value.__init__(self)
self.cd = cd
+
class ResetSignal(Value):
"""Reset signal for a given clock domain
# statements
+
class _Assign:
def __init__(self, l, r):
self.l = l
self.r = r
+
class If:
"""Conditional execution of statements
_insert_else(self, [If(cond, *t)])
return self
+
def _insert_else(obj, clause):
o = obj
while o.f:
o = o.f[0]
o.f = clause
+
class Case:
"""Case/Switch statement
# arrays
+
class _ArrayProxy(Value):
def __init__(self, choices, key):
self.choices = choices
return _ArrayProxy([choice.__getitem__(key) for choice in self.choices],
self.key)
+
class Array(list):
"""Addressable multiplexer
else:
return list.__getitem__(self, key)
+
class ClockDomain:
"""Synchronous domain
if self.rst is not None:
self.rst.name_override = new_name + "_rst"
+
class _ClockDomainList(list):
def __getitem__(self, key):
if isinstance(key, str):
(SPECIAL_INPUT, SPECIAL_OUTPUT, SPECIAL_INOUT) = range(3)
+
class StopSimulation(Exception):
pass
+
class _Fragment:
def __init__(self, comb=None, sync=None, specials=None, clock_domains=None, sim=None):
if comb is None: comb = []
from migen.fhdl.bitcontainer import value_bits_sign
from migen.util.misc import flat_iteration
+
class _SignalLister(NodeVisitor):
def __init__(self):
self.output_list = set()
def visit_Signal(self, node):
self.output_list.add(node)
+
class _TargetLister(NodeVisitor):
def __init__(self):
self.output_list = set()
for choice in node.choices:
self.visit(choice)
+
def list_signals(node):
lister = _SignalLister()
lister.visit(node)
return lister.output_list
+
def list_targets(node):
lister = _TargetLister()
lister.visit(node)
return lister.output_list
+
def _resort_statements(ol):
return [statement for i, statement in
sorted(ol, key=lambda x: x[0])]
+
def group_by_targets(sl):
groups = []
seen = set()
return [(targets, _resort_statements(stmts))
for targets, stmts in groups]
+
def list_special_ios(f, ins, outs, inouts):
r = set()
for special in f.specials:
r |= special.list_ios(ins, outs, inouts)
return r
+
class _ClockDomainLister(NodeVisitor):
def __init__(self):
self.clock_domains = set()
self.clock_domains.add(clockname)
self.visit(statements)
+
def list_clock_domains_expr(f):
cdl = _ClockDomainLister()
cdl.visit(f)
return cdl.clock_domains
+
def list_clock_domains(f):
r = list_clock_domains_expr(f)
for special in f.specials:
r.add(cd.name)
return r
+
def is_variable(node):
if isinstance(node, Signal):
return node.variable
else:
raise TypeError
+
def generate_reset(rst, sl):
targets = list_targets(sl)
return [t.eq(t.reset) for t in sorted(targets, key=lambda x: x.huid)]
+
def insert_reset(rst, sl):
return [If(rst, *generate_reset(rst, sl)).Else(*sl)]
+
def insert_resets(f):
newsync = dict()
for k, v in f.sync.items():
newsync[k] = v
f.sync = newsync
+
class _Lowerer(NodeTransformer):
def __init__(self):
self.target_context = False
self.target_context, self.extra_stmts = old_target_context, old_extra_stmts
return r
+
# Basics are FHDL structure elements that back-ends are not required to support
# but can be expressed in terms of other elements (lowered) before conversion.
class _BasicLowerer(_Lowerer):
def visit_ResetSignal(self, node):
return self.clock_domains[node.cd].rst
+
class _ComplexSliceLowerer(_Lowerer):
def visit_Slice(self, node):
if not isinstance(node.value, Signal):
node = _Slice(slice_proxy, node.start, node.stop)
return NodeTransformer.visit_Slice(self, node)
+
def _apply_lowerer(l, f):
f = l.visit(f)
f.comb += l.comb
return f
+
def lower_basics(f):
return _apply_lowerer(_BasicLowerer(f.clock_domains), f)
+
def lower_complex_slices(f):
return _apply_lowerer(_ComplexSliceLowerer(), f)
+
class _ClockDomainRenamer(NodeVisitor):
def __init__(self, old, new):
self.old = old
if node.cd == self.old:
node.cd = self.new
+
def rename_clock_domain_expr(f, old, new):
cdr = _ClockDomainRenamer(old, new)
cdr.visit(f)
+
def rename_clock_domain(f, old, new):
rename_clock_domain_expr(f, old, new)
if old in f.sync:
from opcode import opname
from collections import defaultdict
+
def get_var_name(frame):
code = frame.f_code
call_index = frame.f_lasti
else:
return None
+
def remove_underscore(s):
if len(s) > 2 and s[0] == "_" and s[1] != "_":
s = s[1:]
return s
+
def get_obj_var_name(override=None, default=None):
if override:
return override
name_to_idx = defaultdict(int)
classname_to_objs = dict()
+
def index_id(l, obj):
for n, e in enumerate(l):
if id(e) == id(obj):
return n
raise ValueError
+
def trace_back(varname=None):
l = []
frame = inspect.currentframe().f_back.f_back
from migen.fhdl.namer import Namespace, build_namespace
from migen.fhdl.conv_output import ConvOutput
+
def _printsig(ns, s):
if s.signed:
n = "signed "
n += ns.get_name(s)
return n
+
def _printintbool(node):
if isinstance(node, bool):
if node:
else:
raise TypeError
+
def _printexpr(ns, node):
if isinstance(node, (int, bool)):
return _printintbool(node)
(_AT_BLOCKING, _AT_NONBLOCKING, _AT_SIGNAL) = range(3)
+
def _printnode(ns, at, level, node):
if node is None:
return ""
else:
raise TypeError("Node of unrecognized type: "+str(type(node)))
+
def _list_comb_wires(f):
r = set()
groups = group_by_targets(f.comb)
r |= g[0]
return r
+
def _printheader(f, ios, name, ns):
sigs = list_signals(f) | list_special_ios(f, True, True, True)
special_outs = list_special_ios(f, False, True, True)
r += "\n"
return r
+
def _printcomb(f, ns, display_run):
r = ""
if f.comb:
r += "\n"
return r
+
def _printsync(f, ns):
r = ""
for k, v in sorted(f.sync.items(), key=itemgetter(0)):
r += "end\n\n"
return r
+
def _call_special_classmethod(overrides, obj, method, *args, **kwargs):
cl = obj.__class__
if cl in overrides:
else:
return None
+
def _lower_specials_step(overrides, specials):
f = _Fragment()
lowered_specials = set()
lowered_specials.add(special)
return f, lowered_specials
+
def _can_lower(overrides, specials):
for special in specials:
cl = special.__class__
return True
return False
+
def _lower_specials(overrides, specials):
f, lowered_specials = _lower_specials_step(overrides, specials)
while _can_lower(overrides, f.specials):
f.specials -= lowered_specials2
return f, lowered_specials
+
def _printspecials(overrides, specials, ns, add_data_file):
r = ""
for special in sorted(specials, key=lambda x: x.huid):
r += pr
return r
+
def convert(f, ios=None, name="top",
special_overrides=dict(),
create_clock_domains=True,
from migen.fhdl.structure import *
from migen.fhdl.structure import _Operator, _Slice, _Assign, _ArrayProxy, _Fragment
+
class NodeVisitor:
def visit(self, node):
if isinstance(node, (int, bool)):
def visit_unknown(self, node):
pass
+
# Default methods always copy the node, except for:
# - Signals, ClockSignals and ResetSignals
# - Unknown objects
from migen.genlib.misc import optree
from migen.genlib.record import *
+
def _make_m2s(layout):
r = []
for f in layout:
r.append((f[0], _make_m2s(f[1])))
return r
+
class EndpointDescription:
def __init__(self, payload_layout, param_layout=[], packetized=False):
self.payload_layout = payload_layout
except:
return getattr(object.__getattribute__(self, "param"), name)
+
class Source(_Endpoint):
def connect(self, sink):
return Record.connect(self, sink)
+
class Sink(_Endpoint):
def connect(self, source):
return source.connect(self)
+
def get_endpoints(obj, filt=_Endpoint):
if hasattr(obj, "get_endpoints") and callable(obj.get_endpoints):
return obj.get_endpoints(filt)
r[k] = v
return r
+
def get_single_ep(obj, filt):
eps = get_endpoints(obj, filt)
if len(eps) != 1:
raise ValueError("More than one endpoint")
return list(eps.items())[0]
+
class BinaryActor(Module):
def __init__(self, *args, **kwargs):
self.busy = Signal()
def build_binary_control(self, sink, source):
raise NotImplementedError("Binary actor classes must overload build_binary_control_fragment")
+
class CombinatorialActor(BinaryActor):
def build_binary_control(self, sink, source):
self.comb += [
source.eop.eq(sink.eop)
]
+
class SequentialActor(BinaryActor):
def __init__(self, delay):
self.trigger = Signal()
source.eop.eq(sink.eop)
]
+
class PipelinedActor(BinaryActor):
def __init__(self, latency):
self.pipe_ce = Signal()
from migen.fhdl.std import *
from migen.flow.actor import *
+
class EndpointSimHook(Module):
def __init__(self, endpoint):
self.endpoint = endpoint
else:
self.on_inactive()
+
class DFGHook(Module):
def __init__(self, dfg, create):
assert(not dfg.is_abstract())
ISD_MAGIC = 0x6ab4
+
class EndpointReporter(Module, AutoCSR):
def __init__(self, endpoint, nbits):
self.reset = Signal()
)
]
+
class DFGReporter(DFGHook, AutoCSR):
def __init__(self, dfg, nbits):
self._magic = CSRStatus(16)
# from the dictionary. They are needed to enable actor duplication or sharing during
# elaboration, and automatic parametrization of plumbing actors.
+
class AbstractActor:
def __init__(self, actor_class, parameters=dict(), name=None):
self.actor_class = actor_class
r += ">"
return r
+
class MultiDiGraph:
def __init__(self):
self.edges = defaultdict(list)
e.append((source, sink, edge))
return e
+
# TODO: rewrite this without non-determinism
class DataFlowGraph(MultiDiGraph):
def __init__(self):
optimizer(self)
self._instantiate_actors()
+
class CompositeActor(Module):
def __init__(self, dfg):
dfg.elaborate()
from migen.flow.hooks import *
+
class EndpointReporter(EndpointSimHook):
def __init__(self, endpoint):
EndpointSimHook.__init__(self, endpoint)
def on_inactive(self):
self.inactive += 1
+
class DFGReporter(DFGHook):
def __init__(self, dfg):
DFGHook.__init__(self, dfg, lambda u, ep, v: EndpointReporter(getattr(u, ep)))
from migen.genlib.record import *
from migen.genlib.misc import optree
+
class Buffer(PipelinedActor):
def __init__(self, layout):
self.d = Sink(layout)
self.q.param.eq(self.d.param)
)
+
class Combinator(Module):
def __init__(self, layout, subrecords):
self.source = Source(layout)
self.comb += [self.source.payload.eq(sink.payload) for sink in sinks]
self.comb += [self.source.param.eq(sink.param) for sink in sinks]
+
class Splitter(Module):
def __init__(self, layout, subrecords):
self.sink = Sink(layout)
for n, s in enumerate(sources):
self.comb += s.stb.eq(self.sink.stb & ~already_acked[n])
+
class Multiplexer(Module):
def __init__(self, layout, n):
self.source = Source(layout)
cases[i] = Record.connect(sink, self.source)
self.comb += Case(self.sel, cases)
+
class Demultiplexer(Module):
def __init__(self, layout, n):
self.sink = Sink(layout)
from migen.fhdl.specials import Special
from migen.fhdl.tools import list_signals
+
class NoRetiming(Special):
def __init__(self, reg):
Special.__init__(self)
def lower(dr):
return Module()
+
class MultiRegImpl(Module):
def __init__(self, i, o, odomain, n):
self.i = i
self.comb += self.o.eq(src)
self.specials += [NoRetiming(reg) for reg in self.regs]
+
class MultiReg(Special):
def __init__(self, i, o, odomain="sys", n=2):
Special.__init__(self)
def lower(dr):
return MultiRegImpl(dr.i, dr.o, dr.odomain, dr.n)
+
class PulseSynchronizer(Module):
def __init__(self, idomain, odomain):
self.i = Signal()
sync_o += toggle_o_r.eq(toggle_o)
self.comb += self.o.eq(toggle_o ^ toggle_o_r)
+
class GrayCounter(Module):
def __init__(self, width):
self.ce = Signal()
Encoders and decoders between binary and one-hot representation
"""
+
class Encoder(Module):
"""Encode one-hot to binary
act["default"] = self.n.eq(1)
self.comb += Case(self.i, act)
+
class PriorityEncoder(Module):
"""Priority encode requests to binary
self.comb += If(self.i[j], self.o.eq(j))
self.comb += self.n.eq(self.i == 0)
+
class Decoder(Module):
"""Decode binary to one-hot
self.comb += Case(self.i, act)
self.comb += If(self.n, self.o.eq(0))
+
class PriorityDecoder(Decoder):
pass # same
from migen.fhdl.std import *
+
class Complex:
def __init__(self, real, imag):
self.real = real
else:
return self.real.eq(r), self.imag.eq(0)
+
def SignalC(*args, **kwargs):
real = Signal(*args, **kwargs)
imag = Signal(*args, **kwargs)
from migen.fhdl.std import *
+
class Divider(Module):
def __init__(self, w):
self.start_i = Signal()
from migen.genlib.cdc import NoRetiming, MultiReg, GrayCounter
from migen.genlib.record import layout_len, Record
+
def _inc(signal, modulo):
if modulo == 2**flen(signal):
return signal.eq(signal + 1)
signal.eq(signal + 1)
)
+
class _FIFOInterface:
"""
Data written to the input interface (`din`, `we`, `writable`) is
self.dout_bits = self.dout
self.width = width_or_layout
+
class SyncFIFO(Module, _FIFOInterface):
"""Synchronous FIFO (first in, first out)
self.readable.eq(self.level != 0)
]
+
class SyncFIFOBuffered(Module, _FIFOInterface):
def __init__(self, width_or_layout, depth):
_FIFOInterface.__init__(self, width_or_layout, depth)
)
self.comb += self.level.eq(fifo.level + self.readable)
+
class AsyncFIFO(Module, _FIFOInterface):
"""Asynchronous FIFO (first in, first out)
from migen.fhdl.visit import NodeTransformer
from migen.fhdl.bitcontainer import value_bits_sign
+
class AnonymousState:
pass
+
# do not use namedtuple here as it inherits tuple
# and the latter is used elsewhere in FHDL
class NextState:
def __init__(self, state):
self.state = state
+
class NextValue:
def __init__(self, register, value):
self.register = register
self.value = value
+
class _LowerNext(NodeTransformer):
def __init__(self, next_state_signal, encoding, aliases):
self.next_state_signal = next_state_signal
else:
return node
+
class FSM(Module):
def __init__(self, reset_state=None):
self.actions = OrderedDict()
from migen.fhdl.specials import Special
from migen.fhdl.tools import list_signals
+
class DifferentialInput(Special):
def __init__(self, i_p, i_n, o):
Special.__init__(self)
def lower(dr):
raise NotImplementedError("Attempted to use a differential input, but platform does not support them")
+
class DifferentialOutput(Special):
def __init__(self, i, o_p, o_n):
Special.__init__(self)
def lower(dr):
raise NotImplementedError("Attempted to use a differential output, but platform does not support them")
+
class CRG(Module):
def __init__(self, clk, rst=0):
self.clock_domains.cd_sys = ClockDomain()
self.cd_sys.rst.eq(~rst_n)
]
+
class DDRInput(Special):
def __init__(self, i, o1, o2, clk=ClockSignal()):
Special.__init__(self)
def lower(dr):
raise NotImplementedError("Attempted to use a DDR input, but platform does not support them")
+
class DDROutput(Special):
def __init__(self, i1, i2, o, clk=ClockSignal()):
Special.__init__(self)
from migen.fhdl.std import *
from migen.fhdl.structure import _Operator
+
def optree(op, operands, lb=None, ub=None, default=None):
if lb is None:
lb = 0
[optree(op, operands, lb, s, default),
optree(op, operands, s, ub, default)])
+
def split(v, *counts):
r = []
offset = 0
offset += n
return tuple(r)
+
def displacer(signal, shift, output, n=None, reverse=False):
if shift is None:
return output.eq(signal)
l = [Replicate(shift == i, w) & signal for i in r]
return output.eq(Cat(*l))
+
def chooser(signal, shift, output, n=None, reverse=False):
if shift is None:
return output.eq(signal)
cases[i] = [output.eq(signal[s*w:(s+1)*w])]
return Case(shift, cases).makedefault()
+
def timeline(trigger, events):
lastevent = max([e[0] for e in events])
counter = Signal(max=lastevent+1)
sync.append(counterlogic)
return sync
+
@ResetInserter()
@CEInserter()
class FlipFlop(Module):
self.q = Signal(*args, **kwargs)
self.sync += self.q.eq(self.d)
+
@ResetInserter()
@CEInserter()
class Counter(Module):
self.width = flen(self.value)
self.sync += self.value.eq(self.value+increment)
+
@ResetInserter()
@CEInserter()
class Timeout(Module):
# size can be an int, or a (int, bool) tuple for signed numbers
# sublayout must be a list
+
def set_layout_parameters(layout, **layout_dict):
def resolve(p):
if isinstance(p, str):
raise TypeError
return r
+
def layout_len(layout):
r = 0
for f in layout:
r += fsize
return r
+
def layout_get(layout, name):
for f in layout:
if f[0] == name:
return f
raise KeyError(name)
+
def layout_partial(layout, *elements):
r = []
for path in elements:
insert_ref.append(layout_get(copy_ref, last))
return r
+
class Record:
def __init__(self, layout, name=None):
self.name = get_obj_var_name(name, "")
from migen.fhdl.specials import Special
from migen.fhdl.tools import list_signals
+
class AsyncResetSynchronizer(Special):
def __init__(self, cd, async_reset):
Special.__init__(self)
from migen.fhdl.std import *
+
class ReorderSlot:
def __init__(self, tag_width, data_width):
self.wait_data = Signal()
self.tag = Signal(tag_width)
self.data = Signal(data_width)
+
class ReorderBuffer(Module):
def __init__(self, tag_width, data_width, depth):
# issue
(SP_WITHDRAW, SP_CE) = range(2)
+
class RoundRobin(Module):
def __init__(self, n, switch_policy=SP_WITHDRAW):
self.request = Signal(n)
from migen.fhdl.std import *
from migen.fhdl import verilog
+
class BitonicSort(Module):
"""Combinatorial sorting network
from migen.sim.ipc import *
from migen.sim import icarus
+
class TopLevel:
def __init__(self, vcd_name=None, vcd_level=1,
top_name="top", dut_type="dut", dut_name="dut",
r += "\nendmodule"
return r
+
class Simulator:
def __init__(self, fragment, top_level=None, sim_runner=None, sockaddr="simsocket", **vopts):
if not isinstance(fragment, _Fragment):
def __exit__(self, type, value, traceback):
self.close()
+
def run_simulation(fragment, ncycles=None, vcd_name=None, **kwargs):
with Simulator(fragment, TopLevel(vcd_name), icarus.Runner(**kwargs)) as s:
s.run(ncycles)
# Message classes
#
+
class Int32(int):
pass
+
class Message:
def __init__(self, *pvalues):
for parameter, value in zip(self.parameters, pvalues):
pf = ""
return "<" + self.__class__.__name__ + pf + ">"
+
class MessageTick(Message):
code = 0
parameters = []
+
class MessageGo(Message):
code = 1
parameters = []
+
class MessageWrite(Message):
code = 2
parameters = [(str, "name"), (Int32, "index"), (int, "value")]
+
class MessageRead(Message):
code = 3
parameters = [(str, "name"), (Int32, "index")]
+
class MessageReadReply(Message):
code = 4
parameters = [(int, "value")]
# Packing
#
+
def _pack_int(v):
if v == 0:
p = [1, 0]
p.insert(0, len(p))
return p
+
def _pack_str(v):
p = [ord(c) for c in v]
p.append(0)
return p
+
def _pack_int32(v):
return [
v & 0xff,
(v & 0xff000000) >> 24
]
+
def _pack(message):
r = [message.code]
for t, p in message.parameters:
# Unpacking
#
+
def _unpack_int(i, nchunks=None):
v = 0
power = 1
power *= 256
return v
+
def _unpack_str(i):
v = ""
c = next(i)
c = next(i)
return v
+
def _unpack(message):
i = iter(message)
code = next(i)
# I/O
#
+
class PacketTooLarge(Exception):
pass
+
class Initiator:
def __init__(self, sockaddr):
self.sockaddr = sockaddr
from migen.fhdl.structure import Signal, StopSimulation
from migen.fhdl.specials import Memory
+
class MemoryProxy:
def __init__(self, simulator, obj):
self.simulator = simulator
for i, v in zip(range(start, stop, step), value):
self.simulator.wr(self._simproxy_obj, i, v)
+
class Proxy:
def __init__(self, simulator, obj):
object.__setattr__(self, "simulator", simulator)
assert(isinstance(item, Signal))
self.simulator.wr(item, value)
+
def gen_sim(simg):
gens = dict()
resume_cycle = 0
from migen.sim.generic import run_simulation
from migen.fhdl import verilog
+
class SimBench(Module):
callback = None
def do_simulation(self, selfp):
if self.callback is not None:
return self.callback(self, selfp)
+
class SimCase:
TestBench = SimBench
from migen.test.support import SimCase, SimBench
+
def source_gen(sent):
for i in range(10):
yield Token("source", {"value": i})
sent.append(i)
+
class SimSource(SimActor):
def __init__(self):
self.source = Source([("value", 32)])
self.sent = []
SimActor.__init__(self, source_gen(self.sent))
+
def sink_gen(received):
while True:
t = Token("sink")
yield t
received.append(t.value["value"])
+
class SimSink(SimActor):
def __init__(self):
self.sink = Sink([("value", 32)])
self.received = []
SimActor.__init__(self, sink_gen(self.received))
+
class SourceSinkCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
self.run_with(lambda tb, tbp: None)
self.assertEqual(self.tb.source.sent, self.tb.sink.received)
+
class SourceSinkDirectCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
from migen.test.support import SimCase, SimBench
+
class EncCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
self.assertEqual(tbp.dut.i, 1<<tbp.dut.o)
self.run_with(cb, 256)
+
class PrioEncCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
self.assertGreaterEqual(i, 1<<o)
self.run_with(cb, 256)
+
class DecCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
self.assertEqual(o, 1<<i)
self.run_with(cb, 256)
+
class SmallPrioEncCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
from migen.test.support import SimCase, SimBench
+
class SyncFIFOCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
from migen.fhdl.std import *
from migen.test.support import SimCase, SimBench
+
class SignedCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
from migen.fhdl.std import *
+
def _same_slices(a, b):
return a.value is b.value and a.start == b.start and a.stop == b.stop
+
class SignalSizeCase(unittest.TestCase):
def setUp(self):
self.i = 0xaa
from migen.test.support import SimCase, SimBench
+
class BitonicCase(SimCase, unittest.TestCase):
class TestBench(SimBench):
def __init__(self):
from fractions import gcd
import collections
+
def flat_iteration(l):
for element in l:
if isinstance(element, collections.Iterable):
else:
yield element
+
def xdir(obj, return_values=False):
for attr in dir(obj):
if attr[:2] != "__" and attr[-2:] != "__":
else:
yield attr
+
def autotype(s):
if s == "True":
return True
pass
return s
+
def gcd_multiple(numbers):
l = len(numbers)
if l == 1:
import cairo
import math
+
def _cairo_draw_node(ctx, dx, radius, color, outer_color, s):
ctx.save()
ctx.restore()
+
def _cairo_draw_connection(ctx, x0, y0, color0, x1, y1, color1):
ctx.move_to(x0, y0)
ctx.curve_to(x0, y0+20, x1, y1-20, x1, y1)
ctx.set_source(gradient_color)
ctx.stroke()
+
class RenderNode:
def __init__(self, label, children=None, color=(0.8, 0.8, 0.8), radius=40):
self.label = label
self.render(ctx)
surface.finish()
+
def _test():
xns = [RenderNode("X"+str(n)) for n in range(5)]
yns = [RenderNode("Y"+str(n), [RenderNode("foo", color=(0.1*n, 0.5+0.2*n, 1.0-0.3*n))]) for n in range(3)]