hdl.dsl: check for unique domain name.
[nmigen.git] / nmigen / hdl / dsl.py
1 from collections import OrderedDict, namedtuple
2 from collections.abc import Iterable
3 from contextlib import contextmanager, _GeneratorContextManager
4 from functools import wraps
5 from enum import Enum
6 import warnings
7
8 from .._utils import flatten, bits_for, deprecated
9 from .. import tracer
10 from .ast import *
11 from .ir import *
12 from .cd import *
13 from .xfrm import *
14
15
16 __all__ = ["SyntaxError", "SyntaxWarning", "Module"]
17
18
19 class SyntaxError(Exception):
20 pass
21
22
23 class SyntaxWarning(Warning):
24 pass
25
26
27 class _ModuleBuilderProxy:
28 def __init__(self, builder, depth):
29 object.__setattr__(self, "_builder", builder)
30 object.__setattr__(self, "_depth", depth)
31
32
33 class _ModuleBuilderDomain(_ModuleBuilderProxy):
34 def __init__(self, builder, depth, domain):
35 super().__init__(builder, depth)
36 self._domain = domain
37
38 def __iadd__(self, assigns):
39 self._builder._add_statement(assigns, domain=self._domain, depth=self._depth)
40 return self
41
42
43 class _ModuleBuilderDomains(_ModuleBuilderProxy):
44 def __getattr__(self, name):
45 if name == "submodules":
46 warnings.warn("Using '<module>.d.{}' would add statements to clock domain {!r}; "
47 "did you mean <module>.{} instead?"
48 .format(name, name, name),
49 SyntaxWarning, stacklevel=2)
50 if name == "comb":
51 domain = None
52 else:
53 domain = name
54 return _ModuleBuilderDomain(self._builder, self._depth, domain)
55
56 def __getitem__(self, name):
57 return self.__getattr__(name)
58
59 def __setattr__(self, name, value):
60 if name == "_depth":
61 object.__setattr__(self, name, value)
62 elif not isinstance(value, _ModuleBuilderDomain):
63 raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
64 .format(name, name))
65
66 def __setitem__(self, name, value):
67 return self.__setattr__(name, value)
68
69
70 class _ModuleBuilderRoot:
71 def __init__(self, builder, depth):
72 self._builder = builder
73 self.domain = self.d = _ModuleBuilderDomains(builder, depth)
74
75 def __getattr__(self, name):
76 if name in ("comb", "sync"):
77 raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
78 .format(type(self).__name__, name, name))
79 raise AttributeError("'{}' object has no attribute '{}'"
80 .format(type(self).__name__, name))
81
82
83 class _ModuleBuilderSubmodules:
84 def __init__(self, builder):
85 object.__setattr__(self, "_builder", builder)
86
87 def __iadd__(self, modules):
88 for module in flatten([modules]):
89 self._builder._add_submodule(module)
90 return self
91
92 def __setattr__(self, name, submodule):
93 self._builder._add_submodule(submodule, name)
94
95 def __setitem__(self, name, value):
96 return self.__setattr__(name, value)
97
98 def __getattr__(self, name):
99 return self._builder._get_submodule(name)
100
101 def __getitem__(self, name):
102 return self.__getattr__(name)
103
104
105 class _ModuleBuilderDomainSet:
106 def __init__(self, builder):
107 object.__setattr__(self, "_builder", builder)
108
109 def __iadd__(self, domains):
110 for domain in flatten([domains]):
111 if not isinstance(domain, ClockDomain):
112 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
113 .format(domain))
114 self._builder._add_domain(domain)
115 return self
116
117 def __setattr__(self, name, domain):
118 if not isinstance(domain, ClockDomain):
119 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
120 .format(domain))
121 if domain.name != name:
122 raise NameError("Clock domain name {!r} must match name in `m.domains.{} += ...` "
123 "syntax"
124 .format(domain.name, name))
125 self._builder._add_domain(domain)
126
127
128 # It's not particularly clean to depend on an internal interface, but, unfortunately, __bool__
129 # must be defined on a class to be called during implicit conversion.
130 class _GuardedContextManager(_GeneratorContextManager):
131 def __init__(self, keyword, func, args, kwds):
132 self.keyword = keyword
133 return super().__init__(func, args, kwds)
134
135 def __bool__(self):
136 raise SyntaxError("`if m.{kw}(...):` does not work; use `with m.{kw}(...)`"
137 .format(kw=self.keyword))
138
139
140 def _guardedcontextmanager(keyword):
141 def decorator(func):
142 @wraps(func)
143 def helper(*args, **kwds):
144 return _GuardedContextManager(keyword, func, args, kwds)
145 return helper
146 return decorator
147
148
149 class FSM:
150 def __init__(self, state, encoding, decoding):
151 self.state = state
152 self.encoding = encoding
153 self.decoding = decoding
154
155 def ongoing(self, name):
156 if name not in self.encoding:
157 self.encoding[name] = len(self.encoding)
158 return Operator("==", [self.state, self.encoding[name]], src_loc_at=0)
159
160
161 class Module(_ModuleBuilderRoot, Elaboratable):
162 @classmethod
163 def __init_subclass__(cls):
164 raise SyntaxError("Instead of inheriting from `Module`, inherit from `Elaboratable` "
165 "and return a `Module` from the `elaborate(self, platform)` method")
166
167 def __init__(self):
168 _ModuleBuilderRoot.__init__(self, self, depth=0)
169 self.submodules = _ModuleBuilderSubmodules(self)
170 self.domains = _ModuleBuilderDomainSet(self)
171
172 self._statements = Statement.cast([])
173 self._ctrl_context = None
174 self._ctrl_stack = []
175
176 self._driving = SignalDict()
177 self._named_submodules = {}
178 self._anon_submodules = []
179 self._domains = {}
180 self._generated = {}
181
182 def _check_context(self, construct, context):
183 if self._ctrl_context != context:
184 if self._ctrl_context is None:
185 raise SyntaxError("{} is not permitted outside of {}"
186 .format(construct, context))
187 else:
188 if self._ctrl_context == "Switch":
189 secondary_context = "Case"
190 if self._ctrl_context == "FSM":
191 secondary_context = "State"
192 raise SyntaxError("{} is not permitted directly inside of {}; it is permitted "
193 "inside of {} {}"
194 .format(construct, self._ctrl_context,
195 self._ctrl_context, secondary_context))
196
197 def _get_ctrl(self, name):
198 if self._ctrl_stack:
199 top_name, top_data = self._ctrl_stack[-1]
200 if top_name == name:
201 return top_data
202
203 def _flush_ctrl(self):
204 while len(self._ctrl_stack) > self.domain._depth:
205 self._pop_ctrl()
206
207 def _set_ctrl(self, name, data):
208 self._flush_ctrl()
209 self._ctrl_stack.append((name, data))
210 return data
211
212 def _check_signed_cond(self, cond):
213 cond = Value.cast(cond)
214 width, signed = cond.shape()
215 if signed:
216 warnings.warn("Signed values in If/Elif conditions usually result from inverting "
217 "Python booleans with ~, which leads to unexpected results: ~True is "
218 "-2, which is truthful. Replace `~flag` with `not flag`. (If this is "
219 "a false positive, silence this warning with "
220 "`m.If(x)` → `m.If(x.bool())`.)",
221 SyntaxWarning, stacklevel=4)
222 return cond
223
224 @_guardedcontextmanager("If")
225 def If(self, cond):
226 self._check_context("If", context=None)
227 cond = self._check_signed_cond(cond)
228 src_loc = tracer.get_src_loc(src_loc_at=1)
229 if_data = self._set_ctrl("If", {
230 "tests": [],
231 "bodies": [],
232 "src_loc": src_loc,
233 "src_locs": [],
234 })
235 try:
236 _outer_case, self._statements = self._statements, []
237 self.domain._depth += 1
238 yield
239 self._flush_ctrl()
240 if_data["tests"].append(cond)
241 if_data["bodies"].append(self._statements)
242 if_data["src_locs"].append(src_loc)
243 finally:
244 self.domain._depth -= 1
245 self._statements = _outer_case
246
247 @_guardedcontextmanager("Elif")
248 def Elif(self, cond):
249 self._check_context("Elif", context=None)
250 cond = self._check_signed_cond(cond)
251 src_loc = tracer.get_src_loc(src_loc_at=1)
252 if_data = self._get_ctrl("If")
253 if if_data is None:
254 raise SyntaxError("Elif without preceding If")
255 try:
256 _outer_case, self._statements = self._statements, []
257 self.domain._depth += 1
258 yield
259 self._flush_ctrl()
260 if_data["tests"].append(cond)
261 if_data["bodies"].append(self._statements)
262 if_data["src_locs"].append(src_loc)
263 finally:
264 self.domain._depth -= 1
265 self._statements = _outer_case
266
267 @_guardedcontextmanager("Else")
268 def Else(self):
269 self._check_context("Else", context=None)
270 src_loc = tracer.get_src_loc(src_loc_at=1)
271 if_data = self._get_ctrl("If")
272 if if_data is None:
273 raise SyntaxError("Else without preceding If/Elif")
274 try:
275 _outer_case, self._statements = self._statements, []
276 self.domain._depth += 1
277 yield
278 self._flush_ctrl()
279 if_data["bodies"].append(self._statements)
280 if_data["src_locs"].append(src_loc)
281 finally:
282 self.domain._depth -= 1
283 self._statements = _outer_case
284 self._pop_ctrl()
285
286 @contextmanager
287 def Switch(self, test):
288 self._check_context("Switch", context=None)
289 switch_data = self._set_ctrl("Switch", {
290 "test": Value.cast(test),
291 "cases": OrderedDict(),
292 "src_loc": tracer.get_src_loc(src_loc_at=1),
293 "case_src_locs": {},
294 })
295 try:
296 self._ctrl_context = "Switch"
297 self.domain._depth += 1
298 yield
299 finally:
300 self.domain._depth -= 1
301 self._ctrl_context = None
302 self._pop_ctrl()
303
304 @contextmanager
305 def Case(self, *patterns):
306 self._check_context("Case", context="Switch")
307 src_loc = tracer.get_src_loc(src_loc_at=1)
308 switch_data = self._get_ctrl("Switch")
309 new_patterns = ()
310 for pattern in patterns:
311 if not isinstance(pattern, (int, str, Enum)):
312 raise SyntaxError("Case pattern must be an integer, a string, or an enumeration, "
313 "not {!r}"
314 .format(pattern))
315 if isinstance(pattern, str) and any(bit not in "01- \t" for bit in pattern):
316 raise SyntaxError("Case pattern '{}' must consist of 0, 1, and - (don't care) "
317 "bits, and may include whitespace"
318 .format(pattern))
319 if (isinstance(pattern, str) and
320 len("".join(pattern.split())) != len(switch_data["test"])):
321 raise SyntaxError("Case pattern '{}' must have the same width as switch value "
322 "(which is {})"
323 .format(pattern, len(switch_data["test"])))
324 if isinstance(pattern, int) and bits_for(pattern) > len(switch_data["test"]):
325 warnings.warn("Case pattern '{:b}' is wider than switch value "
326 "(which has width {}); comparison will never be true"
327 .format(pattern, len(switch_data["test"])),
328 SyntaxWarning, stacklevel=3)
329 continue
330 if isinstance(pattern, Enum) and bits_for(pattern.value) > len(switch_data["test"]):
331 warnings.warn("Case pattern '{:b}' ({}.{}) is wider than switch value "
332 "(which has width {}); comparison will never be true"
333 .format(pattern.value, pattern.__class__.__name__, pattern.name,
334 len(switch_data["test"])),
335 SyntaxWarning, stacklevel=3)
336 continue
337 new_patterns = (*new_patterns, pattern)
338 try:
339 _outer_case, self._statements = self._statements, []
340 self._ctrl_context = None
341 yield
342 self._flush_ctrl()
343 # If none of the provided cases can possibly be true, omit this branch completely.
344 # This needs to be differentiated from no cases being provided in the first place,
345 # which means the branch will always match.
346 if not (patterns and not new_patterns):
347 switch_data["cases"][new_patterns] = self._statements
348 switch_data["case_src_locs"][new_patterns] = src_loc
349 finally:
350 self._ctrl_context = "Switch"
351 self._statements = _outer_case
352
353 def Default(self):
354 return self.Case()
355
356 @contextmanager
357 def FSM(self, reset=None, domain="sync", name="fsm"):
358 self._check_context("FSM", context=None)
359 if domain == "comb":
360 raise ValueError("FSM may not be driven by the '{}' domain".format(domain))
361 fsm_data = self._set_ctrl("FSM", {
362 "name": name,
363 "signal": Signal(name="{}_state".format(name), src_loc_at=2),
364 "reset": reset,
365 "domain": domain,
366 "encoding": OrderedDict(),
367 "decoding": OrderedDict(),
368 "states": OrderedDict(),
369 "src_loc": tracer.get_src_loc(src_loc_at=1),
370 "state_src_locs": {},
371 })
372 self._generated[name] = fsm = \
373 FSM(fsm_data["signal"], fsm_data["encoding"], fsm_data["decoding"])
374 try:
375 self._ctrl_context = "FSM"
376 self.domain._depth += 1
377 yield fsm
378 for state_name in fsm_data["encoding"]:
379 if state_name not in fsm_data["states"]:
380 raise NameError("FSM state '{}' is referenced but not defined"
381 .format(state_name))
382 finally:
383 self.domain._depth -= 1
384 self._ctrl_context = None
385 self._pop_ctrl()
386
387 @contextmanager
388 def State(self, name):
389 self._check_context("FSM State", context="FSM")
390 src_loc = tracer.get_src_loc(src_loc_at=1)
391 fsm_data = self._get_ctrl("FSM")
392 if name in fsm_data["states"]:
393 raise NameError("FSM state '{}' is already defined".format(name))
394 if name not in fsm_data["encoding"]:
395 fsm_data["encoding"][name] = len(fsm_data["encoding"])
396 try:
397 _outer_case, self._statements = self._statements, []
398 self._ctrl_context = None
399 yield
400 self._flush_ctrl()
401 fsm_data["states"][name] = self._statements
402 fsm_data["state_src_locs"][name] = src_loc
403 finally:
404 self._ctrl_context = "FSM"
405 self._statements = _outer_case
406
407 @property
408 def next(self):
409 raise SyntaxError("Only assignment to `m.next` is permitted")
410
411 @next.setter
412 def next(self, name):
413 if self._ctrl_context != "FSM":
414 for level, (ctrl_name, ctrl_data) in enumerate(reversed(self._ctrl_stack)):
415 if ctrl_name == "FSM":
416 if name not in ctrl_data["encoding"]:
417 ctrl_data["encoding"][name] = len(ctrl_data["encoding"])
418 self._add_statement(
419 assigns=[ctrl_data["signal"].eq(ctrl_data["encoding"][name])],
420 domain=ctrl_data["domain"],
421 depth=len(self._ctrl_stack))
422 return
423
424 raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
425
426 def _pop_ctrl(self):
427 name, data = self._ctrl_stack.pop()
428 src_loc = data["src_loc"]
429
430 if name == "If":
431 if_tests, if_bodies = data["tests"], data["bodies"]
432 if_src_locs = data["src_locs"]
433
434 tests, cases = [], OrderedDict()
435 for if_test, if_case in zip(if_tests + [None], if_bodies):
436 if if_test is not None:
437 if_test = Value.cast(if_test)
438 if len(if_test) != 1:
439 if_test = if_test.bool()
440 tests.append(if_test)
441
442 if if_test is not None:
443 match = ("1" + "-" * (len(tests) - 1)).rjust(len(if_tests), "-")
444 else:
445 match = None
446 cases[match] = if_case
447
448 self._statements.append(Switch(Cat(tests), cases,
449 src_loc=src_loc, case_src_locs=dict(zip(cases, if_src_locs))))
450
451 if name == "Switch":
452 switch_test, switch_cases = data["test"], data["cases"]
453 switch_case_src_locs = data["case_src_locs"]
454
455 self._statements.append(Switch(switch_test, switch_cases,
456 src_loc=src_loc, case_src_locs=switch_case_src_locs))
457
458 if name == "FSM":
459 fsm_signal, fsm_reset, fsm_encoding, fsm_decoding, fsm_states = \
460 data["signal"], data["reset"], data["encoding"], data["decoding"], data["states"]
461 fsm_state_src_locs = data["state_src_locs"]
462 if not fsm_states:
463 return
464 fsm_signal.width = bits_for(len(fsm_encoding) - 1)
465 if fsm_reset is None:
466 fsm_signal.reset = fsm_encoding[next(iter(fsm_states))]
467 else:
468 fsm_signal.reset = fsm_encoding[fsm_reset]
469 # The FSM is encoded such that the state with encoding 0 is always the reset state.
470 fsm_decoding.update((n, s) for s, n in fsm_encoding.items())
471 fsm_signal.decoder = lambda n: "{}/{}".format(fsm_decoding[n], n)
472 self._statements.append(Switch(fsm_signal,
473 OrderedDict((fsm_encoding[name], stmts) for name, stmts in fsm_states.items()),
474 src_loc=src_loc, case_src_locs={fsm_encoding[name]: fsm_state_src_locs[name]
475 for name in fsm_states}))
476
477 def _add_statement(self, assigns, domain, depth, compat_mode=False):
478 def domain_name(domain):
479 if domain is None:
480 return "comb"
481 else:
482 return domain
483
484 while len(self._ctrl_stack) > self.domain._depth:
485 self._pop_ctrl()
486
487 for stmt in Statement.cast(assigns):
488 if not compat_mode and not isinstance(stmt, (Assign, Assert, Assume, Cover)):
489 raise SyntaxError(
490 "Only assignments and property checks may be appended to d.{}"
491 .format(domain_name(domain)))
492
493 stmt._MustUse__used = True
494 stmt = SampleDomainInjector(domain)(stmt)
495
496 for signal in stmt._lhs_signals():
497 if signal not in self._driving:
498 self._driving[signal] = domain
499 elif self._driving[signal] != domain:
500 cd_curr = self._driving[signal]
501 raise SyntaxError(
502 "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
503 "already driven from d.{}"
504 .format(signal, domain_name(domain), domain_name(cd_curr)))
505
506 self._statements.append(stmt)
507
508 def _add_submodule(self, submodule, name=None):
509 if not hasattr(submodule, "elaborate"):
510 raise TypeError("Trying to add {!r}, which does not implement .elaborate(), as "
511 "a submodule".format(submodule))
512 if name == None:
513 self._anon_submodules.append(submodule)
514 else:
515 if name in self._named_submodules:
516 raise NameError("Submodule named '{}' already exists".format(name))
517 self._named_submodules[name] = submodule
518
519 def _get_submodule(self, name):
520 if name in self._named_submodules:
521 return self._named_submodules[name]
522 else:
523 raise AttributeError("No submodule named '{}' exists".format(name))
524
525 def _add_domain(self, cd):
526 if cd.name in self._domains:
527 raise NameError("Clock domain named '{}' already exists".format(cd.name))
528 self._domains[cd.name] = cd
529
530 def _flush(self):
531 while self._ctrl_stack:
532 self._pop_ctrl()
533
534 def elaborate(self, platform):
535 self._flush()
536
537 fragment = Fragment()
538 for name in self._named_submodules:
539 fragment.add_subfragment(Fragment.get(self._named_submodules[name], platform), name)
540 for submodule in self._anon_submodules:
541 fragment.add_subfragment(Fragment.get(submodule, platform), None)
542 statements = SampleDomainInjector("sync")(self._statements)
543 fragment.add_statements(statements)
544 for signal, domain in self._driving.items():
545 fragment.add_driver(signal, domain)
546 fragment.add_domains(self._domains.values())
547 fragment.generated.update(self._generated)
548 return fragment