From 29e456f1eb2e787ccbb619ccbe8a74d001d1b6ca Mon Sep 17 00:00:00 2001
From: Leonardo Pucci Burti <245049438+leoburti@users.noreply.github.com>
Date: Thu, 4 Jun 2026 18:21:52 -0300
Subject: [PATCH] fix(transport): reuse inbound socket for replies in WebSocket fallback
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The WebSocketFallbackTransport always dials a NEW outbound connection to
reach a peer. On a direct point-to-point link (e.g. a macOS Thunderbolt
bridge) the kernel's cloned route to the peer can go stale, so every fresh
outbound connect fails with
`connect EHOSTUNREACH <peer> - Local (<ip>:<port>)` (the OS picks the
correct source and still reports no route) — while the inbound socket from
that peer is perfectly usable. There was also no liveness/idle handling
(`maxIdleTimeoutMs` was accepted but never used; `getOrCreateConnection`
trusted only `readyState===OPEN`).

This change makes the transport:
- index each server-accepted (inbound) socket by peer IP (`inboundByHost`)
- in `getOrCreateConnection`, before dialing: reuse a live outbound, else a
  live inbound socket for the same peer IP (WebSocket is full-duplex), else
  dial (unchanged fallback — no regression when no inbound exists)
- add a 5s ping/pong liveness probe + `__alive` gating so a half-open socket
  is detected and skipped (a direct link won't RST a dropped peer)
- retry once, with eviction, in `send()` if the chosen socket dies mid-send

Validated in production over a Thunderbolt link (the failing direction went
from 0/N to N/N, attempts:1, zero EHOSTUNREACH) and with a new unit test
asserting the reply rides the inbound socket (`created` stays 0).

Refs #162 Co-Authored-By: Claude Opus 4.8 (1M context) --- agentic-flow/src/transport/quic-loader.ts | 126 +++++++++++++++++- .../tests/transport/quic-loader.test.ts | 46 +++++++ 2 files changed, 165 insertions(+), 7 deletions(-) diff --git a/agentic-flow/src/transport/quic-loader.ts b/agentic-flow/src/transport/quic-loader.ts index 04937a668..ec31c22c4 100644 --- a/agentic-flow/src/transport/quic-loader.ts +++ b/agentic-flow/src/transport/quic-loader.ts @@ -142,6 +142,20 @@ export interface AgentTransport { onMessage?(handler: InboundMessageHandler, options?: OnMessageOptions): void; } +/** + * WebSocket augmented with the liveness + origin bookkeeping the fallback + * transport attaches at runtime (ping/pong keepalive + inbound-reuse). + */ +type FedWebSocket = WebSocket & { + __alive?: boolean; + isAlive?: boolean; + __pingIv?: ReturnType; + /** True if this socket was accepted by our server (vs dialed by us). */ + __inbound?: boolean; + /** Peer IP key under which an inbound socket is indexed. */ + __hostKey?: string; +}; + /** * WebSocket fallback transport. * @@ -157,7 +171,7 @@ export interface AgentTransport { * messages at human/agent rates (≤ 100 RPS per peer). */ class WebSocketFallbackTransport implements AgentTransport { - private connections = new Map(); + private connections = new Map(); /** * Per-(address, streamId) message queue. Composite key shape * `${address}#${streamId}` — see {@link queueKey}. Each stream gets @@ -169,6 +183,13 @@ class WebSocketFallbackTransport implements AgentTransport { private connectionsCreated = 0; private connectionsClosed = 0; private servers = new Map(); + /** + * Live INBOUND (server-accepted) sockets, keyed by normalized peer IP. + * A Set per IP tolerates churn (old + new socket from the same peer). + * Reused for replies so we never dial a fresh outbound on a link whose + * kernel route to the peer went stale (see getOrCreateConnection). + */ + private inboundByHost = new Map>(); /** * Inbound handlers. 
Each entry is { handler, streamId? }. When * streamId is undefined the handler receives ALL messages @@ -253,14 +274,75 @@ class WebSocketFallbackTransport implements AgentTransport { }); } + /** Normalize a raw remoteAddress or a dial address to a bare IPv4 key. */ + private hostKey(addr: string): string { + let s = String(addr).replace(/^wss?:\/\//, ''); + const lc = s.lastIndexOf(':'); + if (lc > -1 && s.indexOf(':') === lc) s = s.slice(0, lc); // exactly one colon => host:port + return s.replace(/^::ffff:/, ''); // IPv4-mapped IPv6 + } + + /** Pick a live (OPEN) inbound socket for a peer IP, evicting dead ones. */ + private liveInboundFor(ip: string): FedWebSocket | null { + const set = this.inboundByHost.get(ip); + if (!set) return null; + let chosen: FedWebSocket | null = null; + for (const ws of set) { + if (ws.readyState === WebSocket.OPEN && ws.__alive !== false && (ws.bufferedAmount || 0) < 1048576) { + chosen = ws; // prefer the newest OPEN socket + } else if (ws.readyState !== WebSocket.OPEN) { + set.delete(ws); + } + } + if (set.size === 0) this.inboundByHost.delete(ip); + return chosen; + } + + /** + * Attach a 5s ping/pong liveness probe. A direct point-to-point link + * (e.g. a macOS Thunderbolt bridge) won't RST a dropped peer, so + * `readyState` can lag a dead TCP connection by seconds — we actively + * probe and set __alive=false on a missed pong so reuse logic skips it. + */ + private attachLiveness(ws: FedWebSocket): void { + ws.__alive = true; + ws.isAlive = true; + ws.on('pong', () => { ws.isAlive = true; ws.__alive = true; }); + ws.__pingIv = setInterval(() => { + if (ws.isAlive === false) { ws.__alive = false; try { ws.terminate(); } catch { /* already gone */ } return; } + ws.isAlive = false; + try { ws.ping(); } catch { ws.__alive = false; } + }, 5000); + if (typeof ws.__pingIv.unref === 'function') ws.__pingIv.unref(); + } + /** * Wire the server's `connection` and per-socket `message` handlers. 
* Extracted so the wss:// path (where the WebSocketServer is attached * to a pre-created https.Server) can share the same logic. + * + * Also indexes each accepted socket by peer IP (`inboundByHost`) so + * getOrCreateConnection can REPLY on it (full-duplex) instead of dialing + * a fresh outbound — see the inbound-reuse note there. */ private attachServerHandlers(wss: WebSocketServer): void { - wss.on('connection', (ws, req) => { + wss.on('connection', (wsRaw, req) => { + const ws = wsRaw as FedWebSocket; const remoteAddr = `${req.socket.remoteAddress}:${req.socket.remotePort}`; + const ip = this.hostKey(req.socket.remoteAddress ?? ''); + ws.__inbound = true; + ws.__hostKey = ip; + let set = this.inboundByHost.get(ip); + if (!set) { set = new Set(); this.inboundByHost.set(ip, set); } + set.add(ws); + this.attachLiveness(ws); + const cleanup = (): void => { + if (ws.__pingIv) clearInterval(ws.__pingIv); + const s = this.inboundByHost.get(ip); + if (s) { s.delete(ws); if (s.size === 0) this.inboundByHost.delete(ip); } + }; + ws.on('close', cleanup); + ws.on('error', cleanup); ws.on('message', (raw: RawData) => { try { const message = JSON.parse(raw.toString()) as AgentMessage; @@ -277,13 +359,23 @@ class WebSocketFallbackTransport implements AgentTransport { }); } - private async getOrCreateConnection(address: string): Promise { + private async getOrCreateConnection(address: string): Promise { return new Promise((resolve, reject) => { const existing = this.connections.get(address); - if (existing && existing.readyState === WebSocket.OPEN) { + if (existing && existing.readyState === WebSocket.OPEN && existing.__alive !== false && (existing.bufferedAmount || 0) < 1048576) { resolve(existing); return; } + if (existing) { try { existing.terminate(); } catch { /* already gone */ } this.connections.delete(address); } + + // Inbound-reuse: before dialing a NEW outbound (which dies with + // EHOSTUNREACH when the kernel's cloned route to the peer has gone + // stale on a 
direct point-to-point link), reply on a live inbound + // socket from the same peer IP if one exists (WebSocket is + // full-duplex). The peer's heartbeat keeps one socket alive, so this + // is the happy path; the dial below remains the fallback. + const inbound = this.liveInboundFor(this.hostKey(address)); + if (inbound) { resolve(inbound); return; } const url = address.startsWith('ws://') || address.startsWith('wss://') ? address @@ -341,11 +433,12 @@ class WebSocketFallbackTransport implements AgentTransport { // (no tls config + ws:// → plain unencrypted; ADR-104 documents // tailnet-as-TLS as the recommended path) - const ws = new WebSocket(url, wsOpts); + const ws = new WebSocket(url, wsOpts) as FedWebSocket; ws.on('open', () => { this.connections.set(address, ws); this.connectionsCreated++; + this.attachLiveness(ws); resolve(ws); }); @@ -354,6 +447,7 @@ class WebSocketFallbackTransport implements AgentTransport { }); ws.on('close', () => { + if (ws.__pingIv) clearInterval(ws.__pingIv); this.connectionsClosed++; this.connections.delete(address); }); @@ -374,8 +468,25 @@ class WebSocketFallbackTransport implements AgentTransport { } async send(address: string, message: AgentMessage): Promise { - const ws = await this.getOrCreateConnection(address); - ws.send(JSON.stringify(message)); + let ws = await this.getOrCreateConnection(address); + const data = JSON.stringify(message); + try { + if (ws.readyState !== WebSocket.OPEN) throw new Error('socket not OPEN at send'); + await new Promise((res, rej) => ws.send(data, (err?: Error) => (err ? rej(err) : res()))); + } catch { + // The chosen socket died between selection and send. Evict it (from + // the outbound pool OR the inbound index, by origin) and redial once + // — the dial path stays as the fallback, so no regression. + try { ws.terminate(); } catch { /* already gone */ } + if (ws.__inbound) { + const s = this.inboundByHost.get(ws.__hostKey ?? 
''); + if (s) { s.delete(ws); if (s.size === 0) this.inboundByHost.delete(ws.__hostKey ?? ''); } + } else { + this.connections.delete(address); + } + ws = await this.getOrCreateConnection(address); + await new Promise((res, rej) => ws.send(data, (err?: Error) => (err ? rej(err) : res()))); + } } /** @@ -462,6 +573,7 @@ class WebSocketFallbackTransport implements AgentTransport { } this.connections.clear(); this.messageQueue.clear(); + this.inboundByHost.clear(); // Inbound: WebSocketServer.close() blocks until every accepted // socket disconnects. Forcibly terminate them so the close diff --git a/agentic-flow/tests/transport/quic-loader.test.ts b/agentic-flow/tests/transport/quic-loader.test.ts index e54771a96..a0ea502b2 100644 --- a/agentic-flow/tests/transport/quic-loader.test.ts +++ b/agentic-flow/tests/transport/quic-loader.test.ts @@ -339,3 +339,49 @@ describe('loadQuicTransport — selection contract', () => { await t.close(); }); }); + +describe('WebSocketFallbackTransport — inbound-socket reuse', () => { + // Regression for EHOSTUNREACH on stale point-to-point routes: when a node + // has a live INBOUND socket from a peer, a send to that peer must REUSE it + // (WebSocket is full-duplex) instead of dialing a fresh outbound. 
+ let a: WebSocketFallbackTransport | undefined; + let b: WebSocketFallbackTransport | undefined; + + afterEach(async () => { + await closeAll(a, b); + a = undefined; + b = undefined; + }); + + it('replies on the peer inbound socket instead of dialing a new outbound', async () => { + const pA = TEST_PORT + 20; + const pB = TEST_PORT + 21; + a = await WebSocketFallbackTransport.create({ serverName: 'A' }); + b = await WebSocketFallbackTransport.create({ serverName: 'B' }); + await a.listen(pA, '127.0.0.1'); + await b.listen(pB, '127.0.0.1'); + + const gotAtB: AgentMessage[] = []; + b.onMessage((_addr, m) => { + gotAtB.push(m); + }); + + // B dials A -> A accepts an inbound socket from 127.0.0.1 + await b.send(`127.0.0.1:${pA}`, { id: 'b1', type: 'task', payload: 'hi-A' }); + await new Promise((r) => setTimeout(r, 50)); // let A register the inbound + + // A replies to B's listen address. A has NO live outbound to B, so the + // transport must reuse the inbound socket (full-duplex) rather than + // dialing a new outbound (the dial is what fails with EHOSTUNREACH on a + // stale macOS point-to-point route). + await a.send(`127.0.0.1:${pB}`, { id: 'a1', type: 'result', payload: 'hi-B' }); + await new Promise((r) => setTimeout(r, 50)); + + // B received A's reply over the socket B originally opened. + expect(gotAtB.map((m) => m.id)).toContain('a1'); + // Proof it was a reuse: A never created an outbound connection. + expect((await a.getStats()).created).toBe(0); + // Sanity: B dialed exactly one outbound to reach A. + expect((await b.getStats()).created).toBe(1); + }); +});