Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 2024-05-30 - Concurrent PR status checking
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix the entry date (year is off by two).

This learnings entry is dated 2024-05-30, but the change is being introduced on 2026-05-30. In a chronological log this misdates the entry by two years.

📝 Proposed fix
-## 2024-05-30 - Concurrent PR status checking
+## 2026-05-30 - Concurrent PR status checking
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
## 2024-05-30 - Concurrent PR status checking
## 2026-05-30 - Concurrent PR status checking
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.jules/bolt.md at line 1, The changelog entry header uses the wrong year;
update the header string "## 2024-05-30 - Concurrent PR status checking" in
.jules/bolt.md to the correct date "## 2026-05-30 - Concurrent PR status
checking" so the entry is chronologically accurate.

**Learning:** Subprocess calls to the GitHub CLI ('gh') sequentially, like in `_filter_to_still_open_prs` checking state for multiple PRs, are a significant performance bottleneck due to N+1 network requests.
**Action:** Use `concurrent.futures.ThreadPoolExecutor` and `.map` to fetch PR states concurrently, reducing the overall latency of the fan-out supervisor, whilst keeping the logic grounded in checking `_pr_is_still_open` independently.
89 changes: 43 additions & 46 deletions ralph_loop/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Command-line interface and top-level orchestration."""

from __future__ import annotations

import concurrent.futures
import argparse
import os
import re
Expand Down Expand Up @@ -335,9 +337,7 @@ def _validate_pr_metadata(
) -> str:
if pr_data.get("state") != "OPEN":
raise CommandError(
"PR {} is not open (state={}).".format(
pr_number, pr_data.get("state")
)
"PR {} is not open (state={}).".format(pr_number, pr_data.get("state"))
)
if pr_data.get("isDraft"):
raise CommandError(
Expand Down Expand Up @@ -544,23 +544,35 @@ def _filter_to_still_open_prs(pr_numbers: List[int]) -> List[int]:
swallow stale PRs because of a flaky network.
"""
kept: List[int] = []
for pr in pr_numbers:

# ⚡ Bolt Optimization: Check PR states concurrently
# Why: Sequential `gh` subprocess calls for each PR create a significant N+1
# network bottleneck when filtering the initial PR list.
# Impact: Reduces latency linearly with the number of open PRs (up to max_workers)
# Measurement: Compare execution time of fan-out startup with many open PRs
def _check_pr(pr: int) -> Tuple[int, Optional[bool], Optional[Exception]]:
try:
still_open = _pr_is_still_open(pr)
return pr, _pr_is_still_open(pr), None
except CommandError as exc:
_print_step(
"Could not confirm PR #{} open state ({}); keeping it in the "
"fan-out set.".format(pr, exc)
)
kept.append(pr)
continue
if still_open:
kept.append(pr)
else:
_print_step(
"PR #{} is no longer open (per gh pr view); skipping "
"fan-out spawn.".format(pr)
)
return pr, None, exc

with concurrent.futures.ThreadPoolExecutor(
max_workers=min(10, len(pr_numbers) or 1)
) as executor:
for pr, still_open, exc in executor.map(_check_pr, pr_numbers):
if exc is not None:
_print_step(
"Could not confirm PR #{} open state ({}); keeping it in the "
"fan-out set.".format(pr, exc)
)
kept.append(pr)
elif still_open:
kept.append(pr)
else:
_print_step(
"PR #{} is no longer open (per gh pr view); skipping "
"fan-out spawn.".format(pr)
)
return kept


