can have more restrictive access mode, e.g. R/O fields can be
a part of an R/W register.
"""
- R = "r"
- W = "w"
+ R = "r"
+ W = "w"
RW = "rw"
def readable(self):
Write strobe. Registers should update their value or perform
the write side effect when this strobe is asserted.
"""
+
def __init__(self, width, access, *, name=None, src_loc_at=0):
if not isinstance(width, int) or width < 0:
raise ValueError("Width must be a non-negative integer, not {!r}"
.format(width))
- if not isinstance(access, Element.Access) and access not in ("r", "w", "rw"):
- raise ValueError("Access mode must be one of \"r\", \"w\", or \"rw\", not {!r}"
+ if not isinstance(access, Element.Access) and access not in (
+ "r", "w", "rw"):
+ raise ValueError("Access mode must be one of \"r\", "
+ "\"w\", or \"rw\", not {!r}"
.format(access))
- self.width = width
+ self.width = width
self.access = Element.Access(access)
layout = []
if self.access.readable():
layout += [
("r_data", width),
- ("r_stb", 1),
+ ("r_stb", 1),
]
if self.access.writable():
layout += [
("w_data", width),
- ("w_stb", 1),
+ ("w_stb", 1),
]
super().__init__(layout, name=name, src_loc_at=1)
.format(data_width))
self.addr_width = addr_width
self.data_width = data_width
- self.memory_map = MemoryMap(addr_width=addr_width, data_width=data_width,
+ self.memory_map = MemoryMap(addr_width=addr_width,
+ data_width=data_width,
alignment=alignment)
super().__init__([
- ("addr", addr_width),
- ("r_data", data_width),
- ("r_stb", 1),
- ("w_data", data_width),
- ("w_stb", 1),
+ ("addr", addr_width),
+ ("r_data", data_width),
+ ("r_stb", 1),
+ ("w_data", data_width),
+ ("w_stb", 1),
], name=name, src_loc_at=1)
bus : :class:`Interface`
CSR bus providing access to registers.
"""
+
def __init__(self, *, addr_width, data_width, alignment=0):
- self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment,
- name="csr")
+ self.bus = Interface(addr_width=addr_width,
+ data_width=data_width,
+ alignment=alignment,
+ name="csr")
self._map = self.bus.memory_map
def align_to(self, alignment):
See :meth:`MemoryMap.add_resource` for details.
"""
if not isinstance(element, Element):
- raise TypeError("Element must be an instance of csr.Element, not {!r}"
- .format(element))
+ raise TypeError("Element must be an instance of csr.Element, "
+ "not {!r}" .format(element))
size = (element.width + self.bus.data_width - 1) // self.bus.data_width
- return self._map.add_resource(element, size=size, addr=addr, alignment=alignment)
+ return self._map.add_resource(
+ element, size=size, addr=addr, alignment=alignment)
def elaborate(self, platform):
m = Module()
for elem, (elem_start, elem_end) in self._map.resources():
shadow = Signal(elem.width, name="{}__shadow".format(elem.name))
if elem.access.readable():
- shadow_en = Signal(elem_end - elem_start, name="{}__shadow_en".format(elem.name))
+ shadow_en = Signal(
+ elem_end - elem_start,
+ name="{}__shadow_en".format(
+ elem.name))
m.d.sync += shadow_en.eq(0)
if elem.access.writable():
m.d.comb += elem.w_data.eq(shadow)
# carry chains for comparisons, even with a constant. (Register sizes don't have
# to be powers of 2.)
with m.Switch(self.bus.addr):
- for chunk_offset, chunk_addr in enumerate(range(elem_start, elem_end)):
- shadow_slice = shadow.word_select(chunk_offset, self.bus.data_width)
+ for chunk_offset, chunk_addr in enumerate(
+ range(elem_start, elem_end)):
+ shadow_slice = shadow.word_select(
+ chunk_offset, self.bus.data_width)
with m.Case(chunk_addr):
if elem.access.readable():
- r_data_fanin |= Mux(shadow_en[chunk_offset], shadow_slice, 0)
+ r_data_fanin |= Mux(
+ shadow_en[chunk_offset], shadow_slice, 0)
if chunk_addr == elem_start:
m.d.comb += elem.r_stb.eq(self.bus.r_stb)
with m.If(self.bus.r_stb):
m.d.sync += shadow.eq(elem.r_data)
# Delay by 1 cycle, allowing reads to be pipelined.
- m.d.sync += shadow_en.eq(self.bus.r_stb << chunk_offset)
+ m.d.sync += shadow_en.eq(self.bus.r_stb <<
+ chunk_offset)
if elem.access.writable():
if chunk_addr == elem_end - 1:
bus : :class:`Interface`
CSR bus providing access to subordinate buses.
"""
+
def __init__(self, *, addr_width, data_width, alignment=0):
- self.bus = Interface(addr_width=addr_width, data_width=data_width, alignment=alignment,
- name="csr")
- self._map = self.bus.memory_map
+ self.bus = Interface(addr_width=addr_width,
+ data_width=data_width,
+ alignment=alignment,
+ name="csr")
+ self._map = self.bus.memory_map
self._subs = dict()
def align_to(self, alignment):
See :meth:`MemoryMap.add_resource` for details.
"""
if not isinstance(sub_bus, Interface):
- raise TypeError("Subordinate bus must be an instance of csr.Interface, not {!r}"
- .format(sub_bus))
+ raise TypeError("Subordinate bus must be an instance of "
+ "csr.Interface, not {!r}" .format(sub_bus))
if sub_bus.data_width != self.bus.data_width:
- raise ValueError("Subordinate bus has data width {}, which is not the same as "
+ raise ValueError("Subordinate bus has data width {}, "
+ "which is not the same as "
"decoder data width {}"
.format(sub_bus.data_width, self.bus.data_width))
self._subs[sub_bus.memory_map] = sub_bus
wb_bus : :class:`..wishbone.Interface`
Wishbone bus provided by the bridge.
"""
+
def __init__(self, csr_bus, *, data_width=None):
if not isinstance(csr_bus, CSRInterface):
raise ValueError("CSR bus must be an instance of CSRInterface, not {!r}"
data_width = csr_bus.data_width
self.csr_bus = csr_bus
- self.wb_bus = WishboneInterface(
- addr_width=max(0, csr_bus.addr_width - log2_int(data_width // csr_bus.data_width)),
+ self.wb_bus = WishboneInterface(
+ addr_width=max(
+ 0,
+ csr_bus.addr_width -
+ log2_int(
+ data_width //
+ csr_bus.data_width)),
data_width=data_width,
granularity=csr_bus.data_width,
name="wb")
# Since granularity of the Wishbone interface matches the data width of the CSR bus,
- # no width conversion is performed, even if the Wishbone data width is greater.
+ # no width conversion is performed, even if the Wishbone data width is
+ # greater.
self.wb_bus.memory_map.add_window(self.csr_bus.memory_map)
def elaborate(self, platform):
csr_bus = self.csr_bus
- wb_bus = self.wb_bus
+ wb_bus = self.wb_bus
m = Module()
cycle = Signal(range(len(wb_bus.sel) + 1))
- m.d.comb += csr_bus.addr.eq(Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr))
+ m.d.comb += csr_bus.addr.eq(
+ Cat(cycle[:log2_int(len(wb_bus.sel))], wb_bus.adr))
with m.If(wb_bus.cyc & wb_bus.stb):
with m.Switch(cycle):
def segment(index):
- return slice(index * wb_bus.granularity, (index + 1) * wb_bus.granularity)
+ return slice(index * wb_bus.granularity,
+ (index + 1) * wb_bus.granularity)
for index, sel_index in enumerate(wb_bus.sel):
with m.Case(index):
if index > 0:
- # CSR reads are registered, and we need to re-register them.
- m.d.sync += wb_bus.dat_r[segment(index - 1)].eq(csr_bus.r_data)
+ # CSR reads are registered, and we need to
+ # re-register them.
+ m.d.sync += wb_bus.dat_r[segment(
+ index - 1)].eq(csr_bus.r_data)
m.d.comb += csr_bus.r_stb.eq(sel_index & ~wb_bus.we)
- m.d.comb += csr_bus.w_data.eq(wb_bus.dat_w[segment(index)])
+ m.d.comb += csr_bus.w_data.eq(
+ wb_bus.dat_w[segment(index)])
m.d.comb += csr_bus.w_stb.eq(sel_index & wb_bus.we)
m.d.sync += cycle.eq(index + 1)
A range map is a mapping from non-overlapping ranges to arbitrary values.
"""
+
def __init__(self):
- self._keys = []
+ self._keys = []
self._values = dict()
self._starts = []
- self._stops = []
+ self._stops = []
def insert(self, key, value):
assert isinstance(key, range)
assert not self.overlaps(key)
start_idx = bisect.bisect_right(self._starts, key.start)
- stop_idx = bisect.bisect_left(self._stops, key.stop)
+ stop_idx = bisect.bisect_left(self._stops, key.stop)
assert start_idx == stop_idx
self._starts.insert(start_idx, key.start)
def overlaps(self, key):
start_idx = bisect.bisect_right(self._stops, key.start)
- stop_idx = bisect.bisect_left(self._starts, key.stop)
+ stop_idx = bisect.bisect_left(self._starts, key.stop)
return [self._values[key] for key in self._keys[start_idx:stop_idx]]
def items(self):
at an address that is a multiple of ``2 ** alignment``, and its
size will be rounded up to be a multiple of ``2 ** alignment``.
"""
+
def __init__(self, *, addr_width, data_width, alignment=0):
if not isinstance(addr_width, int) or addr_width <= 0:
- raise ValueError("Address width must be a positive integer, not {!r}"
- .format(addr_width))
+ raise ValueError("Address width must be a positive integer, "
+ "not {!r}" .format(addr_width))
if not isinstance(data_width, int) or data_width <= 0:
- raise ValueError("Data width must be a positive integer, not {!r}"
- .format(data_width))
+ raise ValueError("Data width must be a positive integer, "
+ "not {!r}" .format(data_width))
if not isinstance(alignment, int) or alignment < 0:
- raise ValueError("Alignment must be a non-negative integer, not {!r}"
- .format(alignment))
+ raise ValueError("Alignment must be a non-negative integer, "
+ "not {!r}" .format(alignment))
self.addr_width = addr_width
self.data_width = data_width
- self.alignment = alignment
+ self.alignment = alignment
- self._ranges = _RangeMap()
+ self._ranges = _RangeMap()
self._resources = dict()
- self._windows = dict()
+ self._windows = dict()
self._next_addr = 0
Implicit next address.
"""
if not isinstance(alignment, int) or alignment < 0:
- raise ValueError("Alignment must be a non-negative integer, not {!r}"
- .format(alignment))
- self._next_addr = self._align_up(self._next_addr, max(alignment, self.alignment))
+ raise ValueError("Alignment must be a non-negative integer, "
+ "not {!r}" .format(alignment))
+ self._next_addr = self._align_up(
+ self._next_addr, max(
+ alignment, self.alignment))
return self._next_addr
def _compute_addr_range(self, addr, size, step=1, *, alignment):
if addr is not None:
if not isinstance(addr, int) or addr < 0:
- raise ValueError("Address must be a non-negative integer, not {!r}"
- .format(addr))
+ raise ValueError("Address must be a non-negative integer, "
+ "not {!r}" .format(addr))
if addr % (1 << self.alignment) != 0:
- raise ValueError("Explicitly specified address {:#x} must be a multiple of "
- "{:#x} bytes"
- .format(addr, 1 << alignment))
+ raise ValueError("Explicitly specified address {:#x} "
+ "must be a multiple of "
+ "{:#x} bytes".format(addr, 1 << alignment))
else:
addr = self._align_up(self._next_addr, alignment)
.format(size))
size = self._align_up(size, alignment)
- if addr > (1 << self.addr_width) or addr + size > (1 << self.addr_width):
+ if addr > (1 << self.addr_width) or addr + \
+ size > (1 << self.addr_width):
raise ValueError("Address range {:#x}..{:#x} out of bounds for memory map spanning "
"range {:#x}..{:#x} ({} address bits)"
- .format(addr, addr + size, 0, 1 << self.addr_width, self.addr_width))
+ .format(addr, addr + size, 0,
+ 1 << self.addr_width, self.addr_width))
addr_range = range(addr, addr + size, step)
overlaps = self._ranges.overlaps(addr_range)
if overlap in self._resources:
resource_range = self._resources[overlap]
overlap_descrs.append("resource {!r} at {:#x}..{:#x}"
- .format(overlap, resource_range.start, resource_range.stop))
+ .format(overlap,
+ resource_range.start,
+ resource_range.stop))
if overlap in self._windows:
window_range = self._windows[overlap]
overlap_descrs.append("window {!r} at {:#x}..{:#x}"
- .format(overlap, window_range.start, window_range.stop))
+ .format(overlap,
+ window_range.start,
+ window_range.stop))
raise ValueError("Address range {:#x}..{:#x} overlaps with {}"
- .format(addr, addr + size, ", ".join(overlap_descrs)))
+ .format(addr,
+ addr + size,
+ ", ".join(overlap_descrs)))
return addr_range
"""
if resource in self._resources:
addr_range = self._resources[resource]
- raise ValueError("Resource {!r} is already added at address range {:#x}..{:#x}"
- .format(resource, addr_range.start, addr_range.stop))
+ raise ValueError("Resource {!r} is already added at "
+ "address range {:#x}..{:#x}"
+ .format(resource, addr_range.start,
+ addr_range.stop))
if alignment is not None:
if not isinstance(alignment, int) or alignment < 0:
- raise ValueError("Alignment must be a non-negative integer, not {!r}"
- .format(alignment))
+ raise ValueError("Alignment must be a non-negative integer, "
+ "not {!r}" .format(alignment))
alignment = max(alignment, self.alignment)
else:
alignment = self.alignment
.format(window))
if window in self._windows:
addr_range = self._windows[window]
- raise ValueError("Window {!r} is already added at address range {:#x}..{:#x}"
+ raise ValueError("Window {!r} is already added at "
+ "address range {:#x}..{:#x}"
.format(window, addr_range.start, addr_range.stop))
if window.data_width > self.data_width:
- raise ValueError("Window has data width {}, and cannot be added to a memory map "
+ raise ValueError("Window has data width {}, and cannot "
+ "be added to a memory map "
"with data width {}"
.format(window.data_width, self.data_width))
if window.data_width != self.data_width:
if sparse is None:
- raise ValueError("Address translation mode must be explicitly specified "
- "when adding a window with data width {} to a memory map "
+ raise ValueError("Address translation mode must be "
+ "explicitly specified "
+ "when adding a window with "
+ "data width {} to a memory map "
"with data width {}"
.format(window.data_width, self.data_width))
if not sparse and self.data_width % window.data_width != 0:
- raise ValueError("Dense addressing cannot be used because the memory map "
- "data width {} is not an integer multiple of window "
+ raise ValueError("Dense addressing cannot be used "
+ "because the memory map "
+ "data width {} is not an integer "
+ "multiple of window "
"data width {}"
.format(self.data_width, window.data_width))
# a window can still be aligned using align_to().
alignment = max(self.alignment, window.addr_width // ratio)
- addr_range = self._compute_addr_range(addr, size, ratio, alignment=alignment)
+ addr_range = self._compute_addr_range(
+ addr, size, ratio, alignment=alignment)
self._ranges.insert(addr_range, window)
self._windows[window] = addr_range
self._next_addr = addr_range.stop
wider bus. Otherwise, it is always 1.
"""
for window, window_range in self._windows.items():
- yield window, (window_range.start, window_range.stop, window_range.step)
+ yield window, (window_range.start, window_range.stop,
+ window_range.step)
def window_patterns(self):
"""Iterate local windows and patterns that match their address ranges.
it is always 1.
"""
for window, window_range in self._windows.items():
- pattern = "{:0{}b}{}".format(window_range.start >> window.addr_width,
+ pattern = "{:0{}b}{}".format((window_range.start >>
+ window.addr_width),
self.addr_width - window.addr_width,
"-" * window.addr_width)
yield window, (pattern, window_range.step)
# Accessing a resource through a dense and then a sparse window results in very strange
# layouts that cannot be easily represented, so reject those.
assert window_range.step == 1 or width == window.data_width
- size = (end - start) // window_range.step
+ size = (end - start) // window_range.step
start += window_range.start
width *= window_range.step
return start, start + size, width
"""
for addr_range, assignment in self._ranges.items():
if assignment in self._resources:
- yield assignment, (addr_range.start, addr_range.stop, self.data_width)
+ yield assignment, (addr_range.start, addr_range.stop,
+ self.data_width)
elif assignment in self._windows:
for sub_resource, sub_descr in assignment.all_resources():
- yield sub_resource, self._translate(*sub_descr, assignment, addr_range)
+ yield sub_resource, self._translate(*sub_descr,
+ assignment,
+ addr_range)
else:
- assert False # :nocov:
+ assert False # :nocov:
def find_resource(self, resource):
"""Find address range corresponding to a resource.
for window, window_range in self._windows.items():
try:
- return self._translate(*window.find_resource(resource), window, window_range)
+ return self._translate(*window.find_resource(resource),
+ window,
+ window_range)
except KeyError:
pass
return assignment
elif assignment in self._windows:
addr_range = self._windows[assignment]
- return assignment.decode_address((address - addr_range.start) // addr_range.step)
+ return assignment.decode_address(
+ (address - addr_range.start) // addr_range.step)
else:
- assert False # :nocov:
+ assert False # :nocov:
Strobe signal to enable granting access to the next device
requesting. Externally driven.
"""
+
def __init__(self, n):
self.n = n
self.request = Signal(n)
with m.Switch(self.grant):
for i in range(self.n):
with m.Case(i):
- for j in reversed(range(i+1, i+self.n)):
+ for j in reversed(range(i + 1, i + self.n)):
# If i+1 <= j < n, then t == j; (after i)
# If n <= j < i+n, then t == j - n (before i)
t = j % self.n
("w_stb", 1),
]))
- def test_layout_0_rw(self): # degenerate but legal case
+ def test_layout_0_rw(self): # degenerate but legal case
elem = Element(0, access=Element.Access.RW)
self.assertEqual(elem.width, 0)
self.assertEqual(elem.access, Element.Access.RW)
self.assertEqual(iface.addr_width, 12)
self.assertEqual(iface.data_width, 8)
self.assertEqual(iface.layout, Layout.cast([
- ("addr", 12),
- ("r_data", 8),
- ("r_stb", 1),
- ("w_data", 8),
- ("w_stb", 1),
+ ("addr", 12),
+ ("r_data", 8),
+ ("r_stb", 1),
+ ("w_data", 8),
+ ("w_stb", 1),
]))
def test_wrong_addr_width(self):
self.assertEqual((yield elem_4_r.r_stb), 0)
self.assertEqual((yield elem_16_rw.r_stb), 1)
yield
- yield bus.addr.eq(3) # pipeline a read
+ yield bus.addr.eq(3) # pipeline a read
self.assertEqual((yield bus.r_data), 0xa5)
yield bus.r_stb.eq(1)
yield bus.w_stb.eq(1)
yield
yield bus.w_stb.eq(0)
- yield bus.addr.eq(2) # change address
+ yield bus.addr.eq(2) # change address
yield
self.assertEqual((yield elem_8_w.w_stb), 1)
self.assertEqual((yield elem_8_w.w_data), 0x3d)
yield
self.assertEqual((yield elem_8_w.w_stb), 0)
self.assertEqual((yield elem_16_rw.w_stb), 0)
- yield bus.addr.eq(3) # pipeline a write
+ yield bus.addr.eq(3) # pipeline a write
yield bus.w_data.eq(0xaa)
yield
self.assertEqual((yield elem_8_w.w_stb), 0)
def test_add_wrong_sub_bus(self):
with self.assertRaisesRegex(TypeError,
- r"Subordinate bus must be an instance of csr\.Interface, not 1"):
+ r"Subordinate bus must be an instance of csr\.Interface, not 1"):
self.dut.add(1)
def test_add_wrong_data_width(self):
mux = Multiplexer(addr_width=10, data_width=16)
- Fragment.get(mux, platform=None) # silence UnusedElaboratable
+ Fragment.get(mux, platform=None) # silence UnusedElaboratable
with self.assertRaisesRegex(ValueError,
r"Subordinate bus has data width 16, which is not the same as "
self.dut.add(mux.bus)
def test_sim(self):
- mux_1 = Multiplexer(addr_width=10, data_width=8)
+ mux_1 = Multiplexer(addr_width=10, data_width=8)
self.dut.add(mux_1.bus)
elem_1 = Element(8, "rw")
mux_1.add(elem_1)
- mux_2 = Multiplexer(addr_width=10, data_width=8)
+ mux_2 = Multiplexer(addr_width=10, data_width=8)
self.dut.add(mux_2.bus)
elem_2 = Element(8, "rw")
mux_2.add(elem_2, addr=2)
self.element = csr.Element(width, "rw")
self.r_count = Signal(8)
self.w_count = Signal(8)
- self.data = Signal(width)
+ self.data = Signal(width)
def elaborate(self, platform):
m = Module()
with self.assertRaisesRegex(ValueError,
r"CSR bus data width must be one of 8, 16, 32, 64, not 7"):
WishboneCSRBridge(csr_bus=csr.Interface(addr_width=10,
- data_width=7))
+ data_width=7))
def test_narrow(self):
- mux = csr.Multiplexer(addr_width=10, data_width=8)
+ mux = csr.Multiplexer(addr_width=10, data_width=8)
reg_1 = MockRegister(8)
mux.add(reg_1.element)
reg_2 = MockRegister(16)
mux.add(reg_2.element)
- dut = WishboneCSRBridge(mux.bus)
+ dut = WishboneCSRBridge(mux.bus)
def sim_test():
yield dut.wb_bus.cyc.eq(1)
class RangeMapTestCase(unittest.TestCase):
def test_insert(self):
range_map = _RangeMap()
- range_map.insert(range(0,10), "a")
- range_map.insert(range(20,21), "c")
- range_map.insert(range(15,16), "b")
- range_map.insert(range(16,20), "q")
+ range_map.insert(range(0, 10), "a")
+ range_map.insert(range(20, 21), "c")
+ range_map.insert(range(15, 16), "b")
+ range_map.insert(range(16, 20), "q")
self.assertEqual(range_map._keys, [
- range(0,10), range(15,16), range(16,20), range(20,21)
+ range(0, 10), range(15, 16), range(16, 20), range(20, 21)
])
def test_overlaps(self):
range_map = _RangeMap()
- range_map.insert(range(10,20), "a")
- self.assertEqual(range_map.overlaps(range(5,15)), ["a"])
- self.assertEqual(range_map.overlaps(range(15,25)), ["a"])
- self.assertEqual(range_map.overlaps(range(5,25)), ["a"])
- self.assertEqual(range_map.overlaps(range(0,3)), [])
- self.assertEqual(range_map.overlaps(range(0,5)), [])
- self.assertEqual(range_map.overlaps(range(25,30)), [])
+ range_map.insert(range(10, 20), "a")
+ self.assertEqual(range_map.overlaps(range(5, 15)), ["a"])
+ self.assertEqual(range_map.overlaps(range(15, 25)), ["a"])
+ self.assertEqual(range_map.overlaps(range(5, 25)), ["a"])
+ self.assertEqual(range_map.overlaps(range(0, 3)), [])
+ self.assertEqual(range_map.overlaps(range(0, 5)), [])
+ self.assertEqual(range_map.overlaps(range(25, 30)), [])
def test_insert_wrong_overlap(self):
range_map = _RangeMap()
- range_map.insert(range(0,10), "a")
+ range_map.insert(range(0, 10), "a")
with self.assertRaises(AssertionError):
- range_map.insert(range(5,15), "b")
+ range_map.insert(range(5, 15), "b")
def test_get(self):
range_map = _RangeMap()
- range_map.insert(range(5,15), "a")
+ range_map.insert(range(5, 15), "a")
self.assertEqual(range_map.get(0), None)
self.assertEqual(range_map.get(5), "a")
self.assertEqual(range_map.get(10), "a")
class MemoryMapTestCase(unittest.TestCase):
def test_wrong_addr_width(self):
with self.assertRaisesRegex(ValueError,
- r"Address width must be a positive integer, not -1"):
+ r"Address width must be a positive integer, not -1"):
MemoryMap(addr_width=-1, data_width=8)
def test_wrong_data_width(self):
with self.assertRaisesRegex(ValueError,
- r"Data width must be a positive integer, not -1"):
+ r"Data width must be a positive integer, not -1"):
MemoryMap(addr_width=16, data_width=-1)
def test_wrong_alignment(self):
with self.assertRaisesRegex(ValueError,
- r"Alignment must be a non-negative integer, not -1"):
+ r"Alignment must be a non-negative integer, not -1"):
MemoryMap(addr_width=16, data_width=8, alignment=-1)
def test_add_resource(self):
def test_add_resource_explicit_aligned(self):
memory_map = MemoryMap(addr_width=16, data_width=8)
self.assertEqual(memory_map.add_resource("a", size=1), (0, 1))
- self.assertEqual(memory_map.add_resource("b", size=1, alignment=1), (2, 4))
+ self.assertEqual(
+ memory_map.add_resource(
+ "b", size=1, alignment=1), (2, 4))
self.assertEqual(memory_map.add_resource("c", size=2), (4, 6))
def test_add_resource_addr(self):
memory_map = MemoryMap(addr_width=16, data_width=8)
- self.assertEqual(memory_map.add_resource("a", size=1, addr=10), (10, 11))
+ self.assertEqual(
+ memory_map.add_resource(
+ "a", size=1, addr=10), (10, 11))
self.assertEqual(memory_map.add_resource("b", size=2), (11, 13))
def test_add_resource_wrong_address(self):
memory_map = MemoryMap(addr_width=16, data_width=8)
with self.assertRaisesRegex(ValueError,
- r"Address must be a non-negative integer, not -1"):
+ r"Address must be a non-negative integer, not -1"):
memory_map.add_resource("a", size=1, addr=-1)
def test_add_resource_wrong_address_unaligned(self):
memory_map = MemoryMap(addr_width=16, data_width=8, alignment=1)
with self.assertRaisesRegex(ValueError,
- r"Explicitly specified address 0x1 must be "
- r"a multiple of 0x2 bytes"):
+ r"Explicitly specified address 0x1 must be "
+ r"a multiple of 0x2 bytes"):
memory_map.add_resource("a", size=1, addr=1)
def test_add_resource_wrong_size(self):
memory_map = MemoryMap(addr_width=16, data_width=8)
with self.assertRaisesRegex(ValueError,
- r"Size must be a non-negative integer, not -1"):
+ r"Size must be a non-negative integer, not -1"):
memory_map.add_resource("a", size=-1)
def test_add_resource_wrong_alignment(self):
r"data width 16 is not an integer multiple of window data "
r"width 7"):
memory_map.add_window(MemoryMap(addr_width=10, data_width=7),
- sparse=False)
+ sparse=False)
def test_add_window_wrong_overlap(self):
memory_map = MemoryMap(addr_width=16, data_width=8)
self.assertEqual(iface.memory_map.addr_width, 32)
self.assertEqual(iface.memory_map.data_width, 8)
self.assertEqual(iface.layout, Layout.cast([
- ("adr", 32, DIR_FANOUT),
- ("dat_w", 8, DIR_FANOUT),
- ("dat_r", 8, DIR_FANIN),
- ("sel", 1, DIR_FANOUT),
- ("cyc", 1, DIR_FANOUT),
- ("stb", 1, DIR_FANOUT),
- ("we", 1, DIR_FANOUT),
- ("ack", 1, DIR_FANIN),
+ ("adr", 32, DIR_FANOUT),
+ ("dat_w", 8, DIR_FANOUT),
+ ("dat_r", 8, DIR_FANIN),
+ ("sel", 1, DIR_FANOUT),
+ ("cyc", 1, DIR_FANOUT),
+ ("stb", 1, DIR_FANOUT),
+ ("we", 1, DIR_FANOUT),
+ ("ack", 1, DIR_FANIN),
]))
def test_granularity(self):
self.assertEqual(iface.memory_map.addr_width, 32)
self.assertEqual(iface.memory_map.data_width, 8)
self.assertEqual(iface.layout, Layout.cast([
- ("adr", 30, DIR_FANOUT),
+ ("adr", 30, DIR_FANOUT),
("dat_w", 32, DIR_FANOUT),
("dat_r", 32, DIR_FANIN),
- ("sel", 4, DIR_FANOUT),
- ("cyc", 1, DIR_FANOUT),
- ("stb", 1, DIR_FANOUT),
- ("we", 1, DIR_FANOUT),
- ("ack", 1, DIR_FANIN),
+ ("sel", 4, DIR_FANOUT),
+ ("cyc", 1, DIR_FANOUT),
+ ("stb", 1, DIR_FANOUT),
+ ("we", 1, DIR_FANOUT),
+ ("ack", 1, DIR_FANIN),
]))
def test_features(self):
iface = Interface(addr_width=32, data_width=32,
features={"rty", "err", "stall", "lock",
- "cti", "bte"})
+ "cti", "bte"})
self.assertEqual(iface.layout, Layout.cast([
- ("adr", 32, DIR_FANOUT),
+ ("adr", 32, DIR_FANOUT),
("dat_w", 32, DIR_FANOUT),
("dat_r", 32, DIR_FANIN),
- ("sel", 1, DIR_FANOUT),
- ("cyc", 1, DIR_FANOUT),
- ("stb", 1, DIR_FANOUT),
- ("we", 1, DIR_FANOUT),
- ("ack", 1, DIR_FANIN),
- ("err", 1, DIR_FANIN),
- ("rty", 1, DIR_FANIN),
- ("stall", 1, DIR_FANIN),
- ("lock", 1, DIR_FANOUT),
- ("cti", CycleType, DIR_FANOUT),
- ("bte", BurstTypeExt, DIR_FANOUT),
+ ("sel", 1, DIR_FANOUT),
+ ("cyc", 1, DIR_FANOUT),
+ ("stb", 1, DIR_FANOUT),
+ ("we", 1, DIR_FANOUT),
+ ("ack", 1, DIR_FANIN),
+ ("err", 1, DIR_FANIN),
+ ("rty", 1, DIR_FANIN),
+ ("stall", 1, DIR_FANIN),
+ ("lock", 1, DIR_FANOUT),
+ ("cti", CycleType, DIR_FANOUT),
+ ("bte", BurstTypeExt, DIR_FANOUT),
]))
def test_wrong_addr_width(self):
r"but the decoder does "
r"not have a corresponding input"):
self.dut.add(Interface(addr_width=15, data_width=32,
- granularity=16, features={"err"}))
+ granularity=16, features={"err"}))
class DecoderSimulationTestCase(unittest.TestCase):
for index, sel_bit in enumerate(self.bus.sel):
with m.If(sel_bit):
segment = self.bus.dat_r.word_select(index,
- self.bus.granularity)
+ self.bus.granularity)
m.d.comb += segment.eq(self.bus.adr + index)
return m
loop_3 = AddressLoopback(addr_width=8, data_width=16, granularity=16)
self.assertEqual(dut.add(loop_3.bus, addr=0x30000, sparse=True),
(0x30000, 0x30100, 1))
- loop_4 = AddressLoopback(addr_width=8, data_width=8, granularity=8)
+ loop_4 = AddressLoopback(addr_width=8, data_width=8, granularity=8)
self.assertEqual(dut.add(loop_4.bus, addr=0x40000, sparse=True),
(0x40000, 0x40100, 1))
dut.add(itor_1)
itor_2 = Interface(addr_width=30, data_width=32, granularity=16,
features={"err", "rty", "stall", "lock",
- "cti", "bte"})
+ "cti", "bte"})
dut.add(itor_2)
def sim_test():
self.shared = Interface(addr_width=30,
data_width=32,
granularity=8,
- features={"err","cti","bte"},
+ features={"err", "cti", "bte"},
name="shared")
self.master01 = Interface(addr_width=30,
data_width=32,
granularity=8,
- features={"err","cti","bte"},
+ features={"err", "cti", "bte"},
name="master01")
self.master02 = Record([
- ("adr", 30, DIR_FANOUT),
+ ("adr", 30, DIR_FANOUT),
("dat_w", 32, DIR_FANOUT),
("dat_r", 32, DIR_FANIN),
- ("sel", 4, DIR_FANOUT),
- ("cyc", 1, DIR_FANOUT),
- ("stb", 1, DIR_FANOUT),
- ("ack", 1, DIR_FANIN),
- ("we", 1, DIR_FANOUT),
- ("cti", 3, DIR_FANOUT),
- ("bte", 2, DIR_FANOUT),
- ("err", 1, DIR_FANIN)
+ ("sel", 4, DIR_FANOUT),
+ ("cyc", 1, DIR_FANOUT),
+ ("stb", 1, DIR_FANOUT),
+ ("ack", 1, DIR_FANIN),
+ ("we", 1, DIR_FANOUT),
+ ("cti", 3, DIR_FANOUT),
+ ("bte", 2, DIR_FANOUT),
+ ("err", 1, DIR_FANIN)
])
self.sub01 = Interface(addr_width=11,
- data_width=32,
- granularity=8,
- features={"err","cti","bte"},
- name="sub01")
+ data_width=32,
+ granularity=8,
+ features={"err", "cti", "bte"},
+ name="sub01")
self.sub02 = Interface(addr_width=21,
data_width=32,
granularity=8,
- features={"err","cti","bte"},
+ features={"err", "cti", "bte"},
name="sub02")
self.dut = InterconnectShared(
addr_width=30, data_width=32, granularity=8,
- features={"err","cti","bte"},
+ features={"err", "cti", "bte"},
itors=[
self.master01,
self.master02
class CycleType(Enum):
"""Wishbone Registered Feedback cycle type."""
- CLASSIC = 0b000
- CONST_BURST = 0b001
- INCR_BURST = 0b010
+ CLASSIC = 0b000
+ CONST_BURST = 0b001
+ INCR_BURST = 0b010
END_OF_BURST = 0b111
class BurstTypeExt(Enum):
"""Wishbone Registered Feedback burst type extension."""
- LINEAR = 0b00
- WRAP_4 = 0b01
- WRAP_8 = 0b10
+ LINEAR = 0b00
+ WRAP_4 = 0b01
+ WRAP_8 = 0b10
WRAP_16 = 0b11
Optional. Corresponds to Wishbone signal ``BTE_O`` (initiator)
or ``BTE_I`` (target).
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
alignment=0, name=None):
if not isinstance(addr_width, int) or addr_width < 0:
if granularity > data_width:
raise ValueError("Granularity {} may not be greater than data width {}"
.format(granularity, data_width))
- self.addr_width = addr_width
- self.data_width = data_width
+ self.addr_width = addr_width
+ self.data_width = data_width
self.granularity = granularity
granularity_bits = log2_int(data_width // granularity)
self._alignment = alignment
- self.memory_map = MemoryMap(addr_width=max(1, addr_width + granularity_bits),
- data_width=data_width >> granularity_bits,
- alignment=alignment)
+ self.memory_map = MemoryMap(addr_width=max(1, addr_width + granularity_bits),
+ data_width=data_width >> granularity_bits,
+ alignment=alignment)
self._features = set(features)
- unknown = self._features - {"rty", "err", "stall", "lock", "cti", "bte"}
+ unknown = self._features - \
+ {"rty", "err", "stall", "lock", "cti", "bte"}
if unknown:
raise ValueError("Optional signal(s) {} are not supported"
.format(", ".join(map(repr, unknown))))
layout = [
- ("adr", addr_width, Direction.FANOUT),
+ ("adr", addr_width, Direction.FANOUT),
("dat_w", data_width, Direction.FANOUT),
("dat_r", data_width, Direction.FANIN),
- ("sel", data_width // granularity, Direction.FANOUT),
- ("cyc", 1, Direction.FANOUT),
- ("stb", 1, Direction.FANOUT),
- ("we", 1, Direction.FANOUT),
- ("ack", 1, Direction.FANIN),
+ ("sel", data_width // granularity, Direction.FANOUT),
+ ("cyc", 1, Direction.FANOUT),
+ ("stb", 1, Direction.FANOUT),
+ ("we", 1, Direction.FANOUT),
+ ("ack", 1, Direction.FANIN),
]
if "err" in features:
layout += [("err", 1, Direction.FANIN)]
if "stall" in features:
layout += [("stall", 1, Direction.FANIN)]
if "lock" in features:
- layout += [("lock", 1, Direction.FANOUT)]
+ layout += [("lock", 1, Direction.FANOUT)]
if "cti" in features:
- layout += [("cti", CycleType, Direction.FANOUT)]
+ layout += [("cti", CycleType, Direction.FANOUT)]
if "bte" in features:
layout += [("bte", BurstTypeExt, Direction.FANOUT)]
super().__init__(layout, name=name, src_loc_at=1)
"but {}-bit long \"dat_r\""
.format(record, len(record.dat_w), len(record.dat_r)))
data_width = len(record.dat_w)
- if data_width%len(record.sel) != 0:
+ if data_width % len(record.sel) != 0:
raise AttributeError("Record {!r} has invalid granularity value because "
"its data width is {}-bit long but "
"its \"sel\" is {}-bit long"
granularity=granularity,
features=features,
alignment=0,
- name=record.name+"_intf")
+ name=record.name + "_intf")
class Decoder(Elaboratable):
bus : :class:`Interface`
Bus providing access to subordinate buses.
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
alignment=0):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features,
- alignment=alignment)
- self._map = self.bus.memory_map
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features,
+ alignment=alignment)
+ self._map = self.bus.memory_map
self._subs = dict()
def align_to(self, alignment):
"translation)"
.format(sub_bus.data_width, sub_bus.granularity))
for opt_output in {"err", "rty", "stall"}:
- if hasattr(sub_bus, opt_output) and not hasattr(self.bus, opt_output):
+ if hasattr(sub_bus, opt_output) and not hasattr(
+ self.bus, opt_output):
raise ValueError("Subordinate bus has optional output {!r}, but the decoder "
"does not have a corresponding input"
.format(opt_output))
self._subs[sub_bus.memory_map] = sub_bus
- return self._map.add_window(sub_bus.memory_map, addr=addr, sparse=sparse)
+ return self._map.add_window(
+ sub_bus.memory_map, addr=addr, sparse=sparse)
def elaborate(self, platform):
m = Module()
- ack_fanin = 0
- err_fanin = 0
- rty_fanin = 0
+ ack_fanin = 0
+ err_fanin = 0
+ rty_fanin = 0
stall_fanin = 0
with m.Switch(self.bus.adr):
m.d.comb += [
sub_bus.adr.eq(self.bus.adr << log2_int(sub_ratio)),
sub_bus.dat_w.eq(self.bus.dat_w),
- sub_bus.sel.eq(Cat(Repl(sel, sub_ratio) for sel in self.bus.sel)),
+ sub_bus.sel.eq(Cat(Repl(sel, sub_ratio)
+ for sel in self.bus.sel)),
sub_bus.we.eq(self.bus.we),
sub_bus.stb.eq(self.bus.stb),
]
if hasattr(sub_bus, "lock"):
m.d.comb += sub_bus.lock.eq(getattr(self.bus, "lock", 0))
if hasattr(sub_bus, "cti"):
- m.d.comb += sub_bus.cti.eq(getattr(self.bus, "cti", CycleType.CLASSIC))
+ m.d.comb += sub_bus.cti.eq(getattr(self.bus,
+ "cti", CycleType.CLASSIC))
if hasattr(sub_bus, "bte"):
- m.d.comb += sub_bus.bte.eq(getattr(self.bus, "bte", BurstTypeExt.LINEAR))
+ m.d.comb += sub_bus.bte.eq(getattr(self.bus,
+ "bte", BurstTypeExt.LINEAR))
with m.Case(sub_pat[:-log2_int(self.bus.data_width // self.bus.granularity)]):
m.d.comb += [
bus : :class:`Interface`
Shared bus to which the selected initiator gains access.
"""
+
def __init__(self, *, addr_width, data_width, granularity=None, features=frozenset(),
scheduler="rr"):
- self.bus = Interface(addr_width=addr_width, data_width=data_width,
- granularity=granularity, features=features)
+ self.bus = Interface(addr_width=addr_width, data_width=data_width,
+ granularity=granularity, features=features)
self._itors = []
if scheduler not in ["rr"]:
raise ValueError("Scheduling mode must be \"rr\", not {!r}"
"arbiter data width {}"
.format(itor_bus.data_width, self.bus.data_width))
for opt_output in {"lock", "cti", "bte"}:
- if hasattr(itor_bus, opt_output) and not hasattr(self.bus, opt_output):
+ if hasattr(itor_bus, opt_output) and not hasattr(
+ self.bus, opt_output):
raise ValueError("Initiator bus has optional output {!r}, but the arbiter "
"does not have a corresponding input"
.format(opt_output))
m.submodules.scheduler = scheduler = RoundRobin(len(self._itors))
grant = Signal(range(len(self._itors)))
- # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05, Wishbone B4)
+ # CYC should not be indefinitely asserted. (See RECOMMENDATION 3.05,
+ # Wishbone B4)
bus_busy = self.bus.cyc
if hasattr(self.bus, "lock"):
# If LOCK is not asserted, we also wait for STB to be deasserted before granting bus
m.d.comb += [
self.bus.adr.eq(itor_bus.adr),
self.bus.dat_w.eq(itor_bus.dat_w),
- self.bus.sel.eq(Cat(Repl(sel, ratio) for sel in itor_bus.sel)),
+ self.bus.sel.eq(Cat(Repl(sel, ratio)
+ for sel in itor_bus.sel)),
self.bus.we.eq(itor_bus.we),
self.bus.stb.eq(itor_bus.stb),
]
m.d.comb += self.bus.cyc.eq(itor_bus.cyc)
if hasattr(self.bus, "lock"):
- m.d.comb += self.bus.lock.eq(getattr(itor_bus, "lock", 1))
+ m.d.comb += self.bus.lock.eq(
+ getattr(itor_bus, "lock", 1))
if hasattr(self.bus, "cti"):
- m.d.comb += self.bus.cti.eq(getattr(itor_bus, "cti", CycleType.CLASSIC))
+ m.d.comb += self.bus.cti.eq(
+ getattr(itor_bus, "cti", CycleType.CLASSIC))
if hasattr(self.bus, "bte"):
- m.d.comb += self.bus.bte.eq(getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
+ m.d.comb += self.bus.bte.eq(
+ getattr(itor_bus, "bte", BurstTypeExt.LINEAR))
m.d.comb += itor_bus.ack.eq(self.bus.ack)
if hasattr(itor_bus, "err"):
- m.d.comb += itor_bus.err.eq(getattr(self.bus, "err", 0))
+ m.d.comb += itor_bus.err.eq(
+ getattr(self.bus, "err", 0))
if hasattr(itor_bus, "rty"):
- m.d.comb += itor_bus.rty.eq(getattr(self.bus, "rty", 0))
+ m.d.comb += itor_bus.rty.eq(
+ getattr(self.bus, "rty", 0))
if hasattr(itor_bus, "stall"):
- m.d.comb += itor_bus_stall.eq(getattr(self.bus, "stall", ~self.bus.ack))
+ m.d.comb += itor_bus_stall.eq(
+ getattr(self.bus, "stall", ~self.bus.ack))
return m
decoder : :class:`Decoder`
The decoder that connects the shared bus to the list of SLAVEs.
"""
+
def __init__(self, *, addr_width, data_width, itors, targets, **kwargs):
self.addr_width = addr_width
self.data_width = data_width
The Wishbone bus interface providing access to the read/write
ports of the memory.
"""
+
def __init__(self, memory, read_only=False, bus=None,
granularity=None, features=frozenset()):
if not isinstance(memory, Memory):
# write
if not self.read_only:
- m.submodules.wrport = wrport = self.memory.write_port(granularity=self.granularity)
+ m.submodules.wrport = wrport = self.memory.write_port(
+ granularity=self.granularity)
m.d.comb += [
wrport.addr.eq(self.bus.adr[:len(rdport.addr)]),
wrport.data.eq(self.bus.dat_w)