From ca75283c873b485f00a132c7fcdccbb7f0a1615e Mon Sep 17 00:00:00 2001 From: tabcat Date: Wed, 13 May 2026 21:50:04 +0700 Subject: [PATCH] fix(libp2p): gate connection-monitor abort on transport silence A single failed ping no longer aborts the connection. The heartbeat now records `lastReadAt` on every MaConn read and only aborts when the ping fails AND no data has been received from the peer for `connectionStaleTimeout` ms (default 60s). Aborts use the new `ConnectionStaleError` so consumers can distinguish staleness from transport-level failures. --- packages/interface/src/errors.ts | 15 +++++++ packages/interface/src/message-stream.ts | 6 +++ packages/libp2p/src/connection-monitor.ts | 45 ++++++++++++++----- .../test/connection-monitor/index.spec.ts | 41 ++++++++++++++--- packages/utils/src/abstract-message-stream.ts | 1 + 5 files changed, 92 insertions(+), 16 deletions(-) diff --git a/packages/interface/src/errors.ts b/packages/interface/src/errors.ts index 74584a9dc4..f80fc6e859 100644 --- a/packages/interface/src/errors.ts +++ b/packages/interface/src/errors.ts @@ -120,6 +120,21 @@ export class ConnectionFailedError extends Error { } } +/** + * Thrown when a connection is aborted because it has not received any data + * from the remote peer within the configured staleness threshold. Distinct + * from a transport-level failure - the connection appeared healthy at the + * transport layer but went silent. + */ +export class ConnectionStaleError extends Error { + static name = 'ConnectionStaleError' + + constructor (message = 'Connection stale - no data received from peer') { + super(message) + this.name = 'ConnectionStaleError' + } +} + /** * Thrown when the muxer is closed and an attempt to open a stream occurs */ diff --git a/packages/interface/src/message-stream.ts b/packages/interface/src/message-stream.ts index f69d94ca64..f3ecf00c1f 100644 --- a/packages/interface/src/message-stream.ts +++ b/packages/interface/src/message-stream.ts @@ -35,6 +35,12 @@ export interface MessageStreamTimeline { * writing by both ends of the stream */ close?: number + + /** + * A timestamp of when data was most recently received from the remote end + * of the message stream. Updated every time bytes arrive via `onData`. + */ + lastReadAt?: number } export interface MessageStreamEvents { diff --git a/packages/libp2p/src/connection-monitor.ts b/packages/libp2p/src/connection-monitor.ts index d3980f1e17..827a66d711 100644 --- a/packages/libp2p/src/connection-monitor.ts +++ b/packages/libp2p/src/connection-monitor.ts @@ -1,5 +1,5 @@ import { randomBytes } from '@libp2p/crypto' -import { serviceCapabilities } from '@libp2p/interface' +import { ConnectionStaleError, serviceCapabilities } from '@libp2p/interface' import { AdaptiveTimeout, byteStream } from '@libp2p/utils' import { setMaxListeners } from 'main-event' import type { ComponentLogger, Logger, Metrics, Startable, Stream } from '@libp2p/interface' @@ -7,6 +7,7 @@ import type { ConnectionManager } from '@libp2p/interface-internal' import type { AdaptiveTimeoutInit } from '@libp2p/utils' const DEFAULT_PING_INTERVAL_MS = 10000 +const DEFAULT_CONNECTION_STALE_TIMEOUT_MS = 60000 const PROTOCOL_VERSION = '1.0.0' const PROTOCOL_NAME = 'ping' const PROTOCOL_PREFIX = 'ipfs' @@ -29,16 +30,30 @@ export interface ConnectionMonitorInit { pingInterval?: number /** - * Timeout settings for how long the ping is allowed to take before the - * connection will be judged inactive and aborted. + * Timeout settings for how long an individual ping is allowed to take. The + * timeout is adaptive to cope with slower networks or nodes that have + * changing network characteristics, such as mobile. * - * The timeout is adaptive to cope with slower networks or nodes that - * have changing network characteristics, such as mobile. + * A ping that exceeds this is logged but does not by itself cause the + * connection to be aborted - the staleness check on `connectionStaleTimeout` + * decides that. */ pingTimeout?: Omit /** - * If true, any connection that fails the ping will be aborted + * When `abortConnectionOnPingFailure` is true and a ping fails, the + * connection will only be aborted if no data has been received from the + * remote peer on the underlying transport for this many ms. This avoids + * tearing down connections that are healthy but where a single ping was + * delayed by transient network conditions or backpressure. + * + * @default 60000 + */ + connectionStaleTimeout?: number + + /** + * If true, a connection that has not received any data from the remote peer + * within `connectionStaleTimeout` ms will be aborted when its ping fails. * * @default true */ @@ -64,6 +79,7 @@ export class ConnectionMonitor implements Startable { private readonly log: Logger private heartbeatInterval?: ReturnType private readonly pingIntervalMs: number + private readonly connectionStaleTimeoutMs: number private abortController?: AbortController private readonly timeout: AdaptiveTimeout private readonly abortConnectionOnPingFailure: boolean @@ -74,6 +90,7 @@ export class ConnectionMonitor implements Startable { this.log = components.logger.forComponent('libp2p:connection-monitor') this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS + this.connectionStaleTimeoutMs = init.connectionStaleTimeout ?? DEFAULT_CONNECTION_STALE_TIMEOUT_MS this.abortConnectionOnPingFailure = init.abortConnectionOnPingFailure ?? DEFAULT_ABORT_CONNECTION_ON_PING_FAILURE this.timeout = new AdaptiveTimeout({ ...(init.pingTimeout ?? {}), @@ -143,11 +160,19 @@ export class ConnectionMonitor implements Startable { .catch(err => { this.log.error('error during heartbeat - %e', err) - if (this.abortConnectionOnPingFailure) { - this.log.error('aborting connection due to ping failure') - conn.abort(err) - } else { + if (!this.abortConnectionOnPingFailure) { this.log('connection ping failed, but not aborting due to abortConnectionOnPingFailure flag') + return + } + + const lastReadAt = conn.timeline.lastReadAt ?? conn.timeline.open + const idleMs = Date.now() - lastReadAt + + if (idleMs > this.connectionStaleTimeoutMs) { + this.log.error('aborting connection - no data received from peer for %dms (ping error: %e)', idleMs, err) + conn.abort(new ConnectionStaleError(`no data received from peer for ${idleMs}ms`)) + } else { + this.log('ping failed but peer was active %dms ago, not aborting', idleMs) } }) }) diff --git a/packages/libp2p/test/connection-monitor/index.spec.ts b/packages/libp2p/test/connection-monitor/index.spec.ts index 1bdf0d1307..2587cdd9c0 100644 --- a/packages/libp2p/test/connection-monitor/index.spec.ts +++ b/packages/libp2p/test/connection-monitor/index.spec.ts @@ -108,17 +108,19 @@ describe('connection monitor', () => { expect(connection.rtt).to.be.gte(0) }) - it('should abort a connection that times out', async () => { + it('should abort a connection that times out and is silent', async () => { monitor = new ConnectionMonitor(components, { pingInterval: 50, pingTimeout: { maxTimeout: 50 - } + }, + connectionStaleTimeout: 50 }) await start(monitor) const connection = stubInterface() + connection.timeline = { open: Date.now() } connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { await delay(200) opts?.signal?.throwIfAborted() @@ -132,14 +134,16 @@ describe('connection monitor', () => { expect(connection.abort).to.have.property('called', true) }) - it('should abort a connection that fails', async () => { + it('should abort a connection that fails and is silent', async () => { monitor = new ConnectionMonitor(components, { - pingInterval: 10 + pingInterval: 10, + connectionStaleTimeout: 50 }) await start(monitor) const connection = stubInterface() + connection.timeline = { open: Date.now() } connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { throw new ConnectionClosedError('Connection closed') }) @@ -151,6 +155,27 @@ describe('connection monitor', () => { expect(connection.abort).to.have.property('called', true) }) + it('should not abort a connection that fails but recently received data', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10, + connectionStaleTimeout: 10_000 + }) + + await start(monitor) + + const connection = stubInterface() + connection.timeline = { open: Date.now(), lastReadAt: Date.now() } + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { + throw new ConnectionClosedError('Connection closed') + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(500) + + expect(connection.abort).to.have.property('called', false) + }) + it('should abort the probe stream when the ping exchange fails', async () => { monitor = new ConnectionMonitor(components, { pingInterval: 10 @@ -160,6 +185,7 @@ describe('connection monitor', () => { stream.send.throws(new Error('write failed')) const connection = stubInterface() + connection.timeline = { open: Date.now(), lastReadAt: Date.now() } connection.newStream.withArgs('/ipfs/ping/1.0.0').resolves(stream) components.connectionManager.getConnections.returns([connection]) @@ -179,6 +205,7 @@ describe('connection monitor', () => { await start(monitor) const connection = stubInterface() + connection.timeline = { open: Date.now() } connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { throw new ConnectionClosedError('Connection closed') }) @@ -190,15 +217,17 @@ describe('connection monitor', () => { expect(connection.abort).to.have.property('called', false) }) - it('should abort a connection that fails when abortConnectionOnPingFailure is true', async () => { + it('should abort a silent connection that fails when abortConnectionOnPingFailure is true', async () => { monitor = new ConnectionMonitor(components, { pingInterval: 10, - abortConnectionOnPingFailure: true + abortConnectionOnPingFailure: true, + connectionStaleTimeout: 50 }) await start(monitor) const connection = stubInterface() + connection.timeline = { open: Date.now() } connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { throw new ConnectionClosedError('Connection closed') }) diff --git a/packages/utils/src/abstract-message-stream.ts b/packages/utils/src/abstract-message-stream.ts index e4f1356564..bf9d9722f9 100644 --- a/packages/utils/src/abstract-message-stream.ts +++ b/packages/utils/src/abstract-message-stream.ts @@ -312,6 +312,7 @@ export abstract class AbstractMessageStream