def __init__(self, fragment, vcd_file=None, gtkw_file=None, traces=()):
self._fragment = fragment
- self._domains = {} # str/domain -> ClockDomain
+ self._domains = dict() # str/domain -> ClockDomain
self._domain_triggers = ValueDict() # Signal -> str/domain
- self._domain_signals = {} # str/domain -> {Signal}
+ self._domain_signals = dict() # str/domain -> {Signal}
self._signals = ValueSet() # {Signal}
self._comb_signals = ValueSet() # {Signal}
self._state = _State()
self._processes = set() # {process}
+ self._process_loc = dict() # process -> str/loc
self._passive = set() # {process}
self._suspended = set() # {process}
- self._wait_deadline = {} # process -> float/timestamp
- self._wait_tick = {} # process -> str/domain
+ self._wait_deadline = dict() # process -> float/timestamp
+ self._wait_tick = dict() # process -> str/domain
self._funclets = ValueDict() # Signal -> set(lambda)
self._gtkw_file = gtkw_file
self._traces = traces
- def _check_process(self, process):
+ @staticmethod
+ def _check_process(process):
if inspect.isgeneratorfunction(process):
process = process()
if not inspect.isgenerator(process):
.format(process))
return process
+ def _name_process(self, process):
+ if process in self._process_loc:
+ return self._process_loc[process]
+ else:
+ frame = process.gi_frame
+ return "{}:{}".format(inspect.getfile(frame), inspect.getlineno(frame))
+
def add_process(self, process):
process = self._check_process(process)
self._processes.add(process)
while True:
if result is None:
result = Tick(domain)
+ self._process_loc[sync_process] = self._name_process(process)
result = process.send((yield result))
except StopIteration:
pass
- self.add_process(sync_process())
+ sync_process = sync_process()
+ self.add_process(sync_process)
def add_clock(self, period, phase=None, domain="sync"):
if self._fastest_clock == self._epsilon or period < self._fastest_clock:
yield Delay(half_period)
yield clk.eq(0)
yield Delay(half_period)
- self.add_process(clk_process())
+ self.add_process(clk_process)
def __enter__(self):
if self._vcd_file:
# Otherwise, do one more round of updates.
def _run_process(self, process):
- def format_process(process):
- frame = process.gi_frame
- return "{}:{}".format(inspect.getfile(frame), inspect.getlineno(frame))
-
try:
cmd = process.send(None)
while True:
if not signal in self._signals:
raise ValueError("Process '{}' sent a request to set signal '{!r}', "
"which is not a part of simulation"
- .format(format_process(process), signal))
+ .format(self._name_process(process), signal))
if signal in self._comb_signals:
raise ValueError("Process '{}' sent a request to set signal '{!r}', "
"which is a part of combinatorial assignment in "
"simulation"
- .format(format_process(process), signal))
+ .format(self._name_process(process), signal))
compiler = _StatementCompiler()
funclet = compiler(cmd)
else:
raise TypeError("Received unsupported command '{!r}' from process '{}'"
- .format(cmd, format_process(process)))
+ .format(cmd, self._name_process(process)))
break