import os
+import shutil
import tempfile
from contextlib import contextmanager
-from nmigen.hdl import *
from nmigen.hdl.ast import SignalSet
from nmigen.hdl.xfrm import ValueVisitor, StatementVisitor, LHSGroupFilter
from nmigen.sim._base import BaseProcess
-
__all__ = ["PyRTLProcess"]
yield
self._level -= 1
+ @contextmanager
+ def nest(self):
+ self.append(f"{{")
+ self._level += 1
+ #yield self.indent()
+ yield
+ self._level -= 1
+ self.append(f"}}")
+
def flush(self, indent=""):
code = "".join(self._buffer)
self._buffer.clear()
def def_var(self, prefix, value):
name = self.gen_var(prefix)
- self.append(f"{name} = {value}")
+ self.append(f"uint64_t {name} = {value};")
return name
+ def assign(self, lhs, rhs):
+ self.append(f"{lhs} = {rhs}")
+
+ def if_(self, cond):
+ self.append(f"if ({cond})")
+
+ def else_if(self, cond):
+ self.append(f"else if ({cond})")
+
+ def else_(self):
+ self.append(f"else")
+
class _Compiler:
def __init__(self, state, emitter):
if value.elems:
for index, elem in enumerate(value.elems):
if index == 0:
- self.emitter.append(f"if {index} == {gen_index}:")
+ self.emitter.if_(f"{index} == {gen_index}")
else:
- self.emitter.append(f"elif {index} == {gen_index}:")
- with self.emitter.indent():
- self.emitter.append(f"{gen_value} = {self(elem)}")
- self.emitter.append(f"else:")
- with self.emitter.indent():
- self.emitter.append(f"{gen_value} = {self(value.elems[-1])}")
+ self.emitter.else_if(f"{index} == {gen_index}")
+ with self.emitter.nest():
+ self.emitter.assign(f"{gen_value}", f"{self(elem)}")
+ self.emitter.else_()
+ with self.emitter.nest():
+ self.emitter.assign(f"{gen_value}", f"{self(value.elems[-1])}")
return gen_value
else:
return f"0"
def compile(cls, state, value, *, mode):
emitter = _PythonEmitter()
compiler = cls(state, emitter, mode=mode)
- emitter.append(f"result = {compiler(value)}")
+ emitter.assign(f"result", f"{compiler(value)}")
return emitter.flush()
value_sign = f"sign({value_mask} & {arg}, {-1 << (len(value) - 1)})"
else: # unsigned
value_sign = f"{value_mask} & {arg}"
- self.emitter.append(f"next_{self.state.get_signal(value)} = {value_sign}")
+ self.emitter.append(f"next_{self.state.get_signal(value)} = {value_sign};")
return gen
def on_Operator(self, value):
if value.elems:
for index, elem in enumerate(value.elems):
if index == 0:
- self.emitter.append(f"if {index} == {gen_index}:")
+ self.emitter.if_(f"{index} == {gen_index}")
else:
- self.emitter.append(f"elif {index} == {gen_index}:")
- with self.emitter.indent():
+ self.emitter.append(f"{index} == {gen_index}")
+ with self.emitter.nest():
self(elem)(arg)
- self.emitter.append(f"else:")
- with self.emitter.indent():
+ self.emitter.else_
+ with self.emitter.nest():
self(value.elems[-1])(arg)
- else:
- self.emitter.append(f"pass")
return gen
value = int(pattern, 2)
gen_checks.append(f"{value} == {gen_test}")
if index == 0:
- self.emitter.append(f"if {' or '.join(gen_checks)}:")
+ self.emitter.if_(f"{' or '.join(gen_checks)}")
else:
- self.emitter.append(f"elif {' or '.join(gen_checks)}:")
- with self.emitter.indent():
+ self.emitter.else_if(f"{' or '.join(gen_checks)}")
+ with self.emitter.nest():
self(stmts)
def on_Assert(self, stmt):
def __call__(self, fragment):
processes = set()
- for domain_name, domain_signals in fragment.drivers.items():
+ for index, (domain_name, domain_signals) in enumerate(fragment.drivers.items()):
domain_stmts = LHSGroupFilter(domain_signals)(fragment.statements)
domain_process = PyRTLProcess(is_comb=domain_name is None)
emitter = _PythonEmitter()
- emitter.append(f"def run():")
- emitter._level += 1
+ emitter.append(f"void run(void)")
+ with emitter.nest():
+ if domain_name is None:
+ for signal in domain_signals:
+ signal_index = self.state.get_signal(signal)
+ emitter.append(f"uint64_t next_{signal_index} = {signal.reset};")
- if domain_name is None:
- for signal in domain_signals:
- signal_index = self.state.get_signal(signal)
- emitter.append(f"next_{signal_index} = {signal.reset}")
+ inputs = SignalSet()
+ _StatementCompiler(self.state, emitter, inputs=inputs)(domain_stmts)
- inputs = SignalSet()
- _StatementCompiler(self.state, emitter, inputs=inputs)(domain_stmts)
+ for input in inputs:
+ self.state.add_trigger(domain_process, input)
- for input in inputs:
- self.state.add_trigger(domain_process, input)
+ else:
+ domain = fragment.domains[domain_name]
+ clk_trigger = 1 if domain.clk_edge == "pos" else 0
+ self.state.add_trigger(domain_process, domain.clk, trigger=clk_trigger)
+ if domain.rst is not None and domain.async_reset:
+ rst_trigger = 1
+ self.state.add_trigger(domain_process, domain.rst, trigger=rst_trigger)
- else:
- domain = fragment.domains[domain_name]
- clk_trigger = 1 if domain.clk_edge == "pos" else 0
- self.state.add_trigger(domain_process, domain.clk, trigger=clk_trigger)
- if domain.rst is not None and domain.async_reset:
- rst_trigger = 1
- self.state.add_trigger(domain_process, domain.rst, trigger=rst_trigger)
+ for signal in domain_signals:
+ signal_index = self.state.get_signal(signal)
+ emitter.append(f"next_{signal_index} = slots[{signal_index}].next;")
+
+ _StatementCompiler(self.state, emitter)(domain_stmts)
for signal in domain_signals:
signal_index = self.state.get_signal(signal)
- emitter.append(f"next_{signal_index} = slots[{signal_index}].next")
-
- _StatementCompiler(self.state, emitter)(domain_stmts)
-
- for signal in domain_signals:
- signal_index = self.state.get_signal(signal)
- emitter.append(f"slots[{signal_index}].set(next_{signal_index})")
+ emitter.append(f"set(&slots[{signal_index}], next_{signal_index});")
# There shouldn't be any exceptions raised by the generated code, but if there are
# (almost certainly due to a bug in the code generator), use this environment variable
# to make backtraces useful.
code = emitter.flush()
- if os.getenv("NMIGEN_pysim_dump"):
- file = tempfile.NamedTemporaryFile("w", prefix="nmigen_pysim_", delete=False)
- file.write(code)
- filename = file.name
- else:
- filename = "<string>"
-
- exec_locals = {"slots": self.state.slots, **_ValueCompiler.helpers}
- exec(compile(code, filename, "exec"), exec_locals)
- domain_process.run = exec_locals["run"]
- processes.add(domain_process)
+ file = open(f"crtl/{id(fragment)}_{domain_name or ''}_{index}.c", "w")
+ file.write(code)
+ file.close()
for subfragment_index, (subfragment, subfragment_name) in enumerate(fragment.subfragments):
if subfragment_name is None: