soc/integration: add new soc class prorotype with SoCRegion/SoCBus/SoCCSR/SoCIRQ/SoC
authorFlorent Kermarrec <florent@enjoy-digital.fr>
Thu, 6 Feb 2020 10:06:41 +0000 (11:06 +0100)
committerFlorent Kermarrec <florent@enjoy-digital.fr>
Thu, 6 Feb 2020 10:06:41 +0000 (11:06 +0100)
litex/soc/integration/soc.py [new file with mode: 0755]

diff --git a/litex/soc/integration/soc.py b/litex/soc/integration/soc.py
new file mode 100755 (executable)
index 0000000..3847504
--- /dev/null
@@ -0,0 +1,559 @@
+#!/usr/bin/env python3
+
+# This file is Copyright (c) 2020 Florent Kermarrec <florent@enjoy-digital.fr>
+# License: BSD
+
+import logging
+import time
+import datetime
+
+from migen import *
+
+from litex.soc.interconnect import wishbone
+
+# TODO:
+# - replace raise with exit on logging error.
+# - use common module for SoCCSR/SoCIRQ.
+# - add configurable CSR paging.
+# - manage IO/Linker regions.
+
+logging.basicConfig(level=logging.INFO)
+
+# Helpers ------------------------------------------------------------------------------------------
+def colorer(s, color="bright"):
+    header  = {
+        "bright": "\x1b[1m",
+        "green":  "\x1b[32m",
+        "cyan":   "\x1b[36m",
+        "red":    "\x1b[31m",
+        "yellow": "\x1b[33m",
+        "underline": "\x1b[4m"}[color]
+    trailer = "\x1b[0m"
+    return header + str(s) + trailer
+
+def buildtime(with_time=True):
+    fmt = "%Y-%m-%d %H:%M:%S" if with_time else "%Y-%m-%d"
+    return datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d %H:%M:%S")
+
+# SoCRegion ----------------------------------------------------------------------------------------
+
+class SoCRegion:
+    def __init__(self, origin=None, size=None, cached=True):
+        self.logger = logging.getLogger("SoCRegion")
+        self.origin = origin
+        self.size   = size
+        self.cached = cached
+
+    def decoder(self):
+        origin = self.origin
+        size   = self.size
+        origin &= ~0x80000000
+        size   = 2**log2_int(size, False)
+        if (origin & (size - 1)) != 0:
+            self.logger.error("Origin needs to be aligned on size:")
+            self.logger.error(self)
+            raise
+        origin >>= 2 # bytes to words aligned
+        size   >>= 2 # bytes to words aligned
+        return lambda a: (a[log2_int(size):-1] == (origin >> log2_int(size)))
+
+    def __str__(self):
+        r = ""
+        if self.origin is not None:
+            r += "Origin: {}, ".format(colorer("0x{:08x}".format(self.origin)))
+        if self.size is not None:
+            r += "Size: {}, ".format(colorer("0x{:08x}".format(self.size)))
+        r += "Cached: {}".format(colorer(self.cached))
+        return r
+
+
+class SoCLinkerRegion(SoCRegion):
+    pass
+
+# SoCBus -------------------------------------------------------------------------------------------
+
+class SoCBus:
+    supported_standard      = ["wishbone"]
+    supported_data_width    = [32, 64]
+    supported_address_width = [32]
+
+    # Creation -------------------------------------------------------------------------------------
+    def __init__(self, standard, data_width=32, address_width=32, timeout=1e6, reserved_regions={}):
+        self.logger = logging.getLogger("SoCBus")
+        self.logger.info(colorer("Creating new Bus Handler...", color="cyan"))
+
+        # Check Standard
+        if standard not in self.supported_standard:
+            self.logger.error("Unsupported Standard: {} supporteds: {:s}".format(
+                colorer(standard, color="red"),
+                colorer(", ".join(self.supported_standard), color="green")))
+            raise
+
+        # Check Data Width
+        if data_width not in self.supported_data_width:
+            self.logger.error("Unsupported Data_Width: {} supporteds: {:s}".format(
+                colorer(data_width, color="red"),
+                colorer(", ".join(str(x) for x in self.supported_data_width), color="green")))
+            raise
+
+        # Check Address Width
+        if address_width not in self.supported_address_width:
+            self.logger.error("Unsupported Address Width: {} supporteds: {:s}".format(
+                colorer(data_width, color="red"),
+                colorer(", ".join(str(x) for x in self.supported_address_width), color="green")))
+            raise
+
+        # Create Bus
+        self.standard      = standard
+        self.data_width    = data_width
+        self.address_width = address_width
+        self.masters       = {}
+        self.slaves        = {}
+        self.regions       = {}
+        self.timeout       = timeout
+        self.logger.info("{}-bit {} Bus, {}GiB Address Space.".format(
+            colorer(data_width), colorer(standard), colorer(2**address_width/2**30)))
+
+        # Adding reserved regions
+        self.logger.info("Adding {} Regions...".format(colorer("reserved")))
+        for name, region in reserved_regions.items():
+            if isinstance(region, int):
+                region = SoCRegion(origin=region, size=0x1000000)
+            self.add_region(name, region)
+
+        self.logger.info(colorer("Bus Handler created.", color="cyan"))
+
+    # Add/Allog/Check Regions ----------------------------------------------------------------------
+    def add_region(self, name, region):
+        allocated = False
+        # Check if SoCLinkerRegion
+        if isinstance(region, SoCLinkerRegion):
+            self.logger.info("FIXME: SoCLinkerRegion")
+        # Check if SoCRegion
+        elif isinstance(region, SoCRegion):
+            # If no origin specified, allocate region.
+            if region.origin is None:
+                allocated = True
+                region    = self.alloc_region(region.size, region.cached)
+                self.regions[name] = region
+            # Else add region and check for overlaps.
+            else:
+                self.regions[name] = region
+                overlap = self.check_region(self.regions)
+                if overlap is not None:
+                    self.logger.error("Region overlap between {} and {}:".format(
+                        colorer(overlap[0], color="red"),
+                        colorer(overlap[1], color="red")))
+                    self.logger.error(str(self.regions[overlap[0]]))
+                    self.logger.error(str(self.regions[overlap[1]]))
+                    raise
+            self.logger.info("{} Region {} {}.".format(
+                colorer(name, color="underline"),
+                colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
+                str(region)))
+        else:
+            self.logger.error("{} is not a supported Region".format(colorer(name, color="red")))
+            raise
+
+    def alloc_region(self, size, cached=True):
+        self.logger.info("Allocating {} Region of size {}...".format(
+            colorer("Cached" if cached else "IO"),
+            colorer("0x{:08x}".format(size))))
+
+        # Limit Search Regions
+        uncached_regions = {}
+        for _, region in self.regions.items():
+            if region.cached == False:
+                uncached_regions[name] = region
+        if cached == False:
+            search_regions = uncached_regions
+        else:
+            search_regions = {"main": SoCRegion(origin=0x00000000, size=2**self.address_width-1)}
+
+        # Iterate on Search_Regions to find a Candidate
+        for _, search_region in search_regions.items():
+            origin = search_region.origin
+            while (origin + size) < (search_region.origin + search_region.size):
+                # Create a Candicate.
+                candidate = SoCRegion(origin=origin, size=size, cached=cached)
+                overlap   = False
+                # Check Candidate does not overlap with allocated existing regions
+                for _, allocated in self.regions.items():
+                    if self.check_region({"0": allocated, "1": candidate}) is not None:
+                        origin  = allocated.origin + allocated.size
+                        overlap = True
+                        break
+                if not overlap:
+                    # If no overlap, the Candidate is selected
+                    return candidate
+
+        self.logger.error("Not enough Address Space to allocate Region")
+        raise
+
+    def check_region(self, regions):
+        i = 0
+        while i < len(regions):
+            n0 =  list(regions.keys())[i]
+            r0 = regions[n0]
+            for n1 in list(regions.keys())[i+1:]:
+                r1 = regions[n1]
+                if isinstance(r0, SoCLinkerRegion) or isinstance(r1, SoCLinkerRegion):
+                    continue
+                if r0.origin >= (r1.origin + r1.size):
+                    continue
+                if r1.origin >= (r0.origin + r0.size):
+                    continue
+                return (n0, n1)
+            i += 1
+        return None
+
+    # Add Master/Slave -----------------------------------------------------------------------------
+    def add_master(self, name=None, master=None, io_regions={}):
+        if name is None:
+            name = "master{:d}".format(len(self.masters))
+        if name in self.masters.keys():
+            self.logger.error("{} already declared as Bus Master:".format(colorer(name, color="red")))
+            self.logger.error(self)
+            raise
+        self.masters[name] = master
+        self.logger.info("{} {} as Bus Master.".format(colorer(name, color="underline"), colorer("added", color="green")))
+        # FIXME: handle IO regions
+
+    def add_slave(self, name=None, slave=None, region=None):
+        no_name   = name is None
+        no_region = region is None
+        if no_name and no_region:
+            self.logger.error("Please specify at least {} or {} of Bus Slave".format(
+                colorer("name",   color="red"),
+                colorer("region", color="red")))
+            raise
+        if no_name:
+            name = "slave{:d}".format(len(self.slaves))
+        if no_region:
+            region = self.regions.get(name, None)
+            if region is None:
+                self.logger.error("Unable to find Region {}".format(colorer(name, color="red")))
+                raise
+        else:
+             self.add_region(name, region)
+        if name in self.slaves.keys():
+            self.logger.error("{} already declared as Bus Slave:".format(colorer(name, color="red")))
+            self.logger.error(self)
+            raise
+        self.slaves[name] = slave
+        self.logger.info("{} {} as Bus Slave.".format(
+            colorer(name, color="underline"),
+            colorer("added", color="green")))
+
+    # Str ------------------------------------------------------------------------------------------
+    def __str__(self):
+        r = "{}-bit {} Bus, {}GiB Address Space.\n".format(
+            colorer(self.data_width), colorer(self.standard), colorer(2**self.address_width/2**30))
+        r += "Bus Regions: ({})\n".format(len(self.regions.keys())) if len(self.regions.keys()) else ""
+        for name, region in self.regions.items():
+           r += colorer(name, color="underline") + " "*(20-len(name)) + ": " + str(region) + "\n"
+        r += "Bus Masters: ({})\n".format(len(self.masters.keys())) if len(self.masters.keys()) else ""
+        for name in self.masters.keys():
+           r += "- {}\n".format(colorer(name, color="underline"))
+        r += "Bus Slaves: ({})\n".format(len(self.slaves.keys())) if len(self.slaves.keys()) else ""
+        for name in self.slaves.keys():
+           r += "- {}\n".format(colorer(name, color="underline"))
+        r = r[:-1]
+        return r
+
+# SoCCSR ----------------------------------------------------------------------------------------
+
+class SoCCSR:
+    supported_data_width    = [8, 32]
+    supported_address_width = [14, 15]
+    supported_alignment     = [32, 64]
+    supported_paging        = [0x800]
+
+    # Creation -------------------------------------------------------------------------------------
+    def __init__(self, data_width=32, address_width=14, alignment=32, paging=0x800, reserved_csrs={}):
+        self.logger = logging.getLogger("SoCCSR")
+        self.logger.info(colorer("Creating new CSR Handler...", color="cyan"))
+
+        # Check Data Width
+        if data_width not in self.supported_data_width:
+            self.logger.error("Unsupported data_width: {} supporteds: {:s}".format(
+                colorer(data_width, color="red"),
+                colorer(", ".join(str(x) for x in self.supported_data_width)), color="green"))
+            raise
+
+        # Check Address Width
+        if address_width not in self.supported_address_width:
+            self.logger.error("Unsupported address_width: {} supporteds: {:s}".format(
+                colorer(address_width, color="red"),
+                colorer(", ".join(str(x) for x in self.supported_address_width), color="green")))
+            raise
+
+        # Check Alignment
+        if alignment not in self.supported_alignment:
+            self.logger.error("Unsupported alignment: {} supporteds: {:s}".format(
+                colorer(alignment, color="red"),
+                colorer(", ".join(str(x) for x in self.supported_alignment), color="green")))
+            raise
+
+        # Check Paging
+        if paging not in self.supported_paging:
+            self.logger.error("Unsupported paging: {} supporteds: {:s}".format(
+                colorer(paging, color="red"),
+                colorer(", ".join(str(x) for x in self.supported_paging), color="green")))
+            raise
+
+        # Create CSR Handler
+        self.data_width    = data_width
+        self.address_width = address_width
+        self.alignment     = alignment
+        self.paging        = paging
+        self.csrs          = {}
+        self.n_csrs        = 4*2**address_width//paging # FIXME
+        self.logger.info("{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format(
+            colorer(self.data_width),
+            colorer(2**self.address_width/2**10),
+            colorer(self.paging),
+            self.n_csrs))
+
+        # Adding reserved CSRs
+        self.logger.info("Adding {} CSRs...".format(colorer("reserved")))
+        for name, n in reserved_csrs.items():
+            self.add(name, n)
+
+        self.logger.info(colorer("CSR Bus Handler created.", color="cyan"))
+
+    # Add ------------------------------------------------------------------------------------------
+    def add(self, name, n=None):
+        allocated = False
+        if name in self.csrs.keys():
+            self.logger.error("{} CSR name already used.".format(colorer(name, "red")))
+            self.logger.error(self)
+            raise
+        if n in self.csrs.values():
+            self.logger.error("{} CSR Location already used.".format(colorer(n, "red")))
+            self.logger.error(self)
+            raise
+        if n is None:
+            allocated = True
+            n = self.alloc(name)
+        else:
+            if n < 0:
+                self.logger.error("{} CSR Location should be positive.".format(
+                    colorer(n, color="red")))
+                raise
+            if n > self.n_csrs:
+                self.logger.error("{} CSR Location too high (Up to {}).".format(
+                    colorer(n, color="red"),
+                    colorer(self.n_csrs, color="green")))
+                raise
+        self.csrs[name] = n
+        self.logger.info("{} CSR {} at Location {}.".format(
+            colorer(name, color="underline"),
+            colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
+            colorer(n)))
+
+    # Alloc ----------------------------------------------------------------------------------------
+    def alloc(self, name):
+        for n in range(self.data_width//8*2**self.address_width//self.paging):
+            if n not in self.csrs.values():
+                return n
+        self.logger.error("Not enough CSR Locations.")
+        self.logger.error(self)
+        raise
+
+    # Str ------------------------------------------------------------------------------------------
+    def __str__(self):
+        r = "{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format(
+            colorer(self.data_width),
+            colorer(2**self.address_width/2**10),
+            colorer(self.paging),
+            self.n_csrs)
+        r += "CSR Locations: ({})\n".format(len(self.csrs.keys())) if len(self.csrs.keys()) else ""
+        for name in self.csrs.keys():
+           r += "- {}{}: {}\n".format(colorer(name, color="underline"), " "*(20-len(name)), colorer(self.csrs[name]))
+        r = r[:-1]
+        return r
+
+# SoCIRQ -------------------------------------------------------------------------------------------
+
+class SoCIRQ:
+    # Creation -------------------------------------------------------------------------------------
+    def __init__(self, n_irqs=32, reserved_irqs={}):
+        self.logger = logging.getLogger("SoCIRQ")
+        self.logger.info(colorer("Creating new SoC IRQ Handler...", color="cyan"))
+
+        # Check IRQ Number
+        if n_irqs > 32:
+            self.logger.error("Unsupported IRQs number: {} supporteds: {:s}".format(
+                colorer(n, color="red"), colorer("Up to 32", color="green")))
+            raise
+
+        # Create IRQ Handler
+        self.n_irqs = n_irqs
+        self.irqs   = {}
+        self.logger.info("IRQ Handler (up to {} Locations).".format(colorer(n_irqs)))
+
+        # Adding reserved IRQs
+        self.logger.info("Adding {} IRQs...".format(colorer("reserved")))
+        for name, n in reserved_irqs.items():
+            self.add(name, n)
+
+        self.logger.info(colorer("IRQ Handler created.", color="cyan"))
+
+    # Add ------------------------------------------------------------------------------------------
+    def add(self, name, n=None):
+        allocated = False
+        if name in self.irqs.keys():
+            self.logger.error("{} IRQ name already used.".format(colorer(name, "red")))
+            self.logger.error(self)
+            raise
+        if n in self.irqs.values():
+            self.logger.error("{} IRQ Location already used.".format(colorer(n, "red")))
+            self.logger.error(self)
+            raise
+        if n is None:
+            allocated = True
+            n = self.alloc(name)
+        else:
+            if n < 0:
+                self.logger.error("{} IRQ Location should be positive.".format(
+                    colorer(n, color="red")))
+                raise
+            if n > self.n_irqs:
+                self.logger.error("{} IRQ Location too high (Up to {}).".format(
+                    colorer(n, color="red"),
+                    colorer(self.n_csrs, color="green")))
+                raise
+        self.irqs[name] = n
+        self.logger.info("{} IRQ {} at Location {}.".format(
+            colorer(name, color="underline"),
+            colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
+            colorer(n)))
+
+    # Alloc ----------------------------------------------------------------------------------------
+    def alloc(self, name):
+        for n in range(self.n_irqs):
+            if n not in self.irqs.values():
+                return n
+        self.logger.error("Not enough Locations.")
+        self.logger.error(self)
+        raise
+
+    # Str ------------------------------------------------------------------------------------------
+    def __str__(self):
+        r ="IRQ Handler (up to {} Locations).\n".format(colorer(self.n_irqs))
+        r += "IRQs Locations:\n" if len(self.irqs.keys()) else ""
+        for name in self.irqs.keys():
+           r += "- {}{}: {}\n".format(colorer(name, color="underline"), " "*(20-len(name)), colorer(self.irqs[name]))
+        r = r[:-1]
+        return r
+
+# SoC ----------------------------------------------------------------------------------------------
+
+class SoC(Module):
+    def __init__(self,
+        bus_standard         = "wishbone",
+        bus_data_width       = 32,
+        bus_address_width    = 32,
+        bus_timeout          = 1e6,
+        bus_reserved_regions = {},
+
+        csr_data_width       = 32,
+        csr_address_width    = 14,
+        csr_alignment        = 32,
+        csr_paging           = 0x800,
+        csr_reserved_csrs    = {},
+
+        irq_n_irqs           = 32,
+        irq_reserved_irqs    = {},
+        ):
+
+        self.logger = logging.getLogger("SoC")
+        self.logger.info(colorer("        __   _ __      _  __  ", color="bright"))
+        self.logger.info(colorer("       / /  (_) /____ | |/_/  ", color="bright"))
+        self.logger.info(colorer("      / /__/ / __/ -_)>  <    ", color="bright"))
+        self.logger.info(colorer("     /____/_/\\__/\\__/_/|_|  ", color="bright"))
+        self.logger.info(colorer("  Build your hardware, easily!", color="bright"))
+
+        self.logger.info(colorer("-"*80, color="bright"))
+        self.logger.info(colorer("Creating new SoC... ({})".format(buildtime()), color="cyan"))
+        self.logger.info(colorer("-"*80, color="bright"))
+
+        # SoC Bus Handler --------------------------------------------------------------------------
+        self.bus = SoCBus(
+            standard         = bus_standard,
+            data_width       = bus_data_width,
+            address_width    = bus_address_width,
+            timeout          = bus_timeout,
+            reserved_regions = bus_reserved_regions,
+           )
+
+        # SoC Bus Handler --------------------------------------------------------------------------
+        self.csr = SoCCSR(
+            data_width    = csr_data_width,
+            address_width = csr_address_width,
+            alignment     = csr_alignment,
+            paging        = csr_paging,
+            reserved_csrs = csr_reserved_csrs,
+        )
+
+        # SoC IRQ Handler --------------------------------------------------------------------------
+        self.irq = SoCIRQ(
+            n_irqs        = irq_n_irqs,
+            reserved_irqs = irq_reserved_irqs
+        )
+
+        self.logger.info(colorer("-"*80, color="bright"))
+        self.logger.info(colorer("Initial SoC:", color="cyan"))
+        self.logger.info(colorer("-"*80, color="bright"))
+        self.logger.info(self.bus)
+        self.logger.info(self.csr)
+        self.logger.info(self.irq)
+        self.logger.info(colorer("-"*80, color="bright"))
+
+
+    def do_finalize(self):
+        self.logger.info(colorer("-"*80, color="bright"))
+        self.logger.info(colorer("Finalized SoC:", color="cyan"))
+        self.logger.info(colorer("-"*80, color="bright"))
+        self.logger.info(self.bus)
+        self.logger.info(self.csr)
+        self.logger.info(self.irq)
+        self.logger.info(colorer("-"*80, color="bright"))
+
+        # SoC Bus Interconnect ---------------------------------------------------------------------
+        bus_masters = self.bus.masters.values()
+        bus_slaves  = [(self.bus.regions[n].decoder(), s) for n, s in self.bus.slaves.items()]
+        if len(bus_masters) and len(bus_slaves):
+            self.submodules.bus_interconnect = wishbone.InterconnectShared(
+                masters        = bus_masters,
+                slaves         = bus_slaves,
+                register       = True,
+                timeout_cycles = self.bus.timeout)
+
+        #exit()
+
+# Test (FIXME: move to litex/text and improve) -----------------------------------------------------
+
+if __name__ == "__main__":
+    bus = SoCBus("wishbone", reserved_regions={
+        "rom": SoCRegion(origin=0x00000100, size=1024),
+        "ram": SoCRegion(size=512),
+        }
+    )
+    bus.add_master("cpu", None)
+    bus.add_slave("rom", None, SoCRegion(size=1024))
+    bus.add_slave("ram", None, SoCRegion(size=1024))
+
+
+    csr = SoCCSR(reserved_csrs={"ctrl": 0, "uart": 1})
+    csr.add("csr0")
+    csr.add("csr1", 0)
+    #csr.add("csr2", 46)
+    csr.add("csr3", -1)
+    print(bus)
+    print(csr)
+
+    irq = SoCIRQ(reserved_irqs={"uart": 1})
+
+    soc = SoC()