class CycleType(Enum):
"""Wishbone Registered Feedback cycle type."""
- CLASSIC = 0b000
- CONST_BURST = 0b001
- INCR_BURST = 0b010
+ CLASSIC = 0b000
+ CONST_BURST = 0b001
+ INCR_BURST = 0b010
END_OF_BURST = 0b111
class BurstTypeExt(Enum):
"""Wishbone Registered Feedback burst type extension."""
- LINEAR = 0b00
- WRAP_4 = 0b01
- WRAP_8 = 0b10
+ LINEAR = 0b00
+ WRAP_4 = 0b01
+ WRAP_8 = 0b10
WRAP_16 = 0b11
Optional. Corresponds to Wishbone signal ``BTE_O`` (initiator)
or ``BTE_I`` (target).
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
alignment=0, name=None):
if not isinstance(addr_width, int) or addr_width < 0:
if granularity > data_width:
raise ValueError("Granularity {} may not be greater than data width {}"
.format(granularity, data_width))
- self.addr_width = addr_width
- self.data_width = data_width
+ self.addr_width = addr_width
+ self.data_width = data_width
self.granularity = granularity
granularity_bits = log2_int(data_width // granularity)
self._alignment = alignment
- self.memory_map = MemoryMap(addr_width=max(1, addr_width + granularity_bits),
- data_width=data_width >> granularity_bits,
- alignment=alignment)
+ self.memory_map = MemoryMap(addr_width=max(1, addr_width + granularity_bits),
+ data_width=data_width >> granularity_bits,
+ alignment=alignment)
self._features = set(features)
- unknown = self._features - {"rty", "err", "stall", "lock", "cti", "bte"}
+ unknown = self._features - \
+ {"rty", "err", "stall", "lock", "cti", "bte"}
if unknown:
raise ValueError("Optional signal(s) {} are not supported"
.format(", ".join(map(repr, unknown))))
layout = [
- ("adr", addr_width, Direction.FANOUT),
+ ("adr", addr_width, Direction.FANOUT),
("dat_w", data_width, Direction.FANOUT),
("dat_r", data_width, Direction.FANIN),
- ("sel", data_width // granularity, Direction.FANOUT),
- ("cyc", 1, Direction.FANOUT),
- ("stb", 1, Direction.FANOUT),
- ("we", 1, Direction.FANOUT),
- ("ack", 1, Direction.FANIN),
+ ("sel", data_width // granularity, Direction.FANOUT),
+ ("cyc", 1, Direction.FANOUT),
+ ("stb", 1, Direction.FANOUT),
+ ("we", 1, Direction.FANOUT),
+ ("ack", 1, Direction.FANIN),
]
if "err" in features:
layout += [("err", 1, Direction.FANIN)]
if "stall" in features:
layout += [("stall", 1, Direction.FANIN)]
if "lock" in features:
- layout += [("lock", 1, Direction.FANOUT)]
+ layout += [("lock", 1, Direction.FANOUT)]
if "cti" in features:
- layout += [("cti", CycleType, Direction.FANOUT)]
+ layout += [("cti", CycleType, Direction.FANOUT)]
if "bte" in features:
layout += [("bte", BurstTypeExt, Direction.FANOUT)]
super().__init__(layout, name=name, src_loc_at=1)
"but {}-bit long \"dat_r\""
.format(record, len(record.dat_w), len(record.dat_r)))
data_width = len(record.dat_w)
- if data_width%len(record.sel) != 0:
+ if data_width % len(record.sel) != 0:
raise AttributeError("Record {!r} has invalid granularity value because "
"its data width is {}-bit long but "
"its \"sel\" is {}-bit long"
granularity=granularity,
features=features,
alignment=0,
- name=record.name+"_intf")
+ name=record.name + "_intf")
class Decoder(Elaboratable):
bus : :class:`Interface`
Bus providing access to subordinate buses.
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
alignment=0):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features,
- alignment=alignment)
- self._map = self.bus.memory_map
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features,
+ alignment=alignment)
+ self._map = self.bus.memory_map
self._subs = dict()
def align_to(self, alignment):
"translation)"
.format(sub_bus.data_width, sub_bus.granularity))
for opt_output in {"err", "rty", "stall"}:
- if hasattr(sub_bus, opt_output) and not hasattr(self.bus, opt_output):
+ if hasattr(sub_bus, opt_output) and not hasattr(
+ self.bus, opt_output):
raise ValueError("Subordinate bus has optional output {!r}, but the decoder "
"does not have a corresponding input"
.format(opt_output))
self._subs[sub_bus.memory_map] = sub_bus
- return self._map.add_window(sub_bus.memory_map, addr=addr, sparse=sparse)
+ return self._map.add_window(
+ sub_bus.memory_map, addr=addr, sparse=sparse)
def elaborate(self, platform):
m = Module()
- ack_fanin = 0
- err_fanin = 0
- rty_fanin = 0
+ ack_fanin = 0
+ err_fanin = 0
+ rty_fanin = 0
stall_fanin = 0
with m.Switch(self.bus.adr):
m.d.comb += [
sub_bus.adr.eq(self.bus.adr << log2_int(sub_ratio)),
sub_bus.dat_w.eq(self.bus.dat_w),
- sub_bus.sel.eq(Cat(Repl(sel, sub_ratio) for sel in self.bus.sel)),
+ sub_bus.sel.eq(Cat(Repl(sel, sub_ratio)
+ for sel in self.bus.sel)),
sub_bus.we.eq(self.bus.we),
sub_bus.stb.eq(self.bus.stb),
]
if hasattr(sub_bus, "lock"):
m.d.comb += sub_bus.lock.eq(getattr(self.bus, "lock", 0))
if hasattr(sub_bus, "cti"):
- m.d.comb += sub_bus.cti.eq(getattr(self.bus, "cti", CycleType.CLASSIC))
+ m.d.comb += sub_bus.cti.eq(getattr(self.bus,
+ "cti", CycleType.CLASSIC))
if hasattr(sub_bus, "bte"):
- m.d.comb += sub_bus.bte.eq(getattr(self.bus, "bte", BurstTypeExt.LINEAR))
+ m.d.comb += sub_bus.bte.eq(getattr(self.bus,
+ "bte", BurstTypeExt.LINEAR))
with m.Case(sub_pat[:-log2_int(self.bus.data_width // self.bus.granularity)]):
m.d.comb += [
bus : :class:`Interface`
Shared bus to which the selected initiator gains access.
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
scheduler="rr"):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features)
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features)
self._itors = []
if scheduler not in ["rr"]:
raise ValueError("Scheduling mode must be \"rr\", not {!r}"
"arbiter data width {}"
.format(itor_bus.data_width, self.bus.data_width))
for opt_output in {"lock", "cti", "bte"}:
- if hasattr(itor_bus, opt_output) and not hasattr(self.bus, opt_output):
+ if hasattr(itor_bus, opt_output) and not hasattr(
+ self.bus, opt_output):
raise ValueError("Initiator bus has optional output {!r}, but the arbiter "
"does not have a corresponding input"
.format(opt_output))
m.submodules.scheduler = scheduler = RoundRobin(len(self._itors))
grant = Signal(range(len(self._itors)))
- # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4)
+ # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05,
+ # Wishbone B4)
bus_busy = self.bus.cyc
if hasattr(self.bus, "lock"):
# If LOCK is not asserted, we also wait for STB to be deasserted before granting bus
m.d.comb += [
self.bus.adr.eq(itor_bus.adr),
self.bus.dat_w.eq(itor_bus.dat_w),
- self.bus.sel.eq(Cat(Repl(sel, ratio) for sel in itor_bus.sel)),
+ self.bus.sel.eq(Cat(Repl(sel, ratio)
+ for sel in itor_bus.sel)),
self.bus.we.eq(itor_bus.we),
self.bus.stb.eq(itor_bus.stb),
]
m.d.comb += self.bus.cyc.eq(itor_bus.cyc)
if hasattr(self.bus, "lock"):
- m.d.comb += self.bus.lock.eq(getattr(itor_bus, "lock", 1))
+ m.d.comb += self.bus.lock.eq(
+ getattr(itor_bus, "lock", 1))
if hasattr(self.bus, "cti"):
- m.d.comb += self.bus.cti.eq(getattr(itor_bus, "cti", CycleType.CLASSIC))
+ m.d.comb += self.bus.cti.eq(
+ getattr(itor_bus, "cti", CycleType.CLASSIC))
if hasattr(self.bus, "bte"):
- m.d.comb += self.bus.bte.eq(getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
+ m.d.comb += self.bus.bte.eq(
+ getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
m.d.comb += itor_bus.ack.eq(self.bus.ack)
if hasattr(itor_bus, "err"):
- m.d.comb += itor_bus.err.eq(getattr(self.bus, "err", 0))
+ m.d.comb += itor_bus.err.eq(
+ getattr(self.bus, "err", 0))
if hasattr(itor_bus, "rty"):
- m.d.comb += itor_bus.rty.eq(getattr(self.bus, "rty", 0))
+ m.d.comb += itor_bus.rty.eq(
+ getattr(self.bus, "rty", 0))
if hasattr(itor_bus, "stall"):
- m.d.comb += itor_bus_stall.eq(getattr(self.bus, "stall", ~self.bus.ack))
+ m.d.comb += itor_bus_stall.eq(
+ getattr(self.bus, "stall", ~self.bus.ack))
return m
decoder : :class:`Decoder`
The decoder that connects the shared bus to the list of SLAVEs.
"""
+
def __init__(self, *, addr_width, data_width, itors, targets, **kwargs):
self.addr_width = addr_width
self.data_width = data_width