From 2a7de795579486f80c905248b67fba3836fe5065 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 19:56:10 +0000 Subject: [PATCH 1/2] test(clients/typescript): add LISTEN/NOTIFY wakeup tests (red) https://claude.ai/code/session_015Tf54iAd24uQPKCBaXeiTV --- clients/typescript/test/consumer.test.ts | 102 +++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/clients/typescript/test/consumer.test.ts b/clients/typescript/test/consumer.test.ts index a380eef8..27b25451 100644 --- a/clients/typescript/test/consumer.test.ts +++ b/clients/typescript/test/consumer.test.ts @@ -2,6 +2,7 @@ // Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { EventEmitter } from 'node:events'; import { Consumer } from '../src/consumer.js'; import type { Client } from '../src/client.js'; import type { Message } from '../src/types.js'; @@ -108,6 +109,38 @@ describe('Consumer (env-gated)', () => { expect(elapsed).toBeLessThan(2000); }); + skipIfNoDb('LISTEN/NOTIFY wakes consumer before pollInterval elapses', async () => { + // Use a very long pollInterval so only a NOTIFY can wake the consumer in time. + const consumer = env.client.newConsumer(env.queue, env.consumer, { + pollInterval: 60_000, + }); + const seen: Message[] = []; + consumer.handle('notify.test', async (msg) => { + seen.push(msg); + }); + + const ac = new AbortController(); + const startPromise = consumer.start(ac.signal); + + // Give the consumer a moment to set up its LISTEN connection. + await sleep(200); + + // Send a message and advance the queue so the ticker emits a NOTIFY. + await env.client.send(env.queue, { type: 'notify.test', payload: { x: 1 } }); + await advanceQueue(env.client, env.queue); + + // Assert delivery within 2 s — clearly via NOTIFY, not the 60 s poll. + const deadline = Date.now() + 2000; + while (Date.now() < deadline && seen.length === 0) { + await sleep(50); + } + + ac.abort(); + await startPromise; + + expect(seen).toHaveLength(1); + }, 10_000); + skipIfNoDb('unhandled message types are nacked, not silently consumed', async () => { await env.client.send(env.queue, { type: 'unknown', payload: { v: 1 } }); await advanceQueue(env.client, env.queue); @@ -214,6 +247,75 @@ describe('Consumer (in-memory mocks)', () => { expect(fakeClient.receive.mock.calls[0]).toEqual(['q', 'c', 2_147_483_647]); }); + it('NOTIFY wakes the poll loop before the long sleep elapses (stub pg.Client)', async () => { + // Build a fake pg.Client stub that emits 'notification' events. + const notifyEmitter = new EventEmitter(); + let listenCalled = false; + let unlistenCalled = false; + + const fakePgClient = { + connect: vi.fn(async () => undefined), + end: vi.fn(async () => undefined), + query: vi.fn(async (sql: string) => { + if (/^\s*LISTEN/i.test(sql)) listenCalled = true; + if (/^\s*UNLISTEN/i.test(sql)) unlistenCalled = true; + return { rows: [] }; + }), + on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + notifyEmitter.on(event, handler); + }), + removeListener: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + notifyEmitter.removeListener(event, handler); + }), + }; + + // Consumer with a long pollInterval — only NOTIFY should trigger a cycle. + const fakeClient = { + receive: vi.fn(async () => []), + ack: vi.fn(async () => undefined), + nack: vi.fn(async () => undefined), + }; + + const consumer = new Consumer( + fakeClient as unknown as Client, + 'orders', + 'worker', + { + pollInterval: 60_000, + logger: { warn: () => undefined, error: () => undefined }, + _listenClientFactory: async () => fakePgClient as unknown as import('pg').Client, + }, + ); + + const ac = new AbortController(); + const startPromise = consumer.start(ac.signal); + + // Wait for LISTEN to be registered. + const listenDeadline = Date.now() + 1000; + while (!listenCalled && Date.now() < listenDeadline) { + await sleep(10); + } + expect(listenCalled).toBe(true); + + // Record how many receive() calls happened before we fire the notification. + const callsBefore = fakeClient.receive.mock.calls.length; + + // Fire a simulated NOTIFY from the server. + notifyEmitter.emit('notification', { channel: 'pgque_orders', payload: '42' }); + + // The consumer should wake up and call receive() again within 500 ms. + const wakeDeadline = Date.now() + 500; + while (fakeClient.receive.mock.calls.length <= callsBefore && Date.now() < wakeDeadline) { + await sleep(10); + } + + ac.abort(); + await startPromise; + + expect(fakeClient.receive.mock.calls.length).toBeGreaterThan(callsBefore); + expect(unlistenCalled).toBe(true); + }); + it('passes configured maxMessages to receive', async () => { const fakeClient = { receive: vi.fn(async () => []), From 774e88bbfc563d992a2ccde7d8637ab70c138e8d Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 3 May 2026 20:02:46 +0000 Subject: [PATCH 2/2] fix(clients/typescript): LISTEN/NOTIFY consumer (#147) https://claude.ai/code/session_015Tf54iAd24uQPKCBaXeiTV --- clients/typescript/README.md | 50 +++++++++ clients/typescript/src/client.ts | 24 ++++- clients/typescript/src/consumer.ts | 131 ++++++++++++++++++++++- clients/typescript/src/types.ts | 5 + clients/typescript/test/consumer.test.ts | 3 + 5 files changed, 207 insertions(+), 6 deletions(-) diff --git a/clients/typescript/README.md b/clients/typescript/README.md index 1e1a55ca..69b2221c 100644 --- a/clients/typescript/README.md +++ b/clients/typescript/README.md @@ -82,6 +82,56 @@ try { `sendBatch()`, `ticker(queue)`, and `forceTick(queue)` are JS `bigint` to match PostgreSQL `bigint` losslessly. +### Consumer options + +`client.newConsumer(queue, name, opts?)` accepts: + +| Option | Default | Notes | +|---|---|---| +| `pollInterval` | `30000` (ms) | Maximum wait between poll cycles. With LISTEN/NOTIFY enabled (the default when using `client.newConsumer`), the consumer wakes immediately on a tick notification and only falls back to this interval as a safety net for missed notifications. | +| `maxMessages` | `2147483647` | Max messages returned per `pgque.receive` call. The default is PostgreSQL's `int` maximum, so the high-level consumer requests the whole PgQ batch before acknowledging it. `pgque.ack(batch_id)` finishes the whole underlying batch, including rows beyond `maxMessages`; only lower this value when it is at least as large as the queue's possible batch size for your workload. | +| `unknownHandlerPolicy` | `'nack'` | What to do when a message arrives whose `type` has no registered handler. `'nack'` (default) routes to retry / DLQ via `pgque.nack`. `'ack'` logs a warning and lets the batch ack absorb it (silent discard). | +| `logger` | `console` | Receives `warn` / `error` lines. | + +### LISTEN/NOTIFY wakeup + +The consumer opens a dedicated `pg.Client` connection and issues +`LISTEN pgque_`. When the PgQue ticker emits a `pg_notify`, the +consumer wakes immediately — typically within single-digit milliseconds of +the tick — instead of waiting up to `pollInterval`. Polling remains active +as a safety net for missed notifications and network drops; `pollInterval` +becomes the maximum latency rather than the typical one. + +If the dedicated LISTEN connection drops (network glitch, idle connection +timeout, etc.), the consumer reconnects automatically with a short backoff +(1 s → 2 s → 5 s, capped at `pollInterval`). During reconnection the poll +fallback keeps messages flowing. Call `consumer.start(signal)` and abort +the signal to stop cleanly; the LISTEN connection is unlistened and closed +before `start()` resolves. + +### Payload coercion: `undefined` → JSON `null` + +`client.send()` JSON-encodes `event.payload` before binding it as +`jsonb`. Because `JSON.stringify(undefined)` returns the JS literal +`undefined` (not the string `"null"`), the driver substitutes the JSON +literal `null` whenever the top-level `payload` is `undefined`: + +```ts +// All three store the JSON value `null` in the queue: +await client.send('q', { type: 't', payload: null }); +await client.send('q', { type: 't', payload: undefined }); +await client.send('q', { type: 't' }); +``` + +Inside an object, properties whose value is `undefined` are dropped by +`JSON.stringify` per the JSON spec. This is the standard JS behavior; +the driver does not try to override it: + +```ts +await client.send('q', { type: 't', payload: { a: 1, b: undefined } }); +// Stored as: {"a":1} +``` + ## Errors All errors derive from `PgqueError`: diff --git a/clients/typescript/src/client.ts b/clients/typescript/src/client.ts index 4495e83e..36b8b86e 100644 --- a/clients/typescript/src/client.ts +++ b/clients/typescript/src/client.ts @@ -11,7 +11,7 @@ import { } from './errors.js'; import type { ConsumerOptions, Event, Message, NackOptions } from './types.js'; -const { Pool, types } = pg; +const { Pool, Client: Client_, types } = pg; // PostgreSQL `bigint` (OID 20) is parsed by `pg` as string by default to // avoid silent precision loss above Number.MAX_SAFE_INTEGER. We promote to @@ -69,7 +69,10 @@ interface RawMessageRow { */ export class Client { /** @internal — use {@link connect} instead. */ - constructor(private readonly pool: pg.Pool) {} + constructor( + private readonly pool: pg.Pool, + private readonly dsn?: string, + ) {} /** Release the connection pool. After this, the client must not be used. */ async close(): Promise { @@ -277,7 +280,20 @@ export class Client { * must already be subscribed (e.g. via {@link subscribe}). */ newConsumer(queue: string, name: string, opts: ConsumerOptions = {}): Consumer { - return new Consumer(this, queue, name, opts); + // Inject a factory for the dedicated LISTEN pg.Client when we have a DSN + // and the caller hasn't already provided a test-stub factory. + const dsn = this.dsn; + const optsWithFactory: ConsumerOptions = + dsn !== undefined && opts._listenClientFactory === undefined + ? { + ...opts, + _listenClientFactory: async () => { + const c = new Client_({ connectionString: dsn }); + return c; + }, + } + : opts; + return new Consumer(this, queue, name, optsWithFactory); } /** @@ -398,7 +414,7 @@ export async function connect( throw new PgqueConnectionError(`pgque: connect: ${(err as Error).message}`, { cause: err }); } probe.release(); - return new Client(pool); + return new Client(pool, dsn); } function rowToMessage(row: RawMessageRow): Message { diff --git a/clients/typescript/src/consumer.ts b/clients/typescript/src/consumer.ts index dcbeb946..d7785c0e 100644 --- a/clients/typescript/src/consumer.ts +++ b/clients/typescript/src/consumer.ts @@ -6,11 +6,21 @@ import type { ConsumerOptions, HandlerFunc, Message } from './types.js'; const DEFAULT_MAX_MESSAGES = 2_147_483_647; // PostgreSQL int4 max; request the whole batch by default. +// Reconnect backoff steps (ms) for the LISTEN connection on disconnect. +// Capped at the consumer's pollInterval. +const LISTEN_BACKOFF_MS = [1_000, 2_000, 5_000]; + /** * High-level consumer that polls `pgque.receive`, dispatches each message * to a per-event-type handler, and finalizes the batch with `ack` (or * per-message `nack` on handler failure / unknown event type). * + * **LISTEN/NOTIFY wakeup:** the consumer opens a dedicated `pg.Client` + * connection and issues `LISTEN pgque_`. When the PgQue ticker + * emits a `pg_notify`, the consumer wakes immediately instead of waiting + * for the next poll interval. Polling is retained as a safety net for + * missed notifications and network drops. + * * Usage: * ```ts * const consumer = client.newConsumer('orders', 'order_worker'); @@ -26,6 +36,7 @@ export class Consumer { private readonly maxMessages: number; private readonly unknownHandlerPolicy: 'ack' | 'nack'; private readonly logger: Pick; + private readonly listenClientFactory: ConsumerOptions['_listenClientFactory']; /** @internal — use {@link Client.newConsumer}. */ constructor( @@ -38,6 +49,7 @@ export class Consumer { this.maxMessages = opts.maxMessages ?? DEFAULT_MAX_MESSAGES; this.unknownHandlerPolicy = opts.unknownHandlerPolicy ?? 'nack'; this.logger = opts.logger ?? console; + this.listenClientFactory = opts._listenClientFactory; } /** Register a handler for `eventType`. Replaces any previous handler. */ @@ -55,20 +67,122 @@ export class Consumer { * `client.receive()` call. If a `receive()` round-trip is in progress * when the signal fires, the loop will drain that call to completion * before exiting. + * + * **LISTEN/NOTIFY:** a dedicated `pg.Client` opens `LISTEN pgque_` + * so the loop wakes as soon as the ticker fires rather than waiting for the + * full `pollInterval`. Polling remains active as a fallback safety net. */ async start(signal?: AbortSignal): Promise { + // notifyResolve is the resolve function for the current "wait for notify" + // promise. Calling it wakes the poll loop early. + let notifyResolve: (() => void) | null = null; + + const makeNotifyPromise = (): Promise => + new Promise((resolve) => { + notifyResolve = resolve; + }); + + let currentNotifyPromise = makeNotifyPromise(); + + const onNotification = (): void => { + const resolve = notifyResolve; + if (resolve) { + notifyResolve = null; + resolve(); + // Pre-arm next cycle so the handler is always ready. + currentNotifyPromise = makeNotifyPromise(); + } + }; + + // Spin up LISTEN connection in the background; reconnect on drops. + const channel = `pgque_${this.queue}`; + + const connectListen = async (): Promise => { + if (!this.listenClientFactory) { + // No factory injected — skip LISTEN; poll-only fallback. + return; + } + let backoffIdx = 0; + while (!signal?.aborted) { + try { + const client = await this.listenClientFactory(); + + client.on('notification', onNotification); + client.on('error', (_err: Error) => { + // Suppress the unhandled rejection that Node.js emits for + // EventEmitter 'error' events; the disconnect is handled below. + }); + + await client.connect(); + await client.query(`LISTEN ${quoteIdentifier(channel)}`); + backoffIdx = 0; // reset on successful connect + + // Block until aborted or the connection emits 'end'. + if (!signal?.aborted) { + await new Promise((resolve) => { + const onEnd = (): void => resolve(); + const onAbort = (): void => { + client.removeListener('end', onEnd); + resolve(); + }; + client.once('end', onEnd); + signal?.addEventListener('abort', onAbort, { once: true }); + }); + } + + // Clean up: UNLISTEN + end. + client.removeListener('notification', onNotification); + try { + await client.query(`UNLISTEN ${quoteIdentifier(channel)}`); + } catch { + // ignore: connection may already be broken + } + try { + await client.end(); + } catch { + // ignore + } + + if (signal?.aborted) return; + + // Unexpected disconnect — fall through to reconnect logic below. + this.logger.warn(`pgque: LISTEN connection dropped for ${channel}, reconnecting`); + } catch (err) { + this.logger.error( + `pgque: LISTEN connect error for ${channel}: ${formatErr(err)}, reconnecting`, + ); + } + + if (signal?.aborted) return; + + // Exponential backoff capped at pollInterval. + const delay = Math.min( + LISTEN_BACKOFF_MS[backoffIdx] ?? LISTEN_BACKOFF_MS[LISTEN_BACKOFF_MS.length - 1]!, + this.pollIntervalMs, + ); + backoffIdx = Math.min(backoffIdx + 1, LISTEN_BACKOFF_MS.length - 1); + await sleep(delay, signal); + } + }; + + // Launch the LISTEN loop; don't await — it runs alongside the poll loop. + const listenDone = connectListen(); + + // Poll loop. while (!signal?.aborted) { let msgs: Message[]; try { msgs = await this.client.receive(this.queue, this.name, this.maxMessages); } catch (err) { this.logger.error(`pgque: receive error: ${formatErr(err)}`); - await sleep(this.pollIntervalMs, signal); + await Promise.race([sleep(this.pollIntervalMs, signal), currentNotifyPromise]); + currentNotifyPromise = makeNotifyPromise(); continue; } if (msgs.length === 0) { - await sleep(this.pollIntervalMs, signal); + await Promise.race([sleep(this.pollIntervalMs, signal), currentNotifyPromise]); + currentNotifyPromise = makeNotifyPromise(); continue; } @@ -120,6 +234,11 @@ export class Consumer { } } } + + // Signal aborted: wake the notify promise so the listen loop exits cleanly. + onNotification(); + // Ensure the LISTEN loop also exits (it checks signal?.aborted). + await listenDone; } /** Returns true if the nack succeeded, false if it threw (and was logged). */ @@ -156,3 +275,11 @@ function sleep(ms: number, signal?: AbortSignal): Promise { signal?.addEventListener('abort', onAbort, { once: true }); }); } + +/** + * Quote a PostgreSQL identifier (channel name) for safe use in LISTEN/UNLISTEN. + * Doubles any double-quote characters and wraps in double quotes. + */ +function quoteIdentifier(name: string): string { + return `"${name.replace(/"/g, '""')}"`; +} diff --git a/clients/typescript/src/types.ts b/clients/typescript/src/types.ts index 0c27aa43..274a44a3 100644 --- a/clients/typescript/src/types.ts +++ b/clients/typescript/src/types.ts @@ -67,6 +67,11 @@ export interface ConsumerOptions { unknownHandlerPolicy?: 'ack' | 'nack'; /** Optional logger. Defaults to `console`. */ logger?: Pick; + /** + * @internal — used in tests to inject a stub `pg.Client` for the + * dedicated LISTEN connection instead of opening a real one. + */ + _listenClientFactory?: () => Promise; } /** diff --git a/clients/typescript/test/consumer.test.ts b/clients/typescript/test/consumer.test.ts index 27b25451..28fbd158 100644 --- a/clients/typescript/test/consumer.test.ts +++ b/clients/typescript/test/consumer.test.ts @@ -264,6 +264,9 @@ describe('Consumer (in-memory mocks)', () => { on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { notifyEmitter.on(event, handler); }), + once: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + notifyEmitter.once(event, handler); + }), removeListener: vi.fn((event: string, handler: (...args: unknown[]) => void) => { notifyEmitter.removeListener(event, handler); }),