From 2a8b2cd3ad82661a5ad6d344d6a2177be1a40ef4 Mon Sep 17 00:00:00 2001 From: rcottinet Date: Tue, 10 Feb 2026 00:47:17 +0100 Subject: [PATCH 1/3] feat: add postgres transport with NOTIFY/LISTEN support --- README.md | 34 ++-- package.json | 9 +- src/transports/postgres.ts | 188 +++++++++++++++++++++ src/types/main.ts | 10 ++ tests/drivers/postgres_transport.spec.ts | 201 +++++++++++++++++++++++ tsconfig.json | 2 +- yarn.lock | 146 +++++++++++++++- 7 files changed, 576 insertions(+), 14 deletions(-) create mode 100644 src/transports/postgres.ts create mode 100644 tests/drivers/postgres_transport.spec.ts diff --git a/README.md b/README.md index 56c3554..abee083 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,8 @@ Currently, it supports the following transports:

👉 Memory: A simple in-memory transport for testing purposes.
👉 Redis: A Redis transport for production usage.
-👉 Mqtt: A Mqtt transport for production usage. +👉 Mqtt: A Mqtt transport for production usage.
+👉 Postgres: A PostgreSQL transport using NOTIFY/LISTEN for production usage.

## Table of Contents @@ -49,6 +50,7 @@ The module exposes a manager that can be used to register buses. import { BusManager } from '@boringnode/bus' import { redis } from '@boringnode/bus/transports/redis' import { mqtt } from '@boringnode/bus/transports/mqtt' +import { postgres } from '@boringnode/bus/transports/postgres' import { memory } from '@boringnode/bus/transports/memory' const manager = new BusManager({ @@ -69,7 +71,16 @@ const manager = new BusManager({ port: 1883, }), }, - } + postgres: { + transport: postgres({ + host: 'localhost', + port: 5432, + database: 'mydb', + user: 'postgres', + password: 'password', + }), + }, + }, }) ``` @@ -88,6 +99,7 @@ By default, the bus will use the `default` transport. You can specify different ```typescript manager.use('redis').publish('channel', 'Hello world') manager.use('mqtt').publish('channel', 'Hello world') +manager.use('postgres').publish('channel', 'Hello world') ``` ### Without the manager @@ -105,8 +117,8 @@ const transport = new RedisTransport({ const bus = new Bus(transport, { retryQueue: { - retryInterval: '100ms' - } + retryInterval: '100ms', + }, }) ``` @@ -126,10 +138,10 @@ const manager = new BusManager({ port: 6379, }), retryQueue: { - retryInterval: '100ms' - } + retryInterval: '100ms', + }, }, - } + }, }) manager.use('redis').publish('channel', 'Hello World') @@ -143,13 +155,13 @@ You have multiple options to configure the retry queue. export interface RetryQueueOptions { // Enable the retry queue (default: true) enabled?: boolean - + // Defines if we allow duplicates messages in the retry queue (default: true) removeDuplicates?: boolean - + // The maximum size of the retry queue (default: null) maxSize?: number | null - + // The interval between each retry (default: false) retryInterval?: Duration | false } @@ -169,7 +181,7 @@ const buggyTransport = new ChaosTransport(new MemoryTransport()) const bus = new Bus(buggyTransport) /** - * Now, every time you will try to publish a message, the transport + * Now, every time you will try to publish a message, the transport * will throw an error. */ buggyTransport.alwaysThrow() diff --git a/package.json b/package.json index 4c99672..adc27a6 100644 --- a/package.json +++ b/package.json @@ -37,14 +37,17 @@ "@japa/runner": "^5.0.0", "@swc/core": "^1.15.8", "@testcontainers/hivemq": "^11.11.0", + "@testcontainers/postgresql": "^11.11.0", "@testcontainers/redis": "^11.11.0", "@types/node": "^20.17.19", "@types/object-hash": "^3.0.6", + "@types/pg": "^8.11.10", "c8": "^10.1.3", "del-cli": "^7.0.0", "eslint": "^9.39.2", "ioredis": "^5.9.0", "mqtt": "^5.14.1", + "pg": "^8.18.0", "prettier": "^3.7.4", "release-it": "^19.2.3", "testcontainers": "^11.11.0", @@ -58,11 +61,15 @@ "object-hash": "^3.0.0" }, "peerDependencies": { - "ioredis": "^5.0.0" + "ioredis": "^5.0.0", + "pg": "^8.0.0" }, "peerDependenciesMeta": { "ioredis": { "optional": true + }, + "pg": { + "optional": true } }, "author": "Romain Lanz ", diff --git a/src/transports/postgres.ts b/src/transports/postgres.ts new file mode 100644 index 0000000..6d2ac7c --- /dev/null +++ b/src/transports/postgres.ts @@ -0,0 +1,188 @@ +/** + * @boringnode/bus + * + * @license MIT + * @copyright BoringNode + */ + +import { Client } from 'pg' +import { assert } from '@poppinss/utils/assert' + +import debug from '../debug.js' +import { JsonEncoder } from '../encoders/json_encoder.js' +import type { + Transport, + TransportEncoder, + TransportMessage, + Serializable, + SubscribeHandler, + PostgresTransportConfig, +} from '../types/main.js' + +export function postgres(config: PostgresTransportConfig, encoder?: TransportEncoder) { + return () => new PostgresTransport(config, encoder) +} + +export class PostgresTransport implements Transport { + readonly #publisher: Client + readonly #subscriber: Client + readonly #encoder: TransportEncoder + readonly #channelHandlers: Map> = new Map() + #publisherConnected: boolean = false + #subscriberConnected: boolean = false + + #id: string | undefined + + constructor(config: PostgresTransportConfig, encoder?: TransportEncoder) + constructor(config: string, encoder?: TransportEncoder) + constructor(options: PostgresTransportConfig | string, encoder?: TransportEncoder) { + this.#encoder = encoder ?? new JsonEncoder() + + /** + * If a connection string is passed, use it for both publisher and subscriber + */ + if (typeof options === 'string') { + this.#publisher = new Client({ connectionString: options }) + this.#subscriber = new Client({ connectionString: options }) + return + } + + /** + * If a config object is passed, create both publisher and subscriber + */ + this.#publisher = new Client(options) + this.#subscriber = new Client(options) + } + + setId(id: string): Transport { + this.#id = id + + return this + } + + async #ensureConnected(): Promise { + if (!this.#publisherConnected) { + await this.#publisher.connect() + this.#publisherConnected = true + } + if (!this.#subscriberConnected) { + await this.#subscriber.connect() + this.#subscriberConnected = true + } + } + + async disconnect(): Promise { + this.#publisherConnected = false + this.#subscriberConnected = false + + const promises: Promise[] = [] + + try { + promises.push(this.#publisher.end()) + } catch (err) { + // Ignore errors during disconnect + } + + try { + promises.push(this.#subscriber.end()) + } catch (err) { + // Ignore errors during disconnect + } + + await Promise.allSettled(promises) + } + + async publish(channel: string, message: Serializable): Promise { + assert(this.#id, 'You must set an id before publishing a message') + + await this.#ensureConnected() + + const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) + const payloadString = typeof encoded === 'string' ? encoded : encoded.toString('base64') + + // Use pg's built-in escaping methods to safely escape the identifiers and literals + const escapedChannel = this.#publisher.escapeIdentifier(channel) + const escapedPayload = this.#publisher.escapeLiteral(payloadString) + + // Use NOTIFY to send the message + await this.#publisher.query(`NOTIFY ${escapedChannel}, ${escapedPayload}`) + } + + async subscribe( + channel: string, + handler: SubscribeHandler + ): Promise { + await this.#ensureConnected() + + // Store the handler for this channel + this.#channelHandlers.set(channel, handler) + + // Set up the notification listener if not already set + if (this.#subscriber.listenerCount('notification') === 0) { + this.#subscriber.on('notification', (msg) => { + if (msg.channel) { + const channelHandler = this.#channelHandlers.get(msg.channel) + if (channelHandler && msg.payload) { + debug('received message for channel "%s"', msg.channel) + + try { + const data = this.#encoder.decode>(msg.payload) + + /** + * Ignore messages published by this bus instance + */ + if (data.busId === this.#id) { + debug('ignoring message published by the same bus instance') + return + } + + channelHandler(data.payload) + } catch (error) { + debug('error decoding message: %o', error) + } + } + } + }) + } + + // Subscribe to the channel using LISTEN + const escapedChannel = this.#subscriber.escapeIdentifier(channel) + await this.#subscriber.query(`LISTEN ${escapedChannel}`) + } + + onReconnect(callback: () => void): void { + // PostgreSQL client doesn't have built-in reconnection events + // We'll listen to connection errors and trigger callback on reconnect + this.#subscriber.on('error', (err) => { + debug('subscriber error: %o', err) + }) + + this.#subscriber.on('end', () => { + debug('subscriber connection ended') + this.#subscriberConnected = false + // Attempt to reconnect + this.#subscriber + .connect() + .then(() => { + this.#subscriberConnected = true + callback() + // Re-subscribe to all channels + for (const channel of this.#channelHandlers.keys()) { + const escapedChannel = this.#subscriber.escapeIdentifier(channel) + this.#subscriber.query(`LISTEN ${escapedChannel}`).catch((err) => { + debug('error re-subscribing to channel %s: %o', channel, err) + }) + } + }) + .catch((err) => { + debug('error reconnecting: %o', err) + }) + }) + } + + async unsubscribe(channel: string): Promise { + this.#channelHandlers.delete(channel) + const escapedChannel = this.#subscriber.escapeIdentifier(channel) + await this.#subscriber.query(`UNLISTEN ${escapedChannel}`) + } +} diff --git a/src/types/main.ts b/src/types/main.ts index 5ca8db1..30cc274 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -7,8 +7,10 @@ import type { RedisOptions } from 'ioredis' import type { IClientOptions } from 'mqtt' +import type { ClientConfig } from 'pg' export type { Redis, Cluster } from 'ioredis' +export type { Client } from 'pg' export type TransportFactory = () => Transport /** @@ -66,6 +68,14 @@ export interface MqttTransportConfig { options?: IClientOptions } +export interface PostgresTransportConfig extends ClientConfig { + /** + * Connection string for PostgreSQL. If provided, it will be used instead + * of the individual connection properties. + */ + connectionString?: string +} + export interface Transport { setId: (id: string) => Transport onReconnect: (callback: () => void) => void diff --git a/tests/drivers/postgres_transport.spec.ts b/tests/drivers/postgres_transport.spec.ts new file mode 100644 index 0000000..7711a11 --- /dev/null +++ b/tests/drivers/postgres_transport.spec.ts @@ -0,0 +1,201 @@ +/** + * @boringnode/bus + * + * @license MIT + * @copyright BoringNode + */ + +import { setTimeout } from 'node:timers/promises' +import { test } from '@japa/runner' +import { PostgreSqlContainer, StartedPostgreSqlContainer } from '@testcontainers/postgresql' +import { PostgresTransport } from '../../src/transports/postgres.js' +import { JsonEncoder } from '../../src/encoders/json_encoder.js' + +test.group('Postgres Transport', (group) => { + let container: StartedPostgreSqlContainer + + group.setup(async () => { + container = await new PostgreSqlContainer('postgres:16-alpine').start() + + return async () => { + await container.stop() + } + }) + + test('transport should not receive message emitted by itself', async ({ assert, cleanup }) => { + const transport = new PostgresTransport(container.getConnectionUri()).setId('bus') + cleanup(() => transport.disconnect()) + + await transport.subscribe('testing-channel', () => { + assert.fail('Bus should not receive message emitted by itself') + }) + + await transport.publish('testing-channel', 'test') + await setTimeout(200) + }).disableTimeout() + + test('transport should receive message emitted by another bus', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + const transport1 = new PostgresTransport(container.getConnectionUri()).setId('bus1') + const transport2 = new PostgresTransport(container.getConnectionUri()).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }).waitForDone() + + test('transport should trigger onReconnect when the client reconnects', async ({ + assert, + cleanup, + }) => { + const transport = new PostgresTransport(container.getConnectionUri()).setId('bus') + cleanup(() => transport.disconnect()) + + let onReconnectTriggered = false + transport.onReconnect(() => { + onReconnectTriggered = true + }) + + await container.restart() + await setTimeout(2000) + + assert.isTrue(onReconnectTriggered) + }) + .disableTimeout() + .skip(true, 'PostgreSQL client reconnection behavior needs more investigation') + + test('message should be encoded and decoded correctly when using JSON encoder', async ({ + assert, + cleanup, + }, done) => { + assert.plan(1) + + const transport1 = new PostgresTransport(container.getConnectionUri(), new JsonEncoder()).setId( + 'bus1' + ) + const transport2 = new PostgresTransport(container.getConnectionUri(), new JsonEncoder()).setId( + 'bus2' + ) + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + const data = { test: 'test' } + + await transport1.subscribe('testing-channel', (payload) => { + assert.deepEqual(payload, data) + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', data) + }).waitForDone() + + test('should work with config object', async ({ assert, cleanup }, done) => { + assert.plan(1) + + const config = { + host: container.getHost(), + port: container.getMappedPort(5432), + database: container.getDatabase(), + user: container.getUsername(), + password: container.getPassword(), + } + + const transport1 = new PostgresTransport(config).setId('bus1') + const transport2 = new PostgresTransport(config).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + await transport1.subscribe('testing-channel', (payload) => { + assert.equal(payload, 'test') + done() + }) + + await setTimeout(200) + + await transport2.publish('testing-channel', 'test') + }).waitForDone() + + test('should handle unsubscribe correctly', async ({ assert, cleanup }) => { + const transport1 = new PostgresTransport(container.getConnectionUri()).setId('bus1') + const transport2 = new PostgresTransport(container.getConnectionUri()).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + let messageCount = 0 + + await transport1.subscribe('testing-channel', () => { + messageCount++ + }) + + await setTimeout(200) + + // Send first message + await transport2.publish('testing-channel', 'test1') + await setTimeout(200) + + // Unsubscribe + await transport1.unsubscribe('testing-channel') + await setTimeout(200) + + // Send second message (should not be received) + await transport2.publish('testing-channel', 'test2') + await setTimeout(200) + + assert.equal(messageCount, 1) + }) + + test('should handle multiple channels', async ({ assert, cleanup }) => { + const transport1 = new PostgresTransport(container.getConnectionUri()).setId('bus1') + const transport2 = new PostgresTransport(container.getConnectionUri()).setId('bus2') + + cleanup(async () => { + await transport1.disconnect() + await transport2.disconnect() + }) + + const receivedMessages: string[] = [] + + await transport1.subscribe('channel1', (payload) => { + receivedMessages.push(`channel1:${payload}`) + }) + + await transport1.subscribe('channel2', (payload) => { + receivedMessages.push(`channel2:${payload}`) + }) + + await setTimeout(200) + + await transport2.publish('channel1', 'message1') + await transport2.publish('channel2', 'message2') + + await setTimeout(200) + + assert.includeMembers(receivedMessages, ['channel1:message1', 'channel2:message2']) + assert.lengthOf(receivedMessages, 2) + }) +}) diff --git a/tsconfig.json b/tsconfig.json index 46e1cb1..ad0cc44 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,4 +4,4 @@ "rootDir": "./", "outDir": "./build" } -} +} diff --git a/yarn.lock b/yarn.lock index 71ce74f..ddb1826 100644 --- a/yarn.lock +++ b/yarn.lock @@ -98,15 +98,18 @@ __metadata: "@poppinss/utils": "npm:^6.10.1" "@swc/core": "npm:^1.15.8" "@testcontainers/hivemq": "npm:^11.11.0" + "@testcontainers/postgresql": "npm:^11.11.0" "@testcontainers/redis": "npm:^11.11.0" "@types/node": "npm:^20.17.19" "@types/object-hash": "npm:^3.0.6" + "@types/pg": "npm:^8.11.10" c8: "npm:^10.1.3" del-cli: "npm:^7.0.0" eslint: "npm:^9.39.2" ioredis: "npm:^5.9.0" mqtt: "npm:^5.14.1" object-hash: "npm:^3.0.0" + pg: "npm:^8.18.0" prettier: "npm:^3.7.4" release-it: "npm:^19.2.3" testcontainers: "npm:^11.11.0" @@ -115,9 +118,12 @@ __metadata: typescript: "npm:^5.9.3" peerDependencies: ioredis: ^5.0.0 + pg: ^8.0.0 peerDependenciesMeta: ioredis: optional: true + pg: + optional: true languageName: unknown linkType: soft @@ -1695,6 +1701,15 @@ __metadata: languageName: node linkType: hard +"@testcontainers/postgresql@npm:^11.11.0": + version: 11.11.0 + resolution: "@testcontainers/postgresql@npm:11.11.0" + dependencies: + testcontainers: "npm:^11.11.0" + checksum: 10c0/1292c41d4b31172ce5484020b161cb924932193eea34658ff212bf8c4b6d798715076dc062e0ce0f66573da8a6248d05775ccd80c76afceea43b46375b15f97b + languageName: node + linkType: hard + "@testcontainers/redis@npm:^11.11.0": version: 11.11.0 resolution: "@testcontainers/redis@npm:11.11.0" @@ -1855,6 +1870,17 @@ __metadata: languageName: node linkType: hard +"@types/pg@npm:^8.11.10": + version: 8.16.0 + resolution: "@types/pg@npm:8.16.0" + dependencies: + "@types/node": "npm:*" + pg-protocol: "npm:*" + pg-types: "npm:^2.2.0" + checksum: 10c0/421fe7c07d5c0226835d362414a63653f86251ee966150d807ed60174c13921d1b8a3e2f1c2bfba9659ec0282ca50974030c4c1efcd575003eb922ea12ca7d05 + languageName: node + linkType: hard + "@types/pluralize@npm:^0.0.33": version: 0.0.33 resolution: "@types/pluralize@npm:0.0.33" @@ -5358,6 +5384,87 @@ __metadata: languageName: node linkType: hard +"pg-cloudflare@npm:^1.3.0": + version: 1.3.0 + resolution: "pg-cloudflare@npm:1.3.0" + checksum: 10c0/b0866c88af8e54c7b3ed510719d92df37714b3af5e3a3a10d9f761fcec99483e222f5b78a1f2de590368127648087c45c01aaf66fadbe46edb25673eedc4f8fc + languageName: node + linkType: hard + +"pg-connection-string@npm:^2.11.0": + version: 2.11.0 + resolution: "pg-connection-string@npm:2.11.0" + checksum: 10c0/7a4bcf9b4f1e1fee6482e2bd814f544d451240059be6b8a186f24f73f163f1c599bb8c4984c398254869f744f6c3659b83e285c3d525fc640e99c60c453bd0df + languageName: node + linkType: hard + +"pg-int8@npm:1.0.1": + version: 1.0.1 + resolution: "pg-int8@npm:1.0.1" + checksum: 10c0/be6a02d851fc2a4ae3e9de81710d861de3ba35ac927268973eb3cb618873a05b9424656df464dd43bd7dc3fc5295c3f5b3c8349494f87c7af50ec59ef14e0b98 + languageName: node + linkType: hard + +"pg-pool@npm:^3.11.0": + version: 3.11.0 + resolution: "pg-pool@npm:3.11.0" + peerDependencies: + pg: ">=8.0" + checksum: 10c0/4b104b48a47257a0edad0c62e5ea1908b72cb79386270264b452e69895e9e4c589d00cdbf6e46d4e9c05bc7e7d191656b66814b5282d65f33b12648a21df3c7f + languageName: node + linkType: hard + +"pg-protocol@npm:*, pg-protocol@npm:^1.11.0": + version: 1.11.0 + resolution: "pg-protocol@npm:1.11.0" + checksum: 10c0/93e83581781418c9173eba4e4545f73392cfe66b78dd1d3624d7339fbd37e7f4abebaf2615e68e0701a9bf0edf5b81a4ad533836f388f775fe25fa24a691c464 + languageName: node + linkType: hard + +"pg-types@npm:2.2.0, pg-types@npm:^2.2.0": + version: 2.2.0 + resolution: "pg-types@npm:2.2.0" + dependencies: + pg-int8: "npm:1.0.1" + postgres-array: "npm:~2.0.0" + postgres-bytea: "npm:~1.0.0" + postgres-date: "npm:~1.0.4" + postgres-interval: "npm:^1.1.0" + checksum: 10c0/ab3f8069a323f601cd2d2279ca8c425447dab3f9b61d933b0601d7ffc00d6200df25e26a4290b2b0783b59278198f7dd2ed03e94c4875797919605116a577c65 + languageName: node + linkType: hard + +"pg@npm:^8.18.0": + version: 8.18.0 + resolution: "pg@npm:8.18.0" + dependencies: + pg-cloudflare: "npm:^1.3.0" + pg-connection-string: "npm:^2.11.0" + pg-pool: "npm:^3.11.0" + pg-protocol: "npm:^1.11.0" + pg-types: "npm:2.2.0" + pgpass: "npm:1.0.5" + peerDependencies: + pg-native: ">=3.0.1" + dependenciesMeta: + pg-cloudflare: + optional: true + peerDependenciesMeta: + pg-native: + optional: true + checksum: 10c0/9525e34d603ee5d715b8952269b2fa9fdd350a55fc5a3360104e7613724441858e57d52eed435fb16e993d028b45d8175dc277d270d31f69e5746987a549f772 + languageName: node + linkType: hard + +"pgpass@npm:1.0.5": + version: 1.0.5 + resolution: "pgpass@npm:1.0.5" + dependencies: + split2: "npm:^4.1.0" + checksum: 10c0/5ea6c9b2de04c33abb08d33a2dded303c4a3c7162a9264519cbe85c0a9857d712463140ba42fad0c7cd4b21f644dd870b45bb2e02fcbe505b4de0744fd802c1d + languageName: node + linkType: hard + "picocolors@npm:^1.1.1": version: 1.1.1 resolution: "picocolors@npm:1.1.1" @@ -5465,6 +5572,36 @@ __metadata: languageName: node linkType: hard +"postgres-array@npm:~2.0.0": + version: 2.0.0 + resolution: "postgres-array@npm:2.0.0" + checksum: 10c0/cbd56207e4141d7fbf08c86f2aebf21fa7064943d3f808ec85f442ff94b48d891e7a144cc02665fb2de5dbcb9b8e3183a2ac749959e794b4a4cfd379d7a21d08 + languageName: node + linkType: hard + +"postgres-bytea@npm:~1.0.0": + version: 1.0.1 + resolution: "postgres-bytea@npm:1.0.1" + checksum: 10c0/10b28a27c9d703d5befd97c443e62b551096d1014bc59ab574c65bf0688de7f3f068003b2aea8dcff83cf0f6f9a35f9f74457c38856cf8eb81b00cf3fb44f164 + languageName: node + linkType: hard + +"postgres-date@npm:~1.0.4": + version: 1.0.7 + resolution: "postgres-date@npm:1.0.7" + checksum: 10c0/0ff91fccc64003e10b767fcfeefb5eaffbc522c93aa65d5051c49b3c4ce6cb93ab091a7d22877a90ad60b8874202c6f1d0f935f38a7235ed3b258efd54b97ca9 + languageName: node + linkType: hard + +"postgres-interval@npm:^1.1.0": + version: 1.2.0 + resolution: "postgres-interval@npm:1.2.0" + dependencies: + xtend: "npm:^4.0.0" + checksum: 10c0/c1734c3cb79e7f22579af0b268a463b1fa1d084e742a02a7a290c4f041e349456f3bee3b4ee0bb3f226828597f7b76deb615c1b857db9a742c45520100456272 + languageName: node + linkType: hard + "prelude-ls@npm:^1.2.1": version: 1.2.1 resolution: "prelude-ls@npm:1.2.1" @@ -6161,7 +6298,7 @@ __metadata: languageName: node linkType: hard -"split2@npm:^4.2.0": +"split2@npm:^4.1.0, split2@npm:^4.2.0": version: 4.2.0 resolution: "split2@npm:4.2.0" checksum: 10c0/b292beb8ce9215f8c642bb68be6249c5a4c7f332fc8ecadae7be5cbdf1ea95addc95f0459ef2e7ad9d45fd1064698a097e4eb211c83e772b49bc0ee423e91534 @@ -7055,6 +7192,13 @@ __metadata: languageName: node linkType: hard +"xtend@npm:^4.0.0": + version: 4.0.2 + resolution: "xtend@npm:4.0.2" + checksum: 10c0/366ae4783eec6100f8a02dff02ac907bf29f9a00b82ac0264b4d8b832ead18306797e283cf19de776538babfdcb2101375ec5646b59f08c52128ac4ab812ed0e + languageName: node + linkType: hard + "y18n@npm:^5.0.5": version: 5.0.8 resolution: "y18n@npm:5.0.8" From 40f47c5ebf66f3f79548cd79348830fe095623e0 Mon Sep 17 00:00:00 2001 From: rcottinet Date: Tue, 10 Feb 2026 22:17:48 +0100 Subject: [PATCH 2/3] fix: improve reconnection logic and enhance test for connection loss simulation --- src/transports/postgres.ts | 128 +++++++++++++++-------- tests/drivers/postgres_transport.spec.ts | 23 +++- 2 files changed, 101 insertions(+), 50 deletions(-) diff --git a/src/transports/postgres.ts b/src/transports/postgres.ts index 6d2ac7c..1d8fdec 100644 --- a/src/transports/postgres.ts +++ b/src/transports/postgres.ts @@ -25,11 +25,14 @@ export function postgres(config: PostgresTransportConfig, encoder?: TransportEnc export class PostgresTransport implements Transport { readonly #publisher: Client - readonly #subscriber: Client + #subscriber: Client readonly #encoder: TransportEncoder readonly #channelHandlers: Map> = new Map() #publisherConnected: boolean = false #subscriberConnected: boolean = false + #gracefulDisconnect: boolean = false + #config: PostgresTransportConfig + #reconnectCallback: (() => void) | undefined #id: string | undefined @@ -38,20 +41,14 @@ export class PostgresTransport implements Transport { constructor(options: PostgresTransportConfig | string, encoder?: TransportEncoder) { this.#encoder = encoder ?? new JsonEncoder() - /** - * If a connection string is passed, use it for both publisher and subscriber - */ if (typeof options === 'string') { - this.#publisher = new Client({ connectionString: options }) - this.#subscriber = new Client({ connectionString: options }) - return + this.#config = { connectionString: options } + } else { + this.#config = options } - /** - * If a config object is passed, create both publisher and subscriber - */ - this.#publisher = new Client(options) - this.#subscriber = new Client(options) + this.#publisher = new Client(this.#config) + this.#subscriber = new Client(this.#config) } setId(id: string): Transport { @@ -72,6 +69,7 @@ export class PostgresTransport implements Transport { } async disconnect(): Promise { + this.#gracefulDisconnect = true this.#publisherConnected = false this.#subscriberConnected = false @@ -117,42 +115,51 @@ export class PostgresTransport implements Transport { // Store the handler for this channel this.#channelHandlers.set(channel, handler) - // Set up the notification listener if not already set - if (this.#subscriber.listenerCount('notification') === 0) { - this.#subscriber.on('notification', (msg) => { - if (msg.channel) { - const channelHandler = this.#channelHandlers.get(msg.channel) - if (channelHandler && msg.payload) { - debug('received message for channel "%s"', msg.channel) - - try { - const data = this.#encoder.decode>(msg.payload) - - /** - * Ignore messages published by this bus instance - */ - if (data.busId === this.#id) { - debug('ignoring message published by the same bus instance') - return - } - - channelHandler(data.payload) - } catch (error) { - debug('error decoding message: %o', error) - } - } - } - }) - } + this.#ensureNotificationListener() // Subscribe to the channel using LISTEN const escapedChannel = this.#subscriber.escapeIdentifier(channel) await this.#subscriber.query(`LISTEN ${escapedChannel}`) } + #ensureNotificationListener() { + // Set up the notification listener if not already set + if (this.#subscriber.listenerCount('notification') > 0) { + return + } + + this.#subscriber.on('notification', (msg) => { + if (msg.channel) { + const channelHandler = this.#channelHandlers.get(msg.channel) + if (channelHandler && msg.payload) { + debug('received message for channel "%s"', msg.channel) + + try { + const data = this.#encoder.decode>(msg.payload) + + /** + * Ignore messages published by this bus instance + */ + if (data.busId === this.#id) { + debug('ignoring message published by the same bus instance') + return + } + + channelHandler(data.payload) + } catch (error) { + debug('error decoding message: %o', error) + } + } + } + }) + } + onReconnect(callback: () => void): void { - // PostgreSQL client doesn't have built-in reconnection events - // We'll listen to connection errors and trigger callback on reconnect + this.#reconnectCallback = callback + this.#setupReconnectionListener() + } + + #setupReconnectionListener() { this.#subscriber.on('error', (err) => { debug('subscriber error: %o', err) }) @@ -160,12 +167,42 @@ export class PostgresTransport implements Transport { this.#subscriber.on('end', () => { debug('subscriber connection ended') this.#subscriberConnected = false - // Attempt to reconnect - this.#subscriber + + if (this.#gracefulDisconnect) { + return + } + + this.#attemptReconnection() + }) + } + + #attemptReconnection(attempt = 0) { + const baseDelay = 1000 + const maxDelay = 60000 + // Exponential backoff with jitter + const delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay) + Math.random() * 1000 + + debug('attempting reconnection in %d ms (attempt %d)', delay, attempt) + + setTimeout(() => { + if (this.#gracefulDisconnect) return + + const newClient = new Client(this.#config) + + newClient .connect() .then(() => { + this.#subscriber = newClient this.#subscriberConnected = true - callback() + debug('reconnected to postgres') + + this.#ensureNotificationListener() + this.#setupReconnectionListener() + + if (this.#reconnectCallback) { + this.#reconnectCallback() + } + // Re-subscribe to all channels for (const channel of this.#channelHandlers.keys()) { const escapedChannel = this.#subscriber.escapeIdentifier(channel) @@ -176,8 +213,9 @@ export class PostgresTransport implements Transport { }) .catch((err) => { debug('error reconnecting: %o', err) + this.#attemptReconnection(attempt + 1) }) - }) + }, delay) } async unsubscribe(channel: string): Promise { diff --git a/tests/drivers/postgres_transport.spec.ts b/tests/drivers/postgres_transport.spec.ts index 7711a11..f37edad 100644 --- a/tests/drivers/postgres_transport.spec.ts +++ b/tests/drivers/postgres_transport.spec.ts @@ -7,6 +7,7 @@ import { setTimeout } from 'node:timers/promises' import { test } from '@japa/runner' +import { Client } from 'pg' import { PostgreSqlContainer, StartedPostgreSqlContainer } from '@testcontainers/postgresql' import { PostgresTransport } from '../../src/transports/postgres.js' import { JsonEncoder } from '../../src/encoders/json_encoder.js' @@ -70,13 +71,25 @@ test.group('Postgres Transport', (group) => { onReconnectTriggered = true }) - await container.restart() - await setTimeout(2000) + await transport.publish('warmup', 'warmup') + + /** + * We use pg_terminate_backend to simulate a connection loss instead of restarting + * the container because restarting the container might change the exposed port, + * making it impossible for the driver to reconnect (since it relies on the initial + * connection string). + */ + const client = new Client({ connectionString: container.getConnectionUri() }) + await client.connect() + await client.query( + `SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = current_database()` + ) + await client.end() + + await setTimeout(5000) assert.isTrue(onReconnectTriggered) - }) - .disableTimeout() - .skip(true, 'PostgreSQL client reconnection behavior needs more investigation') + }).disableTimeout() test('message should be encoded and decoded correctly when using JSON encoder', async ({ assert, From 1fc570ed48a66c233c276754830c2a54525e53e6 Mon Sep 17 00:00:00 2001 From: rcottinet Date: Tue, 10 Feb 2026 22:23:16 +0100 Subject: [PATCH 3/3] fix: optimize re-subscription to channels in PostgresTransport --- src/transports/postgres.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/transports/postgres.ts b/src/transports/postgres.ts index 1d8fdec..6f5192a 100644 --- a/src/transports/postgres.ts +++ b/src/transports/postgres.ts @@ -204,10 +204,14 @@ export class PostgresTransport implements Transport { } // Re-subscribe to all channels - for (const channel of this.#channelHandlers.keys()) { - const escapedChannel = this.#subscriber.escapeIdentifier(channel) - this.#subscriber.query(`LISTEN ${escapedChannel}`).catch((err) => { - debug('error re-subscribing to channel %s: %o', channel, err) + const channels = Array.from(this.#channelHandlers.keys()) + if (channels.length > 0) { + const query = channels + .map((channel) => `LISTEN ${this.#subscriber.escapeIdentifier(channel)}`) + .join('; ') + + this.#subscriber.query(query).catch((err) => { + debug('error re-subscribing to channels: %o', err) }) } })