From e68235736c9f642afe71dba05cf03c6ba668852c Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Mon, 9 Feb 2026 12:41:17 -0300 Subject: [PATCH 1/8] feat: close connections with open transactions on release --- packages/pg-pool/index.js | 10 +++- packages/pg-pool/test/poison-pool.js | 82 ++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 packages/pg-pool/test/poison-pool.js diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 2fbdb78d5..c3efbb184 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -389,7 +389,15 @@ class Pool extends EventEmitter { this.emit('release', err, client) // TODO(bmc): expose a proper, public interface _queryable and _ending - if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) { + if ( + err || + this.ending || + !client._queryable || + client._ending || + client._txStatus === 'T' || + client._txStatus === 'E' || + client._poolUseCount >= this.options.maxUses + ) { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') } diff --git a/packages/pg-pool/test/poison-pool.js b/packages/pg-pool/test/poison-pool.js new file mode 100644 index 000000000..057b4d28f --- /dev/null +++ b/packages/pg-pool/test/poison-pool.js @@ -0,0 +1,82 @@ +'use strict' + +const expect = require('expect.js') +const describe = require('mocha').describe +const it = require('mocha').it +const Pool = require('..') + +describe('poison connection pool defense (_txStatus check)', function () { + it('removes a client with an open transaction on release', async function () { + const pool = new Pool({ max: 1 }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client._txStatus).to.be('T') + + client.release() + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // pool should still work by creating a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + await pool.end() + }) + + it('removes a client in a failed transaction state on release', async function () { + const pool = new Pool({ max: 1 }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error to avoid pool close the connection + } + // The ReadyForQuery message with status 'E' may arrive on a separate I/O event. + // Issue a follow-up query to ensure it has been processed — this will also fail + // (since the transaction is aborted) but guarantees _txStatus is updated. + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client._txStatus).to.be('E') + + client.release() + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // pool should still work + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + await pool.end() + }) + + it('only removes connections with open transactions, keeps idle ones', async function () { + const pool = new Pool({ max: 3 }) + const clientA = await pool.connect() + const clientB = await pool.connect() + const clientC = await pool.connect() + + // Client A: open transaction (poisoned) + await clientA.query('BEGIN') + expect(clientA._txStatus).to.be('T') + + // Client B: normal query (idle) + await clientB.query('SELECT 1') + expect(clientB._txStatus).to.be('I') + + // Client C: committed transaction (idle) + await clientC.query('BEGIN') + await clientC.query('COMMIT') + expect(clientC._txStatus).to.be('I') + + clientA.release() + clientB.release() + clientC.release() + + // A was removed, B and C kept + expect(pool.totalCount).to.be(2) + expect(pool.idleCount).to.be(2) + await pool.end() + }) +}) From a1be8ec00d6a3967d22320fd8dca1bc53e6859a4 Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Tue, 10 Feb 2026 09:23:05 -0300 Subject: [PATCH 2/8] feat: add log and more tests --- packages/pg-pool/index.js | 10 ++- .../test/{poison-pool.js => leaked-pool.js} | 67 +++++++++++++++++-- 2 files changed, 70 insertions(+), 7 deletions(-) rename packages/pg-pool/test/{poison-pool.js => leaked-pool.js} (54%) diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index c3efbb184..842a36e42 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -124,6 +124,10 @@ class Pool extends EventEmitter { return this._clients.length > this.options.min } + _hasActiveTransaction(client) { + return client && (client._txStatus === 'T' || client._txStatus === 'E') + } + _pulseQueue() { this.log('pulse queue') if (this.ended) { @@ -394,13 +398,15 @@ class Pool extends EventEmitter { this.ending || !client._queryable || client._ending || - client._txStatus === 'T' || - client._txStatus === 'E' || + this._hasActiveTransaction(client) || client._poolUseCount >= this.options.maxUses ) { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') } + if (this._hasActiveTransaction(client)) { + this.log('remove client with leaked transaction') + } return this._remove(client, this._pulseQueue.bind(this)) } diff --git a/packages/pg-pool/test/poison-pool.js b/packages/pg-pool/test/leaked-pool.js similarity index 54% rename from packages/pg-pool/test/poison-pool.js rename to packages/pg-pool/test/leaked-pool.js index 057b4d28f..40641cc47 100644 --- a/packages/pg-pool/test/poison-pool.js +++ b/packages/pg-pool/test/leaked-pool.js @@ -5,9 +5,13 @@ const describe = require('mocha').describe const it = require('mocha').it const Pool = require('..') -describe('poison connection pool defense (_txStatus check)', function () { +describe('leaked connection pool guard', function () { it('removes a client with an open transaction on release', async function () { - const pool = new Pool({ max: 1 }) + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + }) const client = await pool.connect() await client.query('BEGIN') expect(client._txStatus).to.be('T') @@ -15,10 +19,14 @@ describe('poison connection pool defense (_txStatus check)', function () { client.release() expect(pool.totalCount).to.be(0) expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client with leaked transaction') - // pool should still work by creating a fresh connection + // pool recovers by creating a fresh connection const { rows } = await pool.query('SELECT 1 as num') expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + await pool.end() }) @@ -45,9 +53,12 @@ describe('poison connection pool defense (_txStatus check)', function () { expect(pool.totalCount).to.be(0) expect(pool.idleCount).to.be(0) - // pool should still work + // pool recovers by creating a fresh connection const { rows } = await pool.query('SELECT 1 as num') expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + await pool.end() }) @@ -57,7 +68,7 @@ describe('poison connection pool defense (_txStatus check)', function () { const clientB = await pool.connect() const clientC = await pool.connect() - // Client A: open transaction (poisoned) + // Client A: open transaction (leaked) await clientA.query('BEGIN') expect(clientA._txStatus).to.be('T') @@ -79,4 +90,50 @@ describe('poison connection pool defense (_txStatus check)', function () { expect(pool.idleCount).to.be(2) await pool.end() }) + + describe('pool.query', function () { + it('removes a client after pool.query leaks transaction via BEGIN', async function () { + const logMessages = [] + const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg) }) + + await pool.query('BEGIN') + + // Client auto-released with txStatus='T', should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client with leaked transaction') + + // Verify pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('removes a client after pool.query in failed transaction state', async function () { + const pool = new Pool({ max: 1 }) + + await pool.query('BEGIN') + + try { + await pool.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // Expected error + } + + // Client with txStatus='E' should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // Pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + }) }) From 7d9cdaa5873cef21bd4921bc229fe0a57e269832 Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Tue, 10 Feb 2026 10:07:50 -0300 Subject: [PATCH 3/8] fix: ignore txStatus tests in native mode --- .../test/integration/client/txstatus-tests.js | 94 ++++++++++--------- 1 file changed, 48 insertions(+), 46 deletions(-) diff --git a/packages/pg/test/integration/client/txstatus-tests.js b/packages/pg/test/integration/client/txstatus-tests.js index cb8b740f8..ffaaa7ee9 100644 --- a/packages/pg/test/integration/client/txstatus-tests.js +++ b/packages/pg/test/integration/client/txstatus-tests.js @@ -4,12 +4,14 @@ const suite = new helper.Suite() const pg = helper.pg const assert = require('assert') -suite.test('txStatus tracking', function (done) { - const client = new pg.Client() - client.connect( - assert.success(function () { - // Run a simple query to initialize txStatus - client.query( +// txStatus tracking is not implemented in native client +if (!helper.args.native) { + suite.test('txStatus tracking', function (done) { + const client = new pg.Client() + client.connect( + assert.success(function () { + // Run a simple query to initialize txStatus + client.query( 'SELECT 1', assert.success(function () { // Test 1: Initial state after query (should be idle) @@ -35,48 +37,48 @@ suite.test('txStatus tracking', function (done) { }) ) }) - ) -}) + }) -suite.test('txStatus error state', function (done) { - const client = new pg.Client() - client.connect( - assert.success(function () { - // Run a simple query to initialize txStatus - client.query( - 'SELECT 1', - assert.success(function () { - client.query( - 'BEGIN', - assert.success(function () { - // Execute invalid SQL to trigger error state - client.query('INVALID SQL SYNTAX', function (err) { - assert(err, 'should receive error from invalid query') + suite.test('txStatus error state', function (done) { + const client = new pg.Client() + client.connect( + assert.success(function () { + // Run a simple query to initialize txStatus + client.query( + 'SELECT 1', + assert.success(function () { + client.query( + 'BEGIN', + assert.success(function () { + // Execute invalid SQL to trigger error state + client.query('INVALID SQL SYNTAX', function (err) { + assert(err, 'should receive error from invalid query') - // Issue a sync query to ensure ReadyForQuery has been processed - // This guarantees transaction status has been updated - client.query('SELECT 1', function () { - // This callback fires after ReadyForQuery is processed - assert.equal(client.getTransactionStatus(), 'E', 'should be in error state') + // Issue a sync query to ensure ReadyForQuery has been processed + // This guarantees transaction status has been updated + client.query('SELECT 1', function () { + // This callback fires after ReadyForQuery is processed + assert.equal(client.getTransactionStatus(), 'E', 'should be in error state') - // Rollback to recover - client.query( - 'ROLLBACK', - assert.success(function () { - assert.equal( - client.getTransactionStatus(), - 'I', - 'should return to idle after rollback from error' - ) - client.end(done) - }) - ) + // Rollback to recover + client.query( + 'ROLLBACK', + assert.success(function () { + assert.equal( + client.getTransactionStatus(), + 'I', + 'should return to idle after rollback from error' + ) + client.end(done) + }) + ) + }) }) }) - }) - ) - }) - ) - }) - ) -}) + ) + }) + ) + }) + ) + }) +} From 08dce2a97a5daf6065fa73043c6b9fd7474df6f5 Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Wed, 11 Feb 2026 16:30:49 -0300 Subject: [PATCH 4/8] feat: add new pool option evictOnOpenTransaction --- docs/pages/apis/pool.mdx | 6 + packages/pg-pool/index.js | 17 +- packages/pg-pool/test/leaked-pool.js | 397 ++++++++++++++++++++------- 3 files changed, 307 insertions(+), 113 deletions(-) diff --git a/docs/pages/apis/pool.mdx b/docs/pages/apis/pool.mdx index 123bc8ba4..c9ba40d93 100644 --- a/docs/pages/apis/pool.mdx +++ b/docs/pages/apis/pool.mdx @@ -63,6 +63,11 @@ type Config = { // middleware so that you can rotate the underlying servers. The default is disabled (value of zero). maxLifetimeSeconds?: number + // When true, automatically removes connections with open or failed transactions from the pool + // when they are released. This provides protection against "connection poisoning" where + // uncommitted transactions leak across different requests. Default is false. + evictOnOpenTransaction?: boolean + // Called once when a new client is created, before it is made available to the pool. // The client is fully connected and queryable at this point. // Can be a regular function or an async function. @@ -84,6 +89,7 @@ const pool = new Pool({ idleTimeoutMillis: 30000, connectionTimeoutMillis: 2000, maxLifetimeSeconds: 60, + evictOnOpenTransaction: true, }) ``` diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 842a36e42..5a9655544 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -91,6 +91,7 @@ class Pool extends EventEmitter { this.options.maxUses = this.options.maxUses || Infinity this.options.allowExitOnIdle = this.options.allowExitOnIdle || false this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0 + this.options.evictOnOpenTransaction = this.options.evictOnOpenTransaction || false this.log = this.options.log || function () {} this.Client = this.options.Client || Client || require('pg').Client this.Promise = this.options.Promise || global.Promise @@ -393,21 +394,15 @@ class Pool extends EventEmitter { this.emit('release', err, client) // TODO(bmc): expose a proper, public interface _queryable and _ending - if ( - err || - this.ending || - !client._queryable || - client._ending || - this._hasActiveTransaction(client) || - client._poolUseCount >= this.options.maxUses - ) { + if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') } - if (this._hasActiveTransaction(client)) { - this.log('remove client with leaked transaction') - } + return this._remove(client, this._pulseQueue.bind(this)) + } + if (this.options.evictOnOpenTransaction && this._hasActiveTransaction(client)) { + this.log('remove client due to open transaction') return this._remove(client, this._pulseQueue.bind(this)) } diff --git a/packages/pg-pool/test/leaked-pool.js b/packages/pg-pool/test/leaked-pool.js index 40641cc47..53900cda1 100644 --- a/packages/pg-pool/test/leaked-pool.js +++ b/packages/pg-pool/test/leaked-pool.js @@ -5,105 +5,57 @@ const describe = require('mocha').describe const it = require('mocha').it const Pool = require('..') -describe('leaked connection pool guard', function () { - it('removes a client with an open transaction on release', async function () { - const logMessages = [] - const pool = new Pool({ - max: 1, - log: (msg) => logMessages.push(msg), - }) - const client = await pool.connect() - await client.query('BEGIN') - expect(client._txStatus).to.be('T') - - client.release() - expect(pool.totalCount).to.be(0) - expect(pool.idleCount).to.be(0) - expect(logMessages).to.contain('remove client with leaked transaction') - - // pool recovers by creating a fresh connection - const { rows } = await pool.query('SELECT 1 as num') - expect(rows[0].num).to.be(1) - expect(pool.totalCount).to.be(1) - expect(pool.idleCount).to.be(1) - - await pool.end() - }) +describe('leaked connection pool', function () { + describe('when evictOnOpenTransaction is true', function () { + it('removes a client with an open transaction on release', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + evictOnOpenTransaction: true, + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client._txStatus).to.be('T') - it('removes a client in a failed transaction state on release', async function () { - const pool = new Pool({ max: 1 }) - const client = await pool.connect() - await client.query('BEGIN') - try { - await client.query('SELECT invalid_column FROM nonexistent_table') - } catch (e) { - // swallow the error to avoid pool close the connection - } - // The ReadyForQuery message with status 'E' may arrive on a separate I/O event. - // Issue a follow-up query to ensure it has been processed — this will also fail - // (since the transaction is aborted) but guarantees _txStatus is updated. - try { - await client.query('SELECT 1') - } catch (e) { - // expected — "current transaction is aborted" - } - expect(client._txStatus).to.be('E') - - client.release() - expect(pool.totalCount).to.be(0) - expect(pool.idleCount).to.be(0) - - // pool recovers by creating a fresh connection - const { rows } = await pool.query('SELECT 1 as num') - expect(rows[0].num).to.be(1) - expect(pool.totalCount).to.be(1) - expect(pool.idleCount).to.be(1) - - await pool.end() - }) + client.release() + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client due to open transaction') - it('only removes connections with open transactions, keeps idle ones', async function () { - const pool = new Pool({ max: 3 }) - const clientA = await pool.connect() - const clientB = await pool.connect() - const clientC = await pool.connect() - - // Client A: open transaction (leaked) - await clientA.query('BEGIN') - expect(clientA._txStatus).to.be('T') - - // Client B: normal query (idle) - await clientB.query('SELECT 1') - expect(clientB._txStatus).to.be('I') - - // Client C: committed transaction (idle) - await clientC.query('BEGIN') - await clientC.query('COMMIT') - expect(clientC._txStatus).to.be('I') - - clientA.release() - clientB.release() - clientC.release() - - // A was removed, B and C kept - expect(pool.totalCount).to.be(2) - expect(pool.idleCount).to.be(2) - await pool.end() - }) + // pool recovers by creating a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) - describe('pool.query', function () { - it('removes a client after pool.query leaks transaction via BEGIN', async function () { - const logMessages = [] - const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg) }) + await pool.end() + }) - await pool.query('BEGIN') + it('removes a client in a failed transaction state on release', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: true }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error to avoid pool close the connection + } + // The ReadyForQuery message with status 'E' may arrive on a separate I/O event. + // Issue a follow-up query to ensure it has been processed — this will also fail + // (since the transaction is aborted) but guarantees _txStatus is updated. + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client._txStatus).to.be('E') - // Client auto-released with txStatus='T', should be removed + client.release() expect(pool.totalCount).to.be(0) expect(pool.idleCount).to.be(0) - expect(logMessages).to.contain('remove client with leaked transaction') - // Verify pool recovers + // pool recovers by creating a fresh connection const { rows } = await pool.query('SELECT 1 as num') expect(rows[0].num).to.be(1) expect(pool.totalCount).to.be(1) @@ -112,28 +64,269 @@ describe('leaked connection pool guard', function () { await pool.end() }) - it('removes a client after pool.query in failed transaction state', async function () { - const pool = new Pool({ max: 1 }) + it('only removes connections with open transactions, keeps idle ones', async function () { + const pool = new Pool({ max: 3, evictOnOpenTransaction: true }) + const clientA = await pool.connect() + const clientB = await pool.connect() + const clientC = await pool.connect() + + // Client A: open transaction (leaked) + await clientA.query('BEGIN') + expect(clientA._txStatus).to.be('T') + + // Client B: normal query (idle) + await clientB.query('SELECT 1') + expect(clientB._txStatus).to.be('I') + + // Client C: committed transaction (idle) + await clientC.query('BEGIN') + await clientC.query('COMMIT') + expect(clientC._txStatus).to.be('I') + + clientA.release() + clientB.release() + clientC.release() + + // A was removed, B and C kept + expect(pool.totalCount).to.be(2) + expect(pool.idleCount).to.be(2) + await pool.end() + }) + + describe('pool.query', function () { + it('removes a client after pool.query leaks transaction via BEGIN', async function () { + const logMessages = [] + const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg), evictOnOpenTransaction: true }) + + await pool.query('BEGIN') + + // Client auto-released with txStatus='T', should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client due to open transaction') + + // Verify pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('removes a client after pool.query in failed transaction state', async function () { + const pool = new Pool({ max: 1 }) + + await pool.query('BEGIN') + + try { + await pool.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // Expected error + } + + // Client with txStatus='E' should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // Pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + }) + }) + + describe('when evictOnOpenTransaction is false or default', function () { + it('keeps client with open transaction when explicitly false', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + evictOnOpenTransaction: false, + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client._txStatus).to.be('T') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool can still execute queries (connection was reused) + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('keeps client with open transaction when option not specified (default)', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client._txStatus).to.be('T') - await pool.query('BEGIN') + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + // Verify pool can still execute queries (connection was reused) + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('keeps client in failed transaction state when explicitly false', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: false }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error + } + // Issue a follow-up query to ensure _txStatus is updated to 'E' try { - await pool.query('SELECT invalid_column FROM nonexistent_table') + await client.query('SELECT 1') } catch (e) { - // Expected error + // expected — "current transaction is aborted" } + expect(client._txStatus).to.be('E') - // Client with txStatus='E' should be removed - expect(pool.totalCount).to.be(0) - expect(pool.idleCount).to.be(0) + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) - // Pool recovers - const { rows } = await pool.query('SELECT 1 as num') + // Get a new client and manually ROLLBACK the failed transaction + const client2 = await pool.connect() + await client2.query('ROLLBACK') + const { rows } = await client2.query('SELECT 1 as num') expect(rows[0].num).to.be(1) - expect(pool.totalCount).to.be(1) + client2.release() + + await pool.end() + }) + + it('keeps client in failed transaction state when option not specified (default)', async function () { + const pool = new Pool({ max: 1 }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error + } + // Issue a follow-up query to ensure _txStatus is updated to 'E' + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client._txStatus).to.be('E') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed expect(pool.idleCount).to.be(1) + // Get a new client and manually ROLLBACK the failed transaction + const client2 = await pool.connect() + await client2.query('ROLLBACK') + const { rows } = await client2.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + client2.release() + + await pool.end() + }) + + it('keeps all clients with mixed transaction states', async function () { + const logMessages = [] + const pool = new Pool({ + max: 3, + evictOnOpenTransaction: false, + log: (msg) => logMessages.push(msg), + }) + const clientA = await pool.connect() + const clientB = await pool.connect() + const clientC = await pool.connect() + + // Client A: open transaction (leaked) + await clientA.query('BEGIN') + expect(clientA._txStatus).to.be('T') + + // Client B: normal query (idle) + await clientB.query('SELECT 1') + expect(clientB._txStatus).to.be('I') + + // Client C: committed transaction (idle) + await clientC.query('BEGIN') + await clientC.query('COMMIT') + expect(clientC._txStatus).to.be('I') + + clientA.release() + clientB.release() + clientC.release() + + // All clients kept in pool + expect(pool.totalCount).to.be(3) + expect(pool.idleCount).to.be(3) + expect(logMessages).to.not.contain('remove client due to open transaction') + await pool.end() }) + + describe('pool.query', function () { + it('keeps client after pool.query leaks transaction via BEGIN (default)', async function () { + const logMessages = [] + const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg) }) + + await pool.query('BEGIN') + + // Client auto-released with txStatus='T', should be kept + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool still works + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('removes client on pool.query error even when evictOnOpenTransaction is false', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: false }) + + await pool.query('BEGIN') + + try { + await pool.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // Expected error - pool.query calls client.release(err) which removes the client + } + + // Client is removed because pool.query releases with error argument + // This is independent of evictOnOpenTransaction setting + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // Pool recovers with a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + }) }) }) From 2316822ae25c21317ec21585b1c65e92c857866a Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Wed, 11 Feb 2026 16:43:36 -0300 Subject: [PATCH 5/8] feat: add new client.getTransactionStatus() method --- packages/pg-pool/index.js | 2 +- packages/pg-pool/test/leaked-pool.js | 30 ++++++------ .../test/integration/client/txstatus-tests.js | 47 ++++++++++--------- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 5a9655544..02543d0e5 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -126,7 +126,7 @@ class Pool extends EventEmitter { } _hasActiveTransaction(client) { - return client && (client._txStatus === 'T' || client._txStatus === 'E') + return client && (client.getTransactionStatus() === 'T' || client.getTransactionStatus() === 'E') } _pulseQueue() { diff --git a/packages/pg-pool/test/leaked-pool.js b/packages/pg-pool/test/leaked-pool.js index 53900cda1..c10f4e9f1 100644 --- a/packages/pg-pool/test/leaked-pool.js +++ b/packages/pg-pool/test/leaked-pool.js @@ -16,7 +16,7 @@ describe('leaked connection pool', function () { }) const client = await pool.connect() await client.query('BEGIN') - expect(client._txStatus).to.be('T') + expect(client.getTransactionStatus()).to.be('T') client.release() expect(pool.totalCount).to.be(0) @@ -43,13 +43,13 @@ describe('leaked connection pool', function () { } // The ReadyForQuery message with status 'E' may arrive on a separate I/O event. // Issue a follow-up query to ensure it has been processed — this will also fail - // (since the transaction is aborted) but guarantees _txStatus is updated. + // (since the transaction is aborted) but guarantees transaction status is updated. try { await client.query('SELECT 1') } catch (e) { // expected — "current transaction is aborted" } - expect(client._txStatus).to.be('E') + expect(client.getTransactionStatus()).to.be('E') client.release() expect(pool.totalCount).to.be(0) @@ -72,16 +72,16 @@ describe('leaked connection pool', function () { // Client A: open transaction (leaked) await clientA.query('BEGIN') - expect(clientA._txStatus).to.be('T') + expect(clientA.getTransactionStatus()).to.be('T') // Client B: normal query (idle) await clientB.query('SELECT 1') - expect(clientB._txStatus).to.be('I') + expect(clientB.getTransactionStatus()).to.be('I') // Client C: committed transaction (idle) await clientC.query('BEGIN') await clientC.query('COMMIT') - expect(clientC._txStatus).to.be('I') + expect(clientC.getTransactionStatus()).to.be('I') clientA.release() clientB.release() @@ -150,7 +150,7 @@ describe('leaked connection pool', function () { }) const client = await pool.connect() await client.query('BEGIN') - expect(client._txStatus).to.be('T') + expect(client.getTransactionStatus()).to.be('T') client.release() expect(pool.totalCount).to.be(1) // NOT removed @@ -172,7 +172,7 @@ describe('leaked connection pool', function () { }) const client = await pool.connect() await client.query('BEGIN') - expect(client._txStatus).to.be('T') + expect(client.getTransactionStatus()).to.be('T') client.release() expect(pool.totalCount).to.be(1) // NOT removed @@ -195,13 +195,13 @@ describe('leaked connection pool', function () { } catch (e) { // swallow the error } - // Issue a follow-up query to ensure _txStatus is updated to 'E' + // Issue a follow-up query to ensure transaction status is updated to 'E' try { await client.query('SELECT 1') } catch (e) { // expected — "current transaction is aborted" } - expect(client._txStatus).to.be('E') + expect(client.getTransactionStatus()).to.be('E') client.release() expect(pool.totalCount).to.be(1) // NOT removed @@ -226,13 +226,13 @@ describe('leaked connection pool', function () { } catch (e) { // swallow the error } - // Issue a follow-up query to ensure _txStatus is updated to 'E' + // Issue a follow-up query to ensure transaction status is updated to 'E' try { await client.query('SELECT 1') } catch (e) { // expected — "current transaction is aborted" } - expect(client._txStatus).to.be('E') + expect(client.getTransactionStatus()).to.be('E') client.release() expect(pool.totalCount).to.be(1) // NOT removed @@ -261,16 +261,16 @@ describe('leaked connection pool', function () { // Client A: open transaction (leaked) await clientA.query('BEGIN') - expect(clientA._txStatus).to.be('T') + expect(clientA.getTransactionStatus()).to.be('T') // Client B: normal query (idle) await clientB.query('SELECT 1') - expect(clientB._txStatus).to.be('I') + expect(clientB.getTransactionStatus()).to.be('I') // Client C: committed transaction (idle) await clientC.query('BEGIN') await clientC.query('COMMIT') - expect(clientC._txStatus).to.be('I') + expect(clientC.getTransactionStatus()).to.be('I') clientA.release() clientB.release() diff --git a/packages/pg/test/integration/client/txstatus-tests.js b/packages/pg/test/integration/client/txstatus-tests.js index ffaaa7ee9..3529ce0fe 100644 --- a/packages/pg/test/integration/client/txstatus-tests.js +++ b/packages/pg/test/integration/client/txstatus-tests.js @@ -4,7 +4,7 @@ const suite = new helper.Suite() const pg = helper.pg const assert = require('assert') -// txStatus tracking is not implemented in native client +// txStatus tracking is not supported in native client if (!helper.args.native) { suite.test('txStatus tracking', function (done) { const client = new pg.Client() @@ -12,31 +12,32 @@ if (!helper.args.native) { assert.success(function () { // Run a simple query to initialize txStatus client.query( - 'SELECT 1', - assert.success(function () { - // Test 1: Initial state after query (should be idle) - assert.equal(client.getTransactionStatus(), 'I', 'should start in idle state') + 'SELECT 1', + assert.success(function () { + // Test 1: Initial state after query (should be idle) + assert.equal(client.getTransactionStatus(), 'I', 'should start in idle state') - // Test 2: BEGIN transaction - client.query( - 'BEGIN', - assert.success(function () { - assert.equal(client.getTransactionStatus(), 'T', 'should be in transaction state') + // Test 2: BEGIN transaction + client.query( + 'BEGIN', + assert.success(function () { + assert.equal(client.getTransactionStatus(), 'T', 'should be in transaction state') - // Test 3: COMMIT - client.query( - 'COMMIT', - assert.success(function () { - assert.equal(client.getTransactionStatus(), 'I', 'should return to idle after commit') + // Test 3: COMMIT + client.query( + 'COMMIT', + assert.success(function () { + assert.equal(client.getTransactionStatus(), 'I', 'should return to idle after commit') - client.end(done) - }) - ) - }) - ) - }) - ) - }) + client.end(done) + }) + ) + }) + ) + }) + ) + }) + ) }) suite.test('txStatus error state', function (done) { From ac66480e71d4fd6b7c761b9b7ab999f501d2bb39 Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Wed, 4 Mar 2026 13:32:59 -0300 Subject: [PATCH 6/8] add native support --- .../test/integration/client/txstatus-tests.js | 139 +++++++++--------- 1 file changed, 68 insertions(+), 71 deletions(-) diff --git a/packages/pg/test/integration/client/txstatus-tests.js b/packages/pg/test/integration/client/txstatus-tests.js index 3529ce0fe..cb8b740f8 100644 --- a/packages/pg/test/integration/client/txstatus-tests.js +++ b/packages/pg/test/integration/client/txstatus-tests.js @@ -4,82 +4,79 @@ const suite = new helper.Suite() const pg = helper.pg const assert = require('assert') -// txStatus tracking is not supported in native client -if (!helper.args.native) { - suite.test('txStatus tracking', function (done) { - const client = new pg.Client() - client.connect( - assert.success(function () { - // Run a simple query to initialize txStatus - client.query( - 'SELECT 1', - assert.success(function () { - // Test 1: Initial state after query (should be idle) - assert.equal(client.getTransactionStatus(), 'I', 'should start in idle state') +suite.test('txStatus tracking', function (done) { + const client = new pg.Client() + client.connect( + assert.success(function () { + // Run a simple query to initialize txStatus + client.query( + 'SELECT 1', + assert.success(function () { + // Test 1: Initial state after query (should be idle) + assert.equal(client.getTransactionStatus(), 'I', 'should start in idle state') - // Test 2: BEGIN transaction - client.query( - 'BEGIN', - assert.success(function () { - assert.equal(client.getTransactionStatus(), 'T', 'should be in transaction state') + // Test 2: BEGIN transaction + client.query( + 'BEGIN', + assert.success(function () { + assert.equal(client.getTransactionStatus(), 'T', 'should be in transaction state') - // Test 3: COMMIT - client.query( - 'COMMIT', - assert.success(function () { - assert.equal(client.getTransactionStatus(), 'I', 'should return to idle after commit') + // Test 3: COMMIT + client.query( + 'COMMIT', + assert.success(function () { + assert.equal(client.getTransactionStatus(), 'I', 'should return to idle after commit') - client.end(done) - }) - ) - }) - ) - }) - ) - }) - ) - }) + client.end(done) + }) + ) + }) + ) + }) + ) + }) + ) +}) - suite.test('txStatus error state', function (done) { - const client = new pg.Client() - client.connect( - assert.success(function () { - // Run a simple query to initialize txStatus - client.query( - 'SELECT 1', - assert.success(function () { - client.query( - 'BEGIN', - assert.success(function () { - // Execute invalid SQL to trigger error state - client.query('INVALID SQL SYNTAX', function (err) { - assert(err, 'should receive error from invalid query') +suite.test('txStatus error state', function (done) { + const client = new pg.Client() + client.connect( + assert.success(function () { + // Run a simple query to initialize txStatus + client.query( + 'SELECT 1', + assert.success(function () { + client.query( + 'BEGIN', + assert.success(function () { + // Execute invalid SQL to trigger error state + client.query('INVALID SQL SYNTAX', function (err) { + assert(err, 'should receive error from invalid query') - // Issue a sync query to ensure ReadyForQuery has been processed - // This guarantees transaction status has been updated - client.query('SELECT 1', function () { - // This callback fires after ReadyForQuery is processed - assert.equal(client.getTransactionStatus(), 'E', 'should be in error state') + // Issue a sync query to ensure ReadyForQuery has been processed + // This guarantees transaction status has been updated + client.query('SELECT 1', function () { + // This callback fires after ReadyForQuery is processed + assert.equal(client.getTransactionStatus(), 'E', 'should be in error state') - // Rollback to recover - client.query( - 'ROLLBACK', - assert.success(function () { - assert.equal( - client.getTransactionStatus(), - 'I', - 'should return to idle after rollback from error' - ) - client.end(done) - }) - ) - }) + // Rollback to recover + client.query( + 'ROLLBACK', + assert.success(function () { + assert.equal( + client.getTransactionStatus(), + 'I', + 'should return to idle after rollback from error' + ) + client.end(done) + }) + ) }) }) - ) - }) - ) - }) - ) - }) -} + }) + ) + }) + ) + }) + ) +}) From c811f065d084088590f5ea9d6307ca739b099df6 Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Wed, 4 Mar 2026 19:43:58 -0300 Subject: [PATCH 7/8] bump libpq --- yarn.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/yarn.lock b/yarn.lock index 6e45ace78..8283117f8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6627,10 +6627,10 @@ mz@^2.5.0: object-assign "^4.0.1" thenify-all "^1.0.0" -nan@~2.22.2: - version "2.22.2" - resolved "https://registry.yarnpkg.com/nan/-/nan-2.22.2.tgz#6b504fd029fb8f38c0990e52ad5c26772fdacfbb" - integrity sha512-DANghxFkS1plDdRsX0X9pm0Z6SJNN6gBdtXfanwoZ8hooC5gosGFSBGRYHUVPz1asKA/kMRqDRdHrluZ61SpBQ== +nan@~2.23.1: + version "2.23.1" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.23.1.tgz#6f86a31dd87e3d1eb77512bf4b9e14c8aded3975" + integrity sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw== nan@~2.23.1: version "2.23.1" From 1611258980bca03fd2d45c9c6b0893032b6d3ee6 Mon Sep 17 00:00:00 2001 From: Leonardo Zanivan Date: Mon, 11 May 2026 18:05:02 -0300 Subject: [PATCH 8/8] rebase --- yarn.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/yarn.lock b/yarn.lock index 8283117f8..6e45ace78 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6627,10 +6627,10 @@ mz@^2.5.0: object-assign "^4.0.1" thenify-all "^1.0.0" -nan@~2.23.1: - version "2.23.1" - resolved "https://registry.yarnpkg.com/nan/-/nan-2.23.1.tgz#6f86a31dd87e3d1eb77512bf4b9e14c8aded3975" - integrity sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw== +nan@~2.22.2: + version "2.22.2" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.22.2.tgz#6b504fd029fb8f38c0990e52ad5c26772fdacfbb" + integrity sha512-DANghxFkS1plDdRsX0X9pm0Z6SJNN6gBdtXfanwoZ8hooC5gosGFSBGRYHUVPz1asKA/kMRqDRdHrluZ61SpBQ== nan@~2.23.1: version "2.23.1"