From 2d7d48885b61d32d3a7700dbe4a6876db1c88d4a Mon Sep 17 00:00:00 2001 From: piegames Date: Fri, 25 Jun 2021 00:02:22 +0200 Subject: [PATCH] Turn .format() strings into f-strings --- sbysrc/sby.py | 44 ++++++------ sbysrc/sby_core.py | 136 ++++++++++++++++++++---------------- sbysrc/sby_engine_abc.py | 37 ++++++---- sbysrc/sby_engine_aiger.py | 50 ++++++++----- sbysrc/sby_engine_btor.py | 55 ++++++++------- sbysrc/sby_engine_smtbmc.py | 49 +++++++------ sbysrc/sby_mode_bmc.py | 6 +- sbysrc/sby_mode_cover.py | 6 +- sbysrc/sby_mode_live.py | 6 +- sbysrc/sby_mode_prove.py | 6 +- 10 files changed, 221 insertions(+), 174 deletions(-) diff --git a/sbysrc/sby.py b/sbysrc/sby.py index 707d894..c088aaf 100644 --- a/sbysrc/sby.py +++ b/sbysrc/sby.py @@ -24,7 +24,7 @@ from time import localtime class DictAction(argparse.Action): def __call__(self, parser, namespace, values, option_string=None): - assert isinstance(getattr(namespace, self.dest), dict), "Use ArgumentParser.set_defaults() to initialize {} to dict()".format(self.dest) + assert isinstance(getattr(namespace, self.dest), dict), f"Use ArgumentParser.set_defaults() to initialize {self.dest} to dict()" name = option_string.lstrip(parser.prefix_chars).replace("-", "_") getattr(namespace, self.dest)[name] = values @@ -124,21 +124,21 @@ elif init_config_file is not None: sv_file = init_config_file + ".sv" sby_file = init_config_file + ".sby" with open(sby_file, 'w') as config: - config.write("""[options] + config.write(f"""[options] mode bmc [engines] smtbmc [script] -read -formal {0} +read -formal {sv_file} prep -top top [files] -{0} -""".format(sv_file)) +{sv_file} +""") - print("sby config written to {}".format(sby_file), file=sys.stderr) + print(f"sby config written to {sby_file}", file=sys.stderr) sys.exit(0) early_logmsgs = list() @@ -220,7 +220,7 @@ def read_sbyconfig(sbydata, taskname): if len(task_tags_all) and not found_task_tag: tokens = line.split() if len(tokens) > 0 and tokens[0][0] == line[0] and tokens[0].endswith(":"): - print("ERROR: Invalid task specifier \"{}\".".format(tokens[0]), file=sys.stderr) + print(f"ERROR: Invalid task specifier \"{tokens[0]}\".", file=sys.stderr) sys.exit(1) if task_skip_line or task_skip_block: @@ -263,7 +263,7 @@ def read_sbyconfig(sbydata, taskname): handle_line(line) if taskname is not None and not task_matched: - print("ERROR: Task name '{}' didn't match any lines in [tasks].".format(taskname), file=sys.stderr) + print(f"ERROR: Task name '{taskname}' didn't match any lines in [tasks].", file=sys.stderr) sys.exit(1) return cfgdata, tasklist @@ -343,14 +343,14 @@ def run_job(taskname): shutil.move(my_workdir, "{}.bak{:03d}".format(my_workdir, backup_idx)) if opt_force and not reusedir: - early_log(my_workdir, "Removing directory '{}'.".format(os.path.abspath(my_workdir))) + early_log(my_workdir, f"Removing directory '{os.path.abspath(my_workdir)}'.") if sbyfile: shutil.rmtree(my_workdir, ignore_errors=True) if reusedir: pass elif os.path.isdir(my_workdir): - print("ERROR: Directory '{}' already exists.".format(my_workdir)) + print(f"ERROR: Directory '{my_workdir}' already exists.") sys.exit(1) else: os.makedirs(my_workdir) @@ -388,13 +388,13 @@ def run_job(taskname): pass if my_opt_tmpdir: - job.log("Removing directory '{}'.".format(my_workdir)) + job.log(f"Removing directory '{my_workdir}'.") shutil.rmtree(my_workdir, ignore_errors=True) if setupmode: - job.log("SETUP COMPLETE (rc={})".format(job.retcode)) + job.log(f"SETUP COMPLETE (rc={job.retcode})") else: - job.log("DONE ({}, rc={})".format(job.status, job.retcode)) + job.log(f"DONE ({job.status}, rc={job.retcode})") job.logfile.close() if not my_opt_tmpdir and not setupmode: @@ -402,23 +402,23 @@ def run_job(taskname): junit_errors = 1 if job.retcode == 16 else 0 junit_failures = 1 if job.retcode != 0 and junit_errors == 0 else 0 print('', file=f) - print(''.format(junit_errors, junit_failures, job.total_time), file=f) - print(''.format(junit_errors, junit_failures, junit_ts_name, job.total_time), file=f) + print(f'', file=f) + print(f'', file=f) print('', file=f) - print(''.format(os.name), file=f) + print(f'', file=f) print('', file=f) - print(''.format(junit_ts_name, junit_tc_name, job.status, job.total_time), file=f) + print(f'', file=f) if junit_errors: - print(''.format(job.status, job.status), file=f) + print(f'', file=f) if junit_failures: - print(''.format(job.status, job.status), file=f) + print(f'', file=f) print('', end="", file=f) - with open("{}/logfile.txt".format(job.workdir), "r") as logf: + with open(f"{job.workdir}/logfile.txt", "r") as logf: for line in logf: print(line.replace("&", "&").replace("<", "<").replace(">", ">").replace("\"", """), end="", file=f) print('', file=f) - with open("{}/status".format(job.workdir), "w") as f: - print("{} {} {}".format(job.status, job.retcode, job.total_time), file=f) + with open(f"{job.workdir}/status", "w") as f: + print(f"{job.status} {job.retcode} {job.total_time}", file=f) return job.retcode diff --git a/sbysrc/sby_core.py b/sbysrc/sby_core.py index d5d8184..dbc05d3 100644 --- a/sbysrc/sby_core.py +++ b/sbysrc/sby_core.py @@ -97,7 +97,7 @@ class SbyTask: if line is not None and (self.noprintregex is None or not self.noprintregex.match(line)): if self.logfile is not None: print(line, file=self.logfile) - self.job.log("{}: {}".format(self.info, line)) + self.job.log(f"{self.info}: {line}") def handle_output(self, line): if self.terminated or len(line) == 0: @@ -119,7 +119,7 @@ class SbyTask: return if self.running: if not self.silent: - self.job.log("{}: terminating process".format(self.info)) + self.job.log(f"{self.info}: terminating process") if os.name == "posix": try: os.killpg(self.p.pid, signal.SIGTERM) @@ -140,7 +140,7 @@ class SbyTask: return if not self.silent: - self.job.log("{}: starting process \"{}\"".format(self.info, self.cmdline)) + self.job.log(f"{self.info}: starting process \"{self.cmdline}\"") if os.name == "posix": def preexec_fn(): @@ -175,7 +175,7 @@ class SbyTask: if self.p.poll() is not None: if not self.silent: - self.job.log("{}: finished (returncode={})".format(self.info, self.p.returncode)) + self.job.log(f"{self.info}: finished (returncode={self.p.returncode})") self.job.tasks_running.remove(self) all_tasks_running.remove(self) self.running = False @@ -183,7 +183,7 @@ class SbyTask: if self.p.returncode == 127: self.job.status = "ERROR" if not self.silent: - self.job.log("{}: COMMAND NOT FOUND. ERROR.".format(self.info)) + self.job.log(f"{self.info}: COMMAND NOT FOUND. ERROR.") self.terminated = True self.job.terminate() return @@ -193,7 +193,7 @@ class SbyTask: if self.checkretcode and self.p.returncode != 0: self.job.status = "ERROR" if not self.silent: - self.job.log("{}: job failed. ERROR.".format(self.info)) + self.job.log(f"{self.info}: job failed. ERROR.") self.terminated = True self.job.terminate() return @@ -246,13 +246,13 @@ class SbyJob: self.summary = list() - self.logfile = open("{}/logfile.txt".format(workdir), "a") + self.logfile = open(f"{workdir}/logfile.txt", "a") for line in early_logs: print(line, file=self.logfile, flush=True) if not reusedir: - with open("{}/config.sby".format(workdir), "w") as f: + with open(f"{workdir}/config.sby", "w") as f: for line in sbyconfig: print(line, file=f) @@ -283,7 +283,7 @@ class SbyJob: if self.opt_timeout is not None: total_clock_time = int(time() - self.start_clock_time) if total_clock_time > self.opt_timeout: - self.log("Reached TIMEOUT ({} seconds). Terminating all tasks.".format(self.opt_timeout)) + self.log(f"Reached TIMEOUT ({self.opt_timeout} seconds). Terminating all tasks.") self.status = "TIMEOUT" self.terminate(timeout=True) @@ -300,8 +300,8 @@ class SbyJob: if "ERROR" not in self.expect: self.retcode = 16 self.terminate() - with open("{}/{}".format(self.workdir, self.status), "w") as f: - print("ERROR: {}".format(logmessage), file=f) + with open(f"{self.workdir}/{self.status}", "w") as f: + print(f"ERROR: {logmessage}", file=f) raise SbyAbort(logmessage) def makedirs(self, path): @@ -314,7 +314,7 @@ class SbyJob: for dstfile, lines in self.verbatim_files.items(): dstfile = self.workdir + "/src/" + dstfile - self.log("Writing '{}'.".format(dstfile)) + self.log(f"Writing '{dstfile}'.") with open(dstfile, "w") as f: for line in lines: @@ -322,7 +322,7 @@ class SbyJob: for dstfile, srcfile in self.files.items(): if dstfile.startswith("/") or dstfile.startswith("../") or ("/../" in dstfile): - self.error("destination filename must be a relative path without /../: {}".format(dstfile)) + self.error(f"destination filename must be a relative path without /../: {dstfile}") dstfile = self.workdir + "/src/" + dstfile srcfile = process_filename(srcfile) @@ -331,7 +331,7 @@ class SbyJob: if basedir != "" and not os.path.exists(basedir): os.makedirs(basedir) - self.log("Copy '{}' to '{}'.".format(os.path.abspath(srcfile), os.path.abspath(dstfile))) + self.log(f"Copy '{os.path.abspath(srcfile)}' to '{os.path.abspath(dstfile)}'.") copyfile(srcfile, dstfile) def handle_str_option(self, option_name, default_value): @@ -351,19 +351,19 @@ class SbyJob: def handle_bool_option(self, option_name, default_value): if option_name in self.options: if self.options[option_name] not in ["on", "off"]: - self.error("Invalid value '{}' for boolean option {}.".format(self.options[option_name], option_name)) + self.error(f"Invalid value '{self.options[option_name]}' for boolean option {option_name}.") self.__dict__["opt_" + option_name] = self.options[option_name] == "on" self.used_options.add(option_name) else: self.__dict__["opt_" + option_name] = default_value def make_model(self, model_name): - if not os.path.isdir("{}/model".format(self.workdir)): - os.makedirs("{}/model".format(self.workdir)) + if not os.path.isdir(f"{self.workdir}/model"): + os.makedirs(f"{self.workdir}/model") if model_name in ["base", "nomem"]: - with open("{}/model/design{}.ys".format(self.workdir, "" if model_name == "base" else "_nomem"), "w") as f: - print("# running in {}/src/".format(self.workdir), file=f) + with open(f"""{self.workdir}/model/design{"" if model_name == "base" else "_nomem"}.ys""", "w") as f: + print(f"# running in {self.workdir}/src/", file=f) for cmd in self.script: print(cmd, file=f) if model_name == "base": @@ -387,19 +387,23 @@ class SbyJob: print("opt -keepdc -fast", file=f) print("check", file=f) print("hierarchy -simcheck", file=f) - print("write_ilang ../model/design{}.il".format("" if model_name == "base" else "_nomem"), file=f) - - task = SbyTask(self, model_name, [], - "cd {}/src; {} -ql ../model/design{s}.log ../model/design{s}.ys".format(self.workdir, self.exe_paths["yosys"], - s="" if model_name == "base" else "_nomem")) + print(f"""write_ilang ../model/design{"" if model_name == "base" else "_nomem"}.il""", file=f) + + task = SbyTask( + self, + model_name, + [], + "cd {}/src; {} -ql ../model/design{s}.log ../model/design{s}.ys".format(self.workdir, self.exe_paths["yosys"], + s="" if model_name == "base" else "_nomem") + ) task.checkretcode = True return [task] if re.match(r"^smt2(_syn)?(_nomem)?(_stbv|_stdt)?$", model_name): - with open("{}/model/design_{}.ys".format(self.workdir, model_name), "w") as f: - print("# running in {}/model/".format(self.workdir), file=f) - print("read_ilang design{}.il".format("_nomem" if "_nomem" in model_name else ""), file=f) + with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f: + print(f"# running in {self.workdir}/model/", file=f) + print(f"""read_ilang design{"_nomem" if "_nomem" in model_name else ""}.il""", file=f) if "_syn" in model_name: print("techmap", file=f) print("opt -fast", file=f) @@ -408,22 +412,26 @@ class SbyJob: print("dffunmap", file=f) print("stat", file=f) if "_stbv" in model_name: - print("write_smt2 -stbv -wires design_{}.smt2".format(model_name), file=f) + print(f"write_smt2 -stbv -wires design_{model_name}.smt2", file=f) elif "_stdt" in model_name: - print("write_smt2 -stdt -wires design_{}.smt2".format(model_name), file=f) + print(f"write_smt2 -stdt -wires design_{model_name}.smt2", file=f) else: - print("write_smt2 -wires design_{}.smt2".format(model_name), file=f) - - task = SbyTask(self, model_name, self.model("nomem" if "_nomem" in model_name else "base"), - "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)) + print(f"write_smt2 -wires design_{model_name}.smt2", file=f) + + task = SbyTask( + self, + model_name, + self.model("nomem" if "_nomem" in model_name else "base"), + "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name) + ) task.checkretcode = True return [task] if re.match(r"^btor(_syn)?(_nomem)?$", model_name): - with open("{}/model/design_{}.ys".format(self.workdir, model_name), "w") as f: - print("# running in {}/model/".format(self.workdir), file=f) - print("read_ilang design{}.il".format("_nomem" if "_nomem" in model_name else ""), file=f) + with open(f"{self.workdir}/model/design_{model_name}.ys", "w") as f: + print(f"# running in {self.workdir}/model/", file=f) + print(f"""read_ilang design{"_nomem" if "_nomem" in model_name else ""}.il""", file=f) print("flatten", file=f) print("setundef -undriven -anyseq", file=f) if "_syn" in model_name: @@ -439,15 +447,19 @@ class SbyJob: print("stat", file=f) print("write_btor {}-i design_{m}.info design_{m}.btor".format("-c " if self.opt_mode == "cover" else "", m=model_name), file=f) - task = SbyTask(self, model_name, self.model("nomem" if "_nomem" in model_name else "base"), - "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name)) + task = SbyTask( + self, + model_name, + self.model("nomem" if "_nomem" in model_name else "base"), + "cd {}/model; {} -ql design_{s}.log design_{s}.ys".format(self.workdir, self.exe_paths["yosys"], s=model_name) + ) task.checkretcode = True return [task] if model_name == "aig": - with open("{}/model/design_aiger.ys".format(self.workdir), "w") as f: - print("# running in {}/model/".format(self.workdir), file=f) + with open(f"{self.workdir}/model/design_aiger.ys", "w") as f: + print(f"# running in {self.workdir}/model/", file=f) print("read_ilang design_nomem.il", file=f) print("flatten", file=f) print("setundef -undriven -anyseq", file=f) @@ -462,8 +474,12 @@ class SbyJob: print("stat", file=f) print("write_aiger -I -B -zinit -map design_aiger.aim design_aiger.aig", file=f) - task = SbyTask(self, "aig", self.model("nomem"), - "cd {}/model; {} -ql design_aiger.log design_aiger.ys".format(self.workdir, self.exe_paths["yosys"])) + task = SbyTask( + self, + "aig", + self.model("nomem"), + f"""cd {self.workdir}/model; {self.exe_paths["yosys"]} -ql design_aiger.log design_aiger.ys""" + ) task.checkretcode = True return [task] @@ -506,7 +522,7 @@ class SbyJob: mode = None key = None - with open("{}/config.sby".format(self.workdir), "r") as f: + with open(f"{self.workdir}/config.sby", "r") as f: for line in f: raw_line = line if mode in ["options", "engines", "files"]: @@ -522,48 +538,48 @@ class SbyJob: if match: entries = match.group(1).split() if len(entries) == 0: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") if entries[0] == "options": mode = "options" if len(self.options) != 0 or len(entries) != 1: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") continue if entries[0] == "engines": mode = "engines" if len(self.engines) != 0 or len(entries) != 1: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") continue if entries[0] == "script": mode = "script" if len(self.script) != 0 or len(entries) != 1: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") continue if entries[0] == "file": mode = "file" if len(entries) != 2: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") current_verbatim_file = entries[1] if current_verbatim_file in self.verbatim_files: - self.error("duplicate file: {}".format(entries[1])) + self.error(f"duplicate file: {entries[1]}") self.verbatim_files[current_verbatim_file] = list() continue if entries[0] == "files": mode = "files" if len(entries) != 1: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") continue - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") if mode == "options": entries = line.split() if len(entries) != 2: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") self.options[entries[0]] = entries[1] continue @@ -583,19 +599,19 @@ class SbyJob: elif len(entries) == 2: self.files[entries[0]] = entries[1] else: - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") continue if mode == "file": self.verbatim_files[current_verbatim_file].append(raw_line) continue - self.error("sby file syntax error: {}".format(line)) + self.error(f"sby file syntax error: {line}") self.handle_str_option("mode", None) if self.opt_mode not in ["bmc", "prove", "cover", "live"]: - self.error("Invalid mode: {}".format(self.opt_mode)) + self.error(f"Invalid mode: {self.opt_mode}") self.expect = ["PASS"] if "expect" in self.options: @@ -604,7 +620,7 @@ class SbyJob: for s in self.expect: if s not in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"]: - self.error("Invalid expect value: {}".format(s)) + self.error(f"Invalid expect value: {s}") self.handle_bool_option("multiclock", False) self.handle_bool_option("wait", False) @@ -631,7 +647,7 @@ class SbyJob: self.error("Config file is lacking engine configuration.") if self.reusedir: - rmtree("{}/model".format(self.workdir), ignore_errors=True) + rmtree(f"{self.workdir}/model", ignore_errors=True) else: self.copy_src() @@ -660,7 +676,7 @@ class SbyJob: for opt in self.options.keys(): if opt not in self.used_options: - self.error("Unused option: {}".format(opt)) + self.error(f"Unused option: {opt}") self.taskloop() @@ -685,7 +701,7 @@ class SbyJob: ] + self.summary for line in self.summary: - self.log("summary: {}".format(line)) + self.log(f"summary: {line}") assert self.status in ["PASS", "FAIL", "UNKNOWN", "ERROR", "TIMEOUT"] @@ -698,6 +714,6 @@ class SbyJob: if self.status == "TIMEOUT": self.retcode = 8 if self.status == "ERROR": self.retcode = 16 - with open("{}/{}".format(self.workdir, self.status), "w") as f: + with open(f"{self.workdir}/{self.status}", "w") as f: for line in self.summary: print(line, file=f) diff --git a/sbysrc/sby_engine_abc.py b/sbysrc/sby_engine_abc.py index 49d044d..919cc35 100644 --- a/sbysrc/sby_engine_abc.py +++ b/sbysrc/sby_engine_abc.py @@ -31,24 +31,27 @@ def run(mode, job, engine_idx, engine): if abc_command[0] == "bmc3": if mode != "bmc": job.error("ABC command 'bmc3' is only valid in bmc mode.") - abc_command[0] += " -F {} -v".format(job.opt_depth) + abc_command[0] += f" -F {job.opt_depth} -v" elif abc_command[0] == "sim3": if mode != "bmc": job.error("ABC command 'sim3' is only valid in bmc mode.") - abc_command[0] += " -F {} -v".format(job.opt_depth) + abc_command[0] += f" -F {job.opt_depth} -v" elif abc_command[0] == "pdr": if mode != "prove": job.error("ABC command 'pdr' is only valid in prove mode.") else: - job.error("Invalid ABC command {}.".format(abc_command[0])) + job.error(f"Invalid ABC command {abc_command[0]}.") - task = SbyTask(job, "engine_{}".format(engine_idx), job.model("aig"), - ("cd {}; {} -c 'read_aiger model/design_aiger.aig; fold; strash; {}; write_cex -a engine_{}/trace.aiw'").format - (job.workdir, job.exe_paths["abc"], " ".join(abc_command), engine_idx), - logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w")) + task = SbyTask( + job, + f"engine_{engine_idx}", + job.model("aig"), + f"""cd {job.workdir}; {job.exe_paths["abc"]} -c 'read_aiger model/design_aiger.aig; fold; strash; {" ".join(abc_command)}; write_cex -a engine_{engine_idx}/trace.aiw'""", + logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile.txt", "w") + ) task.noprintregex = re.compile(r"^\.+$") task_status = None @@ -78,19 +81,23 @@ def run(mode, job, engine_idx, engine): assert task_status is not None job.update_status(task_status) - job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status)) - job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status)) + job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}") + job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""") job.terminate() if task_status == "FAIL" and job.opt_aigsmt != "none": - task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"), - ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " + + task2 = SbyTask( + job, + f"engine_{engine_idx}", + job.model("smt2"), + ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " + "--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw --aig-noheader model/design_smt2.smt2").format (job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt, - "" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop), + "" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}", job.opt_append, i=engine_idx), - logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w")) + logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w") + ) task2_status = None @@ -109,8 +116,8 @@ def run(mode, job, engine_idx, engine): assert task2_status is not None assert task2_status == "FAIL" - if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)): - job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx)) + if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"): + job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd") task2.output_callback = output_callback2 task2.exit_callback = exit_callback2 diff --git a/sbysrc/sby_engine_aiger.py b/sbysrc/sby_engine_aiger.py index dbe05c5..7a9510d 100644 --- a/sbysrc/sby_engine_aiger.py +++ b/sbysrc/sby_engine_aiger.py @@ -40,16 +40,20 @@ def run(mode, job, engine_idx, engine): solver_cmd = " ".join([job.exe_paths["aigbmc"]] + solver_args[1:]) else: - job.error("Invalid solver command {}.".format(solver_args[0])) + job.error(f"Invalid solver command {solver_args[0]}.") - task = SbyTask(job, "engine_{}".format(engine_idx), job.model("aig"), - "cd {}; {} model/design_aiger.aig".format(job.workdir, solver_cmd), - logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w")) + task = SbyTask( + job, + f"engine_{engine_idx}", + job.model("aig"), + f"cd {job.workdir}; {solver_cmd} model/design_aiger.aig", + logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile.txt", "w") + ) task_status = None produced_cex = False end_of_cex = False - aiw_file = open("{}/engine_{}/trace.aiw".format(job.workdir, engine_idx), "w") + aiw_file = open(f"{job.workdir}/engine_{engine_idx}/trace.aiw", "w") def output_callback(line): nonlocal task_status @@ -66,7 +70,7 @@ def run(mode, job, engine_idx, engine): return None if line.startswith("u"): - return "No CEX up to depth {}.".format(int(line[1:])-1) + return f"No CEX up to depth {int(line[1:])-1}." if line in ["0", "1", "2"]: print(line, file=aiw_file) @@ -84,29 +88,37 @@ def run(mode, job, engine_idx, engine): aiw_file.close() job.update_status(task_status) - job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status)) - job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status)) + job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}") + job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""") job.terminate() if task_status == "FAIL" and job.opt_aigsmt != "none": if produced_cex: if mode == "live": - task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"), - ("cd {}; {} -g -s {}{} --noprogress --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " + + task2 = SbyTask( + job, + f"engine_{engine_idx}", + job.model("smt2"), + ("cd {}; {} -g -s {}{} --noprogress --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " + "--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw model/design_smt2.smt2").format (job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt, - "" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop), + "" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}", i=engine_idx), - logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w")) + logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w") + ) else: - task2 = SbyTask(job, "engine_{}".format(engine_idx), job.model("smt2"), - ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " + + task2 = SbyTask( + job, + f"engine_{engine_idx}", + job.model("smt2"), + ("cd {}; {} -s {}{} --noprogress --append {} --dump-vcd engine_{i}/trace.vcd --dump-vlogtb engine_{i}/trace_tb.v " + "--dump-smtc engine_{i}/trace.smtc --aig model/design_aiger.aim:engine_{i}/trace.aiw model/design_smt2.smt2").format (job.workdir, job.exe_paths["smtbmc"], job.opt_aigsmt, - "" if job.opt_tbtop is None else " --vlogtb-top {}".format(job.opt_tbtop), + "" if job.opt_tbtop is None else f" --vlogtb-top {job.opt_tbtop}", job.opt_append, i=engine_idx), - logfile=open("{}/engine_{}/logfile2.txt".format(job.workdir, engine_idx), "w")) + logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w") + ) task2_status = None @@ -128,14 +140,14 @@ def run(mode, job, engine_idx, engine): else: assert task2_status == "FAIL" - if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)): - job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx)) + if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"): + job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd") task2.output_callback = output_callback2 task2.exit_callback = exit_callback2 else: - job.log("engine_{}: Engine did not produce a counter example.".format(engine_idx)) + job.log(f"engine_{engine_idx}: Engine did not produce a counter example.") task.output_callback = output_callback task.exit_callback = exit_callback diff --git a/sbysrc/sby_engine_btor.py b/sbysrc/sby_engine_btor.py index fb80797..ec64536 100644 --- a/sbysrc/sby_engine_btor.py +++ b/sbysrc/sby_engine_btor.py @@ -37,19 +37,19 @@ def run(mode, job, engine_idx, engine): if solver_args[0] == "btormc": solver_cmd = "" if random_seed: - solver_cmd += "BTORSEED={} ".format(random_seed) - solver_cmd += job.exe_paths["btormc"] + " --stop-first {} -v 1 -kmax {}".format(0 if mode == "cover" else 1, job.opt_depth - 1) + solver_cmd += f"BTORSEED={random_seed} " + solver_cmd += job.exe_paths["btormc"] + f""" --stop-first {0 if mode == "cover" else 1} -v 1 -kmax {job.opt_depth - 1}""" if job.opt_skip is not None: - solver_cmd += " -kmin {}".format(job.opt_skip) + solver_cmd += f" -kmin {job.opt_skip}" solver_cmd += " ".join([""] + solver_args[1:]) elif solver_args[0] == "pono": if random_seed: job.error("Setting the random seed is not available for the pono solver.") - solver_cmd = job.exe_paths["pono"] + " -v 1 -e bmc -k {}".format(job.opt_depth - 1) + solver_cmd = job.exe_paths["pono"] + f" -v 1 -e bmc -k {job.opt_depth - 1}" else: - job.error("Invalid solver command {}.".format(solver_args[0])) + job.error(f"Invalid solver command {solver_args[0]}.") common_state = SimpleNamespace() common_state.solver_status = None @@ -72,7 +72,7 @@ def run(mode, job, engine_idx, engine): elif common_state.solver_status == "unsat": task_status = "FAIL" else: - job.error("engine_{}: Engine terminated without status.".format(engine_idx)) + job.error(f"engine_{engine_idx}: Engine terminated without status.") else: if common_state.expected_cex == 0: task_status = "pass" @@ -81,20 +81,20 @@ def run(mode, job, engine_idx, engine): elif common_state.solver_status == "unsat": task_status = "pass" else: - job.error("engine_{}: Engine terminated without status.".format(engine_idx)) + job.error(f"engine_{engine_idx}: Engine terminated without status.") job.update_status(task_status.upper()) - job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status)) - job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status)) + job.log(f"engine_{engine_idx}: Status returned by engine: {task_status}") + job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status}""") if len(common_state.produced_traces) == 0: - job.log("engine_{}: Engine did not produce a{}example.".format(engine_idx, " counter" if mode != "cover" else "n ")) + job.log(f"""engine_{engine_idx}: Engine did not produce a{" counter" if mode != "cover" else "n "}example.""") elif len(common_state.produced_traces) <= common_state.print_traces_max: job.summary.extend(common_state.produced_traces) else: job.summary.extend(common_state.produced_traces[:common_state.print_traces_max]) excess_traces = len(common_state.produced_traces) - common_state.print_traces_max - job.summary.append("and {} further trace{}".format(excess_traces, "s" if excess_traces > 1 else "")) + job.summary.append(f"""and {excess_traces} further trace{"s" if excess_traces > 1 else ""}""") job.terminate() @@ -112,9 +112,9 @@ def run(mode, job, engine_idx, engine): def exit_callback2(retcode): assert retcode == 0 - vcdpath = "{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, suffix) + vcdpath = f"{job.workdir}/engine_{engine_idx}/trace{suffix}.vcd" if os.path.exists(vcdpath): - common_state.produced_traces.append("{}trace: {}".format("" if mode == "cover" else "counterexample ", vcdpath)) + common_state.produced_traces.append(f"""{"" if mode == "cover" else "counterexample "}trace: {vcdpath}""") common_state.running_tasks -= 1 if (common_state.running_tasks == 0): @@ -131,14 +131,14 @@ def run(mode, job, engine_idx, engine): assert common_state.produced_cex == 0 else: - job.error("engine_{}: BTOR solver '{}' is currently not supported in cover mode.".format(engine_idx, solver_args[0])) + job.error(f"engine_{engine_idx}: BTOR solver '{solver_args[0]}' is currently not supported in cover mode.") if (common_state.produced_cex < common_state.expected_cex) and line == "sat": assert common_state.wit_file == None if common_state.expected_cex == 1: - common_state.wit_file = open("{}/engine_{}/trace.wit".format(job.workdir, engine_idx), "w") + common_state.wit_file = open(f"{job.workdir}/engine_{engine_idx}/trace.wit", "w") else: - common_state.wit_file = open("{}/engine_{}/trace{}.wit".format(job.workdir, engine_idx, common_state.produced_cex), "w") + common_state.wit_file = open(f"""{job.workdir}/engine_{engine_idx}/trace{common_state.produced_cex}.wit""", "w") if solver_args[0] != "btormc": task.log("Found satisfiability witness.") @@ -149,9 +149,13 @@ def run(mode, job, engine_idx, engine): suffix = "" else: suffix = common_state.produced_cex - task2 = SbyTask(job, "engine_{}_{}".format(engine_idx, common_state.produced_cex), job.model("btor"), - "cd {dir} ; btorsim -c --vcd engine_{idx}/trace{i}.vcd --hierarchical-symbols --info model/design_btor.info model/design_btor.btor engine_{idx}/trace{i}.wit".format(dir=job.workdir, idx=engine_idx, i=suffix), - logfile=open("{dir}/engine_{idx}/logfile2.txt".format(dir=job.workdir, idx=engine_idx), "w")) + task2 = SbyTask( + job, + f"engine_{engine_idx}_{common_state.produced_cex}", + job.model("btor"), + "cd {dir} ; btorsim -c --vcd engine_{idx}/trace{i}.vcd --hierarchical-symbols --info model/design_btor.info model/design_btor.btor engine_{idx}/trace{i}.wit".format(dir=job.workdir, idx=engine_idx, i=suffix), + logfile=open(f"{job.workdir}/engine_{engine_idx}/logfile2.txt", "w") + ) task2.output_callback = output_callback2 task2.exit_callback = make_exit_callback(suffix) task2.checkretcode = True @@ -198,20 +202,23 @@ def run(mode, job, engine_idx, engine): if common_state.solver_status == "unsat": if common_state.expected_cex == 1: - with open("{}/engine_{}/trace.wit".format(job.workdir, engine_idx), "w") as wit_file: + with open(f"""{job.workdir}/engine_{engine_idx}/trace.wit""", "w") as wit_file: print("unsat", file=wit_file) else: for i in range(common_state.produced_cex, common_state.expected_cex): - with open("{}/engine_{}/trace{}.wit".format(job.workdir, engine_idx, i), "w") as wit_file: + with open(f"{job.workdir}/engine_{engine_idx}/trace{i}.wit", "w") as wit_file: print("unsat", file=wit_file) common_state.running_tasks -= 1 if (common_state.running_tasks == 0): print_traces_and_terminate() - task = SbyTask(job, "engine_{}".format(engine_idx), job.model("btor"), - "cd {}; {} model/design_btor.btor".format(job.workdir, solver_cmd), - logfile=open("{}/engine_{}/logfile.txt".format(job.workdir, engine_idx), "w")) + task = SbyTask( + job, + f"engine_{engine_idx}", job.model("btor"), + f"cd {job.workdir}; {solver_cmd} model/design_btor.btor", + logfile=open("{job.workdir}/engine_{engine_idx}/logfile.txt", "w") + ) task.output_callback = output_callback task.exit_callback = exit_callback diff --git a/sbysrc/sby_engine_smtbmc.py b/sbysrc/sby_engine_smtbmc.py index 6d2ffc4..c21a3f7 100644 --- a/sbysrc/sby_engine_smtbmc.py +++ b/sbysrc/sby_engine_smtbmc.py @@ -68,7 +68,7 @@ def run(mode, job, engine_idx, engine): elif o == "--seed": random_seed = a else: - job.error("Invalid smtbmc options {}.".format(o)) + job.error(f"Invalid smtbmc options {o}.") xtra_opts = False for i, a in enumerate(args): @@ -89,7 +89,7 @@ def run(mode, job, engine_idx, engine): smtbmc_opts += ["--unroll"] if job.opt_smtc is not None: - smtbmc_opts += ["--smtc", "src/{}".format(job.opt_smtc)] + smtbmc_opts += ["--smtc", f"src/{job.opt_smtc}"] if job.opt_tbtop is not None: smtbmc_opts += ["--vlogtb-top", job.opt_tbtop] @@ -107,9 +107,9 @@ def run(mode, job, engine_idx, engine): run("prove_induction", job, engine_idx, engine) return - taskname = "engine_{}".format(engine_idx) - trace_prefix = "engine_{}/trace".format(engine_idx) - logfile_prefix = "{}/engine_{}/logfile".format(job.workdir, engine_idx) + taskname = f"engine_{engine_idx}" + trace_prefix = f"engine_{engine_idx}/trace" + logfile_prefix = f"{job.workdir}/engine_{engine_idx}/logfile" if mode == "prove_basecase": taskname += ".basecase" @@ -137,10 +137,15 @@ def run(mode, job, engine_idx, engine): else: t_opt = "{}".format(job.opt_depth) - task = SbyTask(job, taskname, job.model(model_name), - "cd {}; {} {} -t {} {} --append {} --dump-vcd {p}.vcd --dump-vlogtb {p}_tb.v --dump-smtc {p}.smtc model/design_{}.smt2".format - (job.workdir, job.exe_paths["smtbmc"], " ".join(smtbmc_opts), t_opt, "--info \"(set-option :random-seed {})\"".format(random_seed) if random_seed else "", job.opt_append, model_name, p=trace_prefix), - logfile=open(logfile_prefix + ".txt", "w"), logstderr=(not progress)) + random_seed = f"--info \"(set-option :random-seed {random_seed})\"" if random_seed else "" + task = SbyTask( + job, + taskname, + job.model(model_name), + f"""cd {job.workdir}; {job.exe_paths["smtbmc"]} {" ".join(smtbmc_opts)} -t {t_opt} {random_seed} --append {job.opt_append} --dump-vcd {trace_prefix}.vcd --dump-vlogtb {trace_prefix}_tb.v --dump-smtc {trace_prefix}.smtc model/design_{model_name}.smt2""", + logfile=open(logfile_prefix + ".txt", "w"), + logstderr=(not progress) + ) if mode == "prove_basecase": job.basecase_tasks.append(task) @@ -177,37 +182,37 @@ def run(mode, job, engine_idx, engine): def exit_callback(retcode): if task_status is None: - job.error("engine_{}: Engine terminated without status.".format(engine_idx)) + job.error(f"engine_{engine_idx}: Engine terminated without status.") if mode == "bmc" or mode == "cover": job.update_status(task_status) task_status_lower = task_status.lower() if task_status == "PASS" else task_status - job.log("engine_{}: Status returned by engine: {}".format(engine_idx, task_status_lower)) - job.summary.append("engine_{} ({}) returned {}".format(engine_idx, " ".join(engine), task_status_lower)) + job.log(f"engine_{engine_idx}: Status returned by engine: {task_status_lower}") + job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status_lower}""") if task_status == "FAIL" and mode != "cover": - if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)): - job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx)) + if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"): + job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd") elif task_status == "PASS" and mode == "cover": print_traces_max = 5 for i in range(print_traces_max): - if os.path.exists("{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, i)): - job.summary.append("trace: {}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, i)) + if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace{i}.vcd"): + job.summary.append(f"trace: {job.workdir}/engine_{engine_idx}/trace{i}.vcd") else: break else: excess_traces = 0 - while os.path.exists("{}/engine_{}/trace{}.vcd".format(job.workdir, engine_idx, print_traces_max + excess_traces)): + while os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace{print_traces_max + excess_traces}.vcd"): excess_traces += 1 if excess_traces > 0: - job.summary.append("and {} further trace{}".format(excess_traces, "s" if excess_traces > 1 else "")) + job.summary.append(f"""and {excess_traces} further trace{"s" if excess_traces > 1 else ""}""") job.terminate() elif mode in ["prove_basecase", "prove_induction"]: task_status_lower = task_status.lower() if task_status == "PASS" else task_status - job.log("engine_{}: Status returned by engine for {}: {}".format(engine_idx, mode.split("_")[1], task_status_lower)) - job.summary.append("engine_{} ({}) returned {} for {}".format(engine_idx, " ".join(engine), task_status_lower, mode.split("_")[1])) + job.log(f"""engine_{engine_idx}: Status returned by engine for {mode.split("_")[1]}: {task_status_lower}""") + job.summary.append(f"""engine_{engine_idx} ({" ".join(engine)}) returned {task_status_lower} for {mode.split("_")[1]}""") if mode == "prove_basecase": for task in job.basecase_tasks: @@ -218,8 +223,8 @@ def run(mode, job, engine_idx, engine): else: job.update_status(task_status) - if os.path.exists("{}/engine_{}/trace.vcd".format(job.workdir, engine_idx)): - job.summary.append("counterexample trace: {}/engine_{}/trace.vcd".format(job.workdir, engine_idx)) + if os.path.exists(f"{job.workdir}/engine_{engine_idx}/trace.vcd"): + job.summary.append(f"counterexample trace: {job.workdir}/engine_{engine_idx}/trace.vcd") job.terminate() elif mode == "prove_induction": diff --git a/sbysrc/sby_mode_bmc.py b/sbysrc/sby_mode_bmc.py index 20ffe23..eed3f4a 100644 --- a/sbysrc/sby_mode_bmc.py +++ b/sbysrc/sby_mode_bmc.py @@ -28,8 +28,8 @@ def run(job): engine = job.engines[engine_idx] assert len(engine) > 0 - job.log("engine_{}: {}".format(engine_idx, " ".join(engine))) - job.makedirs("{}/engine_{}".format(job.workdir, engine_idx)) + job.log(f"""engine_{engine_idx}: {" ".join(engine)}""") + job.makedirs(f"{job.workdir}/engine_{engine_idx}") if engine[0] == "smtbmc": import sby_engine_smtbmc @@ -44,4 +44,4 @@ def run(job): sby_engine_btor.run("bmc", job, engine_idx, engine) else: - job.error("Invalid engine '{}' for bmc mode.".format(engine[0])) + job.error(f"Invalid engine '{engine[0]}' for bmc mode.") diff --git a/sbysrc/sby_mode_cover.py b/sbysrc/sby_mode_cover.py index dbefac2..a078370 100644 --- a/sbysrc/sby_mode_cover.py +++ b/sbysrc/sby_mode_cover.py @@ -27,8 +27,8 @@ def run(job): engine = job.engines[engine_idx] assert len(engine) > 0 - job.log("engine_{}: {}".format(engine_idx, " ".join(engine))) - job.makedirs("{}/engine_{}".format(job.workdir, engine_idx)) + job.log(f"""engine_{engine_idx}: {" ".join(engine)}""") + job.makedirs(f"{job.workdir}/engine_{engine_idx}") if engine[0] == "smtbmc": import sby_engine_smtbmc @@ -39,4 +39,4 @@ def run(job): sby_engine_btor.run("cover", job, engine_idx, engine) else: - job.error("Invalid engine '{}' for cover mode.".format(engine[0])) + job.error(f"Invalid engine '{engine[0]}' for cover mode.") diff --git a/sbysrc/sby_mode_live.py b/sbysrc/sby_mode_live.py index 9a046cb..c58f562 100644 --- a/sbysrc/sby_mode_live.py +++ b/sbysrc/sby_mode_live.py @@ -28,12 +28,12 @@ def run(job): engine = job.engines[engine_idx] assert len(engine) > 0 - job.log("engine_{}: {}".format(engine_idx, " ".join(engine))) - job.makedirs("{}/engine_{}".format(job.workdir, engine_idx)) + job.log(f"""engine_{engine_idx}: {" ".join(engine)}""") + job.makedirs(f"{job.workdir}/engine_{engine_idx}") if engine[0] == "aiger": import sby_engine_aiger sby_engine_aiger.run("live", job, engine_idx, engine) else: - job.error("Invalid engine '{}' for live mode.".format(engine[0])) + job.error(f"Invalid engine '{engine[0]}' for live mode.") diff --git a/sbysrc/sby_mode_prove.py b/sbysrc/sby_mode_prove.py index f3c3ac9..6591e0d 100644 --- a/sbysrc/sby_mode_prove.py +++ b/sbysrc/sby_mode_prove.py @@ -35,8 +35,8 @@ def run(job): engine = job.engines[engine_idx] assert len(engine) > 0 - job.log("engine_{}: {}".format(engine_idx, " ".join(engine))) - job.makedirs("{}/engine_{}".format(job.workdir, engine_idx)) + job.log(f"""engine_{engine_idx}: {" ".join(engine)}""") + job.makedirs(f"{job.workdir}/engine_{engine_idx}") if engine[0] == "smtbmc": import sby_engine_smtbmc @@ -51,4 +51,4 @@ def run(job): sby_engine_abc.run("prove", job, engine_idx, engine) else: - job.error("Invalid engine '{}' for prove mode.".format(engine[0])) + job.error(f"Invalid engine '{engine[0]}' for prove mode.") -- 2.30.2