# 415ad951c9d077c6e7476030df7482a451ea2b95
# [nmigen.git] / nmigen / hdl / dsl.py
from collections import OrderedDict, namedtuple
from collections.abc import Iterable
from contextlib import contextmanager
import warnings

from ..tools import flatten, bits_for, deprecated
from .ast import *
from .ir import *
from .xfrm import *
10
11
# Names exported by a star-import of this module.
__all__ = ["SyntaxError", "SyntaxWarning", "Module"]
13
14
class SyntaxError(Exception):
    """Raised when the module-building DSL in this file is used incorrectly.

    Within this module the name shadows the builtin ``SyntaxError``; this class derives
    directly from ``Exception``, not from the builtin.
    """
    pass
17
18
class SyntaxWarning(Warning):
    """Warning category for questionable but accepted uses of the DSL in this file.

    Within this module the name shadows the builtin ``SyntaxWarning``; this class derives
    directly from ``Warning``, not from the builtin.
    """
    pass
21
22
class _ModuleBuilderProxy:
    """Base class for helper objects that hold a builder and a nesting depth.

    Both attributes are stored through ``object.__setattr__`` so that a subclass which
    overrides ``__setattr__`` does not intercept these assignments.
    """

    def __init__(self, builder, depth):
        object.__setattr__(self, "_builder", builder)
        object.__setattr__(self, "_depth", depth)
27
28
class _ModuleBuilderDomainExplicit(_ModuleBuilderProxy):
    """Proxy bound to a single domain; ``proxy += statements`` adds them to that domain."""

    def __init__(self, builder, depth, domain):
        super().__init__(builder, depth)
        self._domain = domain

    def __iadd__(self, assigns):
        """Forward ``assigns`` to the builder together with this proxy's domain and depth.

        Returns ``self`` so the augmented assignment rebinds the same proxy object.
        """
        self._builder._add_statement(assigns, domain=self._domain, depth=self._depth)
        return self
37
38
class _ModuleBuilderDomainImplicit(_ModuleBuilderProxy):
    """Namespace behind ``m.d``: attribute or item access selects a domain by name."""

    def __getattr__(self, name):
        """Return a proxy bound to the domain ``name``.

        The name ``"comb"`` is mapped to the domain ``None``; every other name is used
        as the domain unchanged.
        """
        if name == "comb":
            domain = None
        else:
            domain = name
        return _ModuleBuilderDomainExplicit(self._builder, self._depth, domain)

    def __getitem__(self, name):
        """Item access (``d["name"]``) behaves exactly like attribute access."""
        return self.__getattr__(name)

    def __setattr__(self, name, value):
        """Accept only the stores that the DSL itself performs.

        ``d.x += stmts`` ends with a store of the explicit-domain proxy back into
        ``d.x``; that store is accepted and discarded. ``_depth`` is a real attribute
        and is stored normally. Any other assignment raises ``AttributeError``.
        """
        if name == "_depth":
            object.__setattr__(self, name, value)
        elif not isinstance(value, _ModuleBuilderDomainExplicit):
            raise AttributeError("Cannot assign 'd.{}' attribute; did you mean 'd.{} +='?"
                                 .format(name, name))

    def __setitem__(self, name, value):
        """Item assignment follows the same rules as attribute assignment."""
        return self.__setattr__(name, value)
59
60
class _ModuleBuilderRoot:
    """Root of the builder namespace; exposes the domain namespace as ``domain`` and ``d``."""

    def __init__(self, builder, depth):
        self._builder = builder
        # `domain` and `d` are two names for the same object.
        self.domain = self.d = _ModuleBuilderDomainImplicit(builder, depth)

    def __getattr__(self, name):
        """Raise ``AttributeError`` for any attribute not found by normal lookup.

        For ``comb`` and ``sync`` the message additionally suggests ``d.<name>``.
        """
        message = "'{}' object has no attribute '{}'".format(type(self).__name__, name)
        if name in ("comb", "sync"):
            message += "; did you mean 'd.{}'?".format(name)
        raise AttributeError(message)
