6 changes: 6 additions & 0 deletions .changeset/agents-terminal-connection-error.md
@@ -0,0 +1,6 @@
---
"agents": minor
"@cloudflare/ai-chat": minor
---

Stop reconnecting on terminal WebSocket close events and expose terminal connection failures via `connectionError` / `onConnectionError` on `AgentClient`, `useAgent`, and `useAgentChat`.
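As a rough illustration of the behavior this changeset describes, the sketch below mirrors the `isTerminalCloseEvent` check and the reconnect predicate from `packages/agents/src/client.ts` in this PR. `CloseLike`, `isTerminalClose`, `shouldReconnect`, and the `4001` "unauthorized" code are hypothetical names and values chosen for illustration so the snippet runs outside a browser; they are not part of the library.

```typescript
// Hypothetical minimal shape standing in for the DOM CloseEvent.
type CloseLike = { code: number; reason: string; wasClean: boolean };

// Same rule as the PR's isTerminalCloseEvent: 1008 (policy violation) and
// the 4000-4999 range are treated as terminal.
function isTerminalClose(event: CloseLike): boolean {
  return event.code === 1008 || (event.code >= 4000 && event.code <= 4999);
}

// Same composition as the PR's classifyReconnect: the caller's predicate
// defaults to "reconnect", but a terminal code always vetoes the retry.
function shouldReconnect(
  event: CloseLike,
  userPredicate?: (event: CloseLike) => boolean
): boolean {
  return (userPredicate?.(event) ?? true) && !isTerminalClose(event);
}

// Example events (the 4001 code and reason are made up for this sketch).
const abnormal: CloseLike = { code: 1006, reason: "", wasClean: false };
const unauthorized: CloseLike = {
  code: 4001,
  reason: "unauthorized",
  wasClean: true
};

console.log(shouldReconnect(abnormal)); // retryable close
console.log(shouldReconnect(unauthorized, () => true)); // terminal close wins
```

One consequence of this composition is that a caller-supplied `shouldReconnectOnClose` can only narrow the set of closes that are retried; it cannot re-enable reconnection for a terminal code.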
5 changes: 5 additions & 0 deletions .changeset/think-runturn-stream-input.md
@@ -0,0 +1,5 @@
---
"@cloudflare/think": minor
---

Allow `runTurn({ mode: "stream" })` to accept array and function inputs, matching the existing `wait` mode input surface while preserving the durable `submit` function-input guard.
4 changes: 0 additions & 4 deletions .github/workflows/nightly.yml
@@ -47,7 +47,6 @@ jobs:
 key: ${{ runner.os }}-playwright-${{ steps.playwright-version.outputs.version }}

 - name: Install Playwright browsers
-if: steps.playwright-cache.outputs.cache-hit != 'true'
 run: pnpm exec playwright install --with-deps chromium

 - name: Run ai-chat deterministic e2e tests
@@ -83,7 +82,6 @@ jobs:
 key: ${{ runner.os }}-playwright-${{ steps.playwright-version.outputs.version }}

 - name: Install Playwright browsers
-if: steps.playwright-cache.outputs.cache-hit != 'true'
 run: pnpm exec playwright install --with-deps chromium

 - name: Run ai-chat Workers AI e2e tests
@@ -224,7 +222,6 @@ jobs:
 key: ${{ runner.os }}-playwright-${{ steps.playwright-version.outputs.version }}

 - name: Install Playwright browsers
-if: steps.playwright-cache.outputs.cache-hit != 'true'
 run: pnpm exec playwright install --with-deps chromium

 - name: Run codemode e2e tests
@@ -260,7 +257,6 @@ jobs:
 key: ${{ runner.os }}-playwright-${{ steps.playwright-version.outputs.version }}

 - name: Install Playwright browsers
-if: steps.playwright-cache.outputs.cache-hit != 'true'
 run: pnpm exec playwright install --with-deps chromium

 - name: Run agents browser-connector e2e tests
1 change: 0 additions & 1 deletion .github/workflows/pullrequest.yml
@@ -53,7 +53,6 @@ jobs:
 key: ${{ runner.os }}-playwright-${{ steps.playwright-version.outputs.version }}

 - name: Install Playwright browsers
-if: steps.playwright-cache.outputs.cache-hit != 'true'
 run: pnpm exec playwright install --with-deps chromium

 - run: CI=true pnpm exec nx affected -t test
1 change: 0 additions & 1 deletion .github/workflows/release.yml
@@ -47,7 +47,6 @@ jobs:
 key: ${{ runner.os }}-playwright-${{ steps.playwright-version.outputs.version }}

 - name: Install Playwright browsers
-if: steps.playwright-cache.outputs.cache-hit != 'true'
 run: pnpm exec playwright install --with-deps chromium

 - run: CI=true pnpm exec nx run-many -t test
3 changes: 2 additions & 1 deletion design/rfc-sub-agent-routing.md
@@ -1,6 +1,7 @@
# RFC: External addressability for sub-agents

-Status: proposed
+Status: accepted — shipped as an experimental routing primitive; D6 client retry
+hardening and the follow-up table remain open.

> Current behavior note: this RFC records the original routing proposal. For
> the current `parentAgent()` implementation, including facet-only direct
4 changes: 3 additions & 1 deletion design/rfc-think-channels.md
@@ -1,4 +1,6 @@
-Status: in progress
+Status: v1 shipped; five tracked follow-ups remain (DeliveryKind on `post()`,
+sub-agent channels, attachment ordering, voice transport, and recovery
+convergence).

# RFC: Think channels and notices

19 changes: 11 additions & 8 deletions design/rfc-think-turns.md
@@ -1,4 +1,5 @@
-Status: proposed
+Status: accepted — `runTurn`, `addMessages`, `TurnSpec`, and `_admitTurn`
+shipped; the remaining input-superset work is tracked as follow-up.

# RFC: Think turns — `runTurn()`, `TurnSpec`, and `addMessages()`

@@ -147,11 +148,11 @@ turn.

## The proposal

-Three additions, in order of importance (#3 is already shipped — see Status):
+Three additions, in order of importance (all three have shipped — see Status):

-1. `runTurn(options)` — public unifying turn API. **(not built)**
+1. `runTurn(options)` — public unifying turn API. **(✅ shipped)**
2. `TurnSpec` + an internal `_admitTurn(spec)` routine — the shared admission
-path the existing methods delegate to. **(not built)**
+path the existing methods delegate to. **(✅ shipped)**
3. `addMessages(...)` — public no-turn transcript write. **(✅ shipped)**

### 1. `runTurn(options)`
@@ -798,7 +799,9 @@ must be preserved by the parity tests, not by this prose.

## The decision

-_Pending review._ Proposed direction: ship `runTurn` + `addMessages` as additive
-APIs, extract `_admitTurn`/`TurnSpec` as a behavior-preserving internal refactor
-sequenced after recovery RFC Phases 0–1 (ideally Phase 3), and lead the docs with
-`runTurn` while keeping the existing methods as shortcuts.
+Accepted and shipped. `runTurn` + `addMessages` landed as additive APIs,
+`_admitTurn`/`TurnSpec` landed as a behavior-preserving internal refactor after
+the recovery foundation, and docs can lead with `runTurn` while keeping existing
+methods as shortcuts. Remaining input-superset work is tracked separately:
+`stream` should accept array/function input, while `submit` + function input
+requires durable submission pipeline changes.
13 changes: 13 additions & 0 deletions design/test-coverage-matrix.md
@@ -70,6 +70,14 @@ Legend: ✅ covered · ⚠️ partial / indirect · — none · 🚫 gated (opt-
| Codemode (dynamic worker exec) | ✅ | ✅ | ✅ | ✅ `codemode/src/tests/` (browser) | — | ⚠️ `codemode/e2e/` (**not nightly**) | — |
| Shared-engine genericity (pi / tanstack adapters) | ✅ | ✅ | ✅ | — | ✅ `experimental/pi-recovery`, `tanstack-recovery` (workers-ai leg 🚫) | — | ✅ tanstack leg |

+### Accepted coverage gaps
+
+- **Chat recovery orphan-persist (c) tool-approval dedup:** no real-SIGKILL L4
+e2e by design. The behavior is covered at L1/2 and L3 via
+`reconcileOrphanPartial` unit coverage and `durable-chat-recovery.test.ts`;
+the RFC progress log records the L4 gap as accepted because marginal value is
+low and harness cost is high.
+
## Skipped & quarantined test debt

Tracked so it doesn't rot. **Group A** is intentional opt-in (live/billable deps);
@@ -157,3 +165,8 @@ sign-off; the non-workflow fix is already applied.
transcript-delivery divergence (Think pushes; ai-chat owns via
`getInitialMessages`) is recorded as a keep-divergent row in the chat-recovery
RFC convergence matrix.
+- **Design-reconciliation follow-up:** corrected stale RFC status lines for
+shipped Turns, sub-agent routing, and Channels work; kept the ai-chat
+`agent-tool-recovery` L4 parity coverage in the feature matrix; and documented
+the remaining accepted tool-approval-dedup SIGKILL gap separately from skipped
+test debt.
2 changes: 1 addition & 1 deletion examples/deploy-churn/package.json
@@ -33,7 +33,7 @@
"@types/react": "^19.2.17",
"@types/react-dom": "^19.2.3",
"@vitejs/plugin-react": "^6.0.2",
-"partysocket": "1.2.0",
+"partysocket": "1.3.0",
"tailwindcss": "^4.3.0",
"tsx": "^4.22.4",
"typescript": "^6.0.3",
2 changes: 1 addition & 1 deletion package.json
@@ -64,7 +64,7 @@
"oxfmt": "^0.54.0",
"oxlint": "^1.69.0",
"partyserver": "^0.5.8",
-"partysocket": "1.2.0",
+"partysocket": "1.3.0",
"pkg-pr-new": "^0.0.75",
"playwright": "^1.60.0",
"react": "^19.2.7",
2 changes: 1 addition & 1 deletion packages/agents/package.json
@@ -35,7 +35,7 @@
"mimetext": "^3.0.28",
"nanoid": "^5.1.11",
"partyserver": "^0.5.8",
-"partysocket": "1.2.0",
+"partysocket": "1.3.0",
"yaml": "^2.9.0",
"yargs": "^18.0.0"
},
170 changes: 111 additions & 59 deletions packages/agents/src/client.ts
@@ -14,68 +14,94 @@ import type {
import { MessageType } from "./types";
import { camelCaseToKebabCase, isInternalJsStubProp } from "./utils";

+export class AgentConnectionError extends Error {
+code: number;
+reason: string;
+wasClean: boolean;
+
+constructor(event: CloseEvent) {
+const reason = event.reason || `WebSocket closed with code ${event.code}`;
+super(`Agent connection closed: ${reason}`);
+this.name = "AgentConnectionError";
+this.code = event.code;
+this.reason = event.reason;
+this.wasClean = event.wasClean;
+}
+}
+
+export function isTerminalCloseEvent(event: CloseEvent): boolean {
+return event.code === 1008 || (event.code >= 4000 && event.code <= 4999);
+}
+
+type TerminalReconnectOptions = {
+shouldReconnectOnClose?: (event: CloseEvent) => boolean;
+};
+
/**
* Options for creating an AgentClient
*/
export type AgentClientOptions<State = unknown> = Omit<
PartySocketOptions,
"party" | "room"
-> & {
-/** Name of the agent to connect to (ignored if basePath is set) */
-agent: string;
-/** Name of the specific Agent instance (ignored if basePath is set) */
-name?: string;
-/**
-* Full URL path - bypasses agent/name URL construction.
-* When set, the client connects to this path directly.
-* Server must handle routing manually (e.g., with getAgentByName + fetch).
-* @example
-* // Client connects to /user, server routes based on session
-* useAgent({ agent: "UserAgent", basePath: "user" })
-*/
-basePath?: string;
-/** Called when the Agent's state is updated */
-onStateUpdate?: (state: State, source: "server" | "client") => void;
-/** Called when a state update fails (e.g., connection is readonly) */
-onStateUpdateError?: (error: string) => void;
-/**
-* Called when the server sends the agent's identity on connect.
-* Useful when using basePath, as the actual instance name is determined server-side.
-* @param name The actual agent instance name
-* @param agent The agent class name (kebab-case)
-*/
-onIdentity?: (name: string, agent: string) => void;
-/**
-* Called when identity changes on reconnect (different instance than before).
-* If not provided and identity changes, a warning will be logged.
-* @param oldName Previous instance name
-* @param newName New instance name
-* @param oldAgent Previous agent class name
-* @param newAgent New agent class name
-*/
-onIdentityChange?: (
-oldName: string,
-newName: string,
-oldAgent: string,
-newAgent: string
-) => void;
-/**
-* Additional path to append to the URL.
-* Works with both standard routing and basePath.
-* @example
-* // With basePath: /user/settings
-* { basePath: "user", path: "settings" }
-* // Standard: /agents/my-agent/room/settings
-* { agent: "MyAgent", name: "room", path: "settings" }
-*/
-path?: string;
-/**
-* Default timeout (in milliseconds) applied to non-streaming `call()`s
-* that don't pass an explicit `timeout`. Defaults to 30 000 ms.
-* Set to `0` to disable. Streaming calls never get a default timeout.
-*/
-defaultCallTimeout?: number;
-};
+> &
+TerminalReconnectOptions & {
+/** Name of the agent to connect to (ignored if basePath is set) */
+agent: string;
+/** Name of the specific Agent instance (ignored if basePath is set) */
+name?: string;
+/**
+* Full URL path - bypasses agent/name URL construction.
+* When set, the client connects to this path directly.
+* Server must handle routing manually (e.g., with getAgentByName + fetch).
+* @example
+* // Client connects to /user, server routes based on session
+* useAgent({ agent: "UserAgent", basePath: "user" })
+*/
+basePath?: string;
+/** Called when the Agent's state is updated */
+onStateUpdate?: (state: State, source: "server" | "client") => void;
+/** Called when a state update fails (e.g., connection is readonly) */
+onStateUpdateError?: (error: string) => void;
+/**
+* Called when the server sends the agent's identity on connect.
+* Useful when using basePath, as the actual instance name is determined server-side.
+* @param name The actual agent instance name
+* @param agent The agent class name (kebab-case)
+*/
+onIdentity?: (name: string, agent: string) => void;
+/**
+* Called when identity changes on reconnect (different instance than before).
+* If not provided and identity changes, a warning will be logged.
+* @param oldName Previous instance name
+* @param newName New instance name
+* @param oldAgent Previous agent class name
+* @param newAgent New agent class name
+*/
+onIdentityChange?: (
+oldName: string,
+newName: string,
+oldAgent: string,
+newAgent: string
+) => void;
+/**
+* Additional path to append to the URL.
+* Works with both standard routing and basePath.
+* @example
+* // With basePath: /user/settings
+* { basePath: "user", path: "settings" }
+* // Standard: /agents/my-agent/room/settings
+* { agent: "MyAgent", name: "room", path: "settings" }
+*/
+path?: string;
+/**
+* Default timeout (in milliseconds) applied to non-streaming `call()`s
+* that don't pass an explicit `timeout`. Defaults to 30 000 ms.
+* Set to `0` to disable. Streaming calls never get a default timeout.
+*/
+defaultCallTimeout?: number;
+/** Called when the connection closes with a terminal code and will not reconnect. */
+onConnectionError?: (error: AgentConnectionError) => void;
+};

/**
* Default timeout (in milliseconds) applied to non-streaming RPC calls
@@ -266,6 +292,12 @@ export class AgentClient<
*/
identified = false;

+/**
+* Terminal connection error, if the server closed the socket with a code
+* that should not be retried automatically.
+*/
+connectionError: AgentConnectionError | null = null;
+
/**
* Promise that resolves when identity has been received from the server.
* Useful for waiting before making calls that depend on knowing the instance.
@@ -305,16 +337,25 @@ export class AgentClient<

constructor(options: AgentClientOptions<State>) {
const agentNamespace = camelCaseToKebabCase(options.agent);
+const shouldReconnectOnClose = options.shouldReconnectOnClose;
+const classifyReconnect = (event: CloseEvent) =>
+(shouldReconnectOnClose?.(event) ?? true) && !isTerminalCloseEvent(event);

// If basePath is provided, use it directly; otherwise construct from agent/name
const socketOptions = options.basePath
-? { basePath: options.basePath, path: options.path, ...options }
+? {
+basePath: options.basePath,
+path: options.path,
+...options,
+shouldReconnectOnClose: classifyReconnect
+}
: {
party: agentNamespace,
prefix: "agents",
room: options.name || "default",
path: options.path,
-...options
+...options,
+shouldReconnectOnClose: classifyReconnect
};

super(socketOptions);
@@ -437,13 +478,15 @@ export class AgentClient<
// PartySocket flushes its internal message buffer right before
// dispatching "open" — anything queued has now been transmitted.
this.addEventListener("open", () => {
+this.connectionError = null;
for (const pending of this._pendingCalls.values()) {
pending.transmitted = true;
}
});

// Clean up pending calls and reset ready state when connection closes
-this.addEventListener("close", () => {
+this.addEventListener("close", (event) => {
+const terminalClose = isTerminalCloseEvent(event);
// Reset ready state for next connection
this.identified = false;
this._resetReady();
@@ -459,6 +502,11 @@ export class AgentClient<
// Permanent close (close() called or retries exhausted): nothing
// will ever flush the buffer, so reject everything.
this._rejectPendingCalls("Connection closed");
+if (terminalClose) {
+const error = new AgentConnectionError(event);
+this.connectionError = error;
+this.options.onConnectionError?.(error);
+}
}
});

@@ -518,6 +566,10 @@ export class AgentClient<
args: unknown[] = [],
options?: CallOptions | StreamOptions
): Promise<unknown> {
+if (this.connectionError && this.readyState === this.CLOSED) {
+throw new Error("Connection closed");
+}
+
return new Promise((resolve, reject) => {
const id = crypto.randomUUID();
let timeoutId: ReturnType<typeof setTimeout> | undefined;
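The lifecycle that the `client.ts` hunks above add can be modeled in isolation. The following is a simplified sketch, not the real `AgentClient`: `ConnectionErrorModel`, `ClientModel`, `handleOpen`, and `handleClose` are hypothetical names, and the model leaves out the `readyState` check and the permanent-close branch that the actual change relies on. It only shows the three observable effects: a terminal close records an error and fires the callback, a later `open` clears the error, and `call()` fails fast while the error is set.

```typescript
// Hypothetical stand-in for AgentConnectionError (takes code/reason directly
// instead of a DOM CloseEvent so it runs outside a browser).
class ConnectionErrorModel extends Error {
  code: number;
  reason: string;

  constructor(code: number, reason: string) {
    super(
      `Agent connection closed: ${reason || `WebSocket closed with code ${code}`}`
    );
    this.name = "ConnectionErrorModel";
    this.code = code;
    this.reason = reason;
  }
}

// Hypothetical, heavily simplified model of the client's error surface.
class ClientModel {
  connectionError: ConnectionErrorModel | null = null;
  onConnectionError: ((error: ConnectionErrorModel) => void) | undefined;

  constructor(onConnectionError?: (error: ConnectionErrorModel) => void) {
    this.onConnectionError = onConnectionError;
  }

  // A successful (re)connect clears any previous terminal error.
  handleOpen(): void {
    this.connectionError = null;
  }

  // Only terminal codes (1008, 4000-4999) are surfaced as connection errors.
  handleClose(code: number, reason: string): void {
    const terminal = code === 1008 || (code >= 4000 && code <= 4999);
    if (!terminal) return;
    const error = new ConnectionErrorModel(code, reason);
    this.connectionError = error;
    this.onConnectionError?.(error);
  }

  // Calls fail fast while a terminal error is recorded.
  call(method: string): string {
    if (this.connectionError) throw new Error("Connection closed");
    return `sent ${method}`;
  }
}

const seenCodes: number[] = [];
const client = new ClientModel((error) => seenCodes.push(error.code));
client.handleClose(1006, ""); // retryable: nothing recorded
client.handleClose(4001, "unauthorized"); // terminal: recorded and reported
```

In this model the callback and the `connectionError` field carry the same error object, which matches how the diff assigns `this.connectionError` and then invokes `onConnectionError` with the same instance.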