
perf(queue): batch registry hydration via Job.fetch_many #212

Merged. acorn421 merged 1 commit into main from fix/queue-snapshot-batched-fetch on Apr 27, 2026

@acorn421 acorn421 commented Apr 27, 2026

Contributor

Summary

  • get_existing_trial_jobs() was issuing one HGETALL per job per registry plus a redundant HGET status per queued job, turning cloud monitor / cloud status snapshots into thousands of serial round-trips over the operator-side SSH tunnel.
  • Replaced the per-id Job.fetch loops with one Job.fetch_many call per registry and dropped the tautological is_queued recheck (jobs returned by Queue.get_job_ids() are already queued by RQ contract).
  • Snapshot cost drops from O(jobs) × RTT to O(registries) × RTT (~6 round-trips), regardless of experiment size.

Description

Symptom. From the operator (Korea → asia-south1-c, ~80 ms RTT), crsbench cloud monitor and cloud status would attach the IAP/SSH tunnel, complete the Redis PING, then appear to hang. py-spy showed the main thread parked in:

redis._parsers.socket._read_from_socket
rq.job.get_status        # HGET status
rq.job.is_queued
crsbench/distributed/queue.py:get_existing_trial_jobs
crsbench/distributed/queue_monitor.py:build_monitor_snapshot

Why operator-only. The orchestrator's own monitor_queue runs against loopback Redis (RTT < 1 ms) and finishes in seconds. The operator goes through IAP → ssh -L → redis:6379, so the same per-job loop costs roughly N × 80 ms. For an experiment with ~2169 trial jobs, that is about 3 minutes per snapshot poll.
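The arithmetic behind those figures can be checked with a short sketch. It assumes exactly one serial round-trip per job (the redundant HGET status per queued job would add more) and takes the ~6 round-trips from the summary; the function and constant names are illustrative only, not part of crsbench.

```python
# Back-of-the-envelope latency model using only numbers quoted in this PR.
RTT_MS = 80               # operator round-trip time (Korea -> asia-south1-c)
TRIAL_JOBS = 2169         # approximate experiment size
BATCHED_ROUND_TRIPS = 6   # "~6 round-trips" after batching


def snapshot_latency_ms(round_trips: int, rtt_ms: int) -> int:
    """Lower bound on snapshot time when every round-trip is serial."""
    return round_trips * rtt_ms


per_job_ms = snapshot_latency_ms(TRIAL_JOBS, RTT_MS)
batched_ms = snapshot_latency_ms(BATCHED_ROUND_TRIPS, RTT_MS)

print(f"per-job loop: {per_job_ms / 1000:.1f} s")  # 173.5 s, just under 3 minutes
print(f"batched:      {batched_ms / 1000:.2f} s")  # 0.48 s
```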

Fix. rq.Job.fetch_many(job_ids, connection=…) already pipelines the HGETALLs into one batched request per call; get_all_jobs() was already doing this for the queued bucket. Apply the same batching to each registry hydration loop and drop the redundant per-job is_queued round-trip. No change to wire format, queue model, or orchestrator behavior: this is purely client-side batching.
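A minimal sketch of the batching pattern described above, not the actual code in crsbench/distributed/queue.py. The helper name hydrate_registries and the ids_by_registry mapping are hypothetical; the fetch function is injected so the sketch runs without Redis. It relies on Job.fetch_many returning None for ids whose job hash no longer exists.

```python
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence

# Hypothetical signature: takes a list of job ids, returns one entry per id,
# with None where the job could not be found.
FetchMany = Callable[[List[str]], Sequence[Optional[Any]]]


def hydrate_registries(
    ids_by_registry: Mapping[str, Sequence[str]],
    fetch_many: FetchMany,
) -> Dict[str, List[Any]]:
    """Hydrate every registry with at most one batched fetch each."""
    hydrated: Dict[str, List[Any]] = {}
    for name, job_ids in ids_by_registry.items():
        if not job_ids:
            # Empty registry: short-circuit, no Redis call at all.
            hydrated[name] = []
            continue
        jobs = fetch_many(list(job_ids))  # one round-trip for the whole registry
        # Drop ids whose job disappeared between listing and fetching.
        hydrated[name] = [job for job in jobs if job is not None]
    return hydrated


# With RQ, the fetch function could be wired up roughly as (assumes an
# existing redis connection object named redis_conn):
#   from functools import partial
#   from rq.job import Job
#   fetch = partial(Job.fetch_many, connection=redis_conn)
#   snapshot = hydrate_registries({"failed": failed_ids, "started": started_ids}, fetch)
```

Injecting the fetch function also makes the call count easy to assert in a test, which is the property the regression guard in this PR checks.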

Tests.

  • Updated test_get_existing_trial_jobs_filters_to_requested_experiment to monkeypatch Job.fetch_many (the new call site).
  • Added test_get_existing_trial_jobs_batches_one_fetch_per_registry as a regression guard: asserts exactly one batched call per non-empty registry and that empty registries short-circuit (no Redis call).
  • All 191 tests in test_queue_cleanup.py, test_queue_monitor.py, test_distributed_jobs.py, test_run_experiment_distributed.py pass.

Test plan

  • uv run pytest tests/test_queue_cleanup.py tests/test_queue_monitor.py tests/test_distributed_jobs.py tests/test_run_experiment_distributed.py (191 passed)
  • scripts/ci-tests/run-local.sh checks (typecheck + lint + format clean)
  • crsbench cloud monitor against a live remote orchestrator returns first snapshot within seconds instead of minutes

@acorn421 acorn421 merged commit 017f29b into main Apr 27, 2026
5 checks passed
@acorn421 acorn421 deleted the fix/queue-snapshot-batched-fetch branch April 27, 2026 06:11