Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,43 @@ jobs:
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/awa_test

# ─── Integration suite under the queue-storage engine ──
# awa's caller-facing contract is engine-invariant. The default suite runs
# under the canonical engine, so this leg re-runs the same integration suite
# with the queue-storage engine active on a fresh database (a separate DB
# avoids cross-engine job_unique_claims carryover).
rust-test-queue-storage:
name: Rust tests (queue_storage)
needs: [rust-build]
# Runs on every non-pull_request event; on pull requests it runs only when
# the PR carries the `full-ci` label.
if: github.event_name != 'pull_request' || contains(github.event.pull_request.labels.*.name, 'full-ci')
runs-on: ubuntu-latest
timeout-minutes: 30
# Postgres service container for this job, published on localhost:5432 with
# the `awa_test` database that DATABASE_URL below points at.
services:
postgres:
image: postgres:17-alpine
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: awa_test
ports:
- 5432:5432
# Container health check: poll pg_isready every 10s (5s timeout, 5 retries).
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
workspaces: ". -> target"
# Block until the server accepts connections on the published port before
# running any test.
- name: Wait for Postgres
run: until pg_isready -h localhost -p 5432; do sleep 1; done
# Only the `integration_test` target of the `awa` package is run in this leg.
- name: Run integration suite under queue storage
run: cargo test -p awa --test integration_test
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/awa_test
# Presumably read by the test harness to activate the queue-storage engine
# (see the section comment above) — confirm against the test setup code.
AWA_TEST_ENGINE: queue_storage

# ─── Rust bridge (tokio-postgres) ──────────────────────
rust-bridge-tokio-pg:
name: Rust bridge (tokio-postgres)
Expand Down
183 changes: 162 additions & 21 deletions awa-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,21 @@ impl TestClient {
}

