Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .changeset/rpc-calls-no-longer-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"agents": minor
---

Fix RPC calls hanging forever during connection churn (#1738).

`useAgent`'s RPC layer now survives socket replacement. `usePartySocket` creates a brand-new socket whenever connection options change (async query refresh, `enabled` toggle, path change). Previously, a call issued against a stale `agent` reference was buffered inside the permanently closed old socket, so its promise never settled; and a call transmitted just before the replacement lost its response without ever being rejected.

- `agent.call()` (and `agent.stub` / `agent.setState`) now route through the live socket, so stale references captured by mount-time effects keep working.
- RPC requests are only handed to a socket once it's open. Until then they're queued by the hook and flushed on the next open — including on a replacement socket. This is safe: queued requests were never transmitted, so they can't double-execute.
- Calls whose request was already transmitted are rejected with `Connection closed` when their socket closes or is replaced (the response is connection-bound and can never arrive). Calls in flight on a newer socket are no longer spuriously rejected by a stale close event from an old socket.
- Queued calls only follow the connection to the _same_ agent instance. If the hook is re-pointed at a different address (the `agent`, `name`, `basePath`, or path props change) before a queued call could be transmitted, the call is rejected instead of executing against an instance it wasn't composed for.
- `AgentClient` similarly keeps buffered (untransmitted) calls pending across transient disconnects — PartySocket re-sends them on reconnect — and only rejects calls the server actually received.
- Non-streaming calls now have a default 30s timeout as a backstop so lost responses reject instead of hanging. Configure per client via `defaultCallTimeout` (0 disables) on `useAgent` / `AgentClient`, or per call via the existing `timeout` option (`timeout: 0` opts out). Streaming calls are exempt.
- RPC responses that arrive with no matching pending call (e.g. after a timeout) now log a `console.warn` instead of being silently discarded.
90 changes: 78 additions & 12 deletions packages/agents/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,23 @@ export type AgentClientOptions<State = unknown> = Omit<
* { agent: "MyAgent", name: "room", path: "settings" }
*/
path?: string;
/**
* Default timeout (in milliseconds) applied to non-streaming `call()`s
* that don't pass an explicit `timeout`. Defaults to 30 000 ms.
* Set to `0` to disable. Streaming calls never get a default timeout.
*/
defaultCallTimeout?: number;
};

/**
 * Default timeout (in milliseconds) applied to non-streaming RPC calls
 * that don't pass an explicit `timeout`. Acts as a backstop so calls
 * whose response is lost (e.g. the connection drops mid-flight) reject
 * instead of hanging forever. Override per client via
 * `defaultCallTimeout`, or per call via `timeout` (0 disables).
 *
 * Precedence when a call is made: an explicit per-call `timeout` wins
 * (including `0`), streaming calls get no default at all, and only then
 * does the client's `defaultCallTimeout` — or this constant when that
 * option is unset — apply.
 */
export const DEFAULT_CALL_TIMEOUT_MS = 30_000;

/**
* Options for streaming RPC calls
*/
Expand Down Expand Up @@ -267,6 +282,14 @@ export class AgentClient<
resolve: (value: unknown) => void;
reject: (error: Error) => void;
stream?: StreamOptions;
/**
* Whether the request was actually transmitted to the server.
* Requests issued while disconnected sit in PartySocket's internal
* buffer (transmitted: false) and are flushed on the next open —
* they survive a transient close instead of being rejected, since
* the server never saw them and re-delivery can't double-execute.
*/
transmitted: boolean;
}
>();
private _readyPromise!: Promise<void>;
Expand Down Expand Up @@ -378,7 +401,13 @@ export class AgentClient<
if (parsedMessage.type === MessageType.RPC) {
const response = parsedMessage as RPCResponse;
const pending = this._pendingCalls.get(response.id);
if (!pending) return;
if (!pending) {
console.warn(
`[AgentClient] Discarded an RPC response with no matching pending call (id "${response.id}"). ` +
"The call likely timed out or was rejected when its connection closed before the response arrived."
);
return;
}

if (!response.success) {
pending.reject(new Error(response.error));
Expand All @@ -405,14 +434,32 @@ export class AgentClient<
}
});

