diff --git a/docs/pages/apis/pool.mdx b/docs/pages/apis/pool.mdx index 123bc8ba4..c9ba40d93 100644 --- a/docs/pages/apis/pool.mdx +++ b/docs/pages/apis/pool.mdx @@ -63,6 +63,11 @@ type Config = { // middleware so that you can rotate the underlying servers. The default is disabled (value of zero). maxLifetimeSeconds?: number + // When true, automatically removes connections with open or failed transactions from the pool + // when they are released. This provides protection against "connection poisoning" where + // uncommitted transactions leak across different requests. Default is false. + evictOnOpenTransaction?: boolean + // Called once when a new client is created, before it is made available to the pool. // The client is fully connected and queryable at this point. // Can be a regular function or an async function. @@ -84,6 +89,7 @@ const pool = new Pool({ idleTimeoutMillis: 30000, connectionTimeoutMillis: 2000, maxLifetimeSeconds: 60, + evictOnOpenTransaction: true, }) ``` diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 2fbdb78d5..02543d0e5 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -91,6 +91,7 @@ class Pool extends EventEmitter { this.options.maxUses = this.options.maxUses || Infinity this.options.allowExitOnIdle = this.options.allowExitOnIdle || false this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0 + this.options.evictOnOpenTransaction = this.options.evictOnOpenTransaction || false this.log = this.options.log || function () {} this.Client = this.options.Client || Client || require('pg').Client this.Promise = this.options.Promise || global.Promise @@ -124,6 +125,10 @@ class Pool extends EventEmitter { return this._clients.length > this.options.min } + _hasActiveTransaction(client) { + return client && (client.getTransactionStatus() === 'T' || client.getTransactionStatus() === 'E') + } + _pulseQueue() { this.log('pulse queue') if (this.ended) { @@ -393,7 +398,11 @@ class Pool extends EventEmitter { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') } + return this._remove(client, this._pulseQueue.bind(this)) + } + if (this.options.evictOnOpenTransaction && this._hasActiveTransaction(client)) { + this.log('remove client due to open transaction') return this._remove(client, this._pulseQueue.bind(this)) } diff --git a/packages/pg-pool/test/leaked-pool.js b/packages/pg-pool/test/leaked-pool.js new file mode 100644 index 000000000..c10f4e9f1 --- /dev/null +++ b/packages/pg-pool/test/leaked-pool.js @@ -0,0 +1,332 @@ +'use strict' + +const expect = require('expect.js') +const describe = require('mocha').describe +const it = require('mocha').it +const Pool = require('..') + +describe('leaked connection pool', function () { + describe('when evictOnOpenTransaction is true', function () { + it('removes a client with an open transaction on release', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + evictOnOpenTransaction: true, + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client.getTransactionStatus()).to.be('T') + + client.release() + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client due to open transaction') + + // pool recovers by creating a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('removes a client in a failed transaction state on release', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: true }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error to avoid pool close the connection + } + // The ReadyForQuery message with status 'E' may arrive on a separate I/O event. + // Issue a follow-up query to ensure it has been processed — this will also fail + // (since the transaction is aborted) but guarantees transaction status is updated. + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client.getTransactionStatus()).to.be('E') + + client.release() + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // pool recovers by creating a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('only removes connections with open transactions, keeps idle ones', async function () { + const pool = new Pool({ max: 3, evictOnOpenTransaction: true }) + const clientA = await pool.connect() + const clientB = await pool.connect() + const clientC = await pool.connect() + + // Client A: open transaction (leaked) + await clientA.query('BEGIN') + expect(clientA.getTransactionStatus()).to.be('T') + + // Client B: normal query (idle) + await clientB.query('SELECT 1') + expect(clientB.getTransactionStatus()).to.be('I') + + // Client C: committed transaction (idle) + await clientC.query('BEGIN') + await clientC.query('COMMIT') + expect(clientC.getTransactionStatus()).to.be('I') + + clientA.release() + clientB.release() + clientC.release() + + // A was removed, B and C kept + expect(pool.totalCount).to.be(2) + expect(pool.idleCount).to.be(2) + await pool.end() + }) + + describe('pool.query', function () { + it('removes a client after pool.query leaks transaction via BEGIN', async function () { + const logMessages = [] + const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg), evictOnOpenTransaction: true }) + + await pool.query('BEGIN') + + // Client auto-released with txStatus='T', should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client due to open transaction') + + // Verify pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('removes a client after pool.query in failed transaction state', async function () { + const pool = new Pool({ max: 1 }) + + await pool.query('BEGIN') + + try { + await pool.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // Expected error + } + + // Client with txStatus='E' should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // Pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + }) + }) + + describe('when evictOnOpenTransaction is false or default', function () { + it('keeps client with open transaction when explicitly false', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + evictOnOpenTransaction: false, + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client.getTransactionStatus()).to.be('T') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool can still execute queries (connection was reused) + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('keeps client with open transaction when option not specified (default)', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client.getTransactionStatus()).to.be('T') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool can still execute queries (connection was reused) + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('keeps client in failed transaction state when explicitly false', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: false }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error + } + // Issue a follow-up query to ensure transaction status is updated to 'E' + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client.getTransactionStatus()).to.be('E') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + + // Get a new client and manually ROLLBACK the failed transaction + const client2 = await pool.connect() + await client2.query('ROLLBACK') + const { rows } = await client2.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + client2.release() + + await pool.end() + }) + + it('keeps client in failed transaction state when option not specified (default)', async function () { + const pool = new Pool({ max: 1 }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error + } + // Issue a follow-up query to ensure transaction status is updated to 'E' + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client.getTransactionStatus()).to.be('E') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + + // Get a new client and manually ROLLBACK the failed transaction + const client2 = await pool.connect() + await client2.query('ROLLBACK') + const { rows } = await client2.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + client2.release() + + await pool.end() + }) + + it('keeps all clients with mixed transaction states', async function () { + const logMessages = [] + const pool = new Pool({ + max: 3, + evictOnOpenTransaction: false, + log: (msg) => logMessages.push(msg), + }) + const clientA = await pool.connect() + const clientB = await pool.connect() + const clientC = await pool.connect() + + // Client A: open transaction (leaked) + await clientA.query('BEGIN') + expect(clientA.getTransactionStatus()).to.be('T') + + // Client B: normal query (idle) + await clientB.query('SELECT 1') + expect(clientB.getTransactionStatus()).to.be('I') + + // Client C: committed transaction (idle) + await clientC.query('BEGIN') + await clientC.query('COMMIT') + expect(clientC.getTransactionStatus()).to.be('I') + + clientA.release() + clientB.release() + clientC.release() + + // All clients kept in pool + expect(pool.totalCount).to.be(3) + expect(pool.idleCount).to.be(3) + expect(logMessages).to.not.contain('remove client due to open transaction') + + await pool.end() + }) + + describe('pool.query', function () { + it('keeps client after pool.query leaks transaction via BEGIN (default)', async function () { + const logMessages = [] + const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg) }) + + await pool.query('BEGIN') + + // Client auto-released with txStatus='T', should be kept + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool still works + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('removes client on pool.query error even when evictOnOpenTransaction is false', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: false }) + + await pool.query('BEGIN') + + try { + await pool.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // Expected error - pool.query calls client.release(err) which removes the client + } + + // Client is removed because pool.query releases with error argument + // This is independent of evictOnOpenTransaction setting + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // Pool recovers with a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + }) + }) +})