From 4364fe2f6db542d7f0a8a3fb675c4cfd4737ec63 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 19:21:34 +0800 Subject: [PATCH 1/6] fix: preserve interrupted assistant text --- .../preserve-interrupted-assistant-text.md | 6 + docs/en/guides/interaction.md | 2 +- docs/zh/guides/interaction.md | 2 +- packages/agent-core/src/loop/turn-step.ts | 138 ++++++++++----- .../test/loop/streaming.e2e.test.ts | 124 +++++++++++++ .../test/session-prompt-events.test.ts | 167 +++++++++++++++++- 6 files changed, 388 insertions(+), 51 deletions(-) create mode 100644 .changeset/preserve-interrupted-assistant-text.md diff --git a/.changeset/preserve-interrupted-assistant-text.md b/.changeset/preserve-interrupted-assistant-text.md new file mode 100644 index 000000000..eeae74ed2 --- /dev/null +++ b/.changeset/preserve-interrupted-assistant-text.md @@ -0,0 +1,6 @@ +--- +"@moonshot-ai/agent-core": patch +"@moonshot-ai/kimi-code": patch +--- + +Preserve visible assistant text streamed before an interrupted turn. diff --git a/docs/en/guides/interaction.md b/docs/en/guides/interaction.md index d43448bad..80e886cca 100644 --- a/docs/en/guides/interaction.md +++ b/docs/en/guides/interaction.md @@ -6,7 +6,7 @@ Kimi Code CLI runs as an interactive TUI (terminal user interface) built around The input box accepts free-form text. Press `Enter` to send, or `Shift-Enter` / `Ctrl-J` to insert a newline. When the input box is empty, press `↑` / `↓` to browse the input history for the current working directory. -**Exiting the CLI**: press `Ctrl-D` with the input box empty, press `Ctrl-C` twice while idle, or type `/exit`. Pressing `Ctrl-C` or `Esc` during streaming output interrupts the current turn — it does not exit the program. +**Exiting the CLI**: press `Ctrl-D` with the input box empty, press `Ctrl-C` twice while idle, or type `/exit`. Pressing `Ctrl-C` or `Esc` during streaming output interrupts the current turn — it does not exit the program. 
If an interrupted turn has already printed visible assistant text, Kimi Code keeps that text in the session history so it remains available after resume; in-progress Thinking output is not saved. ## Pasting images and video diff --git a/docs/zh/guides/interaction.md b/docs/zh/guides/interaction.md index 445f2c560..32b0171c0 100644 --- a/docs/zh/guides/interaction.md +++ b/docs/zh/guides/interaction.md @@ -6,7 +6,7 @@ Kimi Code CLI 以交互式 TUI 运行,核心由输入框、对话视图和状 输入框接受自由文本:`Enter` 发送,`Shift-Enter` 或 `Ctrl-J` 插入换行。输入框为空时按 `↑` / `↓` 浏览当前工作目录的历史输入。 -**退出 CLI**:输入框为空时按 `Ctrl-D`,或空闲状态下连按 `Ctrl-C` 两次,或输入 `/exit`。流式输出期间按 `Ctrl-C` 或 `Esc` 是中断当前轮次,不会退出程序。 +**退出 CLI**:输入框为空时按 `Ctrl-D`,或空闲状态下连按 `Ctrl-C` 两次,或输入 `/exit`。流式输出期间按 `Ctrl-C` 或 `Esc` 是中断当前轮次,不会退出程序。如果被中断的轮次已经输出了可见的 Assistant 文本,Kimi Code 会把这些文本保存在会话历史中,恢复会话后仍可作为上下文;尚未完成的 Thinking 输出不会保存。 ## 粘贴图片与视频 diff --git a/packages/agent-core/src/loop/turn-step.ts b/packages/agent-core/src/loop/turn-step.ts index b06cd67df..8f7a39286 100644 --- a/packages/agent-core/src/loop/turn-step.ts +++ b/packages/agent-core/src/loop/turn-step.ts @@ -13,6 +13,7 @@ import type { TokenUsage } from '@moonshot-ai/kosong'; import type { Logger } from '#/logging/types'; import type { LoopEventDispatcher } from './events'; +import { isAbortError } from './errors'; import type { LLM, LLMChatParams, LLMChatResponse } from './llm'; import { chatWithRetry } from './retry'; import { runToolCallBatch, type ToolCallStepContext } from './tool-call'; @@ -99,27 +100,37 @@ export async function executeLoopStep(deps: ExecuteLoopStepDeps): Promise<{ step: currentStep, }); + const streamingCallbacks = createChatStreamingCallbacks({ + dispatchEvent, + turnId, + currentStep, + stepUuid, + }); const chatParams: LLMChatParams = { messages, tools: tools ?? 
[], signal, - ...createChatStreamingCallbacks({ + ...streamingCallbacks.callbacks, + }; + let response: LLMChatResponse; + try { + response = await chatWithRetry({ + llm, + params: chatParams, dispatchEvent, turnId, currentStep, stepUuid, - }), - }; - const response: LLMChatResponse = await chatWithRetry({ - llm, - params: chatParams, - dispatchEvent, - turnId, - currentStep, - stepUuid, - maxAttempts: maxRetryAttempts, - log, - }); + maxAttempts: maxRetryAttempts, + log, + }); + } catch (error) { + if (signal.aborted || isAbortError(error)) { + await streamingCallbacks.flushBufferedTextPart(); + } + throw error; + } + streamingCallbacks.clearBufferedTextPart(); const usage = response.usage; const usageResult = await recordUsage(usage); const stopTurnAfterUsage = usageResult?.stopTurn === true; @@ -223,43 +234,76 @@ function createChatStreamingCallbacks(deps: { readonly turnId: string; readonly currentStep: number; readonly stepUuid: string; -}): ChatStreamingCallbacks { +}) { const { dispatchEvent, turnId, currentStep, stepUuid } = deps; + let bufferedText = ''; + + const flushBufferedTextPart = async (): Promise => { + const text = bufferedText; + if (text.length === 0) return; + bufferedText = ''; + await dispatchEvent({ + type: 'content.part', + uuid: randomUUID(), + turnId, + step: currentStep, + stepUuid, + part: { type: 'text', text }, + }); + }; + + const markTextPartRecorded = (text: string): void => { + if (text.length === 0 || bufferedText.length === 0) return; + if (bufferedText.startsWith(text)) { + bufferedText = bufferedText.slice(text.length); + return; + } + bufferedText = ''; + }; + return { - onTextDelta: (delta) => { - dispatchEvent({ type: 'text.delta', delta }); - }, - onThinkDelta: (delta) => { - dispatchEvent({ type: 'thinking.delta', delta }); - }, - onToolCallDelta: (delta) => { - dispatchEvent({ - type: 'tool.call.delta', - toolCallId: delta.toolCallId, - name: delta.name, - argumentsPart: delta.argumentsPart, - }); - }, - onTextPart: 
async (part) => { - await dispatchEvent({ - type: 'content.part', - uuid: randomUUID(), - turnId, - step: currentStep, - stepUuid, - part, - }); - }, - onThinkPart: async (part) => { - await dispatchEvent({ - type: 'content.part', - uuid: randomUUID(), - turnId, - step: currentStep, - stepUuid, - part, - }); + callbacks: { + onTextDelta: (delta) => { + bufferedText += delta; + dispatchEvent({ type: 'text.delta', delta }); + }, + onThinkDelta: (delta) => { + dispatchEvent({ type: 'thinking.delta', delta }); + }, + onToolCallDelta: (delta) => { + dispatchEvent({ + type: 'tool.call.delta', + toolCallId: delta.toolCallId, + name: delta.name, + argumentsPart: delta.argumentsPart, + }); + }, + onTextPart: async (part) => { + markTextPartRecorded(part.text); + await dispatchEvent({ + type: 'content.part', + uuid: randomUUID(), + turnId, + step: currentStep, + stepUuid, + part, + }); + }, + onThinkPart: async (part) => { + await dispatchEvent({ + type: 'content.part', + uuid: randomUUID(), + turnId, + step: currentStep, + stepUuid, + part, + }); + }, + } satisfies ChatStreamingCallbacks, + flushBufferedTextPart, + clearBufferedTextPart: () => { + bufferedText = ''; }, }; } diff --git a/packages/agent-core/test/loop/streaming.e2e.test.ts b/packages/agent-core/test/loop/streaming.e2e.test.ts index d2bea7267..7b7ba6981 100644 --- a/packages/agent-core/test/loop/streaming.e2e.test.ts +++ b/packages/agent-core/test/loop/streaming.e2e.test.ts @@ -54,6 +54,12 @@ async function runWithLLM(llm: LLM): Promise<{ return { sink, context }; } +function abortError(): Error { + const error = new Error('aborted'); + error.name = 'AbortError'; + return error; +} + describe('runTurn — streaming callbacks', () => { it('routes onTextDelta into text.delta events', async () => { const llm = new StreamingLLM(async (params) => { @@ -70,6 +76,124 @@ describe('runTurn — streaming callbacks', () => { expect(deltas).toEqual(['hel', 'lo']); }); + it('persists buffered text deltas as content when a step 
is aborted', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onTextDelta?.('partial '); + params.onTextDelta?.('answer'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'partial ', + 'answer', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'text', text: 'partial answer' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + + it('does not persist thinking deltas when a step is aborted', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onThinkDelta?.('partial reasoning'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('thinking.delta').map((e) => e.delta)).toEqual([ + 'partial reasoning', + ]); + expect(context.contentParts()).toEqual([]); + expect(context.stepEnds()).toEqual([]); + }); + + it('does not persist buffered text deltas when a step fails without aborting', async () => { + const llm = new StreamingLLM(async (params) => { + params.onTextDelta?.('partial answer'); + throw new Error('provider failed'); + }); + 
const sink = new CollectingSink(); + const context = new RecordingContext(); + + await expect( + runTurn({ + turnId: 'turn-1', + signal: new AbortController().signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }), + ).rejects.toThrow('provider failed'); + + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual(['partial answer']); + expect(context.contentParts()).toEqual([]); + expect(context.stepEnds()).toEqual([]); + }); + + it('does not duplicate buffered text after an emitted text part is recorded', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onTextDelta?.('complete'); + await params.onTextPart?.({ + type: 'text', + text: 'complete', + }); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual(['complete']); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'text', text: 'complete' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + it('routes onThinkDelta into thinking.delta events', async () => { const llm = new StreamingLLM(async (params) => { params.onThinkDelta?.('think...'); diff --git a/packages/node-sdk/test/session-prompt-events.test.ts b/packages/node-sdk/test/session-prompt-events.test.ts index 6845ee6bc..a315dab57 100644 --- a/packages/node-sdk/test/session-prompt-events.test.ts +++ b/packages/node-sdk/test/session-prompt-events.test.ts @@ -12,6 +12,11 
@@ import { createKimiHarness, type Event, type KimiHarness } from '#/index'; import { TEST_IDENTITY } from './test-identity'; +type FakeStreamScript = + | { readonly kind: 'text'; readonly text: string } + | { readonly kind: 'textThenAbort'; readonly text: string } + | { readonly kind: 'thinkThenAbort'; readonly think: string }; + const fakeProviderState = vi.hoisted(() => ({ calls: [] as Array<{ readonly systemPrompt: string; @@ -19,10 +24,23 @@ const fakeProviderState = vi.hoisted(() => ({ }>, providerConfigs: [] as unknown[], responseText: 'hello from fake provider', + scripts: [] as FakeStreamScript[], })); vi.mock('@moonshot-ai/kosong', async (importOriginal) => { const actual = await importOriginal(); + const waitForAbort = async (signal: AbortSignal | undefined): Promise => { + if (signal === undefined) { + throw new Error('Expected fake provider to receive an abort signal'); + } + if (signal.aborted) return; + await new Promise((resolve) => { + signal.addEventListener('abort', () => resolve(), { once: true }); + }); + }; + const throwAbortError = (): never => { + throw new DOMException('The operation was aborted.', 'AbortError'); + }; return { ...actual, createProvider: (config: unknown) => { @@ -31,8 +49,17 @@ vi.mock('@moonshot-ai/kosong', async (importOriginal) => { name: 'fake', modelName: 'fake-model', thinkingEffort: null, - async generate(systemPrompt: string, _tools: unknown, history: unknown) { + async generate( + systemPrompt: string, + _tools: unknown, + history: unknown, + options?: { readonly signal?: AbortSignal }, + ) { fakeProviderState.calls.push({ systemPrompt, history }); + const script = fakeProviderState.scripts.shift() ?? 
{ + kind: 'text', + text: fakeProviderState.responseText, + }; return { id: 'fake-response', usage: { @@ -44,7 +71,25 @@ vi.mock('@moonshot-ai/kosong', async (importOriginal) => { finishReason: 'completed', rawFinishReason: 'stop', async *[Symbol.asyncIterator]() { - yield { type: 'text', text: fakeProviderState.responseText }; + switch (script.kind) { + case 'text': + yield { type: 'text', text: script.text }; + return; + case 'textThenAbort': + yield { type: 'text', text: script.text }; + await waitForAbort(options?.signal); + throwAbortError(); + return; + case 'thinkThenAbort': + yield { type: 'think', think: script.think }; + await waitForAbort(options?.signal); + throwAbortError(); + return; + default: { + const _exhaustive: never = script; + return _exhaustive; + } + } }, }; }, @@ -62,6 +107,7 @@ beforeEach(() => { fakeProviderState.calls.length = 0; fakeProviderState.providerConfigs.length = 0; fakeProviderState.responseText = 'hello from fake provider'; + fakeProviderState.scripts.length = 0; }); afterEach(async () => { @@ -250,6 +296,123 @@ describe('Session.prompt events', () => { } }); + it('resumes visible text streamed before cancel without resuming thinking-only deltas', async () => { + const homeDir = await makeTempDir(); + const textWorkDir = await makeTempDir(); + const thinkWorkDir = await makeTempDir(); + const harness = createKimiHarness({ + identity: TEST_IDENTITY, + homeDir, + }); + + try { + await configureFakeProvider(harness); + + fakeProviderState.scripts.push({ + kind: 'textThenAbort', + text: 'Partial answer before cancel.', + }); + const textSession = await harness.createSession({ + id: 'ses_prompt_resume_cancel_text', + workDir: textWorkDir, + }); + const textDelta = waitForEvent( + textSession, + (event) => + event.type === 'assistant.delta' && + event.delta === 'Partial answer before cancel.', + ); + const textEnded = waitForEvent(textSession, (event) => event.type === 'turn.ended'); + + await textSession.prompt('Start streaming'); 
+ await textDelta; + await textSession.cancel(); + await expect(textEnded).resolves.toMatchObject({ + type: 'turn.ended', + reason: 'cancelled', + }); + await textSession.close(); + + const resumedText = await harness.resumeSession({ id: textSession.id }); + fakeProviderState.scripts.push({ + kind: 'text', + text: 'Fresh response after text resume.', + }); + const resumedTextEnded = waitForEvent( + resumedText, + (event) => event.type === 'turn.ended', + ); + await resumedText.prompt('Follow up'); + await resumedTextEnded; + + expect(fakeProviderState.calls.at(-1)?.history).toMatchObject([ + { + role: 'user', + content: [{ type: 'text', text: 'Start streaming' }], + toolCalls: [], + }, + { + role: 'assistant', + content: [{ type: 'text', text: 'Partial answer before cancel.' }], + toolCalls: [], + }, + { + role: 'user', + content: [{ type: 'text', text: 'Follow up' }], + toolCalls: [], + }, + ]); + await resumedText.close(); + + fakeProviderState.scripts.push({ + kind: 'thinkThenAbort', + think: 'Partial reasoning before cancel.', + }); + const thinkSession = await harness.createSession({ + id: 'ses_prompt_resume_cancel_think', + workDir: thinkWorkDir, + }); + const thinkDelta = waitForEvent( + thinkSession, + (event) => + event.type === 'thinking.delta' && + event.delta === 'Partial reasoning before cancel.', + ); + const thinkEnded = waitForEvent(thinkSession, (event) => event.type === 'turn.ended'); + + await thinkSession.prompt('Start thinking'); + await thinkDelta; + await thinkSession.cancel(); + await expect(thinkEnded).resolves.toMatchObject({ + type: 'turn.ended', + reason: 'cancelled', + }); + await thinkSession.close(); + + const resumedThink = await harness.resumeSession({ id: thinkSession.id }); + fakeProviderState.scripts.push({ + kind: 'text', + text: 'Fresh response after thinking resume.', + }); + const resumedThinkEnded = waitForEvent( + resumedThink, + (event) => event.type === 'turn.ended', + ); + await resumedThink.prompt('Follow up'); + await 
resumedThinkEnded; + + expect(fakeProviderState.calls.at(-1)?.history).toMatchObject([ + { + role: 'user', + content: [{ type: 'text', text: 'Start thinking\n\nFollow up' }], + toolCalls: [], + }, + ]); + } finally { + await harness.close(); + } + }); + it('runs init through generateAgentsMd RPC as a subagent system trigger without prompt metadata updates', async () => { const homeDir = await makeTempDir(); const workDir = await makeTempDir(); From 93be95576e856acb6651f5bfe52f95d47ae2f923 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 20:01:29 +0800 Subject: [PATCH 2/6] fix: preserve interrupted thinking with text --- packages/agent-core/src/loop/turn-step.ts | 40 +++++---- .../test/loop/streaming.e2e.test.ts | 35 ++++++++ .../test/session-prompt-events.test.ts | 81 ++++++++++++++++++- 3 files changed, 138 insertions(+), 18 deletions(-) diff --git a/packages/agent-core/src/loop/turn-step.ts b/packages/agent-core/src/loop/turn-step.ts index 8f7a39286..7d78742a3 100644 --- a/packages/agent-core/src/loop/turn-step.ts +++ b/packages/agent-core/src/loop/turn-step.ts @@ -126,11 +126,10 @@ export async function executeLoopStep(deps: ExecuteLoopStepDeps): Promise<{ }); } catch (error) { if (signal.aborted || isAbortError(error)) { - await streamingCallbacks.flushBufferedTextPart(); + await streamingCallbacks.flushOnAbort(); } throw error; } - streamingCallbacks.clearBufferedTextPart(); const usage = response.usage; const usageResult = await recordUsage(usage); const stopTurnAfterUsage = usageResult?.stopTurn === true; @@ -238,11 +237,26 @@ function createChatStreamingCallbacks(deps: { const { dispatchEvent, turnId, currentStep, stepUuid } = deps; let bufferedText = ''; + let bufferedThink = ''; - const flushBufferedTextPart = async (): Promise => { + const clearBuffer = (): void => { + bufferedText = ''; + bufferedThink = ''; + }; + + const flushOnAbort = async (): Promise => { const text = bufferedText; if (text.length === 0) return; - bufferedText = ''; + 
if (bufferedThink.length > 0) { + await dispatchEvent({ + type: 'content.part', + uuid: randomUUID(), + turnId, + step: currentStep, + stepUuid, + part: { type: 'think', think: bufferedThink }, + }); + } await dispatchEvent({ type: 'content.part', uuid: randomUUID(), @@ -251,16 +265,9 @@ function createChatStreamingCallbacks(deps: { stepUuid, part: { type: 'text', text }, }); + clearBuffer(); }; - const markTextPartRecorded = (text: string): void => { - if (text.length === 0 || bufferedText.length === 0) return; - if (bufferedText.startsWith(text)) { - bufferedText = bufferedText.slice(text.length); - return; - } - bufferedText = ''; - }; return { callbacks: { @@ -269,6 +276,7 @@ function createChatStreamingCallbacks(deps: { dispatchEvent({ type: 'text.delta', delta }); }, onThinkDelta: (delta) => { + bufferedThink += delta; dispatchEvent({ type: 'thinking.delta', delta }); }, onToolCallDelta: (delta) => { @@ -280,7 +288,7 @@ function createChatStreamingCallbacks(deps: { }); }, onTextPart: async (part) => { - markTextPartRecorded(part.text); + clearBuffer(); await dispatchEvent({ type: 'content.part', uuid: randomUUID(), @@ -291,6 +299,7 @@ function createChatStreamingCallbacks(deps: { }); }, onThinkPart: async (part) => { + clearBuffer(); await dispatchEvent({ type: 'content.part', uuid: randomUUID(), @@ -301,9 +310,6 @@ function createChatStreamingCallbacks(deps: { }); }, } satisfies ChatStreamingCallbacks, - flushBufferedTextPart, - clearBufferedTextPart: () => { - bufferedText = ''; - }, + flushOnAbort, }; } diff --git a/packages/agent-core/test/loop/streaming.e2e.test.ts b/packages/agent-core/test/loop/streaming.e2e.test.ts index 7b7ba6981..73c55581f 100644 --- a/packages/agent-core/test/loop/streaming.e2e.test.ts +++ b/packages/agent-core/test/loop/streaming.e2e.test.ts @@ -136,6 +136,41 @@ describe('runTurn — streaming callbacks', () => { expect(context.stepEnds()).toEqual([]); }); + it('persists buffered thinking before text when a step is aborted after 
visible text starts', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onThinkDelta?.('complete reasoning'); + params.onTextDelta?.('partial answer'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('thinking.delta').map((e) => e.delta)).toEqual([ + 'complete reasoning', + ]); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'partial answer', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'think', think: 'complete reasoning' }, + { type: 'text', text: 'partial answer' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + it('does not persist buffered text deltas when a step fails without aborting', async () => { const llm = new StreamingLLM(async (params) => { params.onTextDelta?.('partial answer'); diff --git a/packages/node-sdk/test/session-prompt-events.test.ts b/packages/node-sdk/test/session-prompt-events.test.ts index a315dab57..25c690100 100644 --- a/packages/node-sdk/test/session-prompt-events.test.ts +++ b/packages/node-sdk/test/session-prompt-events.test.ts @@ -15,7 +15,12 @@ import { TEST_IDENTITY } from './test-identity'; type FakeStreamScript = | { readonly kind: 'text'; readonly text: string } | { readonly kind: 'textThenAbort'; readonly text: string } - | { readonly kind: 'thinkThenAbort'; readonly think: string }; + | { readonly kind: 'thinkThenAbort'; readonly think: string } + | { + readonly kind: 'thinkThenTextThenAbort'; + readonly think: string; + readonly text: string; + }; const fakeProviderState = 
vi.hoisted(() => ({ calls: [] as Array<{ @@ -85,6 +90,12 @@ vi.mock('@moonshot-ai/kosong', async (importOriginal) => { await waitForAbort(options?.signal); throwAbortError(); return; + case 'thinkThenTextThenAbort': + yield { type: 'think', think: script.think }; + yield { type: 'text', text: script.text }; + await waitForAbort(options?.signal); + throwAbortError(); + return; default: { const _exhaustive: never = script; return _exhaustive; @@ -300,6 +311,7 @@ describe('Session.prompt events', () => { const homeDir = await makeTempDir(); const textWorkDir = await makeTempDir(); const thinkWorkDir = await makeTempDir(); + const mixedWorkDir = await makeTempDir(); const harness = createKimiHarness({ identity: TEST_IDENTITY, homeDir, @@ -408,6 +420,73 @@ describe('Session.prompt events', () => { toolCalls: [], }, ]); + await resumedThink.close(); + + fakeProviderState.scripts.push({ + kind: 'thinkThenTextThenAbort', + think: 'Completed reasoning before answer.', + text: 'Partial answer after reasoning.', + }); + const mixedSession = await harness.createSession({ + id: 'ses_prompt_resume_cancel_mixed', + workDir: mixedWorkDir, + }); + const mixedThinkDelta = waitForEvent( + mixedSession, + (event) => + event.type === 'thinking.delta' && + event.delta === 'Completed reasoning before answer.', + ); + const mixedTextDelta = waitForEvent( + mixedSession, + (event) => + event.type === 'assistant.delta' && + event.delta === 'Partial answer after reasoning.', + ); + const mixedEnded = waitForEvent(mixedSession, (event) => event.type === 'turn.ended'); + + await mixedSession.prompt('Start mixed streaming'); + await mixedThinkDelta; + await mixedTextDelta; + await mixedSession.cancel(); + await expect(mixedEnded).resolves.toMatchObject({ + type: 'turn.ended', + reason: 'cancelled', + }); + await mixedSession.close(); + + const resumedMixed = await harness.resumeSession({ id: mixedSession.id }); + fakeProviderState.scripts.push({ + kind: 'text', + text: 'Fresh response after 
mixed resume.', + }); + const resumedMixedEnded = waitForEvent( + resumedMixed, + (event) => event.type === 'turn.ended', + ); + await resumedMixed.prompt('Follow up'); + await resumedMixedEnded; + + expect(fakeProviderState.calls.at(-1)?.history).toMatchObject([ + { + role: 'user', + content: [{ type: 'text', text: 'Start mixed streaming' }], + toolCalls: [], + }, + { + role: 'assistant', + content: [ + { type: 'think', think: 'Completed reasoning before answer.' }, + { type: 'text', text: 'Partial answer after reasoning.' }, + ], + toolCalls: [], + }, + { + role: 'user', + content: [{ type: 'text', text: 'Follow up' }], + toolCalls: [], + }, + ]); } finally { await harness.close(); } From 65c21dbb61fb95b08689adc3fb571dd81120f795 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 20:06:55 +0800 Subject: [PATCH 3/6] docs: revert interrupted assistant guidance --- docs/en/guides/interaction.md | 2 +- docs/zh/guides/interaction.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/guides/interaction.md b/docs/en/guides/interaction.md index 80e886cca..d43448bad 100644 --- a/docs/en/guides/interaction.md +++ b/docs/en/guides/interaction.md @@ -6,7 +6,7 @@ Kimi Code CLI runs as an interactive TUI (terminal user interface) built around The input box accepts free-form text. Press `Enter` to send, or `Shift-Enter` / `Ctrl-J` to insert a newline. When the input box is empty, press `↑` / `↓` to browse the input history for the current working directory. -**Exiting the CLI**: press `Ctrl-D` with the input box empty, press `Ctrl-C` twice while idle, or type `/exit`. Pressing `Ctrl-C` or `Esc` during streaming output interrupts the current turn — it does not exit the program. If an interrupted turn has already printed visible assistant text, Kimi Code keeps that text in the session history so it remains available after resume; in-progress Thinking output is not saved. 
+**Exiting the CLI**: press `Ctrl-D` with the input box empty, press `Ctrl-C` twice while idle, or type `/exit`. Pressing `Ctrl-C` or `Esc` during streaming output interrupts the current turn — it does not exit the program. ## Pasting images and video diff --git a/docs/zh/guides/interaction.md b/docs/zh/guides/interaction.md index 32b0171c0..445f2c560 100644 --- a/docs/zh/guides/interaction.md +++ b/docs/zh/guides/interaction.md @@ -6,7 +6,7 @@ Kimi Code CLI 以交互式 TUI 运行,核心由输入框、对话视图和状 输入框接受自由文本:`Enter` 发送,`Shift-Enter` 或 `Ctrl-J` 插入换行。输入框为空时按 `↑` / `↓` 浏览当前工作目录的历史输入。 -**退出 CLI**:输入框为空时按 `Ctrl-D`,或空闲状态下连按 `Ctrl-C` 两次,或输入 `/exit`。流式输出期间按 `Ctrl-C` 或 `Esc` 是中断当前轮次,不会退出程序。如果被中断的轮次已经输出了可见的 Assistant 文本,Kimi Code 会把这些文本保存在会话历史中,恢复会话后仍可作为上下文;尚未完成的 Thinking 输出不会保存。 +**退出 CLI**:输入框为空时按 `Ctrl-D`,或空闲状态下连按 `Ctrl-C` 两次,或输入 `/exit`。流式输出期间按 `Ctrl-C` 或 `Esc` 是中断当前轮次,不会退出程序。 ## 粘贴图片与视频 From 3dbe1f0f8c495644d043d72c3ca94237fc70bb0b Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 20:15:42 +0800 Subject: [PATCH 4/6] fix: reset streaming buffers before retry --- packages/agent-core/src/loop/retry.ts | 2 + packages/agent-core/src/loop/turn-step.ts | 2 + .../test/loop/streaming.e2e.test.ts | 50 +++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/packages/agent-core/src/loop/retry.ts b/packages/agent-core/src/loop/retry.ts index b47d08297..735125e0b 100644 --- a/packages/agent-core/src/loop/retry.ts +++ b/packages/agent-core/src/loop/retry.ts @@ -22,6 +22,7 @@ export interface ChatWithRetryInput { readonly currentStep: number; readonly stepUuid: string; readonly maxAttempts?: number; + readonly onRetrying?: () => void; readonly log?: Logger | undefined; } @@ -51,6 +52,7 @@ export async function chatWithRetry(input: ChatWithRetryInput): Promise { expect(context.stepEnds()).toEqual([]); }); + it('drops buffered deltas from a failed retry attempt before abort flush', async () => { + const controller = new AbortController(); + const retryableError = new 
Error('retryable provider failure'); + let attempts = 0; + const llm: LLM = { + systemPrompt: 'streaming system prompt', + modelName: 'streaming', + isRetryableError: (error) => error === retryableError, + async chat(params) { + attempts += 1; + if (attempts === 1) { + params.onThinkDelta?.('discarded thinking'); + params.onTextDelta?.('discarded text '); + throw retryableError; + } + + params.onThinkDelta?.('kept thinking'); + params.onTextDelta?.('kept text'); + controller.abort(); + throw abortError(); + }, + }; + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + maxRetryAttempts: 2, + }); + + expect(result.stopReason).toBe('aborted'); + expect(attempts).toBe(2); + expect(sink.byType('step.retrying')).toHaveLength(1); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'discarded text ', + 'kept text', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { type: 'think', think: 'kept thinking' }, + { type: 'text', text: 'kept text' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + it('does not duplicate buffered text after an emitted text part is recorded', async () => { const controller = new AbortController(); const llm = new StreamingLLM(async (params) => { From eea409dbf7ba5f71902240d080273acefcecfa70 Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 21:03:36 +0800 Subject: [PATCH 5/6] fix --- .../agent-core/src/agent/turn/kosong-llm.ts | 2 +- packages/agent-core/src/loop/llm.ts | 7 ++- packages/agent-core/src/loop/turn-step.ts | 25 ++++++---- .../test/loop/api-shape.e2e.test.ts | 7 ++- .../test/loop/streaming.e2e.test.ts | 48 +++++++++++++++++++ 5 files changed, 78 insertions(+), 11 deletions(-) diff --git 
a/packages/agent-core/src/agent/turn/kosong-llm.ts b/packages/agent-core/src/agent/turn/kosong-llm.ts index b33614e43..c8e88d720 100644 --- a/packages/agent-core/src/agent/turn/kosong-llm.ts +++ b/packages/agent-core/src/agent/turn/kosong-llm.ts @@ -208,7 +208,7 @@ function buildKosongCallbacks( } if (part.type === 'think') { if (params.onThinkDelta === undefined) return; - params.onThinkDelta(part.think); + params.onThinkDelta(part.think, part); return; } if (part.type === 'function') { diff --git a/packages/agent-core/src/loop/llm.ts b/packages/agent-core/src/loop/llm.ts index 88bf716e7..4a4f9d469 100644 --- a/packages/agent-core/src/loop/llm.ts +++ b/packages/agent-core/src/loop/llm.ts @@ -42,7 +42,12 @@ export interface LLMChatParams { signal: AbortSignal; requestLogContext?: LLMRequestLogContext; onTextDelta?: ((delta: string) => void) | undefined; - onThinkDelta?: ((delta: string) => void) | undefined; + /** + * Streams visible thinking text for UI updates. Adapters may pass the + * matching raw `ThinkPart` as the second argument so abort recovery can keep + * provider metadata such as encrypted thinking signatures. + */ + onThinkDelta?: ((delta: string, part?: ThinkPart) => void) | undefined; onToolCallDelta?: ((delta: ToolCallDelta) => void) | undefined; /** * Fires once per completed text block. 
Additive relative to diff --git a/packages/agent-core/src/loop/turn-step.ts b/packages/agent-core/src/loop/turn-step.ts index 015947ccb..1fa7d21b9 100644 --- a/packages/agent-core/src/loop/turn-step.ts +++ b/packages/agent-core/src/loop/turn-step.ts @@ -9,7 +9,7 @@ import { randomUUID } from 'node:crypto'; -import type { TokenUsage } from '@moonshot-ai/kosong'; +import { mergeInPlace, type ThinkPart, type TokenUsage } from '@moonshot-ai/kosong'; import type { Logger } from '#/logging/types'; import type { LoopEventDispatcher } from './events'; @@ -238,24 +238,34 @@ function createChatStreamingCallbacks(deps: { const { dispatchEvent, turnId, currentStep, stepUuid } = deps; let bufferedText = ''; - let bufferedThink = ''; + const bufferedThinkParts: ThinkPart[] = []; const clearBuffer = (): void => { bufferedText = ''; - bufferedThink = ''; + bufferedThinkParts.length = 0; + }; + + const bufferThinkPart = (part: ThinkPart): void => { + if (part.think.length === 0 && part.encrypted === undefined) return; + + const next: ThinkPart = { ...part }; + const last = bufferedThinkParts.at(-1); + if (last === undefined || !mergeInPlace(last, next)) { + bufferedThinkParts.push(next); + } }; const flushOnAbort = async (): Promise<void> => { const text = bufferedText; if (text.length === 0) return; - if (bufferedThink.length > 0) { + for (const part of bufferedThinkParts) { await dispatchEvent({ type: 'content.part', uuid: randomUUID(), turnId, step: currentStep, stepUuid, - part: { type: 'think', think: bufferedThink }, + part, }); } await dispatchEvent({ @@ -269,15 +279,14 @@ function createChatStreamingCallbacks(deps: { clearBuffer(); }; - return { callbacks: { onTextDelta: (delta) => { bufferedText += delta; dispatchEvent({ type: 'text.delta', delta }); }, - onThinkDelta: (delta) => { - bufferedThink += delta; + onThinkDelta: (delta, part) => { + bufferThinkPart(part ?? 
{ type: 'think', think: delta }); dispatchEvent({ type: 'thinking.delta', delta }); }, onToolCallDelta: (delta) => { diff --git a/packages/agent-core/test/loop/api-shape.e2e.test.ts b/packages/agent-core/test/loop/api-shape.e2e.test.ts index be9afbe61..f68538cbd 100644 --- a/packages/agent-core/test/loop/api-shape.e2e.test.ts +++ b/packages/agent-core/test/loop/api-shape.e2e.test.ts @@ -109,7 +109,12 @@ function _typeOnlyChecks(): void { tools: [], signal: _signal, onTextDelta: (_delta: string) => {}, - onThinkDelta: (_delta: string) => {}, + onThinkDelta: (_delta: string, _part) => { + const thinking: string | undefined = _part?.think; + const encrypted: string | undefined = _part?.encrypted; + void thinking; + void encrypted; + }, onToolCallDelta: (_delta) => {}, onTextPart: async (_part) => { const text: string = _part.text; diff --git a/packages/agent-core/test/loop/streaming.e2e.test.ts b/packages/agent-core/test/loop/streaming.e2e.test.ts index 8ca983a59..f187e5fbd 100644 --- a/packages/agent-core/test/loop/streaming.e2e.test.ts +++ b/packages/agent-core/test/loop/streaming.e2e.test.ts @@ -171,6 +171,54 @@ describe('runTurn — streaming callbacks', () => { expect(context.stepEnds()).toEqual([]); }); + it('preserves buffered thinking signatures when a step is aborted after visible text starts', async () => { + const controller = new AbortController(); + const llm = new StreamingLLM(async (params) => { + params.onThinkDelta?.('complete reasoning', { + type: 'think', + think: 'complete reasoning', + }); + params.onThinkDelta?.('', { + type: 'think', + think: '', + encrypted: 'sig-abc', + }); + params.onTextDelta?.('partial answer'); + controller.abort(); + throw abortError(); + }); + const sink = new CollectingSink(); + const context = new RecordingContext(); + const result = await runTurn({ + turnId: 'turn-1', + signal: controller.signal, + llm, + buildMessages: context.buildMessages, + dispatchEvent: createLoopEventDispatcher({ + appendTranscriptRecord: 
context.appendTranscriptRecord, + emitLiveEvent: sink.emit, + }), + }); + + expect(result.stopReason).toBe('aborted'); + expect(sink.byType('thinking.delta').map((e) => e.delta)).toEqual([ + 'complete reasoning', + '', + ]); + expect(sink.byType('text.delta').map((e) => e.delta)).toEqual([ + 'partial answer', + ]); + expect(context.contentParts().map((e) => e.part)).toEqual([ + { + type: 'think', + think: 'complete reasoning', + encrypted: 'sig-abc', + }, + { type: 'text', text: 'partial answer' }, + ]); + expect(context.stepEnds()).toEqual([]); + }); + it('does not persist buffered text deltas when a step fails without aborting', async () => { const llm = new StreamingLLM(async (params) => { params.onTextDelta?.('partial answer'); From 400df7ea63497d32544d60e160a3c107e85223bf Mon Sep 17 00:00:00 2001 From: _Kerman Date: Tue, 16 Jun 2026 21:05:11 +0800 Subject: [PATCH 6/6] fix --- packages/agent-core/src/loop/turn-step.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/agent-core/src/loop/turn-step.ts b/packages/agent-core/src/loop/turn-step.ts index 1fa7d21b9..aec2e3c31 100644 --- a/packages/agent-core/src/loop/turn-step.ts +++ b/packages/agent-core/src/loop/turn-step.ts @@ -222,10 +222,8 @@ function stepEndProviderDiagnostics( } return { - ...(providerFinishReason !== undefined ? { providerFinishReason } : {}), - ...(response.rawFinishReason !== undefined - ? { rawFinishReason: response.rawFinishReason } - : {}), + providerFinishReason, + rawFinishReason: response.rawFinishReason, }; }