// PartySocket flushes its internal message buffer right before
// dispatching "open" — anything queued has now been transmitted.
this.addEventListener("open", () => {
for (const pending of this._pendingCalls.values()) {
pending.transmitted = true;
}
});

// Clean up pending calls and reset ready state when connection closes
this.addEventListener("close", () => {
// Reset ready state for next connection
this.identified = false;
this._resetReady();

// Reject any remaining pending calls (e.g., from unexpected disconnect)
this._rejectPendingCalls("Connection closed");
if (this.shouldReconnect) {
// Transient disconnect: reject calls whose request was already
// transmitted — their response can never arrive. Buffered calls
// stay pending; PartySocket re-sends them on reconnect.
this._rejectPendingCalls("Connection closed", {
onlyTransmitted: true
});
} else {
// Permanent close (close() called or retries exhausted): nothing
// will ever flush the buffer, so reject everything.
this._rejectPendingCalls("Connection closed");
}
});

this.call = this._callImpl.bind(this) as AgentClientCall<AgentT>;
Expand All @@ -422,15 +469,21 @@ export class AgentClient<
}

/**
* Reject all pending RPC calls with the given reason.
* Reject pending RPC calls with the given reason.
* With `onlyTransmitted`, calls still sitting in the send buffer are
* kept pending (they'll be flushed on reconnect).
*/
private _rejectPendingCalls(reason: string) {
private _rejectPendingCalls(
reason: string,
{ onlyTransmitted = false }: { onlyTransmitted?: boolean } = {}
) {
const error = new Error(reason);
for (const pending of this._pendingCalls.values()) {
for (const [id, pending] of this._pendingCalls) {
if (onlyTransmitted && !pending.transmitted) continue;
this._pendingCalls.delete(id);
pending.reject(error);
pending.stream?.onError?.(reason);
}
this._pendingCalls.clear();
}

setState(state: State) {
Expand Down Expand Up @@ -480,15 +533,24 @@ export class AgentClient<
? undefined
: (options as CallOptions | undefined)?.timeout;

// Set up timeout if specified
if (timeout) {
// Apply the default timeout as a backstop for non-streaming calls
// so a lost response rejects instead of hanging forever. An
// explicit `timeout` (including 0 = disabled) always wins.
const effectiveTimeout =
timeout !== undefined
? timeout
: streamOptions
? undefined
: (this.options.defaultCallTimeout ?? DEFAULT_CALL_TIMEOUT_MS);

if (effectiveTimeout) {
timeoutId = setTimeout(() => {
const pending = this._pendingCalls.get(id);
this._pendingCalls.delete(id);
const errorMessage = `RPC call to ${method} timed out after ${timeout}ms`;
const errorMessage = `RPC call to ${method} timed out after ${effectiveTimeout}ms`;
pending?.stream?.onError?.(errorMessage);
reject(new Error(errorMessage));
}, timeout);
}, effectiveTimeout);
}

this._pendingCalls.set(id, {
Expand All @@ -500,7 +562,11 @@ export class AgentClient<
if (timeoutId) clearTimeout(timeoutId);
resolve(value);
},
stream: streamOptions
stream: streamOptions,
// If the socket is open, send() below transmits synchronously;
// otherwise the request is buffered until the next open event
// (the "open" listener then marks it transmitted).
transmitted: this.readyState === this.OPEN
});

const request: RPCRequest = {
Expand Down
142 changes: 142 additions & 0 deletions packages/agents/src/react-tests/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,148 @@ describe("AgentClient", () => {
});
});