72
73
class _ModuleBuilderSubmodules:
    """Namespace behind ``m.submodules``; registers submodules with the builder."""

    def __init__(self, builder):
        # Bypass our own __setattr__, which would treat this as a submodule.
        object.__setattr__(self, "_builder", builder)

    def __iadd__(self, modules):
        """Register one submodule, or each element of a (nested) iterable, without a name."""
        for module in flatten([modules]):
            self._builder._add_submodule(module)
        return self

    def __setattr__(self, name, submodule):
        """Register ``submodule`` under ``name``; nothing is stored on this object."""
        self._builder._add_submodule(submodule, name)

    def __setitem__(self, name, value):
        """Item assignment behaves exactly like attribute assignment."""
        return self.__setattr__(name, value)
88
89
class _ModuleBuilderDomainSet:
    """Namespace behind ``m.domains``; registers clock domains with the builder."""

    def __init__(self, builder):
        # Bypass our own __setattr__, which would treat this as a domain.
        object.__setattr__(self, "_builder", builder)

    def __iadd__(self, domains):
        """Register one domain, or each element of a (nested) iterable."""
        for domain in flatten([domains]):
            self._builder._add_domain(domain)
        return self

    def __setattr__(self, name, domain):
        """Register ``domain`` with the builder.

        The attribute name is not used and is not checked against the domain object.
        """
        self._builder._add_domain(domain)
101
102
class FSM:
    """Handle for a state machine: its state value plus name/number mappings.

    ``encoding`` maps state names to numbers and ``decoding`` is kept alongside it; both
    are stored by reference, so changes made here are visible to whoever passed them in.
    """

    def __init__(self, state, encoding, decoding):
        self.state = state
        self.encoding = encoding
        self.decoding = decoding

    def ongoing(self, name):
        """Return ``self.state == <encoding of name>``.

        A name that has no encoding yet is assigned the next free number
        (``len(self.encoding)``) before the comparison is built.
        """
        if name not in self.encoding:
            self.encoding[name] = len(self.encoding)
        return self.state == self.encoding[name]
