Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ Notable changes between releases. Detailed migration notes for storage transitio

## [Unreleased]

## [0.6.0-rc.4] — 2026-06-30

### Fixed

- **`admin::cancel_by_unique_key` / `cancel_by_unique_key_tx` on the queue-storage engine** ([#359](https://github.com/hardbyte/awa/pull/359)). The candidate lookup's running-job branch read `unique_key` directly from `{schema}.leases`, but the `leases` table does not carry that column, so any cancel-by-unique-key issued while queue storage was the active engine failed at plan time with `column "unique_key" does not exist`. The running-job branch now recovers the key by joining each lease back to its originating `ready_entries` row — the same lane-identity join the candidate and jobs-compat views already use. Canonical-engine behaviour was unaffected.

## [0.6.0-rc.3] — 2026-06-25

### Changed
Expand Down
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
exclude = ["awa-python", "spike"]

[workspace.package]
version = "0.6.0-rc.3"
version = "0.6.0-rc.4"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "Postgres-native background job queue — transactional enqueue, heartbeat crash recovery, SKIP LOCKED dispatch"
Expand All @@ -25,11 +25,11 @@ categories = ["database", "asynchronous", "web-programming"]

[workspace.dependencies]
# Internal crates
awa-model = { path = "awa-model", version = "0.6.0-rc.3" }
awa-macros = { path = "awa-macros", version = "0.6.0-rc.3" }
awa-metrics = { path = "awa-metrics", version = "0.6.0-rc.3" }
awa-worker = { path = "awa-worker", version = "0.6.0-rc.3" }
awa = { path = "awa", version = "0.6.0-rc.3" }
awa-model = { path = "awa-model", version = "0.6.0-rc.4" }
awa-macros = { path = "awa-macros", version = "0.6.0-rc.4" }
awa-metrics = { path = "awa-metrics", version = "0.6.0-rc.4" }
awa-worker = { path = "awa-worker", version = "0.6.0-rc.4" }
awa = { path = "awa", version = "0.6.0-rc.4" }

# Database
sea-orm = { version = "=2.0.0-rc.38", default-features = false, features = ["sqlx-postgres", "runtime-tokio-rustls", "with-chrono", "with-json"] }
Expand Down
4 changes: 2 additions & 2 deletions awa-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ path = "src/main.rs"
[dependencies]
awa-model.workspace = true
awa-worker.workspace = true
awa-ui = { path = "../awa-ui", version = "0.6.0-rc.3" }
awa-ui = { path = "../awa-ui", version = "0.6.0-rc.4" }
axum.workspace = true
sqlx.workspace = true
tokio.workspace = true
Expand All @@ -28,5 +28,5 @@ hex.workspace = true
uuid.workspace = true

[dev-dependencies]
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" }
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.4" }
assert_cmd = "2"
2 changes: 1 addition & 1 deletion awa-cli/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "awa-cli"
version = "0.6.0-rc.3"
version = "0.6.0-rc.4"
description = "CLI for the Awa Postgres-native job queue (migrations, admin, serve)"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
15 changes: 12 additions & 3 deletions awa-model/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,9 +637,18 @@ fn unique_key_candidate_sql(schema: &str) -> String {
FROM {schema}.deferred_jobs
WHERE unique_key = $1
UNION ALL
SELECT job_id
FROM {schema}.leases
WHERE unique_key = $1
-- A running job's unique_key is stored on its ready_entries row;
-- reach it through the lease's lane identity.
SELECT leases.job_id
FROM {schema}.leases AS leases
JOIN {schema}.ready_entries AS ready
ON ready.ready_slot = leases.ready_slot
AND ready.ready_generation = leases.ready_generation
AND ready.queue = leases.queue
AND ready.priority = leases.priority
AND ready.enqueue_shard = leases.enqueue_shard
AND ready.lane_seq = leases.lane_seq
WHERE ready.unique_key = $1
)
SELECT job_id
FROM candidates
Expand Down
6 changes: 3 additions & 3 deletions awa-python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "awa-python"
version = "0.6.0-rc.3"
version = "0.6.0-rc.4"
edition = "2021"

[lib]
Expand All @@ -12,8 +12,8 @@ default = ["cel"]
cel = ["awa-model/cel"]

[dependencies]
awa-model = { path = "../awa-model", version = "0.6.0-rc.3" }
awa-worker = { path = "../awa-worker", version = "0.6.0-rc.3", features = ["__python-bridge"] }
awa-model = { path = "../awa-model", version = "0.6.0-rc.4" }
awa-worker = { path = "../awa-worker", version = "0.6.0-rc.4", features = ["__python-bridge"] }
pyo3 = { version = "0.28", features = ["macros", "abi3-py310", "chrono"] }
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono", "json"] }
Expand Down
4 changes: 2 additions & 2 deletions awa-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "maturin"
[project]
name = "awa-pg"
requires-python = ">=3.10"
version = "0.6.0-rc.3"
version = "0.6.0-rc.4"
description = "Postgres-native background job queue — Python SDK with async/sync workers, transactional enqueue, and progress tracking. Install awa-pg[ui] to add the bundled web dashboard."
readme = "README.md"
license = {text = "MIT OR Apache-2.0"}
Expand All @@ -32,7 +32,7 @@ classifiers = [
#
# Kept opt-in so the default `awa-pg` install stays small — workers and
# producers don't need the ~10 MB UI bundle.
ui = ["awa-cli==0.6.0-rc.3"]
ui = ["awa-cli==0.6.0-rc.4"]

[project.urls]
Homepage = "https://github.com/hardbyte/awa"
Expand Down
2 changes: 1 addition & 1 deletion awa-seaorm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ serde_json.workspace = true
sqlx.workspace = true

[dev-dependencies]
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" }
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.4" }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

2 changes: 1 addition & 1 deletion awa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ awa-macros.workspace = true
awa-worker.workspace = true

[dev-dependencies]
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" }
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.4" }
sqlx.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
82 changes: 82 additions & 0 deletions awa/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,88 @@ async fn test_admin_cancel_by_unique_key_mismatched_queue_returns_none() {
);
}

/// Force the engine onto the queue-storage substrate the way a fresh install
/// auto-finalizes onto it — the inverse of `awa_testing::reset_runtime_backend`.
/// The substrate already exists in the `awa` schema (installed by the
/// migrations); flipping the transition state and registering the backend is
/// enough for the admin paths to route there.
async fn activate_queue_storage(pool: &sqlx::PgPool) {
let mut tx = pool.begin().await.unwrap();
sqlx::query(
r#"
UPDATE awa.storage_transition_state
SET state = 'active',
current_engine = 'queue_storage',
prepared_engine = NULL,
details = jsonb_build_object('schema', 'awa'),
transition_epoch = transition_epoch + 1,
updated_at = now(),
finalized_at = now()
WHERE singleton
"#,
)
.execute(&mut *tx)
.await
.expect("activate queue-storage transition state");
sqlx::query(
r#"
INSERT INTO awa.runtime_storage_backends (backend, schema_name, updated_at)
VALUES ('queue_storage', 'awa', now())
ON CONFLICT (backend) DO UPDATE
SET schema_name = EXCLUDED.schema_name, updated_at = EXCLUDED.updated_at
"#,
)
.execute(&mut *tx)
.await
.expect("register queue-storage backend");
tx.commit().await.unwrap();
}

/// `cancel_by_unique_key` and its `_tx` variant must run against the
/// queue-storage substrate, not only the canonical tables. The queue-storage
/// candidate query spans the available, deferred, and running-job lookups (the
/// running one reaches `unique_key` through a `leases → ready_entries` join), so
/// it must execute and return `Ok(None)` when nothing matches the key.
#[tokio::test]
async fn test_admin_cancel_by_unique_key_queue_storage_active() {
let _guard = test_lock().lock().await;
let client = setup().await;
activate_queue_storage(client.pool()).await;

let pool_result = admin::cancel_by_unique_key(
client.pool(),
SendEmail::kind(),
Some("integ_cancel_by_unique_key_qs"),
None,
None,
)
.await;

let mut tx = client.pool().begin().await.unwrap();
let tx_result = admin::cancel_by_unique_key_tx(
&mut tx,
SendEmail::kind(),
Some("integ_cancel_by_unique_key_qs"),
None,
None,
)
.await;
tx.rollback().await.unwrap();

// Restore the canonical engine before asserting so a failed assertion can't
// leak queue-storage state into other tests sharing the database.
awa_testing::setup::reset_runtime_backend(client.pool()).await;

assert!(
matches!(pool_result, Ok(None)),
"queue-storage cancel_by_unique_key must not error when nothing matches; got {pool_result:?}"
);
assert!(
matches!(tx_result, Ok(None)),
"queue-storage cancel_by_unique_key_tx must not error when nothing matches; got {tx_result:?}"
);
}

#[tokio::test]
async fn test_admin_pause_resume_queue() {
let _guard = test_lock().lock().await;
Expand Down