Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions clients/typescript/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,56 @@ try {
`sendBatch()`, `ticker(queue)`, and `forceTick(queue)` are JS `bigint` to
match PostgreSQL `bigint` losslessly.

### Consumer options

`client.newConsumer(queue, name, opts?)` accepts:

| Option | Default | Notes |
|---|---|---|
| `pollInterval` | `30000` (ms) | Maximum wait between poll cycles. With LISTEN/NOTIFY enabled (the default when using `client.newConsumer`), the consumer wakes immediately on a tick notification and only falls back to this interval as a safety net for missed notifications. |
| `maxMessages` | `2147483647` | Max messages returned per `pgque.receive` call. The default is PostgreSQL's `int` maximum, so the high-level consumer requests the whole PgQ batch before acknowledging it. `pgque.ack(batch_id)` finishes the whole underlying batch, including rows beyond `maxMessages`; only lower this value when it is at least as large as the queue's possible batch size for your workload. |
| `unknownHandlerPolicy` | `'nack'` | What to do when a message arrives whose `type` has no registered handler. `'nack'` (default) routes to retry / DLQ via `pgque.nack`. `'ack'` logs a warning and lets the batch ack absorb it (silent discard). |
| `logger` | `console` | Receives `warn` / `error` lines. |

### LISTEN/NOTIFY wakeup

The consumer opens a dedicated `pg.Client` connection and issues
`LISTEN pgque_<queue>`. When the PgQue ticker emits a `pg_notify`, the
consumer wakes immediately — typically within single-digit milliseconds of
the tick — instead of waiting up to `pollInterval`. Polling remains active
as a safety net for missed notifications and network drops; `pollInterval`
becomes the maximum latency rather than the typical one.

If the dedicated LISTEN connection drops (network glitch, idle connection
timeout, etc.), the consumer reconnects automatically with a short backoff
(1 s → 2 s → 5 s, capped at `pollInterval`). During reconnection the poll
fallback keeps messages flowing. Call `consumer.start(signal)` and abort
the signal to stop cleanly; the LISTEN connection is unlistened and closed
before `start()` resolves.

### Payload coercion: `undefined` → JSON `null`

`client.send()` JSON-encodes `event.payload` before binding it as
`jsonb`. Because `JSON.stringify(undefined)` returns the JS literal
`undefined` (not the string `"null"`), the driver substitutes the JSON
literal `null` whenever the top-level `payload` is `undefined`:

```ts
// All three store the JSON value `null` in the queue:
await client.send('q', { type: 't', payload: null });
await client.send('q', { type: 't', payload: undefined });
await client.send('q', { type: 't' });
```

Inside an object, properties whose value is `undefined` are dropped by
`JSON.stringify` per the JSON spec. This is the standard JS behavior;
the driver does not try to override it:

```ts
await client.send('q', { type: 't', payload: { a: 1, b: undefined } });
// Stored as: {"a":1}
```

## Errors

All errors derive from `PgqueError`:
Expand Down
24 changes: 20 additions & 4 deletions clients/typescript/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from './errors.js';
import type { ConsumerOptions, Event, Message, NackOptions } from './types.js';

const { Pool, types } = pg;
const { Pool, Client: Client_, types } = pg;

// PostgreSQL `bigint` (OID 20) is parsed by `pg` as string by default to
// avoid silent precision loss above Number.MAX_SAFE_INTEGER. We promote to
Expand Down Expand Up @@ -69,7 +69,10 @@ interface RawMessageRow {
*/
export class Client {
/** @internal — use {@link connect} instead. */
constructor(private readonly pool: pg.Pool) {}
constructor(
private readonly pool: pg.Pool,
private readonly dsn?: string,
) {}

/** Release the connection pool. After this, the client must not be used. */
async close(): Promise<void> {
Expand Down Expand Up @@ -277,7 +280,20 @@ export class Client {
* must already be subscribed (e.g. via {@link subscribe}).
*/
newConsumer(queue: string, name: string, opts: ConsumerOptions = {}): Consumer {
return new Consumer(this, queue, name, opts);
// Inject a factory for the dedicated LISTEN pg.Client when we have a DSN
// and the caller hasn't already provided a test-stub factory.
const dsn = this.dsn;
const optsWithFactory: ConsumerOptions =
dsn !== undefined && opts._listenClientFactory === undefined
? {
...opts,
_listenClientFactory: async () => {
const c = new Client_({ connectionString: dsn });
return c;
},
}
: opts;
return new Consumer(this, queue, name, optsWithFactory);
}

/**
Expand Down Expand Up @@ -398,7 +414,7 @@ export async function connect(
throw new PgqueConnectionError(`pgque: connect: ${(err as Error).message}`, { cause: err });
}
probe.release();
return new Client(pool);
return new Client(pool, dsn);
}

function rowToMessage(row: RawMessageRow): Message {
Expand Down
131 changes: 129 additions & 2 deletions clients/typescript/src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,21 @@ import type { ConsumerOptions, HandlerFunc, Message } from './types.js';

const DEFAULT_MAX_MESSAGES = 2_147_483_647; // PostgreSQL int4 max; request the whole batch by default.

// Reconnect backoff steps (ms) for the LISTEN connection on disconnect.
// Capped at the consumer's pollInterval.
const LISTEN_BACKOFF_MS = [1_000, 2_000, 5_000];

/**
* High-level consumer that polls `pgque.receive`, dispatches each message
* to a per-event-type handler, and finalizes the batch with `ack` (or
* per-message `nack` on handler failure / unknown event type).
*
* **LISTEN/NOTIFY wakeup:** the consumer opens a dedicated `pg.Client`
* connection and issues `LISTEN pgque_<queue>`. When the PgQue ticker
* emits a `pg_notify`, the consumer wakes immediately instead of waiting
* for the next poll interval. Polling is retained as a safety net for
* missed notifications and network drops.
*
* Usage:
* ```ts
* const consumer = client.newConsumer('orders', 'order_worker');
Expand All @@ -26,6 +36,7 @@ export class Consumer {
private readonly maxMessages: number;
private readonly unknownHandlerPolicy: 'ack' | 'nack';
private readonly logger: Pick<Console, 'warn' | 'error'>;
private readonly listenClientFactory: ConsumerOptions['_listenClientFactory'];

/** @internal — use {@link Client.newConsumer}. */
constructor(
Expand All @@ -38,6 +49,7 @@ export class Consumer {
this.maxMessages = opts.maxMessages ?? DEFAULT_MAX_MESSAGES;
this.unknownHandlerPolicy = opts.unknownHandlerPolicy ?? 'nack';
this.logger = opts.logger ?? console;
this.listenClientFactory = opts._listenClientFactory;
}

/** Register a handler for `eventType`. Replaces any previous handler. */
Expand All @@ -55,20 +67,122 @@ export class Consumer {
* `client.receive()` call. If a `receive()` round-trip is in progress
* when the signal fires, the loop will drain that call to completion
* before exiting.
*
* **LISTEN/NOTIFY:** a dedicated `pg.Client` opens `LISTEN pgque_<queue>`
* so the loop wakes as soon as the ticker fires rather than waiting for the
* full `pollInterval`. Polling remains active as a fallback safety net.
*/
async start(signal?: AbortSignal): Promise<void> {
// notifyResolve is the resolve function for the current "wait for notify"
// promise. Calling it wakes the poll loop early.
let notifyResolve: (() => void) | null = null;

const makeNotifyPromise = (): Promise<void> =>
new Promise<void>((resolve) => {
notifyResolve = resolve;
});

let currentNotifyPromise = makeNotifyPromise();

const onNotification = (): void => {
const resolve = notifyResolve;
if (resolve) {
notifyResolve = null;
resolve();
// Pre-arm next cycle so the handler is always ready.
currentNotifyPromise = makeNotifyPromise();
}
};

// Spin up LISTEN connection in the background; reconnect on drops.
const channel = `pgque_${this.queue}`;

const connectListen = async (): Promise<void> => {
if (!this.listenClientFactory) {
// No factory injected — skip LISTEN; poll-only fallback.
return;
}
let backoffIdx = 0;
while (!signal?.aborted) {
try {
const client = await this.listenClientFactory();

client.on('notification', onNotification);
client.on('error', (_err: Error) => {
// Suppress the unhandled rejection that Node.js emits for
// EventEmitter 'error' events; the disconnect is handled below.
});

await client.connect();
await client.query(`LISTEN ${quoteIdentifier(channel)}`);
backoffIdx = 0; // reset on successful connect

// Block until aborted or the connection emits 'end'.
if (!signal?.aborted) {
await new Promise<void>((resolve) => {
const onEnd = (): void => resolve();
const onAbort = (): void => {
client.removeListener('end', onEnd);
resolve();
};
client.once('end', onEnd);
signal?.addEventListener('abort', onAbort, { once: true });
});
}

// Clean up: UNLISTEN + end.
client.removeListener('notification', onNotification);
try {
await client.query(`UNLISTEN ${quoteIdentifier(channel)}`);
} catch {
// ignore: connection may already be broken
}
try {
await client.end();
} catch {
// ignore
}

if (signal?.aborted) return;

// Unexpected disconnect — fall through to reconnect logic below.
this.logger.warn(`pgque: LISTEN connection dropped for ${channel}, reconnecting`);
} catch (err) {
this.logger.error(
`pgque: LISTEN connect error for ${channel}: ${formatErr(err)}, reconnecting`,
);
}

if (signal?.aborted) return;

// Exponential backoff capped at pollInterval.
const delay = Math.min(
LISTEN_BACKOFF_MS[backoffIdx] ?? LISTEN_BACKOFF_MS[LISTEN_BACKOFF_MS.length - 1]!,
this.pollIntervalMs,
);
backoffIdx = Math.min(backoffIdx + 1, LISTEN_BACKOFF_MS.length - 1);
await sleep(delay, signal);
}
};

// Launch the LISTEN loop; don't await — it runs alongside the poll loop.
const listenDone = connectListen();

// Poll loop.
while (!signal?.aborted) {
let msgs: Message[];
try {
msgs = await this.client.receive(this.queue, this.name, this.maxMessages);
} catch (err) {
this.logger.error(`pgque: receive error: ${formatErr(err)}`);
await sleep(this.pollIntervalMs, signal);
await Promise.race([sleep(this.pollIntervalMs, signal), currentNotifyPromise]);
currentNotifyPromise = makeNotifyPromise();
continue;
}

if (msgs.length === 0) {
await sleep(this.pollIntervalMs, signal);
await Promise.race([sleep(this.pollIntervalMs, signal), currentNotifyPromise]);
currentNotifyPromise = makeNotifyPromise();
continue;
}

Expand Down Expand Up @@ -120,6 +234,11 @@ export class Consumer {
}
}
}

// Signal aborted: wake the notify promise so the listen loop exits cleanly.
onNotification();
// Ensure the LISTEN loop also exits (it checks signal?.aborted).
await listenDone;
}

/** Returns true if the nack succeeded, false if it threw (and was logged). */
Expand Down Expand Up @@ -156,3 +275,11 @@ function sleep(ms: number, signal?: AbortSignal): Promise<void> {
signal?.addEventListener('abort', onAbort, { once: true });
});
}

/**
* Quote a PostgreSQL identifier (channel name) for safe use in LISTEN/UNLISTEN.
* Doubles any double-quote characters and wraps in double quotes.
*/
function quoteIdentifier(name: string): string {
return `"${name.replace(/"/g, '""')}"`;
}
5 changes: 5 additions & 0 deletions clients/typescript/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ export interface ConsumerOptions {
unknownHandlerPolicy?: 'ack' | 'nack';
/** Optional logger. Defaults to `console`. */
logger?: Pick<Console, 'warn' | 'error'>;
/**
* @internal — used in tests to inject a stub `pg.Client` for the
* dedicated LISTEN connection instead of opening a real one.
*/
_listenClientFactory?: () => Promise<import('pg').Client>;
}

/**
Expand Down
Loading
Loading