113
114
class Module(_ModuleBuilderRoot, Elaboratable):
    """Builder that records statements, control flow, submodules and clock domains.

    Statements are added with ``m.d.<domain> += ...``. The context-manager methods
    (``If``/``Elif``/``Else``, ``Switch``/``Case``, ``FSM``/``State``) nest them. Control
    constructs that are still open are kept on ``_ctrl_stack`` and are turned into
    ``Switch`` statements by ``_pop_ctrl``. ``elaborate`` packs the result into a
    ``Fragment``.
    """

    def __init__(self):
        _ModuleBuilderRoot.__init__(self, self, depth=0)
        self.submodules = _ModuleBuilderSubmodules(self)
        self.domains = _ModuleBuilderDomainSet(self)

        # Statement list of the body currently being built; the context managers swap it.
        self._statements = Statement.wrap([])
        # "Switch" or "FSM" directly inside such a construct (where only Case/State are
        # allowed), otherwise None.
        self._ctrl_context = None
        # (name, data) pairs of control constructs that have not been finalized yet.
        self._ctrl_stack = []

        # Maps every driven signal to the domain driving it (None stands for comb).
        self._driving = SignalDict()
        # (submodule, name) pairs; name is None when added via `m.submodules += ...`.
        self._submodules = []
        self._domains = []
        # Helper objects created by the DSL (the FSM handles), keyed by name.
        self._generated = {}

    def _check_context(self, construct, context):
        """Raise ``SyntaxError`` unless the current control context equals ``context``.

        ``construct`` is only used to name the offending construct in the message.
        """
        if self._ctrl_context != context:
            if self._ctrl_context is None:
                raise SyntaxError("{} is not permitted outside of {}"
                                  .format(construct, context))
            else:
                raise SyntaxError("{} is not permitted inside of {}"
                                  .format(construct, self._ctrl_context))

    def _get_ctrl(self, name):
        """Return the data of the top stack entry if it is named ``name``, else ``None``."""
        if self._ctrl_stack:
            top_name, top_data = self._ctrl_stack[-1]
            if top_name == name:
                return top_data
        return None

    def _flush_ctrl(self):
        """Finalize every open construct that lies deeper than the current nesting depth."""
        while len(self._ctrl_stack) > self.domain._depth:
            self._pop_ctrl()

    def _set_ctrl(self, name, data):
        """Flush deeper constructs, push ``(name, data)`` and return ``data``."""
        self._flush_ctrl()
        self._ctrl_stack.append((name, data))
        return data

    @contextmanager
    def If(self, cond):
        """Open a new ``If`` construct; statements in the body run when ``cond`` holds.

        The construct stays on the control stack after the body so that a following
        ``Elif``/``Else`` can extend it.
        """
        self._check_context("If", context=None)
        if_data = self._set_ctrl("If", {"tests": [], "bodies": []})
        try:
            # Collect the body's statements in a fresh list; restore the outer one after.
            _outer_case, self._statements = self._statements, []
            self.domain._depth += 1
            yield
            self._flush_ctrl()
            if_data["tests"].append(cond)
            if_data["bodies"].append(self._statements)
        finally:
            self.domain._depth -= 1
            self._statements = _outer_case

    @contextmanager
    def Elif(self, cond):
        """Add a further conditional branch to the ``If`` on top of the control stack.

        Raises ``SyntaxError`` if the top of the stack is not an ``If``.
        """
        self._check_context("Elif", context=None)
        if_data = self._get_ctrl("If")
        if if_data is None:
            raise SyntaxError("Elif without preceding If")
        try:
            _outer_case, self._statements = self._statements, []
            self.domain._depth += 1
            yield
            self._flush_ctrl()
            if_data["tests"].append(cond)
            if_data["bodies"].append(self._statements)
        finally:
            self.domain._depth -= 1
            self._statements = _outer_case

    @contextmanager
    def Else(self):
        """Add the fallback branch to the ``If`` on top of the stack and finalize it.

        Raises ``SyntaxError`` if the top of the stack is not an ``If``.
        """
        self._check_context("Else", context=None)
        if_data = self._get_ctrl("If")
        if if_data is None:
            raise SyntaxError("Else without preceding If/Elif")
        try:
            _outer_case, self._statements = self._statements, []
            self.domain._depth += 1
            yield
            self._flush_ctrl()
            # A body without a matching test marks the fallback branch (see _pop_ctrl).
            if_data["bodies"].append(self._statements)
        finally:
            self.domain._depth -= 1
            self._statements = _outer_case
        # Reached only when the body completed without raising.
        self._pop_ctrl()

    @contextmanager
    def Switch(self, test):
        """Open a ``Switch`` on ``test``; only ``Case`` is allowed directly inside."""
        self._check_context("Switch", context=None)
        switch_data = self._set_ctrl("Switch", {"test": Value.wrap(test), "cases": OrderedDict()})
        try:
            self._ctrl_context = "Switch"
            self.domain._depth += 1
            yield
        finally:
            self.domain._depth -= 1
            self._ctrl_context = None
        # Reached only when the body completed without raising.
        self._pop_ctrl()

    @contextmanager
    def Case(self, *values):
        """Add a branch to the enclosing ``Switch``.

        A ``str`` value whose length differs from the width of the test raises
        ``SyntaxError``. An ``int`` value needing more bits than the test has is dropped
        with a ``SyntaxWarning``. If values were given but all were dropped, the branch
        is omitted; if no values were given, the branch is stored under the empty tuple.
        """
        self._check_context("Case", context="Switch")
        switch_data = self._get_ctrl("Switch")
        new_values = ()
        for value in values:
            if isinstance(value, str) and len(value) != len(switch_data["test"]):
                raise SyntaxError("Case value '{}' must have the same width as test (which is {})"
                                  .format(value, len(switch_data["test"])))
            if isinstance(value, int) and bits_for(value) > len(switch_data["test"]):
                warnings.warn("Case value '{:b}' is wider than test (which has width {}); "
                              "comparison will never be true"
                              .format(value, len(switch_data["test"])),
                              SyntaxWarning, stacklevel=3)
                continue
            new_values = (*new_values, value)
        try:
            _outer_case, self._statements = self._statements, []
            # Inside a Case body ordinary statements and constructs are allowed again.
            self._ctrl_context = None
            yield
            self._flush_ctrl()
            # If none of the provided cases can possibly be true, omit this branch completely.
            # This needs to be differentiated from no cases being provided in the first place,
            # which means the branch will always match.
            if not (values and not new_values):
                switch_data["cases"][new_values] = self._statements
        finally:
            self._ctrl_context = "Switch"
            self._statements = _outer_case

    @contextmanager
    def FSM(self, reset=None, domain="sync", name="fsm"):
        """Open a state machine and yield its ``FSM`` handle.

        ``reset`` names the reset state (the first defined state when ``None``),
        ``domain`` is the domain used for ``m.next`` assignments, and ``name`` is used
        for the state signal (``<name>_state``) and as the key in ``_generated``. Only
        ``State`` is allowed directly inside.
        """
        self._check_context("FSM", context=None)
        fsm_data = self._set_ctrl("FSM", {
            "name": name,
            "signal": Signal(name="{}_state".format(name), src_loc_at=2),
            "reset": reset,
            "domain": domain,
            "encoding": OrderedDict(),
            "decoding": OrderedDict(),
            "states": OrderedDict(),
        })
        # The handle shares the encoding/decoding dicts with fsm_data.
        self._generated[name] = fsm = \
            FSM(fsm_data["signal"], fsm_data["encoding"], fsm_data["decoding"])
        try:
            self._ctrl_context = "FSM"
            self.domain._depth += 1
            yield fsm
        finally:
            self.domain._depth -= 1
            self._ctrl_context = None
        # Reached only when the body completed without raising.
        self._pop_ctrl()

    @contextmanager
    def State(self, name):
        """Define the body of state ``name`` in the enclosing ``FSM``.

        Raises ``SyntaxError`` if the state was already defined. A state that has no
        encoding yet is assigned the next free number.
        """
        self._check_context("FSM State", context="FSM")
        fsm_data = self._get_ctrl("FSM")
        if name in fsm_data["states"]:
            raise SyntaxError("FSM state '{}' is already defined".format(name))
        if name not in fsm_data["encoding"]:
            fsm_data["encoding"][name] = len(fsm_data["encoding"])
        try:
            _outer_case, self._statements = self._statements, []
            # Inside a State body ordinary statements and constructs are allowed again.
            self._ctrl_context = None
            yield
            self._flush_ctrl()
            fsm_data["states"][name] = self._statements
        finally:
            self._ctrl_context = "FSM"
            self._statements = _outer_case

    @property
    def next(self):
        """Reading ``m.next`` is not supported and always raises ``SyntaxError``."""
        raise SyntaxError("Only assignment to `m.next` is permitted")

    @next.setter
    def next(self, name):
        """Schedule a transition of the innermost enclosing FSM to state ``name``.

        Adds an assignment of the state's encoding to the FSM's state signal in the FSM's
        domain, allocating an encoding for ``name`` if it has none yet. Raises
        ``SyntaxError`` when used directly inside ``FSM`` (outside a ``State``) or when
        no FSM is open.
        """
        if self._ctrl_context != "FSM":
            # Search from the innermost open construct outwards.
            for ctrl_name, ctrl_data in reversed(self._ctrl_stack):
                if ctrl_name == "FSM":
                    if name not in ctrl_data["encoding"]:
                        ctrl_data["encoding"][name] = len(ctrl_data["encoding"])
                    self._add_statement(
                        assigns=[ctrl_data["signal"].eq(ctrl_data["encoding"][name])],
                        domain=ctrl_data["domain"],
                        depth=len(self._ctrl_stack))
                    return

        raise SyntaxError("`m.next = <...>` is only permitted inside an FSM state")

    def _pop_ctrl(self):
        """Pop the top control construct and append its ``Switch`` to the current body.

        An FSM without any defined state produces no statement.
        """
        # FIXME: the src_loc extraction unfortunately doesn't work very well here; src_loc_at=3 is
        # correct, but the resulting src_loc points at the *last* line of the `with` block.
        # Unfortunately, it is not clear how this can be fixed.

        name, data = self._ctrl_stack.pop()

        if name == "If":
            if_tests, if_bodies = data["tests"], data["bodies"]

            tests, cases = [], OrderedDict()
            # The trailing None pairs with the Else body, if there is one; without an
            # Else, zip() stops before reaching it.
            for if_test, if_case in zip(if_tests + [None], if_bodies):
                if if_test is not None:
                    if_test = Value.wrap(if_test)
                    # Reduce multi-bit conditions to a single bit.
                    if len(if_test) != 1:
                        if_test = if_test.bool()
                    tests.append(if_test)

                if if_test is not None:
                    # Pattern as wide as the number of tests: a "1" at position
                    # len(tests) counted from the right, "-" everywhere else.
                    match = ("1" + "-" * (len(tests) - 1)).rjust(len(if_tests), "-")
                else:
                    match = None
                cases[match] = if_case

            self._statements.append(Switch(Cat(tests), cases, src_loc_at=3))

        if name == "Switch":
            switch_test, switch_cases = data["test"], data["cases"]

            self._statements.append(Switch(switch_test, switch_cases, src_loc_at=3))

        if name == "FSM":
            fsm_signal, fsm_reset, fsm_encoding, fsm_decoding, fsm_states = \
                data["signal"], data["reset"], data["encoding"], data["decoding"], data["states"]
            if not fsm_states:
                return
            fsm_signal.nbits = bits_for(len(fsm_encoding) - 1)
            if fsm_reset is None:
                # Default reset state is the first state that was defined.
                fsm_signal.reset = fsm_encoding[next(iter(fsm_states))]
            else:
                fsm_signal.reset = fsm_encoding[fsm_reset]
            # The FSM is encoded such that the state with encoding 0 is always the reset state.
            fsm_decoding.update((n, s) for s, n in fsm_encoding.items())
            fsm_signal.decoder = lambda n: "{}/{}".format(fsm_decoding[n], n)
            self._statements.append(Switch(fsm_signal,
                OrderedDict((fsm_encoding[name], stmts) for name, stmts in fsm_states.items()),
                src_loc_at=3))

    def _add_statement(self, assigns, domain, depth, compat_mode=False):
        """Append ``assigns`` to the current body and record ``domain`` as their driver.

        Unless ``compat_mode`` is set, anything other than ``Assign``, ``Assert`` or
        ``Assume`` raises ``SyntaxError``. Driving a signal from a domain other than the
        one already recorded for it raises ``SyntaxError``. ``depth`` is accepted but
        not used here; the module's current depth is used instead.
        """
        def domain_name(domain):
            if domain is None:
                return "comb"
            else:
                return domain

        # NOTE(review): same loop as _flush_ctrl(), kept inline on purpose. _pop_ctrl
        # passes src_loc_at=3, which presumably counts stack frames, so routing through
        # _flush_ctrl() could shift the recorded source location -- confirm before merging.
        while len(self._ctrl_stack) > self.domain._depth:
            self._pop_ctrl()

        for assign in Statement.wrap(assigns):
            if not compat_mode and not isinstance(assign, (Assign, Assert, Assume)):
                raise SyntaxError(
                    "Only assignments, asserts, and assumes may be appended to d.{}"
                    .format(domain_name(domain)))

            assign = SampleDomainInjector(domain)(assign)
            for signal in assign._lhs_signals():
                if signal not in self._driving:
                    self._driving[signal] = domain
                elif self._driving[signal] != domain:
                    cd_curr = self._driving[signal]
                    raise SyntaxError(
                        "Driver-driver conflict: trying to drive {!r} from d.{}, but it is "
                        "already driven from d.{}"
                        .format(signal, domain_name(domain), domain_name(cd_curr)))

            self._statements.append(assign)

    def _add_submodule(self, submodule, name=None):
        """Record ``submodule`` (optionally named); it must have an ``elaborate`` attribute."""
        if not hasattr(submodule, "elaborate"):
            raise TypeError("Trying to add '{!r}', which does not implement .elaborate(), as "
                            "a submodule".format(submodule))
        self._submodules.append((submodule, name))

    def _add_domain(self, cd):
        """Record the clock domain ``cd``."""
        self._domains.append(cd)

    def _flush(self):
        """Finalize every control construct that is still open."""
        while self._ctrl_stack:
            self._pop_ctrl()

    def elaborate(self, platform):
        """Finalize open constructs and return a ``Fragment`` describing this module.

        The fragment receives the submodules (converted with ``Fragment.get``), the
        statements, one driver entry per driven signal, the domains and the generated
        helper objects.
        """
        self._flush()

        fragment = Fragment()
        for submodule, name in self._submodules:
            fragment.add_subfragment(Fragment.get(submodule, platform), name)
        statements = SampleDomainInjector("sync")(self._statements)
        fragment.add_statements(statements)
        for signal, domain in self._driving.items():
            fragment.add_driver(signal, domain)
        fragment.add_domains(self._domains)
        fragment.generated.update(self._generated)
        return fragment