Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 153 additions & 2 deletions src/supervisor/agents/claude/probe.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,100 @@
import { describe, expect, it } from "vitest";
import { claudeCapabilitiesFromCliVersion, win32PathToWslMount } from "./probe";
import { PassThrough } from "node:stream";
import { describe, expect, it, vi, beforeEach } from "vitest";
import type {
Query,
SDKMessage,
SpawnedProcess,
SpawnOptions,
} from "@anthropic-ai/claude-agent-sdk";

const mockSdk = vi.hoisted(() => ({
query: vi.fn<(input: unknown) => Query>(),
}));

const mockChildProcess = vi.hoisted(() => ({
spawn:
vi.fn<(command: string, args: string[], options: Record<string, unknown>) => SpawnedProcess>(),
}));

vi.mock("@anthropic-ai/claude-agent-sdk", () => ({
query: mockSdk.query,
}));

vi.mock("node:child_process", async () => {
const actual = await vi.importActual<typeof import("node:child_process")>("node:child_process");
return {
...actual,
spawn: mockChildProcess.spawn,
};
});

import {
claudeCapabilitiesFromCliVersion,
probeClaudeCapabilities,
win32PathToWslMount,
} from "./probe";
import { spawnClaudeProbeProcess } from "./sdkProbeProcess";

function epipeError(): NodeJS.ErrnoException {
return Object.assign(new Error("write EPIPE"), { code: "EPIPE", syscall: "write" });
}

function ebadfError(): NodeJS.ErrnoException {
return Object.assign(new Error("write EBADF"), { code: "EBADF", syscall: "write" });
}

function makeSpawnedProcess(): SpawnedProcess {
return {
stdin: new PassThrough(),
stdout: new PassThrough(),
killed: false,
exitCode: null,
kill: vi.fn<() => boolean>().mockReturnValue(true),
on() {},
once() {},
off() {},
} as unknown as SpawnedProcess;
}

function createProbeQuery(): Query {
let closed = false;
return {
async next(): Promise<IteratorResult<SDKMessage>> {
if (closed) return { done: true, value: undefined };
return { done: true, value: undefined };
},
async return(): Promise<IteratorResult<SDKMessage>> {
closed = true;
return { done: true, value: undefined };
},
async throw(error?: unknown): Promise<IteratorResult<SDKMessage>> {
throw error;
},
[Symbol.asyncIterator]() {
return this;
},
interrupt: vi.fn<() => Promise<void>>().mockResolvedValue(undefined),
setPermissionMode: vi.fn<() => Promise<void>>().mockResolvedValue(undefined),
setModel: vi.fn<() => Promise<void>>().mockResolvedValue(undefined),
setMaxThinkingTokens: vi.fn<() => Promise<void>>().mockResolvedValue(undefined),
applyFlagSettings: vi.fn<() => Promise<void>>().mockResolvedValue(undefined),
initializationResult: vi.fn<() => Promise<unknown>>().mockResolvedValue({
commands: [{ name: "help", description: "Show help" }],
}),
supportedCommands: vi.fn<() => Promise<unknown[]>>().mockResolvedValue([]),
supportedModels: vi.fn<() => Promise<unknown[]>>().mockResolvedValue([]),
getContextUsage: vi.fn<() => Promise<unknown>>().mockResolvedValue({}),
close: vi.fn<() => void>(() => {
closed = true;
}),
} as unknown as Query;
}

beforeEach(() => {
mockSdk.query.mockReset();
mockChildProcess.spawn.mockReset();
mockChildProcess.spawn.mockImplementation(() => makeSpawnedProcess());
});

