Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions clients/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ TypeScript client (`PgqueQueueNotFoundError`, `PgqueConsumerNotFoundError`,
`PgqueSqlError`). Go uses the standard acronym-uppercase convention
(`SQLError` rather than `SqlError`).

## Transactions

`Send` → ticker → `Receive` must each run in its own committed transaction (PgQue is snapshot-based). `pgxpool` satisfies this transparently — every `Send`/`Receive`/`Ack` is its own implicit tx, and the `Consumer` is pool-level.

The footgun is `Client.Pool()`: calling `pgque.send` inside your own `pgx.Tx` is fine for transactional enqueue, but the consumer must run after `tx.Commit()`. Don't wrap `pgque.send` and `pgque.receive` in one shared `pgx.Tx`; same for `pgque.maint_retry_events` + `pgque.ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule).

## Tests

The integration tests require a running PostgreSQL with the PgQue schema
Expand Down
7 changes: 7 additions & 0 deletions clients/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ client.conn.commit()

`client.force_tick(queue)` remains as a deprecated compatibility alias.

## Transactions

`send` → ticker → `receive` must each run in its own committed transaction (PgQue is snapshot-based). `pgque.connect(dsn)` is non-autocommit by default — commit between produce and consumer. The `Consumer` is autocommit + explicit `conn.transaction()` around `receive + dispatch + ack`.

Don't wrap `send` and `receive` in one explicit tx; same for `maint_retry_events` + `ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule).


## Tests

Integration tests require a running PostgreSQL with the PgQue schema
Expand Down
6 changes: 6 additions & 0 deletions clients/typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ only on pgque's internal pool via a per-pool `CustomTypesConfig` — it
does **not** touch the process-global `pg-types` table. Other `pg.Pool`
or `pg.Client` instances in the same process are unaffected.

## Transactions

`send` → ticker → `receive` must each run in its own committed transaction (PgQue is snapshot-based). `pg.Pool#query` satisfies this transparently — every `send`/`receive`/`ack` is its own implicit tx, and the `Consumer` is pool-level.

The footgun is `client.rawPool`: for transactional enqueue, call `BEGIN` / `pgque.send` / `COMMIT` on a checked-out client. Don't mix `pgque.send` and `pgque.receive` in one shared tx; same for `pgque.maint_retry_events` + `pgque.ticker`. See [snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule).

## Tests

The repository standardizes on Bun for TypeScript client development and CI
Expand Down
20 changes: 17 additions & 3 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ select pgque.subscribe('orders', 'analytics_pipeline');
-- send / force_next_tick / ticker / receive are separate transactions in psql
-- autocommit. Do not wrap them in begin/commit — the snapshot rule applies.
select pgque.send('orders', 'order.created', '{"order_id": 1}'::jsonb);
select pgque.force_next_tick('orders');
select pgque.ticker();
select pgque.force_next_tick('orders'); -- separate transaction
select pgque.ticker(); -- separate transaction

select * from pgque.receive('orders', 'audit_logger', 100);
select * from pgque.receive('orders', 'audit_logger', 100); -- separate transaction
select * from pgque.receive('orders', 'notification_sender', 100);
```

Expand All @@ -69,6 +69,20 @@ commit;

The `inserted` CTE runs to completion even though the main query does not reference it (data-modifying CTEs always execute). Every row in `msgs` shares the same `batch_id`, so the scalar subquery picks any one of them and `pgque.ack` runs exactly once. **Batch-ownership caveat:** `pgque.ack(batch_id)` advances the consumer past the entire underlying batch, even if `receive()` returned fewer rows than the batch contains (due to `max_return`). Either consume the full batch before acking, or use `max_return >= ticker_max_count` (default 500) to ensure all rows are returned.

> **Anti-pattern: send + receive in one transaction.** Above merges `receive` + writes + `ack` into one tx — correct. Do **not** also merge `send` / `force_next_tick` / `ticker` into the same tx; the ticker's snapshot must be taken *after* `send` commits.
>
> ```sql
> -- WRONG -- consumer sees 0 rows
> begin;
> select pgque.send('orders', 'order.created', '{"id": 1}'::jsonb);
> select pgque.force_next_tick('orders');
> select pgque.ticker();
> select * from pgque.receive('orders', 'processor', 100); -- 0 rows
> commit;
> ```
>
> See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule).

## Recurring jobs with pg_cron

```sql
Expand Down
37 changes: 37 additions & 0 deletions docs/pgq-concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,43 @@ below; the function auto-prefixes `queue_` internally.

