Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
target_session_attrs
} = options

// prefer-standby does a second pass over the host list accepting any server type (like libpq)
const maxHostAttempts = host.length * (target_session_attrs === 'prefer-standby' ? 2 : 1)

const sent = Queue()
, id = uid++
, backend = { pid: null, secret: null }
Expand Down Expand Up @@ -112,6 +115,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
idleTimer,
connect(query) {
initial = query
retries = 0
reconnect()
},
terminate,
Expand Down Expand Up @@ -259,7 +263,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

function connectTimedOut() {
errored(Errors.connection('CONNECT_TIMEOUT', options, socket))
error(Errors.connection('CONNECT_TIMEOUT', options, socket))
socket.destroy()
}

Expand Down Expand Up @@ -379,7 +383,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

function error(err) {
if (connection.queue === queues.connecting && options.host[retries + 1])
if (connection.queue === queues.connecting && retries + 1 < maxHostAttempts)
return

errored(err)
Expand Down Expand Up @@ -447,8 +451,10 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
socket.removeAllListeners()
socket = null

if (initial)
if (initial) {
retries++
return reconnect()
}

!hadError && (query || sent.length) && error(Errors.connection('CONNECTION_CLOSED', options, socket))
closedTime = performance.now()
Expand Down Expand Up @@ -793,7 +799,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
(x === 'read-only' && xs.default_transaction_read_only === 'off') ||
(x === 'primary' && xs.in_hot_standby === 'on') ||
(x === 'standby' && xs.in_hot_standby === 'off') ||
(x === 'prefer-standby' && xs.in_hot_standby === 'off' && options.host[retries])
(x === 'prefer-standby' && xs.in_hot_standby === 'off' && retries < host.length)
)
}

Expand Down
81 changes: 81 additions & 0 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,87 @@ t('Multiple hosts', {
return [[id1, id2, id1].join(','), result.join(',')]
})

t('Multiple hosts errors when all hosts are down', { timeout: 10 }, async() => {
const sql = postgres({ ...options, host: ['localhost', 'localhost'], port: [1, 2], connect_timeout: 1 })
return ['ECONNREFUSED', await sql`select 1`.catch(e => e.code)]
})

t('Multiple hosts continues to next host after connect timeout', { timeout: 10 }, async() => {
const server = net.createServer()
server.listen()
const sql = postgres({ ...options, host: ['127.0.0.1', 'localhost'], port: [server.address().port, 5432], connect_timeout: 1 })
const x = (await sql`select 1 as x`)[0].x
server.close()
await sql.end()
return [1, x]
})

t('prefer-standby connects to the primary when the standby host is down', { timeout: 10 }, async() => {
const sql = postgres({ ...options, host: ['localhost', 'localhost'], port: [1, 5432], target_session_attrs: 'prefer-standby', connect_timeout: 1 })
const x = (await sql`select 1 as x`)[0].x
await sql.end()
return [1, x]
})

t('prefer-standby connects to a primary when no host is a standby', { timeout: 10 }, async() => {
const sql = postgres({ idle_timeout, max: 1, host: ['localhost', 'localhost'], port: [5432, 5433], target_session_attrs: 'prefer-standby' })
const x = (await sql`select 1 as x`)[0].x
await sql.end()
return [1, x]
})

t('target_session_attrs standby errors when no host is a standby', { timeout: 10 }, async() => {
const sql = postgres({ idle_timeout, max: 1, host: ['localhost', 'localhost'], port: [5432, 5433], target_session_attrs: 'standby' })
return ['CONNECTION_DESTROYED', await sql`select 1`.catch(e => e.code)]
})

t('Multiple hosts rejects within connect_timeout × hosts when every host times out', { timeout: 10 }, async() => {
// Two hosts that accept the TCP connection but never speak the Postgres protocol, so each
// attempt connect-timeouts (the #988 path, not ECONNREFUSED). When `error()` wrongly assumed
// "another host to try", connect_timeout was defeated and the query hung forever. It must now
// reject, bounded by ~connect_timeout × hosts, instead of hanging.
const connect_timeout = 0.3
const a = net.createServer().listen()
const b = net.createServer().listen()
const host = ['127.0.0.1', '127.0.0.1']
const port = [a.address().port, b.address().port]
const sql = postgres({ ...options, host, port, connect_timeout })

const start = Date.now()
const code = await sql`select 1`.catch(e => e.code)
const elapsed = (Date.now() - start) / 1000

a.close()
b.close()
await sql.end()

const bounded = code === 'CONNECT_TIMEOUT' && elapsed < connect_timeout * host.length + 1
return ['CONNECT_TIMEOUT bounded', bounded ? 'CONNECT_TIMEOUT bounded' : `${code} after ${elapsed.toFixed(2)}s`]
})

t('prefer-standby exhausts the standby-only first pass before accepting a primary', { timeout: 10 }, async() => {
// host[0] is the real primary; host[1] is a probe that counts connections and drops them.
// With prefer-standby and no standby available, libpq-style semantics require a full first
// pass that rejects primaries, then a second pass that accepts any server. So the probe must
// be reached exactly once (pass one skips past the primary) before the primary is accepted on
// pass two. probe === 0 would mean the primary was wrongly accepted on the first pass; a probe
// count that never settles (looping forever) is the original "retries never increments" bug.
let probe = 0
const server = net.createServer(socket => (probe++, socket.destroy())).listen()
const sql = postgres({
...options,
host: ['localhost', '127.0.0.1'],
port: [5432, server.address().port],
target_session_attrs: 'prefer-standby'
})

const x = (await sql`select 1 as x`)[0].x
server.close()
await sql.end()

return ['1,1', [x, probe].join(',')]
})

t('Escaping supports schemas and tables', async() => {
await sql`create schema a`
await sql`create table a.b (c int)`
Expand Down
4 changes: 2 additions & 2 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,9 @@ declare namespace postgres {

interface Options<T extends Record<string, postgres.PostgresType>> extends Partial<BaseOptions<T>> {
/** @inheritdoc */
host?: string | undefined;
host?: string | string[] | undefined;
/** @inheritdoc */
port?: number | undefined;
port?: number | number[] | undefined;
/** @inheritdoc */
path?: string | undefined;
/** Password of database user (an alias for `password`) */
Expand Down