class CycleType(Enum):
"""Wishbone Registered Feedback cycle type."""
- CLASSIC = 0b000
- CONST_BURST = 0b001
- INCR_BURST = 0b010
+ CLASSIC = 0b000
+ CONST_BURST = 0b001
+ INCR_BURST = 0b010
END_OF_BURST = 0b111
class BurstTypeExt(Enum):
"""Wishbone Registered Feedback burst type extension."""
- LINEAR = 0b00
- WRAP_4 = 0b01
- WRAP_8 = 0b10
+ LINEAR = 0b00
+ WRAP_4 = 0b01
+ WRAP_8 = 0b10
WRAP_16 = 0b11
Optional. Corresponds to Wishbone signal ``BTE_O`` (initiator)
or ``BTE_I`` (target).
"""
- def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
+
+ def __init__(self, *, addr_width, data_width, granularity=None,
+ features=None,
alignment=0, name=None):
+ if features is None:
+ features = frozenset()
if not isinstance(addr_width, int) or addr_width < 0:
- raise ValueError("Address width must be a non-negative integer, not {!r}"
- .format(addr_width))
+ raise ValueError("Address width must be a non-negative integer, "
+ "not {!r}" .format(addr_width))
if data_width not in (8, 16, 32, 64):
raise ValueError("Data width must be one of 8, 16, 32, 64, not {!r}"
.format(data_width))
if granularity is None:
granularity = data_width
elif granularity not in (8, 16, 32, 64):
- raise ValueError("Granularity must be one of 8, 16, 32, 64, not {!r}"
- .format(granularity))
+ raise ValueError("Granularity must be one of 8, 16, 32, 64, "
+ "not {!r}" .format(granularity))
if granularity > data_width:
- raise ValueError("Granularity {} may not be greater than data width {}"
- .format(granularity, data_width))
- self.addr_width = addr_width
- self.data_width = data_width
+ raise ValueError("Granularity {} may not be greater than data "
+ "width {}" .format(granularity, data_width))
+ self.addr_width = addr_width
+ self.data_width = data_width
self.granularity = granularity
granularity_bits = log2_int(data_width // granularity)
self._alignment = alignment
- self.memory_map = MemoryMap(addr_width=max(1, addr_width + granularity_bits),
- data_width=data_width >> granularity_bits,
- alignment=alignment)
+ self.memory_map = MemoryMap(addr_width=max(1, addr_width +
+ granularity_bits),
+ data_width=data_width >> granularity_bits,
+ alignment=alignment)
self._features = set(features)
- unknown = self._features - {"rty", "err", "stall", "lock", "cti", "bte"}
+ unknown = self._features - \
+ {"rty", "err", "stall", "lock", "cti", "bte"}
if unknown:
raise ValueError("Optional signal(s) {} are not supported"
.format(", ".join(map(repr, unknown))))
layout = [
- ("adr", addr_width, Direction.FANOUT),
+ ("adr", addr_width, Direction.FANOUT),
("dat_w", data_width, Direction.FANOUT),
("dat_r", data_width, Direction.FANIN),
- ("sel", data_width // granularity, Direction.FANOUT),
- ("cyc", 1, Direction.FANOUT),
- ("stb", 1, Direction.FANOUT),
- ("we", 1, Direction.FANOUT),
- ("ack", 1, Direction.FANIN),
+ ("sel", data_width // granularity, Direction.FANOUT),
+ ("cyc", 1, Direction.FANOUT),
+ ("stb", 1, Direction.FANOUT),
+ ("we", 1, Direction.FANOUT),
+ ("ack", 1, Direction.FANIN),
]
if "err" in features:
layout += [("err", 1, Direction.FANIN)]
if "stall" in features:
layout += [("stall", 1, Direction.FANIN)]
if "lock" in features:
- layout += [("lock", 1, Direction.FANOUT)]
+ layout += [("lock", 1, Direction.FANOUT)]
if "cti" in features:
- layout += [("cti", CycleType, Direction.FANOUT)]
+ layout += [("cti", CycleType, Direction.FANOUT)]
if "bte" in features:
layout += [("bte", BurstTypeExt, Direction.FANOUT)]
super().__init__(layout, name=name, src_loc_at=1)
@classmethod
def from_pure_record(cls, record):
- """Instantiate a :class:`wishbone.Interface` from a simple :class:`Record`
+ """Instantiate a :class:`wishbone.Interface`
+ from a simple :class:`Record`
"""
if not isinstance(record, Record):
raise TypeError("{!r} is not a Record"
if len(record.dat_w) != len(record.dat_r):
raise AttributeError("Record {!r} has {}-bit long \"dat_w\" "
"but {}-bit long \"dat_r\""
- .format(record, len(record.dat_w), len(record.dat_r)))
+ .format(record, len(record.dat_w),
+ len(record.dat_r)))
data_width = len(record.dat_w)
- if data_width%len(record.sel) != 0:
- raise AttributeError("Record {!r} has invalid granularity value because "
+ if data_width % len(record.sel) != 0:
+ raise AttributeError("Record {!r} has invalid granularity "
+ "value because "
"its data width is {}-bit long but "
"its \"sel\" is {}-bit long"
.format(record, data_width, len(record.sel)))
granularity=granularity,
features=features,
alignment=0,
- name=record.name+"_intf")
+ name=record.name + "_intf")
class Decoder(Elaboratable):
bus : :class:`Interface`
Bus providing access to subordinate buses.
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
alignment=0):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features,
- alignment=alignment)
- self._map = self.bus.memory_map
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features,
+ alignment=alignment)
+ self._map = self.bus.memory_map
self._subs = dict()
def align_to(self, alignment):
See :meth:`MemoryMap.add_resource` for details.
"""
if not isinstance(sub_bus, Interface):
- raise TypeError("Subordinate bus must be an instance of wishbone.Interface, not {!r}"
- .format(sub_bus))
+ raise TypeError("Subordinate bus must be an instance of "
+ "wishbone.Interface, not {!r}" .format(sub_bus))
if sub_bus.granularity > self.bus.granularity:
- raise ValueError("Subordinate bus has granularity {}, which is greater than the "
+ raise ValueError("Subordinate bus has granularity {}, "
+ "which is greater than the "
"decoder granularity {}"
.format(sub_bus.granularity, self.bus.granularity))
if not sparse:
if sub_bus.data_width != self.bus.data_width:
- raise ValueError("Subordinate bus has data width {}, which is not the same as "
- "decoder data width {} (required for dense address translation)"
- .format(sub_bus.data_width, self.bus.data_width))
+ raise ValueError("Subordinate bus has data width {}, "
+ "which is not the same as "
+ "decoder data width {} "
+ "(required for dense address translation)"
+ .format(sub_bus.data_width,
+ self.bus.data_width))
else:
if sub_bus.granularity != sub_bus.data_width:
- raise ValueError("Subordinate bus has data width {}, which is not the same as "
- "subordinate bus granularity {} (required for sparse address "
+ raise ValueError("Subordinate bus has data width {}, "
+ "which is not the same as "
+ "subordinate bus granularity {} "
+ "(required for sparse address "
"translation)"
- .format(sub_bus.data_width, sub_bus.granularity))
+ .format(sub_bus.data_width,
+ sub_bus.granularity))
for opt_output in {"err", "rty", "stall"}:
- if hasattr(sub_bus, opt_output) and not hasattr(self.bus, opt_output):
- raise ValueError("Subordinate bus has optional output {!r}, but the decoder "
+ if hasattr(sub_bus, opt_output) and not hasattr(
+ self.bus, opt_output):
+ raise ValueError("Subordinate bus has optional output "
+ "{!r}, but the decoder "
"does not have a corresponding input"
.format(opt_output))
self._subs[sub_bus.memory_map] = sub_bus
- return self._map.add_window(sub_bus.memory_map, addr=addr, sparse=sparse)
+ return self._map.add_window(
+ sub_bus.memory_map, addr=addr, sparse=sparse)
def elaborate(self, platform):
m = Module()
- ack_fanin = 0
- err_fanin = 0
- rty_fanin = 0
+ ack_fanin = 0
+ err_fanin = 0
+ rty_fanin = 0
stall_fanin = 0
with m.Switch(self.bus.adr):
m.d.comb += [
sub_bus.adr.eq(self.bus.adr << log2_int(sub_ratio)),
sub_bus.dat_w.eq(self.bus.dat_w),
- sub_bus.sel.eq(Cat(Repl(sel, sub_ratio) for sel in self.bus.sel)),
+ sub_bus.sel.eq(Cat(Repl(sel, sub_ratio)
+ for sel in self.bus.sel)),
sub_bus.we.eq(self.bus.we),
sub_bus.stb.eq(self.bus.stb),
]
if hasattr(sub_bus, "lock"):
m.d.comb += sub_bus.lock.eq(getattr(self.bus, "lock", 0))
if hasattr(sub_bus, "cti"):
- m.d.comb += sub_bus.cti.eq(getattr(self.bus, "cti", CycleType.CLASSIC))
+ m.d.comb += sub_bus.cti.eq(getattr(self.bus,
+ "cti",
+ CycleType.CLASSIC))
if hasattr(sub_bus, "bte"):
- m.d.comb += sub_bus.bte.eq(getattr(self.bus, "bte", BurstTypeExt.LINEAR))
+ m.d.comb += sub_bus.bte.eq(getattr(self.bus,
+ "bte",
+ BurstTypeExt.LINEAR))
- with m.Case(sub_pat[:-log2_int(self.bus.data_width // self.bus.granularity)]):
+ with m.Case(sub_pat[:-log2_int(self.bus.data_width //
+ self.bus.granularity)]):
m.d.comb += [
sub_bus.cyc.eq(self.bus.cyc),
self.bus.dat_r.eq(sub_bus.dat_r),
bus : :class:`Interface`
Shared bus to which the selected initiator gains access.
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
scheduler="rr"):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features)
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features)
self._itors = []
if scheduler not in ["rr"]:
raise ValueError("Scheduling mode must be \"rr\", not {!r}"
be greater than or equal to the granularity of the arbiter.
"""
if not isinstance(itor_bus, Interface):
- raise TypeError("Initiator bus must be an instance of wishbone.Interface, not {!r}"
- .format(itor_bus))
+ raise TypeError("Initiator bus must be an instance of "
+ "wishbone.Interface, not {!r}".format(itor_bus))
if itor_bus.addr_width != self.bus.addr_width:
- raise ValueError("Initiator bus has address width {}, which is not the same as "
+ raise ValueError("Initiator bus has address width {}, "
+ "which is not the same as "
"arbiter address width {}"
.format(itor_bus.addr_width, self.bus.addr_width))
if itor_bus.granularity < self.bus.granularity:
- raise ValueError("Initiator bus has granularity {}, which is lesser than the "
+ raise ValueError("Initiator bus has granularity {}, "
+ "which is lesser than the "
"arbiter granularity {}"
.format(itor_bus.granularity, self.bus.granularity))
if itor_bus.data_width != self.bus.data_width:
- raise ValueError("Initiator bus has data width {}, which is not the same as "
+ raise ValueError("Initiator bus has data width {}, "
+ "which is not the same as "
"arbiter data width {}"
.format(itor_bus.data_width, self.bus.data_width))
for opt_output in {"lock", "cti", "bte"}:
- if hasattr(itor_bus, opt_output) and not hasattr(self.bus, opt_output):
- raise ValueError("Initiator bus has optional output {!r}, but the arbiter "
+ if hasattr(itor_bus, opt_output) and not hasattr(
+ self.bus, opt_output):
+ raise ValueError("Initiator bus has optional output {!r}, "
+ "but the arbiter "
"does not have a corresponding input"
.format(opt_output))
m.submodules.scheduler = scheduler = RoundRobin(len(self._itors))
grant = Signal(range(len(self._itors)))
- # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4)
+ # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05,
+ # Wishbone B4)
bus_busy = self.bus.cyc
if hasattr(self.bus, "lock"):
- # If LOCK is not asserted, we also wait for STB to be deasserted before granting bus
- # ownership to the next initiator. If we didn't, the next bus owner could receive
- # an ACK (or ERR, RTY) from the previous transaction when targeting the same
- # peripheral.
+ # If LOCK is not asserted, we also wait for STB to be
+ # deasserted before granting bus ownership to the next
+ # initiator. If we didn't, the next bus owner could receive
+ # an ACK (or ERR, RTY) from the previous transaction when
+ # targeting the same peripheral.
bus_busy &= self.bus.lock | self.bus.stb
m.d.comb += [
m.d.comb += [
self.bus.adr.eq(itor_bus.adr),
self.bus.dat_w.eq(itor_bus.dat_w),
- self.bus.sel.eq(Cat(Repl(sel, ratio) for sel in itor_bus.sel)),
+ self.bus.sel.eq(Cat(Repl(sel, ratio)
+ for sel in itor_bus.sel)),
self.bus.we.eq(itor_bus.we),
self.bus.stb.eq(itor_bus.stb),
]
m.d.comb += self.bus.cyc.eq(itor_bus.cyc)
if hasattr(self.bus, "lock"):
- m.d.comb += self.bus.lock.eq(getattr(itor_bus, "lock", 1))
+ m.d.comb += self.bus.lock.eq(
+ getattr(itor_bus, "lock", 1))
if hasattr(self.bus, "cti"):
- m.d.comb += self.bus.cti.eq(getattr(itor_bus, "cti", CycleType.CLASSIC))
+ m.d.comb += self.bus.cti.eq(
+ getattr(itor_bus, "cti", CycleType.CLASSIC))
if hasattr(self.bus, "bte"):
- m.d.comb += self.bus.bte.eq(getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
+ m.d.comb += self.bus.bte.eq(
+ getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
m.d.comb += itor_bus.ack.eq(self.bus.ack)
if hasattr(itor_bus, "err"):
- m.d.comb += itor_bus.err.eq(getattr(self.bus, "err", 0))
+ m.d.comb += itor_bus.err.eq(
+ getattr(self.bus, "err", 0))
if hasattr(itor_bus, "rty"):
- m.d.comb += itor_bus.rty.eq(getattr(self.bus, "rty", 0))
+ m.d.comb += itor_bus.rty.eq(
+ getattr(self.bus, "rty", 0))
if hasattr(itor_bus, "stall"):
- m.d.comb += itor_bus_stall.eq(getattr(self.bus, "stall", ~self.bus.ack))
+ m.d.comb += itor_bus_stall.eq(
+ getattr(self.bus, "stall", ~self.bus.ack))
return m
This instantiates the following components:
(1) An arbiter (:class:`Arbiter`) controlling access of
multiple MASTERs to the shared bus; and
- (2) A decoder (:class:`Decoder`) specifying which targeted SLAVE is to be accessed
- using address translation on the shared bus.
+ (2) A decoder (:class:`Decoder`) specifying which targeted SLAVE is
+ to be accessed using address translation on the shared bus.
See Section 8.2.3 of Wishbone B4 for implemenation specifications.
decoder : :class:`Decoder`
The decoder that connects the shared bus to the list of SLAVEs.
"""
+
def __init__(self, *, addr_width, data_width, itors, targets, **kwargs):
self.addr_width = addr_width
self.data_width = data_width
if name in kwargs:
arbiter_kwargs[name] = kwargs[name]
self.arbiter = Arbiter(
- addr_width=self.addr_width, data_width=self.data_width, **arbiter_kwargs
+ addr_width=self.addr_width, data_width=self.data_width,
+ **arbiter_kwargs
)
self.arbiter.bus.name = "arbiter_shared"
for itor_bus in self._itors:
for name in ["granularity", "features", "alignment"]:
if name in kwargs:
decoder_kwargs[name] = kwargs[name]
- self.decoder = Decoder(
- addr_width=self.addr_width, data_width=self.data_width, **decoder_kwargs
- )
+ self.decoder = Decoder(addr_width=self.addr_width,
+ data_width=self.data_width,
+ **decoder_kwargs)
self.decoder.bus.name = "decoder_shared"
for item in self._targets:
if isinstance(item, Interface):
self.decoder.add(item[0], addr=item[1])
else:
raise TypeError("Target must be a Wishbone interface, "
- "or a (Wishbone interface, start address) tuple, not {!r}"
+ "or a (Wishbone interface, "
+ "start address) tuple, not {!r}"
.format(item))
def elaborate(self, platform):