test: run integration suite under both storage engines (part of #360) #362
hardbyte wants to merge 2 commits into
awa's caller-facing contract is engine-invariant, but the default harness forced the canonical engine, so the queue-storage implementations of that contract were never exercised by the broad integration suite. That is the gap that let the leases.unique_key cancel defect ship.

- awa-testing: `reset_runtime_backend` now honours `AWA_TEST_ENGINE` (canonical default | queue_storage); split out `reset_to_canonical` / `activate_queue_storage`; add `TestEngine` / `test_engine()` / `skip_unless_canonical()`.
- awa-testing: `work_one_in_queue` is engine-aware: under queue storage it claims, runs, and records the outcome through the runtime store API (claim_runtime_batch / complete_runtime_batch / retry_after / snooze / cancel_running / fail_terminal / enter_callback_wait) instead of writing the awa.jobs view, which the engine rejects.
- integration_test: gate the genuinely canonical-only tests (admin-metadata caches, the canonical running-cancel notification, and three that manufacture job state via raw `UPDATE awa.jobs`); route the cancel regression test through the shared activate_queue_storage helper.

`cargo test -p awa --test integration_test` is now 44/44 under both AWA_TEST_ENGINE=canonical and AWA_TEST_ENGINE=queue_storage. Addresses part of #360; the forward-compat (old-binary-against-V39) test and the CI engine-matrix dimension remain.
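The engine selection described in the first bullet can be sketched roughly as follows. This is a minimal illustration, not the harness code: `TestEngine` and `test_engine()` are named in the commit message, but `parse_engine`, `should_skip_unless_canonical`, the error on unknown values, and the treatment of an empty value as the default are all assumptions made here.

```rust
use std::env;

/// Storage engine the harness should activate (names follow the commit message).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TestEngine {
    Canonical,
    QueueStorage,
}

/// Hypothetical pure helper: map the raw variable value to an engine.
/// An unset or empty value falls back to the canonical default (assumption).
pub fn parse_engine(raw: Option<&str>) -> Result<TestEngine, String> {
    match raw.map(str::trim) {
        None | Some("") | Some("canonical") => Ok(TestEngine::Canonical),
        Some("queue_storage") => Ok(TestEngine::QueueStorage),
        Some(other) => Err(format!("unknown AWA_TEST_ENGINE value: {other}")),
    }
}

/// Read AWA_TEST_ENGINE from the process environment.
pub fn test_engine() -> Result<TestEngine, String> {
    parse_engine(env::var("AWA_TEST_ENGINE").ok().as_deref())
}

/// Hypothetical predicate behind a skip guard: a canonical-only test
/// would return early whenever this is true.
pub fn should_skip_unless_canonical(engine: TestEngine) -> bool {
    engine != TestEngine::Canonical
}
```

Keeping the parsing in a pure function means it can be unit-tested without mutating the process environment, which matters when tests run in parallel.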
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 672bf7714a
    let claimed = store
        .claim_runtime_batch(&self.pool, &target_queue, 1, Duration::from_secs(300))
Keep queue-storage claims scoped to the worker kind
When a queue contains jobs for multiple kinds, this queue-storage branch claims the next job by queue only and then runs the supplied worker on whatever was returned. The canonical path above filters with kind = worker.kind(), so work_one_in_queue(&EmailWorker, Some(queue)) used to skip an older payment job; under AWA_TEST_ENGINE=queue_storage it can claim that payment job, execute the wrong handler against ctx.job.kind, and record the wrong outcome instead of leaving it for its own worker.
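The difference between the two claim strategies can be illustrated with a small in-memory model. This is only a sketch: `ReadyJob`, `claim_by_queue`, and `claim_by_queue_and_kind` are hypothetical names, and a `Vec` stands in for the real storage.

```rust
/// Hypothetical in-memory stand-in for a ready job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyJob {
    pub id: u64,
    pub queue: String,
    pub kind: String,
}

/// Queue-only claim: removes and returns the oldest job in `queue`,
/// whatever its kind. This mirrors the behaviour the review flags.
pub fn claim_by_queue(ready: &mut Vec<ReadyJob>, queue: &str) -> Option<ReadyJob> {
    let idx = ready
        .iter()
        .enumerate()
        .filter(|(_, j)| j.queue == queue)
        .min_by_key(|(_, j)| j.id)
        .map(|(i, _)| i)?;
    Some(ready.remove(idx))
}

/// Kind-aware claim: only jobs of the worker's kind are candidates,
/// so an older job of another kind stays in place for its own worker.
pub fn claim_by_queue_and_kind(
    ready: &mut Vec<ReadyJob>,
    queue: &str,
    kind: &str,
) -> Option<ReadyJob> {
    let idx = ready
        .iter()
        .enumerate()
        .filter(|(_, j)| j.queue == queue && j.kind == kind)
        .min_by_key(|(_, j)| j.id)
        .map(|(i, _)| i)?;
    Some(ready.remove(idx))
}
```

With an older payment job and a newer email job in the same queue, the queue-only claim hands the payment job to whichever worker asked, while the kind-aware claim returns the email job and leaves the payment job untouched.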
    let resolved: Option<String> = sqlx::query_scalar(&format!(
        "SELECT queue FROM {schema}.ready_entries WHERE kind = $1 ORDER BY job_id ASC LIMIT 1"
    ))
Resolve only currently claimable queue-storage entries
For work_one() without an explicit queue, this query reads {schema}.ready_entries directly, but queue storage retains ready rows after they have been claimed/completed and determines availability via claim-head/tombstone/closure evidence. After an older same-kind job in another queue has already been processed, this can resolve that stale queue, claim_runtime_batch returns no rows, and the helper reports NoJob even though a claimable job of the requested kind exists in a different queue.
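A deliberately simplified model of the stale-row problem is sketched below. It assumes a single per-queue claim-head cursor (entries with `seq` below the head count as consumed); the real substrate also consults tombstone and closure evidence according to the comment above, and `ReadyEntry`, `resolve_naive`, and `resolve_claimable` are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical ready row; `seq` is its position within its queue.
#[derive(Debug, Clone)]
pub struct ReadyEntry {
    pub job_id: u64,
    pub queue: String,
    pub kind: String,
    pub seq: u64,
}

/// Naive resolution: the oldest row of the kind, even if it was already claimed.
pub fn resolve_naive(entries: &[ReadyEntry], kind: &str) -> Option<String> {
    entries
        .iter()
        .filter(|e| e.kind == kind)
        .min_by_key(|e| e.job_id)
        .map(|e| e.queue.clone())
}

/// Claimable-only resolution: skip rows that sit behind their queue's
/// claim head (assumed model: seq < head means already consumed).
pub fn resolve_claimable(
    entries: &[ReadyEntry],
    claim_heads: &HashMap<String, u64>,
    kind: &str,
) -> Option<String> {
    entries
        .iter()
        .filter(|e| e.kind == kind)
        .filter(|e| e.seq >= claim_heads.get(&e.queue).copied().unwrap_or(0))
        .min_by_key(|e| e.job_id)
        .map(|e| e.queue.clone())
}
```

In this model, once the job in queue `a` has been claimed, the naive resolver still points at `a` while the claimable-only resolver moves on to the queue that actually holds work.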
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@awa-testing/src/lib.rs`:
- Around line 317-322: The WaitForCallback path in the job handling flow updates
the lease to waiting_external without persisting the current progress snapshot,
unlike the canonical transition. Update the callback-wait flow around get_job,
enter_callback_wait, and WorkResult::WaitingExternal so the buffered progress is
written before parking the job, either by passing progress_snapshot through
enter_callback_wait or storing it on the row before reloading the job.
- Around line 263-269: The runtime claim path in work_one_queue_storage is still
queue-only, so it can lease a job for the wrong worker kind when queue is
resolved from worker.kind(). Update the claim flow to preserve the worker kind
by threading worker.kind() into the claim_runtime_batch call or by adding a
kind-aware filter in the claim logic. Use the work_one_queue_storage method and
claim_runtime_batch call site to locate the change, and make sure the claimed
job is selected only if it matches the worker’s kind.
📒 Files selected for processing (3)
awa-testing/src/lib.rs
awa-testing/src/setup.rs
awa/tests/integration_test.rs
    let claimed = store
        .claim_runtime_batch(&self.pool, &target_queue, 1, Duration::from_secs(300))
        .await?;
    let claimed = match claimed.into_iter().next() {
        Some(claimed) => claimed,
        None => return Ok(WorkResult::NoJob),
    };
🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether queue storage already has a kind-aware runtime claim API.
rg -n -C3 'claim_runtime.*kind|kind.*claim_runtime|claim_runtime_batch' awa-model/src/queue_storage.rs awa-testing/src/lib.rs
Repository: hardbyte/awa
Length of output: 5116
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the queue-storage path and the canonical claim path around kind filtering.
sed -n '220,310p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C4 'state = .available.|kind = \$1|worker-kind|work_one_in_queue|claim_runtime_batch_with_aging_for_instance|claim_runtime_batch\(' awa-model/src awa-testing/src
printf '\n---\n'
sed -n '5680,5785p' awa-model/src/queue_storage.rs
Repository: hardbyte/awa
Length of output: 23146
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the runtime-claim implementation to see whether it filters by kind or only by queue.
sed -n '5214,5315p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '1,120p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C2 'ready_entries|queue_claim_heads|queue_enqueue_heads|kind = \$1.*queue|queue = \$2.*kind' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rs
Repository: hardbyte/awa
Length of output: 25446
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check whether queue-storage allows multiple kinds in one queue and whether claim paths filter by kind anywhere else.
sed -n '7658,7715p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '3720,3815p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C2 'INSERT INTO .*ready_entries|ready_entries .* kind|WHERE .*queue = .*kind|kind = \$1.*queue = \$2|queue = \$2.*kind = \$1' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rs
Repository: hardbyte/awa
Length of output: 7279
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the runtime claim SQL and its call chain for any hidden kind predicate.
rg -n -C4 'async fn claim_ready_rows_tx|claim_ready_rows_tx\(|WHERE .*kind|kind = \$1|claim_runtime_batch_with_aging_physical' awa-model/src/queue_storage.rs
Repository: hardbyte/awa
Length of output: 3666
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Read the full claim implementation and the surrounding schema for queue/kind routing.
sed -n '3574,3675p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C3 'CREATE TABLE .*ready_entries|ready_entries .*kind|queue .*kind|UNIQUE .*queue.*kind|CHECK .*kind' awa-model/src/queue_storage.rs
Repository: hardbyte/awa
Length of output: 4570
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find the definition of claim_ready_runtime and inspect its filtering predicates.
rg -n -C6 'claim_ready_runtime|CREATE FUNCTION .*claim_ready_runtime|RETURNS .*claim_ready_runtime' awa-model/src awa-model/migrations
Repository: hardbyte/awa
Length of output: 50368
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the claim_ready_runtime function body for any kind predicate.
sed -n '2685,3035p' awa-model/migrations/v023_install_queue_storage_substrate.sql | rg -n -C4 'kind|WHERE|ready_segments|ready_entries|queue_claim_heads|claim_ready_runtime'
Repository: hardbyte/awa
Length of output: 4397
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Pull the function body region and isolate every kind-related predicate/column reference.
sed -n '326,430p' awa-model/migrations/v023_install_queue_storage_substrate.sql
printf '\n---\n'
sed -n '430,560p' awa-model/migrations/v023_install_queue_storage_substrate.sql
printf '\n---\n'
rg -n -C2 'kind|WHERE p_queue|WHERE .*queue|p_queue' awa-model/migrations/v023_install_queue_storage_substrate.sql
Repository: hardbyte/awa
Length of output: 21112
Preserve worker kind when claiming from queue storage. work_one_queue_storage resolves a queue by kind when queue is None, but the runtime claim path is still queue-only, so a shared queue can lease a job for the wrong kind. Thread worker.kind() into the claim API or add a kind-aware filter here.
    match self.get_job(job.id).await?.callback_id {
        Some(callback_id) => {
            store
                .enter_callback_wait(&self.pool, job.id, job.run_lease, callback_id)
                .await?;
            Ok(WorkResult::WaitingExternal(self.get_job(job.id).await?))
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Inspect enter_callback_wait to confirm whether it can persist progress or needs a companion update.
rg -n -C4 'fn enter_callback_wait|enter_callback_wait\(' awa-model/src/queue_storage.rs awa-testing/src/lib.rs
Repository: hardbyte/awa
Length of output: 2039
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the callback-wait implementation and the canonical waiting_external transition.
sed -n '11355,11410p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '280,340p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C4 'waiting_external|progress_snapshot|enter_callback_wait_in_tx|enter_callback_wait\(' awa-model/src/queue_storage.rs awa-testing/src/lib.rs
Repository: hardbyte/awa
Length of output: 24580
Persist progress_snapshot before parking on callback wait. The WaitForCallback branch updates the lease to waiting_external without writing progress, unlike the canonical path that stores progress on this transition. That drops in-flight buffered progress; pass the snapshot through enter_callback_wait or update the row before reloading the job.
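The intended ordering (write progress, then park) can be sketched with an in-memory row. This is an illustrative model only: `LeaseRow`, `LeaseState`, and `park_for_callback` are hypothetical, and the real `enter_callback_wait` signature is not assumed to accept a snapshot.

```rust
/// Hypothetical lease states relevant to the callback-wait transition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeaseState {
    Running,
    WaitingExternal,
}

/// Hypothetical in-memory stand-in for a lease row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseRow {
    pub state: LeaseState,
    pub callback_id: Option<String>,
    pub progress: Option<String>,
}

/// Park a running lease on a callback, persisting any buffered progress
/// in the same step so it cannot be lost by the transition.
pub fn park_for_callback(
    row: &mut LeaseRow,
    callback_id: &str,
    buffered_progress: Option<String>,
) -> Result<(), String> {
    if row.state != LeaseState::Running {
        return Err(format!("cannot park a lease in state {:?}", row.state));
    }
    // Only overwrite stored progress when the handler actually buffered some.
    if let Some(snapshot) = buffered_progress {
        row.progress = Some(snapshot);
    }
    row.callback_id = Some(callback_id.to_string());
    row.state = LeaseState::WaitingExternal;
    Ok(())
}
```

Folding the progress write into the transition itself, rather than issuing a separate update afterwards, avoids a window in which the job is parked but its snapshot is missing.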
Addresses adversarial-review findings on the both-engine harness:

- clean_queue is now engine-aware: under queue storage it drains every job-bearing substrate plane (ready_entries / deferred_jobs / leases / done_entries / dlq_entries) for the queue and releases its job_unique_claims, instead of a single `awa.jobs` view DELETE that only removed the queue head. This makes the suite idempotent across re-runs against a reused database (previously 44/0 fresh but 43/1 on re-run, and UniqueConflict on idx_awa_jobs_unique for ungated unique-key tests).
- work_one_in_queue: a retryable error now reschedules with a long delay instead of 1s, matching canonical's non-re-claimable `retryable` semantics and removing the accumulation that drove the re-run flake. Claim deadline trimmed 300s -> 60s. enter_callback_wait's result is now asserted rather than discarded.
- ci.yml: add a `rust-test-queue-storage` job that runs the integration suite with AWA_TEST_ENGINE=queue_storage on a fresh database, so the queue-storage path is actually enforced in CI (a separate DB avoids cross-engine job_unique_claims carryover).

Verified: integration_test is 44/0 under both engines, and stays 44/0 over three consecutive runs against the same database under queue storage.

Follow-ups (#360): port the running-job / multi-candidate cancel-by-key positive cases to queue storage; assert claimed kind on the None-queue path; the forward-compat (old-binary-vs-V39) test.
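The engine-aware cleanup can be pictured with the following in-memory sketch. The plane names come from the commit message; everything else is an assumption made for illustration: `Substrate`, the `(queue, job_id)` row shape, and keying unique claims by job id are hypothetical and do not reflect the real schema.

```rust
use std::collections::{HashMap, HashSet};

/// Job-bearing planes named in the commit message.
pub const PLANES: [&str; 5] = [
    "ready_entries",
    "deferred_jobs",
    "leases",
    "done_entries",
    "dlq_entries",
];

/// Hypothetical in-memory substrate: each plane holds (queue, job_id) rows,
/// and unique claims are keyed by job id (an assumption of this sketch).
#[derive(Default)]
pub struct Substrate {
    pub planes: HashMap<&'static str, Vec<(String, u64)>>,
    pub unique_claims: HashMap<u64, String>,
}

/// Drain every plane for `queue`, then release the unique claims held by
/// the removed jobs. Returns the number of distinct jobs removed.
pub fn clean_queue(s: &mut Substrate, queue: &str) -> usize {
    let mut removed: HashSet<u64> = HashSet::new();
    for plane in PLANES.iter() {
        if let Some(rows) = s.planes.get_mut(*plane) {
            rows.retain(|(q, id)| {
                if q == queue {
                    removed.insert(*id);
                    false
                } else {
                    true
                }
            });
        }
    }
    s.unique_claims.retain(|job_id, _| !removed.contains(job_id));
    removed.len()
}
```

Draining all planes and the claims in one helper is what makes a second run against the same database start from the same state as the first.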
Part of #360.
Motivation
awa's caller-facing contract is engine-invariant, but the default test harness forces the canonical engine (`reset_runtime_backend`), so the queue-storage implementations of that contract were never exercised by the broad integration suite. That blind spot is how the queue-storage `cancel_by_unique_key` candidate query (`{schema}.leases.unique_key`, a column that doesn't exist) shipped: it was dead code to CI until #359.

Changes
- awa-testing: `reset_runtime_backend` now honours `AWA_TEST_ENGINE` (`canonical` default | `queue_storage`). Split into `reset_to_canonical` / `activate_queue_storage`, and add `TestEngine` / `test_engine()` / `skip_unless_canonical()`.
- awa-testing: `work_one_in_queue` is engine-aware: under queue storage it claims, runs the handler, and records the outcome through the runtime store API (`claim_runtime_batch` → `complete_runtime_batch` / `retry_after` / `snooze` / `cancel_running` / `fail_terminal` / `enter_callback_wait`) instead of raw `UPDATE awa.jobs` (which the engine rejects). The handler-run is factored into a shared `run_handler`.
- integration_test: gate the genuinely canonical-only tests with `skip_unless_canonical`: the admin-metadata caches (canonical-trigger bookkeeping), the canonical running-cancel notification, and three tests that manufacture job state via raw `UPDATE awa.jobs` / rely on multiple non-terminal same-key jobs. Their queue-storage equivalents are covered by `queue_storage_runtime_test` + the cancel regression test.

Result
`cargo test -p awa --test integration_test` is 44/44 under both `AWA_TEST_ENGINE=canonical` and `AWA_TEST_ENGINE=queue_storage` (verified locally). `fmt` / `clippy -D warnings` clean.

Remaining for #360
engine: [canonical, queue_storage]) for the correctness suites.
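The outcome recording described under Changes amounts to a mapping from what the handler returned to the store call that persists it. A rough sketch follows, with the store call names taken from the description; the `Outcome` variants, the `StoreCall` enum, and the one-hour `LONG_RETRY_DELAY` are hypothetical placeholders rather than the harness's actual types or values.

```rust
use std::time::Duration;

/// Hypothetical handler outcomes; variant names are illustrative only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outcome {
    Completed,
    RetryableError,
    Snooze(Duration),
    Cancel,
    TerminalError,
    WaitForCallback,
}

/// One variant per runtime store call named in the PR description.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StoreCall {
    CompleteRuntimeBatch,
    RetryAfter(Duration),
    Snooze(Duration),
    CancelRunning,
    FailTerminal,
    EnterCallbackWait,
}

/// Assumed value: the follow-up commit says only "a long delay", so one hour
/// is a placeholder chosen to keep the job from being re-claimed mid-test.
pub const LONG_RETRY_DELAY: Duration = Duration::from_secs(3600);

/// Pick the store call that records a given handler outcome.
pub fn store_call_for(outcome: Outcome) -> StoreCall {
    match outcome {
        Outcome::Completed => StoreCall::CompleteRuntimeBatch,
        Outcome::RetryableError => StoreCall::RetryAfter(LONG_RETRY_DELAY),
        Outcome::Snooze(delay) => StoreCall::Snooze(delay),
        Outcome::Cancel => StoreCall::CancelRunning,
        Outcome::TerminalError => StoreCall::FailTerminal,
        Outcome::WaitForCallback => StoreCall::EnterCallbackWait,
    }
}
```

Because the `match` is exhaustive, adding a new outcome variant forces a decision about which store call records it, which is the property that keeps the two engines' test paths aligned.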