— Kreen & Pihlak, PgCon 2009

## Snapshot rule

PgQue is snapshot-based, not row-claiming. The ticker records the
PostgreSQL snapshot it sees; `pgque.receive` only returns events whose
insert committed **before** that snapshot. Consequence: the following
operation chains MUST run in distinct, committed transactions —
combining any chain in one explicit `begin`/`commit` block silently
produces empty batches and dropped messages.

- **Producer → consumer.** `pgque.send` (or `pgque.insert_event`) →
`pgque.ticker` (or `pgque.force_tick` + `pgque.ticker`) →
`pgque.receive` (or `pgque.next_batch`).
- **Retry pump.** `pgque.maint_retry_events` (re-inserts retry rows
into event tables with `pg_current_xact_id()`) → `pgque.ticker`
(must run in a later tx so the new `ev_txid`s are visible in its
snapshot) → `pgque.receive`.
- **Rotation.** `pgque.maint_rotate_tables_step1` →
`pgque.maint_rotate_tables_step2` (PgQ design requirement).

By contrast, `receive → process → ack` belongs in **one** transaction
when you want exactly-once effects on the same database (see the
[transactional pattern](examples.md#exactly-once-processing-transactional-pattern)).
The asymmetry: producer-to-consumer flow needs commit boundaries between
steps; consume-to-side-effect flow needs them merged.

For the shipped clients: Go (`pgxpool`) and TypeScript (`pg.Pool`) run
each call in its own implicit transaction, so the rule is satisfied
transparently. The Python client requires care — `pgque.connect(dsn)`
is **not** autocommit by default, so producers must commit explicitly
between `send` and the consumer side; the high-level Python `Consumer`
already handles this internally (autocommit + an explicit
`conn.transaction()` around `receive + dispatch + ack`). The footgun
in every driver is reaching for the underlying pool/connection
(`Client.Pool()`, `client.rawPool`, `client.conn`) to wrap `send` and
`receive` in one explicit transaction — the consumer side will not see
what the producer just sent.

## Three latencies

For the full explanation — producer latency, subscriber latency,
Expand Down
6 changes: 5 additions & 1 deletion docs/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ The consume API wraps `pgque.next_batch`, `pgque.get_batch_events`, `pgque.finis

All consume-side functions (`receive`, `ack`, `nack`, `subscribe`, `unsubscribe`) are granted to `pgque_reader`, mirroring upstream PgQ's producer/consumer role split. Apps that both produce and consume must hold both `pgque_reader` and `pgque_writer` — `pgque_writer` does not inherit `pgque_reader`.

<a id="snapshot-rule"></a>**Snapshot rule.** `pgque.send` → `pgque.ticker` → `pgque.receive` must each run in its own committed transaction (the ticker's snapshot must be taken after `send` commits; `receive` only sees what committed before it). Same for `pgque.maint_retry_events` → `pgque.ticker` → `pgque.receive`. Go (`pgxpool`) and TypeScript (`pg.Pool`) satisfy this transparently; Python `pgque.connect()` is non-autocommit by default and needs explicit commit boundaries (the high-level Python `Consumer` handles this internally). The footgun in every driver is reaching for the underlying pool/connection (`Client.Pool()`, `client.rawPool`, `client.conn`) to wrap producer + consumer calls in one explicit transaction. See [pgq-concepts.md#snapshot-rule](pgq-concepts.md#snapshot-rule).

#### `pgque.receive(queue text, consumer text, max_return int default 100) → setof pgque.message`

Pulls the next batch for `consumer` on `queue` and streams up to `max_return` messages. `max_return` must be >= 1; passing 0 or a negative value raises an error. Returns an empty set if no batch is available. Each row is a `pgque.message` composite (see [§Message type](#message-type)).
Expand Down Expand Up @@ -236,7 +238,7 @@ Grant: `pgque_admin`. Source: `sql/pgque-api/maint.sql`.

#### `pgque.maint_retry_events() → integer`

Moves due rows from `pgque.retry_queue` back into queue event tables so they appear in the next tick. Must be called periodically when using `nack()` with retry — `pgque.start()` schedules it as `pgque_retry_events` every 30 s. When driving the scheduler manually, call this alongside `pgque.maint()`:
Moves due rows from `pgque.retry_queue` back into queue event tables so they appear in the next tick. Must be called periodically when using `nack()` with retry — `pgque.start()` schedules it as `pgque_retry_events` every 30 s. When driving the scheduler manually, call this alongside `pgque.maint()`. The re-inserted rows carry `pg_current_xact_id()` as their `ev_txid`, so the subsequent `pgque.ticker` call must run in a **separate** transaction — see the [snapshot rule](#snapshot-rule).

```sql
select pgque.maint_retry_events(); -- every 30 seconds, for nack/retry redelivery
Expand Down Expand Up @@ -275,6 +277,8 @@ Grant: `pgque_admin`. Source: `sql/pgque-additions/tick_helpers.sql`.
Alias for `pgque.force_next_tick`. Retained for compatibility with upstream PgQ (the historical name); identical behavior. The name is misleading — the function does not insert a tick by itself, it only bumps the event sequence so the next `pgque.ticker()` call inserts one. Prefer `force_next_tick` in new code.
Grant: `pgque_admin`. Source: `sql/pgque.sql`.

> The `force_tick` → `ticker` → `receive` chain must run across separate transactions for the consumer to see the events you just sent. See the [snapshot rule](#snapshot-rule).

#### `pgque.uninstall() → void`

Calls `stop()` (if pg_cron is present) and then `drop schema pgque cascade`. Roles (`pgque_reader`, `pgque_writer`, `pgque_admin`) are not dropped and must be removed manually if desired. `execute` is revoked from both `pgque_admin` (explicit) and PUBLIC (via the schema-wide blanket revoke), so only the schema/install owner (typically a superuser) can run it.
Expand Down
16 changes: 9 additions & 7 deletions docs/tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ For demos and tests, PgQue provides `pgque.force_next_tick` to bypass the tick t
-- separate transactions (psql autocommit). Do not wrap in begin/commit:
-- ticker() must see the prior send committed before it can include it in a batch.
select pgque.force_next_tick('orders');
select pgque.ticker();
select pgque.ticker(); -- separate transaction
```

```
Expand All @@ -139,6 +139,8 @@ select pgque.ticker();

`force_next_tick` returns the current tick id (the queue was seeded with tick `1` by `create_queue`). `ticker()` returns the number of queues it processed.

> Each statement above runs in its own transaction — required, not stylistic. PgQue is snapshot-based: the ticker captures a snapshot, and `receive` only returns events whose `send` committed before it. Wrapping `send` + `force_next_tick` + `ticker` + `receive` in one `begin`/`commit` returns zero rows. See the [snapshot rule](pgq-concepts.md#snapshot-rule).

Now try receiving again:

```sql
Expand Down Expand Up @@ -215,15 +217,15 @@ select pgque.set_queue_config('orders', 'max_retries', '2');

The parameter is `max_retries`, not `queue_max_retries` — `set_queue_config` prepends `queue_` for you.

Send another event, tick, and receive:
Send another event, tick, and receive (each select is a separate transaction — required, see [snapshot rule](pgq-concepts.md#snapshot-rule)):

```sql
-- send / force_next_tick / ticker / receive are four separate transactions in psql
-- autocommit. Do not wrap them in begin/commit — the snapshot rule still applies.
select pgque.send('orders', '{"order_id": 43, "total": 10.00}'::jsonb);
select pgque.force_next_tick('orders');
select pgque.ticker();
select * from pgque.receive('orders', 'processor', 100);
select pgque.ticker(); -- separate transaction
select * from pgque.receive('orders', 'processor', 100); -- separate transaction
```

```
Expand Down Expand Up @@ -254,8 +256,8 @@ The event is now in PgQ's retry queue. Moving it back into the main event stream
-- four separate transactions (psql autocommit). Do not wrap in begin/commit.
select pgque.maint_retry_events();
select pgque.force_next_tick('orders');
select pgque.ticker();
select * from pgque.receive('orders', 'processor', 100);
select pgque.ticker(); -- separate transaction
select * from pgque.receive('orders', 'processor', 100); -- separate transaction
```

In production, `pgque.start()` schedules `maint_retry_events` on its own cadence — you never call it by hand. See [`pgque.maint()`](reference.md#pgquemaint--integer) and the surrounding Lifecycle entries in the reference.
Expand Down Expand Up @@ -289,7 +291,7 @@ end $$;
-- three more, in psql autocommit. Do not wrap them in begin/commit.
select pgque.maint_retry_events();
select pgque.force_next_tick('orders');
select pgque.ticker();
select pgque.ticker(); -- separate transaction
```

The second iteration sees `retry_count = 2` and routes to the DLQ instead of the retry queue. After it runs, `receive` returns nothing — the event has moved to `pgque.dead_letter`.
Expand Down
Loading