Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ Notable changes between releases. Detailed migration notes for storage transitio

## [Unreleased]

## [0.6.0-rc.3] — 2026-06-25

### Changed

- **`admin::cancel_tx` / `admin::cancel_by_unique_key_tx` now take `&mut PgConnection`** ([#358](https://github.com/hardbyte/awa/pull/358)) instead of the `&mut sqlx::Transaction` introduced in rc.2. A `&mut Transaction` deref-coerces to `&mut PgConnection`, so existing callers are unaffected, while callers that only hold a connection mid-transaction (e.g. behind a `transact!`-style helper that derefs the transaction) can now use them too. This matches awa's existing multi-statement consumer convention (`insert_many_copy`). Internally the cancel runs in a nested transaction — a `SAVEPOINT` when the connection is already in one — so a cancel error rolls back only the cancel rather than poisoning the caller's transaction.

## [0.6.0-rc.2] — 2026-06-25

### Added
Expand Down
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ members = [
exclude = ["awa-python", "spike"]

[workspace.package]
version = "0.6.0-rc.2"
version = "0.6.0-rc.3"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "Postgres-native background job queue — transactional enqueue, heartbeat crash recovery, SKIP LOCKED dispatch"
Expand All @@ -25,11 +25,11 @@ categories = ["database", "asynchronous", "web-programming"]

[workspace.dependencies]
# Internal crates
awa-model = { path = "awa-model", version = "0.6.0-rc.2" }
awa-macros = { path = "awa-macros", version = "0.6.0-rc.2" }
awa-metrics = { path = "awa-metrics", version = "0.6.0-rc.2" }
awa-worker = { path = "awa-worker", version = "0.6.0-rc.2" }
awa = { path = "awa", version = "0.6.0-rc.2" }
awa-model = { path = "awa-model", version = "0.6.0-rc.3" }
awa-macros = { path = "awa-macros", version = "0.6.0-rc.3" }
awa-metrics = { path = "awa-metrics", version = "0.6.0-rc.3" }
awa-worker = { path = "awa-worker", version = "0.6.0-rc.3" }
awa = { path = "awa", version = "0.6.0-rc.3" }

# Database
sea-orm = { version = "=2.0.0-rc.38", default-features = false, features = ["sqlx-postgres", "runtime-tokio-rustls", "with-chrono", "with-json"] }
Expand Down
4 changes: 2 additions & 2 deletions awa-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ path = "src/main.rs"
[dependencies]
awa-model.workspace = true
awa-worker.workspace = true
awa-ui = { path = "../awa-ui", version = "0.6.0-rc.2" }
awa-ui = { path = "../awa-ui", version = "0.6.0-rc.3" }
axum.workspace = true
sqlx.workspace = true
tokio.workspace = true
Expand All @@ -28,5 +28,5 @@ hex.workspace = true
uuid.workspace = true

[dev-dependencies]
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.2" }
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" }
assert_cmd = "2"
2 changes: 1 addition & 1 deletion awa-cli/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "awa-cli"
version = "0.6.0-rc.2"
version = "0.6.0-rc.3"
description = "CLI for the Awa Postgres-native job queue (migrations, admin, serve)"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
76 changes: 62 additions & 14 deletions awa-model/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::queue_storage::QueueStorage;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use sqlx::types::Json;
use sqlx::Acquire;
use sqlx::PgExecutor;
use sqlx::PgPool;
use std::cmp::max;
Expand Down Expand Up @@ -517,19 +518,45 @@ pub async fn cancel(pool: &PgPool, job_id: i64) -> Result<Option<JobRow>, AwaErr
Ok(Some(job))
}

/// Transaction-scoped variant of [`cancel`].
/// Transaction-participating variant of [`cancel`].
///
/// Runs the cancellation inside the caller's transaction `tx` instead of opening
/// its own, so the cancel commits (or rolls back) atomically with the caller's
/// other work, and the cooperative `awa:cancel` NOTIFY to a running worker fires
/// only when the caller commits.
/// Runs the cancellation on the caller-provided connection `conn` rather than
/// acquiring its own from a pool. When `conn` is already inside a transaction —
/// the common case, and a `&mut Transaction` deref-coerces to `&mut PgConnection`
/// — the cancel commits or rolls back atomically with the caller's other work,
/// and the cooperative `awa:cancel` NOTIFY to a running worker fires only when the
/// caller commits. When `conn` is a standalone pooled connection the cancel runs
/// in its own transaction.
///
/// Internally the cancel runs in a nested transaction — a `SAVEPOINT` when `conn`
/// is already in a transaction — so a cancel error rolls back only the cancel
/// rather than poisoning the caller's transaction.
///
/// On the queue-storage engine this skips the best-effort claim-cursor advance
/// that [`cancel`] performs after its own commit (it cannot run inside the
/// caller's transaction); the derived queue depth may briefly over-count by one
/// until later committed rows on the lane are claimed. Counts are unaffected on
/// the canonical engine.
pub async fn cancel_tx<'a>(
pub async fn cancel_tx(
conn: &mut sqlx::PgConnection,
job_id: i64,
) -> Result<Option<JobRow>, AwaError> {
let mut tx = conn.begin().await?;
match cancel_in_tx(&mut tx, job_id).await {
Ok(row) => {
tx.commit().await?;
Ok(row)
}
Err(err) => {
tx.rollback().await.ok();
Err(err)
}
}
}

/// Inner cancellation logic, run on an existing transaction. Shared by
/// [`cancel_tx`] and [`cancel_by_unique_key_tx`].
async fn cancel_in_tx<'a>(
tx: &mut sqlx::Transaction<'a, sqlx::Postgres>,
job_id: i64,
) -> Result<Option<JobRow>, AwaError> {
Expand Down Expand Up @@ -711,13 +738,34 @@ pub async fn cancel_by_unique_key(
}
}

/// Transaction-scoped variant of [`cancel_by_unique_key`].
/// Transaction-participating variant of [`cancel_by_unique_key`].
///
/// Resolves the candidate and cancels it inside the caller's transaction `tx`,
/// so the cancel is atomic with the caller's other work (and the `awa:cancel`
/// NOTIFY fires only on the caller's commit). See [`cancel_tx`] for the
/// queue-storage claim-cursor caveat.
pub async fn cancel_by_unique_key_tx<'a>(
/// Resolves the candidate and cancels it on the caller-provided connection
/// `conn`, so the cancel is atomic with the caller's other work (and the
/// `awa:cancel` NOTIFY fires only on the caller's commit). A `&mut Transaction`
/// deref-coerces to `&mut PgConnection`. See [`cancel_tx`] for the
/// nested-transaction and queue-storage claim-cursor caveats.
pub async fn cancel_by_unique_key_tx(
conn: &mut sqlx::PgConnection,
kind: &str,
queue: Option<&str>,
args: Option<&serde_json::Value>,
period_bucket: Option<i64>,
) -> Result<Option<JobRow>, AwaError> {
let mut tx = conn.begin().await?;
match cancel_by_unique_key_in_tx(&mut tx, kind, queue, args, period_bucket).await {
Ok(row) => {
tx.commit().await?;
Ok(row)
}
Err(err) => {
tx.rollback().await.ok();
Err(err)
}
}
}

async fn cancel_by_unique_key_in_tx<'a>(
tx: &mut sqlx::Transaction<'a, sqlx::Postgres>,
kind: &str,
queue: Option<&str>,
Expand All @@ -734,7 +782,7 @@ pub async fn cancel_by_unique_key_tx<'a>(
.await?;

return match candidate {
Some(job_id) => cancel_tx(tx, job_id).await,
Some(job_id) => cancel_in_tx(tx, job_id).await,
None => Ok(None),
};
}
Expand All @@ -745,7 +793,7 @@ pub async fn cancel_by_unique_key_tx<'a>(
.await?;