describe("RPC robustness", () => {
it("rejects transmitted calls on a transient disconnect", async () => {
const { host, protocol } = getTestWorkerHost();

client = new AgentClient({
agent: "TestCallableAgent",
name: `rpc-transmitted-reject-${Date.now()}`,
host,
protocol
});

await client.ready;

// Transmitted while OPEN — its response is lost when the
// connection drops, so it must reject, not hang.
const inFlight = client.call("asyncMethod", [10000]);
client.reconnect();

await expect(inFlight).rejects.toThrow("Connection closed");
});

it("keeps buffered (untransmitted) calls pending across a transient disconnect and flushes them on reconnect", async () => {
const { host, protocol } = getTestWorkerHost();

client = new AgentClient({
agent: "TestCallableAgent",
name: `rpc-buffered-survive-${Date.now()}`,
host,
protocol
});

await client.ready;

// Drop the connection, then issue a call while disconnected — it
// sits in PartySocket's send buffer (never transmitted).
client.reconnect();
const buffered = client.call("add", [1, 2]);

// A second disconnect fires another close event. Before the fix,
// any close rejected ALL pending calls — including this one, whose
// request the server never saw. Now untransmitted calls survive
// and are flushed once the socket reconnects.
client.reconnect();

await expect(buffered).resolves.toBe(3);
});

it("applies a default timeout to non-streaming calls", async () => {
const { host, protocol } = getTestWorkerHost();

client = new AgentClient({
agent: "TestCallableAgent",
name: `rpc-default-timeout-${Date.now()}`,
host,
protocol,
defaultCallTimeout: 300
});

await client.ready;

// `hang` never responds — the default timeout backstop must fire.
await expect(client.call("hang")).rejects.toThrow(
/timed out after 300ms/
);
});

it("lets an explicit timeout of 0 disable the default timeout", async () => {
const { host, protocol } = getTestWorkerHost();

client = new AgentClient({
agent: "TestCallableAgent",
name: `rpc-timeout-zero-${Date.now()}`,
host,
protocol,
defaultCallTimeout: 100
});

await client.ready;

// Takes 400ms server-side — longer than defaultCallTimeout, but
// timeout: 0 opts this call out of the backstop entirely.
await expect(
client.call("asyncMethod", [400], { timeout: 0 })
).resolves.toBe("done");
});

it("does not apply the default timeout to streaming calls", async () => {
const { host, protocol } = getTestWorkerHost();
const chunks: unknown[] = [];

client = new AgentClient({
agent: "TestCallableAgent",
name: `rpc-stream-no-default-${Date.now()}`,
host,
protocol,
defaultCallTimeout: 150
});

await client.ready;

// 3 chunks × 100ms = ~300ms total, well past defaultCallTimeout.
const result = await client.call("streamWithDelay", [["a", "b"], 100], {
stream: { onChunk: (chunk) => chunks.push(chunk) }
});

expect(result).toBe("complete");
expect(chunks).toEqual(["a", "b"]);
});

it("warns when a response arrives for a call that already timed out", async () => {
const { host, protocol } = getTestWorkerHost();
const warnSpy = vi.spyOn(console, "warn");

client = new AgentClient({
agent: "TestCallableAgent",
name: `rpc-dropped-response-${Date.now()}`,
host,
protocol
});

await client.ready;

// Times out client-side after 50ms; the server response arrives
// ~300ms later with no matching pending call.
await expect(
client.call("asyncMethod", [300], { timeout: 50 })
).rejects.toThrow(/timed out/);

await vi.waitFor(
() => {
const warned = warnSpy.mock.calls.some((call) =>
String(call[0]).includes("Discarded an RPC response")
);
expect(warned).toBe(true);
},
{ timeout: 5000 }
);

warnSpy.mockRestore();
});
});

describe("identity change detection", () => {
it("should detect identity change on reconnect", async () => {
const { host, protocol } = getTestWorkerHost();
Expand Down
Loading
Loading