From fecd6f542c2961f9c84192a7d75fe697511c07bc Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 20:00:45 +0000 Subject: [PATCH 1/3] docs: document the separate-transactions rule across user-facing surfaces MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The "send → ticker → receive" snapshot rule and the related "maint_retry_events → ticker → receive" rule were called out in test files (test_api_receive.sql header etc.) but invisible everywhere else. A user reading the tutorial, the function reference, or any client README had no warning that wrapping these calls in one explicit tx silently produces empty batches. This commit adds the rule once in pgq-concepts.md and links to it from every other surface that names the involved functions: - docs/tutorial.md: short callout under Step 5 ("Force a tick, then receive") explaining why each select runs in its own transaction. - docs/pgq-concepts.md: new "Snapshot rule" subsection with all three operation chains (producer → consumer; retry pump; rotation step1 → step2) and the asymmetry vs. receive→process→ack. - docs/examples.md: anti-example block under "Exactly-once processing" showing the wrong send+receive-in-one-tx pattern. - docs/reference.md: anchor + paragraph at the top of "Consuming"; inline reminders on maint_retry_events and force_tick entries. - clients/python/README.md: new "Transactions" section explaining default psycopg autocommit behavior, the Quickstart's commit pattern, and the Consumer's internal transaction scope. - clients/go/README.md: new "Transactions" section flagging Client.Pool() as the footgun (its doc-comment invites you to pgque.send inside your own pgx.Tx — correct only if consumer side runs in a separate tx). - clients/typescript/README.md: new "Transactions" section flagging client.rawPool similarly. No code or behavior change. Source SQL header comments (sql/pgque-api/{send,receive,maint}.sql) intentionally left for a later contributor-doc pass. --- clients/go/README.md | 24 ++++++++++++++++++++++++ clients/python/README.md | 26 ++++++++++++++++++++++++++ clients/typescript/README.md | 23 +++++++++++++++++++++++ docs/examples.md | 14 ++++++++++++++ docs/pgq-concepts.md | 33 +++++++++++++++++++++++++++++++++ docs/reference.md | 6 +++++- docs/tutorial.md | 2 ++ 7 files changed, 127 insertions(+), 1 deletion(-) diff --git a/clients/go/README.md b/clients/go/README.md index 894108a4..4c45ceee 100644 --- a/clients/go/README.md +++ b/clients/go/README.md @@ -184,6 +184,30 @@ TypeScript client (`PgqueQueueNotFoundError`, `PgqueConsumerNotFoundError`, `PgqueSqlError`). Go uses the standard acronym-uppercase convention (`SQLError` rather than `SqlError`). +## Transactions + +PgQue is snapshot-based. `Client.Send` (or any insert), the ticker, and +`Client.Receive` must run in **distinct, committed** transactions — +otherwise the ticker's snapshot does not see the producer's commit and +`Receive` returns zero rows. + +By default this is satisfied transparently: every `Client.Send`, +`Client.Receive`, `Client.Ack`, etc. uses `pgxpool` and runs in its own +implicit transaction. The `Consumer` poll loop is also pool-level; no +special handling required. + +The footgun is `Client.Pool()`. Its doc-comment notes you can call +`pgque.send` inside your own `pgx.Tx` for transactional enqueueing — +that is correct, but **only if the consumer side runs in a separate +transaction after your `tx.Commit()`**. Do **not** wrap `pgque.send` +and `pgque.receive` in one shared `pgx.Tx` — the consumer cannot see +what the producer just sent until it commits. The same caveat applies +to invoking `pgque.maint_retry_events` and `pgque.ticker` inside one +Tx: the ticker's snapshot will predate the maint commit and the next +batch will be empty. + +See [pgq-concepts.md#snapshot-rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). + ## Tests The integration tests require a running PostgreSQL with the PgQue schema diff --git a/clients/python/README.md b/clients/python/README.md index 20afcf77..2ddb50d4 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -109,6 +109,32 @@ client.conn.commit() `client.force_tick(queue)` remains as a deprecated compatibility alias. +## Transactions + +PgQue is snapshot-based. `client.send` (or any insert), the ticker, and +`client.receive` must run in **distinct, committed** transactions — +otherwise the ticker's snapshot does not see the producer's commit and +the consumer returns zero rows. + +`pgque.connect(dsn)` defaults to `autocommit=False` (psycopg's default), +so `client.send(...)` does not commit until you call +`client.conn.commit()`. The Quickstart above commits between subscribe +and produce; commit again before the consumer side runs. The `Consumer` +class wraps its own connection in `autocommit=True` and opens an +explicit `with conn.transaction()` only around `receive + dispatch + +ack` — the recommended pattern. + +If you need transactional enqueue (`pgque.send` inside your +application's transaction), commit it before any consumer-side call. Do +**not** wrap `send` and `receive` in one explicit transaction — the +consumer cannot see what the producer just sent until it commits. The +same caveat applies to `pgque.maint_retry_events` + `pgque.ticker`: +they must be in separate transactions for the re-queued rows to appear +in the next batch. + +See [pgq-concepts.md#snapshot-rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). + + ## Tests Integration tests require a running PostgreSQL with the PgQue schema diff --git a/clients/typescript/README.md b/clients/typescript/README.md index feedcf6b..5d09761c 100644 --- a/clients/typescript/README.md +++ b/clients/typescript/README.md @@ -121,6 +121,29 @@ only on pgque's internal pool via a per-pool `CustomTypesConfig` — it does **not** touch the process-global `pg-types` table. Other `pg.Pool` or `pg.Client` instances in the same process are unaffected. +## Transactions + +PgQue is snapshot-based. `client.send` (or any insert), the ticker, and +`client.receive` must run in **distinct, committed** transactions — +otherwise the ticker's snapshot does not see the producer's commit and +`receive` returns zero rows. + +By default this is satisfied transparently: every method on `client` +goes through `pg.Pool#query`, which checks out a fresh connection and +runs each statement in its own implicit transaction. The high-level +`Consumer` poll loop is also pool-level; no special handling required. + +The footgun is `client.rawPool`. If you want transactional enqueue — +`pgque.send` inside your application's transaction — check out a +client with `await client.rawPool.connect()`, then run `BEGIN`, your +inserts including the `pgque.send`, and `COMMIT`. Do **not** mix +`pgque.send` and `pgque.receive` in one shared transaction — the +consumer cannot see what the producer just sent until it commits. The +same caveat applies to invoking `pgque.maint_retry_events` and +`pgque.ticker` inside one Tx. + +See [pgq-concepts.md#snapshot-rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). + ## Tests The repository standardizes on Bun for TypeScript client development and CI diff --git a/docs/examples.md b/docs/examples.md index d477383f..782ea307 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -69,6 +69,20 @@ commit; The `inserted` CTE runs to completion even though the main query does not reference it (data-modifying CTEs always execute). Every row in `msgs` shares the same `batch_id`, so the scalar subquery picks any one of them and `pgque.ack` runs exactly once. **Batch-ownership caveat:** `pgque.ack(batch_id)` advances the consumer past the entire underlying batch, even if `receive()` returned fewer rows than the batch contains (due to `max_return`). Either consume the full batch before acking, or use `max_return >= ticker_max_count` (default 500) to ensure all rows are returned. +> **Anti-pattern: send + receive in one transaction.** The pattern above merges `receive` + side effects + `ack` into one tx — that is correct. **Do not** also merge the producer's `send` (or `force_tick` / `ticker`) into the same transaction. PgQue is snapshot-based, so the ticker's snapshot must be taken *after* the producer commits, and the consumer's snapshot must be taken *after* the ticker commits. The following anti-pattern silently returns zero rows because the ticker's snapshot predates the `send`'s commit: +> +> ```sql +> -- WRONG -- consumer sees 0 rows +> begin; +> select pgque.send('orders', 'order.created', '{"id": 1}'::jsonb); +> select pgque.force_tick('orders'); +> select pgque.ticker(); +> select * from pgque.receive('orders', 'processor', 100); -- 0 rows +> commit; +> ``` +> +> See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule) for the full rule. + ## Recurring jobs with pg_cron ```sql diff --git a/docs/pgq-concepts.md b/docs/pgq-concepts.md index 81e89268..27204851 100644 --- a/docs/pgq-concepts.md +++ b/docs/pgq-concepts.md @@ -72,6 +72,39 @@ below; the function auto-prefixes `queue_` internally. — Kreen & Pihlak, PgCon 2009 +## Snapshot rule + +PgQue is snapshot-based, not row-claiming. The ticker records the +PostgreSQL snapshot it sees; `pgque.receive` only returns events whose +insert committed **before** that snapshot. Consequence: the following +operation chains MUST run in distinct, committed transactions — +combining any chain in one explicit `begin`/`commit` block silently +produces empty batches and dropped messages. + +- **Producer → consumer.** `pgque.send` (or `pgque.insert_event`) → + `pgque.ticker` (or `pgque.force_tick` + `pgque.ticker`) → + `pgque.receive` (or `pgque.next_batch`). +- **Retry pump.** `pgque.maint_retry_events` (re-inserts retry rows + into event tables with `pg_current_xact_id()`) → `pgque.ticker` + (must run in a later tx so the new `ev_txid`s are visible in its + snapshot) → `pgque.receive`. +- **Rotation.** `pgque.maint_rotate_tables_step1` → + `pgque.maint_rotate_tables_step2` (PgQ design requirement). + +By contrast, `receive → process → ack` belongs in **one** transaction +when you want exactly-once effects on the same database (see the +[transactional pattern](examples.md#exactly-once-processing-transactional-pattern)). +The asymmetry: producer-to-consumer flow needs commit boundaries between +steps; consume-to-side-effect flow needs them merged. + +The default modes of every shipped client (`pgxpool` / `pg.Pool` / +`psycopg(autocommit=True)`) run each call in its own implicit +transaction, so the rule is satisfied transparently. The footgun is +reaching for the underlying pool/connection (`Client.Pool()`, +`client.rawPool`, `client.conn`) to wrap `send` and `receive` in one +explicit transaction — the consumer side will not see what the producer +just sent. + ## Three latencies For the full explanation — producer latency, subscriber latency, diff --git a/docs/reference.md b/docs/reference.md index 6ecd5adb..b2faa060 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -115,6 +115,8 @@ The consume API wraps `pgque.next_batch`, `pgque.get_batch_events`, `pgque.finis All consume-side functions (`receive`, `ack`, `nack`, `subscribe`, `unsubscribe`) are granted to `pgque_reader`, mirroring upstream PgQ's producer/consumer role split. Apps that both produce and consume must hold both `pgque_reader` and `pgque_writer` — `pgque_writer` does not inherit `pgque_reader`. +**Snapshot rule.** PgQue is snapshot-based. `pgque.send` (commit) → `pgque.ticker` (snapshot taken) → `pgque.receive` (sees what committed before that snapshot) MUST run in distinct, committed transactions. The same applies to `pgque.maint_retry_events` → `pgque.ticker` → `pgque.receive`. Combining any chain inside one explicit `begin`/`commit` block silently produces empty batches and dropped messages. Default modes of every shipped client (`pgxpool`, `pg.Pool`, `psycopg(autocommit=True)`) satisfy this transparently; the footgun is reaching for the underlying pool/connection to wrap producer + consumer calls in one explicit transaction. See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). + #### `pgque.receive(queue text, consumer text, max_return int default 100) → setof pgque.message` Pulls the next batch for `consumer` on `queue` and streams up to `max_return` messages. `max_return` must be >= 1; passing 0 or a negative value raises an error. Returns an empty set if no batch is available. Each row is a `pgque.message` composite (see [§Message type](#message-type)). @@ -236,7 +238,7 @@ Grant: `pgque_admin`. Source: `sql/pgque-api/maint.sql`. #### `pgque.maint_retry_events() → integer` -Moves due rows from `pgque.retry_queue` back into queue event tables so they appear in the next tick. Must be called periodically when using `nack()` with retry — `pgque.start()` schedules it as `pgque_retry_events` every 30 s. When driving the scheduler manually, call this alongside `pgque.maint()`: +Moves due rows from `pgque.retry_queue` back into queue event tables so they appear in the next tick. Must be called periodically when using `nack()` with retry — `pgque.start()` schedules it as `pgque_retry_events` every 30 s. When driving the scheduler manually, call this alongside `pgque.maint()`. The re-inserted rows carry `pg_current_xact_id()` as their `ev_txid`, so the subsequent `pgque.ticker` call must run in a **separate** transaction — see the [snapshot rule](#snapshot-rule). ```sql select pgque.maint_retry_events(); -- every 30 seconds, for nack/retry redelivery @@ -275,6 +277,8 @@ Grant: `pgque_admin`. Source: `sql/pgque-additions/tick_helpers.sql`. Alias for `pgque.force_next_tick`. Retained for compatibility with upstream PgQ (the historical name); identical behavior. The name is misleading — the function does not insert a tick by itself, it only bumps the event sequence so the next `pgque.ticker()` call inserts one. Prefer `force_next_tick` in new code. Grant: `pgque_admin`. Source: `sql/pgque.sql`. +> The `force_tick` → `ticker` → `receive` chain must run across separate transactions for the consumer to see the events you just sent. See the [snapshot rule](#snapshot-rule). + #### `pgque.uninstall() → void` Calls `stop()` (if pg_cron is present) and then `drop schema pgque cascade`. Roles (`pgque_reader`, `pgque_writer`, `pgque_admin`) are not dropped and must be removed manually if desired. `execute` is revoked from both `pgque_admin` (explicit) and PUBLIC (via the schema-wide blanket revoke), so only the schema/install owner (typically a superuser) can run it. diff --git a/docs/tutorial.md b/docs/tutorial.md index 764eedea..67a0c3d3 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -139,6 +139,8 @@ select pgque.ticker(); `force_next_tick` returns the current tick id (the queue was seeded with tick `1` by `create_queue`). `ticker()` returns the number of queues it processed. +> **Each statement above runs in its own transaction.** That is intentional — PgQue is snapshot-based. `pgque.ticker` records the snapshot it sees; `pgque.receive` only returns events whose insert committed *before* that snapshot. If you wrap `send` + `force_tick` + `ticker` (or `maint_retry_events` + `ticker`) inside one explicit `begin`/`commit` block, the ticker's snapshot does not see the inserts and the next `receive` returns zero rows. See the [snapshot rule](pgq-concepts.md#snapshot-rule). + Now try receiving again: ```sql From df46346ee008eba6c09091222547542380221fa6 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 22:56:48 +0000 Subject: [PATCH 2/3] docs: tighten snapshot-rule wording + inline 'separate transaction' comments Per review feedback on PR #190. - Halve the prose on every per-surface callout (tutorial Step 5, examples anti-example, reference snapshot-rule paragraph). The canonical long-form lives in pgq-concepts.md#snapshot-rule and is unchanged. - Cut the three client README "Transactions" sections to ~3 sentences each. The previous versions repeated points already made on the canonical page. - Add inline -- separate transaction comments next to pgque.ticker(), pgque.force_tick(), and follow-up pgque.receive() calls in the tutorial and the fan-out example, so the rule is visible at the point of use rather than only in surrounding prose. --- clients/go/README.md | 24 +++--------------------- clients/python/README.md | 25 +++---------------------- clients/typescript/README.md | 23 +++-------------------- docs/examples.md | 10 +++++----- docs/reference.md | 2 +- docs/tutorial.md | 16 ++++++++-------- 6 files changed, 23 insertions(+), 77 deletions(-) diff --git a/clients/go/README.md b/clients/go/README.md index 4c45ceee..ba9ec7c9 100644 --- a/clients/go/README.md +++ b/clients/go/README.md @@ -186,27 +186,9 @@ TypeScript client (`PgqueQueueNotFoundError`, `PgqueConsumerNotFoundError`, ## Transactions -PgQue is snapshot-based. `Client.Send` (or any insert), the ticker, and -`Client.Receive` must run in **distinct, committed** transactions — -otherwise the ticker's snapshot does not see the producer's commit and -`Receive` returns zero rows. - -By default this is satisfied transparently: every `Client.Send`, -`Client.Receive`, `Client.Ack`, etc. uses `pgxpool` and runs in its own -implicit transaction. The `Consumer` poll loop is also pool-level; no -special handling required. - -The footgun is `Client.Pool()`. Its doc-comment notes you can call -`pgque.send` inside your own `pgx.Tx` for transactional enqueueing — -that is correct, but **only if the consumer side runs in a separate -transaction after your `tx.Commit()`**. Do **not** wrap `pgque.send` -and `pgque.receive` in one shared `pgx.Tx` — the consumer cannot see -what the producer just sent until it commits. The same caveat applies -to invoking `pgque.maint_retry_events` and `pgque.ticker` inside one -Tx: the ticker's snapshot will predate the maint commit and the next -batch will be empty. - -See [pgq-concepts.md#snapshot-rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). +`Send` → ticker → `Receive` must each run in its own committed transaction (PgQue is snapshot-based). `pgxpool` satisfies this transparently — every `Send`/`Receive`/`Ack` is its own implicit tx, and the `Consumer` is pool-level. + +The footgun is `Client.Pool()`: calling `pgque.send` inside your own `pgx.Tx` is fine for transactional enqueue, but the consumer must run after `tx.Commit()`. Don't wrap `pgque.send` and `pgque.receive` in one shared `pgx.Tx`; same for `pgque.maint_retry_events` + `pgque.ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). ## Tests diff --git a/clients/python/README.md b/clients/python/README.md index 2ddb50d4..96fff37d 100644 --- a/clients/python/README.md +++ b/clients/python/README.md @@ -111,28 +111,9 @@ client.conn.commit() ## Transactions -PgQue is snapshot-based. `client.send` (or any insert), the ticker, and -`client.receive` must run in **distinct, committed** transactions — -otherwise the ticker's snapshot does not see the producer's commit and -the consumer returns zero rows. - -`pgque.connect(dsn)` defaults to `autocommit=False` (psycopg's default), -so `client.send(...)` does not commit until you call -`client.conn.commit()`. The Quickstart above commits between subscribe -and produce; commit again before the consumer side runs. The `Consumer` -class wraps its own connection in `autocommit=True` and opens an -explicit `with conn.transaction()` only around `receive + dispatch + -ack` — the recommended pattern. - -If you need transactional enqueue (`pgque.send` inside your -application's transaction), commit it before any consumer-side call. Do -**not** wrap `send` and `receive` in one explicit transaction — the -consumer cannot see what the producer just sent until it commits. The -same caveat applies to `pgque.maint_retry_events` + `pgque.ticker`: -they must be in separate transactions for the re-queued rows to appear -in the next batch. - -See [pgq-concepts.md#snapshot-rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). +`send` → ticker → `receive` must each run in its own committed transaction (PgQue is snapshot-based). `pgque.connect(dsn)` is non-autocommit by default — commit between produce and consumer. The `Consumer` is autocommit + explicit `conn.transaction()` around `receive + dispatch + ack`. + +Don't wrap `send` and `receive` in one explicit tx; same for `maint_retry_events` + `ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). ## Tests diff --git a/clients/typescript/README.md b/clients/typescript/README.md index 5d09761c..739a0a16 100644 --- a/clients/typescript/README.md +++ b/clients/typescript/README.md @@ -123,26 +123,9 @@ or `pg.Client` instances in the same process are unaffected. ## Transactions -PgQue is snapshot-based. `client.send` (or any insert), the ticker, and -`client.receive` must run in **distinct, committed** transactions — -otherwise the ticker's snapshot does not see the producer's commit and -`receive` returns zero rows. - -By default this is satisfied transparently: every method on `client` -goes through `pg.Pool#query`, which checks out a fresh connection and -runs each statement in its own implicit transaction. The high-level -`Consumer` poll loop is also pool-level; no special handling required. - -The footgun is `client.rawPool`. If you want transactional enqueue — -`pgque.send` inside your application's transaction — check out a -client with `await client.rawPool.connect()`, then run `BEGIN`, your -inserts including the `pgque.send`, and `COMMIT`. Do **not** mix -`pgque.send` and `pgque.receive` in one shared transaction — the -consumer cannot see what the producer just sent until it commits. The -same caveat applies to invoking `pgque.maint_retry_events` and -`pgque.ticker` inside one Tx. - -See [pgq-concepts.md#snapshot-rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). +`send` → ticker → `receive` must each run in its own committed transaction (PgQue is snapshot-based). `pg.Pool#query` satisfies this transparently — every `send`/`receive`/`ack` is its own implicit tx, and the `Consumer` is pool-level. + +The footgun is `client.rawPool`: for transactional enqueue, call `BEGIN` / `pgque.send` / `COMMIT` on a checked-out client. Don't mix `pgque.send` and `pgque.receive` in one shared tx; same for `pgque.maint_retry_events` + `pgque.ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). ## Tests diff --git a/docs/examples.md b/docs/examples.md index 782ea307..504f0357 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -40,10 +40,10 @@ select pgque.subscribe('orders', 'analytics_pipeline'); -- send / force_next_tick / ticker / receive are separate transactions in psql -- autocommit. Do not wrap them in begin/commit — the snapshot rule applies. select pgque.send('orders', 'order.created', '{"order_id": 1}'::jsonb); -select pgque.force_next_tick('orders'); -select pgque.ticker(); +select pgque.force_next_tick('orders'); -- separate transaction +select pgque.ticker(); -- separate transaction -select * from pgque.receive('orders', 'audit_logger', 100); +select * from pgque.receive('orders', 'audit_logger', 100); -- separate transaction select * from pgque.receive('orders', 'notification_sender', 100); ``` @@ -69,7 +69,7 @@ commit; The `inserted` CTE runs to completion even though the main query does not reference it (data-modifying CTEs always execute). Every row in `msgs` shares the same `batch_id`, so the scalar subquery picks any one of them and `pgque.ack` runs exactly once. **Batch-ownership caveat:** `pgque.ack(batch_id)` advances the consumer past the entire underlying batch, even if `receive()` returned fewer rows than the batch contains (due to `max_return`). Either consume the full batch before acking, or use `max_return >= ticker_max_count` (default 500) to ensure all rows are returned. -> **Anti-pattern: send + receive in one transaction.** The pattern above merges `receive` + side effects + `ack` into one tx — that is correct. **Do not** also merge the producer's `send` (or `force_tick` / `ticker`) into the same transaction. PgQue is snapshot-based, so the ticker's snapshot must be taken *after* the producer commits, and the consumer's snapshot must be taken *after* the ticker commits. The following anti-pattern silently returns zero rows because the ticker's snapshot predates the `send`'s commit: +> **Anti-pattern: send + receive in one transaction.** Above merges `receive` + writes + `ack` into one tx — correct. Do **not** also merge `send` / `force_tick` / `ticker` into the same tx; the ticker's snapshot must be taken *after* `send` commits. > > ```sql > -- WRONG -- consumer sees 0 rows @@ -81,7 +81,7 @@ The `inserted` CTE runs to completion even though the main query does not refere > commit; > ``` > -> See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule) for the full rule. +> See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). ## Recurring jobs with pg_cron diff --git a/docs/reference.md b/docs/reference.md index b2faa060..f8b77f0f 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -115,7 +115,7 @@ The consume API wraps `pgque.next_batch`, `pgque.get_batch_events`, `pgque.finis All consume-side functions (`receive`, `ack`, `nack`, `subscribe`, `unsubscribe`) are granted to `pgque_reader`, mirroring upstream PgQ's producer/consumer role split. Apps that both produce and consume must hold both `pgque_reader` and `pgque_writer` — `pgque_writer` does not inherit `pgque_reader`. -**Snapshot rule.** PgQue is snapshot-based. `pgque.send` (commit) → `pgque.ticker` (snapshot taken) → `pgque.receive` (sees what committed before that snapshot) MUST run in distinct, committed transactions. The same applies to `pgque.maint_retry_events` → `pgque.ticker` → `pgque.receive`. Combining any chain inside one explicit `begin`/`commit` block silently produces empty batches and dropped messages. Default modes of every shipped client (`pgxpool`, `pg.Pool`, `psycopg(autocommit=True)`) satisfy this transparently; the footgun is reaching for the underlying pool/connection to wrap producer + consumer calls in one explicit transaction. See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). +**Snapshot rule.** `pgque.send` → `pgque.ticker` → `pgque.receive` must each run in its own committed transaction (the ticker's snapshot must be taken after `send` commits; `receive` only sees what committed before it). Same for `pgque.maint_retry_events` → `pgque.ticker` → `pgque.receive`. Default `pgxpool` / `pg.Pool` / `psycopg(autocommit=True)` satisfy this transparently; the footgun is reaching for the underlying pool/connection to wrap producer + consumer calls in one explicit transaction. See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). #### `pgque.receive(queue text, consumer text, max_return int default 100) → setof pgque.message` diff --git a/docs/tutorial.md b/docs/tutorial.md index 67a0c3d3..81f77104 100644 --- a/docs/tutorial.md +++ b/docs/tutorial.md @@ -124,7 +124,7 @@ For demos and tests, PgQue provides `pgque.force_next_tick` to bypass the tick t -- separate transactions (psql autocommit). Do not wrap in begin/commit: -- ticker() must see the prior send committed before it can include it in a batch. select pgque.force_next_tick('orders'); -select pgque.ticker(); +select pgque.ticker(); -- separate transaction ``` ``` @@ -139,7 +139,7 @@ select pgque.ticker(); `force_next_tick` returns the current tick id (the queue was seeded with tick `1` by `create_queue`). `ticker()` returns the number of queues it processed. -> **Each statement above runs in its own transaction.** That is intentional — PgQue is snapshot-based. `pgque.ticker` records the snapshot it sees; `pgque.receive` only returns events whose insert committed *before* that snapshot. If you wrap `send` + `force_tick` + `ticker` (or `maint_retry_events` + `ticker`) inside one explicit `begin`/`commit` block, the ticker's snapshot does not see the inserts and the next `receive` returns zero rows. See the [snapshot rule](pgq-concepts.md#snapshot-rule). +> Each statement above runs in its own transaction — required, not stylistic. PgQue is snapshot-based: the ticker captures a snapshot, and `receive` only returns events whose `send` committed before it. Wrapping `send` + `force_next_tick` + `ticker` + `receive` in one `begin`/`commit` returns zero rows. See the [snapshot rule](pgq-concepts.md#snapshot-rule). Now try receiving again: @@ -217,15 +217,15 @@ select pgque.set_queue_config('orders', 'max_retries', '2'); The parameter is `max_retries`, not `queue_max_retries` — `set_queue_config` prepends `queue_` for you. -Send another event, tick, and receive: +Send another event, tick, and receive (each select is a separate transaction — required, see [snapshot rule](pgq-concepts.md#snapshot-rule)): ```sql -- send / force_next_tick / ticker / receive are four separate transactions in psql -- autocommit. Do not wrap them in begin/commit — the snapshot rule still applies. select pgque.send('orders', '{"order_id": 43, "total": 10.00}'::jsonb); select pgque.force_next_tick('orders'); -select pgque.ticker(); -select * from pgque.receive('orders', 'processor', 100); +select pgque.ticker(); -- separate transaction +select * from pgque.receive('orders', 'processor', 100); -- separate transaction ``` ``` @@ -256,8 +256,8 @@ The event is now in PgQ's retry queue. Moving it back into the main event stream -- four separate transactions (psql autocommit). Do not wrap in begin/commit. select pgque.maint_retry_events(); select pgque.force_next_tick('orders'); -select pgque.ticker(); -select * from pgque.receive('orders', 'processor', 100); +select pgque.ticker(); -- separate transaction +select * from pgque.receive('orders', 'processor', 100); -- separate transaction ``` In production, `pgque.start()` schedules `maint_retry_events` on its own cadence — you never call it by hand. See [`pgque.maint()`](reference.md#pgquemaint--integer) and the surrounding Lifecycle entries in the reference. @@ -291,7 +291,7 @@ end $$; -- three more, in psql autocommit. Do not wrap them in begin/commit. select pgque.maint_retry_events(); select pgque.force_next_tick('orders'); -select pgque.ticker(); +select pgque.ticker(); -- separate transaction ``` The second iteration sees `retry_count = 2` and routes to the DLQ instead of the retry queue. After it runs, `receive` returns nothing — the event has moved to `pgque.dead_letter`. From e87630bc3b607bffba15c6a80f343c705229256d Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 5 May 2026 06:43:50 +0000 Subject: [PATCH 3/3] docs: clarify Python's non-autocommit default in snapshot rule Per REV review on PR #190. The shared concepts/reference docs implied the Python client satisfies the snapshot rule transparently like Go/TS pool methods. That is wrong for the bare client: pgque.connect(dsn) is autocommit=False by default, so producers must commit explicitly between send and consumer side. The high-level Python Consumer already handles this internally; the bare PgqueClient leaves transaction management to the caller. Brings pgq-concepts.md and reference.md in line with the per-driver README and the actual psycopg default. --- docs/examples.md | 4 ++-- docs/pgq-concepts.md | 18 +++++++++++------- docs/reference.md | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/docs/examples.md b/docs/examples.md index 504f0357..01ea8b63 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -69,13 +69,13 @@ commit; The `inserted` CTE runs to completion even though the main query does not reference it (data-modifying CTEs always execute). Every row in `msgs` shares the same `batch_id`, so the scalar subquery picks any one of them and `pgque.ack` runs exactly once. **Batch-ownership caveat:** `pgque.ack(batch_id)` advances the consumer past the entire underlying batch, even if `receive()` returned fewer rows than the batch contains (due to `max_return`). Either consume the full batch before acking, or use `max_return >= ticker_max_count` (default 500) to ensure all rows are returned. -> **Anti-pattern: send + receive in one transaction.** Above merges `receive` + writes + `ack` into one tx — correct. Do **not** also merge `send` / `force_tick` / `ticker` into the same tx; the ticker's snapshot must be taken *after* `send` commits. +> **Anti-pattern: send + receive in one transaction.** Above merges `receive` + writes + `ack` into one tx — correct. Do **not** also merge `send` / `force_next_tick` / `ticker` into the same tx; the ticker's snapshot must be taken *after* `send` commits. > > ```sql > -- WRONG -- consumer sees 0 rows > begin; > select pgque.send('orders', 'order.created', '{"id": 1}'::jsonb); -> select pgque.force_tick('orders'); +> select pgque.force_next_tick('orders'); > select pgque.ticker(); > select * from pgque.receive('orders', 'processor', 100); -- 0 rows > commit; diff --git a/docs/pgq-concepts.md b/docs/pgq-concepts.md index 27204851..863b87e1 100644 --- a/docs/pgq-concepts.md +++ b/docs/pgq-concepts.md @@ -97,13 +97,17 @@ when you want exactly-once effects on the same database (see the The asymmetry: producer-to-consumer flow needs commit boundaries between steps; consume-to-side-effect flow needs them merged. -The default modes of every shipped client (`pgxpool` / `pg.Pool` / -`psycopg(autocommit=True)`) run each call in its own implicit -transaction, so the rule is satisfied transparently. The footgun is -reaching for the underlying pool/connection (`Client.Pool()`, -`client.rawPool`, `client.conn`) to wrap `send` and `receive` in one -explicit transaction — the consumer side will not see what the producer -just sent. +For the shipped clients: Go (`pgxpool`) and TypeScript (`pg.Pool`) run +each call in its own implicit transaction, so the rule is satisfied +transparently. The Python client requires care — `pgque.connect(dsn)` +is **not** autocommit by default, so producers must commit explicitly +between `send` and the consumer side; the high-level Python `Consumer` +already handles this internally (autocommit + an explicit +`conn.transaction()` around `receive + dispatch + ack`). The footgun +in every driver is reaching for the underlying pool/connection +(`Client.Pool()`, `client.rawPool`, `client.conn`) to wrap `send` and +`receive` in one explicit transaction — the consumer side will not see +what the producer just sent. ## Three latencies diff --git a/docs/reference.md b/docs/reference.md index f8b77f0f..d572f35a 100644 --- a/docs/reference.md +++ b/docs/reference.md @@ -115,7 +115,7 @@ The consume API wraps `pgque.next_batch`, `pgque.get_batch_events`, `pgque.finis All consume-side functions (`receive`, `ack`, `nack`, `subscribe`, `unsubscribe`) are granted to `pgque_reader`, mirroring upstream PgQ's producer/consumer role split. Apps that both produce and consume must hold both `pgque_reader` and `pgque_writer` — `pgque_writer` does not inherit `pgque_reader`. -**Snapshot rule.** `pgque.send` → `pgque.ticker` → `pgque.receive` must each run in its own committed transaction (the ticker's snapshot must be taken after `send` commits; `receive` only sees what committed before it). Same for `pgque.maint_retry_events` → `pgque.ticker` → `pgque.receive`. Default `pgxpool` / `pg.Pool` / `psycopg(autocommit=True)` satisfy this transparently; the footgun is reaching for the underlying pool/connection to wrap producer + consumer calls in one explicit transaction. See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). +**Snapshot rule.** `pgque.send` → `pgque.ticker` → `pgque.receive` must each run in its own committed transaction (the ticker's snapshot must be taken after `send` commits; `receive` only sees what committed before it). Same for `pgque.maint_retry_events` → `pgque.ticker` → `pgque.receive`. Go (`pgxpool`) and TypeScript (`pg.Pool`) satisfy this transparently; Python `pgque.connect()` is non-autocommit by default and needs explicit commit boundaries (the high-level Python `Consumer` handles this internally). The footgun in every driver is reaching for the underlying pool/connection (`Client.Pool()`, `client.rawPool`, `client.conn`) to wrap producer + consumer calls in one explicit transaction. See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule). #### `pgque.receive(queue text, consumer text, max_return int default 100) → setof pgque.message`