diff --git a/CHANGELOG.md b/CHANGELOG.md index c43d8000..f80860be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ Notable changes between releases. Detailed migration notes for storage transitio ## [Unreleased] +## [0.6.0-rc.4] — 2026-06-30 + +### Fixed + +- **`admin::cancel_by_unique_key` / `cancel_by_unique_key_tx` on the queue-storage engine** ([#359](https://github.com/hardbyte/awa/pull/359)). The candidate lookup's running-job branch read `unique_key` directly from `{schema}.leases`, but the `leases` table does not carry that column, so any cancel-by-unique-key issued while queue storage was the active engine failed at plan time with `column "unique_key" does not exist`. The running-job branch now recovers the key by joining each lease back to its originating `ready_entries` row — the same lane-identity join the candidate and jobs-compat views already use. Canonical-engine behaviour was unaffected. + ## [0.6.0-rc.3] — 2026-06-25 ### Changed diff --git a/Cargo.lock b/Cargo.lock index 849d5c58..b0b9c121 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -383,7 +383,7 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "awa" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "async-trait", "awa-macros", @@ -408,7 +408,7 @@ dependencies = [ [[package]] name = "awa-cli" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "assert_cmd", "awa-model", @@ -430,7 +430,7 @@ dependencies = [ [[package]] name = "awa-macros" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -440,7 +440,7 @@ dependencies = [ [[package]] name = "awa-metrics" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "awa-model", "opentelemetry", @@ -448,7 +448,7 @@ dependencies = [ [[package]] name = "awa-model" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "awa-macros", "blake3", @@ -469,7 +469,7 @@ dependencies = [ [[package]] name = "awa-seaorm" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "awa", "awa-testing", @@ -482,7 +482,7 @@ dependencies = [ [[package]] name = "awa-testing" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "async-trait", "awa-model", @@ -497,7 +497,7 @@ dependencies = [ [[package]] name = "awa-ui" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "awa-metrics", "awa-model", @@ -520,7 +520,7 @@ dependencies = [ [[package]] name = "awa-worker" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 8d7fc44a..3ee97905 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ members = [ exclude = ["awa-python", "spike"] [workspace.package] -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" edition = "2021" license = "MIT OR Apache-2.0" description = "Postgres-native background job queue — transactional enqueue, heartbeat crash recovery, SKIP LOCKED dispatch" @@ -25,11 +25,11 @@ categories = ["database", "asynchronous", "web-programming"] [workspace.dependencies] # Internal crates -awa-model = { path = "awa-model", version = "0.6.0-rc.3" } -awa-macros = { path = "awa-macros", version = "0.6.0-rc.3" } -awa-metrics = { path = "awa-metrics", version = "0.6.0-rc.3" } -awa-worker = { path = "awa-worker", version = "0.6.0-rc.3" } -awa = { path = "awa", version = "0.6.0-rc.3" } +awa-model = { path = "awa-model", version = "0.6.0-rc.4" } +awa-macros = { path = "awa-macros", version = "0.6.0-rc.4" } +awa-metrics = { path = "awa-metrics", version = "0.6.0-rc.4" } +awa-worker = { path = "awa-worker", version = "0.6.0-rc.4" } +awa = { path = "awa", version = "0.6.0-rc.4" } # Database sea-orm = { version = "=2.0.0-rc.38", default-features = false, features = ["sqlx-postgres", "runtime-tokio-rustls", "with-chrono", "with-json"] } diff --git a/awa-cli/Cargo.toml b/awa-cli/Cargo.toml index 58f9cd13..4a733d70 100644 --- a/awa-cli/Cargo.toml +++ b/awa-cli/Cargo.toml @@ -14,7 +14,7 @@ path = "src/main.rs" [dependencies] awa-model.workspace = true awa-worker.workspace = true -awa-ui = { path = "../awa-ui", version = "0.6.0-rc.3" } +awa-ui = { path = "../awa-ui", version = "0.6.0-rc.4" } axum.workspace = true sqlx.workspace = true tokio.workspace = true @@ -28,5 +28,5 @@ hex.workspace = true uuid.workspace = true [dev-dependencies] -awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" } +awa-testing = { path = "../awa-testing", version = "0.6.0-rc.4" } assert_cmd = "2" diff --git a/awa-cli/pyproject.toml b/awa-cli/pyproject.toml index 0c2166e5..0aa31b2b 100644 --- a/awa-cli/pyproject.toml +++ b/awa-cli/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "awa-cli" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" description = "CLI for the Awa Postgres-native job queue (migrations, admin, serve)" readme = "README.md" requires-python = ">=3.10" diff --git a/awa-model/src/admin.rs b/awa-model/src/admin.rs index 5d811f5b..3912f4d2 100644 --- a/awa-model/src/admin.rs +++ b/awa-model/src/admin.rs @@ -637,9 +637,18 @@ fn unique_key_candidate_sql(schema: &str) -> String { FROM {schema}.deferred_jobs WHERE unique_key = $1 UNION ALL - SELECT job_id - FROM {schema}.leases - WHERE unique_key = $1 + -- A running job's unique_key is stored on its ready_entries row; + -- reach it through the lease's lane identity. + SELECT leases.job_id + FROM {schema}.leases AS leases + JOIN {schema}.ready_entries AS ready + ON ready.ready_slot = leases.ready_slot + AND ready.ready_generation = leases.ready_generation + AND ready.queue = leases.queue + AND ready.priority = leases.priority + AND ready.enqueue_shard = leases.enqueue_shard + AND ready.lane_seq = leases.lane_seq + WHERE ready.unique_key = $1 ) SELECT job_id FROM candidates diff --git a/awa-python/Cargo.toml b/awa-python/Cargo.toml index 39b187f6..86007b4f 100644 --- a/awa-python/Cargo.toml +++ b/awa-python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awa-python" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" edition = "2021" [lib] @@ -12,8 +12,8 @@ default = ["cel"] cel = ["awa-model/cel"] [dependencies] -awa-model = { path = "../awa-model", version = "0.6.0-rc.3" } -awa-worker = { path = "../awa-worker", version = "0.6.0-rc.3", features = ["__python-bridge"] } +awa-model = { path = "../awa-model", version = "0.6.0-rc.4" } +awa-worker = { path = "../awa-worker", version = "0.6.0-rc.4", features = ["__python-bridge"] } pyo3 = { version = "0.28", features = ["macros", "abi3-py310", "chrono"] } pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] } sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono", "json"] } diff --git a/awa-python/pyproject.toml b/awa-python/pyproject.toml index 93ccbfec..bfb9f59f 100644 --- a/awa-python/pyproject.toml +++ b/awa-python/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "awa-pg" requires-python = ">=3.10" -version = "0.6.0-rc.3" +version = "0.6.0-rc.4" description = "Postgres-native background job queue — Python SDK with async/sync workers, transactional enqueue, and progress tracking. Install awa-pg[ui] to add the bundled web dashboard." readme = "README.md" license = {text = "MIT OR Apache-2.0"} @@ -32,7 +32,7 @@ classifiers = [ # # Kept opt-in so the default `awa-pg` install stays small — workers and # producers don't need the ~10 MB UI bundle. -ui = ["awa-cli==0.6.0-rc.3"] +ui = ["awa-cli==0.6.0-rc.4"] [project.urls] Homepage = "https://github.com/hardbyte/awa" diff --git a/awa-seaorm/Cargo.toml b/awa-seaorm/Cargo.toml index 7a601219..e2f9fcb7 100644 --- a/awa-seaorm/Cargo.toml +++ b/awa-seaorm/Cargo.toml @@ -14,7 +14,7 @@ serde_json.workspace = true sqlx.workspace = true [dev-dependencies] -awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" } +awa-testing = { path = "../awa-testing", version = "0.6.0-rc.4" } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/awa/Cargo.toml b/awa/Cargo.toml index 5d966e81..e6b037e2 100644 --- a/awa/Cargo.toml +++ b/awa/Cargo.toml @@ -20,7 +20,7 @@ awa-macros.workspace = true awa-worker.workspace = true [dev-dependencies] -awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" } +awa-testing = { path = "../awa-testing", version = "0.6.0-rc.4" } sqlx.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/awa/tests/integration_test.rs b/awa/tests/integration_test.rs index 892117fd..3e46c389 100644 --- a/awa/tests/integration_test.rs +++ b/awa/tests/integration_test.rs @@ -1249,6 +1249,88 @@ async fn test_admin_cancel_by_unique_key_mismatched_queue_returns_none() { ); } +/// Force the engine onto the queue-storage substrate the way a fresh install +/// auto-finalizes onto it — the inverse of `awa_testing::reset_runtime_backend`. +/// The substrate already exists in the `awa` schema (installed by the +/// migrations); flipping the transition state and registering the backend is +/// enough for the admin paths to route there. +async fn activate_queue_storage(pool: &sqlx::PgPool) { + let mut tx = pool.begin().await.unwrap(); + sqlx::query( + r#" + UPDATE awa.storage_transition_state + SET state = 'active', + current_engine = 'queue_storage', + prepared_engine = NULL, + details = jsonb_build_object('schema', 'awa'), + transition_epoch = transition_epoch + 1, + updated_at = now(), + finalized_at = now() + WHERE singleton + "#, + ) + .execute(&mut *tx) + .await + .expect("activate queue-storage transition state"); + sqlx::query( + r#" + INSERT INTO awa.runtime_storage_backends (backend, schema_name, updated_at) + VALUES ('queue_storage', 'awa', now()) + ON CONFLICT (backend) DO UPDATE + SET schema_name = EXCLUDED.schema_name, updated_at = EXCLUDED.updated_at + "#, + ) + .execute(&mut *tx) + .await + .expect("register queue-storage backend"); + tx.commit().await.unwrap(); +} + +/// `cancel_by_unique_key` and its `_tx` variant must run against the +/// queue-storage substrate, not only the canonical tables. The queue-storage +/// candidate query spans the available, deferred, and running-job lookups (the +/// running one reaches `unique_key` through a `leases → ready_entries` join), so +/// it must execute and return `Ok(None)` when nothing matches the key. +#[tokio::test] +async fn test_admin_cancel_by_unique_key_queue_storage_active() { + let _guard = test_lock().lock().await; + let client = setup().await; + activate_queue_storage(client.pool()).await; + + let pool_result = admin::cancel_by_unique_key( + client.pool(), + SendEmail::kind(), + Some("integ_cancel_by_unique_key_qs"), + None, + None, + ) + .await; + + let mut tx = client.pool().begin().await.unwrap(); + let tx_result = admin::cancel_by_unique_key_tx( + &mut tx, + SendEmail::kind(), + Some("integ_cancel_by_unique_key_qs"), + None, + None, + ) + .await; + tx.rollback().await.unwrap(); + + // Restore the canonical engine before asserting so a failed assertion can't + // leak queue-storage state into other tests sharing the database. + awa_testing::setup::reset_runtime_backend(client.pool()).await; + + assert!( + matches!(pool_result, Ok(None)), + "queue-storage cancel_by_unique_key must not error when nothing matches; got {pool_result:?}" + ); + assert!( + matches!(tx_result, Ok(None)), + "queue-storage cancel_by_unique_key_tx must not error when nothing matches; got {tx_result:?}" + ); +} + #[tokio::test] async fn test_admin_pause_resume_queue() { let _guard = test_lock().lock().await;