diff --git a/clients/go/README.md b/clients/go/README.md index 894108a4..ba9ec7c9 100644 --- a/clients/go/README.md +++ b/clients/go/README.md @@ -184,6 +184,12 @@ TypeScript client (`PgqueQueueNotFoundError`, `PgqueConsumerNotFoundError`, `PgqueSqlError`). Go uses the standard acronym-uppercase convention (`SQLError` rather than `SqlError`). +## Transactions + +`Send` → ticker → `Receive` must each run in its own committed transaction (PgQue is snapshot-based). `pgxpool` satisfies this transparently — every `Send`/`Receive`/`Ack` is its own implicit tx, and the `Consumer` is pool-level. + +The footgun is `Client.Pool()`: calling `pgque.send` inside your own `pgx.Tx` is fine for transactional enqueue, but the consumer must run after `tx.Commit()`. Don't wrap `pgque.send` and `pgque.receive` in one shared `pgx.Tx`; same for `pgque.maint_retry_events` + `pgque.ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). + ## Tests The integration tests require a running PostgreSQL with the PgQue schema diff --git a/clients/python/README.md b/clients/python/README.md index 20afcf77..96fff37d 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -109,6 +109,13 @@ client.conn.commit() `client.force_tick(queue)` remains as a deprecated compatibility alias. +## Transactions + +`send` → ticker → `receive` must each run in its own committed transaction (PgQue is snapshot-based). `pgque.connect(dsn)` is non-autocommit by default, so commit between the producer and the consumer. The `Consumer` uses autocommit plus an explicit `conn.transaction()` around `receive + dispatch + ack`. + +Don't wrap `send` and `receive` in one explicit tx; same for `maint_retry_events` + `ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). 
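The commit boundary described above can be sketched as a small helper. This is a minimal illustration, not the client's own API: `enqueue_committed` is a hypothetical name, and it assumes `conn` is a DB-API style connection (for example the `client.conn` mentioned above) whose `cursor()` works as a context manager and which accepts `%s` placeholders. The three-argument `pgque.send(queue, type, payload)` form is the one used in `docs/examples.md`.

```python
import json


def enqueue_committed(conn, queue, event_type, payload):
    """Send one event and commit immediately.

    Hypothetical helper: it only shows where the commit boundary goes.
    `conn` is assumed to be a DB-API style connection in non-autocommit mode.
    """
    with conn.cursor() as cur:
        # Enqueue through the SQL API; the payload is serialized to JSON text
        # and cast to jsonb on the server side.
        cur.execute(
            "select pgque.send(%s, %s, %s::jsonb)",
            (queue, event_type, json.dumps(payload)),
        )
    # Commit before any ticker or receive call runs, so the ticker's snapshot
    # is taken after this insert is committed.
    conn.commit()
```

A later ticker or `receive` call then runs in its own transaction, for example on another connection or after this function returns.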
+ + ## Tests Integration tests require a running PostgreSQL with the PgQue schema diff --git a/clients/typescript/README.md b/clients/typescript/README.md index feedcf6b..739a0a16 100644 --- a/clients/typescript/README.md +++ b/clients/typescript/README.md @@ -121,6 +121,12 @@ only on pgque's internal pool via a per-pool `CustomTypesConfig` — it does **not** touch the process-global `pg-types` table. Other `pg.Pool` or `pg.Client` instances in the same process are unaffected. +## Transactions + +`send` → ticker → `receive` must each run in its own committed transaction (PgQue is snapshot-based). `pg.Pool#query` satisfies this transparently — every `send`/`receive`/`ack` is its own implicit tx, and the `Consumer` is pool-level. + +The footgun is `client.rawPool`: for transactional enqueue, call `BEGIN` / `pgque.send` / `COMMIT` on a checked-out client. Don't mix `pgque.send` and `pgque.receive` in one shared tx; same for `pgque.maint_retry_events` + `pgque.ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). + ## Tests The repository standardizes on Bun for TypeScript client development and CI diff --git a/docs/examples.md b/docs/examples.md index d477383f..01ea8b63 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -40,10 +40,10 @@ select pgque.subscribe('orders', 'analytics_pipeline'); -- send / force_next_tick / ticker / receive are separate transactions in psql -- autocommit. Do not wrap them in begin/commit — the snapshot rule applies. 
select pgque.send('orders', 'order.created', '{"order_id": 1}'::jsonb); -select pgque.force_next_tick('orders'); -select pgque.ticker(); +select pgque.force_next_tick('orders'); -- separate transaction +select pgque.ticker(); -- separate transaction -select * from pgque.receive('orders', 'audit_logger', 100); +select * from pgque.receive('orders', 'audit_logger', 100); -- separate transaction select * from pgque.receive('orders', 'notification_sender', 100); ``` @@ -69,6 +69,20 @@ commit; The `inserted` CTE runs to completion even though the main query does not reference it (data-modifying CTEs always execute). Every row in `msgs` shares the same `batch_id`, so the scalar subquery picks any one of them and `pgque.ack` runs exactly once. **Batch-ownership caveat:** `pgque.ack(batch_id)` advances the consumer past the entire underlying batch, even if `receive()` returned fewer rows than the batch contains (due to `max_return`). Either consume the full batch before acking, or use `max_return >= ticker_max_count` (default 500) to ensure all rows are returned. +> **Anti-pattern: send + receive in one transaction.** The pattern above merges `receive` + writes + `ack` into one tx, which is correct. Do **not** also merge `send` / `force_next_tick` / `ticker` into the same tx; the ticker's snapshot must be taken *after* `send` commits. +> +> ```sql +> -- WRONG -- consumer sees 0 rows +> begin; +> select pgque.send('orders', 'order.created', '{"id": 1}'::jsonb); +> select pgque.force_next_tick('orders'); +> select pgque.ticker(); +> select * from pgque.receive('orders', 'processor', 100); -- 0 rows +> commit; +> ``` +> +> See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). + ## Recurring jobs with pg_cron ```sql diff --git a/docs/pgq-concepts.md b/docs/pgq-concepts.md index 81e89268..863b87e1 100644 --- a/docs/pgq-concepts.md +++ b/docs/pgq-concepts.md @@ -72,6 +72,43 @@ below; the function auto-prefixes `queue_` internally. 
— Kreen & Pihlak, PgCon 2009 +## Snapshot rule + +PgQue is snapshot-based, not row-claiming. The ticker records the +PostgreSQL snapshot it sees; `pgque.receive` only returns events whose +insert committed **before** that snapshot. Consequence: the following +operation chains MUST run in distinct, committed transactions — +combining any chain in one explicit `begin`/`commit` block silently +produces empty batches and dropped messages. + +- **Producer → consumer.** `pgque.send` (or `pgque.insert_event`) → + `pgque.ticker` (or `pgque.force_tick` + `pgque.ticker`) → + `pgque.receive` (or `pgque.next_batch`). +- **Retry pump.** `pgque.maint_retry_events` (re-inserts retry rows + into event tables with `pg_current_xact_id()`) → `pgque.ticker` + (must run in a later tx so the new `ev_txid`s are visible in its + snapshot) → `pgque.receive`. +- **Rotation.** `pgque.maint_rotate_tables_step1` → + `pgque.maint_rotate_tables_step2` (PgQ design requirement). + +By contrast, `receive → process → ack` belongs in **one** transaction +when you want exactly-once effects on the same database (see the +[transactional pattern](examples.md#exactly-once-processing-transactional-pattern)). +The asymmetry: producer-to-consumer flow needs commit boundaries between +steps; consume-to-side-effect flow needs them merged. + +For the shipped clients: Go (`pgxpool`) and TypeScript (`pg.Pool`) run +each call in its own implicit transaction, so the rule is satisfied +transparently. The Python client requires care — `pgque.connect(dsn)` +is **not** autocommit by default, so producers must commit explicitly +between `send` and the consumer side; the high-level Python `Consumer` +already handles this internally (autocommit + an explicit +`conn.transaction()` around `receive + dispatch + ack`). 
The footgun +in every driver is reaching for the underlying pool/connection +(`Client.Pool()`, `client.rawPool`, `client.conn`) to wrap `send` and +`receive` in one explicit transaction — the consumer side will not see +what the producer just sent. + ## Three latencies For the full explanation — producer latency, subscriber latency, diff --git a/docs/reference.md b/docs/reference.md index 6ecd5adb..d572f35a 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -115,6 +115,8 @@ The consume API wraps `pgque.next_batch`, `pgque.get_batch_events`, `pgque.finis All consume-side functions (`receive`, `ack`, `nack`, `subscribe`, `unsubscribe`) are granted to `pgque_reader`, mirroring upstream PgQ's producer/consumer role split. Apps that both produce and consume must hold both `pgque_reader` and `pgque_writer` — `pgque_writer` does not inherit `pgque_reader`. +**Snapshot rule.** `pgque.send` → `pgque.ticker` → `pgque.receive` must each run in its own committed transaction (the ticker's snapshot must be taken after `send` commits; `receive` only sees what committed before it). Same for `pgque.maint_retry_events` → `pgque.ticker` → `pgque.receive`. Go (`pgxpool`) and TypeScript (`pg.Pool`) satisfy this transparently; Python `pgque.connect()` is non-autocommit by default and needs explicit commit boundaries (the high-level Python `Consumer` handles this internally). The footgun in every driver is reaching for the underlying pool/connection (`Client.Pool()`, `client.rawPool`, `client.conn`) to wrap producer + consumer calls in one explicit transaction. See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). + #### `pgque.receive(queue text, consumer text, max_return int default 100) → setof pgque.message` Pulls the next batch for `consumer` on `queue` and streams up to `max_return` messages. `max_return` must be >= 1; passing 0 or a negative value raises an error. Returns an empty set if no batch is available. 
Each row is a `pgque.message` composite (see [§Message type](#message-type)). @@ -236,7 +238,7 @@ Grant: `pgque_admin`. Source: `sql/pgque-api/maint.sql`. #### `pgque.maint_retry_events() → integer` -Moves due rows from `pgque.retry_queue` back into queue event tables so they appear in the next tick. Must be called periodically when using `nack()` with retry — `pgque.start()` schedules it as `pgque_retry_events` every 30 s. When driving the scheduler manually, call this alongside `pgque.maint()`: +Moves due rows from `pgque.retry_queue` back into queue event tables so they appear in the next tick. Must be called periodically when using `nack()` with retry — `pgque.start()` schedules it as `pgque_retry_events` every 30 s. When driving the scheduler manually, call this alongside `pgque.maint()`. The re-inserted rows carry `pg_current_xact_id()` as their `ev_txid`, so the subsequent `pgque.ticker` call must run in a **separate** transaction; see the [snapshot rule](pgq-concepts.md#snapshot-rule). ```sql select pgque.maint_retry_events(); -- every 30 seconds, for nack/retry redelivery @@ -275,6 +277,8 @@ Grant: `pgque_admin`. Source: `sql/pgque-additions/tick_helpers.sql`. Alias for `pgque.force_next_tick`. Retained for compatibility with upstream PgQ (the historical name); identical behavior. The name is misleading — the function does not insert a tick by itself, it only bumps the event sequence so the next `pgque.ticker()` call inserts one. Prefer `force_next_tick` in new code. Grant: `pgque_admin`. Source: `sql/pgque.sql`. +> The `force_tick` → `ticker` → `receive` chain must run across separate transactions for the consumer to see the events you just sent. See the [snapshot rule](pgq-concepts.md#snapshot-rule). + #### `pgque.uninstall() → void` Calls `stop()` (if pg_cron is present) and then `drop schema pgque cascade`. Roles (`pgque_reader`, `pgque_writer`, `pgque_admin`) are not dropped and must be removed manually if desired. 
`execute` is revoked from both `pgque_admin` (explicit) and PUBLIC (via the schema-wide blanket revoke), so only the schema/install owner (typically a superuser) can run it. diff --git a/docs/tutorial.md b/docs/tutorial.md index 764eedea..81f77104 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -124,7 +124,7 @@ For demos and tests, PgQue provides `pgque.force_next_tick` to bypass the tick t -- separate transactions (psql autocommit). Do not wrap in begin/commit: -- ticker() must see the prior send committed before it can include it in a batch. select pgque.force_next_tick('orders'); -select pgque.ticker(); +select pgque.ticker(); -- separate transaction ``` ``` @@ -139,6 +139,8 @@ select pgque.ticker(); `force_next_tick` returns the current tick id (the queue was seeded with tick `1` by `create_queue`). `ticker()` returns the number of queues it processed. +> Each statement above runs in its own transaction — required, not stylistic. PgQue is snapshot-based: the ticker captures a snapshot, and `receive` only returns events whose `send` committed before it. Wrapping `send` + `force_next_tick` + `ticker` + `receive` in one `begin`/`commit` returns zero rows. See the [snapshot rule](pgq-concepts.md#snapshot-rule). + Now try receiving again: ```sql @@ -215,15 +217,15 @@ select pgque.set_queue_config('orders', 'max_retries', '2'); The parameter is `max_retries`, not `queue_max_retries` — `set_queue_config` prepends `queue_` for you. -Send another event, tick, and receive: +Send another event, tick, and receive (each select is a separate transaction — required, see [snapshot rule](pgq-concepts.md#snapshot-rule)): ```sql -- send / force_next_tick / ticker / receive are four separate transactions in psql -- autocommit. Do not wrap them in begin/commit — the snapshot rule still applies. 
select pgque.send('orders', '{"order_id": 43, "total": 10.00}'::jsonb); select pgque.force_next_tick('orders'); -select pgque.ticker(); -select * from pgque.receive('orders', 'processor', 100); +select pgque.ticker(); -- separate transaction +select * from pgque.receive('orders', 'processor', 100); -- separate transaction ``` ``` @@ -254,8 +256,8 @@ The event is now in PgQ's retry queue. Moving it back into the main event stream -- four separate transactions (psql autocommit). Do not wrap in begin/commit. select pgque.maint_retry_events(); select pgque.force_next_tick('orders'); -select pgque.ticker(); -select * from pgque.receive('orders', 'processor', 100); +select pgque.ticker(); -- separate transaction +select * from pgque.receive('orders', 'processor', 100); -- separate transaction ``` In production, `pgque.start()` schedules `maint_retry_events` on its own cadence — you never call it by hand. See [`pgque.maint()`](reference.md#pgquemaint--integer) and the surrounding Lifecycle entries in the reference. @@ -289,7 +291,7 @@ end $$; -- three more, in psql autocommit. Do not wrap them in begin/commit. select pgque.maint_retry_events(); select pgque.force_next_tick('orders'); -select pgque.ticker(); +select pgque.ticker(); -- separate transaction ``` The second iteration sees `retry_count = 2` and routes to the DLQ instead of the retry queue. After it runs, `receive` returns nothing — the event has moved to `pgque.dead_letter`.