Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/preserve-interrupted-assistant-text.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@moonshot-ai/agent-core": patch
"@moonshot-ai/kimi-code": patch
---

Preserve visible assistant text streamed before an interrupted turn.
2 changes: 1 addition & 1 deletion packages/agent-core/src/agent/turn/kosong-llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ function buildKosongCallbacks(
}
if (part.type === 'think') {
if (params.onThinkDelta === undefined) return;
params.onThinkDelta(part.think);
params.onThinkDelta(part.think, part);
return;
}
if (part.type === 'function') {
Expand Down
7 changes: 6 additions & 1 deletion packages/agent-core/src/loop/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ export interface LLMChatParams {
signal: AbortSignal;
requestLogContext?: LLMRequestLogContext;
onTextDelta?: ((delta: string) => void) | undefined;
onThinkDelta?: ((delta: string) => void) | undefined;
/**
* Streams visible thinking text for UI updates. Adapters may pass the
* matching raw `ThinkPart` as the second argument so abort recovery can keep
* provider metadata such as encrypted thinking signatures.
*/
onThinkDelta?: ((delta: string, part?: ThinkPart) => void) | undefined;
onToolCallDelta?: ((delta: ToolCallDelta) => void) | undefined;
/**
* Fires once per completed text block. Additive relative to
Expand Down
2 changes: 2 additions & 0 deletions packages/agent-core/src/loop/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface ChatWithRetryInput {
readonly currentStep: number;
readonly stepUuid: string;
readonly maxAttempts?: number;
readonly onRetrying?: () => void;
readonly log?: Logger | undefined;
}

Expand Down Expand Up @@ -51,6 +52,7 @@ export async function chatWithRetry(input: ChatWithRetryInput): Promise<LLMChatR

const delayMs = delays[attempt - 1] ?? 0;
input.params.signal.throwIfAborted();
input.onRetrying?.();
input.dispatchEvent({
type: 'step.retrying',
turnId: input.turnId,
Expand Down
151 changes: 105 additions & 46 deletions packages/agent-core/src/loop/turn-step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

import { randomUUID } from 'node:crypto';

import type { TokenUsage } from '@moonshot-ai/kosong';
import { mergeInPlace, type ThinkPart, type TokenUsage } from '@moonshot-ai/kosong';
import type { Logger } from '#/logging/types';

import type { LoopEventDispatcher } from './events';
import { isAbortError } from './errors';
import type { LLM, LLMChatParams, LLMChatResponse } from './llm';
import { chatWithRetry } from './retry';
import { runToolCallBatch, type ToolCallStepContext } from './tool-call';
Expand Down Expand Up @@ -99,27 +100,37 @@ export async function executeLoopStep(deps: ExecuteLoopStepDeps): Promise<{
step: currentStep,
});

const streamingCallbacks = createChatStreamingCallbacks({
dispatchEvent,
turnId,
currentStep,
stepUuid,
});
const chatParams: LLMChatParams = {
messages,
tools: tools ?? [],
signal,
...createChatStreamingCallbacks({
...streamingCallbacks.callbacks,
};
let response: LLMChatResponse;
try {
response = await chatWithRetry({
llm,
params: chatParams,
Comment on lines +117 to +119

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reset buffered deltas between retry attempts

In the retry path, chatWithRetry reuses these same streaming callbacks for every attempt. If an earlier attempt emits text or thinking deltas and then fails with a retryable error, the new buffers keep those discarded deltas; if a later attempt is cancelled, flushInterruptedAssistantParts() persists the stale output into content.part, so resumed history can include assistant content from a request the retry layer threw away. Scope the buffers to a single attempt or clear them when step.retrying is emitted.

Useful? React with 👍 / 👎.

dispatchEvent,
turnId,
currentStep,
stepUuid,
}),
};
const response: LLMChatResponse = await chatWithRetry({
llm,
params: chatParams,
dispatchEvent,
turnId,
currentStep,
stepUuid,
maxAttempts: maxRetryAttempts,
log,
});
maxAttempts: maxRetryAttempts,
onRetrying: streamingCallbacks.clearBuffer,
log,
});
} catch (error) {
if (signal.aborted || isAbortError(error)) {
await streamingCallbacks.flushOnAbort();
}
throw error;
}
const usage = response.usage;
const usageResult = await recordUsage(usage);
const stopTurnAfterUsage = usageResult?.stopTurn === true;
Expand Down Expand Up @@ -211,10 +222,8 @@ function stepEndProviderDiagnostics(
}

return {
...(providerFinishReason !== undefined ? { providerFinishReason } : {}),
...(response.rawFinishReason !== undefined
? { rawFinishReason: response.rawFinishReason }
: {}),
providerFinishReason,
rawFinishReason: response.rawFinishReason,
};
}

Expand All @@ -223,35 +232,31 @@ function createChatStreamingCallbacks(deps: {
readonly turnId: string;
readonly currentStep: number;
readonly stepUuid: string;
}): ChatStreamingCallbacks {
}) {
const { dispatchEvent, turnId, currentStep, stepUuid } = deps;

return {
onTextDelta: (delta) => {
dispatchEvent({ type: 'text.delta', delta });
},
onThinkDelta: (delta) => {
dispatchEvent({ type: 'thinking.delta', delta });
},
onToolCallDelta: (delta) => {
dispatchEvent({
type: 'tool.call.delta',
toolCallId: delta.toolCallId,
name: delta.name,
argumentsPart: delta.argumentsPart,
});
},
onTextPart: async (part) => {
await dispatchEvent({
type: 'content.part',
uuid: randomUUID(),
turnId,
step: currentStep,
stepUuid,
part,
});
},
onThinkPart: async (part) => {
let bufferedText = '';
const bufferedThinkParts: ThinkPart[] = [];

const clearBuffer = (): void => {
bufferedText = '';
bufferedThinkParts.length = 0;
};

const bufferThinkPart = (part: ThinkPart): void => {
if (part.think.length === 0 && part.encrypted === undefined) return;

const next: ThinkPart = { ...part };
const last = bufferedThinkParts.at(-1);
if (last === undefined || !mergeInPlace(last, next)) {
bufferedThinkParts.push(next);
}
};

const flushOnAbort = async (): Promise<void> => {
const text = bufferedText;
if (text.length === 0) return;
for (const part of bufferedThinkParts) {
Comment thread
kermanx marked this conversation as resolved.
await dispatchEvent({
type: 'content.part',
uuid: randomUUID(),
Expand All @@ -260,6 +265,60 @@ function createChatStreamingCallbacks(deps: {
stepUuid,
part,
});
},
}
await dispatchEvent({
type: 'content.part',
uuid: randomUUID(),
turnId,
step: currentStep,
stepUuid,
part: { type: 'text', text },
});
clearBuffer();
};

return {
callbacks: {
onTextDelta: (delta) => {
bufferedText += delta;
dispatchEvent({ type: 'text.delta', delta });
},
onThinkDelta: (delta, part) => {
bufferThinkPart(part ?? { type: 'think', think: delta });
dispatchEvent({ type: 'thinking.delta', delta });
},
onToolCallDelta: (delta) => {
dispatchEvent({
type: 'tool.call.delta',
toolCallId: delta.toolCallId,
name: delta.name,
argumentsPart: delta.argumentsPart,
});
},
onTextPart: async (part) => {
clearBuffer();
await dispatchEvent({
type: 'content.part',
uuid: randomUUID(),
turnId,
step: currentStep,
stepUuid,
part,
});
},
onThinkPart: async (part) => {
clearBuffer();
await dispatchEvent({
type: 'content.part',
uuid: randomUUID(),
turnId,
step: currentStep,
stepUuid,
part,
});
},
} satisfies ChatStreamingCallbacks,
clearBuffer,
flushOnAbort,
};
}
7 changes: 6 additions & 1 deletion packages/agent-core/test/loop/api-shape.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,12 @@ function _typeOnlyChecks(): void {
tools: [],
signal: _signal,
onTextDelta: (_delta: string) => {},
onThinkDelta: (_delta: string) => {},
onThinkDelta: (_delta: string, _part) => {
const thinking: string | undefined = _part?.think;
const encrypted: string | undefined = _part?.encrypted;
void thinking;
void encrypted;
},
onToolCallDelta: (_delta) => {},
onTextPart: async (_part) => {
const text: string = _part.text;
Expand Down
Loading
Loading