describe("claudeCapabilitiesFromCliVersion", () => {
it("hides Opus 4.7 and 4.8 when CLI is below 2.1.111", () => {
Expand Down Expand Up @@ -40,3 +135,59 @@ describe("win32PathToWslMount", () => {
expect(win32PathToWslMount("//wsl.localhost/Ubuntu/home/u/w.mjs")).toBe("/home/u/w.mjs");
});
});

describe("Claude SDK probe process handling", () => {
it("contains probe-owned stdin EPIPE from the Claude SDK child", async () => {
mockSdk.query.mockImplementation((input: unknown) => {
const params = input as {
options?: {
spawnClaudeCodeProcess?: (options: SpawnOptions) => SpawnedProcess;
};
};
const spawnForProbe = params.options?.spawnClaudeCodeProcess;
expect(spawnForProbe).toEqual(expect.any(Function));

const child = spawnForProbe!({
command: "claude",
args: ["--sdk-mcp-server"],
cwd: "/tmp",
env: {},
signal: new AbortController().signal,
});
expect(() => child.stdin.emit("error", epipeError())).not.toThrow();

return createProbeQuery();
});

const result = await probeClaudeCapabilities({
location: { kind: "posix", path: "/tmp" },
executablePath: "claude",
version: "2.1.154",
});

expect(result?.slashCommands).toEqual([
{ id: "help", label: "help — Show help", description: "Show help" },
]);
expect(mockChildProcess.spawn).toHaveBeenCalledWith(
"claude",
["--sdk-mcp-server"],
expect.objectContaining({
cwd: "/tmp",
stdio: ["pipe", "pipe", "pipe"],
windowsHide: true,
}),
);
});

it("does not hide unrelated probe stdin errors", () => {
const child = spawnClaudeProbeProcess({
command: "claude",
args: ["--sdk-mcp-server"],
cwd: "/tmp",
env: {},
signal: new AbortController().signal,
});

expect(() => child.stdin.emit("error", ebadfError())).toThrow("write EBADF");
});
});
2 changes: 2 additions & 0 deletions src/supervisor/agents/claude/probe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { CLAUDE_FAST_MODE_DISABLED_MESSAGE } from "./detection";
import { resolveFastModeCachePath } from "./fastModeCache";
import { resolveFastAvailability } from "./fastModeProbe";
import { AsyncPromptQueue } from "./promptQueue";
import { spawnClaudeProbeProcess } from "./sdkProbeProcess";

const CLAUDE_TERMINAL_AUTH_METHOD: AgentAuthMethod = {
type: "terminal",
Expand Down Expand Up @@ -136,6 +137,7 @@ async function probeClaudeSdkPartialNative(
settingSources: ["user", "project", "local"],
allowedTools: [],
stderr: () => {},
spawnClaudeCodeProcess: spawnClaudeProbeProcess,
},
});
const init = await q.initializationResult();
Expand Down
30 changes: 30 additions & 0 deletions src/supervisor/agents/claude/sdkProbeProcess.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { spawn } from "node:child_process";
import type { SpawnOptions, SpawnedProcess } from "@anthropic-ai/claude-agent-sdk";

function isEpipeError(error: Error): boolean {
const code = (error as NodeJS.ErrnoException).code;
return code === "EPIPE" || error.message === "write EPIPE";
}

/**
* The SDK probe owns a short-lived child process and may close/abort it while the
* SDK is still draining streaming input. On Windows that can emit stdin EPIPE
* outside the SDK's awaited promise chain; keep that broken pipe local to this
* probe child while preserving all other stream errors.
*/
export function spawnClaudeProbeProcess(options: SpawnOptions): SpawnedProcess {
const child = spawn(options.command, options.args, {
env: options.env,
signal: options.signal,
stdio: ["pipe", "pipe", "pipe"],
windowsHide: true,
...(options.cwd ? { cwd: options.cwd } : {}),
}) as unknown as SpawnedProcess;

child.stdin.on("error", (error: Error) => {
if (isEpipeError(error)) return;
throw error;
});

return child;
}
2 changes: 2 additions & 0 deletions src/supervisor/agents/claude/sdkProbeWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { query } from "@anthropic-ai/claude-agent-sdk";
import type { SlashCommand } from "@anthropic-ai/claude-agent-sdk";
import { AsyncPromptQueue } from "./promptQueue";
import { resolveFastAvailability } from "./fastModeProbe";
import { spawnClaudeProbeProcess } from "./sdkProbeProcess";

function mapCommands(commands: SlashCommand[]) {
return commands.map((c) => ({
Expand Down Expand Up @@ -47,6 +48,7 @@ async function main() {
settingSources: ["user", "project", "local"],
allowedTools: [],
stderr: () => {},
spawnClaudeCodeProcess: spawnClaudeProbeProcess,
},
});

Expand Down