Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/interface/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,21 @@ export class ConnectionFailedError extends Error {
}
}

/**
* Thrown when a connection is aborted because it has not received any data
* from the remote peer within the configured staleness threshold. Distinct
* from a transport-level failure - the connection appeared healthy at the
* transport layer but went silent.
*/
export class ConnectionStaleError extends Error {
static name = 'ConnectionStaleError'

constructor (message = 'Connection stale - no data received from peer') {
super(message)
this.name = 'ConnectionStaleError'
}
}

/**
* Thrown when the muxer is closed and an attempt to open a stream occurs
*/
Expand Down
6 changes: 6 additions & 0 deletions packages/interface/src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export interface MessageStreamTimeline {
* writing by both ends of the stream
*/
close?: number

/**
* A timestamp of when data was most recently received from the remote end
* of the message stream. Updated every time bytes arrive via `onData`.
*/
lastReadAt?: number
}

export interface MessageStreamEvents {
Expand Down
45 changes: 35 additions & 10 deletions packages/libp2p/src/connection-monitor.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { randomBytes } from '@libp2p/crypto'
import { serviceCapabilities } from '@libp2p/interface'
import { ConnectionStaleError, serviceCapabilities } from '@libp2p/interface'
import { AdaptiveTimeout, byteStream } from '@libp2p/utils'
import { setMaxListeners } from 'main-event'
import type { ComponentLogger, Logger, Metrics, Startable, Stream } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
import type { AdaptiveTimeoutInit } from '@libp2p/utils'

const DEFAULT_PING_INTERVAL_MS = 10000
const DEFAULT_CONNECTION_STALE_TIMEOUT_MS = 60000
const PROTOCOL_VERSION = '1.0.0'
const PROTOCOL_NAME = 'ping'
const PROTOCOL_PREFIX = 'ipfs'
Expand All @@ -29,16 +30,30 @@ export interface ConnectionMonitorInit {
pingInterval?: number

/**
* Timeout settings for how long the ping is allowed to take before the
* connection will be judged inactive and aborted.
* Timeout settings for how long an individual ping is allowed to take. The
* timeout is adaptive to cope with slower networks or nodes that have
* changing network characteristics, such as mobile.
*
* The timeout is adaptive to cope with slower networks or nodes that
* have changing network characteristics, such as mobile.
* A ping that exceeds this is logged but does not by itself cause the
* connection to be aborted - the staleness check on `connectionStaleTimeout`
* decides that.
*/
pingTimeout?: Omit<AdaptiveTimeoutInit, 'metricsName' | 'metrics'>

/**
* If true, any connection that fails the ping will be aborted
* When `abortConnectionOnPingFailure` is true and a ping fails, the
* connection will only be aborted if no data has been received from the
* remote peer on the underlying transport for this many ms. This avoids
* tearing down connections that are healthy but where a single ping was
* delayed by transient network conditions or backpressure.
*
* @default 60000
*/
connectionStaleTimeout?: number

/**
* If true, a connection that has not received any data from the remote peer
* within `connectionStaleTimeout` ms will be aborted when its ping fails.
*
* @default true
*/
Expand All @@ -64,6 +79,7 @@ export class ConnectionMonitor implements Startable {
private readonly log: Logger
private heartbeatInterval?: ReturnType<typeof setInterval>
private readonly pingIntervalMs: number
private readonly connectionStaleTimeoutMs: number
private abortController?: AbortController
private readonly timeout: AdaptiveTimeout
private readonly abortConnectionOnPingFailure: boolean
Expand All @@ -74,6 +90,7 @@ export class ConnectionMonitor implements Startable {

this.log = components.logger.forComponent('libp2p:connection-monitor')
this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS
this.connectionStaleTimeoutMs = init.connectionStaleTimeout ?? DEFAULT_CONNECTION_STALE_TIMEOUT_MS
this.abortConnectionOnPingFailure = init.abortConnectionOnPingFailure ?? DEFAULT_ABORT_CONNECTION_ON_PING_FAILURE
this.timeout = new AdaptiveTimeout({
...(init.pingTimeout ?? {}),
Expand Down Expand Up @@ -143,11 +160,19 @@ export class ConnectionMonitor implements Startable {
.catch(err => {
this.log.error('error during heartbeat - %e', err)

if (this.abortConnectionOnPingFailure) {
this.log.error('aborting connection due to ping failure')
conn.abort(err)
} else {
if (!this.abortConnectionOnPingFailure) {
this.log('connection ping failed, but not aborting due to abortConnectionOnPingFailure flag')
return
}

const lastReadAt = conn.timeline.lastReadAt ?? conn.timeline.open
const idleMs = Date.now() - lastReadAt

if (idleMs > this.connectionStaleTimeoutMs) {
this.log.error('aborting connection - no data received from peer for %dms (ping error: %e)', idleMs, err)
conn.abort(new ConnectionStaleError(`no data received from peer for ${idleMs}ms`))
} else {
this.log('ping failed but peer was active %dms ago, not aborting', idleMs)
}
})
})
Expand Down
41 changes: 35 additions & 6 deletions packages/libp2p/test/connection-monitor/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,19 @@ describe('connection monitor', () => {
expect(connection.rtt).to.be.gte(0)
})

it('should abort a connection that times out', async () => {
it('should abort a connection that times out and is silent', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 50,
pingTimeout: {
maxTimeout: 50
}
},
connectionStaleTimeout: 50
})

await start(monitor)

const connection = stubInterface<Connection>()
connection.timeline = { open: Date.now() }
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
await delay(200)
opts?.signal?.throwIfAborted()
Expand All @@ -132,14 +134,16 @@ describe('connection monitor', () => {
expect(connection.abort).to.have.property('called', true)
})

it('should abort a connection that fails', async () => {
it('should abort a connection that fails and is silent', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 10
pingInterval: 10,
connectionStaleTimeout: 50
})

await start(monitor)

const connection = stubInterface<Connection>()
connection.timeline = { open: Date.now() }
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
throw new ConnectionClosedError('Connection closed')
})
Expand All @@ -151,6 +155,27 @@ describe('connection monitor', () => {
expect(connection.abort).to.have.property('called', true)
})

it('should not abort a connection that fails but recently received data', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 10,
connectionStaleTimeout: 10_000
})

await start(monitor)

const connection = stubInterface<Connection>()
connection.timeline = { open: Date.now(), lastReadAt: Date.now() }
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
throw new ConnectionClosedError('Connection closed')
})

components.connectionManager.getConnections.returns([connection])

await delay(500)

expect(connection.abort).to.have.property('called', false)
})

