class DictAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
- assert isinstance(getattr(namespace, self.dest), dict), "Use ArgumentParser.set_defaults() to initialize {} to dict()".format(self.dest)
+ assert isinstance(getattr(namespace, self.dest), dict), f"Use ArgumentParser.set_defaults() to initialize {self.dest} to dict()"
name = option_string.lstrip(parser.prefix_chars).replace("-", "_")
getattr(namespace, self.dest)[name] = values
sv_file = init_config_file + ".sv"
sby_file = init_config_file + ".sby"
with open(sby_file, 'w') as config:
- config.write("""[options]
+ config.write(f"""[options]
mode bmc
[engines]
smtbmc
[script]
-read -formal {0}
+read -formal {sv_file}
prep -top top
[files]
-{0}
-""".format(sv_file))
+{sv_file}
+""")
- print("sby config written to {}".format(sby_file), file=sys.stderr)
+ print(f"sby config written to {sby_file}", file=sys.stderr)
sys.exit(0)
early_logmsgs = list()
if len(task_tags_all) and not found_task_tag:
tokens = line.split()
if len(tokens) > 0 and tokens[0][0] == line[0] and tokens[0].endswith(":"):
- print("ERROR: Invalid task specifier \"{}\".".format(tokens[0]), file=sys.stderr)
+ print(f"ERROR: Invalid task specifier \"{tokens[0]}\".", file=sys.stderr)
sys.exit(1)
if task_skip_line or task_skip_block:
handle_line(line)
if taskname is not None and not task_matched:
- print("ERROR: Task name '{}' didn't match any lines in [tasks].".format(taskname), file=sys.stderr)
+ print(f"ERROR: Task name '{taskname}' didn't match any lines in [tasks].", file=sys.stderr)
sys.exit(1)
return cfgdata, tasklist
shutil.move(my_workdir, "{}.bak{:03d}".format(my_workdir, backup_idx))
if opt_force and not reusedir:
- early_log(my_workdir, "Removing directory '{}'.".format(os.path.abspath(my_workdir)))
+ early_log(my_workdir, f"Removing directory '{os.path.abspath(my_workdir)}'.")
if sbyfile:
shutil.rmtree(my_workdir, ignore_errors=True)
if reusedir:
pass
elif os.path.isdir(my_workdir):
- print("ERROR: Directory '{}' already exists.".format(my_workdir))
+ print(f"ERROR: Directory '{my_workdir}' already exists.")
sys.exit(1)
else:
os.makedirs(my_workdir)
pass
if my_opt_tmpdir:
- job.log("Removing directory '{}'.".format(my_workdir))
+ job.log(f"Removing directory '{my_workdir}'.")
shutil.rmtree(my_workdir, ignore_errors=True)
if setupmode:
- job.log("SETUP COMPLETE (rc={})".format(job.retcode))
+ job.log(f"SETUP COMPLETE (rc={job.retcode})")
else:
- job.log("DONE ({}, rc={})".format(job.status, job.retcode))
+ job.log(f"DONE ({job.status}, rc={job.retcode})")
job.logfile.close()
if not my_opt_tmpdir and not setupmode:
junit_errors = 1 if job.retcode == 16 else 0
junit_failures = 1 if job.retcode != 0 and junit_errors == 0 else 0
print('<?xml version="1.0" encoding="UTF-8"?>', file=f)
- print('<testsuites disabled="0" errors="{}" failures="{}" tests="1" time="{}">'.format(junit_errors, junit_failures, job.total_time), file=f)
- print('<testsuite disabled="0" errors="{}" failures="{}" name="{}" skipped="0" tests="1" time="{}">'.format(junit_errors, junit_failures, junit_ts_name, job.total_time), file=f)
+ print(f'<testsuites disabled="0" errors="{junit_errors}" failures="{junit_failures}" tests="1" time="{job.total_time}">', file=f)
+ print(f'<testsuite disabled="0" errors="{junit_errors}" failures="{junit_failures}" name="{junit_ts_name}" skipped="0" tests="1" time="{job.total_time}">', file=f)
print('<properties>', file=f)
- print('<property name="os" value="{}"/>'.format(os.name), file=f)
+ print(f'<property name="os" value="{os.name}"/>', file=f)
print('</properties>', file=f)
- print('<testcase classname="{}" name="{}" status="{}" time="{}">'.format(junit_ts_name, junit_tc_name, job.status, job.total_time), file=f)
+ print(f'<testcase classname="{junit_ts_name}" name="{junit_tc_name}" status="{job.status}" time="{job.total_time}">', file=f)
if junit_errors:
- print('<error message="{}" type="{}"/>'.format(job.status, job.status), file=f)
+ print(f'<error message="{job.status}" type="{job.status}"/>', file=f)
if junit_failures:
- print('<failure message="{}" type="{}"/>'.format(job.status, job.status), file=f)
+ print(f'<failure message="{job.status}" type="{job.status}"/>', file=f)
print('<system-out>', end="", file=f)
- with open("{}/logfile.txt".format(job.workdir), "r") as logf:
+ with open(f"{job.workdir}/logfile.txt", "r") as logf:
for line in logf:
print(line.replace("&", "&").replace("<", "<").replace(">", ">").replace("\"", """), end="", file=f)
print('</system-out></testcase></testsuite></testsuites>', file=f)
- with open("{}/status".format(job.workdir), "w") as f:
- print("{} {} {}".format(job.status, job.retcode, job.total_time), file=f)
+ with open(f"{job.workdir}/status", "w") as f:
+ print(f"{job.status} {job.retcode} {job.total_time}", file=f)
return job.retcode
if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)):
if self.logfile is not None:
print(line, file=self.logfile)
- self.job.log("{}: {}".format(self.info, line))
+ self.job.log(f"{self.info}: {line}")
def handle_output(self, line):
if self.terminated or len(line) == 0:
return
if self.running:
if not self.silent:
- self.job.log("{}: terminating process".format(self.info))
+ self.job.log(f"{self.info}: terminating process")
if os.name == "posix":
try:
os.killpg(self.p.pid, signal.SIGTERM)
return
if not self.silent:
- self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline))
+ self.job.log(f"{self.info}: starting process \"{self.cmdline}\"")
if os.name == "posix":
def preexec_fn():
if self.p.poll() is not None:
if not self.silent:
- self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode))
+ self.job.log(f"{self.info}: finished (returncode={self.p.returncode})")
self.job.tasks_running.remove(self)
all_tasks_running.remove(self)
self.running = False
if self.p.returncode == 127:
self.job.status = "ERROR"
if not self.silent:
- self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info))
+ self.job.log(f"{self.info}: COMMAND NOT FOUND. ERROR.")
self.terminated = True
self.job.terminate()
return
if self.checkretcode and self.p.returncode != 0:
self.job.status = "ERROR"
if not self.silent:
- self.job.log("{}: job failed. ERROR.".format(self.info))
+ self.job.log(f"{self.info}: job failed. ERROR.")
self.terminated = True
self.job.terminate()
return
self.summary = list()
- self.logfile = open("{}/logfile.txt".format(workdir), "a")
+ self.logfile = open(f"{workdir}/logfile.txt", "a")
for line in early_logs:
print(line, file=self.logfile, flush=True)
if not reusedir:
- with open("{}/config.sby".format(workdir), "w") as f:
+ with open(f"{workdir}/config.sby", "w") as f:
for line in sbyconfig:
print(line, file=f)
if self.opt_timeout is not None:
total_clock_time = int(time() - self.start_clock_time)
if total_clock_time > self.opt_timeout:
- self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout))
+ self.log(f"Reached TIMEOUT ({self.opt_timeout} seconds). Terminating all tasks.")
self.status = "TIMEOUT"
self.terminate(timeout=True)
if "ERROR" not in self.expect:
self.retcode = 16
self.terminate()
- with open("{}/{}".format(self.workdir, self.status), "w") as f:
- print("ERROR: {}".format(logmessage), file=f)
+ with open(f"{self.workdir}/{self.status}", "w") as f:
+ print(f"ERROR: {logmessage}", file=f)
raise SbyAbort(logmessage)
def makedirs(self, path):
for dstfile, lines in self.verbatim_files.items():
dstfile = self.workdir + "/src/" + dstfile
- self.log("Writing '{}'.".format(dstfile))
+ self.log(f"Writing '{dstfile}'.")
with open(dstfile, "w") as f:
for line in lines:
for dstfile, srcfile in self.files.items():
if dstfile.startswith("/") or dstfile.startswith("../") or ("/../" in dstfile):
- self.error("destination filename must be a relative path without /../: {}".format(dstfile))
+ self.error(f"destination filename must be a relative path without /../: {dstfile}")
dstfile = self.workdir + "/src/" + dstfile
srcfile = process_filename(srcfile)
if basedir != "" and not os.path.exists(basedir):
os.makedirs(basedir)
- self.log("Copy '{}' to '{}'.".format(os.path.abspath(srcfile), os.path.abspath(dstfile)))
+ self.log(f"Copy '{os.path.abspath(srcfile)}' to '{os.path.abspath(dstfile)}'.")
copyfile(srcfile, dstfile)
def handle_str_option(self, option_name, default_value):
def handle_bool_option(self, option_name, default_value):
if option_name in self.options:
if self.options[option_name] not in ["on", "off"]:
- self.error("Invalid value '{}' for boolean option {}.".format(self.options[option_name], option_name))
+ self.error(f"Invalid value '{self.options[option_name]}' for boolean option {option_name}.")
self.__dict__["opt_" + option_name] = self.options[option_name] == "on"
self.used_options.add(option_name)
else:
self.__dict__["opt_" + option_name] = default_value
def make_model(self, model_name):
- if not os.path.isdir("{}/model".format(self.workdir)):
- os.makedirs("{}/model".format(self.workdir))
+ if not os.path.isdir(f"{self.workdir}/model"):
+ os.makedirs(f"{self.workdir}/model")
if model_name in ["base", "nomem"]:
- with open("{}/model/design{}.ys".format(self.workdir, "" if model_name == "base" else "_nomem"), "w") as f:
- print("# running in {}/src/".format(self.workdir), file=f)
+ with open(f"""{self.workdir}/model/design{"" if model_name == "base" else "_nomem"}.ys""", "w") as f:
+ print(f"# running in {self.workdir}/src/", file=f)
for cmd in self.script:
print(cmd, file=f)
if model_name == "base":
print("opt -keepdc -fast", file=f)
print("check", file=f)
print("hierarchy -simcheck", file=f)
- print("write_ilang ../model/design{}.il".format("" if model_name == "base" else "_nomem"), file=f)
-
- task = SbyTask(self, model_name, [],
- "cd {}/src; {} -ql ../model/design{s}.log ../model/design{s}.ys".format(self.workdir, self.exe_paths["yosys"],
- s="" if model_name == "base" else "_nomem"))
+ print(f"""write_ilang ../model/design{"" if model_name == "base" else "_nomem"}.il""", file=f)
+
+ task = SbyTask(
+ self,
+ model_name,
+ [],
+ "cd {}/src; {} -ql ../model/design{s}.log ../model/design{s}.ys".format(self.workdir, self.exe_paths["yosys"],
+ s="" if model_name == "base" else "_nomem")
+ )
task.checkretcode = True
return [task]
if re.match(r"^smt2(_syn)?(_nomem)?(_stbv|_stdt)?$", model_name):
- with open("{}/model/design_{}.ys".format(self.workdir, model_name), "w") as f:
- print("# running in {}/model/".format(self.workdir), file=f)
- print("read_ilang design{}.il".format("_nomem" if "_nomem" in model_name else ""), file=f)
+ with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f:
+ print(f"# running in {self.workdir}/model/", file=f)
+ print(f"""read_ilang design{"_nomem" if "_nomem" in model_name else ""}.il""", file=f)
if "_syn" in model_name:
print("techmap", file=f)
print("opt -fast", file=f)
print("dffunmap", file=f)
print("stat", file=f)
if "_stbv" in model_name:
- print("write_smt2 -stbv -wires design_{}.smt2".format(model_name), file=f)
+ print(f"write_smt2 -stbv -wires design_{model_name}.smt2", file=f)
elif "_stdt" in model_name:
- print("write_smt2 -stdt -wires design_{}.smt2".format(model_name), file=f)
+ print(f"write_smt2 -stdt -wires design_{model_name}.smt2", file=f)
else:
- print("write_smt2 -wires design_{}.smt2".format(model_name), file=f)
-
- task = SbyTask(self, model_name, self.model("nomem" if "_nomem" in model_name else "base"),
- "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name))
+ print(f"write_smt2 -wires design_{model_name}.smt2", file=f)
+
+ task = SbyTask(
+ self,
+ model_name,
+ self.model("nomem" if "_nomem" in model_name else "base"),
+ "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)
+ )
task.checkretcode = True
return [task]
if re.match(r"^btor(_syn)?(_nomem)?$", model_name):
- with open("{}/model/design_{}.ys".format(self.workdir, model_name), "w") as f:
- print("# running in {}/model/".format(self.workdir), file=f)
- print("read_ilang design{}.il".format("_nomem" if "_nomem" in model_name else ""), file=f)
+ with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f:
+ print(f"# running in {self.workdir}/model/", file=f)
+ print(f"""read_ilang design{"_nomem" if "_nomem" in model_name else ""}.il""", file=f)
print("flatten", file=f)
print("setundef -undriven -anyseq", file=f)
if "_syn" in model_name:
print("stat", file=f)
print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f)
- task = SbyTask(self, model_name, self.model("nomem" if "_nomem" in model_name else "base"),
- "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name))
+ task = SbyTask(
+ self,
+ model_name,
+ self.model("nomem" if "_nomem" in model_name else "base"),
+ "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)
+ )
task.checkretcode = True
return [task]
if model_name == "aig":
- with open("{}/model/design_aiger.ys".format(self.workdir), "w") as f:
- print("# running in {}/model/".format(self.workdir), file=f)
+ with open(f"{self.workdir}/model/design_aiger.ys", "w") as f:
+ print(f"# running in {self.workdir}/model/", file=f)
print("read_ilang design_nomem.il", file=f)
print("flatten", file=f)
print("setundef -undriven -anyseq", file=f)
print("stat", file=f)
print("write_aiger -I -B -zinit -map design_aiger.aim design_aiger.aig", file=f)
- task = SbyTask(self, "aig", self.model("nomem"),
- "cd {}/model; {} -ql design_aiger.log design_aiger.ys".format(self.workdir, self.exe_paths["yosys"]))
+ task = SbyTask(
+ self,
+ "aig",
+ self.model("nomem"),
+ f"""cd {self.workdir}/model; {self.exe_paths["yosys"]} -ql design_aiger.log design_aiger.ys"""
+ )
task.checkretcode = True
return [task]
mode = None
key = None
- with open("{}/config.sby".format(self.workdir), "r") as f:
+ with open(f"{self.workdir}/config.sby", "r") as f:
for line in f:
raw_line = line
if mode in ["options", "engines", "files"]:
if match:
entries = match.group(1).split()
if len(entries) == 0:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
if entries[0] == "options":
mode = "options"
if len(self.options) != 0 or len(entries) != 1:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
continue
if entries[0] == "engines":
mode = "engines"
if len(self.engines) != 0 or len(entries) != 1:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
continue
if entries[0] == "script":
mode = "script"
if len(self.script) != 0 or len(entries) != 1:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
continue
if entries[0] == "file":
mode = "file"
if len(entries) != 2:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
current_verbatim_file = entries[1]
if current_verbatim_file in self.verbatim_files:
- self.error("duplicate file: {}".format(entries[1]))
+ self.error(f"duplicate file: {entries[1]}")
self.verbatim_files[current_verbatim_file] = list()
continue
if entries[0] == "files":
mode = "files"
if len(entries) != 1:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
continue
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
if mode == "options":
entries = line.split()
if len(entries) != 2:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
self.options[entries[0]] = entries[1]
continue
elif len(entries) == 2:
self.files[entries[0]] = entries[1]
else:
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
continue
if mode == "file":
self.verbatim_files[current_verbatim_file].append(raw_line)
continue
- self.error("sby file syntax error: {}".format(line))
+ self.error(f"sby file syntax error: {line}")
self.handle_str_option("mode", None)
if self.opt_mode not in ["bmc", "prove", "cover", "live"]:
- self.error("Invalid mode: {}".format(self.opt_mode))
+ self.error(f"Invalid mode: {self.opt_mode}")
self.expect = ["PASS"]
if "expect" in self.options:
for s in self.expect:
if s not in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]:
- self.error("Invalid expect value: {}".format(s))
+ self.error(f"Invalid expect value: {s}")
self.handle_bool_option("multiclock", False)
self.handle_bool_option("wait", False)
self.error("Config file is lacking engine configuration.")
if self.reusedir:
- rmtree("{}/model".format(self.workdir), ignore_errors=True)
+ rmtree(f"{self.workdir}/model", ignore_errors=True)
else:
self.copy_src()
for opt in self.options.keys():
if opt not in self.used_options:
- self.error("Unused option: {}".format(opt))
+ self.error(f"Unused option: {opt}")
self.taskloop()
] + self.summary
for line in self.summary:
- self.log("summary: {}".format(line))
+ self.log(f"summary: {line}")
assert self.status in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]
if self.status == "TIMEOUT": self.retcode = 8
if self.status == "ERROR": self.retcode = 16
- with open("{}/{}".format(self.workdir, self.status), "w") as f:
+ with open(f"{self.workdir}/{self.status}", "w") as f:
for line in self.summary:
print(line, file=f)
if abc_command[0] == "bmc3":
if mode != "bmc":
job.error("ABC command 'bmc3' is only valid in bmc mode.")
- abc_command[0] += " -F {} -v".format(job.opt_depth)
+ abc_command[0] += f" -F {job.opt_depth} -v"
elif abc_command[0] == "sim3":
if mode != "bmc":
job.error("ABC command 'sim3' is only valid in bmc mode.")
- abc_command[0] += " -F {} -v".format(job.opt_depth)
+ abc_command[0] += f" -F {job.opt_depth} -v"
elif abc_command[0] == "pdr":
if mode != "prove":
job.error("ABC command 'pdr' is only valid in prove mode.")
else:
- job.error("Invalid ABC command {}.".format(abc_command[0]))
+ job.error(f"Invalid ABC command {abc_command[0]}.")
- task = SbyTask(job, "engine_{}".format(engine_idx), job.model("aig"),
- ("cd {}; {} -c 'read_aiger model/design_aiger.aig; fold; strash; {}; write_cex -a engine_{}/trace.aiw'").format
- (job.workdir, job.exe_paths["abc"], " ".join(abc_command), engine_idx),
- logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w"))
+ task = SbyTask(
+ job,
+ f"engine_{engine_idx}",
+ job.model("aig"),
+ f"""cd {job.workdir}; {job.exe_paths["abc"]} -c 'read_aiger model/design_aiger.aig; fold; strash; {" ".join(abc_command)}; write_cex -a engine_{engine_idx}/trace.aiw'""",
+ logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile.txt", "w")
+ )
task.noprintregex = re.compile(r"^\.+$")
task_status = None
assert task_status is not None
job.update_status(task_status)
- job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status))
- job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status))
+ job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}")
+ job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""")
job.terminate()
if task_status == "FAIL" and job.opt_aigsmt != "none":
- task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"),
- ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
+ task2 = SbyTask(
+ job,
+ f"engine_{engine_idx}",
+ job.model("smt2"),
+ ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
"--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw --aig-noheader model/design_smt2.smt2").format
(job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt,
- "" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop),
+ "" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}",
job.opt_append, i=engine_idx),
- logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w"))
+ logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
+ )
task2_status = None
assert task2_status is not None
assert task2_status == "FAIL"
- if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
- job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
+ if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
+ job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
task2.output_callback = output_callback2
task2.exit_callback = exit_callback2
solver_cmd = " ".join([job.exe_paths["aigbmc"]] + solver_args[1:])
else:
- job.error("Invalid solver command {}.".format(solver_args[0]))
+ job.error(f"Invalid solver command {solver_args[0]}.")
- task = SbyTask(job, "engine_{}".format(engine_idx), job.model("aig"),
- "cd {}; {} model/design_aiger.aig".format(job.workdir, solver_cmd),
- logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w"))
+ task = SbyTask(
+ job,
+ f"engine_{engine_idx}",
+ job.model("aig"),
+ f"cd {job.workdir}; {solver_cmd} model/design_aiger.aig",
+ logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile.txt", "w")
+ )
task_status = None
produced_cex = False
end_of_cex = False
- aiw_file = open("{}/engine_{}/trace.aiw".format(job.workdir, engine_idx), "w")
+ aiw_file = open(f"{job.workdir}/engine_{engine_idx}/trace.aiw", "w")
def output_callback(line):
nonlocal task_status
return None
if line.startswith("u"):
- return "No CEX up to depth {}.".format(int(line[1:])-1)
+ return f"No CEX up to depth {int(line[1:])-1}."
if line in ["0", "1", "2"]:
print(line, file=aiw_file)
aiw_file.close()
job.update_status(task_status)
- job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status))
- job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status))
+ job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}")
+ job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""")
job.terminate()
if task_status == "FAIL" and job.opt_aigsmt != "none":
if produced_cex:
if mode == "live":
- task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"),
- ("cd {}; {} -g -s {}{} --noprogress --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
+ task2 = SbyTask(
+ job,
+ f"engine_{engine_idx}",
+ job.model("smt2"),
+ ("cd {}; {} -g -s {}{} --noprogress --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
"--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw model/design_smt2.smt2").format
(job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt,
- "" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop),
+ "" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}",
i=engine_idx),
- logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w"))
+ logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
+ )
else:
- task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"),
- ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
+ task2 = SbyTask(
+ job,
+ f"engine_{engine_idx}",
+ job.model("smt2"),
+ ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " +
"--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw model/design_smt2.smt2").format
(job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt,
- "" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop),
+ "" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}",
job.opt_append, i=engine_idx),
- logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w"))
+ logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
+ )
task2_status = None
else:
assert task2_status == "FAIL"
- if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
- job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
+ if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
+ job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
task2.output_callback = output_callback2
task2.exit_callback = exit_callback2
else:
- job.log("engine_{}: Engine did not produce a counter example.".format(engine_idx))
+ job.log(f"engine_{engine_idx}: Engine did not produce a counter example.")
task.output_callback = output_callback
task.exit_callback = exit_callback
if solver_args[0] == "btormc":
solver_cmd = ""
if random_seed:
- solver_cmd += "BTORSEED={} ".format(random_seed)
- solver_cmd += job.exe_paths["btormc"] + " --stop-first {} -v 1 -kmax {}".format(0 if mode == "cover" else 1, job.opt_depth - 1)
+ solver_cmd += f"BTORSEED={random_seed} "
+ solver_cmd += job.exe_paths["btormc"] + f""" --stop-first {0 if mode == "cover" else 1} -v 1 -kmax {job.opt_depth - 1}"""
if job.opt_skip is not None:
- solver_cmd += " -kmin {}".format(job.opt_skip)
+ solver_cmd += f" -kmin {job.opt_skip}"
solver_cmd += " ".join([""] + solver_args[1:])
elif solver_args[0] == "pono":
if random_seed:
job.error("Setting the random seed is not available for the pono solver.")
- solver_cmd = job.exe_paths["pono"] + " -v 1 -e bmc -k {}".format(job.opt_depth - 1)
+ solver_cmd = job.exe_paths["pono"] + f" -v 1 -e bmc -k {job.opt_depth - 1}"
else:
- job.error("Invalid solver command {}.".format(solver_args[0]))
+ job.error(f"Invalid solver command {solver_args[0]}.")
common_state = SimpleNamespace()
common_state.solver_status = None
elif common_state.solver_status == "unsat":
task_status = "FAIL"
else:
- job.error("engine_{}: Engine terminated without status.".format(engine_idx))
+ job.error(f"engine_{engine_idx}: Engine terminated without status.")
else:
if common_state.expected_cex == 0:
task_status = "pass"
elif common_state.solver_status == "unsat":
task_status = "pass"
else:
- job.error("engine_{}: Engine terminated without status.".format(engine_idx))
+ job.error(f"engine_{engine_idx}: Engine terminated without status.")
job.update_status(task_status.upper())
- job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status))
- job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status))
+ job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}")
+ job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""")
if len(common_state.produced_traces) == 0:
- job.log("engine_{}: Engine did not produce a{}example.".format(engine_idx, " counter" if mode != "cover" else "n "))
+ job.log(f"""engine_{engine_idx}: Engine did not produce a{" counter" if mode != "cover" else "n "}example.""")
elif len(common_state.produced_traces) <= common_state.print_traces_max:
job.summary.extend(common_state.produced_traces)
else:
job.summary.extend(common_state.produced_traces[:common_state.print_traces_max])
excess_traces = len(common_state.produced_traces) - common_state.print_traces_max
- job.summary.append("and {} further trace{}".format(excess_traces, "s" if excess_traces > 1 else ""))
+ job.summary.append(f"""and {excess_traces} further trace{"s" if excess_traces > 1 else ""}""")
job.terminate()
def exit_callback2(retcode):
assert retcode == 0
- vcdpath = "{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, suffix)
+ vcdpath = f"{job.workdir}/engine_{engine_idx}/trace{suffix}.vcd"
if os.path.exists(vcdpath):
- common_state.produced_traces.append("{}trace: {}".format("" if mode == "cover" else "counterexample ", vcdpath))
+ common_state.produced_traces.append(f"""{"" if mode == "cover" else "counterexample "}trace: {vcdpath}""")
common_state.running_tasks -= 1
if (common_state.running_tasks == 0):
assert common_state.produced_cex == 0
else:
- job.error("engine_{}: BTOR solver '{}' is currently not supported in cover mode.".format(engine_idx, solver_args[0]))
+ job.error(f"engine_{engine_idx}: BTOR solver '{solver_args[0]}' is currently not supported in cover mode.")
if (common_state.produced_cex < common_state.expected_cex) and line == "sat":
assert common_state.wit_file == None
if common_state.expected_cex == 1:
- common_state.wit_file = open("{}/engine_{}/trace.wit".format(job.workdir, engine_idx), "w")
+ common_state.wit_file = open(f"{job.workdir}/engine_{engine_idx}/trace.wit", "w")
else:
- common_state.wit_file = open("{}/engine_{}/trace{}.wit".format(job.workdir, engine_idx, common_state.produced_cex), "w")
+ common_state.wit_file = open(f"""{job.workdir}/engine_{engine_idx}/trace{common_state.produced_cex}.wit""", "w")
if solver_args[0] != "btormc":
task.log("Found satisfiability witness.")
suffix = ""
else:
suffix = common_state.produced_cex
- task2 = SbyTask(job, "engine_{}_{}".format(engine_idx, common_state.produced_cex), job.model("btor"),
- "cd {dir} ; btorsim -c --vcd engine_{idx}/trace{i}.vcd --hierarchical-symbols --info model/design_btor.info model/design_btor.btor engine_{idx}/trace{i}.wit".format(dir=job.workdir, idx=engine_idx, i=suffix),
- logfile=open("{dir}/engine_{idx}/logfile2.txt".format(dir=job.workdir, idx=engine_idx), "w"))
+ task2 = SbyTask(
+ job,
+ f"engine_{engine_idx}_{common_state.produced_cex}",
+ job.model("btor"),
+ "cd {dir} ; btorsim -c --vcd engine_{idx}/trace{i}.vcd --hierarchical-symbols --info model/design_btor.info model/design_btor.btor engine_{idx}/trace{i}.wit".format(dir=job.workdir, idx=engine_idx, i=suffix),
+ logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w")
+ )
task2.output_callback = output_callback2
task2.exit_callback = make_exit_callback(suffix)
task2.checkretcode = True
if common_state.solver_status == "unsat":
if common_state.expected_cex == 1:
- with open("{}/engine_{}/trace.wit".format(job.workdir, engine_idx), "w") as wit_file:
+ with open(f"""{job.workdir}/engine_{engine_idx}/trace.wit""", "w") as wit_file:
print("unsat", file=wit_file)
else:
for i in range(common_state.produced_cex, common_state.expected_cex):
- with open("{}/engine_{}/trace{}.wit".format(job.workdir, engine_idx, i), "w") as wit_file:
+ with open(f"{job.workdir}/engine_{engine_idx}/trace{i}.wit", "w") as wit_file:
print("unsat", file=wit_file)
common_state.running_tasks -= 1
if (common_state.running_tasks == 0):
print_traces_and_terminate()
- task = SbyTask(job, "engine_{}".format(engine_idx), job.model("btor"),
- "cd {}; {} model/design_btor.btor".format(job.workdir, solver_cmd),
- logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w"))
+ task = SbyTask(
+ job,
+ f"engine_{engine_idx}", job.model("btor"),
+ f"cd {job.workdir}; {solver_cmd} model/design_btor.btor",
+ logfile=open("{job.workdir}/engine_{engine_idx}/logfile.txt", "w")
+ )
task.output_callback = output_callback
task.exit_callback = exit_callback
elif o == "--seed":
random_seed = a
else:
- job.error("Invalid smtbmc options {}.".format(o))
+ job.error(f"Invalid smtbmc options {o}.")
xtra_opts = False
for i, a in enumerate(args):
smtbmc_opts += ["--unroll"]
if job.opt_smtc is not None:
- smtbmc_opts += ["--smtc", "src/{}".format(job.opt_smtc)]
+ smtbmc_opts += ["--smtc", f"src/{job.opt_smtc}"]
if job.opt_tbtop is not None:
smtbmc_opts += ["--vlogtb-top", job.opt_tbtop]
run("prove_induction", job, engine_idx, engine)
return
- taskname = "engine_{}".format(engine_idx)
- trace_prefix = "engine_{}/trace".format(engine_idx)
- logfile_prefix = "{}/engine_{}/logfile".format(job.workdir, engine_idx)
+ taskname = f"engine_{engine_idx}"
+ trace_prefix = f"engine_{engine_idx}/trace"
+ logfile_prefix = f"{job.workdir}/engine_{engine_idx}/logfile"
if mode == "prove_basecase":
taskname += ".basecase"
else:
t_opt = "{}".format(job.opt_depth)
- task = SbyTask(job, taskname, job.model(model_name),
- "cd {}; {} {} -t {} {} --append {} --dump-vcd {p}.vcd --dump-vlogtb {p}_tb.v --dump-smtc {p}.smtc model/design_{}.smt2".format
- (job.workdir, job.exe_paths["smtbmc"], " ".join(smtbmc_opts), t_opt, "--info \"(set-option :random-seed {})\"".format(random_seed) if random_seed else "", job.opt_append, model_name, p=trace_prefix),
- logfile=open(logfile_prefix + ".txt", "w"), logstderr=(not progress))
+ random_seed = f"--info \"(set-option :random-seed {random_seed})\"" if random_seed else ""
+ task = SbyTask(
+ job,
+ taskname,
+ job.model(model_name),
+ f"""cd {job.workdir}; {job.exe_paths["smtbmc"]} {" ".join(smtbmc_opts)} -t {t_opt} {random_seed} --append {job.opt_append} --dump-vcd {trace_prefix}.vcd --dump-vlogtb {trace_prefix}_tb.v --dump-smtc {trace_prefix}.smtc model/design_{model_name}.smt2""",
+ logfile=open(logfile_prefix + ".txt", "w"),
+ logstderr=(not progress)
+ )
if mode == "prove_basecase":
job.basecase_tasks.append(task)
def exit_callback(retcode):
if task_status is None:
- job.error("engine_{}: Engine terminated without status.".format(engine_idx))
+ job.error(f"engine_{engine_idx}: Engine terminated without status.")
if mode == "bmc" or mode == "cover":
job.update_status(task_status)
task_status_lower = task_status.lower() if task_status == "PASS" else task_status
- job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status_lower))
- job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status_lower))
+ job.log(f"engine_{engine_idx}: Status returned by engine: {task_status_lower}")
+ job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status_lower}""")
if task_status == "FAIL" and mode != "cover":
- if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
- job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
+ if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
+ job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
elif task_status == "PASS" and mode == "cover":
print_traces_max = 5
for i in range(print_traces_max):
- if os.path.exists("{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, i)):
- job.summary.append("trace: {}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, i))
+ if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace{i}.vcd"):
+ job.summary.append(f"trace: {job.workdir}/engine_{engine_idx}/trace{i}.vcd")
else:
break
else:
excess_traces = 0
- while os.path.exists("{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, print_traces_max + excess_traces)):
+ while os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace{print_traces_max + excess_traces}.vcd"):
excess_traces += 1
if excess_traces > 0:
- job.summary.append("and {} further trace{}".format(excess_traces, "s" if excess_traces > 1 else ""))
+ job.summary.append(f"""and {excess_traces} further trace{"s" if excess_traces > 1 else ""}""")
job.terminate()
elif mode in ["prove_basecase", "prove_induction"]:
task_status_lower = task_status.lower() if task_status == "PASS" else task_status
- job.log("engine_{}: Status returned by engine for {}: {}".format(engine_idx, mode.split("_")[1], task_status_lower))
- job.summary.append("engine_{} ({}) returned {} for {}".format(engine_idx, " ".join(engine), task_status_lower, mode.split("_")[1]))
+ job.log(f"""engine_{engine_idx}: Status returned by engine for {mode.split("_")[1]}: {task_status_lower}""")
+ job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status_lower} for {mode.split("_")[1]}""")
if mode == "prove_basecase":
for task in job.basecase_tasks:
else:
job.update_status(task_status)
- if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)):
- job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx))
+ if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"):
+ job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd")
job.terminate()
elif mode == "prove_induction":
engine = job.engines[engine_idx]
assert len(engine) > 0
- job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
- job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
+ job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
+ job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "smtbmc":
import sby_engine_smtbmc
sby_engine_btor.run("bmc", job, engine_idx, engine)
else:
- job.error("Invalid engine '{}' for bmc mode.".format(engine[0]))
+ job.error(f"Invalid engine '{engine[0]}' for bmc mode.")
engine = job.engines[engine_idx]
assert len(engine) > 0
- job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
- job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
+ job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
+ job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "smtbmc":
import sby_engine_smtbmc
sby_engine_btor.run("cover", job, engine_idx, engine)
else:
- job.error("Invalid engine '{}' for cover mode.".format(engine[0]))
+ job.error(f"Invalid engine '{engine[0]}' for cover mode.")
engine = job.engines[engine_idx]
assert len(engine) > 0
- job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
- job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
+ job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
+ job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "aiger":
import sby_engine_aiger
sby_engine_aiger.run("live", job, engine_idx, engine)
else:
- job.error("Invalid engine '{}' for live mode.".format(engine[0]))
+ job.error(f"Invalid engine '{engine[0]}' for live mode.")
engine = job.engines[engine_idx]
assert len(engine) > 0
- job.log("engine_{}: {}".format(engine_idx, " ".join(engine)))
- job.makedirs("{}/engine_{}".format(job.workdir, engine_idx))
+ job.log(f"""engine_{engine_idx}: {" ".join(engine)}""")
+ job.makedirs(f"{job.workdir}/engine_{engine_idx}")
if engine[0] == "smtbmc":
import sby_engine_smtbmc
sby_engine_abc.run("prove", job, engine_idx, engine)
else:
- job.error("Invalid engine '{}' for prove mode.".format(engine[0]))
+ job.error(f"Invalid engine '{engine[0]}' for prove mode.")