1 from collections
import OrderedDict
, namedtuple
2 from collections
.abc
import Iterable
3 from contextlib
import contextmanager
6 from ..tools
import flatten
, bits_for
, deprecated
12 __all__
= ["SyntaxError", "SyntaxWarning", "Module"]
15 class SyntaxError(Exception):
19 class SyntaxWarning(Warning):
class _ModuleBuilderProxy:
    """Common base of the builder proxies: remembers a builder and a depth.

    Both values are written with ``object.__setattr__`` instead of ordinary
    attribute assignment, so the stores never go through a ``__setattr__``
    defined by a subclass.
    """

    def __init__(self, builder, depth):
        for attr, value in (("_builder", builder), ("_depth", depth)):
            object.__setattr__(self, attr, value)
29 class _ModuleBuilderDomainExplicit(_ModuleBuilderProxy
):
30 def __init__(self
, builder
, depth
, domain
):
31 super().__init
__(builder
, depth
)
34 def __iadd__(self
, assigns
):
35 self
._builder
._add
_statement
(assigns
, domain
=self
._domain
, depth
=self
._depth
)
39 class _ModuleBuilderDomainImplicit(_ModuleBuilderProxy
):
40 def __getattr__(self
, name
):
45 return _ModuleBuilderDomainExplicit(self
._builder
, self
._depth
, domain
)
def __getitem__(self, name):
    """Subscript access: hand ``name`` to ``__getattr__`` and return its result."""
    resolve = self.__getattr__
    return resolve(name)
50 def __setattr__(self
, name
, value
):
52 object.__setattr
__(self
, name
, value
)
53 elif not isinstance(value
, _ModuleBuilderDomainExplicit
):
54 raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
def __setitem__(self, name, value):
    """Subscript assignment: forward to ``__setattr__`` and return its result."""
    assign = self.__setattr__
    return assign(name, value)
class _ModuleBuilderRoot:
    """Root of the builder interface.

    Publishes one ``_ModuleBuilderDomainImplicit`` proxy under two names,
    ``domain`` and ``d``. Any attribute that ordinary lookup fails to find
    ends in ``AttributeError``; for ``comb`` and ``sync`` the message also
    suggests the ``d.<name>`` spelling.
    """

    def __init__(self, builder, depth):
        self._builder = builder
        implicit = _ModuleBuilderDomainImplicit(builder, depth)
        # Same object under both names; ``domain`` is bound first.
        self.domain = implicit
        self.d = implicit

    def __getattr__(self, name):
        # Only reached when normal attribute lookup has already failed.
        owner = type(self).__name__
        if name not in ("comb", "sync"):
            raise AttributeError("'{}' object has no attribute '{}'"
                                 .format(owner, name))
        raise AttributeError("'{}' object has no attribute '{}'; did you mean 'd.{}'?"
                             .format(owner, name, name))
74 class _ModuleBuilderSubmodules
:
def __init__(self, builder):
    """Remember the owning builder as ``_builder``.

    The store uses ``object.__setattr__`` because this class's own
    ``__setattr__`` forwards assignments to the builder as named submodules.
    """
    store = object.__setattr__
    store(self, "_builder", builder)
78 def __iadd__(self
, modules
):
79 for module
in flatten([modules
]):
80 self
._builder
._add
_submodule
(module
)
def __setattr__(self, name, submodule):
    """Attribute assignment registers ``submodule`` with the builder under ``name``."""
    owner = self._builder
    owner._add_submodule(submodule, name)
def __setitem__(self, name, value):
    """Subscript assignment: same effect as attribute assignment via ``__setattr__``."""
    assign = self.__setattr__
    return assign(name, value)
90 class _ModuleBuilderDomainSet
:
def __init__(self, builder):
    """Remember the owning builder as ``_builder``.

    The store uses ``object.__setattr__`` because this class's own
    ``__setattr__`` forwards assignments to the builder as domains.
    """
    store = object.__setattr__
    store(self, "_builder", builder)
94 def __iadd__(self
, domains
):
95 for domain
in flatten([domains
]):
96 self
._builder
._add
_domain
(domain
)
def __setattr__(self, name, domain):
    """Attribute assignment hands ``domain`` to the builder; ``name`` is not used."""
    owner = self._builder
    owner._add_domain(domain)
104 def __init__(self
, state
, encoding
, decoding
):
106 self
.encoding
= encoding
107 self
.decoding
= decoding
def ongoing(self, name):
    """Return ``self.state == <encoding of name>``.

    A name with no entry in ``self.encoding`` is first given the next free
    index, i.e. the number of entries present before it was added.
    """
    table = self.encoding
    # ``len(table)`` is evaluated before the lookup, so a new name receives
    # the size of the table as it was prior to insertion.
    code = table.setdefault(name, len(table))
    return self.state == code
115 class Module(_ModuleBuilderRoot
, Elaboratable
):
117 _ModuleBuilderRoot
.__init
__(self
, self
, depth
=0)
118 self
.submodules
= _ModuleBuilderSubmodules(self
)
119 self
.domains
= _ModuleBuilderDomainSet(self
)
121 self
._statements
= Statement
.wrap([])
122 self
._ctrl
_context
= None
123 self
._ctrl
_stack
= []
125 self
._driving
= SignalDict()
126 self
._submodules
= []
def _check_context(self, construct, context):
    """Raise ``SyntaxError`` unless the current control context is ``context``.

    ``construct`` is only used to name the offending construct in the
    message. With no current context the message says the construct is not
    permitted outside of ``context``; otherwise it says the construct is not
    permitted inside of the current context.
    """
    current = self._ctrl_context
    if current == context:
        return
    if current is None:
        raise SyntaxError("{} is not permitted outside of {}"
                          .format(construct, context))
    raise SyntaxError("{} is not permitted inside of {}"
                      .format(construct, current))
139 def _get_ctrl(self
, name
):
141 top_name
, top_data
= self
._ctrl
_stack
[-1]
145 def _flush_ctrl(self
):
146 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
149 def _set_ctrl(self
, name
, data
):
151 self
._ctrl
_stack
.append((name
, data
))
156 self
._check
_context
("If", context
=None)
157 if_data
= self
._set
_ctrl
("If", {"tests": [], "bodies": []})
159 _outer_case
, self
._statements
= self
._statements
, []
160 self
.domain
._depth
+= 1
163 if_data
["tests"].append(cond
)
164 if_data
["bodies"].append(self
._statements
)
166 self
.domain
._depth
-= 1
167 self
._statements
= _outer_case
170 def Elif(self
, cond
):
171 self
._check
_context
("Elif", context
=None)
172 if_data
= self
._get
_ctrl
("If")
174 raise SyntaxError("Elif without preceding If")
176 _outer_case
, self
._statements
= self
._statements
, []
177 self
.domain
._depth
+= 1
180 if_data
["tests"].append(cond
)
181 if_data
["bodies"].append(self
._statements
)
183 self
.domain
._depth
-= 1
184 self
._statements
= _outer_case
188 self
._check
_context
("Else", context
=None)
189 if_data
= self
._get
_ctrl
("If")
191 raise SyntaxError("Else without preceding If/Elif")
193 _outer_case
, self
._statements
= self
._statements
, []
194 self
.domain
._depth
+= 1
197 if_data
["bodies"].append(self
._statements
)
199 self
.domain
._depth
-= 1
200 self
._statements
= _outer_case
204 def Switch(self
, test
):
205 self
._check
_context
("Switch", context
=None)
206 switch_data
= self
._set
_ctrl
("Switch", {"test": Value
.wrap(test
), "cases": OrderedDict()})
208 self
._ctrl
_context
= "Switch"
209 self
.domain
._depth
+= 1
212 self
.domain
._depth
-= 1
213 self
._ctrl
_context
= None
217 def Case(self
, *values
):
218 self
._check
_context
("Case", context
="Switch")
219 switch_data
= self
._get
_ctrl
("Switch")
222 if isinstance(value
, str) and len(value
) != len(switch_data
["test"]):
223 raise SyntaxError("Case value '{}' must have the same width as test (which is {})"
224 .format(value
, len(switch_data
["test"])))
225 if isinstance(value
, int) and bits_for(value
) > len(switch_data
["test"]):
226 warnings
.warn("Case value '{:b}' is wider than test (which has width {}); "
227 "comparison will never be true"
228 .format(value
, len(switch_data
["test"])),
229 SyntaxWarning, stacklevel
=3)
231 new_values
= (*new_values
, value
)
233 _outer_case
, self
._statements
= self
._statements
, []
234 self
._ctrl
_context
= None
237 # If none of the provided cases can possibly be true, omit this branch completely.
238 # This needs to be differentiated from no cases being provided in the first place,
239 # which means the branch will always match.
240 if not (values
and not new_values
):
241 switch_data
["cases"][new_values
] = self
._statements
243 self
._ctrl
_context
= "Switch"
244 self
._statements
= _outer_case
247 def FSM(self
, reset
=None, domain
="sync", name
="fsm"):
248 self
._check
_context
("FSM", context
=None)
249 fsm_data
= self
._set
_ctrl
("FSM", {
251 "signal": Signal(name
="{}_state".format(name
), src_loc_at
=2),
254 "encoding": OrderedDict(),
255 "decoding": OrderedDict(),
256 "states": OrderedDict(),
258 self
._generated
[name
] = fsm
= \
259 FSM(fsm_data
["signal"], fsm_data
["encoding"], fsm_data
["decoding"])
261 self
._ctrl
_context
= "FSM"
262 self
.domain
._depth
+= 1
265 self
.domain
._depth
-= 1
266 self
._ctrl
_context
= None
270 def State(self
, name
):
271 self
._check
_context
("FSM State", context
="FSM")
272 fsm_data
= self
._get
_ctrl
("FSM")
273 if name
in fsm_data
["states"]:
274 raise SyntaxError("FSM state '{}' is already defined".format(name
))
275 if name
not in fsm_data
["encoding"]:
276 fsm_data
["encoding"][name
] = len(fsm_data
["encoding"])
278 _outer_case
, self
._statements
= self
._statements
, []
279 self
._ctrl
_context
= None
282 fsm_data
["states"][name
] = self
._statements
284 self
._ctrl
_context
= "FSM"
285 self
._statements
= _outer_case
289 raise SyntaxError("Only assignment to `m.next` is permitted")
292 def next(self
, name
):
293 if self
._ctrl
_context
!= "FSM":
294 for level
, (ctrl_name
, ctrl_data
) in enumerate(reversed(self
._ctrl
_stack
)):
295 if ctrl_name
== "FSM":
296 if name
not in ctrl_data
["encoding"]:
297 ctrl_data
["encoding"][name
] = len(ctrl_data
["encoding"])
299 assigns
=[ctrl_data
["signal"].eq(ctrl_data
["encoding"][name
])],
300 domain
=ctrl_data
["domain"],
301 depth
=len(self
._ctrl
_stack
))
304 raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")
307 # FIXME: the src_loc extraction unfortunately doesn't work very well here; src_loc_at=3 is
308 # correct, but the resulting src_loc points at the *last* line of the `with` block.
309 # Unfortunately, it is not clear how this can be fixed.
311 name
, data
= self
._ctrl
_stack
.pop()
314 if_tests
, if_bodies
= data
["tests"], data
["bodies"]
316 tests
, cases
= [], OrderedDict()
317 for if_test
, if_case
in zip(if_tests
+ [None], if_bodies
):
318 if if_test
is not None:
319 if_test
= Value
.wrap(if_test
)
320 if len(if_test
) != 1:
321 if_test
= if_test
.bool()
322 tests
.append(if_test
)
324 if if_test
is not None:
325 match
= ("1" + "-" * (len(tests
) - 1)).rjust(len(if_tests
), "-")
328 cases
[match
] = if_case
330 self
._statements
.append(Switch(Cat(tests
), cases
, src_loc_at
=3))
333 switch_test
, switch_cases
= data
["test"], data
["cases"]
335 self
._statements
.append(Switch(switch_test
, switch_cases
, src_loc_at
=3))
338 fsm_signal
, fsm_reset
, fsm_encoding
, fsm_decoding
, fsm_states
= \
339 data
["signal"], data
["reset"], data
["encoding"], data
["decoding"], data
["states"]
342 fsm_signal
.nbits
= bits_for(len(fsm_encoding
) - 1)
343 if fsm_reset
is None:
344 fsm_signal
.reset
= fsm_encoding
[next(iter(fsm_states
))]
346 fsm_signal
.reset
= fsm_encoding
[fsm_reset
]
347 # The FSM is encoded such that the state with encoding 0 is always the reset state.
348 fsm_decoding
.update((n
, s
) for s
, n
in fsm_encoding
.items())
349 fsm_signal
.decoder
= lambda n
: "{}/{}".format(fsm_decoding
[n
], n
)
350 self
._statements
.append(Switch(fsm_signal
,
351 OrderedDict((fsm_encoding
[name
], stmts
) for name
, stmts
in fsm_states
.items()),
354 def _add_statement(self
, assigns
, domain
, depth
, compat_mode
=False):
355 def domain_name(domain
):
361 while len(self
._ctrl
_stack
) > self
.domain
._depth
:
364 for assign
in Statement
.wrap(assigns
):
365 if not compat_mode
and not isinstance(assign
, (Assign
, Assert
, Assume
)):
367 "Only assignments, asserts, and assumes may be appended to d.{}"
368 .format(domain_name(domain
)))
370 assign
= SampleDomainInjector(domain
)(assign
)
371 for signal
in assign
._lhs
_signals
():
372 if signal
not in self
._driving
:
373 self
._driving
[signal
] = domain
374 elif self
._driving
[signal
] != domain
:
375 cd_curr
= self
._driving
[signal
]
377 "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
378 "already driven from d.{}"
379 .format(signal
, domain_name(domain
), domain_name(cd_curr
)))
381 self
._statements
.append(assign
)
def _add_submodule(self, submodule, name=None):
    """Append ``(submodule, name)`` to ``self._submodules``.

    Raises ``TypeError`` when ``submodule`` has no ``elaborate`` attribute;
    nothing is recorded in that case.
    """
    if hasattr(submodule, "elaborate"):
        self._submodules.append((submodule, name))
        return
    raise TypeError("Trying to add '{!r}', which does not implement .elaborate(), as "
                    "a submodule".format(submodule))
def _add_domain(self, cd):
    """Append ``cd`` to ``self._domains``."""
    registered = self._domains
    registered.append(cd)
393 while self
._ctrl
_stack
:
396 def elaborate(self
, platform
):
399 fragment
= Fragment()
400 for submodule
, name
in self
._submodules
:
401 fragment
.add_subfragment(Fragment
.get(submodule
, platform
), name
)
402 statements
= SampleDomainInjector("sync")(self
._statements
)
403 fragment
.add_statements(statements
)
404 for signal
, domain
in self
._driving
.items():
405 fragment
.add_driver(signal
, domain
)
406 fragment
.add_domains(self
._domains
)
407 fragment
.generated
.update(self
._generated
)