perf(queue): batch registry hydration via Job.fetch_many#212
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
get_existing_trial_jobs()was issuing oneHGETALLper job per registry plus a redundantHGET statusper queued job, turningcloud monitor/cloud statussnapshots into thousands of serial round-trips over the operator-side SSH tunnel.Job.fetchloops with oneJob.fetch_manycall per registry and dropped the tautologicalis_queuedrecheck (jobs returned byQueue.get_job_ids()are already queued by RQ contract).O(jobs) × RTTtoO(registries) × RTT(~6 round-trips), regardless of experiment size.Description
Symptom. From the operator (Korea → asia-south1-c, ~80 ms RTT),
crsbench cloud monitorandcloud statuswould attach the IAP/SSH tunnel, complete the Redis PING, then appear to hang. py-spy showed the main thread parked in:Why operator-only. The orchestrator's own
monitor_queueruns against loopback Redis (RTT < 1 ms) and finishes in seconds. The operator goes throughIAP → ssh -L → redis:6379, so the same per-job loop becomes ~N × 80ms. For an experiment with ~2169 trial jobs that's ~3 minutes per snapshot poll.Fix.
rq.Job.fetch_many(job_ids, connection=…)already pipelinesHGETALLs server-side; this is whatget_all_jobs()was already doing for the queued bucket. Apply the same batching to each registry hydration loop and drop the redundant per-jobis_queuedround-trip. No change to wire format, queue model, or orchestrator behavior — purely client-side batching.Tests.
test_get_existing_trial_jobs_filters_to_requested_experimentto monkeypatchJob.fetch_many(the new call site).test_get_existing_trial_jobs_batches_one_fetch_per_registryas a regression guard: asserts exactly one batched call per non-empty registry and that empty registries short-circuit (no Redis call).test_queue_cleanup.py,test_queue_monitor.py,test_distributed_jobs.py,test_run_experiment_distributed.pypass.Test plan
uv run pytest tests/test_queue_cleanup.py tests/test_queue_monitor.py tests/test_distributed_jobs.py tests/test_run_experiment_distributed.py(191 passed)scripts/ci-tests/run-local.sh checks(typecheck + lint + format clean)crsbench cloud monitoragainst a live remote orchestrator returns first snapshot within seconds instead of minutes