/// Claim and execute a single job, optionally filtered by queue.
///
/// Routes through the active storage engine: under queue storage it claims
/// and records the outcome via the runtime store API; under canonical it
/// drives the `awa.jobs` view directly.
pub async fn work_one_in_queue<W: Worker>(
&self,
worker: &W,
queue: Option<&str>,
) -> Result<WorkResult, AwaError> {
if let Some(schema) =
awa_model::queue_storage::QueueStorage::active_schema(&self.pool).await?
{
return self.work_one_queue_storage(worker, queue, &schema).await;
}

// Claim one job
let jobs: Vec<JobRow> = sqlx::query_as::<_, JobRow>(
r#"
Expand Down Expand Up @@ -102,27 +112,7 @@ impl TestClient {
None => return Ok(WorkResult::NoJob),
};

let cancel = Arc::new(AtomicBool::new(false));
let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
Arc::new(HashMap::new());
let progress = Arc::new(std::sync::Mutex::new(ProgressState::new(
job.progress.clone(),
)));
let ctx = JobContext::new_for_testing(
job.clone(),
cancel,
state,
self.pool.clone(),
progress.clone(),
);

let result = worker.perform(&ctx).await;

// Snapshot progress from the buffer after handler execution
let progress_snapshot: Option<serde_json::Value> = {
let guard = progress.lock().expect("progress lock poisoned");
guard.clone_latest()
};
let (result, progress_snapshot) = self.run_handler(&job, worker).await;

// Update job state based on result
match &result {
Expand Down Expand Up @@ -212,6 +202,157 @@ impl TestClient {
}
}

/// Run `worker` against `job` inside a freshly built testing `JobContext` and
/// return the handler's result together with a snapshot of whatever progress
/// it buffered. Both the canonical and the queue-storage work paths call this.
async fn run_handler<W: Worker>(
    &self,
    job: &JobRow,
    worker: &W,
) -> (Result<JobResult, JobError>, Option<serde_json::Value>) {
    // Progress buffer seeded from the job row. One handle goes into the
    // context; this one is kept so the buffer can be read back afterwards.
    let progress_buf = Arc::new(std::sync::Mutex::new(ProgressState::new(
        job.progress.clone(),
    )));
    // The testing context starts with an empty typed-state map.
    let shared_state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
        Arc::new(HashMap::new());

    let ctx = JobContext::new_for_testing(
        job.clone(),
        // Cancellation flag starts out unset.
        Arc::new(AtomicBool::new(false)),
        shared_state,
        self.pool.clone(),
        Arc::clone(&progress_buf),
    );

    let outcome = worker.perform(&ctx).await;

    // Read the buffer only after the handler has returned; the lock guard is a
    // temporary that is released at the end of this statement.
    let latest_progress = progress_buf
        .lock()
        .expect("progress lock poisoned")
        .clone_latest();

    (outcome, latest_progress)
}

/// Queue-storage variant of [`Self::work_one_in_queue`]: claim, run the
/// handler, and record the outcome through the runtime store API rather
/// than writing the `awa.jobs` view (which the engine rejects).
async fn work_one_queue_storage<W: Worker>(
&self,
worker: &W,
queue: Option<&str>,
schema: &str,
) -> Result<WorkResult, AwaError> {
use std::time::Duration;
let store = awa_model::queue_storage::QueueStorage::from_existing_schema(schema)?;

// The store claims per queue; when the caller did not pin one, resolve a
// queue carrying a ready job of this worker's kind.
let target_queue = match queue {
Some(queue) => queue.to_string(),
None => {
let resolved: Option<String> = sqlx::query_scalar(&format!(
"SELECT queue FROM {schema}.ready_entries WHERE kind = $1 ORDER BY job_id ASC LIMIT 1"
))
Comment on lines +250 to +252

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Resolve only currently claimable queue-storage entries

For work_one() without an explicit queue, this query reads {schema}.ready_entries directly, but queue storage retains ready rows after they have been claimed/completed and determines availability via claim-head/tombstone/closure evidence. After an older same-kind job in another queue has already been processed, this can resolve that stale queue, claim_runtime_batch returns no rows, and the helper reports NoJob even though a claimable job of the requested kind exists in a different queue.

Useful? React with 👍 / 👎.

.bind(worker.kind())
.fetch_optional(&self.pool)
.await?;
match resolved {
Some(queue) => queue,
None => return Ok(WorkResult::NoJob),
}
}
};

let claimed = store
.claim_runtime_batch(&self.pool, &target_queue, 1, Duration::from_secs(60))
.await?;
let claimed = match claimed.into_iter().next() {
Some(claimed) => claimed,
None => return Ok(WorkResult::NoJob),
};
Comment on lines +263 to +269

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify whether queue storage already has a kind-aware runtime claim API.
rg -n -C3 'claim_runtime.*kind|kind.*claim_runtime|claim_runtime_batch' awa-model/src/queue_storage.rs awa-testing/src/lib.rs

Repository: hardbyte/awa

Length of output: 5116


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the queue-storage path and the canonical claim path around kind filtering.
sed -n '220,310p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C4 'state = .available.|kind = \$1|worker-kind|work_one_in_queue|claim_runtime_batch_with_aging_for_instance|claim_runtime_batch\(' awa-model/src awa-testing/src
printf '\n---\n'
sed -n '5680,5785p' awa-model/src/queue_storage.rs

Repository: hardbyte/awa

Length of output: 23146


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the runtime-claim implementation to see whether it filters by kind or only by queue.
sed -n '5214,5315p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '1,120p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C2 'ready_entries|queue_claim_heads|queue_enqueue_heads|kind = \$1.*queue|queue = \$2.*kind' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rs

Repository: hardbyte/awa

Length of output: 25446


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check whether queue-storage allows multiple kinds in one queue and whether claim paths filter by kind anywhere else.
sed -n '7658,7715p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '3720,3815p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C2 'INSERT INTO .*ready_entries|ready_entries .* kind|WHERE .*queue = .*kind|kind = \$1.*queue = \$2|queue = \$2.*kind = \$1' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rs

Repository: hardbyte/awa

Length of output: 7279


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check whether queue storage allows multiple kinds in one queue and whether claim paths filter by kind anywhere else.
sed -n '7658,7715p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '3720,3815p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C2 'INSERT INTO .*ready_entries|ready_entries .* kind|WHERE .*queue = .*kind|kind = \$1.*queue = \$2|queue = \$2.*kind = \$1' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rs

Repository: hardbyte/awa

Length of output: 7279


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the runtime claim SQL and its call chain for any hidden kind predicate.
rg -n -C4 'async fn claim_ready_rows_tx|claim_ready_rows_tx\(|WHERE .*kind|kind = \$1|claim_runtime_batch_with_aging_physical' awa-model/src/queue_storage.rs

Repository: hardbyte/awa

Length of output: 3666


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Read the full claim implementation and the surrounding schema for queue/kind routing.
sed -n '3574,3675p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C3 'CREATE TABLE .*ready_entries|ready_entries .*kind|queue .*kind|UNIQUE .*queue.*kind|CHECK .*kind' awa-model/src/queue_storage.rs

Repository: hardbyte/awa

Length of output: 4570


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find the definition of claim_ready_runtime and inspect its filtering predicates.
rg -n -C6 'claim_ready_runtime|CREATE FUNCTION .*claim_ready_runtime|RETURNS .*claim_ready_runtime' awa-model/src awa-model/migrations

Repository: hardbyte/awa

Length of output: 50368


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the claim_ready_runtime function body for any kind predicate.
sed -n '2685,3035p' awa-model/migrations/v023_install_queue_storage_substrate.sql | rg -n -C4 'kind|WHERE|ready_segments|ready_entries|queue_claim_heads|claim_ready_runtime'

Repository: hardbyte/awa

Length of output: 4397


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Pull the function body region and isolate every kind-related predicate/column reference.
sed -n '326,430p' awa-model/migrations/v023_install_queue_storage_substrate.sql
printf '\n---\n'
sed -n '430,560p' awa-model/migrations/v023_install_queue_storage_substrate.sql
printf '\n---\n'
rg -n -C2 'kind|WHERE p_queue|WHERE .*queue|p_queue' awa-model/migrations/v023_install_queue_storage_substrate.sql

Repository: hardbyte/awa

Length of output: 21112


Preserve worker kind when claiming from queue storage. work_one_queue_storage resolves a queue by kind when queue is None, but the runtime claim path is still queue-only, so a shared queue can lease a job for the wrong kind. Thread worker.kind() into the claim API or add a kind-aware filter here.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@awa-testing/src/lib.rs` around lines 263 - 269, The runtime claim path in
work_one_queue_storage is still queue-only, so it can lease a job for the wrong
worker kind when queue is resolved from worker.kind(). Update the claim flow to
preserve the worker kind by threading worker.kind() into the claim_runtime_batch
call or by adding a kind-aware filter in the claim logic. Use the
work_one_queue_storage method and claim_runtime_batch call site to locate the
change, and make sure the claimed job is selected only if it matches the
worker’s kind.

let job = claimed.job.clone();

let (result, progress_snapshot) = self.run_handler(&job, worker).await;

match &result {
Ok(JobResult::Completed) => {
store
.complete_runtime_batch(&self.pool, std::slice::from_ref(&claimed))
.await?;
Ok(WorkResult::Completed(job))
}
Ok(JobResult::Cancel(reason)) => {
store
.cancel_running(&self.pool, job.id, job.run_lease, reason, progress_snapshot)
.await?;
Ok(WorkResult::Cancelled(job, reason.clone()))
}
Ok(JobResult::RetryAfter(delay)) => {
store
.retry_after(&self.pool, job.id, job.run_lease, *delay, progress_snapshot)
.await?;
Ok(WorkResult::Retryable(job))
}
Err(JobError::Retryable(_)) => {
// A retryable error reschedules the job. Canonical leaves it in
// a finalized `retryable` state (not re-claimable by a later
// `work_one`); the store needs an explicit delay, so use a long
// one rather than a near-immediate reschedule that would let the
// same job be re-claimed within a test or accumulate across runs.
store
.retry_after(
&self.pool,
job.id,
job.run_lease,
Duration::from_secs(3600),
progress_snapshot,
)
.await?;
Ok(WorkResult::Retryable(job))
}
Ok(JobResult::Snooze(delay)) => {
store
.snooze(&self.pool, job.id, job.run_lease, *delay, progress_snapshot)
.await?;
Ok(WorkResult::Snoozed(job))
}
Ok(JobResult::WaitForCallback(_)) => {
// The handler registers the callback via the context; park to
// waiting_external when it did, otherwise fail like the
// canonical path.
match self.get_job(job.id).await?.callback_id {
Some(callback_id) => {
let entered = store
.enter_callback_wait(&self.pool, job.id, job.run_lease, callback_id)
.await?;
assert!(
entered,
"enter_callback_wait did not transition job {} to waiting_external",
job.id
);
Ok(WorkResult::WaitingExternal(self.get_job(job.id).await?))
Comment on lines +320 to +330

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect enter_callback_wait to confirm whether it can persist progress or needs a companion update.
rg -n -C4 'fn enter_callback_wait|enter_callback_wait\(' awa-model/src/queue_storage.rs awa-testing/src/lib.rs

Repository: hardbyte/awa

Length of output: 2039


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the callback-wait implementation and the canonical waiting_external transition.
sed -n '11355,11410p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '280,340p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C4 'waiting_external|progress_snapshot|enter_callback_wait_in_tx|enter_callback_wait\(' awa-model/src/queue_storage.rs awa-testing/src/lib.rs

Repository: hardbyte/awa

Length of output: 24580


Persist progress_snapshot before parking on callback wait. The WaitForCallback branch updates the lease to waiting_external without writing progress, unlike the canonical path that stores progress on this transition. That drops in-flight buffered progress; pass the snapshot through enter_callback_wait or update the row before reloading the job.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@awa-testing/src/lib.rs` around lines 317 - 322, The WaitForCallback path in
the job handling flow updates the lease to waiting_external without persisting
the current progress snapshot, unlike the canonical transition. Update the
callback-wait flow around get_job, enter_callback_wait, and
WorkResult::WaitingExternal so the buffered progress is written before parking
the job, either by passing progress_snapshot through enter_callback_wait or
storing it on the row before reloading the job.

}
None => {
let msg = "WaitForCallback returned without calling register_callback";
store
.fail_terminal(
&self.pool,
job.id,
job.run_lease,
msg,
progress_snapshot,
)
.await?;
Ok(WorkResult::Failed(job, msg.to_string()))
}
}
}
Err(JobError::Terminal(msg)) => {
store
.fail_terminal(&self.pool, job.id, job.run_lease, msg, progress_snapshot)
.await?;
Ok(WorkResult::Failed(job, msg.clone()))
}
}
}

/// Get a job by ID.
pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> {
awa_model::admin::get_job(&self.pool, job_id).await
Expand Down
Loading