match candidate {
Some(job_id) => match cancel_tx(tx, job_id).await {
Some(job_id) => match cancel_in_tx(tx, job_id).await {
Ok(row) => Ok(row),
Err(AwaError::JobNotFound { .. }) => Ok(None),
Err(err) => Err(err),
Expand Down
6 changes: 3 additions & 3 deletions awa-python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "awa-python"
version = "0.6.0-rc.2"
version = "0.6.0-rc.3"
edition = "2021"

[lib]
Expand All @@ -12,8 +12,8 @@ default = ["cel"]
cel = ["awa-model/cel"]

[dependencies]
awa-model = { path = "../awa-model", version = "0.6.0-rc.2" }
awa-worker = { path = "../awa-worker", version = "0.6.0-rc.2", features = ["__python-bridge"] }
awa-model = { path = "../awa-model", version = "0.6.0-rc.3" }
awa-worker = { path = "../awa-worker", version = "0.6.0-rc.3", features = ["__python-bridge"] }
pyo3 = { version = "0.28", features = ["macros", "abi3-py310", "chrono"] }
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "chrono", "json"] }
Expand Down
4 changes: 2 additions & 2 deletions awa-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "maturin"
[project]
name = "awa-pg"
requires-python = ">=3.10"
version = "0.6.0-rc.2"
version = "0.6.0-rc.3"
description = "Postgres-native background job queue — Python SDK with async/sync workers, transactional enqueue, and progress tracking. Install awa-pg[ui] to add the bundled web dashboard."
readme = "README.md"
license = {text = "MIT OR Apache-2.0"}
Expand All @@ -32,7 +32,7 @@ classifiers = [
#
# Kept opt-in so the default `awa-pg` install stays small — workers and
# producers don't need the ~10 MB UI bundle.
ui = ["awa-cli==0.6.0-rc.2"]
ui = ["awa-cli==0.6.0-rc.3"]

[project.urls]
Homepage = "https://github.com/hardbyte/awa"
Expand Down
2 changes: 1 addition & 1 deletion awa-seaorm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ serde_json.workspace = true
sqlx.workspace = true

[dev-dependencies]
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.2" }
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

2 changes: 1 addition & 1 deletion awa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ awa-macros.workspace = true
awa-worker.workspace = true

[dev-dependencies]
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.2" }
awa-testing = { path = "../awa-testing", version = "0.6.0-rc.3" }
sqlx.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
10 changes: 7 additions & 3 deletions awa/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,8 @@ async fn test_admin_cancel_by_unique_key_tx_is_transactional() {
.unwrap();
let args_value = serde_json::to_value(&args).unwrap();

// Rolled-back transaction: the cancel resolves the job but must NOT persist.
// Rolled-back transaction, passing a `&mut Transaction` (which deref-coerces
// to `&mut PgConnection`): the cancel resolves the job but must NOT persist.
let mut tx = client.pool().begin().await.unwrap();
let cancelled = admin::cancel_by_unique_key_tx(
&mut tx,
Expand All @@ -971,10 +972,13 @@ async fn test_admin_cancel_by_unique_key_tx_is_transactional() {
"a rolled-back cancel_by_unique_key_tx must leave the job uncancelled"
);

// Committed transaction: the cancel takes effect.
// Committed transaction, passing an explicit `&mut PgConnection` — exactly
// how a `transact!`-style helper hands a connection (not a `Transaction`) to
// its body.
let mut tx = client.pool().begin().await.unwrap();
let conn: &mut sqlx::PgConnection = &mut tx;
let cancelled = admin::cancel_by_unique_key_tx(
&mut tx,
conn,
SendEmail::kind(),
Some(queue),
Some(&args_value),
Expand Down