soc: create SoCLocHandler and use it to simplify SoCCSRHandler and SoCIRQHandler
[litex.git] / litex / soc / integration / soc.py
1 #!/usr/bin/env python3
2
3 # This file is Copyright (c) 2020 Florent Kermarrec <florent@enjoy-digital.fr>
4 # License: BSD
5
6 import logging
7 import time
8 import datetime
9
10 from migen import *
11
12 from litex.soc.interconnect import wishbone
13
14 # TODO:
15 # - replace raise with exit on logging error.
16 # - add configurable CSR paging.
17 # - manage IO/Linker regions.
18
19 logging.basicConfig(level=logging.INFO)
20
21 # Helpers ------------------------------------------------------------------------------------------
22 def colorer(s, color="bright"):
23 header = {
24 "bright": "\x1b[1m",
25 "green": "\x1b[32m",
26 "cyan": "\x1b[36m",
27 "red": "\x1b[31m",
28 "yellow": "\x1b[33m",
29 "underline": "\x1b[4m"}[color]
30 trailer = "\x1b[0m"
31 return header + str(s) + trailer
32
33 def buildtime(with_time=True):
34 fmt = "%Y-%m-%d %H:%M:%S" if with_time else "%Y-%m-%d"
35 return datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d %H:%M:%S")
36
37 # SoCRegion ----------------------------------------------------------------------------------------
38
39 class SoCRegion:
40 def __init__(self, origin=None, size=None, cached=True):
41 self.logger = logging.getLogger("SoCRegion")
42 self.origin = origin
43 self.size = size
44 self.cached = cached
45
46 def decoder(self):
47 origin = self.origin
48 size = self.size
49 origin &= ~0x80000000
50 size = 2**log2_int(size, False)
51 if (origin & (size - 1)) != 0:
52 self.logger.error("Origin needs to be aligned on size:")
53 self.logger.error(self)
54 raise
55 origin >>= 2 # bytes to words aligned
56 size >>= 2 # bytes to words aligned
57 return lambda a: (a[log2_int(size):-1] == (origin >> log2_int(size)))
58
59 def __str__(self):
60 r = ""
61 if self.origin is not None:
62 r += "Origin: {}, ".format(colorer("0x{:08x}".format(self.origin)))
63 if self.size is not None:
64 r += "Size: {}, ".format(colorer("0x{:08x}".format(self.size)))
65 r += "Cached: {}".format(colorer(self.cached))
66 return r
67
68
69 class SoCLinkerRegion(SoCRegion):
70 pass
71
72 # SoCBusHandler ------------------------------------------------------------------------------------
73
74 class SoCBusHandler:
75 supported_standard = ["wishbone"]
76 supported_data_width = [32, 64]
77 supported_address_width = [32]
78
79 # Creation -------------------------------------------------------------------------------------
80 def __init__(self, standard, data_width=32, address_width=32, timeout=1e6, reserved_regions={}):
81 self.logger = logging.getLogger("SoCBusHandler")
82 self.logger.info(colorer("Creating new Bus Handler...", color="cyan"))
83
84 # Check Standard
85 if standard not in self.supported_standard:
86 self.logger.error("Unsupported Standard: {} supporteds: {:s}".format(
87 colorer(standard, color="red"),
88 colorer(", ".join(self.supported_standard), color="green")))
89 raise
90
91 # Check Data Width
92 if data_width not in self.supported_data_width:
93 self.logger.error("Unsupported Data_Width: {} supporteds: {:s}".format(
94 colorer(data_width, color="red"),
95 colorer(", ".join(str(x) for x in self.supported_data_width), color="green")))
96 raise
97
98 # Check Address Width
99 if address_width not in self.supported_address_width:
100 self.logger.error("Unsupported Address Width: {} supporteds: {:s}".format(
101 colorer(data_width, color="red"),
102 colorer(", ".join(str(x) for x in self.supported_address_width), color="green")))
103 raise
104
105 # Create Bus
106 self.standard = standard
107 self.data_width = data_width
108 self.address_width = address_width
109 self.masters = {}
110 self.slaves = {}
111 self.regions = {}
112 self.timeout = timeout
113 self.logger.info("{}-bit {} Bus, {}GiB Address Space.".format(
114 colorer(data_width), colorer(standard), colorer(2**address_width/2**30)))
115
116 # Adding reserved regions
117 self.logger.info("Adding {} Regions...".format(colorer("reserved")))
118 for name, region in reserved_regions.items():
119 if isinstance(region, int):
120 region = SoCRegion(origin=region, size=0x1000000)
121 self.add_region(name, region)
122
123 self.logger.info(colorer("Bus Handler created.", color="cyan"))
124
125 # Add/Allog/Check Regions ----------------------------------------------------------------------
126 def add_region(self, name, region):
127 allocated = False
128 # Check if SoCLinkerRegion
129 if isinstance(region, SoCLinkerRegion):
130 self.logger.info("FIXME: SoCLinkerRegion")
131 # Check if SoCRegion
132 elif isinstance(region, SoCRegion):
133 # If no origin specified, allocate region.
134 if region.origin is None:
135 allocated = True
136 region = self.alloc_region(region.size, region.cached)
137 self.regions[name] = region
138 # Else add region and check for overlaps.
139 else:
140 self.regions[name] = region
141 overlap = self.check_region(self.regions)
142 if overlap is not None:
143 self.logger.error("Region overlap between {} and {}:".format(
144 colorer(overlap[0], color="red"),
145 colorer(overlap[1], color="red")))
146 self.logger.error(str(self.regions[overlap[0]]))
147 self.logger.error(str(self.regions[overlap[1]]))
148 raise
149 self.logger.info("{} Region {} {}.".format(
150 colorer(name, color="underline"),
151 colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
152 str(region)))
153 else:
154 self.logger.error("{} is not a supported Region".format(colorer(name, color="red")))
155 raise
156
157 def alloc_region(self, size, cached=True):
158 self.logger.info("Allocating {} Region of size {}...".format(
159 colorer("Cached" if cached else "IO"),
160 colorer("0x{:08x}".format(size))))
161
162 # Limit Search Regions
163 uncached_regions = {}
164 for _, region in self.regions.items():
165 if region.cached == False:
166 uncached_regions[name] = region
167 if cached == False:
168 search_regions = uncached_regions
169 else:
170 search_regions = {"main": SoCRegion(origin=0x00000000, size=2**self.address_width-1)}
171
172 # Iterate on Search_Regions to find a Candidate
173 for _, search_region in search_regions.items():
174 origin = search_region.origin
175 while (origin + size) < (search_region.origin + search_region.size):
176 # Create a Candicate.
177 candidate = SoCRegion(origin=origin, size=size, cached=cached)
178 overlap = False
179 # Check Candidate does not overlap with allocated existing regions
180 for _, allocated in self.regions.items():
181 if self.check_region({"0": allocated, "1": candidate}) is not None:
182 origin = allocated.origin + allocated.size
183 overlap = True
184 break
185 if not overlap:
186 # If no overlap, the Candidate is selected
187 return candidate
188
189 self.logger.error("Not enough Address Space to allocate Region")
190 raise
191
192 def check_region(self, regions):
193 i = 0
194 while i < len(regions):
195 n0 = list(regions.keys())[i]
196 r0 = regions[n0]
197 for n1 in list(regions.keys())[i+1:]:
198 r1 = regions[n1]
199 if isinstance(r0, SoCLinkerRegion) or isinstance(r1, SoCLinkerRegion):
200 continue
201 if r0.origin >= (r1.origin + r1.size):
202 continue
203 if r1.origin >= (r0.origin + r0.size):
204 continue
205 return (n0, n1)
206 i += 1
207 return None
208
209 # Add Master/Slave -----------------------------------------------------------------------------
210 def add_master(self, name=None, master=None, io_regions={}):
211 if name is None:
212 name = "master{:d}".format(len(self.masters))
213 if name in self.masters.keys():
214 self.logger.error("{} already declared as Bus Master:".format(colorer(name, color="red")))
215 self.logger.error(self)
216 raise
217 self.masters[name] = master
218 self.logger.info("{} {} as Bus Master.".format(colorer(name, color="underline"), colorer("added", color="green")))
219 # FIXME: handle IO regions
220
221 def add_slave(self, name=None, slave=None, region=None):
222 no_name = name is None
223 no_region = region is None
224 if no_name and no_region:
225 self.logger.error("Please specify at least {} or {} of Bus Slave".format(
226 colorer("name", color="red"),
227 colorer("region", color="red")))
228 raise
229 if no_name:
230 name = "slave{:d}".format(len(self.slaves))
231 if no_region:
232 region = self.regions.get(name, None)
233 if region is None:
234 self.logger.error("Unable to find Region {}".format(colorer(name, color="red")))
235 raise
236 else:
237 self.add_region(name, region)
238 if name in self.slaves.keys():
239 self.logger.error("{} already declared as Bus Slave:".format(colorer(name, color="red")))
240 self.logger.error(self)
241 raise
242 self.slaves[name] = slave
243 self.logger.info("{} {} as Bus Slave.".format(
244 colorer(name, color="underline"),
245 colorer("added", color="green")))
246
247 # Str ------------------------------------------------------------------------------------------
248 def __str__(self):
249 r = "{}-bit {} Bus, {}GiB Address Space.\n".format(
250 colorer(self.data_width), colorer(self.standard), colorer(2**self.address_width/2**30))
251 r += "Bus Regions: ({})\n".format(len(self.regions.keys())) if len(self.regions.keys()) else ""
252 for name, region in self.regions.items():
253 r += colorer(name, color="underline") + " "*(20-len(name)) + ": " + str(region) + "\n"
254 r += "Bus Masters: ({})\n".format(len(self.masters.keys())) if len(self.masters.keys()) else ""
255 for name in self.masters.keys():
256 r += "- {}\n".format(colorer(name, color="underline"))
257 r += "Bus Slaves: ({})\n".format(len(self.slaves.keys())) if len(self.slaves.keys()) else ""
258 for name in self.slaves.keys():
259 r += "- {}\n".format(colorer(name, color="underline"))
260 r = r[:-1]
261 return r
262
263 # SoCLocHandler --------------------------------------------------------------------------------------
264
265 class SoCLocHandler:
266 # Creation -------------------------------------------------------------------------------------
267 def __init__(self, name, n_locs):
268 self.name = name
269 self.locs = {}
270 self.n_locs = n_locs
271
272 # Add ------------------------------------------------------------------------------------------
273 def add(self, name, n=None, use_loc_if_exists=False):
274 allocated = False
275 if not (use_loc_if_exists and name in self.locs.keys()):
276 if name in self.locs.keys():
277 self.logger.error("{} {} name already used.".format(colorer(name, "red"), self.name))
278 self.logger.error(self)
279 raise
280 if n in self.locs.values():
281 self.logger.error("{} {} Location already used.".format(colorer(n, "red"), self.name))
282 self.logger.error(self)
283 raise
284 if n is None:
285 allocated = True
286 n = self.alloc(name)
287 else:
288 if n < 0:
289 self.logger.error("{} {} Location should be positive.".format(
290 colorer(n, color="red"),
291 self.name))
292 raise
293 if n > self.n_locs:
294 self.logger.error("{} {} Location too high (Up to {}).".format(
295 colorer(n, color="red"),
296 self.name,
297 colorer(self.n_csrs, color="green")))
298 raise
299 self.locs[name] = n
300 else:
301 n = self.locs[name]
302 self.logger.info("{} {} {} at Location {}.".format(
303 colorer(name, color="underline"),
304 self.name,
305 colorer("allocated" if allocated else "added", color="yellow" if allocated else "green"),
306 colorer(n)))
307
308 # Alloc ----------------------------------------------------------------------------------------
309 def alloc(self, name):
310 for n in range(self.n_locs):
311 if n not in self.locs.values():
312 return n
313 self.logger.error("Not enough Locations.")
314 self.logger.error(self)
315 raise
316
317 # Str ------------------------------------------------------------------------------------------
318 def __str__(self):
319 r = "{} Locations: ({})\n".format(self.name, len(self.locs.keys())) if len(self.locs.keys()) else ""
320 for name in self.locs.keys():
321 r += "- {}{}: {}\n".format(colorer(name, color="underline"), " "*(20-len(name)), colorer(self.locs[name]))
322 return r
323
324 # SoCCSRHandler ------------------------------------------------------------------------------------
325
326 class SoCCSRHandler(SoCLocHandler):
327 supported_data_width = [8, 32]
328 supported_address_width = [14, 15]
329 supported_alignment = [32, 64]
330 supported_paging = [0x800]
331
332 # Creation -------------------------------------------------------------------------------------
333 def __init__(self, data_width=32, address_width=14, alignment=32, paging=0x800, reserved_csrs={}):
334 SoCLocHandler.__init__(self, "CSR", n_locs=4*2**address_width//paging) # FIXME
335 self.logger = logging.getLogger("SoCCSRHandler")
336 self.logger.info(colorer("Creating new CSR Handler...", color="cyan"))
337
338 # Check Data Width
339 if data_width not in self.supported_data_width:
340 self.logger.error("Unsupported data_width: {} supporteds: {:s}".format(
341 colorer(data_width, color="red"),
342 colorer(", ".join(str(x) for x in self.supported_data_width)), color="green"))
343 raise
344
345 # Check Address Width
346 if address_width not in self.supported_address_width:
347 self.logger.error("Unsupported address_width: {} supporteds: {:s}".format(
348 colorer(address_width, color="red"),
349 colorer(", ".join(str(x) for x in self.supported_address_width), color="green")))
350 raise
351
352 # Check Alignment
353 if alignment not in self.supported_alignment:
354 self.logger.error("Unsupported alignment: {} supporteds: {:s}".format(
355 colorer(alignment, color="red"),
356 colorer(", ".join(str(x) for x in self.supported_alignment), color="green")))
357 raise
358
359 # Check Paging
360 if paging not in self.supported_paging:
361 self.logger.error("Unsupported paging: {} supporteds: {:s}".format(
362 colorer(paging, color="red"),
363 colorer(", ".join(str(x) for x in self.supported_paging), color="green")))
364 raise
365
366 # Create CSR Handler
367 self.data_width = data_width
368 self.address_width = address_width
369 self.alignment = alignment
370 self.paging = paging
371 self.logger.info("{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format(
372 colorer(self.data_width),
373 colorer(2**self.address_width/2**10),
374 colorer(self.paging),
375 colorer(self.n_locs)))
376
377 # Adding reserved CSRs
378 self.logger.info("Adding {} CSRs...".format(colorer("reserved")))
379 for name, n in reserved_csrs.items():
380 self.add(name, n)
381
382 self.logger.info(colorer("CSR Bus Handler created.", color="cyan"))
383
384 # Str ------------------------------------------------------------------------------------------
385 def __str__(self):
386 r = "{}-bit CSR Bus, {}KiB Address Space, {}B Paging (Up to {} Locations).\n".format(
387 colorer(self.data_width),
388 colorer(2**self.address_width/2**10),
389 colorer(self.paging),
390 colorer(self.n_locs))
391 r += SoCLocHandler.__str__(self)
392 r = r[:-1]
393 return r
394
395 # SoCIRQHandler ------------------------------------------------------------------------------------
396
397 class SoCIRQHandler(SoCLocHandler):
398 # Creation -------------------------------------------------------------------------------------
399 def __init__(self, n_irqs=32, reserved_irqs={}):
400 SoCLocHandler.__init__(self, "IRQ", n_locs=n_irqs)
401 self.logger = logging.getLogger("SoCIRQHandler")
402 self.logger.info(colorer("Creating new SoC IRQ Handler...", color="cyan"))
403
404 # Check IRQ Number
405 if n_irqs > 32:
406 self.logger.error("Unsupported IRQs number: {} supporteds: {:s}".format(
407 colorer(n, color="red"), colorer("Up to 32", color="green")))
408 raise
409
410 # Create IRQ Handler
411 self.logger.info("IRQ Handler (up to {} Locations).".format(colorer(n_irqs)))
412
413 # Adding reserved IRQs
414 self.logger.info("Adding {} IRQs...".format(colorer("reserved")))
415 for name, n in reserved_irqs.items():
416 self.add(name, n)
417
418 self.logger.info(colorer("IRQ Handler created.", color="cyan"))
419
420 # Str ------------------------------------------------------------------------------------------
421 def __str__(self):
422 r ="IRQ Handler (up to {} Locations).".format(colorer(self.n_locs))
423 r += SoCLocHandler.__str__(self)
424 r = r[:-1]
425 return r
426
427 # SoC ----------------------------------------------------------------------------------------------
428
429 class SoC(Module):
430 def __init__(self,
431 bus_standard = "wishbone",
432 bus_data_width = 32,
433 bus_address_width = 32,
434 bus_timeout = 1e6,
435 bus_reserved_regions = {},
436
437 csr_data_width = 32,
438 csr_address_width = 14,
439 csr_alignment = 32,
440 csr_paging = 0x800,
441 csr_reserved_csrs = {},
442
443 irq_n_irqs = 32,
444 irq_reserved_irqs = {},
445 ):
446
447 self.logger = logging.getLogger("SoC")
448 self.logger.info(colorer(" __ _ __ _ __ ", color="bright"))
449 self.logger.info(colorer(" / / (_) /____ | |/_/ ", color="bright"))
450 self.logger.info(colorer(" / /__/ / __/ -_)> < ", color="bright"))
451 self.logger.info(colorer(" /____/_/\\__/\\__/_/|_| ", color="bright"))
452 self.logger.info(colorer(" Build your hardware, easily!", color="bright"))
453
454 self.logger.info(colorer("-"*80, color="bright"))
455 self.logger.info(colorer("Creating new SoC... ({})".format(buildtime()), color="cyan"))
456 self.logger.info(colorer("-"*80, color="bright"))
457
458 # SoC Bus Handler --------------------------------------------------------------------------
459 self.bus = SoCBusHandler(
460 standard = bus_standard,
461 data_width = bus_data_width,
462 address_width = bus_address_width,
463 timeout = bus_timeout,
464 reserved_regions = bus_reserved_regions,
465 )
466
467 # SoC Bus Handler --------------------------------------------------------------------------
468 self.csr = SoCCSRHandler(
469 data_width = csr_data_width,
470 address_width = csr_address_width,
471 alignment = csr_alignment,
472 paging = csr_paging,
473 reserved_csrs = csr_reserved_csrs,
474 )
475
476 # SoC IRQ Handler --------------------------------------------------------------------------
477 self.irq = SoCIRQHandler(
478 n_irqs = irq_n_irqs,
479 reserved_irqs = irq_reserved_irqs
480 )
481
482 self.logger.info(colorer("-"*80, color="bright"))
483 self.logger.info(colorer("Initial SoC:", color="cyan"))
484 self.logger.info(colorer("-"*80, color="bright"))
485 self.logger.info(self.bus)
486 self.logger.info(self.csr)
487 self.logger.info(self.irq)
488 self.logger.info(colorer("-"*80, color="bright"))
489
490
491 def do_finalize(self):
492 self.logger.info(colorer("-"*80, color="bright"))
493 self.logger.info(colorer("Finalized SoC:", color="cyan"))
494 self.logger.info(colorer("-"*80, color="bright"))
495 self.logger.info(self.bus)
496 self.logger.info(self.csr)
497 self.logger.info(self.irq)
498 self.logger.info(colorer("-"*80, color="bright"))
499
500 # SoC Bus Interconnect ---------------------------------------------------------------------
501 bus_masters = self.bus.masters.values()
502 bus_slaves = [(self.bus.regions[n].decoder(), s) for n, s in self.bus.slaves.items()]
503 if len(bus_masters) and len(bus_slaves):
504 self.submodules.bus_interconnect = wishbone.InterconnectShared(
505 masters = bus_masters,
506 slaves = bus_slaves,
507 register = True,
508 timeout_cycles = self.bus.timeout)
509
510 #exit()
511
512 # Test (FIXME: move to litex/text and improve) -----------------------------------------------------
513
514 if __name__ == "__main__":
515 bus = SoCBusHandler("wishbone", reserved_regions={
516 "rom": SoCRegion(origin=0x00000100, size=1024),
517 "ram": SoCRegion(size=512),
518 }
519 )
520 bus.add_master("cpu", None)
521 bus.add_slave("rom", None, SoCRegion(size=1024))
522 bus.add_slave("ram", None, SoCRegion(size=1024))
523
524
525 csr = SoCCSRHandler(reserved_csrs={"ctrl": 0, "uart": 1})
526 csr.add("csr0")
527 csr.add("csr1", 0)
528 #csr.add("csr2", 46)
529 csr.add("csr3", -1)
530 print(bus)
531 print(csr)
532
533 irq = SoCIRQHandler(reserved_irqs={"uart": 1})
534
535 soc = SoC()