1 from collections
import OrderedDict
, namedtuple
2 from collections
.abc
import Iterable
3 from contextlib
import contextmanager
, _GeneratorContextManager
4 from functools
import wraps
8 from .._utils
import flatten
, bits_for
, deprecated
16 __all__
= ["SyntaxError", "SyntaxWarning", "Module"]
19 class SyntaxError(Exception):
23 class SyntaxWarning(Warning):
27 class _ModuleBuilderProxy
:
28 def __init__(self
, builder
, depth
):
29 object.__setattr
__(self
, "_builder", builder
)
30 object.__setattr
__(self
, "_depth", depth
)
33 class _ModuleBuilderDomain(_ModuleBuilderProxy
):
34 def __init__(self
, builder
, depth
, domain
):
35 super().__init
__(builder
, depth
)
38 def __iadd__(self
, assigns
):
39 self
._builder
._add
_statement
(assigns
, domain
=self
._domain
, depth
=self
._depth
)
43 class _ModuleBuilderDomains(_ModuleBuilderProxy
):
44 def __getattr__(self
, name
):
45 if name
== "submodules":
46 warnings
.warn("Using '<module>.d.{}' would add statements to clock domain {!r}; "
47 "did you mean <module>.{} instead?"
48 .format(name
, name
, name
),
49 SyntaxWarning, stacklevel
=2)
54 return _ModuleBuilderDomain(self
._builder
, self
._depth
, domain
)
56 def __getitem__(self
, name
):
57 return self
.__getattr
__(name
)
59 def __setattr__(self
, name
, value
):
61 object.__setattr
__(self
, name
, value
)
62 elif not isinstance(value
, _ModuleBuilderDomain
):
63 raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
66 def __setitem__(self
, name
, value
):
67 return self
.__setattr
__(name
, value
)
70 class _ModuleBuilderRoot
:
71 def __init__(self
, builder
, depth
):
72 self
._builder
= builder
73 self
.domain
= self
.d
= _ModuleBuilderDomains(builder
, depth
)
75 def __getattr__(self
, name
):
76 if name
in ("comb", "sync"):
77 raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
78 .format(type(self
).__name
__, name
, name
))
79 raise AttributeError("'{}' object has no attribute '{}'"
80 .format(type(self
).__name
__, name
))
83 class _ModuleBuilderSubmodules
:
84 def __init__(self
, builder
):
85 object.__setattr
__(self
, "_builder", builder
)
87 def __iadd__(self
, modules
):
88 for module
in flatten([modules
]):
89 self
._builder
._add
_submodule
(module
)
92 def __setattr__(self
, name
, submodule
):
93 self
._builder
._add
_submodule
(submodule
, name
)
95 def __setitem__(self
, name
, value
):
96 return self
.__setattr
__(name
, value
)
98 def __getattr__(self
, name
):
99 return self
._builder
._get
_submodule
(name
)
101 def __getitem__(self
, name
):
102 return self
.__getattr
__(name
)
105 class _ModuleBuilderDomainSet
:
106 def __init__(self
, builder
):
107 object.__setattr
__(self
, "_builder", builder
)
109 def __iadd__(self
, domains
):
110 for domain
in flatten([domains
]):
111 if not isinstance(domain
, ClockDomain
):
112 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
114 self
._builder
._add
_domain
(domain
)
117 def __setattr__(self
, name
, domain
):
118 if not isinstance(domain
, ClockDomain
):
119 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
121 if domain
.name
!= name
:
122 raise NameError("Clock domain name {!r} must match name in `m.domains.{} += ...` "
124 .format(domain
.name
, name
))
125 self
._builder
._add
_domain
(domain
)
128 # It's not particularly clean to depend on an internal interface, but, unfortunately, __bool__
129 # must be defined on a class to be called during implicit conversion.
130 class _GuardedContextManager(_GeneratorContextManager
):
131 def __init__(self
, keyword
, func
, args
, kwds
):
132 self
.keyword
= keyword
133 return super().__init
__(func
, args
, kwds
)
136 raise SyntaxError("`if m.{kw}(...):` does not work; use `with m.{kw}(...)`"
137 .format(kw
=self
.keyword
))
140 def _guardedcontextmanager(keyword
):
143 def helper(*args
, **kwds
):
144 return _GuardedContextManager(keyword
, func
, args
, kwds
)
150 def __init__(self
, state
, encoding
, decoding
):
152 self
.encoding
= encoding
153 self
.decoding
= decoding
155 def ongoing(self
, name
):
156 if name
not in self
.encoding
:
157 self
.encoding
[name
] = len(self
.encoding
)
158 return Operator("==", [self
.state
, self
.encoding
[name
]], src_loc_at
=0)
161 class Module(_ModuleBuilderRoot
, Elaboratable
):
163 def __init_subclass__(cls
):
164 raise SyntaxError("Instead of inheriting from `Module`, inherit from `Elaboratable` "
165 "and return a `Module` from the `elaborate(self, platform)` method")
168 _ModuleBuilderRoot
.__init
__(self
, self
, depth
=0)
169 self
.submodules
= _ModuleBuilderSubmodules(self
)
170 self
.domains
= _ModuleBuilderDomainSet(self
)
172 self
._statements
= Statement
.cast([])
173 self
._ctrl
_context
= None
174 self
._ctrl
_stack
= []
176 self
._driving
= SignalDict()
177 self
._named
_submodules
= {}
178 self
._anon
_submodules
= []
182 def _check_context(self
, construct
, context
):
183 if self
._ctrl
_context
!= context
:
184 if self
._ctrl
_context
is None:
185 raise SyntaxError("{} is not permitted outside of {}"
186 .format(construct
, context
))
188 if self
._ctrl
_context
== "Switch":
189 secondary_context
= "Case"
190 if self
._ctrl
_context
== "FSM":
191 secondary_context
= "State"
192 raise SyntaxError("{} is not permitted directly inside of {}; it is permitted "
194 .format(construct
, self
._ctrl
_context
,
195 self
._ctrl
_context
, secondary_context
))
197 def _get_ctrl(self
, name
):
199 top_name
, top_data
= self
._ctrl
_stack
[-1]
203 def _flush_ctrl(self
):
204 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
207 def _set_ctrl(self
, name
, data
):
209 self
._ctrl
_stack
.append((name
, data
))
212 def _check_signed_cond(self
, cond
):
213 cond
= Value
.cast(cond
)
214 width
, signed
= cond
.shape()
216 warnings
.warn("Signed values in If/Elif conditions usually result from inverting "
217 "Python booleans with ~, which leads to unexpected results: ~True is "
218 "-2, which is truthful. Replace `~flag` with `not flag`. (If this is "
219 "a false positive, silence this warning with "
220 "`m.If(x)` → `m.If(x.bool())`.)",
221 SyntaxWarning, stacklevel
=4)
224 @_guardedcontextmanager("If")
226 self
._check
_context
("If", context
=None)
227 cond
= self
._check
_signed
_cond
(cond
)
228 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
229 if_data
= self
._set
_ctrl
("If", {
236 _outer_case
, self
._statements
= self
._statements
, []
237 self
.domain
._depth
+= 1
240 if_data
["tests"].append(cond
)
241 if_data
["bodies"].append(self
._statements
)
242 if_data
["src_locs"].append(src_loc
)
244 self
.domain
._depth
-= 1
245 self
._statements
= _outer_case
247 @_guardedcontextmanager("Elif")
248 def Elif(self
, cond
):
249 self
._check
_context
("Elif", context
=None)
250 cond
= self
._check
_signed
_cond
(cond
)
251 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
252 if_data
= self
._get
_ctrl
("If")
254 raise SyntaxError("Elif without preceding If")
256 _outer_case
, self
._statements
= self
._statements
, []
257 self
.domain
._depth
+= 1
260 if_data
["tests"].append(cond
)
261 if_data
["bodies"].append(self
._statements
)
262 if_data
["src_locs"].append(src_loc
)
264 self
.domain
._depth
-= 1
265 self
._statements
= _outer_case
267 @_guardedcontextmanager("Else")
269 self
._check
_context
("Else", context
=None)
270 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
271 if_data
= self
._get
_ctrl
("If")
273 raise SyntaxError("Else without preceding If/Elif")
275 _outer_case
, self
._statements
= self
._statements
, []
276 self
.domain
._depth
+= 1
279 if_data
["bodies"].append(self
._statements
)
280 if_data
["src_locs"].append(src_loc
)
282 self
.domain
._depth
-= 1
283 self
._statements
= _outer_case
287 def Switch(self
, test
):
288 self
._check
_context
("Switch", context
=None)
289 switch_data
= self
._set
_ctrl
("Switch", {
290 "test": Value
.cast(test
),
291 "cases": OrderedDict(),
292 "src_loc": tracer
.get_src_loc(src_loc_at
=1),
296 self
._ctrl
_context
= "Switch"
297 self
.domain
._depth
+= 1
300 self
.domain
._depth
-= 1
301 self
._ctrl
_context
= None
305 def Case(self
, *patterns
):
306 self
._check
_context
("Case", context
="Switch")
307 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
308 switch_data
= self
._get
_ctrl
("Switch")
310 for pattern
in patterns
:
311 if not isinstance(pattern
, (int, str, Enum
)):
312 raise SyntaxError("Case pattern must be an integer, a string, or an enumeration, "
315 if isinstance(pattern
, str) and any(bit
not in "01- \t" for bit
in pattern
):
316 raise SyntaxError("Case pattern '{}' must consist of 0, 1, and - (don't care) "
317 "bits, and may include whitespace"
319 if (isinstance(pattern
, str) and
320 len("".join(pattern
.split())) != len(switch_data
["test"])):
321 raise SyntaxError("Case pattern '{}' must have the same width as switch value "
323 .format(pattern
, len(switch_data
["test"])))
324 if isinstance(pattern
, int) and bits_for(pattern
) > len(switch_data
["test"]):
325 warnings
.warn("Case pattern '{:b}' is wider than switch value "
326 "(which has width {}); comparison will never be true"
327 .format(pattern
, len(switch_data
["test"])),
328 SyntaxWarning, stacklevel
=3)
330 if isinstance(pattern
, Enum
) and bits_for(pattern
.value
) > len(switch_data
["test"]):
331 warnings
.warn("Case pattern '{:b}' ({}.{}) is wider than switch value "
332 "(which has width {}); comparison will never be true"
333 .format(pattern
.value
, pattern
.__class
__.__name
__, pattern
.name
,
334 len(switch_data
["test"])),
335 SyntaxWarning, stacklevel
=3)
337 new_patterns
= (*new_patterns
, pattern
)
339 _outer_case
, self
._statements
= self
._statements
, []
340 self
._ctrl
_context
= None
343 # If none of the provided cases can possibly be true, omit this branch completely.
344 # This needs to be differentiated from no cases being provided in the first place,
345 # which means the branch will always match.
346 if not (patterns
and not new_patterns
):
347 switch_data
["cases"][new_patterns
] = self
._statements
348 switch_data
["case_src_locs"][new_patterns
] = src_loc
350 self
._ctrl
_context
= "Switch"
351 self
._statements
= _outer_case
357 def FSM(self
, reset
=None, domain
="sync", name
="fsm"):
358 self
._check
_context
("FSM", context
=None)
360 raise ValueError("FSM may not be driven by the '{}' domain".format(domain
))
361 fsm_data
= self
._set
_ctrl
("FSM", {
363 "signal": Signal(name
="{}_state".format(name
), src_loc_at
=2),
366 "encoding": OrderedDict(),
367 "decoding": OrderedDict(),
368 "states": OrderedDict(),
369 "src_loc": tracer
.get_src_loc(src_loc_at
=1),
370 "state_src_locs": {},
372 self
._generated
[name
] = fsm
= \
373 FSM(fsm_data
["signal"], fsm_data
["encoding"], fsm_data
["decoding"])
375 self
._ctrl
_context
= "FSM"
376 self
.domain
._depth
+= 1
378 for state_name
in fsm_data
["encoding"]:
379 if state_name
not in fsm_data
["states"]:
380 raise NameError("FSM state '{}' is referenced but not defined"
383 self
.domain
._depth
-= 1
384 self
._ctrl
_context
= None
388 def State(self
, name
):
389 self
._check
_context
("FSM State", context
="FSM")
390 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
391 fsm_data
= self
._get
_ctrl
("FSM")
392 if name
in fsm_data
["states"]:
393 raise NameError("FSM state '{}' is already defined".format(name
))
394 if name
not in fsm_data
["encoding"]:
395 fsm_data
["encoding"][name
] = len(fsm_data
["encoding"])
397 _outer_case
, self
._statements
= self
._statements
, []
398 self
._ctrl
_context
= None
401 fsm_data
["states"][name
] = self
._statements
402 fsm_data
["state_src_locs"][name
] = src_loc
404 self
._ctrl
_context
= "FSM"
405 self
._statements
= _outer_case
409 raise SyntaxError("Only assignment to `m.next` is permitted")
412 def next(self
, name
):
413 if self
._ctrl
_context
!= "FSM":
414 for level
, (ctrl_name
, ctrl_data
) in enumerate(reversed(self
._ctrl
_stack
)):
415 if ctrl_name
== "FSM":
416 if name
not in ctrl_data
["encoding"]:
417 ctrl_data
["encoding"][name
] = len(ctrl_data
["encoding"])
419 assigns
=[ctrl_data
["signal"].eq(ctrl_data
["encoding"][name
])],
420 domain
=ctrl_data
["domain"],
421 depth
=len(self
._ctrl
_stack
))
424 raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
427 name
, data
= self
._ctrl
_stack
.pop()
428 src_loc
= data
["src_loc"]
431 if_tests
, if_bodies
= data
["tests"], data
["bodies"]
432 if_src_locs
= data
["src_locs"]
434 tests
, cases
= [], OrderedDict()
435 for if_test
, if_case
in zip(if_tests
+ [None], if_bodies
):
436 if if_test
is not None:
437 if_test
= Value
.cast(if_test
)
438 if len(if_test
) != 1:
439 if_test
= if_test
.bool()
440 tests
.append(if_test
)
442 if if_test
is not None:
443 match
= ("1" + "-" * (len(tests
) - 1)).rjust(len(if_tests
), "-")
446 cases
[match
] = if_case
448 self
._statements
.append(Switch(Cat(tests
), cases
,
449 src_loc
=src_loc
, case_src_locs
=dict(zip(cases
, if_src_locs
))))
452 switch_test
, switch_cases
= data
["test"], data
["cases"]
453 switch_case_src_locs
= data
["case_src_locs"]
455 self
._statements
.append(Switch(switch_test
, switch_cases
,
456 src_loc
=src_loc
, case_src_locs
=switch_case_src_locs
))
459 fsm_signal
, fsm_reset
, fsm_encoding
, fsm_decoding
, fsm_states
= \
460 data
["signal"], data
["reset"], data
["encoding"], data
["decoding"], data
["states"]
461 fsm_state_src_locs
= data
["state_src_locs"]
464 fsm_signal
.width
= bits_for(len(fsm_encoding
) - 1)
465 if fsm_reset
is None:
466 fsm_signal
.reset
= fsm_encoding
[next(iter(fsm_states
))]
468 fsm_signal
.reset
= fsm_encoding
[fsm_reset
]
469 # The FSM is encoded such that the state with encoding 0 is always the reset state.
470 fsm_decoding
.update((n
, s
) for s
, n
in fsm_encoding
.items())
471 fsm_signal
.decoder
= lambda n
: "{}/{}".format(fsm_decoding
[n
], n
)
472 self
._statements
.append(Switch(fsm_signal
,
473 OrderedDict((fsm_encoding
[name
], stmts
) for name
, stmts
in fsm_states
.items()),
474 src_loc
=src_loc
, case_src_locs
={fsm_encoding
[name
]: fsm_state_src_locs
[name
]
475 for name
in fsm_states
}))
477 def _add_statement(self
, assigns
, domain
, depth
, compat_mode
=False):
478 def domain_name(domain
):
484 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
487 for stmt
in Statement
.cast(assigns
):
488 if not compat_mode
and not isinstance(stmt
, (Assign
, Assert
, Assume
, Cover
)):
490 "Only assignments and property checks may be appended to d.{}"
491 .format(domain_name(domain
)))
493 stmt
._MustUse
__used
= True
494 stmt
= SampleDomainInjector(domain
)(stmt
)
496 for signal
in stmt
._lhs
_signals
():
497 if signal
not in self
._driving
:
498 self
._driving
[signal
] = domain
499 elif self
._driving
[signal
] != domain
:
500 cd_curr
= self
._driving
[signal
]
502 "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
503 "already driven from d.{}"
504 .format(signal
, domain_name(domain
), domain_name(cd_curr
)))
506 self
._statements
.append(stmt
)
508 def _add_submodule(self
, submodule
, name
=None):
509 if not hasattr(submodule
, "elaborate"):
510 raise TypeError("Trying to add {!r}, which does not implement .elaborate(), as "
511 "a submodule".format(submodule
))
513 self
._anon
_submodules
.append(submodule
)
515 if name
in self
._named
_submodules
:
516 raise NameError("Submodule named '{}' already exists".format(name
))
517 self
._named
_submodules
[name
] = submodule
519 def _get_submodule(self
, name
):
520 if name
in self
._named
_submodules
:
521 return self
._named
_submodules
[name
]
523 raise AttributeError("No submodule named '{}' exists".format(name
))
525 def _add_domain(self
, cd
):
526 if cd
.name
in self
._domains
:
527 raise NameError("Clock domain named '{}' already exists".format(cd
.name
))
528 self
._domains
[cd
.name
] = cd
531 while self
._ctrl
_stack
:
534 def elaborate(self
, platform
):
537 fragment
= Fragment()
538 for name
in self
._named
_submodules
:
539 fragment
.add_subfragment(Fragment
.get(self
._named
_submodules
[name
], platform
), name
)
540 for submodule
in self
._anon
_submodules
:
541 fragment
.add_subfragment(Fragment
.get(submodule
, platform
), None)
542 statements
= SampleDomainInjector("sync")(self
._statements
)
543 fragment
.add_statements(statements
)
544 for signal
, domain
in self
._driving
.items():
545 fragment
.add_driver(signal
, domain
)
546 fragment
.add_domains(self
._domains
.values())
547 fragment
.generated
.update(self
._generated
)