it('should abort the probe stream when the ping exchange fails', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 10
Expand All @@ -160,6 +185,7 @@ describe('connection monitor', () => {
stream.send.throws(new Error('write failed'))

const connection = stubInterface<Connection>()
connection.timeline = { open: Date.now(), lastReadAt: Date.now() }
connection.newStream.withArgs('/ipfs/ping/1.0.0').resolves(stream)
components.connectionManager.getConnections.returns([connection])

Expand All @@ -179,6 +205,7 @@ describe('connection monitor', () => {
await start(monitor)

const connection = stubInterface<Connection>()
connection.timeline = { open: Date.now() }
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
throw new ConnectionClosedError('Connection closed')
})
Expand All @@ -190,15 +217,17 @@ describe('connection monitor', () => {
expect(connection.abort).to.have.property('called', false)
})

it('should abort a connection that fails when abortConnectionOnPingFailure is true', async () => {
it('should abort a silent connection that fails when abortConnectionOnPingFailure is true', async () => {
monitor = new ConnectionMonitor(components, {
pingInterval: 10,
abortConnectionOnPingFailure: true
abortConnectionOnPingFailure: true,
connectionStaleTimeout: 50
})

await start(monitor)

const connection = stubInterface<Connection>()
connection.timeline = { open: Date.now() }
connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => {
throw new ConnectionClosedError('Connection closed')
})
Expand Down
1 change: 1 addition & 0 deletions packages/utils/src/abstract-message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
return
}

this.timeline.lastReadAt = Date.now()
this.readBuffer.append(data)
this.dispatchReadBuffer()
}
Expand Down
Loading