annoying spelling errors
[nmigen.git] / nmigen / hdl / dsl.py
1 from collections import OrderedDict
2 from contextlib import contextmanager, _GeneratorContextManager
3 from functools import wraps
4 from enum import Enum
5 import warnings
6
7 from .._utils import flatten, bits_for
8 from .. import tracer
9 from .ast import *
10 from .ir import *
11 from .cd import *
12 from .xfrm import *
13
14
15 __all__ = ["SyntaxError", "SyntaxWarning", "Module"]
16
17
18 class SyntaxError(Exception):
19 pass
20
21
22 class SyntaxWarning(Warning):
23 pass
24
25
26 class _ModuleBuilderProxy:
27 def __init__(self, builder, depth):
28 object.__setattr__(self, "_builder", builder)
29 object.__setattr__(self, "_depth", depth)
30
31
32 class _ModuleBuilderDomain(_ModuleBuilderProxy):
33 def __init__(self, builder, depth, domain):
34 super().__init__(builder, depth)
35 self._domain = domain
36
37 def __iadd__(self, assigns):
38 self._builder._add_statement(assigns, domain=self._domain, depth=self._depth)
39 return self
40
41
42 class _ModuleBuilderDomains(_ModuleBuilderProxy):
43 def __getattr__(self, name):
44 if name == "submodules":
45 warnings.warn("Using '<module>.d.{}' would add statements to clock domain {!r}; "
46 "did you mean <module>.{} instead?"
47 .format(name, name, name),
48 SyntaxWarning, stacklevel=2)
49 if name == "comb":
50 domain = None
51 else:
52 domain = name
53 return _ModuleBuilderDomain(self._builder, self._depth, domain)
54
55 def __getitem__(self, name):
56 return self.__getattr__(name)
57
58 def __setattr__(self, name, value):
59 if name == "_depth":
60 object.__setattr__(self, name, value)
61 elif not isinstance(value, _ModuleBuilderDomain):
62 raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
63 .format(name, name))
64
65 def __setitem__(self, name, value):
66 return self.__setattr__(name, value)
67
68
69 class _ModuleBuilderRoot:
70 def __init__(self, builder, depth):
71 self._builder = builder
72 self.domain = self.d = _ModuleBuilderDomains(builder, depth)
73
74 def __getattr__(self, name):
75 if name in ("comb", "sync"):
76 raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
77 .format(type(self).__name__, name, name))
78 raise AttributeError("'{}' object has no attribute '{}'"
79 .format(type(self).__name__, name))
80
81
82 class _ModuleBuilderSubmodules:
83 def __init__(self, builder):
84 object.__setattr__(self, "_builder", builder)
85
86 def __iadd__(self, modules):
87 for module in flatten([modules]):
88 self._builder._add_submodule(module)
89 return self
90
91 def __setattr__(self, name, submodule):
92 self._builder._add_submodule(submodule, name)
93
94 def __setitem__(self, name, value):
95 return self.__setattr__(name, value)
96
97 def __getattr__(self, name):
98 return self._builder._get_submodule(name)
99
100 def __getitem__(self, name):
101 return self.__getattr__(name)
102
103
104 class _ModuleBuilderDomainSet:
105 def __init__(self, builder):
106 object.__setattr__(self, "_builder", builder)
107
108 def __iadd__(self, domains):
109 for domain in flatten([domains]):
110 if not isinstance(domain, ClockDomain):
111 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
112 .format(domain))
113 self._builder._add_domain(domain)
114 return self
115
116 def __setattr__(self, name, domain):
117 if not isinstance(domain, ClockDomain):
118 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
119 .format(domain))
120 if domain.name != name:
121 raise NameError("Clock domain name {!r} must match name in `m.domains.{} += ...` "
122 "syntax"
123 .format(domain.name, name))
124 self._builder._add_domain(domain)
125
126
127 # It's not particularly clean to depend on an internal interface, but, unfortunately, __bool__
128 # must be defined on a class to be called during implicit conversion.
129 class _GuardedContextManager(_GeneratorContextManager):
130 def __init__(self, keyword, func, args, kwds):
131 self.keyword = keyword
132 return super().__init__(func, args, kwds)
133
134 def __bool__(self):
135 raise SyntaxError("`if m.{kw}(...):` does not work; use `with m.{kw}(...)`"
136 .format(kw=self.keyword))
137
138
139 def _guardedcontextmanager(keyword):
140 def decorator(func):
141 @wraps(func)
142 def helper(*args, **kwds):
143 return _GuardedContextManager(keyword, func, args, kwds)
144 return helper
145 return decorator
146
147
148 class FSM:
149 def __init__(self, state, encoding, decoding):
150 self.state = state
151 self.encoding = encoding
152 self.decoding = decoding
153
154 def ongoing(self, name):
155 if name not in self.encoding:
156 self.encoding[name] = len(self.encoding)
157 return Operator("==", [self.state, self.encoding[name]], src_loc_at=0)
158
159
160 class Module(_ModuleBuilderRoot, Elaboratable):
161 @classmethod
162 def __init_subclass__(cls):
163 raise SyntaxError("Instead of inheriting from `Module`, inherit from `Elaboratable` "
164 "and return a `Module` from the `elaborate(self, platform)` method")
165
166 def __init__(self, _astType=None):
167 _ModuleBuilderRoot.__init__(self, self, depth=0)
168 self.submodules = _ModuleBuilderSubmodules(self)
169 self.domains = _ModuleBuilderDomainSet(self)
170
171 self._statements = Statement.cast([])
172 self._ctrl_context = None
173 self._ctrl_stack = []
174
175 self._driving = SignalDict()
176 self._named_submodules = {}
177 self._anon_submodules = []
178 self._domains = {}
179 self._generated = {}
180
181 # to complete the Type 1 (ast.*) nmigen language construct abstraction
182 # from Type 2 (Module - this class) Module must be told what AST type
183 # it may cast m.If/Elif conditions and m.Switch
184 self._astType = _astType or Value
185
186 def _check_context(self, construct, context):
187 if self._ctrl_context != context:
188 if self._ctrl_context is None:
189 raise SyntaxError("{} is not permitted outside of {}"
190 .format(construct, context))
191 else:
192 if self._ctrl_context == "Switch":
193 secondary_context = "Case"
194 if self._ctrl_context == "FSM":
195 secondary_context = "State"
196 raise SyntaxError("{} is not permitted directly inside of {}; it is permitted "
197 "inside of {} {}"
198 .format(construct, self._ctrl_context,
199 self._ctrl_context, secondary_context))
200
201 def _get_ctrl(self, name):
202 if self._ctrl_stack:
203 top_name, top_data = self._ctrl_stack[-1]
204 if top_name == name:
205 return top_data
206
207 def _flush_ctrl(self):
208 while len(self._ctrl_stack) > self.domain._depth:
209 self._pop_ctrl()
210
211 def _set_ctrl(self, name, data):
212 self._flush_ctrl()
213 self._ctrl_stack.append((name, data))
214 return data
215
216 def _check_signed_cond(self, cond):
217 cond = self._astType.cast(cond)
218 width, signed = cond.shape()
219 if signed:
220 warnings.warn("Signed values in If/Elif conditions usually result from inverting "
221 "Python booleans with ~, which leads to unexpected results. "
222 "Replace `~flag` with `not flag`. (If this is a false positive, "
223 "silence this warning with `m.If(x)` → `m.If(x.bool())`.)",
224 SyntaxWarning, stacklevel=4)
225 # check if the condition is not considered boolean, and only
226 # convert it if it is not
227 if not cond.considered_bool():
228 cond = cond.bool()
229 return cond
230
231 @_guardedcontextmanager("If")
232 def If(self, cond):
233 self._check_context("If", context=None)
234 cond = self._check_signed_cond(cond)
235 src_loc = tracer.get_src_loc(src_loc_at=1)
236 if_data = self._set_ctrl("If", {
237 "depth": self.domain._depth,
238 "tests": [],
239 "bodies": [],
240 "src_loc": src_loc,
241 "src_locs": [],
242 })
243 try:
244 _outer_case, self._statements = self._statements, []
245 self.domain._depth += 1
246 yield
247 self._flush_ctrl()
248 if_data["tests"].append(cond)
249 if_data["bodies"].append(self._statements)
250 if_data["src_locs"].append(src_loc)
251 finally:
252 self.domain._depth -= 1
253 self._statements = _outer_case
254
255 @_guardedcontextmanager("Elif")
256 def Elif(self, cond):
257 self._check_context("Elif", context=None)
258 cond = self._check_signed_cond(cond)
259 src_loc = tracer.get_src_loc(src_loc_at=1)
260 if_data = self._get_ctrl("If")
261 if if_data is None or if_data["depth"] != self.domain._depth:
262 raise SyntaxError("Elif without preceding If")
263 try:
264 _outer_case, self._statements = self._statements, []
265 self.domain._depth += 1
266 yield
267 self._flush_ctrl()
268 if_data["tests"].append(cond)
269 if_data["bodies"].append(self._statements)
270 if_data["src_locs"].append(src_loc)
271 finally:
272 self.domain._depth -= 1
273 self._statements = _outer_case
274
275 @_guardedcontextmanager("Else")
276 def Else(self):
277 self._check_context("Else", context=None)
278 src_loc = tracer.get_src_loc(src_loc_at=1)
279 if_data = self._get_ctrl("If")
280 if if_data is None or if_data["depth"] != self.domain._depth:
281 raise SyntaxError("Else without preceding If/Elif")
282 try:
283 _outer_case, self._statements = self._statements, []
284 self.domain._depth += 1
285 yield
286 self._flush_ctrl()
287 if_data["bodies"].append(self._statements)
288 if_data["src_locs"].append(src_loc)
289 finally:
290 self.domain._depth -= 1
291 self._statements = _outer_case
292 self._pop_ctrl()
293
294 @contextmanager
295 def Switch(self, test):
296 self._check_context("Switch", context=None)
297 switch_data = self._set_ctrl("Switch", {
298 "test": self._astType.cast(test),
299 "cases": OrderedDict(),
300 "src_loc": tracer.get_src_loc(src_loc_at=1),
301 "case_src_locs": {},
302 })
303 try:
304 self._ctrl_context = "Switch"
305 self.domain._depth += 1
306 yield
307 finally:
308 self.domain._depth -= 1
309 self._ctrl_context = None
310 self._pop_ctrl()
311
312 @contextmanager
313 def Case(self, *patterns):
314 self._check_context("Case", context="Switch")
315 src_loc = tracer.get_src_loc(src_loc_at=1)
316 switch_data = self._get_ctrl("Switch")
317 new_patterns = ()
318 for pattern in patterns:
319 if not isinstance(pattern, (int, str, Enum)):
320 raise SyntaxError("Case pattern must be an integer, a string, or an enumeration, "
321 "not {!r}"
322 .format(pattern))
323 if isinstance(pattern, str) and any(bit not in "01- \t" for bit in pattern):
324 raise SyntaxError("Case pattern '{}' must consist of 0, 1, and - (don't care) "
325 "bits, and may include whitespace"
326 .format(pattern))
327 if (isinstance(pattern, str) and
328 len("".join(pattern.split())) != len(switch_data["test"])):
329 raise SyntaxError("Case pattern '{}' must have the same width as switch value "
330 "(which is {})"
331 .format(pattern, len(switch_data["test"])))
332 if isinstance(pattern, int) and bits_for(pattern) > len(switch_data["test"]):
333 warnings.warn("Case pattern '{:b}' is wider than switch value "
334 "(which has width {}); comparison will never be true"
335 .format(pattern, len(switch_data["test"])),
336 SyntaxWarning, stacklevel=3)
337 continue
338 if isinstance(pattern, Enum) and bits_for(pattern.value) > len(switch_data["test"]):
339 warnings.warn("Case pattern '{:b}' ({}.{}) is wider than switch value "
340 "(which has width {}); comparison will never be true"
341 .format(pattern.value, pattern.__class__.__name__, pattern.name,
342 len(switch_data["test"])),
343 SyntaxWarning, stacklevel=3)
344 continue
345 new_patterns = (*new_patterns, pattern)
346 try:
347 _outer_case, self._statements = self._statements, []
348 self._ctrl_context = None
349 yield
350 self._flush_ctrl()
351 # If none of the provided cases can possibly be true, omit this branch completely.
352 # This needs to be differentiated from no cases being provided in the first place,
353 # which means the branch will always match.
354 if not (patterns and not new_patterns):
355 switch_data["cases"][new_patterns] = self._statements
356 switch_data["case_src_locs"][new_patterns] = src_loc
357 finally:
358 self._ctrl_context = "Switch"
359 self._statements = _outer_case
360
361 def Default(self):
362 return self.Case()
363
364 @contextmanager
365 def FSM(self, reset=None, domain="sync", name="fsm"):
366 self._check_context("FSM", context=None)
367 if domain == "comb":
368 raise ValueError("FSM may not be driven by the '{}' domain".format(domain))
369 fsm_data = self._set_ctrl("FSM", {
370 "name": name,
371 "signal": Signal(name="{}_state".format(name), src_loc_at=2),
372 "reset": reset,
373 "domain": domain,
374 "encoding": OrderedDict(),
375 "decoding": OrderedDict(),
376 "states": OrderedDict(),
377 "src_loc": tracer.get_src_loc(src_loc_at=1),
378 "state_src_locs": {},
379 })
380 self._generated[name] = fsm = \
381 FSM(fsm_data["signal"], fsm_data["encoding"], fsm_data["decoding"])
382 try:
383 self._ctrl_context = "FSM"
384 self.domain._depth += 1
385 yield fsm
386 for state_name in fsm_data["encoding"]:
387 if state_name not in fsm_data["states"]:
388 raise NameError("FSM state '{}' is referenced but not defined"
389 .format(state_name))
390 finally:
391 self.domain._depth -= 1
392 self._ctrl_context = None
393 self._pop_ctrl()
394
395 @contextmanager
396 def State(self, name):
397 self._check_context("FSM State", context="FSM")
398 src_loc = tracer.get_src_loc(src_loc_at=1)
399 fsm_data = self._get_ctrl("FSM")
400 if name in fsm_data["states"]:
401 raise NameError("FSM state '{}' is already defined".format(name))
402 if name not in fsm_data["encoding"]:
403 fsm_data["encoding"][name] = len(fsm_data["encoding"])
404 try:
405 _outer_case, self._statements = self._statements, []
406 self._ctrl_context = None
407 yield
408 self._flush_ctrl()
409 fsm_data["states"][name] = self._statements
410 fsm_data["state_src_locs"][name] = src_loc
411 finally:
412 self._ctrl_context = "FSM"
413 self._statements = _outer_case
414
415 @property
416 def next(self):
417 raise SyntaxError("Only assignment to `m.next` is permitted")
418
419 @next.setter
420 def next(self, name):
421 if self._ctrl_context != "FSM":
422 for level, (ctrl_name, ctrl_data) in enumerate(reversed(self._ctrl_stack)):
423 if ctrl_name == "FSM":
424 if name not in ctrl_data["encoding"]:
425 ctrl_data["encoding"][name] = len(ctrl_data["encoding"])
426 self._add_statement(
427 assigns=[ctrl_data["signal"].eq(ctrl_data["encoding"][name])],
428 domain=ctrl_data["domain"],
429 depth=len(self._ctrl_stack))
430 return
431
432 raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
433
434 def _pop_ctrl(self):
435 name, data = self._ctrl_stack.pop()
436 src_loc = data["src_loc"]
437
438 if name == "If":
439 if_tests, if_bodies = data["tests"], data["bodies"]
440 if_src_locs = data["src_locs"]
441
442 tests, cases = [], OrderedDict()
443 for if_test, if_case in zip(if_tests + [None], if_bodies):
444 if if_test is not None:
445 tests.append(if_test)
446
447 if if_test is not None:
448 match = ("1" + "-" * (len(tests) - 1)).rjust(len(if_tests), "-")
449 else:
450 match = None
451 cases[match] = if_case
452
453 self._statements.append(Switch(Cat(tests), cases,
454 src_loc=src_loc, case_src_locs=dict(zip(cases, if_src_locs))))
455
456 if name == "Switch":
457 switch_test, switch_cases = data["test"], data["cases"]
458 switch_case_src_locs = data["case_src_locs"]
459
460 self._statements.append(Switch(switch_test, switch_cases,
461 src_loc=src_loc, case_src_locs=switch_case_src_locs))
462
463 if name == "FSM":
464 fsm_signal, fsm_reset, fsm_encoding, fsm_decoding, fsm_states = \
465 data["signal"], data["reset"], data["encoding"], data["decoding"], data["states"]
466 fsm_state_src_locs = data["state_src_locs"]
467 if not fsm_states:
468 return
469 fsm_signal.width = bits_for(len(fsm_encoding) - 1)
470 if fsm_reset is None:
471 fsm_signal.reset = fsm_encoding[next(iter(fsm_states))]
472 else:
473 fsm_signal.reset = fsm_encoding[fsm_reset]
474 # The FSM is encoded such that the state with encoding 0 is always the reset state.
475 fsm_decoding.update((n, s) for s, n in fsm_encoding.items())
476 fsm_signal.decoder = lambda n: "{}/{}".format(fsm_decoding[n], n)
477 self._statements.append(Switch(fsm_signal,
478 OrderedDict((fsm_encoding[name], stmts) for name, stmts in fsm_states.items()),
479 src_loc=src_loc, case_src_locs={fsm_encoding[name]: fsm_state_src_locs[name]
480 for name in fsm_states}))
481
482 def _add_statement(self, assigns, domain, depth, compat_mode=False):
483 def domain_name(domain):
484 if domain is None:
485 return "comb"
486 else:
487 return domain
488
489 while len(self._ctrl_stack) > self.domain._depth:
490 self._pop_ctrl()
491
492 for stmt in Statement.cast(assigns):
493 if not compat_mode and not isinstance(stmt, (_InternalAssign, Assert, Assume, Cover)):
494 raise SyntaxError(
495 "Only assignments and property checks may be appended to d.{}"
496 .format(domain_name(domain)))
497
498 stmt._MustUse__used = True
499 stmt = SampleDomainInjector(domain)(stmt)
500
501 for signal in stmt._lhs_signals():
502 if signal not in self._driving:
503 self._driving[signal] = domain
504 elif self._driving[signal] != domain:
505 cd_curr = self._driving[signal]
506 raise SyntaxError(
507 "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
508 "already driven from d.{}"
509 .format(signal, domain_name(domain), domain_name(cd_curr)))
510
511 self._statements.append(stmt)
512
513 def _add_submodule(self, submodule, name=None):
514 if not hasattr(submodule, "elaborate"):
515 raise TypeError("Trying to add {!r}, which does not implement .elaborate(), as "
516 "a submodule".format(submodule))
517 if name == None:
518 self._anon_submodules.append(submodule)
519 else:
520 if name in self._named_submodules:
521 raise NameError("Submodule named '{}' already exists".format(name))
522 self._named_submodules[name] = submodule
523
524 def _get_submodule(self, name):
525 if name in self._named_submodules:
526 return self._named_submodules[name]
527 else:
528 raise AttributeError("No submodule named '{}' exists".format(name))
529
530 def _add_domain(self, cd):
531 if cd.name in self._domains:
532 raise NameError("Clock domain named '{}' already exists".format(cd.name))
533 self._domains[cd.name] = cd
534
535 def _flush(self):
536 while self._ctrl_stack:
537 self._pop_ctrl()
538
539 def elaborate(self, platform):
540 self._flush()
541
542 fragment = Fragment()
543 for name in self._named_submodules:
544 fragment.add_subfragment(Fragment.get(self._named_submodules[name], platform), name)
545 for submodule in self._anon_submodules:
546 fragment.add_subfragment(Fragment.get(submodule, platform), None)
547 statements = SampleDomainInjector("sync")(self._statements)
548 fragment.add_statements(statements)
549 for signal, domain in self._driving.items():
550 fragment.add_driver(signal, domain)
551 fragment.add_domains(self._domains.values())
552 fragment.generated.update(self._generated)
553 return fragment