1 from collections
import OrderedDict
2 from contextlib
import contextmanager
, _GeneratorContextManager
3 from functools
import wraps
7 from .._utils
import flatten
, bits_for
15 __all__
= ["SyntaxError", "SyntaxWarning", "Module"]
18 class SyntaxError(Exception):
22 class SyntaxWarning(Warning):
26 class _ModuleBuilderProxy
:
27 def __init__(self
, builder
, depth
):
28 object.__setattr
__(self
, "_builder", builder
)
29 object.__setattr
__(self
, "_depth", depth
)
32 class _ModuleBuilderDomain(_ModuleBuilderProxy
):
33 def __init__(self
, builder
, depth
, domain
):
34 super().__init
__(builder
, depth
)
37 def __iadd__(self
, assigns
):
38 self
._builder
._add
_statement
(assigns
, domain
=self
._domain
, depth
=self
._depth
)
42 class _ModuleBuilderDomains(_ModuleBuilderProxy
):
43 def __getattr__(self
, name
):
44 if name
== "submodules":
45 warnings
.warn("Using '<module>.d.{}' would add statements to clock domain {!r}; "
46 "did you mean <module>.{} instead?"
47 .format(name
, name
, name
),
48 SyntaxWarning, stacklevel
=2)
53 return _ModuleBuilderDomain(self
._builder
, self
._depth
, domain
)
55 def __getitem__(self
, name
):
56 return self
.__getattr
__(name
)
58 def __setattr__(self
, name
, value
):
60 object.__setattr
__(self
, name
, value
)
61 elif not isinstance(value
, _ModuleBuilderDomain
):
62 raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
65 def __setitem__(self
, name
, value
):
66 return self
.__setattr
__(name
, value
)
69 class _ModuleBuilderRoot
:
70 def __init__(self
, builder
, depth
):
71 self
._builder
= builder
72 self
.domain
= self
.d
= _ModuleBuilderDomains(builder
, depth
)
74 def __getattr__(self
, name
):
75 if name
in ("comb", "sync"):
76 raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
77 .format(type(self
).__name
__, name
, name
))
78 raise AttributeError("'{}' object has no attribute '{}'"
79 .format(type(self
).__name
__, name
))
82 class _ModuleBuilderSubmodules
:
83 def __init__(self
, builder
):
84 object.__setattr
__(self
, "_builder", builder
)
86 def __iadd__(self
, modules
):
87 for module
in flatten([modules
]):
88 self
._builder
._add
_submodule
(module
)
91 def __setattr__(self
, name
, submodule
):
92 self
._builder
._add
_submodule
(submodule
, name
)
94 def __setitem__(self
, name
, value
):
95 return self
.__setattr
__(name
, value
)
97 def __getattr__(self
, name
):
98 return self
._builder
._get
_submodule
(name
)
100 def __getitem__(self
, name
):
101 return self
.__getattr
__(name
)
104 class _ModuleBuilderDomainSet
:
105 def __init__(self
, builder
):
106 object.__setattr
__(self
, "_builder", builder
)
108 def __iadd__(self
, domains
):
109 for domain
in flatten([domains
]):
110 if not isinstance(domain
, ClockDomain
):
111 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
113 self
._builder
._add
_domain
(domain
)
116 def __setattr__(self
, name
, domain
):
117 if not isinstance(domain
, ClockDomain
):
118 raise TypeError("Only clock domains may be added to `m.domains`, not {!r}"
120 if domain
.name
!= name
:
121 raise NameError("Clock domain name {!r} must match name in `m.domains.{} += ...` "
123 .format(domain
.name
, name
))
124 self
._builder
._add
_domain
(domain
)
127 # It's not particularly clean to depend on an internal interface, but, unfortunately, __bool__
128 # must be defined on a class to be called during implicit conversion.
129 class _GuardedContextManager(_GeneratorContextManager
):
130 def __init__(self
, keyword
, func
, args
, kwds
):
131 self
.keyword
= keyword
132 return super().__init
__(func
, args
, kwds
)
135 raise SyntaxError("`if m.{kw}(...):` does not work; use `with m.{kw}(...)`"
136 .format(kw
=self
.keyword
))
139 def _guardedcontextmanager(keyword
):
142 def helper(*args
, **kwds
):
143 return _GuardedContextManager(keyword
, func
, args
, kwds
)
149 def __init__(self
, state
, encoding
, decoding
):
151 self
.encoding
= encoding
152 self
.decoding
= decoding
154 def ongoing(self
, name
):
155 if name
not in self
.encoding
:
156 self
.encoding
[name
] = len(self
.encoding
)
157 return Operator("==", [self
.state
, self
.encoding
[name
]], src_loc_at
=0)
160 class Module(_ModuleBuilderRoot
, Elaboratable
):
162 def __init_subclass__(cls
):
163 raise SyntaxError("Instead of inheriting from `Module`, inherit from `Elaboratable` "
164 "and return a `Module` from the `elaborate(self, platform)` method")
166 def __init__(self
, _astType
=None):
167 _ModuleBuilderRoot
.__init
__(self
, self
, depth
=0)
168 self
.submodules
= _ModuleBuilderSubmodules(self
)
169 self
.domains
= _ModuleBuilderDomainSet(self
)
171 self
._statements
= Statement
.cast([])
172 self
._ctrl
_context
= None
173 self
._ctrl
_stack
= []
175 self
._driving
= SignalDict()
176 self
._named
_submodules
= {}
177 self
._anon
_submodules
= []
181 # to complete the Type 1 (ast.*) nmigen language construct abstraction
182 # from Type 2 (Module - this class) Module must be told what AST type
183 # it may cast m.If/Elif conditions and m.Switch
184 self
._astType
= _astType
or Value
186 def _check_context(self
, construct
, context
):
187 if self
._ctrl
_context
!= context
:
188 if self
._ctrl
_context
is None:
189 raise SyntaxError("{} is not permitted outside of {}"
190 .format(construct
, context
))
192 if self
._ctrl
_context
== "Switch":
193 secondary_context
= "Case"
194 if self
._ctrl
_context
== "FSM":
195 secondary_context
= "State"
196 raise SyntaxError("{} is not permitted directly inside of {}; it is permitted "
198 .format(construct
, self
._ctrl
_context
,
199 self
._ctrl
_context
, secondary_context
))
201 def _get_ctrl(self
, name
):
203 top_name
, top_data
= self
._ctrl
_stack
[-1]
207 def _flush_ctrl(self
):
208 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
211 def _set_ctrl(self
, name
, data
):
213 self
._ctrl
_stack
.append((name
, data
))
216 def _check_signed_cond(self
, cond
):
217 cond
= self
._astType
.cast(cond
)
218 width
, signed
= cond
.shape()
220 warnings
.warn("Signed values in If/Elif conditions usually result from inverting "
221 "Python booleans with ~, which leads to unexpected results. "
222 "Replace `~flag` with `not flag`. (If this is a false positive, "
223 "silence this warning with `m.If(x)` → `m.If(x.bool())`.)",
224 SyntaxWarning, stacklevel
=4)
225 # check if the condition is not considered boolean, and only
226 # convert it if it is not
227 if not cond
.considered_bool():
231 @_guardedcontextmanager("If")
233 self
._check
_context
("If", context
=None)
234 cond
= self
._check
_signed
_cond
(cond
)
235 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
236 if_data
= self
._set
_ctrl
("If", {
237 "depth": self
.domain
._depth
,
244 _outer_case
, self
._statements
= self
._statements
, []
245 self
.domain
._depth
+= 1
248 if_data
["tests"].append(cond
)
249 if_data
["bodies"].append(self
._statements
)
250 if_data
["src_locs"].append(src_loc
)
252 self
.domain
._depth
-= 1
253 self
._statements
= _outer_case
255 @_guardedcontextmanager("Elif")
256 def Elif(self
, cond
):
257 self
._check
_context
("Elif", context
=None)
258 cond
= self
._check
_signed
_cond
(cond
)
259 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
260 if_data
= self
._get
_ctrl
("If")
261 if if_data
is None or if_data
["depth"] != self
.domain
._depth
:
262 raise SyntaxError("Elif without preceding If")
264 _outer_case
, self
._statements
= self
._statements
, []
265 self
.domain
._depth
+= 1
268 if_data
["tests"].append(cond
)
269 if_data
["bodies"].append(self
._statements
)
270 if_data
["src_locs"].append(src_loc
)
272 self
.domain
._depth
-= 1
273 self
._statements
= _outer_case
275 @_guardedcontextmanager("Else")
277 self
._check
_context
("Else", context
=None)
278 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
279 if_data
= self
._get
_ctrl
("If")
280 if if_data
is None or if_data
["depth"] != self
.domain
._depth
:
281 raise SyntaxError("Else without preceding If/Elif")
283 _outer_case
, self
._statements
= self
._statements
, []
284 self
.domain
._depth
+= 1
287 if_data
["bodies"].append(self
._statements
)
288 if_data
["src_locs"].append(src_loc
)
290 self
.domain
._depth
-= 1
291 self
._statements
= _outer_case
295 def Switch(self
, test
):
296 self
._check
_context
("Switch", context
=None)
297 switch_data
= self
._set
_ctrl
("Switch", {
298 "test": self
._astType
.cast(test
),
299 "cases": OrderedDict(),
300 "src_loc": tracer
.get_src_loc(src_loc_at
=1),
304 self
._ctrl
_context
= "Switch"
305 self
.domain
._depth
+= 1
308 self
.domain
._depth
-= 1
309 self
._ctrl
_context
= None
313 def Case(self
, *patterns
):
314 self
._check
_context
("Case", context
="Switch")
315 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
316 switch_data
= self
._get
_ctrl
("Switch")
318 for pattern
in patterns
:
319 if not isinstance(pattern
, (int, str, Enum
)):
320 raise SyntaxError("Case pattern must be an integer, a string, or an enumeration, "
323 if isinstance(pattern
, str) and any(bit
not in "01- \t" for bit
in pattern
):
324 raise SyntaxError("Case pattern '{}' must consist of 0, 1, and - (don't care) "
325 "bits, and may include whitespace"
327 if (isinstance(pattern
, str) and
328 len("".join(pattern
.split())) != len(switch_data
["test"])):
329 raise SyntaxError("Case pattern '{}' must have the same width as switch value "
331 .format(pattern
, len(switch_data
["test"])))
332 if isinstance(pattern
, int) and bits_for(pattern
) > len(switch_data
["test"]):
333 warnings
.warn("Case pattern '{:b}' is wider than switch value "
334 "(which has width {}); comparison will never be true"
335 .format(pattern
, len(switch_data
["test"])),
336 SyntaxWarning, stacklevel
=3)
338 if isinstance(pattern
, Enum
) and bits_for(pattern
.value
) > len(switch_data
["test"]):
339 warnings
.warn("Case pattern '{:b}' ({}.{}) is wider than switch value "
340 "(which has width {}); comparison will never be true"
341 .format(pattern
.value
, pattern
.__class
__.__name
__, pattern
.name
,
342 len(switch_data
["test"])),
343 SyntaxWarning, stacklevel
=3)
345 new_patterns
= (*new_patterns
, pattern
)
347 _outer_case
, self
._statements
= self
._statements
, []
348 self
._ctrl
_context
= None
351 # If none of the provided cases can possibly be true, omit this branch completely.
352 # This needs to be differentiated from no cases being provided in the first place,
353 # which means the branch will always match.
354 if not (patterns
and not new_patterns
):
355 switch_data
["cases"][new_patterns
] = self
._statements
356 switch_data
["case_src_locs"][new_patterns
] = src_loc
358 self
._ctrl
_context
= "Switch"
359 self
._statements
= _outer_case
365 def FSM(self
, reset
=None, domain
="sync", name
="fsm"):
366 self
._check
_context
("FSM", context
=None)
368 raise ValueError("FSM may not be driven by the '{}' domain".format(domain
))
369 fsm_data
= self
._set
_ctrl
("FSM", {
371 "signal": Signal(name
="{}_state".format(name
), src_loc_at
=2),
374 "encoding": OrderedDict(),
375 "decoding": OrderedDict(),
376 "states": OrderedDict(),
377 "src_loc": tracer
.get_src_loc(src_loc_at
=1),
378 "state_src_locs": {},
380 self
._generated
[name
] = fsm
= \
381 FSM(fsm_data
["signal"], fsm_data
["encoding"], fsm_data
["decoding"])
383 self
._ctrl
_context
= "FSM"
384 self
.domain
._depth
+= 1
386 for state_name
in fsm_data
["encoding"]:
387 if state_name
not in fsm_data
["states"]:
388 raise NameError("FSM state '{}' is referenced but not defined"
391 self
.domain
._depth
-= 1
392 self
._ctrl
_context
= None
396 def State(self
, name
):
397 self
._check
_context
("FSM State", context
="FSM")
398 src_loc
= tracer
.get_src_loc(src_loc_at
=1)
399 fsm_data
= self
._get
_ctrl
("FSM")
400 if name
in fsm_data
["states"]:
401 raise NameError("FSM state '{}' is already defined".format(name
))
402 if name
not in fsm_data
["encoding"]:
403 fsm_data
["encoding"][name
] = len(fsm_data
["encoding"])
405 _outer_case
, self
._statements
= self
._statements
, []
406 self
._ctrl
_context
= None
409 fsm_data
["states"][name
] = self
._statements
410 fsm_data
["state_src_locs"][name
] = src_loc
412 self
._ctrl
_context
= "FSM"
413 self
._statements
= _outer_case
417 raise SyntaxError("Only assignment to `m.next` is permitted")
420 def next(self
, name
):
421 if self
._ctrl
_context
!= "FSM":
422 for level
, (ctrl_name
, ctrl_data
) in enumerate(reversed(self
._ctrl
_stack
)):
423 if ctrl_name
== "FSM":
424 if name
not in ctrl_data
["encoding"]:
425 ctrl_data
["encoding"][name
] = len(ctrl_data
["encoding"])
427 assigns
=[ctrl_data
["signal"].eq(ctrl_data
["encoding"][name
])],
428 domain
=ctrl_data
["domain"],
429 depth
=len(self
._ctrl
_stack
))
432 raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
435 name
, data
= self
._ctrl
_stack
.pop()
436 src_loc
= data
["src_loc"]
439 if_tests
, if_bodies
= data
["tests"], data
["bodies"]
440 if_src_locs
= data
["src_locs"]
442 tests
, cases
= [], OrderedDict()
443 for if_test
, if_case
in zip(if_tests
+ [None], if_bodies
):
444 if if_test
is not None:
445 tests
.append(if_test
)
447 if if_test
is not None:
448 match
= ("1" + "-" * (len(tests
) - 1)).rjust(len(if_tests
), "-")
451 cases
[match
] = if_case
453 self
._statements
.append(Switch(Cat(tests
), cases
,
454 src_loc
=src_loc
, case_src_locs
=dict(zip(cases
, if_src_locs
))))
457 switch_test
, switch_cases
= data
["test"], data
["cases"]
458 switch_case_src_locs
= data
["case_src_locs"]
460 self
._statements
.append(Switch(switch_test
, switch_cases
,
461 src_loc
=src_loc
, case_src_locs
=switch_case_src_locs
))
464 fsm_signal
, fsm_reset
, fsm_encoding
, fsm_decoding
, fsm_states
= \
465 data
["signal"], data
["reset"], data
["encoding"], data
["decoding"], data
["states"]
466 fsm_state_src_locs
= data
["state_src_locs"]
469 fsm_signal
.width
= bits_for(len(fsm_encoding
) - 1)
470 if fsm_reset
is None:
471 fsm_signal
.reset
= fsm_encoding
[next(iter(fsm_states
))]
473 fsm_signal
.reset
= fsm_encoding
[fsm_reset
]
474 # The FSM is encoded such that the state with encoding 0 is always the reset state.
475 fsm_decoding
.update((n
, s
) for s
, n
in fsm_encoding
.items())
476 fsm_signal
.decoder
= lambda n
: "{}/{}".format(fsm_decoding
[n
], n
)
477 self
._statements
.append(Switch(fsm_signal
,
478 OrderedDict((fsm_encoding
[name
], stmts
) for name
, stmts
in fsm_states
.items()),
479 src_loc
=src_loc
, case_src_locs
={fsm_encoding
[name
]: fsm_state_src_locs
[name
]
480 for name
in fsm_states
}))
482 def _add_statement(self
, assigns
, domain
, depth
, compat_mode
=False):
483 def domain_name(domain
):
489 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
492 for stmt
in Statement
.cast(assigns
):
493 if not compat_mode
and not isinstance(stmt
, (_InternalAssign
, Assert
, Assume
, Cover
)):
495 "Only assignments and property checks may be appended to d.{}"
496 .format(domain_name(domain
)))
498 stmt
._MustUse
__used
= True
499 stmt
= SampleDomainInjector(domain
)(stmt
)
501 for signal
in stmt
._lhs
_signals
():
502 if signal
not in self
._driving
:
503 self
._driving
[signal
] = domain
504 elif self
._driving
[signal
] != domain
:
505 cd_curr
= self
._driving
[signal
]
507 "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
508 "already driven from d.{}"
509 .format(signal
, domain_name(domain
), domain_name(cd_curr
)))
511 self
._statements
.append(stmt
)
513 def _add_submodule(self
, submodule
, name
=None):
514 if not hasattr(submodule
, "elaborate"):
515 raise TypeError("Trying to add {!r}, which does not implement .elaborate(), as "
516 "a submodule".format(submodule
))
518 self
._anon
_submodules
.append(submodule
)
520 if name
in self
._named
_submodules
:
521 raise NameError("Submodule named '{}' already exists".format(name
))
522 self
._named
_submodules
[name
] = submodule
524 def _get_submodule(self
, name
):
525 if name
in self
._named
_submodules
:
526 return self
._named
_submodules
[name
]
528 raise AttributeError("No submodule named '{}' exists".format(name
))
530 def _add_domain(self
, cd
):
531 if cd
.name
in self
._domains
:
532 raise NameError("Clock domain named '{}' already exists".format(cd
.name
))
533 self
._domains
[cd
.name
] = cd
536 while self
._ctrl
_stack
:
539 def elaborate(self
, platform
):
542 fragment
= Fragment()
543 for name
in self
._named
_submodules
:
544 fragment
.add_subfragment(Fragment
.get(self
._named
_submodules
[name
], platform
), name
)
545 for submodule
in self
._anon
_submodules
:
546 fragment
.add_subfragment(Fragment
.get(submodule
, platform
), None)
547 statements
= SampleDomainInjector("sync")(self
._statements
)
548 fragment
.add_statements(statements
)
549 for signal
, domain
in self
._driving
.items():
550 fragment
.add_driver(signal
, domain
)
551 fragment
.add_domains(self
._domains
.values())
552 fragment
.generated
.update(self
._generated
)