From 62ed2fdacd8b55084151a0b320d2befb025b1c10 Mon Sep 17 00:00:00 2001 From: Stevengre Date: Tue, 12 May 2026 14:23:32 +0800 Subject: [PATCH] feat: add `--per-depth-timeout` option for kontrol prove Adds progressive depth-halving with a per-attempt **stall window**. When `--per-depth-timeout S` is set, each attempt is given an initial window of `current_depth * S` seconds. The parent polls the proof's on-disk subdir every window: if any file mtime changed (i.e. the prove committed at least one step), the window is reset for another round. If no progress is observed across a full window, the entire subprocess session is reaped with `os.killpg(..., SIGKILL)` (Python + KoreServer + parallel-frontier worker threads), `current_depth` is halved (floor 1), and the next attempt resumes from the disk-persisted KCFG state. Each attempt runs in a forked subprocess that calls `os.setsid()` to become its own session leader, so a single `killpg` reaps the entire subtree. The proof state is persisted by `advance_proof`'s maintenance loop (maintenance_rate=1, default), so on-disk KCFG state is current up to the last committed step at the moment of the kill. Default 0 disables the wrapper: the existing single-attempt `run_prover(...)` path is taken with no subprocess overhead. --- src/kontrol/cli.py | 12 ++++ src/kontrol/options.py | 2 + src/kontrol/prove.py | 125 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+) diff --git a/src/kontrol/cli.py b/src/kontrol/cli.py index f34a26c8e..39074e237 100644 --- a/src/kontrol/cli.py +++ b/src/kontrol/cli.py @@ -554,6 +554,18 @@ def parse(s: str) -> list[T]: action='store_true', help='Generate a Solidity test contract with concrete counterexample values when proofs fail.', ) + prove_args.add_argument( + '--per-depth-timeout', + type=int, + default=None, + dest='per_depth_timeout', + help=( + 'Per-depth wall-clock timeout in seconds. When > 0, each prove attempt is given ' + '`max_depth * per_depth_timeout` seconds; if it exceeds the budget, `max_depth` is halved and the ' + 'proof resumes from saved KCFG state. Repeats down to depth=1 (timeout = per_depth_timeout). ' + 'Default 0 disables progressive halving.' + ), + ) show_args = command_parser.add_parser( 'show', diff --git a/src/kontrol/options.py b/src/kontrol/options.py index ad16d96ed..2418ae507 100644 --- a/src/kontrol/options.py +++ b/src/kontrol/options.py @@ -343,6 +343,7 @@ class ProveOptions( extra_module: str | None symbolic_caller: bool generate_counterexample: bool + per_depth_timeout: int def __init__(self, args: dict[str, Any]) -> None: super().__init__(args) @@ -376,6 +377,7 @@ def default() -> dict[str, Any]: 'extra_module': None, 'symbolic_caller': False, 'generate_counterexample': False, + 'per_depth_timeout': 0, } @staticmethod diff --git a/src/kontrol/prove.py b/src/kontrol/prove.py index c05604c29..761fae9a0 100644 --- a/src/kontrol/prove.py +++ b/src/kontrol/prove.py @@ -1,6 +1,8 @@ from __future__ import annotations import logging +import os +import signal import sys import time from abc import abstractmethod @@ -13,6 +15,7 @@ from kevm_pyk.cli import EVMChainOptions from kevm_pyk.kevm import KEVM, _process_jumpdests from kevm_pyk.utils import KDefinition__expand_macros, abstract_cell_vars, run_prover +import multiprocess as mp # type: ignore from multiprocess.pool import Pool # type: ignore from pyk.cterm import CTerm, CTermSymbolic from pyk.kast.inner import KApply, KSequence, KSort, KVariable, Subst @@ -311,7 +314,11 @@ def _run_cfg_group( summary_ids: Iterable[str], init_accounts: Iterable[KInner] = (), ) -> list[APRProof]: + _ATTEMPT_TIMEOUT: Any = object() + def init_and_run_proof(test: FoundryTest, progress: Progress | None = None) -> APRFailureInfo | Exception | None: + if options.per_depth_timeout > 0: + return _init_and_run_proof_progressive(test, progress) task: TaskID | None = None if progress is not None: @@ -479,6 +486,124 @@ def create_kcfg_explore() -> KCFGExplore: else: return proof.failure_info + def _init_and_run_proof_progressive( + test: FoundryTest, progress: Progress | None + ) -> APRFailureInfo | Exception | None: + current_depth = max(1, options.max_depth) + while True: + budget_s = current_depth * options.per_depth_timeout + outcome = _run_attempt_under_timeout(test, current_depth, budget_s) + if outcome is _ATTEMPT_TIMEOUT: + _LOGGER.warning( + f'Proof {test.id}: depth={current_depth} attempt exhausted {budget_s}s budget; halving.' + ) + if current_depth <= 1: + break + current_depth = max(1, current_depth // 2) + continue + return outcome # type: ignore[no-any-return] + # All attempts exhausted; load whatever state was persisted to disk. + if Proof.proof_data_exists(test.id, foundry.proofs_dir): + persisted = foundry.get_apr_proof(test.id) + if persisted.error_info is not None: + return persisted.error_info + return persisted.failure_info + return None + + def _run_attempt_under_timeout(test: FoundryTest, attempt_max_depth: int, budget_s: int) -> Any: + # Each attempt runs in a forked subprocess that becomes its own session leader. + # The parent treats `budget_s` as a *stall window*, not a hard total budget: every + # `budget_s` seconds it polls the on-disk proof_subdir for any file mtime change + # (each committed step touches proof.json + the KCFG files via maintenance_rate=1). + # See progress → continue waiting another `budget_s`. No progress → SIGKILL the + # whole session (Python process + KoreServer subprocess + worker threads) and + # report a timeout so the caller can halve max_depth. + proof_subdir = foundry.proofs_dir / test.id + + def progress_marker() -> float: + marker = 0.0 + try: + for root, _, files in os.walk(proof_subdir): + for fname in files: + try: + marker = max(marker, os.path.getmtime(os.path.join(root, fname))) + except OSError: + continue + except OSError: + pass + return marker + + ctx = mp.get_context('fork') + parent_conn, child_conn = ctx.Pipe(duplex=False) + + def _child_entry() -> None: + try: + os.setsid() + except OSError: + pass + try: + options.max_depth = attempt_max_depth + options.per_depth_timeout = 0 # prevent re-entering the progressive path + result = init_and_run_proof(test, progress=None) + try: + child_conn.send(('ok', result)) + except Exception: + pass + except BaseException as e: + try: + child_conn.send(('err', e)) + except Exception: + pass + finally: + try: + child_conn.close() + except Exception: + pass + + proc = ctx.Process(target=_child_entry) + proc.start() + try: + child_conn.close() + except Exception: + pass + + last_marker = progress_marker() + while True: + proc.join(timeout=budget_s) + if not proc.is_alive(): + break + current_marker = progress_marker() + if current_marker != last_marker: + last_marker = current_marker + continue # progress observed; grant another budget_s window + try: + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + except (ProcessLookupError, OSError): + pass + proc.join() + try: + parent_conn.close() + except Exception: + pass + return _ATTEMPT_TIMEOUT + + try: + if parent_conn.poll(): + tag, payload = parent_conn.recv() + else: + tag, payload = 'ok', None + except (EOFError, OSError, BrokenPipeError): + tag, payload = 'ok', None + finally: + try: + parent_conn.close() + except Exception: + pass + + if tag == 'err': + _LOGGER.error(f'Proof {test.id} subprocess raised: {payload}') + return payload + with Progress( SpinnerColumn(), TimeElapsedColumn(),