Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 119 additions & 7 deletions agentic-flow/src/transport/quic-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,20 @@ export interface AgentTransport {
onMessage?(handler: InboundMessageHandler, options?: OnMessageOptions): void;
}

/**
 * A `WebSocket` carrying the extra runtime fields the fallback transport
 * stamps onto it: keepalive state for the ping/pong probe and origin
 * bookkeeping for inbound-socket reuse. Every field is optional because a
 * socket has none of them until the transport attaches them.
 */
interface FedWebSocket extends WebSocket {
  /** Liveness verdict consulted by the reuse checks; `false` means "skip this socket". */
  __alive?: boolean;
  /** Per-probe flag: cleared before each ping, set again by the pong handler. */
  isAlive?: boolean;
  /** Handle of the keepalive interval, kept so it can be cleared on close. */
  __pingIv?: ReturnType<typeof setInterval>;
  /** True if this socket was accepted by our server (vs dialed by us). */
  __inbound?: boolean;
  /** Peer IP key under which an inbound socket is indexed. */
  __hostKey?: string;
}

/**
* WebSocket fallback transport.
*
Expand All @@ -157,7 +171,7 @@ export interface AgentTransport {
* messages at human/agent rates (≤ 100 RPS per peer).
*/
class WebSocketFallbackTransport implements AgentTransport {
private connections = new Map<string, WebSocket>();
private connections = new Map<string, FedWebSocket>();
/**
* Per-(address, streamId) message queue. Composite key shape
* `${address}#${streamId}` — see {@link queueKey}. Each stream gets
Expand All @@ -169,6 +183,13 @@ class WebSocketFallbackTransport implements AgentTransport {
private connectionsCreated = 0;
private connectionsClosed = 0;
private servers = new Map<number, WebSocketServer>();
/**
* Live INBOUND (server-accepted) sockets, keyed by normalized peer IP.
* A Set per IP tolerates churn (old + new socket from the same peer).
* Reused for replies so we never dial a fresh outbound on a link whose
* kernel route to the peer went stale (see getOrCreateConnection).
*/
private inboundByHost = new Map<string, Set<FedWebSocket>>();
/**
* Inbound handlers. Each entry is { handler, streamId? }. When
* streamId is undefined the handler receives ALL messages
Expand Down Expand Up @@ -253,14 +274,75 @@ class WebSocketFallbackTransport implements AgentTransport {
});
}

/**
 * Normalize a raw `remoteAddress` or a dial address to a bare host/IP key
 * so that both sides of a link index the same peer under the same string.
 *
 * Handles, in order:
 *  - an optional `ws://` / `wss://` scheme,
 *  - a trailing path, query or fragment (`10.0.0.1:8080/fed` → `10.0.0.1`),
 *  - a bracketed IPv6 literal with optional port (`[::1]:8080` → `::1`),
 *  - a single `host:port` pair (exactly one colon),
 *  - the IPv4-mapped IPv6 prefix (`::ffff:10.0.0.1` → `10.0.0.1`).
 *
 * An unbracketed IPv6 address (more than one colon) is returned unchanged
 * apart from the mapped-prefix strip, since its colons are not a port
 * separator.
 *
 * @param addr Raw socket address or dial target.
 * @returns The normalized key; an empty string if `addr` is empty.
 */
private hostKey(addr: string): string {
  // Scheme first, then everything after the authority component.
  let host = String(addr).replace(/^wss?:\/\//, '').replace(/[/?#].*$/, '');

  if (host.startsWith('[')) {
    // Bracketed IPv6 literal: keep what is inside the brackets, drop any :port.
    const closing = host.indexOf(']');
    if (closing > 0) host = host.slice(1, closing);
  } else {
    const lastColon = host.lastIndexOf(':');
    // Exactly one colon => host:port. More than one => bare IPv6, leave it alone.
    if (lastColon > -1 && host.indexOf(':') === lastColon) host = host.slice(0, lastColon);
  }

  return host.replace(/^::ffff:/, ''); // IPv4-mapped IPv6
}

/**
 * Find a reusable inbound socket for the peer indexed under `ip`.
 *
 * A socket qualifies when it is OPEN, has not been flagged dead
 * (`__alive !== false`) and has less than 1 MiB of unsent data buffered.
 * When several qualify, the last one in the set's iteration order wins,
 * i.e. the most recently added. Sockets that are no longer OPEN are removed
 * from the index along the way, and an emptied bucket is dropped.
 *
 * @param ip Normalized peer key.
 * @returns The chosen socket, or `null` when none is usable.
 */
private liveInboundFor(ip: string): FedWebSocket | null {
  const bucket = this.inboundByHost.get(ip);
  if (bucket === undefined) return null;

  let newest: FedWebSocket | null = null;
  for (const candidate of bucket) {
    if (candidate.readyState !== WebSocket.OPEN) {
      // Closed or closing: evict. OPEN-but-unhealthy sockets are kept in the index.
      bucket.delete(candidate);
      continue;
    }
    if (candidate.__alive !== false && (candidate.bufferedAmount || 0) < 1048576) {
      newest = candidate; // later entries overwrite earlier ones
    }
  }

  if (bucket.size === 0) this.inboundByHost.delete(ip);
  return newest;
}

/**
 * Start a keepalive probe on `ws` that fires every 5 seconds.
 *
 * Each tick either sends a ping (after clearing `isAlive`) or, if the
 * previous ping was never answered, marks the socket dead
 * (`__alive = false`) and terminates it. A `pong` sets both flags back to
 * true. The point is to catch peers that vanished without the TCP layer
 * noticing — e.g. on a direct point-to-point link where no RST arrives and
 * `readyState` stays OPEN — so the reuse logic can skip such sockets.
 *
 * The interval handle is stored on `ws.__pingIv` and unref'd when the
 * runtime supports it.
 */
private attachLiveness(ws: FedWebSocket): void {
  const markAlive = (): void => {
    ws.isAlive = true;
    ws.__alive = true;
  };

  const probe = (): void => {
    if (ws.isAlive === false) {
      // Previous ping went unanswered: declare the socket dead and drop it.
      ws.__alive = false;
      try {
        ws.terminate();
      } catch {
        /* already gone */
      }
      return;
    }
    ws.isAlive = false;
    try {
      ws.ping();
    } catch {
      ws.__alive = false;
    }
  };

  ws.__alive = true;
  ws.isAlive = true;
  ws.on('pong', markAlive);

  const timer = setInterval(probe, 5000);
  ws.__pingIv = timer;
  if (typeof timer.unref === 'function') timer.unref();
}

/**
* Wire the server's `connection` and per-socket `message` handlers.
* Extracted so the wss:// path (where the WebSocketServer is attached
* to a pre-created https.Server) can share the same logic.
*
* Also indexes each accepted socket by peer IP (`inboundByHost`) so
* getOrCreateConnection can REPLY on it (full-duplex) instead of dialing
* a fresh outbound — see the inbound-reuse note there.
*/
private attachServerHandlers(wss: WebSocketServer): void {
wss.on('connection', (ws, req) => {
wss.on('connection', (wsRaw, req) => {
const ws = wsRaw as FedWebSocket;
const remoteAddr = `${req.socket.remoteAddress}:${req.socket.remotePort}`;
const ip = this.hostKey(req.socket.remoteAddress ?? '');
ws.__inbound = true;
ws.__hostKey = ip;
let set = this.inboundByHost.get(ip);
if (!set) { set = new Set(); this.inboundByHost.set(ip, set); }
set.add(ws);
this.attachLiveness(ws);
const cleanup = (): void => {
if (ws.__pingIv) clearInterval(ws.__pingIv);
const s = this.inboundByHost.get(ip);
if (s) { s.delete(ws); if (s.size === 0) this.inboundByHost.delete(ip); }
};
ws.on('close', cleanup);
ws.on('error', cleanup);
ws.on('message', (raw: RawData) => {
try {
const message = JSON.parse(raw.toString()) as AgentMessage;
Expand All @@ -277,13 +359,23 @@ class WebSocketFallbackTransport implements AgentTransport {
});
}

private async getOrCreateConnection(address: string): Promise<WebSocket> {
private async getOrCreateConnection(address: string): Promise<FedWebSocket> {
return new Promise((resolve, reject) => {
const existing = this.connections.get(address);
if (existing && existing.readyState === WebSocket.OPEN) {
if (existing && existing.readyState === WebSocket.OPEN && existing.__alive !== false && (existing.bufferedAmount || 0) < 1048576) {
resolve(existing);
return;
}
if (existing) { try { existing.terminate(); } catch { /* already gone */ } this.connections.delete(address); }

// Inbound-reuse: before dialing a NEW outbound (which dies with
// EHOSTUNREACH when the kernel's cloned route to the peer has gone
// stale on a direct point-to-point link), reply on a live inbound
// socket from the same peer IP if one exists (WebSocket is
// full-duplex). The peer's heartbeat keeps one socket alive, so this
// is the happy path; the dial below remains the fallback.
const inbound = this.liveInboundFor(this.hostKey(address));
if (inbound) { resolve(inbound); return; }

const url = address.startsWith('ws://') || address.startsWith('wss://')
? address
Expand Down Expand Up @@ -341,11 +433,12 @@ class WebSocketFallbackTransport implements AgentTransport {
// (no tls config + ws:// → plain unencrypted; ADR-104 documents
// tailnet-as-TLS as the recommended path)

const ws = new WebSocket(url, wsOpts);
const ws = new WebSocket(url, wsOpts) as FedWebSocket;

ws.on('open', () => {
this.connections.set(address, ws);
this.connectionsCreated++;
this.attachLiveness(ws);
resolve(ws);
});

Expand All @@ -354,6 +447,7 @@ class WebSocketFallbackTransport implements AgentTransport {
});

ws.on('close', () => {
if (ws.__pingIv) clearInterval(ws.__pingIv);
this.connectionsClosed++;
this.connections.delete(address);
});
Expand All @@ -374,8 +468,25 @@ class WebSocketFallbackTransport implements AgentTransport {
}

/**
 * Serialize `message` as JSON and send it to `address`.
 *
 * The socket comes from getOrCreateConnection. If the first attempt fails
 * (socket not OPEN, or the send callback reports an error) the socket is
 * terminated and evicted, a new one is obtained, and the send is retried
 * exactly once. An error from the retry rejects the returned promise.
 *
 * @param address Target passed through to getOrCreateConnection.
 * @param message Payload; sent as `JSON.stringify(message)`.
 */
async send(address: string, message: AgentMessage): Promise<void> {
// NOTE(review): the next two statements repeat the declaration and send
// that follow them. In this diff listing they appear to be the pre-change
// lines; confirm that only the `let ws` form remains in the real file.
const ws = await this.getOrCreateConnection(address);
ws.send(JSON.stringify(message));
let ws = await this.getOrCreateConnection(address);
const data = JSON.stringify(message);
try {
// Treat a socket that is no longer OPEN as a failed send so the catch
// block below evicts it and retries.
if (ws.readyState !== WebSocket.OPEN) throw new Error('socket not OPEN at send');
// Wrap the callback form of send() so a reported error rejects.
await new Promise<void>((res, rej) => ws.send(data, (err?: Error) => (err ? rej(err) : res())));
} catch {
// The chosen socket died between selection and send. Evict it (from
// the outbound pool OR the inbound index, by origin) and redial once
// — the dial path stays as the fallback, so no regression.
try { ws.terminate(); } catch { /* already gone */ }
if (ws.__inbound) {
// Inbound sockets live in inboundByHost under the key stored on the socket.
const s = this.inboundByHost.get(ws.__hostKey ?? '');
if (s) { s.delete(ws); if (s.size === 0) this.inboundByHost.delete(ws.__hostKey ?? ''); }
} else {
this.connections.delete(address);
}
// Second and final attempt; a failure here propagates to the caller.
ws = await this.getOrCreateConnection(address);
await new Promise<void>((res, rej) => ws.send(data, (err?: Error) => (err ? rej(err) : res())));
}
}

/**
Expand Down Expand Up @@ -462,6 +573,7 @@ class WebSocketFallbackTransport implements AgentTransport {
}
this.connections.clear();
this.messageQueue.clear();
this.inboundByHost.clear();

// Inbound: WebSocketServer.close() blocks until every accepted
// socket disconnects. Forcibly terminate them so the close
Expand Down
46 changes: 46 additions & 0 deletions agentic-flow/tests/transport/quic-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,49 @@ describe('loadQuicTransport — selection contract', () => {
await t.close();
});
});

describe('WebSocketFallbackTransport — inbound-socket reuse', () => {
  // Guards against EHOSTUNREACH on stale point-to-point routes: if a node
  // already holds a live socket that a peer opened to it, sending to that
  // peer must go out over that socket (WebSocket is full-duplex) rather
  // than through a newly dialed outbound connection.
  let nodeA: WebSocketFallbackTransport | undefined;
  let nodeB: WebSocketFallbackTransport | undefined;

  afterEach(async () => {
    await closeAll(nodeA, nodeB);
    nodeA = undefined;
    nodeB = undefined;
  });

  it('replies on the peer inbound socket instead of dialing a new outbound', async () => {
    const portA = TEST_PORT + 20;
    const portB = TEST_PORT + 21;
    // Short pause that gives the other side time to process what was just sent.
    const settle = (): Promise<void> => new Promise<void>((done) => setTimeout(done, 50));

    nodeA = await WebSocketFallbackTransport.create({ serverName: 'A' });
    nodeB = await WebSocketFallbackTransport.create({ serverName: 'B' });
    await nodeA.listen(portA, '127.0.0.1');
    await nodeB.listen(portB, '127.0.0.1');

    const receivedByB: AgentMessage[] = [];
    nodeB.onMessage((_from, msg) => {
      receivedByB.push(msg);
    });

    // Step 1: B dials A, so A now holds an inbound socket from 127.0.0.1.
    await nodeB.send(`127.0.0.1:${portA}`, { id: 'b1', type: 'task', payload: 'hi-A' });
    await settle(); // let A register the inbound

    // Step 2: A addresses B by B's listen address. A has no outbound
    // connection to B, so the only way this can succeed without a dial is
    // by writing to the socket B opened in step 1.
    await nodeA.send(`127.0.0.1:${portB}`, { id: 'a1', type: 'result', payload: 'hi-B' });
    await settle();

    // The reply reached B over the socket B itself opened.
    const idsSeenByB = receivedByB.map((msg) => msg.id);
    expect(idsSeenByB).toContain('a1');

    // Evidence of reuse: A's outbound-connection counter never moved.
    const statsA = await nodeA.getStats();
    expect(statsA.created).toBe(0);

    // Sanity: B made exactly one outbound connection (the one to A).
    const statsB = await nodeB.getStats();
    expect(statsB.created).toBe(1);
  });
});