diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..9118c9b --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,3 @@ +## 2024-05-30 - Concurrent PR status checking +**Learning:** Sequential subprocess calls to the GitHub CLI (`gh`), such as `_filter_to_still_open_prs` checking the state of multiple PRs one at a time, are a significant performance bottleneck due to the N+1 pattern of network requests (one `gh` call per PR). +**Action:** Use `concurrent.futures.ThreadPoolExecutor` with `executor.map` to fetch PR states concurrently, reducing the overall latency of the fan-out supervisor while still checking each PR independently via `_pr_is_still_open`. diff --git a/ralph_loop/cli.py b/ralph_loop/cli.py index da6a240..b48a9fa 100644 --- a/ralph_loop/cli.py +++ b/ralph_loop/cli.py @@ -1,6 +1,8 @@ """Command-line interface and top-level orchestration.""" + from __future__ import annotations +import concurrent.futures import argparse import os import re @@ -335,9 +337,7 @@ def _validate_pr_metadata( ) -> str: if pr_data.get("state") != "OPEN": raise CommandError( - "PR {} is not open (state={}).".format( - pr_number, pr_data.get("state") - ) + "PR {} is not open (state={}).".format(pr_number, pr_data.get("state")) ) if pr_data.get("isDraft"): raise CommandError( @@ -544,23 +544,35 @@ def _filter_to_still_open_prs(pr_numbers: List[int]) -> List[int]: swallow stale PRs because of a flaky network. """ kept: List[int] = [] - for pr in pr_numbers: + + # ⚡ Bolt Optimization: Check PR states concurrently + # Why: Sequential `gh` subprocess calls for each PR create a significant N+1 + # network bottleneck when filtering the initial PR list.
+ # Impact: Reduces latency linearly with the number of open PRs (up to max_workers) + # Measurement: Compare execution time of fan-out startup with many open PRs + def _check_pr(pr: int) -> Tuple[int, Optional[bool], Optional[Exception]]: try: - still_open = _pr_is_still_open(pr) + return pr, _pr_is_still_open(pr), None except CommandError as exc: - _print_step( - "Could not confirm PR #{} open state ({}); keeping it in the " - "fan-out set.".format(pr, exc) - ) - kept.append(pr) - continue - if still_open: - kept.append(pr) - else: - _print_step( - "PR #{} is no longer open (per gh pr view); skipping " - "fan-out spawn.".format(pr) - ) + return pr, None, exc + + with concurrent.futures.ThreadPoolExecutor( + max_workers=min(10, len(pr_numbers) or 1) + ) as executor: + for pr, still_open, exc in executor.map(_check_pr, pr_numbers): + if exc is not None: + _print_step( + "Could not confirm PR #{} open state ({}); keeping it in the " + "fan-out set.".format(pr, exc) + ) + kept.append(pr) + elif still_open: + kept.append(pr) + else: + _print_step( + "PR #{} is no longer open (per gh pr view); skipping " + "fan-out spawn.".format(pr) + ) return kept @@ -605,9 +617,7 @@ def _fan_out_all_prs( if args.fan_out_log_dir: log_root = os.path.abspath(os.path.expanduser(args.fan_out_log_dir)) else: - log_root = os.path.join( - os.path.dirname(script_path), ".ralph-logs", "fan-out" - ) + log_root = os.path.join(os.path.dirname(script_path), ".ralph-logs", "fan-out") os.makedirs(log_root, exist_ok=True) stuck_timeout = max(60, args.fan_out_stuck_timeout_seconds) respawn_backoff = max(1, args.fan_out_respawn_backoff_seconds) @@ -700,9 +710,7 @@ def _request_reload(_signum, _frame): reason = "ordinary exit" _print_step( "PR #{} loop exited with code {} ({}); respawning after " - "{}s (log: {})".format( - pr, rc, reason, backoff_for_pr, log_path - ) + "{}s (log: {})".format(pr, rc, reason, backoff_for_pr, log_path) ) try: with open(log_path, "ab", buffering=0) as marker: @@ -796,15 
+804,11 @@ def _request_reload(_signum, _frame): # ordinary-exit and stuck-timeout branches above, which both # rewrite pending_backoff to ``respawn_backoff``. _print_step( - "Respawned PR #{} pid={} (log: {})".format( - pr, proc.pid, log_path - ) + "Respawned PR #{} pid={} (log: {})".format(pr, proc.pid, log_path) ) finally: if reload_requested["flag"]: - _print_step( - "Reload requested via SIGHUP; re-exec'ing supervisor" - ) + _print_step("Reload requested via SIGHUP; re-exec'ing supervisor") for pr, (proc, _log_path, log_handle, _spawned_at) in list( children.items() ): @@ -822,9 +826,7 @@ def _request_reload(_signum, _frame): signal.signal(signal.SIGHUP, previous_hup) sys.stdout.flush() sys.stderr.flush() - os.execv( - sys.executable, [sys.executable, script_path] + sys.argv[1:] - ) + os.execv(sys.executable, [sys.executable, script_path] + sys.argv[1:]) for pr, (proc, _log_path, log_handle, _spawned_at) in list(children.items()): try: proc.terminate() @@ -854,9 +856,7 @@ def _request_reload(_signum, _frame): return 0 -def _resolve_target_directories( - raw_dirs: List[str], recursive: bool -) -> List[str]: +def _resolve_target_directories(raw_dirs: List[str], recursive: bool) -> List[str]: """Expand the user-supplied directory args into concrete repo paths. 
- If ``recursive`` is set, each input path is treated as a parent and we @@ -878,9 +878,7 @@ def _push(path: str) -> None: path = os.path.abspath(os.path.expanduser(raw)) if not os.path.isdir(path): raise CommandError( - "Target directory does not exist or is not a directory: {}".format( - raw - ) + "Target directory does not exist or is not a directory: {}".format(raw) ) if not recursive: _push(path) @@ -933,7 +931,10 @@ def _fan_out_across_directories( base_args.append(token) procs: List[Tuple[str, subprocess.Popen, str, Any]] = [] for target_dir in target_dirs: - slug = re.sub(r"[^A-Za-z0-9._-]+", "-", os.path.basename(target_dir.rstrip("/"))) or "repo" + slug = ( + re.sub(r"[^A-Za-z0-9._-]+", "-", os.path.basename(target_dir.rstrip("/"))) + or "repo" + ) log_path = os.path.join(log_root, "{}.log".format(slug)) log_handle = open(log_path, "ab", buffering=0) cmd = [sys.executable, script_path] + base_args + [target_dir] @@ -1127,9 +1128,7 @@ def _handle_shutdown(signum, _frame): ) pr_target = str(pr_number) - _print_step( - "Using PR #{} {}".format(pr_number, pr_data.get("url", "")) - ) + _print_step("Using PR #{} {}".format(pr_number, pr_data.get("url", ""))) _mark_pr_needs_review(pr_target) if not args.skip_rebase: _print_step("Initial rebase before review/fix loop") @@ -1345,9 +1344,7 @@ def _handle_shutdown(signum, _frame): if not args.skip_merge: fresh_pr_data = _pr_view(str(pr_number)) - fresh_branch = _validate_pr_metadata( - fresh_pr_data, pr_number, args.base - ) + fresh_branch = _validate_pr_metadata(fresh_pr_data, pr_number, args.base) if fresh_branch != branch: raise CommandError( "PR #{} head branch changed from '{}' to '{}' during the run.".format(