From f0c9dd763e37df37bce8e726289752671612fd0c Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 20:43:54 +0800 Subject: [PATCH 1/5] feat: detach foreground tasks to background --- .changeset/foreground-task-detach.md | 8 + .../src/agent/background/agent-task.ts | 50 +-- .../agent-core/src/agent/background/index.ts | 329 ++++++++++------ .../src/agent/background/persist.ts | 6 +- .../src/agent/background/process-task.ts | 65 +++- .../agent-core/src/agent/background/task.ts | 6 + packages/agent-core/src/agent/index.ts | 1 + packages/agent-core/src/agent/tool/index.ts | 3 +- packages/agent-core/src/rpc/core-api.ts | 4 + packages/agent-core/src/rpc/core-impl.ts | 5 + packages/agent-core/src/session/index.ts | 2 +- packages/agent-core/src/session/rpc.ts | 5 + .../agent-core/src/session/subagent-host.ts | 7 +- .../src/tools/builtin/collaboration/agent.ts | 282 +++++++------- .../src/tools/builtin/shell/bash.ts | 352 +++++++----------- packages/agent-core/src/utils/promise.ts | 11 + .../agent/background/agent-timeout.test.ts | 36 +- .../test/agent/background/helpers.ts | 26 ++ .../test/agent/background/ids.test.ts | 6 +- .../test/agent/background/manager.test.ts | 128 +++++-- .../test/agent/background/persist.test.ts | 1 + .../test/agent/background/rpc-events.test.ts | 22 +- .../agent/bg-idle-notification-repro.test.ts | 15 +- .../test/session/lifecycle-hooks.test.ts | 10 +- packages/agent-core/test/tools/agent.test.ts | 106 ++++-- .../test/tools/background/task-tools.test.ts | 12 +- .../agent-core/test/tools/bash-env.test.ts | 9 +- packages/agent-core/test/tools/bash.test.ts | 238 +++++++++--- .../test/tools/builtin-current.test.ts | 7 +- .../test/tools/shell-cancel.test.ts | 7 +- .../test/tools/shell-quoting.test.ts | 13 +- packages/node-sdk/src/rpc.ts | 11 + packages/node-sdk/src/session.ts | 17 + packages/protocol/src/events.ts | 1 + 34 files changed, 1158 insertions(+), 643 deletions(-) create mode 100644 .changeset/foreground-task-detach.md create mode 
100644 packages/agent-core/src/utils/promise.ts diff --git a/.changeset/foreground-task-detach.md b/.changeset/foreground-task-detach.md new file mode 100644 index 000000000..fb4561669 --- /dev/null +++ b/.changeset/foreground-task-detach.md @@ -0,0 +1,8 @@ +--- +"@moonshot-ai/agent-core": patch +"@moonshot-ai/kimi-code-sdk": patch +"@moonshot-ai/protocol": patch +"@moonshot-ai/kimi-code": patch +--- + +Allow foreground shell and subagent tasks to be detached into background tasks. diff --git a/packages/agent-core/src/agent/background/agent-task.ts b/packages/agent-core/src/agent/background/agent-task.ts index 63eabd241..8d85aaf50 100644 --- a/packages/agent-core/src/agent/background/agent-task.ts +++ b/packages/agent-core/src/agent/background/agent-task.ts @@ -1,11 +1,10 @@ -import { sleep } from '@antfu/utils'; - import { errorMessage, isAbortError } from '../../loop/errors'; import { type BackgroundTask, type BackgroundTaskInfoBase, type BackgroundTaskSink, } from './task'; +import type { SessionSubagentHost, SubagentHandle } from '../../session/subagent-host'; export interface AgentBackgroundTaskInfo extends BackgroundTaskInfoBase { readonly kind: 'agent'; @@ -15,35 +14,25 @@ export interface AgentBackgroundTaskInfo extends BackgroundTaskInfoBase { readonly subagentType?: string; } -export interface AgentBackgroundTaskOptions { - readonly timeoutMs?: number; - readonly abort?: () => void; - readonly agentId?: string; - readonly subagentType?: string; -} - export class AgentBackgroundTask implements BackgroundTask { readonly kind = 'agent' as const; readonly idPrefix: string = 'agent'; - readonly timeoutMs?: number; - readonly agentId?: string; - readonly subagentType?: string; - private readonly abort?: () => void; + readonly agentId: string; + readonly subagentType: string; constructor( - private readonly completion: Promise<{ result: string }>, + private readonly handle: SubagentHandle, readonly description: string, - options: AgentBackgroundTaskOptions = {}, 
+ private readonly subagentHost: Pick, + private readonly abortController: AbortController, ) { - this.timeoutMs = options.timeoutMs; - this.abort = options.abort; - this.agentId = options.agentId; - this.subagentType = options.subagentType; + this.agentId = handle.agentId; + this.subagentType = handle.profileName; } async start(sink: BackgroundTaskSink): Promise { const requestAbort = (): void => { - this.abort?.(); + this.abortController.abort(); }; if (sink.signal.aborted) { requestAbort(); @@ -51,23 +40,8 @@ export class AgentBackgroundTask implements BackgroundTask { sink.signal.addEventListener('abort', requestAbort, { once: true }); } - const deadlineTimeout: unique symbol = Symbol('background-agent-deadline'); - const raceInputs: Array> = [ - this.completion, - ]; - const timeoutMs = this.timeoutMs; - - if (timeoutMs !== undefined && timeoutMs > 0) { - raceInputs.push(sleep(timeoutMs).then(() => deadlineTimeout)); - } - try { - const outcome = await Promise.race(raceInputs); - if (outcome === deadlineTimeout) { - this.abort?.(); - await sink.settle({ status: 'timed_out' }); - return; - } + const outcome = await this.handle.completion; sink.appendOutput(outcome.result); await sink.settle({ status: 'completed' }); } catch (error: unknown) { @@ -81,6 +55,10 @@ export class AgentBackgroundTask implements BackgroundTask { } } + onDetach(): void { + this.subagentHost.markActiveChildDetached(this.agentId); + } + toInfo(base: BackgroundTaskInfoBase): AgentBackgroundTaskInfo { return { ...base, diff --git a/packages/agent-core/src/agent/background/index.ts b/packages/agent-core/src/agent/background/index.ts index 5c9963c57..065f6bc83 100644 --- a/packages/agent-core/src/agent/background/index.ts +++ b/packages/agent-core/src/agent/background/index.ts @@ -12,10 +12,12 @@ import { randomBytes } from 'node:crypto'; +import { createControlledPromise, sleep, type ControlledPromise } from '@antfu/utils'; import type { ContentPart } from '@moonshot-ai/kosong'; import type 
{ Agent } from '../..'; import { errorMessage } from '../../loop/errors'; +import { timeoutOutcome } from '../../utils/promise'; import type { BackgroundTaskOrigin } from '../context'; import { renderNotificationXml } from '../context/notification-xml'; import { type BackgroundTaskPersistence } from './persist'; @@ -58,19 +60,22 @@ interface ManagedTask { /** Total UTF-8 bytes observed, including chunks dropped from the live ring buffer. */ outputSizeBytes: number; status: BackgroundTaskStatus; + /** Normalized registration options. Current mutable state stays on ManagedTask. */ + readonly options: RegisterBackgroundTaskOptions; readonly startedAt: number; endedAt: number | null; - /** Listeners awaiting task completion. */ - readonly waiters: Array<() => void>; - /** True once terminal notification/event side effects have already run. */ - terminalFired: boolean; + /** Foreground tool call release signal, present only for non-detached starts. */ + foregroundRelease?: ControlledPromise; + /** User/tool stop request. */ + readonly stop: ControlledPromise; + /** Resolved once manager has finalized the task. */ + readonly terminal: ControlledPromise; /** Human-readable reason for the terminal status, when available. */ stopReason?: string | undefined; /** Suppress automatic terminal notifications/reminders for this task. */ terminalNotificationSuppressed?: boolean | undefined; /** Cancellation signal owned by the manager and observed by the concrete task. 
*/ readonly abortController: AbortController; - lifecyclePromise: Promise; persistWriteQueue: Promise; outputWriteQueue: Promise; } @@ -89,6 +94,7 @@ interface ManagedTask { const MAX_OUTPUT_BYTES = 1024 * 1024; // 1 MiB const SIGTERM_GRACE_MS = 5_000; +const USER_INTERRUPT_REASON = 'Interrupted by user'; const _ALPHABET = '0123456789abcdefghijklmnopqrstuvwxyz'; @@ -149,6 +155,29 @@ interface BackgroundTaskNotificationContext { const NOTIFICATION_TAIL_BYTES = 3_000; +export interface RegisterBackgroundTaskOptions { + /** + * When false, the task is tracked by the manager but a foreground tool call + * is still waiting for it. It can later be detached through RPC. + */ + readonly detached?: boolean; + /** Deadline owned by BackgroundManager. `0` and `undefined` do not arm a timer. */ + readonly timeoutMs?: number; + /** Foreground caller signal. Ignored for tasks created already detached. */ + readonly signal?: AbortSignal; +} + +export type ForegroundTaskReleaseReason = 'detached' | 'terminal'; + +interface StopRequest { + readonly reason?: string; +} + +type TerminalOutcome = + | { readonly kind: 'worker'; readonly settlement: BackgroundTaskSettlement } + | { readonly kind: 'timeout' } + | { readonly kind: 'stop'; readonly request: StopRequest }; + // ── Manager ────────────────────────────────────────────────────────── export class BackgroundManager { @@ -168,15 +197,8 @@ export class BackgroundManager { private readonly persistence?: BackgroundTaskPersistence, ) { } - /** - * Fire terminal side effects for a live task. Idempotent: the second - * invocation for the same task is a no-op so a lagging `wait()` - * resolver or a race between `stop()` and natural exit cannot yield - * duplicate notifications/events. 
- */ private fireTerminalEffects(entry: ManagedTask): void { - if (entry.terminalFired) return; - entry.terminalFired = true; + if (!this.isDetached(entry)) return; const info = this.toInfo(entry); void this.notifyBackgroundTask(info).catch(() => { }); this.emitTaskTerminated(info); @@ -198,28 +220,39 @@ export class BackgroundManager { }); } - private resolveWaiters(entry: ManagedTask): void { - const waiters = entry.waiters.splice(0); - for (const resolve of waiters) resolve(); - } - - private assertCanRegister(): void { + private assertCanRegister(startedInBackground: boolean): void { const maxRunningTasks = this.agent.kimiConfig?.background?.maxRunningTasks; if (maxRunningTasks === undefined) return; - if (this.activeTaskCount() < maxRunningTasks) return; + if (!startedInBackground) return; + if (this.activeBackgroundAdmissionCount() < maxRunningTasks) return; throw new Error('Too many background tasks are already running.'); } - private activeTaskCount(): number { + private activeBackgroundAdmissionCount(): number { let count = 0; for (const entry of this.tasks.values()) { - if (!TERMINAL_STATUSES.has(entry.status)) count++; + if (!TERMINAL_STATUSES.has(entry.status) && this.startedInBackground(entry)) count++; } return count; } - registerTask(task: BackgroundTask): string { - this.assertCanRegister(); + private startedInBackground(entry: ManagedTask): boolean { + return entry.options.detached !== false; + } + + private isDetached(entry: ManagedTask): boolean { + return entry.foregroundRelease === undefined; + } + + registerTask(task: BackgroundTask, options: RegisterBackgroundTaskOptions = {}): string { + const detached = options.detached ?? true; + const timeoutMs = options.timeoutMs ?? task.timeoutMs; + const entryOptions: RegisterBackgroundTaskOptions = { + detached, + timeoutMs, + signal: detached ? 
undefined : options.signal, + }; + this.assertCanRegister(detached); const taskId = generateTaskId(task.idPrefix); const entry: ManagedTask = { taskId, @@ -227,36 +260,24 @@ export class BackgroundManager { outputChunks: [], outputSizeBytes: 0, status: 'running', + options: entryOptions, startedAt: Date.now(), endedAt: null, - waiters: [], - terminalFired: false, + foregroundRelease: detached ? undefined : createControlledPromise(), + stop: createControlledPromise(), + terminal: createControlledPromise(), abortController: new AbortController(), - lifecyclePromise: Promise.resolve(), persistWriteQueue: Promise.resolve(), outputWriteQueue: Promise.resolve(), }; this.tasks.set(taskId, entry); - - entry.lifecyclePromise = Promise.resolve() - .then(() => task.start({ - signal: entry.abortController.signal, - appendOutput: (chunk) => { - this.appendOutput(entry, chunk); - }, - settle: (settlement) => this.settleTask(entry, settlement), - })) - .catch(async (error: unknown) => { - const aborted = entry.abortController.signal.aborted; - await this.settleTask(entry, { - status: aborted ? 'killed' : 'failed', - stopReason: aborted ? undefined : errorMessage(error), - }); - }); + void this.runTaskLifecycle(entry); // Initial persistence (snapshot at start). 
void this.persistLive(entry); - this.emitTaskStarted(this.toInfo(entry)); + if (this.isDetached(entry)) { + this.emitTaskStarted(this.toInfo(entry)); + } return taskId; } @@ -357,6 +378,25 @@ export class BackgroundManager { await this.persistLive(entry); } + detach(taskId: string): BackgroundTaskInfo | undefined { + const entry = this.tasks.get(taskId); + if (entry === undefined) return this.ghosts.get(taskId); + if (TERMINAL_STATUSES.has(entry.status)) return this.toInfo(entry); + const foregroundRelease = entry.foregroundRelease; + if (foregroundRelease === undefined) return this.toInfo(entry); + + entry.foregroundRelease = undefined; + try { + entry.task.onDetach?.(); + } catch { + /* detach has already succeeded; hooks must not make RPC fail */ + } + void this.persistLive(entry); + this.emitTaskStarted(this.toInfo(entry)); + foregroundRelease.resolve('detached'); + return this.toInfo(entry); + } + /** Stop a running task. SIGTERM → 5s grace → SIGKILL. */ async stop(taskId: string, reason?: string): Promise { const entry = this.tasks.get(taskId); @@ -375,46 +415,8 @@ export class BackgroundManager { entry.stopReason = stopReason; entry.abortController.abort(stopReason); - - // Wait up to 5s for the lifecycle path to settle, then SIGKILL. - // Waiting on lifecyclePromise, rather than the task directly, lets a - // natural completion win the race instead of being overwritten here. 
- let graceTimer: ReturnType | undefined; - const graceful = await Promise.race([ - entry.lifecyclePromise.then( - () => true, - () => true, - ), - new Promise((resolve) => { - graceTimer = setTimeout(() => { - resolve(false); - }, SIGTERM_GRACE_MS); - }), - ]); - if (graceTimer !== undefined) clearTimeout(graceTimer); - - if (TERMINAL_STATUSES.has(entry.status)) { - await entry.persistWriteQueue; - return this.toInfo(entry); - } - - if (!graceful) { - try { - await entry.task.forceStop?.(); - } catch { - /* ignore */ - } - } - - if (TERMINAL_STATUSES.has(entry.status)) { - await entry.persistWriteQueue; - return this.toInfo(entry); - } - - // Tasks whose lifecycle promise never settles need an explicit terminal - // finalize here after their stop/force-stop hooks have had a chance. - await this.settleTask(entry, { status: 'killed', stopReason }); - + entry.stop.resolve({ reason: stopReason }); + await entry.terminal; return this.toInfo(entry); } @@ -436,25 +438,7 @@ export class BackgroundManager { return this.toInfo(entry); } - let terminalWaiter: (() => void) | undefined; - let timeout: ReturnType | undefined; - try { - await Promise.race([ - new Promise((resolve) => { - terminalWaiter = resolve; - entry.waiters.push(resolve); - }), - new Promise((resolve) => { - timeout = setTimeout(resolve, timeoutMs); - }), - ]); - } finally { - if (timeout !== undefined) clearTimeout(timeout); - if (terminalWaiter !== undefined) { - const index = entry.waiters.indexOf(terminalWaiter); - if (index !== -1) entry.waiters.splice(index, 1); - } - } + await Promise.race([entry.terminal, sleep(timeoutMs)]); if (TERMINAL_STATUSES.has(entry.status)) { await entry.persistWriteQueue; @@ -462,6 +446,32 @@ export class BackgroundManager { return this.toInfo(entry); } + /** + * Wait until a foreground task either detaches from the current tool call or + * reaches a terminal state. Detached tasks return immediately. 
+ */ + async waitForForegroundRelease( + taskId: string, + ): Promise { + const entry = this.tasks.get(taskId); + if (!entry) return undefined; + if (TERMINAL_STATUSES.has(entry.status)) { + await entry.persistWriteQueue; + return 'terminal'; + } + if (this.isDetached(entry)) return 'detached'; + + const foregroundRelease = entry.foregroundRelease; + const reason = await Promise.race([ + foregroundRelease, + entry.terminal.then(() => 'terminal' as const), + ]); + if (reason === 'terminal') { + await entry.persistWriteQueue; + } + return reason; + } + // ── persistence + reconcile ──────────────────────────────────────── /** @@ -571,6 +581,7 @@ export class BackgroundManager { private async buildBackgroundTaskNotificationContext( info: BackgroundTaskInfo, ): Promise { + if (info.detached === false) return undefined; if (this.isTerminalNotificationSuppressed(info.taskId)) return undefined; const origin: BackgroundTaskOrigin = { kind: 'background_task', @@ -633,27 +644,112 @@ export class BackgroundManager { ); } - private async settleTask( + private async runTaskLifecycle(entry: ManagedTask): Promise { + const worker = createControlledPromise(); + let workerSettled = false; + const settleWorker = (settlement: BackgroundTaskSettlement): boolean => { + if (workerSettled) return false; + workerSettled = true; + worker.resolve(settlement); + return true; + }; + + void Promise.resolve() + .then(() => entry.task.start({ + signal: entry.abortController.signal, + appendOutput: (chunk) => { + this.appendOutput(entry, chunk); + }, + settle: async (settlement) => settleWorker(settlement), + })) + .catch((error: unknown) => { + settleWorker({ + status: entry.abortController.signal.aborted ? 'killed' : 'failed', + stopReason: entry.abortController.signal.aborted ? 
undefined : errorMessage(error), + }); + }); + + const outcome = await Promise.race([ + worker.then((settlement): TerminalOutcome => ({ kind: 'worker', settlement })), + timeoutOutcome(entry.options.timeoutMs, { kind: 'timeout' as const }), + entry.stop.then((request): TerminalOutcome => ({ kind: 'stop', request })), + this.signalOutcome(entry), + ]); + const settlement = await this.settlementForOutcome(entry, outcome, worker); + await this.finalizeTask(entry, settlement); + } + + private signalOutcome(entry: ManagedTask): Promise { + const signal = entry.options.signal; + if (signal === undefined) return new Promise(() => {}); + const outcome: TerminalOutcome = { + kind: 'stop', + request: { reason: USER_INTERRUPT_REASON }, + }; + if (signal.aborted) return Promise.resolve(outcome); + return new Promise((resolve) => { + signal.addEventListener( + 'abort', + () => { + if (!this.isDetached(entry)) resolve(outcome); + }, + { once: true }, + ); + }); + } + + private async settlementForOutcome( entry: ManagedTask, - settlement: BackgroundTaskSettlement, - ): Promise { - if (TERMINAL_STATUSES.has(entry.status)) { - if (entry.status === 'killed' && settlement.status === 'killed') { - entry.endedAt = Math.max(Date.now(), (entry.endedAt ?? 0) + 1); - await this.persistLive(entry); - this.fireTerminalEffects(entry); - this.resolveWaiters(entry); + outcome: TerminalOutcome, + worker: Promise, + ): Promise { + if (outcome.kind === 'worker') return outcome.settlement; + + const timedOut = outcome.kind === 'timeout'; + const stopReason = outcome.kind === 'stop' ? outcome.request.reason : undefined; + entry.stopReason = stopReason; + entry.abortController.abort(timedOut ? 
'Timed out' : stopReason); + + const workerAfterAbort = await Promise.race([ + worker, + sleep(SIGTERM_GRACE_MS).then(() => undefined), + ]); + + if ( + outcome.kind === 'stop' && + workerAfterAbort !== undefined && + workerAfterAbort.status !== 'killed' && + workerAfterAbort.status !== 'timed_out' + ) { + return workerAfterAbort; + } + + if (workerAfterAbort === undefined) { + try { + await entry.task.forceStop?.(); + } catch { + /* ignore */ } - return false; } + + return { + status: timedOut ? 'timed_out' : 'killed', + stopReason, + }; + } + + private async finalizeTask( + entry: ManagedTask, + settlement: BackgroundTaskSettlement, + ): Promise { entry.status = settlement.status; entry.endedAt = Date.now(); entry.stopReason = settlement.stopReason ?? (settlement.status === 'killed' ? entry.stopReason : undefined); await this.persistLive(entry); this.fireTerminalEffects(entry); - this.resolveWaiters(entry); - return true; + entry.foregroundRelease?.resolve('terminal'); + entry.terminal.resolve(); } private toInfo(entry: ManagedTask): BackgroundTaskInfo { @@ -661,11 +757,12 @@ export class BackgroundManager { taskId: entry.taskId, description: entry.task.description, status: entry.status, + detached: this.isDetached(entry), startedAt: entry.startedAt, endedAt: entry.endedAt, stopReason: entry.stopReason, terminalNotificationSuppressed: entry.terminalNotificationSuppressed, - timeoutMs: entry.task.timeoutMs, + timeoutMs: entry.options.timeoutMs, }; return entry.task.toInfo(base); } diff --git a/packages/agent-core/src/agent/background/persist.ts b/packages/agent-core/src/agent/background/persist.ts index 290521df7..f1edb5cb9 100644 --- a/packages/agent-core/src/agent/background/persist.ts +++ b/packages/agent-core/src/agent/background/persist.ts @@ -168,7 +168,10 @@ export class BackgroundTaskPersistence { function normalizePersistedTask(task: DiskPersistedTask): PersistedTask { if (isLegacyPersistedTask(task)) return legacyPersistedTaskToInfo(task); - return task; 
+ return { + ...task, + detached: task.detached ?? true, + }; } type LegacyBackgroundTaskStatus = @@ -203,6 +206,7 @@ function legacyPersistedTaskToInfo(task: LegacyPersistedTask): PersistedTask { taskId: task.task_id, description: task.description, status, + detached: true, startedAt: task.started_at, endedAt: task.ended_at, stopReason, diff --git a/packages/agent-core/src/agent/background/process-task.ts b/packages/agent-core/src/agent/background/process-task.ts index d1a2e03b6..a0e27184e 100644 --- a/packages/agent-core/src/agent/background/process-task.ts +++ b/packages/agent-core/src/agent/background/process-task.ts @@ -1,4 +1,5 @@ import type { KaosProcess } from '@moonshot-ai/kaos'; +import type { Readable } from 'node:stream'; import { errorMessage } from '../../loop/errors'; import type { @@ -14,6 +15,13 @@ export interface ProcessBackgroundTaskInfo extends BackgroundTaskInfoBase { readonly exitCode: number | null; } +export type ProcessBackgroundTaskOutputKind = 'stdout' | 'stderr'; + +export type ProcessBackgroundTaskOutputCallback = ( + kind: ProcessBackgroundTaskOutputKind, + text: string, +) => void; + export class ProcessBackgroundTask implements BackgroundTask { readonly kind = 'process' as const; readonly idPrefix = 'bash'; @@ -23,18 +31,18 @@ export class ProcessBackgroundTask implements BackgroundTask { readonly proc: KaosProcess, readonly command: string, readonly description: string, + private readonly onOutput?: ProcessBackgroundTaskOutputCallback, ) {} async start(sink: BackgroundTaskSink): Promise { - for (const stream of [this.proc.stdout, this.proc.stderr]) { - stream.setEncoding('utf8'); - stream.on('data', (chunk: string) => { - sink.appendOutput(chunk); - }); - } + const streamSettled = [ + observeProcessStream(this.proc.stdout, 'stdout', sink, this.onOutput), + observeProcessStream(this.proc.stderr, 'stderr', sink, this.onOutput), + ]; const requestStop = (): void => { void this.proc.kill('SIGTERM').catch(() => {}); + 
destroyProcessStreams(this.proc); }; if (sink.signal.aborted) { requestStop(); @@ -44,11 +52,13 @@ export class ProcessBackgroundTask implements BackgroundTask { try { const exitCode = await this.proc.wait(); + await Promise.all(streamSettled); this.exitCode = exitCode; await sink.settle({ status: sink.signal.aborted ? 'killed' : exitCode === 0 ? 'completed' : 'failed', }); } catch (error: unknown) { + await Promise.allSettled(streamSettled); this.exitCode = this.proc.exitCode; await sink.settle({ status: sink.signal.aborted ? 'killed' : 'failed', @@ -62,6 +72,7 @@ export class ProcessBackgroundTask implements BackgroundTask { async forceStop(): Promise { if (this.proc.exitCode !== null) return; await this.proc.kill('SIGKILL'); + destroyProcessStreams(this.proc); } toInfo(base: BackgroundTaskInfoBase): ProcessBackgroundTaskInfo { @@ -74,3 +85,45 @@ export class ProcessBackgroundTask implements BackgroundTask { }; } } + +function destroyProcessStreams(proc: KaosProcess): void { + try { + proc.stdout.destroy(); + } catch { + /* ignore */ + } + try { + proc.stderr.destroy(); + } catch { + /* ignore */ + } +} + +function observeProcessStream( + stream: Readable, + kind: ProcessBackgroundTaskOutputKind, + sink: BackgroundTaskSink, + onOutput?: ProcessBackgroundTaskOutputCallback, +): Promise { + stream.setEncoding('utf8'); + stream.on('data', (chunk: string) => { + if (chunk.length === 0) return; + sink.appendOutput(chunk); + onOutput?.(kind, chunk); + }); + + return new Promise((resolve) => { + const done = (): void => { + cleanup(); + resolve(); + }; + const cleanup = (): void => { + stream.removeListener('end', done); + stream.removeListener('close', done); + stream.removeListener('error', done); + }; + stream.once('end', done); + stream.once('close', done); + stream.once('error', done); + }); +} diff --git a/packages/agent-core/src/agent/background/task.ts b/packages/agent-core/src/agent/background/task.ts index 6cb58437a..a053d4062 100644 --- 
a/packages/agent-core/src/agent/background/task.ts +++ b/packages/agent-core/src/agent/background/task.ts @@ -29,6 +29,11 @@ export interface BackgroundTaskInfoBase { readonly taskId: string; readonly description: string; readonly status: BackgroundTaskStatus; + /** + * `false` means a tool call is still waiting on this task in the + * foreground. Omitted legacy records should be treated as detached. + */ + readonly detached?: boolean; readonly startedAt: number; readonly endedAt: number | null; /** Human-readable reason for the terminal status, when available. */ @@ -57,6 +62,7 @@ export interface BackgroundTask { readonly timeoutMs?: number; start(sink: BackgroundTaskSink): void | Promise; + onDetach?(): void; forceStop?(): Promise; toInfo(base: BackgroundTaskInfoBase): BackgroundTaskInfo; } diff --git a/packages/agent-core/src/agent/index.ts b/packages/agent-core/src/agent/index.ts index 392c1d4ea..8655a8ded 100644 --- a/packages/agent-core/src/agent/index.ts +++ b/packages/agent-core/src/agent/index.ts @@ -402,6 +402,7 @@ export class Agent { stopBackground: (payload) => { void this.background.stop(payload.taskId, payload.reason); }, + detachBackground: (payload) => this.background.detach(payload.taskId), clearContext: () => { this.context.clear(); }, diff --git a/packages/agent-core/src/agent/tool/index.ts b/packages/agent-core/src/agent/tool/index.ts index b80a87f31..69c150023 100644 --- a/packages/agent-core/src/agent/tool/index.ts +++ b/packages/agent-core/src/agent/tool/index.ts @@ -408,9 +408,10 @@ export class ToolManager { this.agent.subagentHost && new b.AgentTool( this.agent.subagentHost, - allowBackground ? 
background : undefined, + background, DEFAULT_AGENT_PROFILES['agent']?.subagents, { + allowBackground, log: this.agent.log, }, ), diff --git a/packages/agent-core/src/rpc/core-api.ts b/packages/agent-core/src/rpc/core-api.ts index c3ba9f6a5..f90d8ce3e 100644 --- a/packages/agent-core/src/rpc/core-api.ts +++ b/packages/agent-core/src/rpc/core-api.ts @@ -192,6 +192,9 @@ export interface StopBackgroundPayload { /** Free-form human-readable reason persisted with the task record. */ readonly reason?: string; } +export interface DetachBackgroundPayload { + readonly taskId: string; +} export interface GetBackgroundOutputPayload { readonly taskId: string; readonly tail?: number; @@ -326,6 +329,7 @@ export interface AgentAPI { unregisterTool: (payload: UnregisterToolPayload) => void; setActiveTools: (payload: SetActiveToolsPayload) => void; stopBackground: (payload: StopBackgroundPayload) => void; + detachBackground: (payload: DetachBackgroundPayload) => BackgroundTaskInfo | undefined; clearContext: (payload: EmptyPayload) => void; activateSkill: (payload: ActivateSkillPayload) => void; startBtw: (payload: EmptyPayload) => string; diff --git a/packages/agent-core/src/rpc/core-impl.ts b/packages/agent-core/src/rpc/core-impl.ts index 204715da6..491c4fd20 100644 --- a/packages/agent-core/src/rpc/core-impl.ts +++ b/packages/agent-core/src/rpc/core-impl.ts @@ -51,6 +51,7 @@ import type { CoreInfo, CreateGoalPayload, CreateSessionPayload, + DetachBackgroundPayload, EmptyPayload, EnterSwarmPayload, GoalSnapshot, @@ -591,6 +592,10 @@ export class KimiCore implements PromisableMethods { return this.sessionApi(sessionId).stopBackground(payload); } + detachBackground({ sessionId, ...payload }: SessionAgentPayload) { + return this.sessionApi(sessionId).detachBackground(payload); + } + clearContext({ sessionId, ...payload }: SessionAgentPayload) { return this.sessionApi(sessionId).clearContext(payload); } diff --git a/packages/agent-core/src/session/index.ts 
b/packages/agent-core/src/session/index.ts index ca8c531a9..f3703c9ab 100644 --- a/packages/agent-core/src/session/index.ts +++ b/packages/agent-core/src/session/index.ts @@ -302,7 +302,7 @@ export class Session { const agentIds = new Set(); for (const agent of this.readyAgents()) { for (const task of agent.background.list(true)) { - if (task.kind === 'agent' && task.agentId !== undefined) { + if (task.kind === 'agent' && task.agentId !== undefined && task.detached !== false) { agentIds.add(task.agentId); } } diff --git a/packages/agent-core/src/session/rpc.ts b/packages/agent-core/src/session/rpc.ts index fe81014dc..254893156 100644 --- a/packages/agent-core/src/session/rpc.ts +++ b/packages/agent-core/src/session/rpc.ts @@ -6,6 +6,7 @@ import type { CancelPayload, CancelPlanPayload, CreateGoalPayload, + DetachBackgroundPayload, EmptyPayload, EnterSwarmPayload, GetBackgroundOutputPayload, @@ -174,6 +175,10 @@ export class SessionAPIImpl implements PromisableMethods { return (await this.getAgent(agentId)).stopBackground(payload); } + async detachBackground({ agentId, ...payload }: AgentScopedPayload) { + return (await this.getAgent(agentId)).detachBackground(payload); + } + async clearContext({ agentId, ...payload }: AgentScopedPayload) { return (await this.getAgent(agentId)).clearContext(payload); } diff --git a/packages/agent-core/src/session/subagent-host.ts b/packages/agent-core/src/session/subagent-host.ts index b47e1cd68..4016bc954 100644 --- a/packages/agent-core/src/session/subagent-host.ts +++ b/packages/agent-core/src/session/subagent-host.ts @@ -101,7 +101,7 @@ export class SessionSubagentHost { string, { readonly controller: AbortController; - readonly runInBackground: boolean; + runInBackground: boolean; } >(); @@ -246,6 +246,11 @@ export class SessionSubagentHost { } } + markActiveChildDetached(agentId: string): void { + const child = this.activeChildren.get(agentId); + if (child !== undefined) child.runInBackground = true; + } + async 
getProfileName(agentId: string): Promise { const metadata = this.session.metadata.agents[agentId]; if (metadata?.type !== 'sub' || metadata.parentAgentId !== this.ownerAgentId) { diff --git a/packages/agent-core/src/tools/builtin/collaboration/agent.ts b/packages/agent-core/src/tools/builtin/collaboration/agent.ts index 29de3ab7d..e8518b508 100644 --- a/packages/agent-core/src/tools/builtin/collaboration/agent.ts +++ b/packages/agent-core/src/tools/builtin/collaboration/agent.ts @@ -6,10 +6,9 @@ * constructor rather than through the Runtime) to create in-process subagent * loop instances. * - * Two modes: - * - **Foreground** (default): blocks the parent turn, `await handle.completion` - * - **Background**: returns the agent id immediately; the result is delivered - * via a notification. + * Foreground and background subagents both run through BackgroundManager. + * Foreground calls wait for the task to finish unless it is detached through + * the background-task RPC. * * `ToolResult.content` is textual; the structured output exposed by * `AgentToolOutputSchema` is only used for drift-guard and is not consumed at @@ -30,11 +29,7 @@ import { type SessionSubagentHost, type SubagentHandle, } from '../../../session/subagent-host'; -import { - createDeadlineAbortSignal, - isUserCancellation, - type DeadlineAbortSignal, -} from '../../../utils/abort'; +import { isUserCancellation } from '../../../utils/abort'; import { AgentBackgroundTask, type BackgroundManager } from '../../../agent/background'; import { toInputJsonSchema } from '../../support/input-schema'; import { matchesGlobRuleSubject } from '../../support/rule-match'; @@ -113,16 +108,18 @@ export class AgentTool implements BuiltinTool { readonly parameters: Record = toInputJsonSchema(AgentToolInputSchema); constructor( private readonly subagentHost: SessionSubagentHost, - private readonly backgroundManager?: BackgroundManager | undefined, + private readonly backgroundManager: BackgroundManager, subagents?: 
ResolvedAgentProfile['subagents'] | undefined, options?: { log?: Logger; + allowBackground?: boolean | undefined; }, ) { const log = options?.log; + this.allowBackground = options?.allowBackground ?? true; const typeLines = buildSubagentDescriptions(subagents); const baseDescription = `${AGENT_DESCRIPTION_BASE}\n\n${ - this.backgroundManager !== undefined ? AGENT_BACKGROUND_DESCRIPTION : AGENT_BACKGROUND_DISABLED_DESCRIPTION + this.allowBackground ? AGENT_BACKGROUND_DESCRIPTION : AGENT_BACKGROUND_DISABLED_DESCRIPTION }`; this.description = typeLines ? `${baseDescription}\n\nAvailable agent types (pass via subagent_type):\n${typeLines}` @@ -131,6 +128,7 @@ export class AgentTool implements BuiltinTool { } private readonly log?: Logger; + private readonly allowBackground: boolean; async resolveExecution(args: AgentToolInput): Promise { let profileName = args.subagent_type?.length ? args.subagent_type : 'coder'; @@ -157,11 +155,10 @@ export class AgentTool implements BuiltinTool { private async execution( args: AgentToolInput, { - toolCallId, - signal, + toolCallId, + signal, }: ExecutableToolContext, ): Promise { - let foregroundDeadline: DeadlineAbortSignal | undefined; try { signal.throwIfAborted(); const runInBackground = args.run_in_background === true; @@ -178,39 +175,40 @@ export class AgentTool implements BuiltinTool { }; } - if (runInBackground) { - if (this.backgroundManager === undefined) { - return { - output: BACKGROUND_AGENT_UNAVAILABLE, - isError: true, - }; - } + if (runInBackground && !this.allowBackground) { + return { + output: BACKGROUND_AGENT_UNAVAILABLE, + isError: true, + }; } - const backgroundController = runInBackground ? new AbortController() : undefined; - foregroundDeadline = - !runInBackground ? 
createDeadlineAbortSignal(signal, DEFAULT_SUBAGENT_TIMEOUT_MS) : undefined; - const options = { + const controller = new AbortController(); + const abortBeforeRegister = (): void => { + controller.abort(signal.reason); + }; + if (!runInBackground) { + signal.addEventListener('abort', abortBeforeRegister, { once: true }); + } + + const operation = resumeAgentId !== undefined && resumeAgentId.length > 0 ? 'resume' : 'spawn'; + const runOptions = { parentToolCallId: toolCallId, prompt: args.prompt, description: args.description, runInBackground, - signal: backgroundController?.signal ?? foregroundDeadline?.signal ?? signal, + signal: controller.signal, }; - let handle: SubagentHandle; - const operation = resumeAgentId !== undefined && resumeAgentId.length > 0 ? 'resume' : 'spawn'; try { - if (resumeAgentId !== undefined && resumeAgentId.length > 0) { - handle = await this.subagentHost.resume(resumeAgentId, options); - } else { - const profileName = requestedProfileName ?? 'coder'; - handle = await this.subagentHost.spawn({ - profileName, - ...options, - }); - } + handle = + operation === 'resume' + ? await this.subagentHost.resume(resumeAgentId!, runOptions) + : await this.subagentHost.spawn({ + profileName: requestedProfileName ?? 'coder', + ...runOptions, + }); } catch (error) { + signal.removeEventListener('abort', abortBeforeRegister); this.log?.warn('subagent launch failed', { toolCallId, runInBackground, @@ -222,105 +220,135 @@ export class AgentTool implements BuiltinTool { throw error; } + let taskId: string; + try { + taskId = this.backgroundManager.registerTask( + new AgentBackgroundTask(handle, args.description, this.subagentHost, controller), + { + detached: runInBackground, + timeoutMs: DEFAULT_SUBAGENT_TIMEOUT_MS, + signal: runInBackground ? 
undefined : signal, + }, + ); + signal.removeEventListener('abort', abortBeforeRegister); + } catch (error) { + controller.abort(); + void handle.completion.catch(() => {}); + signal.removeEventListener('abort', abortBeforeRegister); + this.log?.warn('background agent task registration failed', { + toolCallId, + agentId: handle.agentId, + subagentType: handle.profileName, + error, + }); + return { + output: error instanceof Error ? error.message : String(error), + isError: true, + }; + } + if (runInBackground) { - let taskId: string; - try { - taskId = this.backgroundManager!.registerTask( - new AgentBackgroundTask(handle.completion, args.description, { - timeoutMs: DEFAULT_SUBAGENT_TIMEOUT_MS, - agentId: handle.agentId, - subagentType: handle.profileName, - abort: () => { - backgroundController?.abort(); - }, - }), - ); - } catch (error) { - backgroundController?.abort(); - void handle.completion.catch(() => {}); - this.log?.warn('background agent task registration failed', { - toolCallId, - agentId: handle.agentId, - subagentType: handle.profileName, - error, - }); - return { - output: error instanceof Error ? error.message : String(error), - isError: true, - }; - } - const lines = [ - `task_id: ${taskId}`, - 'status: running', - `agent_id: ${handle.agentId}`, - `actual_subagent_type: ${handle.profileName}`, - 'automatic_notification: true', - '', - `description: ${args.description}`, - '', - `next_step: The completion arrives automatically in a later turn — no polling needed. To peek at progress without blocking, call TaskOutput(task_id="${taskId}", block=false).`, - `resume_hint: To continue or recover this same subagent later, call Agent(resume="${handle.agentId}", prompt="..."). The parameter is agent_id ("${handle.agentId}"), NOT task_id ("${taskId}") or source_id from a later . 
Recovery cases: a later for this subagent — its conversation history is preserved across session restarts and resume will pick it up.`, - ]; - return { output: lines.join('\n') }; + return { output: formatBackgroundAgentResult(taskId, handle, args.description) }; } - try { - const result = await handle.completion; - const lines = [ - `agent_id: ${handle.agentId}`, - `actual_subagent_type: ${handle.profileName}`, - 'status: completed', - '', - '[summary]', - result.result, - ]; - return { output: lines.join('\n') }; - } catch (error) { - let message: string; - const timedOut = foregroundDeadline?.timedOut() === true; - if (timedOut) { - message = `Agent timed out after ${DEFAULT_SUBAGENT_TIMEOUT_DESCRIPTION}.`; - } else if (isUserCancellation(signal.reason)) { - message = - 'The user manually interrupted this subagent (and any sibling agents launched alongside it). This was a deliberate user action, not a system error, a timeout, or a capacity/concurrency limit. Do not retry automatically or speculate about why it failed — wait for the user\'s next instruction.'; - } else if (isAbortError(error)) { - message = 'The subagent was stopped before it finished.'; - } else { - message = error instanceof Error ? error.message : String(error); - } - const lines = [ - `agent_id: ${handle.agentId}`, - `actual_subagent_type: ${handle.profileName}`, - 'status: failed', - '', - `subagent error: ${message}`, - ]; - if (timedOut) { - lines.push( - `resume_hint: Continue with Agent(resume="${handle.agentId}", prompt="continue"). Use agent_id only; do not set subagent_type. 
The subagent retains its prior context; redo any unfinished tool call if its result was lost.`, - ); - } - return { output: lines.join('\n'), isError: true }; + const release = await this.backgroundManager.waitForForegroundRelease(taskId); + if (release === 'detached') { + return { output: formatBackgroundAgentResult(taskId, handle, args.description) }; } + return await this.formatForegroundResult(taskId, handle); } catch (error) { - let message: string; - if (foregroundDeadline?.timedOut() === true) { - message = `Agent timed out after ${DEFAULT_SUBAGENT_TIMEOUT_DESCRIPTION}.`; - } else if (isUserCancellation(signal.reason)) { - message = - 'The user manually interrupted this subagent (and any sibling agents launched alongside it). This was a deliberate user action, not a system error, a timeout, or a capacity/concurrency limit. Do not retry automatically or speculate about why it failed — wait for the user\'s next instruction.'; - } else if (isAbortError(error)) { - message = 'The subagent was stopped before it finished.'; - } else { - message = error instanceof Error ? error.message : String(error); - } - return { output: `subagent error: ${message}`, isError: true }; - } finally { - foregroundDeadline?.clear(); + return { output: `subagent error: ${launchErrorMessage(error, signal)}`, isError: true }; + } + } + + private async formatForegroundResult( + taskId: string, + handle: SubagentHandle, + ): Promise { + const info = this.backgroundManager.getTask(taskId); + if (info?.status === 'completed') { + return { + output: formatForegroundAgentSuccess( + handle, + await this.backgroundManager.readOutput(taskId), + ), + }; } + const timedOut = info?.status === 'timed_out'; + const message = + timedOut + ? `Agent timed out after ${DEFAULT_SUBAGENT_TIMEOUT_DESCRIPTION}.` + : info?.stopReason === 'Interrupted by user' + ? USER_INTERRUPTED_SUBAGENT_MESSAGE + : info?.stopReason !== undefined + ? 
info.stopReason + : 'The subagent was stopped before it finished.'; + return { + output: formatForegroundAgentFailure(handle, message, timedOut), + isError: true, + }; } } +const USER_INTERRUPTED_SUBAGENT_MESSAGE = + 'The user manually interrupted this subagent (and any sibling agents launched alongside it). This was a deliberate user action, not a system error, a timeout, or a capacity/concurrency limit. Do not retry automatically or speculate about why it failed — wait for the user\'s next instruction.'; + +function formatBackgroundAgentResult( + taskId: string, + handle: SubagentHandle, + description: string, +): string { + return [ + `task_id: ${taskId}`, + 'status: running', + `agent_id: ${handle.agentId}`, + `actual_subagent_type: ${handle.profileName}`, + 'automatic_notification: true', + '', + `description: ${description}`, + '', + `next_step: The completion arrives automatically in a later turn — no polling needed. To peek at progress without blocking, call TaskOutput(task_id="${taskId}", block=false).`, + `resume_hint: To continue or recover this same subagent later, call Agent(resume="${handle.agentId}", prompt="..."). The parameter is agent_id ("${handle.agentId}"), NOT task_id ("${taskId}") or source_id from a later . 
Recovery cases: a later for this subagent — its conversation history is preserved across session restarts and resume will pick it up.`, + ].join('\n'); +} + +function formatForegroundAgentSuccess(handle: SubagentHandle, result: string): string { + return [ + `agent_id: ${handle.agentId}`, + `actual_subagent_type: ${handle.profileName}`, + 'status: completed', + '', + '[summary]', + result, + ].join('\n'); +} + +function formatForegroundAgentFailure( + handle: SubagentHandle, + message: string, + timedOut: boolean, +): string { + const lines = [ + `agent_id: ${handle.agentId}`, + `actual_subagent_type: ${handle.profileName}`, + 'status: failed', + '', + `subagent error: ${message}`, + ]; + if (timedOut) { + lines.push( + `resume_hint: Continue with Agent(resume="${handle.agentId}", prompt="continue"). Use agent_id only; do not set subagent_type. The subagent retains its prior context; redo any unfinished tool call if its result was lost.`, + ); + } + return lines.join('\n'); +} + +function launchErrorMessage(error: unknown, signal: AbortSignal): string { + if (isUserCancellation(signal.reason)) return USER_INTERRUPTED_SUBAGENT_MESSAGE; + if (isAbortError(error)) return 'The subagent was stopped before it finished.'; + return error instanceof Error ? 
error.message : String(error); +} + function buildSubagentDescriptions(subagents: ResolvedAgentProfile['subagents']): string { if (subagents === undefined) return ''; return Object.entries(subagents) diff --git a/packages/agent-core/src/tools/builtin/shell/bash.ts b/packages/agent-core/src/tools/builtin/shell/bash.ts index 679aff1d7..9313874f0 100644 --- a/packages/agent-core/src/tools/builtin/shell/bash.ts +++ b/packages/agent-core/src/tools/builtin/shell/bash.ts @@ -8,24 +8,20 @@ * - `Kaos` — shell execution abstraction (exec / execWithEnv) * - `cwd` — default working directory for commands * - `Environment` — cross-platform probe (shellName / shellPath) - * - `BackgroundManager?` — optional: required iff run_in_background=true + * - `BackgroundManager` — task lifecycle manager for foreground/background commands * * Execution goes through Kaos, never directly via node:child_process. * * Hardening: - * - `args.timeout` (seconds) and the ambient `signal` both drive - * `Promise.race`; fire-a-kill on either edge. + * - `args.timeout` (seconds) and the ambient `signal` both stop the + * manager-owned process task on either edge. * - stdin is closed immediately so interactive commands (`cat`, `read`, * `python -c 'input()'`) receive EOF instead of hanging. - * - Two-phase kill: SIGTERM → 5s grace → SIGKILL (Kaos honours this - * contract cross-platform). - * - stdout/stderr stream into ToolResultBuilder; excess is replaced with a - * truncation marker so a runaway command cannot OOM the host. + * - Two-phase kill is owned by BackgroundManager: SIGTERM → grace → SIGKILL. + * - stdout/stderr are captured by ProcessBackgroundTask for task output; + * foreground runs pass a callback to collect chunks for this call. 
*/ -import type { Readable } from 'node:stream'; -import { StringDecoder } from 'node:string_decoder'; - import type { Kaos, KaosProcess } from '@moonshot-ai/kaos'; import { z } from 'zod'; @@ -43,7 +39,7 @@ const DEFAULT_TIMEOUT_S = 60; const MAX_TIMEOUT_S = 5 * 60; const DEFAULT_BACKGROUND_TIMEOUT_S = 10 * 60; const MAX_BACKGROUND_TIMEOUT_S = 24 * 60 * 60; -const SIGTERM_GRACE_MS = 5_000; +const USER_INTERRUPT_REASON = 'Interrupted by user'; export const BashInputSchema = z .object({ @@ -155,13 +151,13 @@ export class BashTool implements BuiltinTool { constructor( private readonly kaos: Kaos, private readonly cwd: string, - private readonly backgroundManager?: BackgroundManager, + private readonly backgroundManager: BackgroundManager, options?: { allowBackground?: boolean | undefined; }, ) { this.isWindowsBash = this.kaos.osEnv.osKind === 'Windows'; - this.allowBackground = options?.allowBackground ?? this.backgroundManager !== undefined; + this.allowBackground = options?.allowBackground ?? true; const rendered = renderBashDescription(this.kaos.osEnv.shellName); this.description = this.allowBackground ? rendered : withoutBackgroundDescription(rendered); } @@ -217,30 +213,23 @@ export class BashTool implements BuiltinTool { signal: AbortSignal, onUpdate?: ((update: ToolUpdate) => void) | undefined, ): Promise { - if (signal.aborted) { - return { isError: true, output: 'Aborted before command started' }; - } - if (args.command.length === 0) { - return { isError: true, output: 'Command cannot be empty.' 
}; - } - - if (args.run_in_background) { - if (!this.allowBackground) { - return { - isError: true, - output: - 'Background execution is not available for this agent because TaskOutput and TaskStop are not enabled.', - }; - } - return this.executeInBackground(args); - } + const validationError = this.validateRunRequest(args, signal); + if (validationError !== undefined) return validationError; - const timeoutMs = normalizeTimeoutMs(args.timeout, false); + const startsInBackground = args.run_in_background === true; + const foregroundTimeoutMs = normalizeTimeoutMs(args.timeout, false); + const command = this.isWindowsBash ? rewriteWindowsNullRedirect(args.command) : args.command; + const effectiveCwd = args.cwd ?? this.cwd; + const description = startsInBackground ? args.description!.trim() : foregroundDescription(args); + const timeoutMs = startsInBackground + ? args.disable_timeout + ? undefined + : normalizeTimeoutMs(args.timeout, true) + : foregroundTimeoutMs; + const builder = new ToolResultBuilder(); let proc: KaosProcess; - const command = this.isWindowsBash ? rewriteWindowsNullRedirect(args.command) : args.command; try { - const effectiveCwd = args.cwd ?? this.cwd; proc = await this.spawn(effectiveCwd, command); } catch (error) { return { @@ -248,187 +237,138 @@ export class BashTool implements BuiltinTool { output: error instanceof Error ? error.message : String(error), }; } + closeProcessStdin(proc); + + let collectForegroundOutput = !startsInBackground; + const onProcessOutput = startsInBackground + ? undefined + : (kind: 'stdout' | 'stderr', text: string): void => { + if (!collectForegroundOutput) return; + onUpdate?.({ kind, text }); + builder.write(text); + }; + let taskId: string; try { - proc.stdin.end(); - } catch { - // Closing stdin on a process that has already exited is a no-op on - // some platforms and throws on others — either is safe to ignore. 
+ taskId = this.backgroundManager.registerTask( + new ProcessBackgroundTask(proc, command, description, onProcessOutput), + { + detached: startsInBackground, + timeoutMs, + signal: startsInBackground ? undefined : signal, + }, + ); + } catch (error) { + collectForegroundOutput = false; + await killSpawnedProcess(proc); + return { + isError: true, + output: error instanceof Error ? error.message : String(error), + }; } - let timedOut = false; - let aborted = false; - let killed = false; - - const killProc = async (): Promise => { - if (killed) return; - killed = true; - try { - await proc.kill('SIGTERM'); - } catch { - /* process already gone */ - } - const exited = proc - .wait() - .then(() => true) - .catch(() => true); - const raced = await Promise.race([ - exited, - new Promise((resolve) => { - setTimeout(() => { - resolve(false); - }, SIGTERM_GRACE_MS); - }), - ]); - if (!raced && proc.exitCode === null) { - try { - await proc.kill('SIGKILL'); - } catch { - /* ignore */ - } - } - - try { - proc.stdout.destroy(); - } catch { - /* ignore */ - } - try { - proc.stderr.destroy(); - } catch { - /* ignore */ - } - }; - - const onAbort = (): void => { - aborted = true; - void killProc(); - }; - signal.addEventListener('abort', onAbort); - - const timeoutHandle = setTimeout(() => { - timedOut = true; - void killProc(); - }, timeoutMs); + if (startsInBackground) { + return this.backgroundStartedResult(taskId, proc, description, { + title: 'Background task started', + brief: `Started ${taskId}`, + }); + } try { - const builder = new ToolResultBuilder(); - const isTerminating = (): boolean => timedOut || aborted || killed; - const [, exitCode] = await Promise.all([ - Promise.all([ - readStreamIntoBuilder(proc.stdout, builder, 'stdout', onUpdate, isTerminating), - readStreamIntoBuilder(proc.stderr, builder, 'stderr', onUpdate, isTerminating), - ]), - proc.wait(), - ]); - - if (timedOut) { - const timeoutLabel = - timeoutMs % 1000 === 0 ? 
`${String(timeoutMs / 1000)}s` : `${String(timeoutMs)}ms`; - return builder.error(`Command killed by timeout (${timeoutLabel})`, { - brief: `Killed by timeout (${timeoutLabel})`, - }); - } - if (aborted) { - return builder.error('Interrupted by user', { brief: 'Interrupted by user' }); - } - - const isError = exitCode !== 0; - if (isError && builder.nChars === 0) { - builder.write(`Process exited with code ${String(exitCode)}`); + const release = await this.backgroundManager.waitForForegroundRelease(taskId); + if (release === 'detached') { + collectForegroundOutput = false; + return this.backgroundStartedResult( + taskId, + proc, + description, + { + title: 'Task moved to background', + brief: `Backgrounded ${taskId}`, + }, + builder, + ); } - if (!isError) { - return builder.ok('Command executed successfully.'); - } - return builder.error(`Command failed with exit code: ${String(exitCode)}.`, { - brief: `Failed with exit code: ${String(exitCode)}`, - }); - } catch (error) { - return { - isError: true, - output: error instanceof Error ? error.message : String(error), - }; + return this.foregroundCompletionResult(taskId, proc, builder, foregroundTimeoutMs); } finally { - clearTimeout(timeoutHandle); - signal.removeEventListener('abort', onAbort); + collectForegroundOutput = false; } } - private async executeInBackground(args: BashInput): Promise { - if (!this.backgroundManager) { + private validateRunRequest( + args: BashInput, + signal: AbortSignal, + ): ExecutableToolResult | undefined { + if (signal.aborted) return { isError: true, output: 'Aborted before command started' }; + if (args.command.length === 0) return { isError: true, output: 'Command cannot be empty.' 
}; + if (args.run_in_background !== true) return undefined; + if (!this.allowBackground) { return { isError: true, - output: 'Background execution is not available (no BackgroundManager configured).', + output: + 'Background execution is not available for this agent because TaskOutput and TaskStop are not enabled.', }; } - const backgroundManager = this.backgroundManager; - if (!args.description?.trim()) { return { isError: true, output: 'description is required when run_in_background is true.', }; } + return undefined; + } - const timeoutMs = args.disable_timeout ? undefined : normalizeTimeoutMs(args.timeout, true); - - let proc: KaosProcess; - const command = this.isWindowsBash ? rewriteWindowsNullRedirect(args.command) : args.command; - try { - const effectiveCwd = args.cwd ?? this.cwd; - proc = await this.spawn(effectiveCwd, command); - } catch (error) { - return { - isError: true, - output: error instanceof Error ? error.message : String(error), - }; + private foregroundCompletionResult( + taskId: string, + proc: KaosProcess, + builder: ToolResultBuilder, + foregroundTimeoutMs: number, + ): ExecutableToolResult { + const current = this.backgroundManager.getTask(taskId); + const exitCode = current?.kind === 'process' ? 
current.exitCode : proc.exitCode; + if (current?.status === 'timed_out') { + const timeoutLabel = formatTimeoutLabel(foregroundTimeoutMs); + return builder.error(`Command killed by timeout (${timeoutLabel})`, { + brief: `Killed by timeout (${timeoutLabel})`, + }); } - - try { - proc.stdin.end(); - } catch { - /* process already gone */ + if (current?.status === 'killed' && current.stopReason === USER_INTERRUPT_REASON) { + return builder.error(USER_INTERRUPT_REASON, { brief: USER_INTERRUPT_REASON }); + } + if ( + (current?.status === 'failed' || current?.status === 'killed') && + current.stopReason !== undefined + ) { + return builder.error(current.stopReason, { brief: current.stopReason }); } - let taskId: string; - try { - taskId = backgroundManager.registerTask( - new ProcessBackgroundTask(proc, command, args.description.trim()), - ); - } catch (error) { - try { - await proc.kill('SIGTERM'); - } catch { - /* process already gone */ - } - return { - isError: true, - output: error instanceof Error ? error.message : String(error), - }; + const isError = exitCode !== 0; + if (isError && builder.nChars === 0) { + builder.write(`Process exited with code ${String(exitCode)}`); } - if (timeoutMs !== undefined) { - const timeoutHandle = setTimeout(() => { - void (async (): Promise => { - if (proc.exitCode !== null) return; - const info = backgroundManager.getTask(taskId); - if (info && info.status === 'running') { - void backgroundManager.stop(taskId, 'Timed out'); - } - })(); - }, timeoutMs); - timeoutHandle.unref?.(); + if (!isError) { + return builder.ok('Command executed successfully.'); } + return builder.error(`Command failed with exit code: ${String(exitCode)}.`, { + brief: `Failed with exit code: ${String(exitCode)}`, + }); + } - // registerTask() synchronously inserts taskId into the manager's Map, so - // this lookup in the same tick cannot return undefined. 
- const status = backgroundManager.getTask(taskId)!.status; - const builder = new ToolResultBuilder(); + private backgroundStartedResult( + taskId: string, + proc: KaosProcess, + description: string, + labels: { title: string; brief: string }, + builder = new ToolResultBuilder(), + ): ExecutableToolResult { + const status = this.backgroundManager.getTask(taskId)?.status ?? 'running'; + if (builder.nChars > 0) builder.write('\n'); builder.write( `task_id: ${taskId}\n` + `pid: ${String(proc.pid)}\n` + - `description: ${args.description.trim()}\n` + + `description: ${description}\n` + `status: ${status}\n` + `automatic_notification: true\n` + 'next_step: You will be automatically notified when it completes.\n' + @@ -436,41 +376,35 @@ export class BashTool implements BuiltinTool { 'next_step: Use TaskStop only if the task must be cancelled.\n' + 'human_shell_hint: Tell the human to run /tasks to open the interactive background-task panel.', ); - return builder.ok('Background task started', { brief: `Started ${taskId}` }); + return builder.ok(labels.title, { brief: labels.brief }); } } -async function readStreamIntoBuilder( - stream: Readable, - builder: ToolResultBuilder, - kind: 'stdout' | 'stderr', - onUpdate?: ((update: ToolUpdate) => void) | undefined, - suppressPrematureClose?: () => boolean, -): Promise { - const decoder = new StringDecoder('utf8'); +function formatTimeoutLabel(timeoutMs: number): string { + return timeoutMs % 1000 === 0 ? `${String(timeoutMs / 1000)}s` : `${String(timeoutMs)}ms`; +} + +function foregroundDescription(args: BashInput): string { + const explicit = args.description?.trim(); + if (explicit !== undefined && explicit.length > 0) return explicit; + const preview = args.command.length > 60 ? `${args.command.slice(0, 60)}…` : args.command; + return `Bash: ${preview}`; +} + +function closeProcessStdin(proc: KaosProcess): void { try { - for await (const chunk of stream) { - const buf: Buffer = - typeof chunk === 'string' ? 
Buffer.from(chunk, 'utf8') : (chunk as Buffer); - const text = decoder.write(buf); - if (text.length > 0) onUpdate?.({ kind, text }); - builder.write(text); - } - } catch (error) { - if (!isPrematureCloseError(error) || suppressPrematureClose?.() !== true) { - throw error; - } + proc.stdin.end(); + } catch { + /* process already gone */ } - const trailing = decoder.end(); - if (trailing.length > 0) onUpdate?.({ kind, text: trailing }); - builder.write(trailing); } -function isPrematureCloseError(error: unknown): boolean { - return ( - error instanceof Error && - (error as NodeJS.ErrnoException).code === 'ERR_STREAM_PREMATURE_CLOSE' - ); +async function killSpawnedProcess(proc: KaosProcess): Promise { + try { + await proc.kill('SIGTERM'); + } catch { + /* process already gone */ + } } function shellQuote(s: string): string { diff --git a/packages/agent-core/src/utils/promise.ts b/packages/agent-core/src/utils/promise.ts new file mode 100644 index 000000000..9b51d4816 --- /dev/null +++ b/packages/agent-core/src/utils/promise.ts @@ -0,0 +1,11 @@ +import { sleep } from '@antfu/utils'; + +const NEVER = new Promise(() => {}); + +export function timeoutOutcome( + timeoutMs: number | undefined, + outcome: Outcome, +): Promise { + if (timeoutMs === undefined || timeoutMs <= 0) return NEVER; + return sleep(timeoutMs).then(() => outcome); +} diff --git a/packages/agent-core/test/agent/background/agent-timeout.test.ts b/packages/agent-core/test/agent/background/agent-timeout.test.ts index ef7a50638..dbcc47d49 100644 --- a/packages/agent-core/test/agent/background/agent-timeout.test.ts +++ b/packages/agent-core/test/agent/background/agent-timeout.test.ts @@ -1,9 +1,9 @@ /** - * AgentBackgroundTask `timeoutMs` option. + * BackgroundManager task timeout using AgentBackgroundTask metadata. 
* * Semantics: - * - external deadline fires → status=`timed_out` - * - no `timeoutMs` → the task runs to completion without a wrapper + * - manager-owned deadline fires → status=`timed_out` + * - no `timeoutMs` → the task runs to completion without a manager deadline * - internal `TimeoutError` rejection (e.g. aiohttp sock_read) is a * generic `failed` with no stop reason — the timeout reason must * only be set for the caller-driven deadline @@ -11,8 +11,7 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; -import { AgentBackgroundTask } from '../../../src/agent/background'; -import { createBackgroundManager } from './helpers'; +import { agentTask, createBackgroundManager } from './helpers'; describe('AgentBackgroundTask — timeoutMs', () => { afterEach(() => { @@ -24,25 +23,24 @@ describe('AgentBackgroundTask — timeoutMs', () => { vi.useFakeTimers({ toFake: ['setTimeout', 'clearTimeout'] }); // A never-resolving completion — only the deadline will fire. const hangForever = new Promise<{ result: string }>(() => {}); - const taskId = manager.registerTask(new AgentBackgroundTask(hangForever, 'hang', { timeoutMs: 2_000 })); + const taskId = manager.registerTask(agentTask(hangForever, 'hang'), { timeoutMs: 2_000 }); - // Advance past the deadline; awaitTerminal resolves once the race - // finishes and the `.finally` block runs. + // Advance past the deadline and manager-owned stop grace. 
const terminalPromise = manager.wait(taskId); - await vi.advanceTimersByTimeAsync(2_100); + await vi.advanceTimersByTimeAsync(7_100); const info = await terminalPromise; expect(info?.status).toBe('timed_out'); expect(info?.stopReason).toBeUndefined(); }); - it('omitting timeoutMs lets the task run to completion (no wrapper)', async () => { + it('omitting timeoutMs lets the task run to completion without a manager deadline', async () => { const { manager } = createBackgroundManager(); let resolveFn!: (r: { result: string }) => void; const completion = new Promise<{ result: string }>((res) => { resolveFn = res; }); - const taskId = manager.registerTask(new AgentBackgroundTask(completion, 'no deadline')); + const taskId = manager.registerTask(agentTask(completion, 'no deadline')); resolveFn({ result: 'finished' }); const info = await manager.wait(taskId); @@ -58,9 +56,9 @@ describe('AgentBackgroundTask — timeoutMs', () => { const internalErr = new Error('aiohttp sock_read timeout'); internalErr.name = 'TimeoutError'; const rejecting = Promise.reject(internalErr); - const taskId = manager.registerTask(new AgentBackgroundTask(rejecting, 'internal timeout', { + const taskId = manager.registerTask(agentTask(rejecting, 'internal timeout'), { timeoutMs: 900_000, - })); + }); const info = await manager.wait(taskId); expect(info?.status).toBe('failed'); @@ -81,9 +79,9 @@ describe('AgentBackgroundTask — timeoutMs', () => { it('explicit timeoutMs is persisted on the task info', () => { const { manager } = createBackgroundManager(); vi.useFakeTimers({ toFake: ['setTimeout', 'clearTimeout'] }); - const taskId = manager.registerTask(new AgentBackgroundTask(new Promise(() => {}), 'persist timeout', { + const taskId = manager.registerTask(agentTask(new Promise(() => {}), 'persist timeout'), { timeoutMs: 1_800_000, - })); + }); const info = manager.getTask(taskId); expect((info as unknown as { timeoutMs?: number }).timeoutMs).toBe(1_800_000); }); @@ -102,7 +100,7 @@ 
describe('AgentBackgroundTask — timeoutMs', () => { // registerAgentTask, the assertion below catches it. it('omitted timeoutMs leaves the task info field undefined', () => { const { manager } = createBackgroundManager(); - const taskId = manager.registerTask(new AgentBackgroundTask(new Promise(() => {}), 'default timeout')); + const taskId = manager.registerTask(agentTask(new Promise(() => {}), 'default timeout')); const info = manager.getTask(taskId); expect((info as unknown as { timeoutMs?: number }).timeoutMs).toBeUndefined(); }); @@ -111,14 +109,14 @@ describe('AgentBackgroundTask — timeoutMs', () => { // as "record the value but do NOT arm a deadline" rather than // Python's "fire immediately" semantics. The field is preserved on // the task info so shutdown wait-caps / UI can read it; the - // deadline-arming check (`opts.timeoutMs > 0`) deliberately skips + // deadline-arming check (`timeoutMs > 0`) deliberately skips // zero so a caller writing `0` does not lose its task to an // immediate kill. it('timeoutMs=0 is preserved on the task info and does not arm a deadline', async () => { const { manager } = createBackgroundManager(); - const taskId = manager.registerTask(new AgentBackgroundTask(new Promise(() => {}), 'zero timeout', { + const taskId = manager.registerTask(agentTask(new Promise(() => {}), 'zero timeout'), { timeoutMs: 0, - })); + }); // The literal zero is preserved on the task info. 
const initial = manager.getTask(taskId); expect((initial as unknown as { timeoutMs?: number }).timeoutMs).toBe(0); diff --git a/packages/agent-core/test/agent/background/helpers.ts b/packages/agent-core/test/agent/background/helpers.ts index c11eec1ff..fe8bc7cc5 100644 --- a/packages/agent-core/test/agent/background/helpers.ts +++ b/packages/agent-core/test/agent/background/helpers.ts @@ -2,11 +2,13 @@ import type { KaosProcess } from '@moonshot-ai/kaos'; import { vi } from 'vitest'; import { + AgentBackgroundTask, BackgroundManager, BackgroundTaskPersistence, ProcessBackgroundTask, type BackgroundTaskInfo, } from '../../../src/agent/background'; +import type { SessionSubagentHost, SubagentHandle } from '../../../src/session/subagent-host'; import type { AgentEvent } from '../../../src/rpc/events'; export interface FakeBackgroundAgent { @@ -65,6 +67,30 @@ export function registerProcess( return manager.registerTask(new ProcessBackgroundTask(proc, command, description)); } +export function agentTask( + completion: Promise<{ result: string }>, + description: string, + options: { + readonly agentId?: string; + readonly subagentType?: string; + readonly subagentHost?: Pick; + readonly abortController?: AbortController; + } = {}, +): AgentBackgroundTask { + const handle: SubagentHandle = { + agentId: options.agentId ?? 'agent-child', + profileName: options.subagentType ?? 'coder', + resumed: false, + completion, + }; + return new AgentBackgroundTask( + handle, + description, + options.subagentHost ?? { markActiveChildDetached: vi.fn() }, + options.abortController ?? 
new AbortController(), + ); +} + export async function waitForTerminal( manager: BackgroundManager, taskId: string, diff --git a/packages/agent-core/test/agent/background/ids.test.ts b/packages/agent-core/test/agent/background/ids.test.ts index a4acfbd1e..25fb1cbaa 100644 --- a/packages/agent-core/test/agent/background/ids.test.ts +++ b/packages/agent-core/test/agent/background/ids.test.ts @@ -8,8 +8,8 @@ import type { Writable } from 'node:stream'; import type { KaosProcess } from '@moonshot-ai/kaos'; import { describe, expect, it, vi } from 'vitest'; -import { AgentBackgroundTask, BackgroundTaskPersistence } from '../../../src/agent/background'; -import { createBackgroundManager, registerProcess } from './helpers'; +import { BackgroundTaskPersistence } from '../../../src/agent/background'; +import { agentTask, createBackgroundManager, registerProcess } from './helpers'; function pendingProcess(): KaosProcess { return { @@ -35,7 +35,7 @@ describe('background task id format', () => { it('assigns agent-prefixed ids to agent tasks', () => { const { manager } = createBackgroundManager(); const id = manager.registerTask( - new AgentBackgroundTask(new Promise(() => {}), 'agent task'), + agentTask(new Promise(() => {}), 'agent task'), ); expect(id).toMatch(/^agent-[0-9a-z]{8}$/); diff --git a/packages/agent-core/test/agent/background/manager.test.ts b/packages/agent-core/test/agent/background/manager.test.ts index 0dc043263..cbbea653a 100644 --- a/packages/agent-core/test/agent/background/manager.test.ts +++ b/packages/agent-core/test/agent/background/manager.test.ts @@ -12,11 +12,12 @@ import type { KaosProcess } from '@moonshot-ai/kaos'; import { afterEach, describe, expect, it, vi } from 'vitest'; import { - AgentBackgroundTask, BackgroundTaskPersistence, + ProcessBackgroundTask, type BackgroundManager, } from '../../../src/agent/background'; import { + agentTask, createBackgroundManager, registerProcess, waitForOutput, @@ -131,15 +132,6 @@ function 
processWithVisibleExitCodeBeforeWait(exitCode = 143): { }; } -function waiterCount(manager: BackgroundManager, taskId: string): number { - const tasks = ( - manager as unknown as { - tasks: Map void> }>; - } - ).tasks; - return tasks.get(taskId)?.waiters.length ?? 0; -} - describe('BackgroundManager', () => { afterEach(() => { vi.useRealTimers(); @@ -166,7 +158,7 @@ describe('BackgroundManager', () => { const { manager } = createBackgroundManager(); const taskId = manager.registerTask( - new AgentBackgroundTask(new Promise(() => {}), 'investigate bug', { + agentTask(new Promise(() => {}), 'investigate bug', { agentId: 'agent-child', subagentType: 'coder', }), @@ -183,6 +175,94 @@ describe('BackgroundManager', () => { }); }); + it('tracks foreground tasks and releases their waiter when detached', async () => { + const { manager } = createBackgroundManager(); + const taskId = manager.registerTask( + agentTask(new Promise(() => {}), 'foreground agent'), + { detached: false }, + ); + + expect(manager.getTask(taskId)).toMatchObject({ + detached: false, + }); + + const waiting = manager.waitForForegroundRelease(taskId); + await Promise.resolve(); + + expect(manager.detach(taskId)).toMatchObject({ + taskId, + detached: true, + }); + await expect(waiting).resolves.toBe('detached'); + }); + + it('releases foreground waiters when a foreground task completes', async () => { + const { agent, manager } = createBackgroundManager(); + const taskId = manager.registerTask( + agentTask(Promise.resolve({ result: 'done' }), 'foreground agent'), + { detached: false }, + ); + + await expect(manager.waitForForegroundRelease(taskId)).resolves.toBe('terminal'); + expect(manager.getTask(taskId)).toMatchObject({ + detached: false, + status: 'completed', + }); + expect(agent.turn.steer).not.toHaveBeenCalled(); + }); + + it('stops foreground tasks from their register-time signal', async () => { + const { manager } = createBackgroundManager(); + const { proc, killSpy } = pendingProcess(); + 
const controller = new AbortController(); + const taskId = manager.registerTask( + new ProcessBackgroundTask(proc, 'sleep 10', 'foreground process'), + { + detached: false, + signal: controller.signal, + }, + ); + + const waiting = manager.waitForForegroundRelease(taskId); + controller.abort(); + + await expect(waiting).resolves.toBe('terminal'); + expect(killSpy).toHaveBeenCalledWith('SIGTERM'); + expect(manager.getTask(taskId)).toMatchObject({ + status: 'killed', + stopReason: 'Interrupted by user', + }); + }); + + it('does not count foreground tasks against the detached task limit', () => { + const { manager } = createBackgroundManager({ maxRunningTasks: 1 }); + manager.registerTask(agentTask(new Promise(() => {}), 'foreground agent'), { + detached: false, + }); + + manager.registerTask(agentTask(new Promise(() => {}), 'background agent')); + + expect(() => { + manager.registerTask(agentTask(new Promise(() => {}), 'second background')); + }).toThrow('Too many background tasks are already running.'); + }); + + it('does not count foreground tasks detached later against the background task limit', () => { + const { manager } = createBackgroundManager({ maxRunningTasks: 1 }); + const taskId = manager.registerTask( + agentTask(new Promise(() => {}), 'foreground agent'), + { detached: false }, + ); + + manager.detach(taskId); + + manager.registerTask(agentTask(new Promise(() => {}), 'background agent')); + + expect(() => { + manager.registerTask(agentTask(new Promise(() => {}), 'second background')); + }).toThrow('Too many background tasks are already running.'); + }); + it('lists active tasks by default', () => { const { manager } = createBackgroundManager(); registerProcess(manager, pendingProcess().proc, 'sleep 60', 'task 1'); @@ -200,7 +280,7 @@ describe('BackgroundManager', () => { registerProcess(manager, pendingProcess().proc, 'sleep 60', 'second task'); }).toThrow('Too many background tasks are already running.'); expect(() => { - manager.registerTask(new 
AgentBackgroundTask(new Promise(() => {}), 'agent task')); + manager.registerTask(agentTask(new Promise(() => {}), 'agent task')); }).toThrow('Too many background tasks are already running.'); }); @@ -350,9 +430,10 @@ describe('BackgroundManager', () => { const completion = new Promise<{ result: string }>((resolve) => { resolveCompletion = resolve; }); - const abort = vi.fn(); + const controller = new AbortController(); + const abort = vi.spyOn(controller, 'abort'); const taskId = manager.registerTask( - new AgentBackgroundTask(completion, 'agent race test', { abort }), + agentTask(completion, 'agent race test', { abortController: controller }), ); const stopPromise = manager.stop(taskId, 'user requested'); @@ -371,9 +452,10 @@ describe('BackgroundManager', () => { const completion = new Promise<{ result: string }>((_resolve, reject) => { rejectCompletion = reject; }); - const abort = vi.fn(); + const controller = new AbortController(); + const abort = vi.spyOn(controller, 'abort'); const taskId = manager.registerTask( - new AgentBackgroundTask(completion, 'agent failure race test', { abort }), + agentTask(completion, 'agent failure race test', { abortController: controller }), ); const stopPromise = manager.stop(taskId, 'user requested'); @@ -395,11 +477,13 @@ describe('BackgroundManager', () => { }); const abortError = new Error('The operation was aborted.'); abortError.name = 'AbortError'; - const abort = vi.fn(() => { + const controller = new AbortController(); + const abort = vi.spyOn(controller, 'abort').mockImplementation((reason?: unknown) => { + AbortController.prototype.abort.call(controller, reason); rejectCompletion(abortError); }); const taskId = manager.registerTask( - new AgentBackgroundTask(completion, 'agent abort test', { abort }), + agentTask(completion, 'agent abort test', { abortController: controller }), ); const result = await manager.stop(taskId, 'user requested'); @@ -414,9 +498,10 @@ describe('BackgroundManager', () => { it('stop finalizes 
a never-settling agent task after the grace window', async () => { vi.useFakeTimers(); const { manager } = createBackgroundManager(); - const abort = vi.fn(); + const controller = new AbortController(); + const abort = vi.spyOn(controller, 'abort'); const taskId = manager.registerTask( - new AgentBackgroundTask(new Promise(() => {}), 'hung agent task', { abort }), + agentTask(new Promise(() => {}), 'hung agent task', { abortController: controller }), ); const stopPromise = manager.stop(taskId, 'user requested'); @@ -431,7 +516,7 @@ describe('BackgroundManager', () => { expect(abort).toHaveBeenCalled(); }); - it('wait resolves on completion and removes timed-out waiters', async () => { + it('wait resolves on completion and returns the current snapshot on timeout', async () => { const { manager } = createBackgroundManager(); const completedId = registerProcess(manager, immediateProcess(0), 'echo fast', 'wait test'); @@ -439,7 +524,6 @@ describe('BackgroundManager', () => { const runningId = registerProcess(manager, pendingProcess().proc, 'sleep 60', 'timeout'); expect(await manager.wait(runningId, 0)).toMatchObject({ status: 'running' }); - expect(waiterCount(manager, runningId)).toBe(0); }); it('returns undefined or empty output for unknown task ids', async () => { diff --git a/packages/agent-core/test/agent/background/persist.test.ts b/packages/agent-core/test/agent/background/persist.test.ts index 4dc0305ff..3406ff9f4 100644 --- a/packages/agent-core/test/agent/background/persist.test.ts +++ b/packages/agent-core/test/agent/background/persist.test.ts @@ -27,6 +27,7 @@ function sample(overrides: Partial { it('emits background.task.started for agent tasks', () => { const { agent, manager } = createBackgroundManager(); const taskId = manager.registerTask( - new AgentBackgroundTask(new Promise(() => {}), 'agent task'), + agentTask(new Promise(() => {}), 'agent task'), ); expect(agent.emittedEvents).toContainEqual({ @@ -155,15 +155,19 @@ describe('BackgroundManager — 
event emission', () => { }); it('tracks failed and timed-out terminal statuses', async () => { + vi.useFakeTimers({ toFake: ['setTimeout', 'clearTimeout'] }); const { agent, manager } = createBackgroundManager(); const failedId = registerProcess(manager, immediateProcess(1), 'false', 'failed'); const timedOutId = manager.registerTask( - new AgentBackgroundTask(new Promise(() => {}), 'slow agent', { timeoutMs: 1 }), + agentTask(new Promise(() => {}), 'slow agent'), + { timeoutMs: 1 }, ); agent.telemetry.track.mockClear(); await manager.wait(failedId); - await manager.wait(timedOutId); + const timedOut = manager.wait(timedOutId); + await vi.advanceTimersByTimeAsync(5_010); + await timedOut; expect(agent.telemetry.track).toHaveBeenCalledWith( 'background_task_completed', @@ -229,7 +233,7 @@ describe('BackgroundManager — notification delivery', () => { it('steers completed agent task notifications into the turn flow', async () => { const { agent, manager } = createBackgroundManager(); const taskId = manager.registerTask( - new AgentBackgroundTask( + agentTask( Promise.resolve({ result: 'final subagent summary' }), 'agent task', ), @@ -453,7 +457,7 @@ describe('BackgroundManager — notification delivery', () => { hooks: { fireAndForgetTrigger }, }); const taskId = manager.registerTask( - new AgentBackgroundTask( + agentTask( Promise.resolve({ result: 'final agent output' }), 'inspect repository', ), @@ -487,7 +491,7 @@ describe('BackgroundManager — notification delivery', () => { hooks: { fireAndForgetTrigger }, }); const taskId = manager.registerTask( - new AgentBackgroundTask( + agentTask( Promise.resolve({ result: 'final agent output' }), 'inspect repository', ), @@ -533,7 +537,7 @@ describe('BackgroundManager — agent recovery notification bodies', () => { it('failed agent task body includes resume instructions with the correct agent_id', async () => { const { agent, manager } = createBackgroundManager(); const taskId = manager.registerTask( - new AgentBackgroundTask( 
+ agentTask( Promise.reject(new Error('subagent crashed')), 'inspect repository', { agentId: 'agent-7' }, @@ -555,7 +559,7 @@ describe('BackgroundManager — agent recovery notification bodies', () => { it('completed agent task body does not add resume instructions', async () => { const { agent, manager } = createBackgroundManager(); const taskId = manager.registerTask( - new AgentBackgroundTask( + agentTask( Promise.resolve({ result: 'all good' }), 'inspect repository', { agentId: 'agent-8' }, diff --git a/packages/agent-core/test/agent/bg-idle-notification-repro.test.ts b/packages/agent-core/test/agent/bg-idle-notification-repro.test.ts index 0c5bc79a3..1db6d527f 100644 --- a/packages/agent-core/test/agent/bg-idle-notification-repro.test.ts +++ b/packages/agent-core/test/agent/bg-idle-notification-repro.test.ts @@ -23,7 +23,8 @@ import { join } from 'pathe'; import { describe, expect, it, vi } from 'vitest'; import { testAgent } from './harness/agent'; -import { AgentBackgroundTask, BackgroundTaskPersistence } from '../../src/agent/background'; +import { BackgroundTaskPersistence } from '../../src/agent/background'; +import { agentTask } from './background/helpers'; describe('background notification → main agent (real Agent instance)', () => { it('IDLE: completed bg agent auto-starts a new turn with XML', async () => { @@ -36,7 +37,7 @@ describe('background notification → main agent (real Agent instance)', () => { // The expected auto-launched turn will call generate once, then end. ctx.mockNextResponse({ type: 'text', text: 'ack from main agent' }); - const taskId = ctx.agent.background.registerTask(new AgentBackgroundTask( + const taskId = ctx.agent.background.registerTask(agentTask( Promise.resolve({ result: 'background agent finished its job' }), 'idle-state repro', )); @@ -93,7 +94,7 @@ describe('background notification → main agent (real Agent instance)', () => { // Right after kicking off, register a background task that // completes immediately. 
The notification should be steer()d // while activeTurn is still set, landing in the steerBuffer. - const taskId = ctx.agent.background.registerTask(new AgentBackgroundTask( + const taskId = ctx.agent.background.registerTask(agentTask( Promise.resolve({ result: 'busy-state bg result' }), 'busy-state repro', )); @@ -132,15 +133,15 @@ describe('background notification → main agent (real Agent instance)', () => { ctx.mockNextResponse({ type: 'text', text: 'ack group' }); const taskIds = [ - ctx.agent.background.registerTask(new AgentBackgroundTask( + ctx.agent.background.registerTask(agentTask( Promise.resolve({ result: 'bg #1 result' }), 'group-1', )), - ctx.agent.background.registerTask(new AgentBackgroundTask( + ctx.agent.background.registerTask(agentTask( Promise.resolve({ result: 'bg #2 result' }), 'group-2', )), - ctx.agent.background.registerTask(new AgentBackgroundTask( + ctx.agent.background.registerTask(agentTask( Promise.resolve({ result: 'bg #3 result' }), 'group-3', )), @@ -205,7 +206,7 @@ describe('background notification → main agent (real Agent instance)', () => { // completion — this is the IDLE path, NOT the racy one. We // queue an LLM response so the auto-launched turn can run. 
ctx.mockNextResponse({ type: 'text', text: 'auto ack from bg notification' }); - const taskId = ctx.agent.background.registerTask(new AgentBackgroundTask( + const taskId = ctx.agent.background.registerTask(agentTask( Promise.resolve({ result: 'post-turn bg result' }), 'race-after-turn', )); diff --git a/packages/agent-core/test/session/lifecycle-hooks.test.ts b/packages/agent-core/test/session/lifecycle-hooks.test.ts index 99b1dc1c0..799a52c86 100644 --- a/packages/agent-core/test/session/lifecycle-hooks.test.ts +++ b/packages/agent-core/test/session/lifecycle-hooks.test.ts @@ -11,7 +11,8 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; import type { SDKSessionRPC } from '../../src/rpc'; import { Session } from '../../src/session'; -import { AgentBackgroundTask, ProcessBackgroundTask } from '../../src/agent/background'; +import { ProcessBackgroundTask } from '../../src/agent/background'; +import { agentTask } from '../agent/background/helpers'; const tempDirs: string[] = []; @@ -200,10 +201,11 @@ describe('Session lifecycle hooks', () => { turnSettled.resolve(); }); vi.spyOn(child.turn, 'hasActiveTurn', 'get').mockReturnValue(true); - const abort = vi.fn(); + const abortController = new AbortController(); + const abort = vi.spyOn(abortController, 'abort'); const taskId = main.background.registerTask( - new AgentBackgroundTask(new Promise(() => {}), 'keep background agent alive', { - abort, + agentTask(new Promise(() => {}), 'keep background agent alive', { + abortController, agentId: childId, subagentType: 'coder', }), diff --git a/packages/agent-core/test/tools/agent.test.ts b/packages/agent-core/test/tools/agent.test.ts index de8f11865..fbd622057 100644 --- a/packages/agent-core/test/tools/agent.test.ts +++ b/packages/agent-core/test/tools/agent.test.ts @@ -4,10 +4,9 @@ import { ToolAccesses } from '../../src/loop'; import type { Logger, LogPayload } from '../../src/logging'; import type { ResolvedAgentProfile } from '../../src/profile'; import 
type { SessionSubagentHost } from '../../src/session/subagent-host'; -import { AgentBackgroundTask } from '../../src/agent/background'; import { AgentTool, AgentToolInputSchema } from '../../src/tools/builtin/collaboration/agent'; import { userCancellationReason } from '../../src/utils/abort'; -import { createBackgroundManager } from '../agent/background/helpers'; +import { agentTask, createBackgroundManager } from '../agent/background/helpers'; import { executeTool } from './fixtures/execute-tool'; const signal = new AbortController().signal; @@ -22,6 +21,15 @@ function mockSubagentHost & Partial return { resume: vi.fn(), ...host } as unknown as T & SessionSubagentHost; } +function agentTool( + host: SessionSubagentHost, + background = createBackgroundManager().manager, + subagents?: ResolvedAgentProfile['subagents'], + options?: ConstructorParameters[3], +): AgentTool { + return new AgentTool(host, background, subagents, options); +} + interface CapturedLogEntry { readonly level: 'error' | 'warn' | 'info' | 'debug'; readonly message: string; @@ -66,7 +74,7 @@ describe('AgentTool', () => { it('exposes run_in_background and not runInBackground in the JSON schema', () => { const host = mockSubagentHost({ spawn: vi.fn() }); - const tool = new AgentTool(host); + const tool = agentTool(host); const properties = (tool.parameters as { properties: Record }).properties; expect(properties).toHaveProperty('run_in_background'); @@ -75,7 +83,7 @@ describe('AgentTool', () => { it('describes subagent_type and run_in_background parameters', () => { const host = mockSubagentHost({ spawn: vi.fn() }); - const tool = new AgentTool(host); + const tool = agentTool(host); const properties = ( tool.parameters as { properties: Record; @@ -94,7 +102,7 @@ describe('AgentTool', () => { it('does not expose a timeout parameter in the JSON schema', () => { const host = mockSubagentHost({ spawn: vi.fn() }); - const tool = new AgentTool(host); + const tool = agentTool(host); const properties = 
(tool.parameters as { properties: Record }).properties; expect(properties).not.toHaveProperty('timeout'); @@ -102,7 +110,7 @@ describe('AgentTool', () => { it('explains the fixed background subagent timeout', () => { const host = mockSubagentHost({ spawn: vi.fn() }); - const tool = new AgentTool(host, createBackgroundManager().manager); + const tool = agentTool(host); expect(tool.description).toContain('fixed 30-minute timeout'); expect(tool.description).not.toContain('operator-configured background timeout'); @@ -111,7 +119,7 @@ describe('AgentTool', () => { it('does not expose a model parameter in the JSON schema', () => { const host = mockSubagentHost({ spawn: vi.fn() }); - const tool = new AgentTool(host); + const tool = agentTool(host); const properties = (tool.parameters as { properties: Record }).properties; expect(properties).not.toHaveProperty('model'); @@ -132,7 +140,7 @@ describe('AgentTool', () => { }), }; - const tool = new AgentTool(host, undefined, subagents); + const tool = agentTool(host, createBackgroundManager().manager, subagents); expect(tool.description).toContain('Tools: Read, Grep, Glob'); expect(tool.description).toContain('Tools: Read, Write, Edit, Bash'); @@ -140,7 +148,7 @@ describe('AgentTool', () => { it('mentions resume preference and result visibility in the description', () => { const host = mockSubagentHost({ spawn: vi.fn() }); - const tool = new AgentTool(host); + const tool = agentTool(host); expect(tool.description.toLowerCase()).toContain('resume'); expect(tool.description.toLowerCase()).toContain('only visible to you'); @@ -181,7 +189,7 @@ describe('AgentTool', () => { coder: profile({ name: 'coder', description: 'General coding.' }), }; - const tool = new AgentTool(host, undefined, subagents); + const tool = agentTool(host, createBackgroundManager().manager, subagents); expect(tool.description).toContain('Available agent types'); expect(tool.description).toContain('- explore: Read-only exploration. 
Use for searches.'); @@ -197,7 +205,7 @@ describe('AgentTool', () => { completion: Promise.resolve({ result: 'child result' }), }), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const result = await executeTool(tool, context({ @@ -231,7 +239,7 @@ describe('AgentTool', () => { completion: Promise.resolve({ result: 'child result' }), }), }); - const tool = new AgentTool(host); + const tool = agentTool(host); await executeTool(tool, context({ @@ -259,7 +267,7 @@ describe('AgentTool', () => { completion: Promise.resolve({ result: 'resumed result' }), }), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const result = await executeTool(tool, context({ @@ -290,7 +298,7 @@ describe('AgentTool', () => { spawn: vi.fn(), resume: vi.fn(), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const result = await executeTool(tool, context({ @@ -358,7 +366,7 @@ describe('AgentTool', () => { completion: Promise.resolve({ result: 'resumed result' }), }), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const result = await executeTool(tool, context({ @@ -384,7 +392,7 @@ describe('AgentTool', () => { it('declares no resource accesses so concurrent Agent calls can run in parallel', async () => { const host = mockSubagentHost({ spawn: vi.fn() }); - const tool = new AgentTool(host); + const tool = agentTool(host); const execution = await tool.resolveExecution({ prompt: 'Investigate', description: 'Find cause', @@ -400,7 +408,7 @@ describe('AgentTool', () => { spawn: vi.fn(), getProfileName: vi.fn().mockReturnValue('explore'), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const execution = await tool.resolveExecution({ prompt: 'Continue', description: 'Continue work', @@ -446,6 +454,56 @@ describe('AgentTool', () => { }); }); + it('can detach a foreground subagent through the background manager', async () => { + let resolveCompletion: (value: { result: string }) => void 
= () => {}; + const completion = new Promise<{ result: string }>((resolve) => { + resolveCompletion = resolve; + }); + const markActiveChildDetached = vi.fn(); + const host = mockSubagentHost({ + markActiveChildDetached, + spawn: vi.fn().mockResolvedValue({ + agentId: 'agent-child', + profileName: 'coder', + resumed: false, + completion, + }), + }); + const background = createBackgroundManager().manager; + const tool = new AgentTool(host, background); + + const running = executeTool(tool, + context({ + prompt: 'Investigate', + description: 'Find cause', + }), + ); + await vi.waitFor(() => { + expect(background.list(false)).toHaveLength(1); + }); + const task = background.list(false)[0]!; + + expect(task).toMatchObject({ + kind: 'agent', + detached: false, + agentId: 'agent-child', + }); + + background.detach(task.taskId); + const result = await running; + + expect(markActiveChildDetached).toHaveBeenCalledWith('agent-child'); + expect(result.output).toContain(`task_id: ${task.taskId}`); + expect(result.output).toContain('agent_id: agent-child'); + expect(result.output).toContain('automatic_notification: true'); + + resolveCompletion({ result: 'finished later' }); + await expect(background.wait(task.taskId)).resolves.toMatchObject({ + status: 'completed', + detached: true, + }); + }); + it('guides the AI with a non-blocking query hint and a resume hint on background launch', async () => { const host = mockSubagentHost({ spawn: vi.fn().mockResolvedValue({ @@ -485,7 +543,7 @@ describe('AgentTool', () => { expect(result.output).toMatch(/task\.lost|task\.failed|task\.killed/); }); - it('rejects background subagents when background management is unavailable', async () => { + it('rejects background subagents when background execution is disabled', async () => { const host = mockSubagentHost({ spawn: vi.fn().mockResolvedValue({ agentId: 'agent-child', @@ -494,7 +552,9 @@ describe('AgentTool', () => { completion: new Promise<{ result: string }>(() => {}), }), }); - const 
tool = new AgentTool(host); + const tool = agentTool(host, createBackgroundManager().manager, undefined, { + allowBackground: false, + }); expect(tool.description).toContain('Background agent execution is disabled for this agent.'); expect(tool.description).not.toContain('the subagent runs detached from this turn'); @@ -517,7 +577,7 @@ describe('AgentTool', () => { it('returns an error when background registration hits the task limit', async () => { const background = createBackgroundManager({ maxRunningTasks: 1 }).manager; - background.registerTask(new AgentBackgroundTask(new Promise(() => {}), 'existing agent')); + background.registerTask(agentTask(new Promise(() => {}), 'existing agent')); const host = mockSubagentHost({ spawn: vi.fn().mockResolvedValue({ agentId: 'agent-child', @@ -598,7 +658,7 @@ describe('AgentTool', () => { const host = mockSubagentHost({ spawn: vi.fn().mockRejectedValue(error), }); - const tool = new AgentTool(host, undefined, undefined, { log: logger }); + const tool = agentTool(host, createBackgroundManager().manager, undefined, { log: logger }); const result = await executeTool(tool, context({ prompt: 'Investigate', description: 'Find cause' }), @@ -692,7 +752,7 @@ describe('AgentTool', () => { }), ), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const resultPromise = executeTool(tool, { turnId: '0', @@ -744,7 +804,7 @@ describe('AgentTool', () => { }), ), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const resultPromise = executeTool(tool, context({ diff --git a/packages/agent-core/test/tools/background/task-tools.test.ts b/packages/agent-core/test/tools/background/task-tools.test.ts index ca16c4198..9d67639ee 100644 --- a/packages/agent-core/test/tools/background/task-tools.test.ts +++ b/packages/agent-core/test/tools/background/task-tools.test.ts @@ -12,7 +12,6 @@ import type { KaosProcess } from '@moonshot-ai/kaos'; import { afterEach, describe, expect, it, vi } from 'vitest'; 
import { - AgentBackgroundTask, BackgroundTaskPersistence, type BackgroundManager, type BackgroundTaskInfo, @@ -21,6 +20,7 @@ import { TaskListTool } from '../../../src/tools/background/task-list'; import { TaskOutputTool } from '../../../src/tools/background/task-output'; import { TaskStopTool } from '../../../src/tools/background/task-stop'; import { + agentTask, createBackgroundManager, registerProcess, waitForOutput, @@ -270,7 +270,7 @@ describe('TaskOutputTool', () => { it('returns agent metadata and final summary without process fields', async () => { const { manager } = createBackgroundManager(); const taskId = manager.registerTask( - new AgentBackgroundTask( + agentTask( Promise.resolve({ result: 'SUBAGENT-FINAL-SUMMARY\n' }), 'agent output test', { agentId: 'agent-child', subagentType: 'coder' }, @@ -336,12 +336,16 @@ describe('TaskOutputTool', () => { }); it('surfaces timeout terminal metadata', async () => { + vi.useFakeTimers({ toFake: ['setTimeout', 'clearTimeout'] }); const { manager } = createBackgroundManager(); const taskId = manager.registerTask( - new AgentBackgroundTask(new Promise(() => {}), 'will time out', { timeoutMs: 1 }), + agentTask(new Promise(() => {}), 'will time out'), + { timeoutMs: 1 }, ); - await manager.wait(taskId); + const terminal = manager.wait(taskId); + await vi.advanceTimersByTimeAsync(5_010); + await terminal; const content = await taskOutput(manager, taskId, true); expect(content).toContain('status: timed_out'); diff --git a/packages/agent-core/test/tools/bash-env.test.ts b/packages/agent-core/test/tools/bash-env.test.ts index f5d59439e..cda2a877a 100644 --- a/packages/agent-core/test/tools/bash-env.test.ts +++ b/packages/agent-core/test/tools/bash-env.test.ts @@ -4,6 +4,7 @@ import type { Environment, KaosProcess } from '@moonshot-ai/kaos'; import { describe, expect, it, vi } from 'vitest'; import { BashTool } from '../../src/tools/builtin/shell/bash'; +import { createBackgroundManager } from 
'../agent/background/helpers'; import { executeTool } from './fixtures/execute-tool'; import { createFakeKaos } from './fixtures/fake-kaos'; @@ -31,7 +32,11 @@ const signal = new AbortController().signal; async function captureSpawnEnv(): Promise> { const execWithEnv = vi.fn().mockResolvedValue(fakeProcess()); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = new BashTool( + createFakeKaos({ execWithEnv, osEnv: posixEnv }), + '/workspace', + createBackgroundManager().manager, + ); await executeTool(tool, { turnId: '0', toolCallId: 'tc_env', @@ -72,7 +77,7 @@ describe('BashTool noninteractive env semantics', () => { KIMI_CODE_ENV: 'initial', }; const kaos = createFakeKaos({ execWithEnv, osEnv: posixEnv }).withEnv(sessionEnv); - const tool = new BashTool(kaos, '/workspace'); + const tool = new BashTool(kaos, '/workspace', createBackgroundManager().manager); await executeTool(tool, { turnId: '0', diff --git a/packages/agent-core/test/tools/bash.test.ts b/packages/agent-core/test/tools/bash.test.ts index a345445cf..b1f4ba076 100644 --- a/packages/agent-core/test/tools/bash.test.ts +++ b/packages/agent-core/test/tools/bash.test.ts @@ -26,11 +26,11 @@ const windowsBashEnv: Environment = { function processWithOutput( options: { - readonly stdout?: string | Buffer; - readonly stderr?: string | Buffer; - readonly exitCode?: number; - readonly wait?: () => Promise; - readonly kill?: (signal?: NodeJS.Signals) => Promise; + readonly stdout?: string | Buffer; + readonly stderr?: string | Buffer; + readonly exitCode?: number | null; + readonly wait?: () => Promise; + readonly kill?: (signal?: NodeJS.Signals) => Promise; } = {}, ): KaosProcess { const exitCode = options.exitCode ?? 
0; @@ -81,6 +81,42 @@ function processWithInterleavedOutput( }; } +function pendingProcess(): { + readonly proc: KaosProcess; + readonly finish: (exitCode?: number) => void; +} { + const stdout = new PassThrough(); + const stderr = new PassThrough(); + let resolveWait: (exitCode: number) => void = () => {}; + let currentExitCode: number | null = null; + const waitPromise = new Promise((resolve) => { + resolveWait = resolve; + }); + const finish = (exitCode = 0): void => { + if (currentExitCode !== null) return; + currentExitCode = exitCode; + stdout.end(); + stderr.end(); + resolveWait(exitCode); + }; + return { + proc: { + stdin: { end: vi.fn(), write: vi.fn() } as unknown as Writable, + stdout, + stderr, + pid: 125, + get exitCode(): number | null { + return currentExitCode; + }, + wait: vi.fn(async () => waitPromise), + kill: vi.fn(async () => { + finish(143); + }) as KaosProcess['kill'], + }, + finish, + }; +} + function processWithVisibleExitBeforeWait(exitCode = 0): { proc: KaosProcess; finishWait: () => void; @@ -153,9 +189,18 @@ function context(args: BashInput, signal = new AbortController().signal) { return { turnId: '0', toolCallId: 'call_bash', args, signal }; } +function bashTool( + kaos: ConstructorParameters[0], + cwd = '/workspace', + manager = createBackgroundManager().manager, + options?: ConstructorParameters[3], +): BashTool { + return new BashTool(kaos, cwd, manager, options); +} + describe('BashTool', () => { it('exposes current metadata and schema', () => { - const tool = new BashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); + const tool = bashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); expect(tool.name).toBe('Bash'); expect(tool.parameters).toMatchObject({ @@ -204,7 +249,7 @@ describe('BashTool', () => { }); it('describes the cwd, command, run_in_background, description, and disable_timeout parameters', () => { - const tool = new BashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); + const tool = 
bashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); const properties = (tool.parameters as { properties: Record }) .properties; @@ -222,7 +267,7 @@ describe('BashTool', () => { }); it('exposes a default timeout in the JSON Schema', () => { - const tool = new BashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); + const tool = bashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); const properties = (tool.parameters as { properties: Record }) .properties; @@ -242,7 +287,7 @@ describe('BashTool', () => { resolveWait(143); }, }); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue(proc), osEnv: posixEnv }), '/workspace', ); @@ -262,7 +307,7 @@ describe('BashTool', () => { }); it('renders the available commands section and the /tasks hint', () => { - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ osEnv: posixEnv }), '/workspace', createBackgroundManager().manager, @@ -275,7 +320,11 @@ describe('BashTool', () => { it('runs through execWithEnv, injects cwd, noninteractive env, and closes stdin', async () => { const proc = processWithOutput({ stdout: 'ok\n' }); const execWithEnv = vi.fn().mockResolvedValue(proc); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = bashTool( + createFakeKaos({ execWithEnv, osEnv: posixEnv }), + '/workspace', + createBackgroundManager().manager, + ); const result = await executeTool(tool, context({ command: 'printf ok', timeout: 60 })); @@ -296,7 +345,11 @@ describe('BashTool', () => { it('uses args.cwd when provided', async () => { const execWithEnv = vi.fn().mockResolvedValue(processWithOutput({ stdout: 'sub\n' })); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = bashTool( + createFakeKaos({ execWithEnv, osEnv: posixEnv }), + '/workspace', + createBackgroundManager().manager, + ); await executeTool(tool, context({ command: 
'pwd', cwd: '/tmp/project', timeout: 60 })); @@ -306,7 +359,7 @@ describe('BashTool', () => { it('uses Git Bash semantics on Windows', async () => { const proc = processWithOutput({ stdout: 'ok\n' }); const execWithEnv = vi.fn().mockResolvedValue(proc); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv, osEnv: windowsBashEnv }), 'C:\\Users\\me\\project', ); @@ -329,7 +382,7 @@ describe('BashTool', () => { }); it('returns stderr and marks non-zero exit codes as tool errors', async () => { - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi .fn() @@ -351,7 +404,7 @@ describe('BashTool', () => { }); it('returns both stdout and stderr when a command succeeds', async () => { - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi .fn() @@ -371,7 +424,7 @@ describe('BashTool', () => { }); it('returns both stdout and stderr when a command fails', async () => { - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue( processWithOutput({ @@ -396,6 +449,34 @@ describe('BashTool', () => { expect(result.output).toContain('Command failed with exit code: 2.'); }); + it('returns the manager failure reason when foreground process wait rejects', async () => { + const tool = bashTool( + createFakeKaos({ + execWithEnv: vi.fn().mockResolvedValue( + processWithOutput({ + stdout: 'partial output\n', + exitCode: null, + wait: async () => { + throw new Error('wait failed'); + }, + }), + ), + osEnv: posixEnv, + }), + '/workspace', + ); + + const result = await executeTool(tool, context({ command: 'wait fails', timeout: 60 })); + + expect(result).toMatchObject({ + isError: true, + message: 'wait failed', + brief: 'wait failed', + }); + expect(result.output).toContain('partial output\nwait failed'); + expect(result.output).not.toContain('exit code: null'); + }); + it('preserves foreground stdout and stderr arrival order', async () => { 
vi.useFakeTimers(); try { @@ -404,7 +485,7 @@ describe('BashTool', () => { { stream: 'stdout', text: 'out-second\n', delayMs: 5 }, { stream: 'stderr', text: 'err-third\n', delayMs: 10 }, ]); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue(proc), osEnv: posixEnv, @@ -426,11 +507,63 @@ describe('BashTool', () => { } }); + it('can detach a foreground command through the background manager', async () => { + const { proc, finish } = pendingProcess(); + const manager = createBackgroundManager().manager; + const tool = bashTool( + createFakeKaos({ + execWithEnv: vi.fn().mockResolvedValue(proc), + osEnv: posixEnv, + }), + '/workspace', + manager, + ); + + const running = executeTool(tool, context({ command: 'sleep 10', timeout: 60 })); + await vi.waitFor(() => { + expect(manager.list(false)).toHaveLength(1); + }); + const task = manager.list(false)[0]!; + await vi.waitFor(() => { + expect((proc.stdout as PassThrough).listenerCount('data')).toBeGreaterThanOrEqual(1); + }); + (proc.stdout as PassThrough).write('before detach\n'); + + expect(task).toMatchObject({ + kind: 'process', + detached: false, + command: 'sleep 10', + }); + + manager.detach(task.taskId); + const result = await running; + (proc.stdout as PassThrough).write('after detach\n'); + + expect(result).toMatchObject({ isError: false }); + expect(result.output).toContain('before detach\n'); + expect(result.output).not.toContain('after detach\n'); + expect(result.output).toContain(`task_id: ${task.taskId}`); + expect(result.output).toContain('automatic_notification: true'); + expect(manager.getTask(task.taskId)).toMatchObject({ detached: true }); + await vi.waitFor(async () => { + await expect(manager.readOutput(task.taskId)).resolves.toContain('after detach\n'); + }); + + finish(); + await expect(manager.wait(task.taskId)).resolves.toMatchObject({ + status: 'completed', + }); + }); + it('does not spawn when the signal is already aborted', async () => { 
const controller = new AbortController(); controller.abort(); const execWithEnv = vi.fn(); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = bashTool( + createFakeKaos({ execWithEnv, osEnv: posixEnv }), + '/workspace', + createBackgroundManager().manager, + ); const result = await executeTool(tool, context({ command: 'echo nope' }, controller.signal)); @@ -451,7 +584,7 @@ describe('BashTool', () => { }); const execWithEnv = vi.fn().mockResolvedValue(proc); const controller = new AbortController(); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); const running = executeTool(tool, context({ command: 'sleep 10' }, controller.signal)); await vi.waitFor(() => { @@ -465,12 +598,17 @@ describe('BashTool', () => { expect(result.output).toContain('Interrupted by user'); }); - it('requires a background manager and description for background commands', async () => { + it('requires background tools to be enabled and description for background commands', async () => { const proc = processWithOutput(); const execWithEnv = vi.fn().mockResolvedValue(proc); - const withoutManager = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const backgroundDisabled = bashTool( + createFakeKaos({ execWithEnv, osEnv: posixEnv }), + '/workspace', + createBackgroundManager().manager, + { allowBackground: false }, + ); - const unavailable = await executeTool(withoutManager, + const unavailable = await executeTool(backgroundDisabled, context({ command: 'sleep 10', run_in_background: true, description: 'watch' }), ); expect(unavailable).toMatchObject({ isError: true }); @@ -478,7 +616,7 @@ describe('BashTool', () => { expect(execWithEnv).not.toHaveBeenCalled(); const manager = createBackgroundManager().manager; - const withManager = new BashTool( + const withManager = bashTool( 
createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager, @@ -496,7 +634,7 @@ describe('BashTool', () => { const proc = processWithOutput(); const execWithEnv = vi.fn().mockResolvedValue(proc); const manager = createBackgroundManager().manager; - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const result = await executeTool(tool, context({ command: 'sleep 10', run_in_background: true, description: 'long running task' }), @@ -512,7 +650,7 @@ describe('BashTool', () => { registerProcess(manager, processWithOutput(), 'sleep 10', 'existing task'); const rejectedProc = processWithOutput(); const execWithEnv = vi.fn().mockResolvedValue(rejectedProc); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const result = await executeTool(tool, context({ command: 'sleep 10', run_in_background: true, description: 'second task' }), @@ -536,7 +674,7 @@ describe('BashTool', () => { .fn() .mockResolvedValueOnce(firstProc) .mockResolvedValueOnce(secondProc); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const first = executeTool(tool, context({ command: 'sleep 10', run_in_background: true, description: 'first task' }), @@ -568,7 +706,7 @@ describe('BashTool', () => { .fn() .mockResolvedValueOnce(firstProc) .mockResolvedValueOnce(secondProc); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv, osEnv: windowsBashEnv }), 'C:\\Users\\me\\project', manager, @@ -609,13 +747,13 @@ describe('BashTool', () => { ); }); - it('does not timeout-stop a background task whose exit is visible before wait 
settles', async () => { + it('timeout-stops a background task that has not settled even if process exit is visible', async () => { vi.useFakeTimers(); try { const { proc, finishWait, markExited } = processWithVisibleExitBeforeWait(0); const execWithEnv = vi.fn().mockResolvedValue(proc); const manager = createBackgroundManager().manager; - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const result = await executeTool(tool, context({ @@ -633,13 +771,12 @@ describe('BashTool', () => { markExited(); await vi.advanceTimersByTimeAsync(1_000); - expect(proc.kill).not.toHaveBeenCalled(); + expect(proc.kill).toHaveBeenCalledWith('SIGTERM'); finishWait(); await vi.runAllTimersAsync(); - expect(manager.getTask(taskId!)?.status).toBe('completed'); - expect(proc.kill).not.toHaveBeenCalled(); + expect(manager.getTask(taskId!)?.status).toBe('timed_out'); } finally { vi.useRealTimers(); } @@ -651,7 +788,7 @@ describe('BashTool', () => { const proc = processThatNeverExits(); const execWithEnv = vi.fn().mockResolvedValue(proc); const manager = createBackgroundManager().manager; - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const result = await executeTool(tool, context({ @@ -676,7 +813,7 @@ describe('BashTool', () => { const proc = processThatNeverExits(); const execWithEnv = vi.fn().mockResolvedValue(proc); const manager = createBackgroundManager().manager; - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const result = await executeTool(tool, context({ @@ -698,7 +835,7 @@ describe('BashTool', () => { it('adds a truncation note when 
stdout exceeds the cap', async () => { const huge = Buffer.alloc(10 * 1024 * 1024 + 1, 'x'); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue(processWithOutput({ stdout: huge })), osEnv: posixEnv, @@ -715,7 +852,7 @@ describe('BashTool', () => { it('marks the truncated output buffer with a "[...truncated]" sentinel at the cut point', async () => { const huge = Buffer.alloc(10 * 1024 * 1024 + 1, 'x'); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue(processWithOutput({ stdout: huge })), osEnv: posixEnv, @@ -732,7 +869,7 @@ describe('BashTool', () => { it('truncates output with the sentinel even when the command fails', async () => { const huge = Buffer.alloc(10 * 1024 * 1024 + 1, 'E'); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi .fn() @@ -764,7 +901,7 @@ describe('BashTool', () => { resolveWait(143); }, }); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue(proc), osEnv: posixEnv }), '/workspace', ); @@ -787,7 +924,7 @@ describe('BashTool', () => { vi.useFakeTimers(); try { const proc = processWithOpenStreamsThatExitOnKill(); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue(proc), osEnv: posixEnv }), '/workspace', ); @@ -817,7 +954,7 @@ describe('BashTool', () => { delete process.env['GIT_SSH_COMMAND']; try { const execWithEnv = vi.fn().mockResolvedValue(processWithOutput({ stdout: 'ok\n' })); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); await executeTool(tool, context({ command: 'true', timeout: 60 })); @@ -832,7 +969,7 @@ describe('BashTool', () => { const proc = processWithOutput(); const execWithEnv = vi.fn().mockResolvedValue(proc); const manager 
= createBackgroundManager().manager; - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const result = await executeTool( tool, @@ -851,7 +988,7 @@ describe('BashTool', () => { it('rejects background command without description (description-required guard)', async () => { const manager = createBackgroundManager().manager; const execWithEnv = vi.fn().mockResolvedValue(processWithOutput()); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', manager); const result = await executeTool( tool, @@ -865,7 +1002,7 @@ describe('BashTool', () => { it('rewrites nul-redirect on Windows so the spawned argv has /dev/null', async () => { const execWithEnv = vi.fn().mockResolvedValue(processWithOutput({ stdout: '' })); - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ execWithEnv, osEnv: windowsBashEnv }), 'C:\\Users\\me\\project', ); @@ -878,7 +1015,7 @@ describe('BashTool', () => { it('passes nul-redirect through unchanged on Linux so the argv keeps the literal file target', async () => { const execWithEnv = vi.fn().mockResolvedValue(processWithOutput({ stdout: '' })); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = bashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); await executeTool(tool, context({ command: 'ls 2>nul', timeout: 60 })); @@ -887,7 +1024,7 @@ describe('BashTool', () => { }); it('exposes a shell description that documents /bin/bash, TaskOutput/TaskStop, safety and efficiency sections, and background semantics', () => { - const tool = new BashTool( + const tool = bashTool( createFakeKaos({ osEnv: posixEnv }), '/workspace', createBackgroundManager().manager, @@ -911,7 +1048,7 @@ 
describe('BashTool prompt / runtime consistency', () => { // The set of background tools the prompt actually introduces — taken from // the background-enabled prompt, which is the only variant that documents // any Task* tool. - const enabledTool = new BashTool( + const enabledTool = bashTool( createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace', createBackgroundManager().manager, @@ -920,7 +1057,12 @@ describe('BashTool prompt / runtime consistency', () => { [...enabledTool.description.matchAll(/`(Task[A-Za-z]+)`/g)].map((match) => match[1]), ); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = bashTool( + createFakeKaos({ execWithEnv, osEnv: posixEnv }), + '/workspace', + createBackgroundManager().manager, + { allowBackground: false }, + ); const result = await executeTool(tool, context({ command: 'sleep 10', run_in_background: true, description: 'watch' }), ); @@ -941,7 +1083,7 @@ describe('BashTool prompt / runtime consistency', () => { }); it('does not claim failure exit codes appear in a system tag', () => { - const tool = new BashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); + const tool = bashTool(createFakeKaos({ osEnv: posixEnv }), '/workspace'); // The implementation reports failures as plain text inside the output // (`Command failed with exit code: N`), never via a system tag. 
diff --git a/packages/agent-core/test/tools/builtin-current.test.ts b/packages/agent-core/test/tools/builtin-current.test.ts index 417969a80..1f812fea6 100644 --- a/packages/agent-core/test/tools/builtin-current.test.ts +++ b/packages/agent-core/test/tools/builtin-current.test.ts @@ -79,6 +79,10 @@ function mockSubagentHost>( } as unknown as T & SessionSubagentHost; } +function agentTool(host: SessionSubagentHost): AgentTool { + return new AgentTool(host, createBackgroundManager().manager); +} + function mockSwarmMode(): SwarmMode { return { enter: vi.fn() } as unknown as SwarmMode; } @@ -231,6 +235,7 @@ describe('current builtin file and shell tools', () => { }, }), '/workspace', + createBackgroundManager().manager, ); expect(BashInputSchema.safeParse({ command: 'printf ok' }).success).toBe(true); @@ -287,7 +292,7 @@ describe('current builtin collaboration tools', () => { completion: Promise.resolve({ result: 'child result' }), }), }); - const tool = new AgentTool(host); + const tool = agentTool(host); const input = { prompt: 'Investigate', description: 'Find cause' }; expect(AgentToolInputSchema.safeParse(input).success).toBe(true); diff --git a/packages/agent-core/test/tools/shell-cancel.test.ts b/packages/agent-core/test/tools/shell-cancel.test.ts index 2dae11804..48ad97234 100644 --- a/packages/agent-core/test/tools/shell-cancel.test.ts +++ b/packages/agent-core/test/tools/shell-cancel.test.ts @@ -4,6 +4,7 @@ import type { Environment, KaosProcess } from '@moonshot-ai/kaos'; import { describe, expect, it, vi } from 'vitest'; import { BashTool } from '../../src/tools/builtin/shell/bash'; +import { createBackgroundManager } from '../agent/background/helpers'; import { executeTool } from './fixtures/execute-tool'; import { createFakeKaos } from './fixtures/fake-kaos'; @@ -35,7 +36,11 @@ describe('BashTool cancellation contract', () => { }; const execWithEnv = vi.fn().mockResolvedValue(proc); const controller = new AbortController(); - const tool = new 
BashTool(createFakeKaos({ execWithEnv, osEnv: posixEnv }), '/workspace'); + const tool = new BashTool( + createFakeKaos({ execWithEnv, osEnv: posixEnv }), + '/workspace', + createBackgroundManager().manager, + ); const running = executeTool(tool, { turnId: '0', diff --git a/packages/agent-core/test/tools/shell-quoting.test.ts b/packages/agent-core/test/tools/shell-quoting.test.ts index 6fd423998..091b9a3fc 100644 --- a/packages/agent-core/test/tools/shell-quoting.test.ts +++ b/packages/agent-core/test/tools/shell-quoting.test.ts @@ -4,6 +4,7 @@ import type { Environment, KaosProcess } from '@moonshot-ai/kaos'; import { describe, expect, it, vi } from 'vitest'; import { BashInputSchema, BashTool } from '../../src/tools/builtin/shell/bash'; +import { createBackgroundManager } from '../agent/background/helpers'; import { executeTool } from './fixtures/execute-tool'; import { createFakeKaos } from './fixtures/fake-kaos'; @@ -55,7 +56,11 @@ function captureCommandRewrite( ): Promise<{ rewritten: string; argv: readonly string[] }> { const execWithEnv = vi.fn().mockResolvedValue(fakeProcess()); const cwd = env.osKind === 'Windows' ? 
'C:\\work' : '/work'; - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: env }), cwd); + const tool = new BashTool( + createFakeKaos({ execWithEnv, osEnv: env }), + cwd, + createBackgroundManager().manager, + ); return executeTool(tool, { turnId: '0', @@ -153,7 +158,11 @@ describe('BashTool streaming output updates', () => { ); const execWithEnv = vi.fn().mockResolvedValue(proc); const onUpdate = vi.fn(); - const tool = new BashTool(createFakeKaos({ execWithEnv, osEnv: linuxEnv }), '/work'); + const tool = new BashTool( + createFakeKaos({ execWithEnv, osEnv: linuxEnv }), + '/work', + createBackgroundManager().manager, + ); const result = await executeTool(tool, { turnId: '0', diff --git a/packages/node-sdk/src/rpc.ts b/packages/node-sdk/src/rpc.ts index 10ab9cec5..9dd29822f 100644 --- a/packages/node-sdk/src/rpc.ts +++ b/packages/node-sdk/src/rpc.ts @@ -475,6 +475,17 @@ export abstract class SDKRpcClientBase { }); } + async detachBackgroundTask( + input: SessionIdRpcInput & { taskId: string }, + ): Promise { + const rpc = await this.getRpc(); + return rpc.detachBackground({ + sessionId: input.sessionId, + agentId: this.interactiveAgentId, + taskId: input.taskId, + }); + } + async createGoal(input: SessionIdRpcInput & CreateGoalInput): Promise { const rpc = await this.getRpc(); return rpc.createGoal({ diff --git a/packages/node-sdk/src/session.ts b/packages/node-sdk/src/session.ts index 5e1cffcf5..97857bf32 100644 --- a/packages/node-sdk/src/session.ts +++ b/packages/node-sdk/src/session.ts @@ -302,6 +302,23 @@ export class Session { }); } + /** + * Detach a running foreground task so the current tool call can return while + * the task continues under background-task management. 
+ */ + async detachBackgroundTask(taskId: string): Promise { + this.ensureOpen(); + const trimmedTaskId = normalizeRequiredString( + taskId, + 'Task id cannot be empty', + ErrorCodes.BACKGROUND_TASK_ID_EMPTY, + ); + return this.rpc.detachBackgroundTask({ + sessionId: this.id, + taskId: trimmedTaskId, + }); + } + // --- Goal lifecycle --------------------------------------------------- // Deterministic user/host control surface. There is intentionally no // `updateGoal`: the goal's terminal status is decided by the model via the diff --git a/packages/protocol/src/events.ts b/packages/protocol/src/events.ts index 763052d78..45a88df3f 100644 --- a/packages/protocol/src/events.ts +++ b/packages/protocol/src/events.ts @@ -231,6 +231,7 @@ export interface BackgroundTaskInfoBase { readonly taskId: string; readonly description: string; readonly status: AgentCoreBackgroundTaskStatus; + readonly detached?: boolean; readonly startedAt: number; readonly endedAt: number | null; readonly stopReason?: string; From 7c7c24670ef3af8b314a0ac8d5cbe5fc2f702cf4 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 21:11:12 +0800 Subject: [PATCH 2/5] fix --- packages/agent-core/src/agent/background/index.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/agent-core/src/agent/background/index.ts b/packages/agent-core/src/agent/background/index.ts index 065f6bc83..6f7ec0483 100644 --- a/packages/agent-core/src/agent/background/index.ts +++ b/packages/agent-core/src/agent/background/index.ts @@ -301,12 +301,14 @@ export class BackgroundManager { list(activeOnly = true, limit?: number): BackgroundTaskInfo[] { const result: BackgroundTaskInfo[] = []; for (const entry of this.tasks.values()) { - if (activeOnly && TERMINAL_STATUSES.has(entry.status)) continue; - result.push(this.toInfo(entry)); + const info = this.toInfo(entry); + if (!this.shouldListTask(info, activeOnly)) continue; + result.push(info); if (limit !== undefined && result.length 
>= limit) return result; } if (!activeOnly) { for (const ghost of this.ghosts.values()) { + if (!this.shouldListTask(ghost, activeOnly)) continue; result.push(ghost); if (limit !== undefined && result.length >= limit) return result; } @@ -314,6 +316,12 @@ export class BackgroundManager { return result; } + private shouldListTask(info: BackgroundTaskInfo, activeOnly: boolean): boolean { + if (!TERMINAL_STATUSES.has(info.status)) return true; + if (activeOnly) return false; + return info.detached !== false; + } + /** * Return the output snapshot used by TaskOutput. * From b3995e1e90ef9b871a07d34a9c11bbd4103b16c7 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 21:22:24 +0800 Subject: [PATCH 3/5] fix --- .../agent-core/src/agent/background/index.ts | 18 ++++++++---- packages/agent-core/src/utils/promise.ts | 28 +++++++++++++++---- .../test/agent/background/manager.test.ts | 12 ++++++++ 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/packages/agent-core/src/agent/background/index.ts b/packages/agent-core/src/agent/background/index.ts index 6f7ec0483..2db451c4f 100644 --- a/packages/agent-core/src/agent/background/index.ts +++ b/packages/agent-core/src/agent/background/index.ts @@ -12,7 +12,7 @@ import { randomBytes } from 'node:crypto'; -import { createControlledPromise, sleep, type ControlledPromise } from '@antfu/utils'; +import { createControlledPromise, type ControlledPromise } from '@antfu/utils'; import type { ContentPart } from '@moonshot-ai/kosong'; import type { Agent } from '../..'; @@ -446,7 +446,11 @@ export class BackgroundManager { return this.toInfo(entry); } - await Promise.race([entry.terminal, sleep(timeoutMs)]); + if (timeoutMs <= 0) { + return this.toInfo(entry); + } + const timeout = timeoutOutcome(timeoutMs, undefined); + await Promise.race([entry.terminal, timeout]).finally(() => timeout.clear()); if (TERMINAL_STATUSES.has(entry.status)) { await entry.persistWriteQueue; @@ -677,12 +681,13 @@ export class 
BackgroundManager { }); }); + const timeout = timeoutOutcome(entry.options.timeoutMs, { kind: 'timeout' as const }); const outcome = await Promise.race([ worker.then((settlement): TerminalOutcome => ({ kind: 'worker', settlement })), - timeoutOutcome(entry.options.timeoutMs, { kind: 'timeout' as const }), + timeout, entry.stop.then((request): TerminalOutcome => ({ kind: 'stop', request })), this.signalOutcome(entry), - ]); + ]).finally(() => timeout.clear()); const settlement = await this.settlementForOutcome(entry, outcome, worker); await this.finalizeTask(entry, settlement); } @@ -718,10 +723,11 @@ export class BackgroundManager { entry.stopReason = stopReason; entry.abortController.abort(timedOut ? 'Timed out' : stopReason); + const graceTimeout = timeoutOutcome(SIGTERM_GRACE_MS, undefined); const workerAfterAbort = await Promise.race([ worker, - sleep(SIGTERM_GRACE_MS).then(() => undefined), - ]); + graceTimeout, + ]).finally(() => graceTimeout.clear()); if ( outcome.kind === 'stop' && diff --git a/packages/agent-core/src/utils/promise.ts b/packages/agent-core/src/utils/promise.ts index 9b51d4816..03182d3d2 100644 --- a/packages/agent-core/src/utils/promise.ts +++ b/packages/agent-core/src/utils/promise.ts @@ -1,11 +1,29 @@ -import { sleep } from '@antfu/utils'; - const NEVER = new Promise(() => {}); +export type TimeoutOutcomePromise = Promise & { + clear(): void; +}; + export function timeoutOutcome( timeoutMs: number | undefined, outcome: Outcome, -): Promise { - if (timeoutMs === undefined || timeoutMs <= 0) return NEVER; - return sleep(timeoutMs).then(() => outcome); +): TimeoutOutcomePromise { + let timeout: ReturnType | undefined; + const promise: Promise = + timeoutMs === undefined || timeoutMs <= 0 + ? 
NEVER + : new Promise((resolve) => { + timeout = setTimeout(() => { + timeout = undefined; + resolve(outcome); + }, timeoutMs); + }); + + return Object.assign(promise, { + clear() { + if (timeout === undefined) return; + clearTimeout(timeout); + timeout = undefined; + }, + }); } diff --git a/packages/agent-core/test/agent/background/manager.test.ts b/packages/agent-core/test/agent/background/manager.test.ts index cbbea653a..8614d36bf 100644 --- a/packages/agent-core/test/agent/background/manager.test.ts +++ b/packages/agent-core/test/agent/background/manager.test.ts @@ -526,6 +526,18 @@ describe('BackgroundManager', () => { expect(await manager.wait(runningId, 0)).toMatchObject({ status: 'running' }); }); + it('clears task deadline timers when completion wins the race', async () => { + vi.useFakeTimers(); + const { manager } = createBackgroundManager(); + const taskId = manager.registerTask( + agentTask(Promise.resolve({ result: 'done' }), 'fast deadline task'), + { timeoutMs: 60_000 }, + ); + + await expect(manager.wait(taskId, 60_000)).resolves.toMatchObject({ status: 'completed' }); + expect(vi.getTimerCount()).toBe(0); + }); + it('returns undefined or empty output for unknown task ids', async () => { const { manager } = createBackgroundManager(); From 64ad9611a213c290bb9fc01128e955dcb3242df3 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 21:44:10 +0800 Subject: [PATCH 4/5] fix --- .../src/agent/background/agent-task.ts | 4 +- .../agent-core/src/agent/background/index.ts | 19 ++++-- .../src/agent/background/process-task.ts | 52 +++++++++++--- .../test/agent/background/manager.test.ts | 67 ++++++++++++++++++- packages/agent-core/test/tools/bash.test.ts | 50 ++++++++++++++ 5 files changed, 174 insertions(+), 18 deletions(-) diff --git a/packages/agent-core/src/agent/background/agent-task.ts b/packages/agent-core/src/agent/background/agent-task.ts index 8d85aaf50..c31796803 100644 --- a/packages/agent-core/src/agent/background/agent-task.ts +++ 
b/packages/agent-core/src/agent/background/agent-task.ts @@ -32,7 +32,7 @@ export class AgentBackgroundTask implements BackgroundTask { async start(sink: BackgroundTaskSink): Promise { const requestAbort = (): void => { - this.abortController.abort(); + this.abortController.abort(sink.signal.reason); }; if (sink.signal.aborted) { requestAbort(); @@ -45,7 +45,7 @@ export class AgentBackgroundTask implements BackgroundTask { sink.appendOutput(outcome.result); await sink.settle({ status: 'completed' }); } catch (error: unknown) { - if (sink.signal.aborted && isAbortError(error)) { + if (sink.signal.aborted && (isAbortError(error) || error === sink.signal.reason)) { await sink.settle({ status: 'killed' }); return; } diff --git a/packages/agent-core/src/agent/background/index.ts b/packages/agent-core/src/agent/background/index.ts index 2db451c4f..5ee71d987 100644 --- a/packages/agent-core/src/agent/background/index.ts +++ b/packages/agent-core/src/agent/background/index.ts @@ -171,6 +171,7 @@ export type ForegroundTaskReleaseReason = 'detached' | 'terminal'; interface StopRequest { readonly reason?: string; + readonly abortReason?: unknown; } type TerminalOutcome = @@ -695,16 +696,16 @@ export class BackgroundManager { private signalOutcome(entry: ManagedTask): Promise { const signal = entry.options.signal; if (signal === undefined) return new Promise(() => {}); - const outcome: TerminalOutcome = { + const outcome = (): TerminalOutcome => ({ kind: 'stop', - request: { reason: USER_INTERRUPT_REASON }, - }; - if (signal.aborted) return Promise.resolve(outcome); + request: { reason: USER_INTERRUPT_REASON, abortReason: signal.reason }, + }); + if (signal.aborted) return Promise.resolve(outcome()); return new Promise((resolve) => { signal.addEventListener( 'abort', () => { - if (!this.isDetached(entry)) resolve(outcome); + if (!this.isDetached(entry)) resolve(outcome()); }, { once: true }, ); @@ -720,8 +721,14 @@ export class BackgroundManager { const timedOut = outcome.kind 
=== 'timeout'; const stopReason = outcome.kind === 'stop' ? outcome.request.reason : undefined; + let abortReason: unknown; + if (timedOut) { + abortReason = 'Timed out'; + } else if (outcome.kind === 'stop') { + abortReason = outcome.request.abortReason ?? stopReason; + } entry.stopReason = stopReason; - entry.abortController.abort(timedOut ? 'Timed out' : stopReason); + entry.abortController.abort(abortReason); const graceTimeout = timeoutOutcome(SIGTERM_GRACE_MS, undefined); const workerAfterAbort = await Promise.race([ diff --git a/packages/agent-core/src/agent/background/process-task.ts b/packages/agent-core/src/agent/background/process-task.ts index a0e27184e..88ef87d5e 100644 --- a/packages/agent-core/src/agent/background/process-task.ts +++ b/packages/agent-core/src/agent/background/process-task.ts @@ -112,18 +112,52 @@ function observeProcessStream( onOutput?.(kind, chunk); }); - return new Promise((resolve) => { - const done = (): void => { + return new Promise((resolve, reject) => { + let ended = false; + const settle = (callback: () => void): void => { cleanup(); - resolve(); + callback(); + }; + const done = (): void => { + settle(resolve); + }; + const fail = (error: unknown): void => { + settle(() => reject(error)); + }; + const onEnd = (): void => { + ended = true; + done(); + }; + const onClose = (): void => { + if (ended || sink.signal.aborted) { + done(); + return; + } + + fail(createPrematureCloseError()); + }; + const onError = (error: Error): void => { + // When the task is aborted we intentionally destroy the streams, which + // can emit errors. Swallow those expected errors; surface anything else. 
+ if (sink.signal.aborted) { + done(); + } else { + fail(error); + } }; const cleanup = (): void => { - stream.removeListener('end', done); - stream.removeListener('close', done); - stream.removeListener('error', done); + stream.removeListener('end', onEnd); + stream.removeListener('close', onClose); + stream.removeListener('error', onError); }; - stream.once('end', done); - stream.once('close', done); - stream.once('error', done); + stream.once('end', onEnd); + stream.once('close', onClose); + stream.once('error', onError); }); } + +function createPrematureCloseError(): Error { + const error = new Error('Premature close') as NodeJS.ErrnoException; + error.code = 'ERR_STREAM_PREMATURE_CLOSE'; + return error; +} diff --git a/packages/agent-core/test/agent/background/manager.test.ts b/packages/agent-core/test/agent/background/manager.test.ts index 8614d36bf..3ffab7840 100644 --- a/packages/agent-core/test/agent/background/manager.test.ts +++ b/packages/agent-core/test/agent/background/manager.test.ts @@ -4,7 +4,7 @@ import { mkdtemp, rm } from 'node:fs/promises'; import { tmpdir } from 'node:os'; -import { Readable } from 'node:stream'; +import { PassThrough, Readable } from 'node:stream'; import type { Writable } from 'node:stream'; import { join } from 'pathe'; @@ -22,6 +22,7 @@ import { registerProcess, waitForOutput, } from './helpers'; +import { isUserCancellation, userCancellationReason } from '../../../src/utils/abort'; function immediateProcess(exitCode: number, stdoutText = ''): KaosProcess { return { @@ -47,6 +48,22 @@ function rejectedProcess(error: Error): KaosProcess { }; } +function processWithStdoutError(message = 'stdout read failed'): KaosProcess { + const stdout = new PassThrough(); + return { + stdin: { write: vi.fn(), end: vi.fn() } as unknown as Writable, + stdout, + stderr: Readable.from([]), + pid: 99998, + exitCode: 0, + wait: vi.fn(async () => { + stdout.destroy(new Error(message)); + return 0; + }) as KaosProcess['wait'], + kill: 
vi.fn().mockResolvedValue(undefined) as KaosProcess['kill'], + }; +} + function pendingProcess(exitOnKill = 143): { proc: KaosProcess; killSpy: ReturnType; @@ -234,6 +251,37 @@ describe('BackgroundManager', () => { }); }); + it('forwards foreground signal abort reasons to agent task controllers', async () => { + const { manager } = createBackgroundManager(); + const foregroundController = new AbortController(); + const subagentController = new AbortController(); + const completion = new Promise<{ result: string }>((_resolve, reject) => { + subagentController.signal.addEventListener( + 'abort', + () => { + reject(subagentController.signal.reason); + }, + { once: true }, + ); + }); + const taskId = manager.registerTask( + agentTask(completion, 'foreground agent', { abortController: subagentController }), + { + detached: false, + signal: foregroundController.signal, + }, + ); + + foregroundController.abort(userCancellationReason()); + + const info = await manager.wait(taskId); + expect(info).toMatchObject({ + status: 'killed', + stopReason: 'Interrupted by user', + }); + expect(isUserCancellation(subagentController.signal.reason)).toBe(true); + }); + it('does not count foreground tasks against the detached task limit', () => { const { manager } = createBackgroundManager({ maxRunningTasks: 1 }); manager.registerTask(agentTask(new Promise(() => {}), 'foreground agent'), { @@ -298,6 +346,23 @@ describe('BackgroundManager', () => { expect(await manager.readOutput(taskId)).toContain('captured output'); }); + it('fails process tasks when output capture errors after successful exit', async () => { + const { manager } = createBackgroundManager(); + const taskId = registerProcess( + manager, + processWithStdoutError(), + 'ssh example.test', + 'stream error test', + ); + + await expect(manager.wait(taskId)).resolves.toMatchObject({ + kind: 'process', + status: 'failed', + exitCode: 0, + stopReason: 'stdout read failed', + }); + }); + it('transitions process status from exit 
code', async () => { const { manager } = createBackgroundManager(); const successId = registerProcess(manager, immediateProcess(0), 'echo done', 'ok'); diff --git a/packages/agent-core/test/tools/bash.test.ts b/packages/agent-core/test/tools/bash.test.ts index b1f4ba076..f0969e2d0 100644 --- a/packages/agent-core/test/tools/bash.test.ts +++ b/packages/agent-core/test/tools/bash.test.ts @@ -162,6 +162,40 @@ function processThatNeverExits(): KaosProcess { }; } +function processWithStreamError(options: { + readonly stdoutError?: Error; + readonly stderrError?: Error; + readonly exitCode?: number; +} = {}): KaosProcess { + const exitCode = options.exitCode ?? 0; + const stdout = new PassThrough(); + const stderr = new PassThrough(); + const waitPromise = new Promise((resolve) => { + setTimeout(() => { + if (options.stdoutError !== undefined) { + stdout.emit('error', options.stdoutError); + } else { + stdout.end(); + } + if (options.stderrError !== undefined) { + stderr.emit('error', options.stderrError); + } else { + stderr.end(); + } + resolve(exitCode); + }, 1); + }); + return { + stdin: { end: vi.fn(), write: vi.fn() } as unknown as Writable, + stdout, + stderr, + pid: 128, + exitCode, + wait: vi.fn(async () => waitPromise), + kill: vi.fn(async () => {}), + }; +} + function processWithOpenStreamsThatExitOnKill(): KaosProcess { let currentExitCode: number | null = null; let resolveWait: (code: number) => void = () => {}; @@ -945,6 +979,22 @@ describe('BashTool', () => { } }); + it('reports a stream read error as a tool error even when the process exits with code 0', async () => { + const proc = processWithStreamError({ + stdoutError: new Error('SSH channel read failed'), + exitCode: 0, + }); + const tool = bashTool( + createFakeKaos({ execWithEnv: vi.fn().mockResolvedValue(proc), osEnv: posixEnv }), + '/workspace', + ); + + const result = await executeTool(tool, context({ command: 'remote-cmd', timeout: 60 })); + + expect(result).toMatchObject({ isError: true }); + 
expect(result.output).toContain('SSH channel read failed'); + }); + it('rejects empty-string commands at the schema layer', () => { expect(BashInputSchema.safeParse({ command: '' }).success).toBe(false); }); From bb71cebb9b5578858a1dbda8d5d5ad4bdbbc31c7 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Wed, 17 Jun 2026 16:05:50 +0800 Subject: [PATCH 5/5] fix(background): defer foreground task persistence until detach or spill MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foreground Bash commands (run_in_background omitted) were registered with the BackgroundManager as detached:false, which immediately persisted task metadata and streamed the full stdout/stderr to tasks/{taskId}/output.log. Those terminal detached:false tasks are filtered out of every listing, so ordinary foreground commands left undiscoverable full-output logs in the session directory — a disk-growth regression for large or frequent commands. Foreground tasks now keep their output in memory and touch disk only once they actually detach (or spill past a 1 MiB in-memory bound). On detach the buffered pre-detach output is flushed first so output.log stays the complete, in-order record. A foreground task that finishes without detaching or spilling writes nothing to disk. Spilled/detached records are persisted and kept as before — not deleted.
--- .../agent-core/src/agent/background/index.ts | 70 ++++++++- .../background/foreground-persistence.test.ts | 147 ++++++++++++++++++ 2 files changed, 214 insertions(+), 3 deletions(-) create mode 100644 packages/agent-core/test/agent/background/foreground-persistence.test.ts diff --git a/packages/agent-core/src/agent/background/index.ts b/packages/agent-core/src/agent/background/index.ts index 5ee71d987..7d04be4eb 100644 --- a/packages/agent-core/src/agent/background/index.ts +++ b/packages/agent-core/src/agent/background/index.ts @@ -78,6 +78,19 @@ interface ManagedTask { readonly abortController: AbortController; persistWriteQueue: Promise; outputWriteQueue: Promise; + /** + * Full output buffered in memory while a foreground task has not yet + * persisted to disk. Flushed to `output.log` (in order, ahead of the live + * stream) when the task detaches or spills, then released. + */ + pendingOutput: string[]; + pendingOutputBytes: number; + /** + * Whether `output.log` writes have begun. True from the start for tasks + * registered already-detached; flipped on detach or memory-bound spill for + * foreground tasks. Until then output stays in `pendingOutput`. + */ + outputPersistStarted: boolean; } /** @@ -270,13 +283,18 @@ export class BackgroundManager { abortController: new AbortController(), persistWriteQueue: Promise.resolve(), outputWriteQueue: Promise.resolve(), + pendingOutput: [], + pendingOutputBytes: 0, + outputPersistStarted: detached, }; this.tasks.set(taskId, entry); void this.runTaskLifecycle(entry); - // Initial persistence (snapshot at start). - void this.persistLive(entry); + // Initial persistence (snapshot at start). Foreground tasks defer all + // persistence until they detach (or spill) — see appendOutput / detach / + // finalizeTask — so ordinary commands leave nothing undiscoverable on disk. 
if (this.isDetached(entry)) { + void this.persistLive(entry); this.emitTaskStarted(this.toInfo(entry)); } @@ -400,6 +418,9 @@ export class BackgroundManager { } catch { /* detach has already succeeded; hooks must not make RPC fail */ } + // Flush buffered pre-detach output to disk before the live stream resumes, + // so output.log stays the complete, in-order record. + this.startOutputPersist(entry); void this.persistLive(entry); this.emitTaskStarted(this.toInfo(entry)); foregroundRelease.resolve('detached'); @@ -563,6 +584,23 @@ export class BackgroundManager { total -= removed.length; } + if (this.persistence === undefined) return; + + // Foreground tasks keep their full output in memory and only touch disk + // once they detach. A memory-bound spill begins disk persistence early so + // a never-detached command can't grow the buffer without limit. + if (!entry.outputPersistStarted) { + entry.pendingOutput.push(chunk); + entry.pendingOutputBytes += Buffer.byteLength(chunk, 'utf-8'); + if (entry.pendingOutputBytes > MAX_OUTPUT_BYTES) this.startOutputPersist(entry); + return; + } + + this.appendTaskOutput(entry, chunk); + } + + /** Enqueue an `output.log` append, serialized per task. No-op when detached managers omit persistence. */ + private appendTaskOutput(entry: ManagedTask, chunk: string): void { const persistence = this.persistence; if (persistence === undefined) return; entry.outputWriteQueue = entry.outputWriteQueue @@ -570,6 +608,22 @@ export class BackgroundManager { .catch(() => { }); } + /** + * Begin persisting `output.log` for a task that buffered while foreground. + * Flushes the buffered pre-detach output first (in order, ahead of the live + * stream) so the on-disk log stays complete, then releases the buffer. + * Idempotent. 
+ */ + private startOutputPersist(entry: ManagedTask): void { + if (entry.outputPersistStarted) return; + entry.outputPersistStarted = true; + if (entry.pendingOutput.length > 0) { + this.appendTaskOutput(entry, entry.pendingOutput.join('')); + } + entry.pendingOutput = []; + entry.pendingOutputBytes = 0; + } + private async restoreBackgroundTaskNotifications(): Promise { for (const info of this.list(false)) { if (!isBackgroundTaskTerminal(info.status)) continue; @@ -767,7 +821,17 @@ export class BackgroundManager { entry.endedAt = Date.now(); entry.stopReason = settlement.stopReason ?? (settlement.status === 'killed' ? entry.stopReason : undefined); - await this.persistLive(entry); + // Persist the terminal record only when the task actually touched disk: + // detached tasks, and foreground tasks that spilled past the in-memory + // buffer. A foreground task whose output stayed in memory leaves nothing on + // disk — release the buffer and skip persistence so it never accumulates as + // an undiscoverable log. + if (entry.outputPersistStarted) { + await this.persistLive(entry); + } else { + entry.pendingOutput = []; + entry.pendingOutputBytes = 0; + } this.fireTerminalEffects(entry); entry.foregroundRelease?.resolve('terminal'); entry.terminal.resolve(); diff --git a/packages/agent-core/test/agent/background/foreground-persistence.test.ts b/packages/agent-core/test/agent/background/foreground-persistence.test.ts new file mode 100644 index 000000000..572dae09a --- /dev/null +++ b/packages/agent-core/test/agent/background/foreground-persistence.test.ts @@ -0,0 +1,147 @@ +/** + * Foreground task persistence: foreground commands keep their output in memory + * and only touch disk once they detach or spill past the in-memory buffer. A + * foreground command that finishes without either leaves nothing on disk, so + * undiscoverable logs don't accumulate. 
+ */ + +import { existsSync, mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { Readable } from 'node:stream'; +import type { Writable } from 'node:stream'; +import { join } from 'pathe'; + +import type { KaosProcess } from '@moonshot-ai/kaos'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { ProcessBackgroundTask, type BackgroundManager } from '../../../src/agent/background'; +import { createBackgroundManager, waitForTerminal } from './helpers'; + +const MAX_OUTPUT_BYTES = 1024 * 1024; + +const tick = (): Promise => new Promise((resolve) => setTimeout(resolve, 5)); + +function immediateProcess(exitCode: number, stdoutText = ''): KaosProcess { + return { + stdin: { write: vi.fn(), end: vi.fn() } as unknown as Writable, + stdout: Readable.from(stdoutText ? [stdoutText] : []), + stderr: Readable.from([]), + pid: 60000 + exitCode, + exitCode, + wait: vi.fn().mockResolvedValue(exitCode) as KaosProcess['wait'], + kill: vi.fn().mockResolvedValue(undefined) as KaosProcess['kill'], + dispose: vi.fn().mockResolvedValue(undefined) as KaosProcess['dispose'], + }; +} + +/** A process whose stdout and exit are driven by the test, for timing control. 
*/ +function controllableProcess(): { + proc: KaosProcess; + pushStdout: (text: string) => void; + finish: (exitCode: number) => void; +} { + const stdout = new Readable({ read() {} }); + let resolveWait!: (code: number) => void; + const waitPromise = new Promise((resolve) => { + resolveWait = resolve; + }); + const proc: KaosProcess = { + stdin: { write: vi.fn(), end: vi.fn() } as unknown as Writable, + stdout, + stderr: Readable.from([]), + pid: 61000, + exitCode: null, + wait: vi.fn(() => waitPromise) as KaosProcess['wait'], + kill: vi.fn().mockResolvedValue(undefined) as KaosProcess['kill'], + dispose: vi.fn().mockResolvedValue(undefined) as KaosProcess['dispose'], + }; + return { + proc, + pushStdout: (text) => stdout.push(text), + finish: (exitCode) => { + (proc as { exitCode: number | null }).exitCode = exitCode; + stdout.push(null); + resolveWait(exitCode); + }, + }; +} + +function registerForeground( + manager: BackgroundManager, + proc: KaosProcess, + command: string, + description: string, +): string { + return manager.registerTask(new ProcessBackgroundTask(proc, command, description), { + detached: false, + }); +} + +describe('BackgroundManager — foreground persistence', () => { + let sessionDir: string; + let manager: BackgroundManager; + let persistence: NonNullable['persistence']>; + + beforeEach(() => { + sessionDir = mkdtempSync(join(tmpdir(), 'bpm-fg-')); + const fixture = createBackgroundManager({ sessionDir }); + manager = fixture.manager; + persistence = fixture.persistence!; + }); + + afterEach(() => { + rmSync(sessionDir, { recursive: true, force: true }); + }); + + const taskJsonPath = (taskId: string): string => join(sessionDir, 'tasks', `${taskId}.json`); + + it('writes nothing to disk for a foreground task that does not spill or detach', async () => { + const taskId = registerForeground(manager, immediateProcess(0, 'hello\n'), 'echo', 'demo'); + + await waitForTerminal(manager, taskId); + + 
expect(existsSync(taskJsonPath(taskId))).toBe(false); + expect(existsSync(persistence.taskOutputFile(taskId))).toBe(false); + + // Output is still readable from the in-memory ring buffer. + const snapshot = await manager.getOutputSnapshot(taskId, 1_000); + expect(snapshot.fullOutputAvailable).toBe(false); + expect(snapshot.preview).toContain('hello'); + }); + + it('flushes complete pre-detach output to disk when a foreground task detaches', async () => { + const { proc, pushStdout, finish } = controllableProcess(); + const taskId = registerForeground(manager, proc, 'stream', 'demo'); + + pushStdout('before-detach\n'); + await tick(); // buffered in memory, not yet on disk + expect(existsSync(persistence.taskOutputFile(taskId))).toBe(false); + + expect(manager.detach(taskId)?.detached).toBe(true); + + pushStdout('after-detach\n'); + await tick(); + finish(0); + await waitForTerminal(manager, taskId); + + // output.log is the complete, in-order record across the detach boundary. + expect(await manager.readOutput(taskId)).toBe('before-detach\nafter-detach\n'); + expect(existsSync(taskJsonPath(taskId))).toBe(true); + }); + + it('spills to disk and keeps the log when foreground output exceeds the buffer', async () => { + const big = 'a'.repeat(MAX_OUTPUT_BYTES + 1024); + const taskId = registerForeground(manager, immediateProcess(0, big), 'flood', 'demo'); + + await waitForTerminal(manager, taskId); + + // getOutputSnapshot drains the output write queue before reporting size. + const snapshot = await manager.getOutputSnapshot(taskId, 1_000); + + // Spilled artifacts are persisted complete and NOT deleted on completion. + expect(existsSync(persistence.taskOutputFile(taskId))).toBe(true); + expect(existsSync(taskJsonPath(taskId))).toBe(true); + expect(snapshot.fullOutputAvailable).toBe(true); + expect(snapshot.outputSizeBytes).toBe(big.length); + }); +});