Expand Down Expand Up @@ -605,9 +617,7 @@ def _fan_out_all_prs(
if args.fan_out_log_dir:
log_root = os.path.abspath(os.path.expanduser(args.fan_out_log_dir))
else:
log_root = os.path.join(
os.path.dirname(script_path), ".ralph-logs", "fan-out"
)
log_root = os.path.join(os.path.dirname(script_path), ".ralph-logs", "fan-out")
os.makedirs(log_root, exist_ok=True)
stuck_timeout = max(60, args.fan_out_stuck_timeout_seconds)
respawn_backoff = max(1, args.fan_out_respawn_backoff_seconds)
Expand Down Expand Up @@ -700,9 +710,7 @@ def _request_reload(_signum, _frame):
reason = "ordinary exit"
_print_step(
"PR #{} loop exited with code {} ({}); respawning after "
"{}s (log: {})".format(
pr, rc, reason, backoff_for_pr, log_path
)
"{}s (log: {})".format(pr, rc, reason, backoff_for_pr, log_path)
)
try:
with open(log_path, "ab", buffering=0) as marker:
Expand Down Expand Up @@ -796,15 +804,11 @@ def _request_reload(_signum, _frame):
# ordinary-exit and stuck-timeout branches above, which both
# rewrite pending_backoff to ``respawn_backoff``.
_print_step(
"Respawned PR #{} pid={} (log: {})".format(
pr, proc.pid, log_path
)
"Respawned PR #{} pid={} (log: {})".format(pr, proc.pid, log_path)
)
finally:
if reload_requested["flag"]:
_print_step(
"Reload requested via SIGHUP; re-exec'ing supervisor"
)
_print_step("Reload requested via SIGHUP; re-exec'ing supervisor")
for pr, (proc, _log_path, log_handle, _spawned_at) in list(
children.items()
):
Expand All @@ -822,9 +826,7 @@ def _request_reload(_signum, _frame):
signal.signal(signal.SIGHUP, previous_hup)
sys.stdout.flush()
sys.stderr.flush()
os.execv(
sys.executable, [sys.executable, script_path] + sys.argv[1:]
)
os.execv(sys.executable, [sys.executable, script_path] + sys.argv[1:])
for pr, (proc, _log_path, log_handle, _spawned_at) in list(children.items()):
try:
proc.terminate()
Expand Down Expand Up @@ -854,9 +856,7 @@ def _request_reload(_signum, _frame):
return 0


def _resolve_target_directories(
raw_dirs: List[str], recursive: bool
) -> List[str]:
def _resolve_target_directories(raw_dirs: List[str], recursive: bool) -> List[str]:
"""Expand the user-supplied directory args into concrete repo paths.

- If ``recursive`` is set, each input path is treated as a parent and we
Expand All @@ -878,9 +878,7 @@ def _push(path: str) -> None:
path = os.path.abspath(os.path.expanduser(raw))
if not os.path.isdir(path):
raise CommandError(
"Target directory does not exist or is not a directory: {}".format(
raw
)
"Target directory does not exist or is not a directory: {}".format(raw)
)
if not recursive:
_push(path)
Expand Down Expand Up @@ -933,7 +931,10 @@ def _fan_out_across_directories(
base_args.append(token)
procs: List[Tuple[str, subprocess.Popen, str, Any]] = []
for target_dir in target_dirs:
slug = re.sub(r"[^A-Za-z0-9._-]+", "-", os.path.basename(target_dir.rstrip("/"))) or "repo"
slug = (
re.sub(r"[^A-Za-z0-9._-]+", "-", os.path.basename(target_dir.rstrip("/")))
or "repo"
)
log_path = os.path.join(log_root, "{}.log".format(slug))
log_handle = open(log_path, "ab", buffering=0)
cmd = [sys.executable, script_path] + base_args + [target_dir]
Expand Down Expand Up @@ -1127,9 +1128,7 @@ def _handle_shutdown(signum, _frame):
)

pr_target = str(pr_number)
_print_step(
"Using PR #{} {}".format(pr_number, pr_data.get("url", ""))
)
_print_step("Using PR #{} {}".format(pr_number, pr_data.get("url", "")))
_mark_pr_needs_review(pr_target)
if not args.skip_rebase:
_print_step("Initial rebase before review/fix loop")
Expand Down Expand Up @@ -1345,9 +1344,7 @@ def _handle_shutdown(signum, _frame):

if not args.skip_merge:
fresh_pr_data = _pr_view(str(pr_number))
fresh_branch = _validate_pr_metadata(
fresh_pr_data, pr_number, args.base
)
fresh_branch = _validate_pr_metadata(fresh_pr_data, pr_number, args.base)
if fresh_branch != branch:
raise CommandError(
"PR #{} head branch changed from '{}' to '{}' during the run.".format(
Expand Down