feat(WIP): clean up code and TODOs, add clean command

Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
This commit is contained in:
Benjamin Lipp
2025-08-12 18:02:20 +02:00
parent a4ff3d4eb5
commit b1ac5d9244
2 changed files with 80 additions and 123 deletions

View File

@@ -20,21 +20,28 @@
* ~find the files~
* ~start subprocesses in parallel~
* ~wait for them to finish~
* ~~rebase from main~~
* ~~see if we still need the `extra_args is None` check in `_run_proverif`~`
* ~~set colors differently to prevent injection attack~~
* ~~by calling a function~~
* ~~by prepared statements~~
* ~~standalone function parse_result_line is no longer necessary~~
* ~~is the clean function still necessary?~~
* ~~implement better main function for click~~
## Next Steps
* ~~rebase from main~~
* see if we still need the `extra_args is None` check in `_run_proverif`
* set colors differently to prevent injection attack
* by calling a function
* by prepared statements
* standalone function parse_result_line is no longer necessary
* is the clean function still necessary?
* implement better main function for click
* why does analyze fail when the target/proverif directory is not empty?
* return an exit status that is meaningful for CI
* exception handling in analyze() and in run_proverif()
* refactor filtering in run_proverif (see karo's comment)
* configurable target directory
* do not assume that the repo path has subdir analysis and marzipan
* integrate marzipan.awk into Python, somehow
* rewrite marzipan.awk into Python/LARK
* rewrite cpp into Python/LARK
* integrate the Nix flake into the main Nix flake
* pull the gawk dependency into the Nix flake
* think about next steps
* integrate this upstream, into the CI?
* “make it beautiful” steps? more resiliency to working directory?

View File

@@ -2,6 +2,7 @@ from .util import pkgs, setup_exports, export, rename
#from rich.console import Console
import click
target_subdir = "target/proverif"
(__all__, export) = setup_exports()
export(setup_exports)
@@ -11,6 +12,11 @@ console = pkgs.rich.console.Console()
logger = pkgs.logging.getLogger(__name__)
@click.group()
def main():
pkgs.logging.basicConfig(level=pkgs.logging.DEBUG)
def eprint(*args, **kwargs):
print(*args, **{"file": pkgs.sys.stderr, **kwargs})
@@ -28,13 +34,7 @@ def exc_piped(argv, **kwargs):
return pkgs.subprocess.Popen(argv, **kwargs)
@click.command()
@click.argument("file")
@click.argument("extra_args", required=False)
def run_proverif(file, extra_args=[]):
_run_proverif(file, extra_args)
def _run_proverif(file, extra_args=[]):
params = ["proverif", "-test", *extra_args, file]
logger.debug(params)
@@ -42,22 +42,17 @@ def _run_proverif(file, extra_args=[]):
try:
null, p = clean_warnings_init()
for line in process.stdout:
# Spurious comment -- karo
#print(f"received a line: {line.strip()}")
# clean warnings
line = line.rstrip()
# Maybe we can move this stuff into a function? -- karo
if not pkgs.re.match(r"^Warning: identifier \w+ rebound.$", line):
if p != null:
yield p
# Spurious comment -- karo
#print(p)
p = line
else:
p = null
if p != null:
yield p
#print(p)
except Exception as e:
# When does this happen? Should the error even be ignored? Metaverif should probably just abort here, right? --karo
logger.error(f"An error occurred: {e}")
@@ -66,45 +61,19 @@ def _run_proverif(file, extra_args=[]):
process.wait()
@click.command()
@click.argument("file")
@click.argument("cpp_prep")
def cpp(file, cpp_prep):
_cpp(file, cpp_prep)
def _cpp(file, cpp_prep):
logger.debug(f"_cpp: {file}, {cpp_prep}")
file_path = pkgs.pathlib.Path(file)
dirname = file_path.parent
cwd = pkgs.pathlib.Path.cwd()
params = ["cpp", "-P", f"-I{cwd}/{dirname}", file, "-o", cpp_prep]
params = ["cpp", "-P", f"-I{dirname}", file, "-o", cpp_prep]
return exc(params, stderr=pkgs.sys.stderr)
# This appears ti be unused? -- karo
@click.command()
def parse_result_line():
for outp in pkgs.sys.stdin:
match = pkgs.re.search(r'^RESULT .* \b(true|false)\b\.$', outp)
if match:
result = match.group(1)
print(result, flush=True)
else:
pass
@click.command()
@click.argument("cpp_prep")
@click.argument("awk_prep")
def awk(cpp_prep, awk_prep):
_awk(cpp_prep, awk_prep)
def _awk(cpp_prep, awk_prep):
params = ["awk", "-f", "marzipan/marzipan.awk", cpp_prep]
def awk(repo_path, cpp_prep, awk_prep):
params = ["awk", "-f", str(pkgs.os.path.join(repo_path, "marzipan/marzipan.awk")), cpp_prep]
with open(awk_prep, 'a+') as file:
exc(params, stderr=pkgs.sys.stderr, stdout=file)
file.write("\nprocess main")
@@ -120,35 +89,41 @@ def pretty_output_init(file_path):
expected = []
descs = []
# Process lemmas first
with open(file_path, 'r') as file:
content = file.read()
# Process lemmas first
result = pkgs.re.findall(r'@(lemma)(?=\s+\"([^\"]*)\")', content)
if result:
# This is structured like a bash file; lets use python typed variables instead of stuff like 'true' :) -- karo
expected.extend(['true' if e[0] == 'lemma' else e[0] for e in result])
# The regex only returns lemmas. For lemmas, we always expect the result 'true' from ProVerif.
expected.extend([True for _ in range(len(result))])
descs.extend([e[1] for e in result])
# Then process regular queries
result = pkgs.re.findall(r'@(query|reachable)(?=\s+"[^\"]*")', content)
if result:
# Same here -- karo
expected.extend(['true' if e == '@query' else 'false' for e in result])
# For queries, we expect 'true' from ProVerif, for reachable, we expect 'false'.
expected.extend([e == '@query' for e in result])
reachable_result = pkgs.re.findall(r'@(query|reachable)\s+"([^\"]*)"', content)
descs.extend([e[1] for e in reachable_result])
ta = pkgs.time.time()
res = 0
ctr = 0
# If its a tuple, add some braces -- karo
return ta, res, ctr, expected, descs
return (ta, res, ctr, expected, descs)
def pretty_output_step(file_path, line, expected, descs, res, ctr, ta):
tz = pkgs.time.time()
# Output from ProVerif contains a trailing newline, which we do not have in the expected output. Remove it for meaningful matching.
outp_clean = line.rstrip()
outp_clean_raw = line.rstrip()
if outp_clean_raw == 'true':
outp_clean = True
elif outp_clean_raw == 'false':
outp_clean = False
else:
outp_clean = outp_clean_raw
if outp_clean == expected[ctr]:
pretty_output_line(f"{int(tz - ta)}s ", "", "green", descs[ctr])
@@ -159,49 +134,63 @@ def pretty_output_step(file_path, line, expected, descs, res, ctr, ta):
ctr += 1
ta = tz
return res, ctr, ta
return (res, ctr, ta)
@click.command()
@click.argument("file_path")
def pretty_output(file_path):
ta, res, ctr, expected, descs = pretty_output_init(file_path)
(ta, res, ctr, expected, descs) = pretty_output_init(file_path)
for line in pkgs.sys.stdin:
res, ctr, ta = pretty_output_step(file_path, line, expected, descs, res, ctr, ta)
(res, ctr, ta) = pretty_output_step(file_path, line, expected, descs, res, ctr, ta)
@click.command()
@click.argument("path")
def analyze(path):
# Use relative paths in here, do not chdir. Chdir was used due to bash limitations -- karo
pkgs.os.chdir(path)
def get_target_dir(path):
return pkgs.os.path.join(path, target_subdir)
tmpdir = "target/proverif"
pkgs.os.makedirs(tmpdir, exist_ok=True)
@main.command()
@click.argument("repo_path")
def analyze(repo_path):
target_dir = get_target_dir(repo_path)
pkgs.os.makedirs(target_dir, exist_ok=True)
entries = []
entries.extend(sorted(pkgs.glob.glob('analysis/*.entry.mpv')))
analysis_dir = pkgs.os.path.join(repo_path, 'analysis')
entries.extend(sorted(pkgs.glob.glob(str(analysis_dir) + '/*.entry.mpv')))
error = False
with pkgs.concurrent.futures.ProcessPoolExecutor() as executor:
futures = {executor.submit(_metaverif, tmpdir, entry): entry for entry in entries}
futures = {executor.submit(metaverif, repo_path, target_dir, entry): entry for entry in entries}
for future in pkgs.concurrent.futures.as_completed(futures):
cmd = futures[future]
# TODO: this does not seem to be the right way to find out whether
# the future ended with an exception?
try:
# Spurious comment -- karo
#res = future.result()
print(f"Metaverif {cmd} finished.", file=pkgs.sys.stderr)
except Exception as e:
# Should probably affect the exit status -- karo
error = True
print(f"Metaverif {cmd} generated an exception: {e}")
print("all processes finished.")
if error:
exit(1)
else:
exit(0)
# What is this? -- karo
@click.command()
def clean():
click.echo("foo")
pass
@main.command()
@click.argument("repo_path")
def clean(repo_path):
target_dir = get_target_dir(repo_path)
if pkgs.os.path.isdir(target_dir):
for filename in pkgs.os.listdir(target_dir):
file_path = pkgs.os.path.join(target_dir, filename)
if pkgs.os.path.isfile(file_path) and pkgs.os.path.splitext(file_path)[1] in [".pv", ".log"]:
try:
pkgs.os.remove(file_path)
except Exception as e:
print(f"Error deleting {file_path}: {str(e)}")
def clean_warnings_init():
# This trick with the sentinel value is very much a bashism -- karo
@@ -229,36 +218,16 @@ def clean_warnings_finalize(null, p):
print(p)
@click.command()
def clean_warnings():
# Lots of spurious comments -- karo
#null = "0455290a-50d5-4f28-8008-3d69605c2835"
#p = null
null, p = clean_warnings_init()
for line in pkgs.sys.stdin:
p = clean_warnings_line(null, p, line)
#line = line.rstrip()
#if not pkgs.re.match(r"^Warning: identifier \w+ rebound.$", line):
# if p != null:
# print(p)
# p = line
#else:
# p = null
clean_warnings_finalize(null, p)
## print last line after EOF
#if p != null:
# print(p)
@click.command()
@click.argument("tmpdir")
@click.argument("file")
def metaverif(tmpdir, file):
metaverif(tmpdir, file)
def _metaverif(tmpdir, file):
def metaverif(repo_path, tmpdir, file):
print(f"Start metaverif on {file}")
# Extract the name using regex
name_match = pkgs.re.search(r'([^/]*)(?=\.mpv)', file)
@@ -274,44 +243,25 @@ def _metaverif(tmpdir, file):
print(f"CPP Prep Path: {cpp_prep}")
print(f"AWK Prep Path: {awk_prep}")
# These subcommands should probably go -- karo
_cpp(file, cpp_prep)
_awk(cpp_prep, awk_prep)
cpp(file, cpp_prep)
awk(repo_path, cpp_prep, awk_prep)
log_file = pkgs.os.path.join(tmpdir, f"{name}.log")
ta, res, ctr, expected, descs = pretty_output_init(cpp_prep)
with open(log_file, 'a') as log:
generator = _run_proverif(awk_prep)
generator = run_proverif(awk_prep)
for line in generator:
log.write(line)
# parse-result-line:
match = pkgs.re.search(r'^RESULT .* \b(true|false)\b\.$', line)
if match:
result = match.group(1)
#print(result, flush=True)
# pretty-output:
res, ctr, ta = pretty_output_step(cpp_prep, result, expected, descs, res, ctr, ta)
else:
print("No match found for the file name.")
# Implement sensible main function -- karo
@export
@rename("main") # Click seems to erase __name__
@click.group()
def main():
#pkgs.IPython.embed()
pkgs.logging.basicConfig(level=pkgs.logging.DEBUG)
pass
main.add_command(analyze)
main.add_command(metaverif)
main.add_command(clean)
main.add_command(run_proverif)
main.add_command(cpp)
main.add_command(awk)
main.add_command(parse_result_line)
main.add_command(pretty_output)
main.add_command(clean_warnings)
if __name__ == "__main__":
main()