hdl.ast: simplify Mux implementation.
[nmigen.git] / nmigen / sim / core.py
1 import inspect
2
3 from .._utils import deprecated
4 from ..hdl.cd import *
5 from ..hdl.ir import *
6 from ._base import BaseEngine
7
8
9 __all__ = ["Settle", "Delay", "Tick", "Passive", "Active", "Simulator"]
10
11
12 class Command:
13 pass
14
15
16 class Settle(Command):
17 def __repr__(self):
18 return "(settle)"
19
20
21 class Delay(Command):
22 def __init__(self, interval=None):
23 self.interval = None if interval is None else float(interval)
24
25 def __repr__(self):
26 if self.interval is None:
27 return "(delay ε)"
28 else:
29 return "(delay {:.3}us)".format(self.interval * 1e6)
30
31
32 class Tick(Command):
33 def __init__(self, domain="sync"):
34 if not isinstance(domain, (str, ClockDomain)):
35 raise TypeError("Domain must be a string or a ClockDomain instance, not {!r}"
36 .format(domain))
37 assert domain != "comb"
38 self.domain = domain
39
40 def __repr__(self):
41 return "(tick {})".format(self.domain)
42
43
44 class Passive(Command):
45 def __repr__(self):
46 return "(passive)"
47
48
49 class Active(Command):
50 def __repr__(self):
51 return "(active)"
52
53
54 class Simulator:
55 def __init__(self, fragment, *, engine="pysim"):
56 if isinstance(engine, type) and issubclass(engine, BaseEngine):
57 pass
58 elif engine == "pysim":
59 from .pysim import PySimEngine
60 engine = PySimEngine
61 else:
62 raise TypeError("Value '{!r}' is not a simulation engine class or "
63 "a simulation engine name"
64 .format(engine))
65
66 self._fragment = Fragment.get(fragment, platform=None).prepare()
67 self._engine = engine(self._fragment)
68 self._clocked = set()
69
70 def _check_process(self, process):
71 if not (inspect.isgeneratorfunction(process) or inspect.iscoroutinefunction(process)):
72 raise TypeError("Cannot add a process {!r} because it is not a generator function"
73 .format(process))
74 return process
75
76 def add_process(self, process):
77 process = self._check_process(process)
78 def wrapper():
79 # Only start a bench process after comb settling, so that the reset values are correct.
80 yield Settle()
81 yield from process()
82 self._engine.add_coroutine_process(wrapper, default_cmd=None)
83
84 def add_sync_process(self, process, *, domain="sync"):
85 process = self._check_process(process)
86 def wrapper():
87 # Only start a sync process after the first clock edge (or reset edge, if the domain
88 # uses an asynchronous reset). This matches the behavior of synchronous FFs.
89 yield Tick(domain)
90 yield from process()
91 self._engine.add_coroutine_process(wrapper, default_cmd=Tick(domain))
92
93 def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
94 """Add a clock process.
95
96 Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.
97
98 Arguments
99 ---------
100 period : float
101 Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
102 seconds.
103 phase : None or float
104 Clock phase. The process will wait ``phase`` seconds before the first clock transition.
105 If not specified, defaults to ``period / 2``.
106 domain : str or ClockDomain
107 Driven clock domain. If specified as a string, the domain with that name is looked up
108 in the root fragment of the simulation.
109 if_exists : bool
110 If ``False`` (the default), raise an error if the driven domain is specified as
111 a string and the root fragment does not have such a domain. If ``True``, do nothing
112 in this case.
113 """
114 if isinstance(domain, ClockDomain):
115 pass
116 elif domain in self._fragment.domains:
117 domain = self._fragment.domains[domain]
118 elif if_exists:
119 return
120 else:
121 raise ValueError("Domain {!r} is not present in simulation"
122 .format(domain))
123 if domain in self._clocked:
124 raise ValueError("Domain {!r} already has a clock driving it"
125 .format(domain.name))
126
127 if phase is None:
128 # By default, delay the first edge by half period. This causes any synchronous activity
129 # to happen at a non-zero time, distinguishing it from the reset values in the waveform
130 # viewer.
131 phase = period / 2
132 self._engine.add_clock_process(domain.clk, phase=phase, period=period)
133 self._clocked.add(domain)
134
135 def reset(self):
136 """Reset the simulation.
137
138 Assign the reset value to every signal in the simulation, and restart every user process.
139 """
140 self._engine.reset()
141
142 # TODO(nmigen-0.4): replace with _real_step
143 @deprecated("instead of `sim.step()`, use `sim.advance()`")
144 def step(self):
145 return self.advance()
146
147 def advance(self):
148 """Advance the simulation.
149
150 Run every process and commit changes until a fixed point is reached, then advance time
151 to the closest deadline (if any). If there is an unstable combinatorial loop,
152 this function will never return.
153
154 Returns ``True`` if there are any active processes, ``False`` otherwise.
155 """
156 return self._engine.advance()
157
158 def run(self):
159 """Run the simulation while any processes are active.
160
161 Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
162 and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
163 Processes compiled from HDL and added with :meth:`add_clock` are always passive.
164 """
165 while self.advance():
166 pass
167
168 def run_until(self, deadline, *, run_passive=False):
169 """Run the simulation until it advances to ``deadline``.
170
171 If ``run_passive`` is ``False``, the simulation also stops when there are no active
172 processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
173 advances to or past ``deadline``.
174
175 If the simulation stops advancing, this function will never return.
176 """
177 assert self._engine.now <= deadline
178 while (self.advance() or run_passive) and self._engine.now < deadline:
179 pass
180
181 def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()):
182 """Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.
183
184 This method returns a context manager. It can be used as: ::
185
186 sim = Simulator(frag)
187 sim.add_clock(1e-6)
188 with sim.write_vcd("dump.vcd", "dump.gtkw"):
189 sim.run_until(1e-3)
190
191 Arguments
192 ---------
193 vcd_file : str or file-like object
194 Verilog Value Change Dump file or filename.
195 gtkw_file : str or file-like object
196 GTKWave save file or filename.
197 traces : iterable of Signal
198 Signals to display traces for.
199 """
200 if self._engine.now != 0.0:
201 for file in (vcd_file, gtkw_file):
202 if hasattr(file, "close"):
203 file.close()
204 raise ValueError("Cannot start writing waveforms after advancing simulation time")
205
206 return self._engine.write_vcd(vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)