diff --git a/.changeset/preserve-interrupted-assistant-text.md b/.changeset/preserve-interrupted-assistant-text.md new file mode 100644 index 000000000..eeae74ed2 --- /dev/null +++ b/.changeset/preserve-interrupted-assistant-text.md @@ -0,0 +1,6 @@ +--- +"@moonshot-ai/agent-core": patch +"@moonshot-ai/kimi-code": patch +--- + +Preserve visible assistant text streamed before an interrupted turn. diff --git a/packages/agent-core/src/agent/turn/kosong-llm.ts b/packages/agent-core/src/agent/turn/kosong-llm.ts index b33614e43..c8e88d720 100644 --- a/packages/agent-core/src/agent/turn/kosong-llm.ts +++ b/packages/agent-core/src/agent/turn/kosong-llm.ts @@ -208,7 +208,7 @@ function buildKosongCallbacks( } if (part.type === 'think') { if (params.onThinkDelta === undefined) return; - params.onThinkDelta(part.think); + params.onThinkDelta(part.think, part); return; } if (part.type === 'function') { diff --git a/packages/agent-core/src/loop/llm.ts b/packages/agent-core/src/loop/llm.ts index 88bf716e7..4a4f9d469 100644 --- a/packages/agent-core/src/loop/llm.ts +++ b/packages/agent-core/src/loop/llm.ts @@ -42,7 +42,12 @@ export interface LLMChatParams { signal: AbortSignal; requestLogContext?: LLMRequestLogContext; onTextDelta?: ((delta: string) => void) | undefined; - onThinkDelta?: ((delta: string) => void) | undefined; + /** + * Streams visible thinking text for UI updates. Adapters may pass the + * matching raw `ThinkPart` as the second argument so abort recovery can keep + * provider metadata such as encrypted thinking signatures. + */ + onThinkDelta?: ((delta: string, part?: ThinkPart) => void) | undefined; onToolCallDelta?: ((delta: ToolCallDelta) => void) | undefined; /** * Fires once per completed text block. Additive relative to diff --git a/packages/agent-core/src/loop/retry.ts b/packages/agent-core/src/loop/retry.ts index b47d08297..735125e0b 100644 --- a/packages/agent-core/src/loop/retry.ts +++ b/packages/agent-core/src/loop/retry.ts @@ -22,6 +22,7 @@ export interface ChatWithRetryInput { readonly currentStep: number; readonly stepUuid: string; readonly maxAttempts?: number; + readonly onRetrying?: () => void; readonly log?: Logger | undefined; } @@ -51,6 +52,7 @@ export async function chatWithRetry(input: ChatWithRetryInput): Promise { - dispatchEvent({ type: 'text.delta', delta }); - }, - onThinkDelta: (delta) => { - dispatchEvent({ type: 'thinking.delta', delta }); - }, - onToolCallDelta: (delta) => { - dispatchEvent({ - type: 'tool.call.delta', - toolCallId: delta.toolCallId, - name: delta.name, - argumentsPart: delta.argumentsPart, - }); - }, - onTextPart: async (part) => { - await dispatchEvent({ - type: 'content.part', - uuid: randomUUID(), - turnId, - step: currentStep, - stepUuid, - part, - }); - }, - onThinkPart: async (part) => { + let bufferedText = ''; + const bufferedThinkParts: ThinkPart[] = []; + + const clearBuffer = (): void => { + bufferedText = ''; + bufferedThinkParts.length = 0; + }; + + const bufferThinkPart = (part: ThinkPart): void => { + if (part.think.length === 0 && part.encrypted === undefined) return; + + const next: ThinkPart = { ...part }; + const last = bufferedThinkParts.at(-1); + if (last === undefined || !mergeInPlace(last, next)) { + bufferedThinkParts.push(next); + } + }; + + const flushOnAbort = async (): Promise => { + const text = bufferedText; + if (text.length === 0) return; + for (const part of bufferedThinkParts) { await dispatchEvent({ type: 'content.part', uuid: randomUUID(), @@ -260,6 +265,60 @@ function createChatStreamingCallbacks(deps: { stepUuid, part, }); - }, + } + await dispatchEvent({ + type: 'content.part', + uuid: randomUUID(), + turnId, + step: currentStep, + stepUuid, + part: { type: 'text', text }, + }); + clearBuffer(); + }; + + return { + callbacks: { + onTextDelta: (delta) => { + bufferedText += delta; + dispatchEvent({ type: 'text.delta', delta }); + }, + onThinkDelta: (delta, part) => { + bufferThinkPart(part ?? { type: 'think', think: delta }); + dispatchEvent({ type: 'thinking.delta', delta }); + }, + onToolCallDelta: (delta) => { + dispatchEvent({ + type: 'tool.call.delta', + toolCallId: delta.toolCallId, + name: delta.name, + argumentsPart: delta.argumentsPart, + }); + }, + onTextPart: async (part) => { + clearBuffer(); + await dispatchEvent({ + type: 'content.part', + uuid: randomUUID(), + turnId, + step: currentStep, + stepUuid, + part, + }); + }, + onThinkPart: async (part) => { + clearBuffer(); + await dispatchEvent({ + type: 'content.part', + uuid: randomUUID(), + turnId, + step: currentStep, + stepUuid, + part, + }); + }, + } satisfies ChatStreamingCallbacks, + clearBuffer, + flushOnAbort, }; } diff --git a/packages/agent-core/test/loop/api-shape.e2e.test.ts b/packages/agent-core/test/loop/api-shape.e2e.test.ts index be9afbe61..f68538cbd 100644 --- a/packages/agent-core/test/loop/api-shape.e2e.test.ts +++ b/packages/agent-core/test/loop/api-shape.e2e.test.ts @@ -109,7 +109,12 @@ function _typeOnlyChecks(): void { tools: [], signal: _signal, onTextDelta: (_delta: string) => {}, - onThinkDelta: (_delta: string) => {}, + onThinkDelta: (_delta: string, _part) => { + const thinking: string | undefined = _part?.think; + const encrypted: string | undefined = _part?.encrypted; + void thinking; + void encrypted; + }, onToolCallDelta: (_delta) => {}, onTextPart: async (_part) => { const text: string = _part.text; diff --git a/packages/agent-core/test/loop/streaming.e2e.test.ts b/packages/agent-core/test/loop/streaming.e2e.test.ts index d2bea7267..f187e5fbd 100644 --- a/packages/agent-core/test/loop/streaming.e2e.test.ts +++ b/packages/agent-core/test/loop/streaming.e2e.test.ts @@ -54,6 +54,12 @@ async function runWithLLM(llm: LLM): Promise<{ return { sink, context }; } +function abortError(): Error { + const error = new Error('aborted'); + error.name = 'AbortError'; + return error; +} + describe('runTurn — streaming callbacks', () => { it('routes onTextDelta into text.delta events', async () => { const llm = new StreamingLLM(async (params) => { @@ -70,6 +76,257 @@ describe('runTurn — streaming callbacks', () => { expect(deltas).toEqual(['hel', 'lo']); }); + it('persists buffered text deltas as content when a step is aborted', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onTextDelta?.('partial '); + params.onTextDelta?.('answer'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'partial ', + 'answer', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'text', text: 'partial answer' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + + it('does not persist thinking deltas when a step is aborted', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onThinkDelta?.('partial reasoning'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('thinking.delta').map((e) => e.delta)).toEqual([ + 'partial reasoning', + ]); + expect(context.contentParts()).toEqual([]); + expect(context.stepEnds()).toEqual([]); + }); + + it('persists buffered thinking before text when a step is aborted after visible text starts', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onThinkDelta?.('complete reasoning'); + params.onTextDelta?.('partial answer'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('thinking.delta').map((e) => e.delta)).toEqual([ + 'complete reasoning', + ]); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'partial answer', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'think', think: 'complete reasoning' }, + { type: 'text', text: 'partial answer' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + + it('preserves buffered thinking signatures when a step is aborted after visible text starts', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onThinkDelta?.('complete reasoning', { + type: 'think', + think: 'complete reasoning', + }); + params.onThinkDelta?.('', { + type: 'think', + think: '', + encrypted: 'sig-abc', + }); + params.onTextDelta?.('partial answer'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('thinking.delta').map((e) => e.delta)).toEqual([ + 'complete reasoning', + '', + ]); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'partial answer', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { + type: 'think', + think: 'complete reasoning', + encrypted: 'sig-abc', + }, + { type: 'text', text: 'partial answer' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + + it('does not persist buffered text deltas when a step fails without aborting', async () => { + const llm = new StreamingLLM(async (params) => { + params.onTextDelta?.('partial answer'); + throw new Error('provider failed'); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + + await expect( + runTurn({ + turnId: 'turn-1', + signal: new AbortController().signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }), + ).rejects.toThrow('provider failed'); + + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual(['partial answer']); + expect(context.contentParts()).toEqual([]); + expect(context.stepEnds()).toEqual([]); + }); + + it('drops buffered deltas from a failed retry attempt before abort flush', async () => { + const controller = new AbortController(); + const retryableError = new Error('retryable provider failure'); + let attempts = 0; + const llm: LLM = { + systemPrompt: 'streaming system prompt', + modelName: 'streaming', + isRetryableError: (error) => error === retryableError, + async chat(params) { + attempts += 1; + if (attempts === 1) { + params.onThinkDelta?.('discarded thinking'); + params.onTextDelta?.('discarded text '); + throw retryableError; + } + + params.onThinkDelta?.('kept thinking'); + params.onTextDelta?.('kept text'); + controller.abort(); + throw abortError(); + }, + }; + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + maxRetryAttempts: 2, + }); + + expect(result.stopReason).toBe('aborted'); + expect(attempts).toBe(2); + expect(sink.byType('step.retrying')).toHaveLength(1); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'discarded text ', + 'kept text', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'think', think: 'kept thinking' }, + { type: 'text', text: 'kept text' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + + it('does not duplicate buffered text after an emitted text part is recorded', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onTextDelta?.('complete'); + await params.onTextPart?.({ + type: 'text', + text: 'complete', + }); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual(['complete']); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'text', text: 'complete' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + it('routes onThinkDelta into thinking.delta events', async () => { const llm = new StreamingLLM(async (params) => { params.onThinkDelta?.('think...'); diff --git a/packages/node-sdk/test/session-prompt-events.test.ts b/packages/node-sdk/test/session-prompt-events.test.ts index 6845ee6bc..25c690100 100644 --- a/packages/node-sdk/test/session-prompt-events.test.ts +++ b/packages/node-sdk/test/session-prompt-events.test.ts @@ -12,6 +12,16 @@ import { createKimiHarness, type Event, type KimiHarness } from '#/index'; import { TEST_IDENTITY } from './test-identity'; +type FakeStreamScript = + | { readonly kind: 'text'; readonly text: string } + | { readonly kind: 'textThenAbort'; readonly text: string } + | { readonly kind: 'thinkThenAbort'; readonly think: string } + | { + readonly kind: 'thinkThenTextThenAbort'; + readonly think: string; + readonly text: string; + }; + const fakeProviderState = vi.hoisted(() => ({ calls: [] as Array<{ readonly systemPrompt: string; @@ -19,10 +29,23 @@ const fakeProviderState = vi.hoisted(() => ({ }>, providerConfigs: [] as unknown[], responseText: 'hello from fake provider', + scripts: [] as FakeStreamScript[], })); vi.mock('@moonshot-ai/kosong', async (importOriginal) => { const actual = await importOriginal(); + const waitForAbort = async (signal: AbortSignal | undefined): Promise => { + if (signal === undefined) { + throw new Error('Expected fake provider to receive an abort signal'); + } + if (signal.aborted) return; + await new Promise((resolve) => { + signal.addEventListener('abort', () => resolve(), { once: true }); + }); + }; + const throwAbortError = (): never => { + throw new DOMException('The operation was aborted.', 'AbortError'); + }; return { ...actual, createProvider: (config: unknown) => { @@ -31,8 +54,17 @@ vi.mock('@moonshot-ai/kosong', async (importOriginal) => { name: 'fake', modelName: 'fake-model', thinkingEffort: null, - async generate(systemPrompt: string, _tools: unknown, history: unknown) { + async generate( + systemPrompt: string, + _tools: unknown, + history: unknown, + options?: { readonly signal?: AbortSignal }, + ) { fakeProviderState.calls.push({ systemPrompt, history }); + const script = fakeProviderState.scripts.shift() ?? { + kind: 'text', + text: fakeProviderState.responseText, + }; return { id: 'fake-response', usage: { @@ -44,7 +76,31 @@ vi.mock('@moonshot-ai/kosong', async (importOriginal) => { finishReason: 'completed', rawFinishReason: 'stop', async *[Symbol.asyncIterator]() { - yield { type: 'text', text: fakeProviderState.responseText }; + switch (script.kind) { + case 'text': + yield { type: 'text', text: script.text }; + return; + case 'textThenAbort': + yield { type: 'text', text: script.text }; + await waitForAbort(options?.signal); + throwAbortError(); + return; + case 'thinkThenAbort': + yield { type: 'think', think: script.think }; + await waitForAbort(options?.signal); + throwAbortError(); + return; + case 'thinkThenTextThenAbort': + yield { type: 'think', think: script.think }; + yield { type: 'text', text: script.text }; + await waitForAbort(options?.signal); + throwAbortError(); + return; + default: { + const _exhaustive: never = script; + return _exhaustive; + } + } }, }; }, @@ -62,6 +118,7 @@ beforeEach(() => { fakeProviderState.calls.length = 0; fakeProviderState.providerConfigs.length = 0; fakeProviderState.responseText = 'hello from fake provider'; + fakeProviderState.scripts.length = 0; }); afterEach(async () => { @@ -250,6 +307,191 @@ describe('Session.prompt events', () => { } }); + it('resumes visible text streamed before cancel without resuming thinking-only deltas', async () => { + const homeDir = await makeTempDir(); + const textWorkDir = await makeTempDir(); + const thinkWorkDir = await makeTempDir(); + const mixedWorkDir = await makeTempDir(); + const harness = createKimiHarness({ + identity: TEST_IDENTITY, + homeDir, + }); + + try { + await configureFakeProvider(harness); + + fakeProviderState.scripts.push({ + kind: 'textThenAbort', + text: 'Partial answer before cancel.', + }); + const textSession = await harness.createSession({ + id: 'ses_prompt_resume_cancel_text', + workDir: textWorkDir, + }); + const textDelta = waitForEvent( + textSession, + (event) => + event.type === 'assistant.delta' && + event.delta === 'Partial answer before cancel.', + ); + const textEnded = waitForEvent(textSession, (event) => event.type === 'turn.ended'); + + await textSession.prompt('Start streaming'); + await textDelta; + await textSession.cancel(); + await expect(textEnded).resolves.toMatchObject({ + type: 'turn.ended', + reason: 'cancelled', + }); + await textSession.close(); + + const resumedText = await harness.resumeSession({ id: textSession.id }); + fakeProviderState.scripts.push({ + kind: 'text', + text: 'Fresh response after text resume.', + }); + const resumedTextEnded = waitForEvent( + resumedText, + (event) => event.type === 'turn.ended', + ); + await resumedText.prompt('Follow up'); + await resumedTextEnded; + + expect(fakeProviderState.calls.at(-1)?.history).toMatchObject([ + { + role: 'user', + content: [{ type: 'text', text: 'Start streaming' }], + toolCalls: [], + }, + { + role: 'assistant', + content: [{ type: 'text', text: 'Partial answer before cancel.' }], + toolCalls: [], + }, + { + role: 'user', + content: [{ type: 'text', text: 'Follow up' }], + toolCalls: [], + }, + ]); + await resumedText.close(); + + fakeProviderState.scripts.push({ + kind: 'thinkThenAbort', + think: 'Partial reasoning before cancel.', + }); + const thinkSession = await harness.createSession({ + id: 'ses_prompt_resume_cancel_think', + workDir: thinkWorkDir, + }); + const thinkDelta = waitForEvent( + thinkSession, + (event) => + event.type === 'thinking.delta' && + event.delta === 'Partial reasoning before cancel.', + ); + const thinkEnded = waitForEvent(thinkSession, (event) => event.type === 'turn.ended'); + + await thinkSession.prompt('Start thinking'); + await thinkDelta; + await thinkSession.cancel(); + await expect(thinkEnded).resolves.toMatchObject({ + type: 'turn.ended', + reason: 'cancelled', + }); + await thinkSession.close(); + + const resumedThink = await harness.resumeSession({ id: thinkSession.id }); + fakeProviderState.scripts.push({ + kind: 'text', + text: 'Fresh response after thinking resume.', + }); + const resumedThinkEnded = waitForEvent( + resumedThink, + (event) => event.type === 'turn.ended', + ); + await resumedThink.prompt('Follow up'); + await resumedThinkEnded; + + expect(fakeProviderState.calls.at(-1)?.history).toMatchObject([ + { + role: 'user', + content: [{ type: 'text', text: 'Start thinking\n\nFollow up' }], + toolCalls: [], + }, + ]); + await resumedThink.close(); + + fakeProviderState.scripts.push({ + kind: 'thinkThenTextThenAbort', + think: 'Completed reasoning before answer.', + text: 'Partial answer after reasoning.', + }); + const mixedSession = await harness.createSession({ + id: 'ses_prompt_resume_cancel_mixed', + workDir: mixedWorkDir, + }); + const mixedThinkDelta = waitForEvent( + mixedSession, + (event) => + event.type === 'thinking.delta' && + event.delta === 'Completed reasoning before answer.', + ); + const mixedTextDelta = waitForEvent( + mixedSession, + (event) => + event.type === 'assistant.delta' && + event.delta === 'Partial answer after reasoning.', + ); + const mixedEnded = waitForEvent(mixedSession, (event) => event.type === 'turn.ended'); + + await mixedSession.prompt('Start mixed streaming'); + await mixedThinkDelta; + await mixedTextDelta; + await mixedSession.cancel(); + await expect(mixedEnded).resolves.toMatchObject({ + type: 'turn.ended', + reason: 'cancelled', + }); + await mixedSession.close(); + + const resumedMixed = await harness.resumeSession({ id: mixedSession.id }); + fakeProviderState.scripts.push({ + kind: 'text', + text: 'Fresh response after mixed resume.', + }); + const resumedMixedEnded = waitForEvent( + resumedMixed, + (event) => event.type === 'turn.ended', + ); + await resumedMixed.prompt('Follow up'); + await resumedMixedEnded; + + expect(fakeProviderState.calls.at(-1)?.history).toMatchObject([ + { + role: 'user', + content: [{ type: 'text', text: 'Start mixed streaming' }], + toolCalls: [], + }, + { + role: 'assistant', + content: [ + { type: 'think', think: 'Completed reasoning before answer.' }, + { type: 'text', text: 'Partial answer after reasoning.' }, + ], + toolCalls: [], + }, + { + role: 'user', + content: [{ type: 'text', text: 'Follow up' }], + toolCalls: [], + }, + ]); + } finally { + await harness.close(); + } + }); + it('runs init through generateAgentsMd RPC as a subagent system trigger without prompt metadata updates', async () => { const homeDir = await makeTempDir(); const workDir = await makeTempDir();