From dcba5f3d910c304eea15628014454eaf0b9b47af Mon Sep 17 00:00:00 2001 From: Ruslan Grinev Date: Sat, 20 Jun 2026 19:32:47 +0300 Subject: [PATCH 1/2] feat: show thinking content --- .env.example | 4 + PRODUCT.md | 3 +- README.md | 1 + .../managers/summary-aggregation-manager.ts | 224 +++++++++++++++++- src/bot/messages/thinking-rendering.ts | 80 +++++++ .../services/event-subscription-service.ts | 91 ++++++- src/config.ts | 1 + .../summary-aggregation-manager.test.ts | 135 ++++++++++- tests/bot/messages/thinking-rendering.test.ts | 57 +++++ tests/config.test.ts | 8 + 10 files changed, 579 insertions(+), 25 deletions(-) create mode 100644 src/bot/messages/thinking-rendering.ts create mode 100644 tests/bot/messages/thinking-rendering.test.ts diff --git a/.env.example b/.env.example index cf22477b..49f9f00c 100644 --- a/.env.example +++ b/.env.example @@ -104,6 +104,10 @@ OPENCODE_MODEL_ID=big-pickle # Hide thinking indicator messages (default: false) # HIDE_THINKING_MESSAGES=false +# Show full model reasoning in the thinking message (default: false) +# Uses RESPONSE_STREAMING_MODE for edit vs draft streaming. Ignored when HIDE_THINKING_MESSAGES=true. +# SHOW_THINKING_CONTENT=false + # Hide tool call service messages (default: false) # HIDE_TOOL_CALL_MESSAGES=false diff --git a/PRODUCT.md b/PRODUCT.md index e2c5c174..6639fc7a 100644 --- a/PRODUCT.md +++ b/PRODUCT.md @@ -59,7 +59,7 @@ No public inbound ports are required for normal usage. ### Result delivery - Send each completed assistant response after completion signal from SSE -- Do not expose raw chain-of-thought; send a lightweight thinking indicator instead +- Hide full model reasoning by default; optionally stream it in the thinking message when explicitly enabled - Split long responses into multiple Telegram messages - Send code updates as files (size-limited) @@ -86,6 +86,7 @@ No public inbound ports are required for normal usage. - Configurable scheduled task limit (default: 10) - Configurable bot locale - Configurable visibility for service messages (thinking/tool calls) +- Configurable opt-in display of full thinking/reasoning content - Configurable max code file size in KB (default: 100) - Optional STT settings for voice transcription (`STT_API_URL`, `STT_API_KEY`, `STT_MODEL`, `STT_LANGUAGE`) - Optional TTS settings for global audio replies (`TTS_PROVIDER`, `TTS_API_URL`, `TTS_API_KEY`, `TTS_MODEL`, `TTS_VOICE`) diff --git a/README.md b/README.md index fda557fa..501fd732 100644 --- a/README.md +++ b/README.md @@ -231,6 +231,7 @@ When installed via npm, the configuration wizard handles the initial setup. The | `BASH_TOOL_DISPLAY_MAX_LENGTH` | Maximum displayed length for `bash` tool commands in Telegram summaries; longer commands are truncated | No | `128` | | `SERVICE_MESSAGES_INTERVAL_SEC` | Service messages interval (thinking + tool calls); keep `>=2` to avoid Telegram rate limits, `0` = immediate | No | `5` | | `HIDE_THINKING_MESSAGES` | Hide `💭 Thinking...` service messages | No | `false` | +| `SHOW_THINKING_CONTENT` | Show full model reasoning in the thinking message; uses `RESPONSE_STREAMING_MODE` for edit vs draft streaming | No | `false` | | `HIDE_TOOL_CALL_MESSAGES` | Hide tool-call service messages (`💻 bash ...`, `📖 read ...`, etc.) | No | `false` | | `HIDE_TOOL_FILE_MESSAGES` | Hide file edit documents sent as `.txt` attachments (`edit_*.txt`, `write_*.txt`) | No | `false` | | `TRACK_BACKGROUND_SESSIONS` | Track detached/non-current sessions in the current selected project/worktree and send short notifications | No | `true` | diff --git a/src/app/managers/summary-aggregation-manager.ts b/src/app/managers/summary-aggregation-manager.ts index 44c9bb21..45d805dc 100644 --- a/src/app/managers/summary-aggregation-manager.ts +++ b/src/app/managers/summary-aggregation-manager.ts @@ -32,6 +32,19 @@ type MessageCompleteCallback = ( type MessagePartialCallback = (sessionId: string, messageId: string, messageText: string) => void; +export interface ThinkingSection { + id: string; + title?: string; + text: string; +} + +export interface ThinkingUpdate { + sessionId: string; + messageId: string; + sections: ThinkingSection[]; + isFirstUpdate: boolean; +} + type ExternalUserInputCallback = ( sessionId: string, messageId: string, @@ -81,7 +94,7 @@ type QuestionCallback = (questions: Question[], requestID: string, sessionId: st type QuestionErrorCallback = () => void; -type ThinkingCallback = (sessionId: string) => void; +type ThinkingCallback = (update: ThinkingUpdate) => void; export interface TokensInfo { input: number; @@ -153,6 +166,11 @@ interface TextMessageState { optimisticUpdateCount: number; } +interface ThinkingMessageState { + orderedPartIds: string[]; + sections: Map; +} + interface SubagentState extends SubagentInfo { hasSubtaskMetadata: boolean; hasTaskToolMetadata: boolean; @@ -207,6 +225,7 @@ function normalizeSnapshotValue(value: unknown): unknown { class SummaryAggregator { private currentSessionId: string | null = null; private textMessageStates: Map = new Map(); + private thinkingMessageStates: Map = new Map(); private messages: Map = new Map(); private messageCount = 0; private lastUpdated = 0; @@ -452,6 +471,7 @@ class SummaryAggregator { this.stopTypingIndicator(); this.currentSessionId = null; this.textMessageStates.clear(); + this.thinkingMessageStates.clear(); this.messages.clear(); this.partHashes.clear(); this.knownTextPartIds.clear(); @@ -1187,6 +1207,14 @@ class SummaryAggregator { this.registerTextPart(messageID, part.id); } + if (part.type === "reasoning") { + this.registerThinkingPart( + messageID, + part.id, + this.extractReasoningTitle(part as unknown as Record), + ); + } + const deltaFromUpdated = (event.properties as { delta?: unknown }).delta; if ( part.type === "text" && @@ -1198,18 +1226,42 @@ class SummaryAggregator { return; } + if ( + part.type === "reasoning" && + typeof deltaFromUpdated === "string" && + deltaFromUpdated.length > 0 + ) { + const partText = "text" in part && typeof part.text === "string" ? part.text : undefined; + this.applyThinkingDelta( + part.sessionID, + messageID, + part.id, + deltaFromUpdated, + partText, + this.extractReasoningTitle(part as unknown as Record), + ); + this.lastUpdated = Date.now(); + return; + } + if (part.type === "reasoning") { - // Fire the thinking callback once per message on the first reasoning part. - // This is the signal that the model is actually doing extended thinking. - if (!this.thinkingFiredForMessages.has(messageID) && this.onThinkingCallback) { + // Fire the thinking callback on every reasoning update. The first update + // preserves the old lightweight indicator behavior for callers that do + // not display full reasoning content. + const isFirstUpdate = !this.thinkingFiredForMessages.has(messageID); + if (isFirstUpdate) { this.thinkingFiredForMessages.add(messageID); - const callback = this.onThinkingCallback; - const sessionID = part.sessionID; - setImmediate(() => { - if (typeof callback === "function") { - callback(sessionID); - } - }); + } + + const partText = "text" in part && typeof part.text === "string" ? part.text : ""; + const wasUpdated = this.setThinkingPartSnapshot( + messageID, + part.id, + partText, + this.extractReasoningTitle(part as unknown as Record), + ); + if (isFirstUpdate || wasUpdated) { + this.emitThinkingUpdate(part.sessionID, messageID, isFirstUpdate); } } else if (part.type === "text" && "text" in part && part.text) { const wasUpdated = @@ -1340,6 +1392,14 @@ class SummaryAggregator { return; } + if (partType === "reasoning" || (!partType && this.isKnownThinkingPart(messageID, partID))) { + const title = part + ? this.extractReasoningTitle(part as unknown as Record) + : undefined; + this.applyThinkingDelta(sessionID, messageID, partID, delta, part?.text, title); + return; + } + if (partType && partType !== "text") { return; } @@ -1403,6 +1463,147 @@ class SummaryAggregator { this.emitPartialText(sessionID, messageID, combined); } + private extractReasoningTitle(part: Record): string | undefined { + for (const key of ["title", "heading", "summary", "name"]) { + const value = part[key]; + if (typeof value === "string" && value.trim()) { + return value; + } + } + + const metadata = part.metadata; + if (metadata && typeof metadata === "object") { + for (const key of ["title", "heading", "summary", "name"]) { + const value = (metadata as Record)[key]; + if (typeof value === "string" && value.trim()) { + return value; + } + } + } + + return undefined; + } + + private getOrCreateThinkingMessageState(messageID: string): ThinkingMessageState { + let state = this.thinkingMessageStates.get(messageID); + if (!state) { + state = { + orderedPartIds: [], + sections: new Map(), + }; + this.thinkingMessageStates.set(messageID, state); + } + return state; + } + + private isKnownThinkingPart(messageID: string, partID: string): boolean { + return this.thinkingMessageStates.get(messageID)?.sections.has(partID) ?? false; + } + + private registerThinkingPart(messageID: string, partID: string, title?: string): void { + const state = this.getOrCreateThinkingMessageState(messageID); + if (!state.orderedPartIds.includes(partID)) { + state.orderedPartIds.push(partID); + } + + const existing = state.sections.get(partID); + if (!existing) { + const section: ThinkingSection = { id: partID, text: "" }; + if (title) { + section.title = title; + } + state.sections.set(partID, section); + return; + } + + if (title && existing.title !== title) { + existing.title = title; + } + } + + private setThinkingPartSnapshot( + messageID: string, + partID: string, + text: string, + title?: string, + ): boolean { + this.registerThinkingPart(messageID, partID, title); + + const state = this.getOrCreateThinkingMessageState(messageID); + const existing = state.sections.get(partID); + const nextTitle = title ?? existing?.title; + + if (existing && existing.text === text && existing.title === nextTitle) { + return false; + } + + const next: ThinkingSection = { id: partID, text }; + if (nextTitle) { + next.title = nextTitle; + } + state.sections.set(partID, next); + return true; + } + + private applyThinkingDelta( + sessionID: string, + messageID: string, + partID: string, + delta: string, + fullTextHint?: string, + title?: string, + ): void { + if (sessionID !== this.currentSessionId) { + return; + } + + this.registerThinkingPart(messageID, partID, title); + + const state = this.getOrCreateThinkingMessageState(messageID); + const existing = state.sections.get(partID); + const previous = existing?.text ?? ""; + let accumulated = `${previous}${delta}`; + + if (typeof fullTextHint === "string" && fullTextHint.length > accumulated.length) { + accumulated = fullTextHint; + } + + this.setThinkingPartSnapshot(messageID, partID, accumulated, title ?? existing?.title); + this.emitThinkingUpdate(sessionID, messageID, false); + } + + private getThinkingSections(messageID: string): ThinkingSection[] { + const state = this.thinkingMessageStates.get(messageID); + if (!state) { + return []; + } + + return state.orderedPartIds + .map((partID) => state.sections.get(partID)) + .filter((section): section is ThinkingSection => Boolean(section)) + .map((section) => ({ ...section })); + } + + private emitThinkingUpdate( + sessionId: string, + messageId: string, + isFirstUpdate: boolean, + ): void { + if (!this.onThinkingCallback) { + return; + } + + const sections = this.getThinkingSections(messageId); + if (sections.length === 0) { + return; + } + + const callback = this.onThinkingCallback; + setImmediate(() => { + callback({ sessionId, messageId, sections, isFirstUpdate }); + }); + } + private emitExternalUserInputIfReady(sessionId: string, messageId: string): void { if (sessionId !== this.currentSessionId || this.deliveredExternalUserMessageIds.has(messageId)) { return; @@ -1435,6 +1636,7 @@ class SummaryAggregator { private cleanupCompletedMessage(messageId: string): void { this.textMessageStates.delete(messageId); + this.thinkingMessageStates.delete(messageId); this.messages.delete(messageId); this.partHashes.delete(messageId); this.knownTextPartIds.delete(messageId); diff --git a/src/bot/messages/thinking-rendering.ts b/src/bot/messages/thinking-rendering.ts new file mode 100644 index 00000000..21007f1a --- /dev/null +++ b/src/bot/messages/thinking-rendering.ts @@ -0,0 +1,80 @@ +import type { MessageEntity } from "grammy/types"; + +import { t } from "../../i18n/index.js"; +import type { TelegramRenderedPart } from "../render/types.js"; +import type { StreamingMessagePayload } from "../streaming/response-streamer.js"; + +export interface ThinkingSection { + id: string; + title?: string; + text: string; +} + +function formatHeader(title?: string): string { + const fallback = t("bot.thinking"); + const normalizedTitle = title?.trim(); + return normalizedTitle ? `${fallback} — ${normalizedTitle}` : fallback; +} + +function quoteFallbackText(text: string): string { + return text + .split("\n") + .map((line) => `> ${line}`) + .join("\n"); +} + +function splitText(text: string, maxLength: number): string[] { + if (text.length <= maxLength) { + return [text]; + } + + const chunks: string[] = []; + for (let offset = 0; offset < text.length; offset += maxLength) { + chunks.push(text.slice(offset, offset + maxLength)); + } + return chunks; +} + +function createThinkingPart(header: string, text: string): TelegramRenderedPart { + if (!text) { + return { + text: header, + fallbackText: header, + source: "entities", + }; + } + + const renderedText = `${header}\n${text}`; + const entity: MessageEntity = { + type: "blockquote", + offset: header.length + 1, + length: text.length, + }; + + return { + text: renderedText, + entities: [entity], + fallbackText: `${header}\n${quoteFallbackText(text)}`, + source: "entities", + }; +} + +export function prepareThinkingStreamingPayload( + sections: ThinkingSection[], + maxPartLength: number, +): StreamingMessagePayload | null { + const parts: TelegramRenderedPart[] = []; + + for (const section of sections) { + const header = formatHeader(section.title); + const text = section.text.replace(/\r\n/g, "\n").trimEnd(); + const textLimit = Math.max(1, maxPartLength - header.length - 1); + const chunks = text ? splitText(text, textLimit) : [""]; + + for (const chunk of chunks) { + parts.push(createThinkingPart(header, chunk)); + } + } + + return parts.length > 0 ? { parts } : null; +} diff --git a/src/bot/services/event-subscription-service.ts b/src/bot/services/event-subscription-service.ts index 3d481c32..f05772de 100644 --- a/src/bot/services/event-subscription-service.ts +++ b/src/bot/services/event-subscription-service.ts @@ -49,6 +49,7 @@ import { prepareAssistantStreamingPayload, renderAssistantFinalPartsSafe, } from "../messages/assistant-rendering.js"; +import { prepareThinkingStreamingPayload } from "../messages/thinking-rendering.js"; import { deliverExternalUserInputNotification } from "../messages/external-user-input-notification.js"; import { backgroundSessionTracker, @@ -91,6 +92,7 @@ class EventSubscriptionService implements BotEventSubscriptionService { private botInstance: Bot | null = null; private chatIdInstance: number | null = null; private nextDraftId = 1; + private readonly thinkingStreamingPayloads = new Map(); private readonly sessionCompletionTasks = new Map>(); private readonly responseStreamer: ResponseStreamer; private readonly toolCallStreamer: ToolCallStreamer; @@ -233,6 +235,7 @@ class EventSubscriptionService implements BotEventSubscriptionService { this.responseStreamer.clearAll(reason); this.toolCallStreamer.clearAll(reason); this.toolMessageBatcher.clearAll(reason); + this.thinkingStreamingPayloads.clear(); this.sessionCompletionTasks.clear(); assistantRunState.clearAll(reason); } @@ -262,6 +265,7 @@ class EventSubscriptionService implements BotEventSubscriptionService { this.toolMessageBatcher.clearAll("summary_aggregator_clear"); this.toolCallStreamer.clearAll("summary_aggregator_clear"); this.responseStreamer.clearAll("summary_aggregator_clear"); + this.thinkingStreamingPayloads.clear(); }); summaryAggregator.setOnPartial((sessionId, messageId, messageText) => { @@ -291,6 +295,7 @@ class EventSubscriptionService implements BotEventSubscriptionService { logger.error("Bot or chat ID not available for sending message"); clearPromptResponseMode(sessionId); this.responseStreamer.clearMessage(sessionId, messageId, "bot_context_missing"); + this.clearThinkingStream(sessionId, messageId, "bot_context_missing"); this.toolCallStreamer.clearSession(sessionId, "bot_context_missing"); assistantRunState.clearRun(sessionId, "bot_context_missing"); foregroundSessionState.markIdle(sessionId); @@ -301,6 +306,7 @@ class EventSubscriptionService implements BotEventSubscriptionService { if (currentSession?.id !== sessionId) { clearPromptResponseMode(sessionId); this.responseStreamer.clearMessage(sessionId, messageId, "session_mismatch"); + this.clearThinkingStream(sessionId, messageId, "session_mismatch"); this.toolCallStreamer.clearSession(sessionId, "session_mismatch"); assistantRunState.clearRun(sessionId, "session_mismatch"); foregroundSessionState.markIdle(sessionId); @@ -318,6 +324,8 @@ class EventSubscriptionService implements BotEventSubscriptionService { modelID: completionInfo.modelID, }); + await this.completeThinkingStream(sessionId, messageId); + await finalizeAssistantResponse({ sessionId, messageId, @@ -349,6 +357,7 @@ class EventSubscriptionService implements BotEventSubscriptionService { }); } catch (err) { clearPromptResponseMode(sessionId); + this.clearThinkingStream(sessionId, messageId, "assistant_finalize_failed"); assistantRunState.clearRun(sessionId, "assistant_finalize_failed"); logger.error("Failed to send message to Telegram:", err); logger.error("[Bot] CRITICAL: Stopping event processing due to error"); @@ -551,25 +560,50 @@ class EventSubscriptionService implements BotEventSubscriptionService { await showPermissionRequest(this.botInstance.api, this.chatIdInstance, request); }); - summaryAggregator.setOnThinking(async (sessionId) => { + summaryAggregator.setOnThinking(async (update) => { if (!this.botInstance || !this.chatIdInstance) { return; } const currentSession = getCurrentSession(); - if (!currentSession || currentSession.id !== sessionId) { + if (!currentSession || currentSession.id !== update.sessionId) { return; } - logger.debug("[Bot] Agent started thinking"); + logger.debug("[Bot] Agent thinking update", { + sessionId: update.sessionId, + messageId: update.messageId, + sectionCount: update.sections.length, + isFirstUpdate: update.isFirstUpdate, + }); - await this.toolCallStreamer.breakSession(sessionId, "thinking_started"); + if (update.isFirstUpdate) { + await this.toolCallStreamer.breakSession(update.sessionId, "thinking_started"); + } - deliverThinkingMessage(sessionId, this.toolMessageBatcher, { - hideThinkingMessages: config.bot.hideThinkingMessages, - }); + if (!config.bot.hideThinkingMessages && config.bot.showThinkingContent) { + const payload = prepareThinkingStreamingPayload(update.sections, RESPONSE_STREAM_TEXT_LIMIT); + if (payload) { + payload.sendOptions = { disable_notification: true }; + payload.editOptions = undefined; - if (pinnedMessageManager.isInitialized()) { + this.thinkingStreamingPayloads.set( + this.getThinkingPayloadKey(update.sessionId, update.messageId), + payload, + ); + this.responseStreamer.enqueue( + update.sessionId, + this.getThinkingStreamId(update.messageId), + payload, + ); + } + } else if (update.isFirstUpdate) { + deliverThinkingMessage(update.sessionId, this.toolMessageBatcher, { + hideThinkingMessages: config.bot.hideThinkingMessages, + }); + } + + if (update.isFirstUpdate && pinnedMessageManager.isInitialized()) { await pinnedMessageManager.refresh(); } }); @@ -958,6 +992,47 @@ class EventSubscriptionService implements BotEventSubscriptionService { return prepareAssistantFinalStreamingPayload(messageText, RESPONSE_STREAM_TEXT_LIMIT); } + private getThinkingStreamId(messageId: string): string { + return `thinking:${messageId}`; + } + + private getThinkingPayloadKey(sessionId: string, messageId: string): string { + return `${sessionId}:${messageId}`; + } + + private clearThinkingStream(sessionId: string, messageId: string, reason: string): void { + this.responseStreamer.clearMessage(sessionId, this.getThinkingStreamId(messageId), reason); + this.thinkingStreamingPayloads.delete(this.getThinkingPayloadKey(sessionId, messageId)); + } + + private async completeThinkingStream(sessionId: string, messageId: string): Promise { + const key = this.getThinkingPayloadKey(sessionId, messageId); + const payload = this.thinkingStreamingPayloads.get(key); + const result = await this.responseStreamer.complete( + sessionId, + this.getThinkingStreamId(messageId), + payload, + ); + this.thinkingStreamingPayloads.delete(key); + + if (result.streamed || !payload) { + return; + } + + if (!this.botInstance || !this.chatIdInstance) { + return; + } + + for (const part of payload.parts) { + await sendRenderedBotPart({ + api: this.botInstance.api, + chatId: this.chatIdInstance, + part, + options: payload.sendOptions as Parameters[0]["options"], + }); + } + } + private enqueueSessionCompletionTask(sessionId: string, task: () => Promise): Promise { const previousTask = this.sessionCompletionTasks.get(sessionId) ?? Promise.resolve(); const nextTask = previousTask diff --git a/src/config.ts b/src/config.ts index da800c93..ef858b91 100644 --- a/src/config.ts +++ b/src/config.ts @@ -186,6 +186,7 @@ export const config = { bashToolDisplayMaxLength: getOptionalPositiveIntEnvVar("BASH_TOOL_DISPLAY_MAX_LENGTH", 128), locale: getOptionalLocaleEnvVar("BOT_LOCALE", "en"), hideThinkingMessages: getOptionalBooleanEnvVar("HIDE_THINKING_MESSAGES", false), + showThinkingContent: getOptionalBooleanEnvVar("SHOW_THINKING_CONTENT", false), hideToolCallMessages: getOptionalBooleanEnvVar("HIDE_TOOL_CALL_MESSAGES", false), hideToolFileMessages: getOptionalBooleanEnvVar("HIDE_TOOL_FILE_MESSAGES", false), trackBackgroundSessions: getOptionalBooleanEnvVar("TRACK_BACKGROUND_SESSIONS", true), diff --git a/tests/app/managers/summary-aggregation-manager.test.ts b/tests/app/managers/summary-aggregation-manager.test.ts index 389bacc5..58c989aa 100644 --- a/tests/app/managers/summary-aggregation-manager.test.ts +++ b/tests/app/managers/summary-aggregation-manager.test.ts @@ -573,7 +573,7 @@ describe("summary/aggregator", () => { expect(onToolFile).not.toHaveBeenCalled(); }); - it("passes sessionId to thinking callback when reasoning part arrives", async () => { + it("passes reasoning sections to thinking callback when reasoning part arrives", async () => { const onThinking = vi.fn(); summaryAggregator.setOnThinking(onThinking); summaryAggregator.setSession("session-1"); @@ -606,7 +606,113 @@ describe("summary/aggregator", () => { await new Promise((resolve) => setImmediate(resolve)); - expect(onThinking).toHaveBeenCalledWith("session-1"); + expect(onThinking).toHaveBeenCalledWith({ + sessionId: "session-1", + messageId: "message-1", + isFirstUpdate: true, + sections: [ + { + id: "part-reasoning-1", + text: "Let me think about this...", + }, + ], + }); + }); + + it("streams explicit reasoning deltas without adding them to assistant text", async () => { + const onThinking = vi.fn(); + const onPartial = vi.fn(); + const onComplete = vi.fn(); + summaryAggregator.setOnThinking(onThinking); + summaryAggregator.setOnPartial(onPartial); + summaryAggregator.setOnComplete(onComplete); + summaryAggregator.setSession("session-1"); + + summaryAggregator.processEvent({ + type: "message.updated", + properties: { + info: { + id: "message-reasoning-delta", + sessionID: "session-1", + role: "assistant", + time: { created: Date.now() }, + }, + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.part.updated", + properties: { + part: { + id: "reasoning-delta-part", + sessionID: "session-1", + messageID: "message-reasoning-delta", + type: "reasoning", + title: "Analysis", + text: "First ", + }, + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.part.delta", + properties: { + part: { + id: "reasoning-delta-part", + sessionID: "session-1", + messageID: "message-reasoning-delta", + type: "reasoning", + }, + delta: "second", + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.part.updated", + properties: { + part: { + id: "answer-part", + sessionID: "session-1", + messageID: "message-reasoning-delta", + type: "text", + text: "Final answer.", + }, + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.updated", + properties: { + info: { + id: "message-reasoning-delta", + sessionID: "session-1", + role: "assistant", + time: { created: Date.now(), completed: Date.now() }, + }, + }, + } as unknown as Event); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(onThinking).toHaveBeenLastCalledWith({ + sessionId: "session-1", + messageId: "message-reasoning-delta", + isFirstUpdate: false, + sections: [ + { + id: "reasoning-delta-part", + title: "Analysis", + text: "First second", + }, + ], + }); + expect(onPartial).toHaveBeenCalledWith("session-1", "message-reasoning-delta", "Final answer."); + expect(onComplete).toHaveBeenCalledWith( + "session-1", + "message-reasoning-delta", + "Final answer.", + expect.any(Object), + ); }); it("streams partial text and passes messageId on completion", () => { @@ -1152,7 +1258,7 @@ describe("summary/aggregator", () => { expect(onThinking).not.toHaveBeenCalled(); }); - it("fires thinking callback only once per message even with multiple reasoning parts", async () => { + it("streams all reasoning parts and marks only the first thinking update", async () => { const onThinking = vi.fn(); summaryAggregator.setOnThinking(onThinking); summaryAggregator.setSession("session-1"); @@ -1187,8 +1293,27 @@ describe("summary/aggregator", () => { await new Promise((resolve) => setImmediate(resolve)); - expect(onThinking).toHaveBeenCalledTimes(1); - expect(onThinking).toHaveBeenCalledWith("session-1"); + expect(onThinking).toHaveBeenCalledTimes(3); + expect(onThinking.mock.calls[0][0]).toEqual( + expect.objectContaining({ + sessionId: "session-1", + messageId: "message-multi-reasoning", + isFirstUpdate: true, + sections: [{ id: "part-reasoning-0", text: "Thinking step 0" }], + }), + ); + expect(onThinking.mock.calls[2][0]).toEqual( + expect.objectContaining({ + sessionId: "session-1", + messageId: "message-multi-reasoning", + isFirstUpdate: false, + sections: [ + { id: "part-reasoning-0", text: "Thinking step 0" }, + { id: "part-reasoning-1", text: "Thinking step 1" }, + { id: "part-reasoning-2", text: "Thinking step 2" }, + ], + }), + ); }); it("reports session.error message through callback", async () => { diff --git a/tests/bot/messages/thinking-rendering.test.ts b/tests/bot/messages/thinking-rendering.test.ts new file mode 100644 index 00000000..013d2094 --- /dev/null +++ b/tests/bot/messages/thinking-rendering.test.ts @@ -0,0 +1,57 @@ +import { describe, expect, it } from "vitest"; + +import { prepareThinkingStreamingPayload } from "../../../src/bot/messages/thinking-rendering.js"; +import { t } from "../../../src/i18n/index.js"; + +describe("bot/messages/thinking-rendering", () => { + it("renders thinking title on the first line and content as a quote", () => { + const header = `${t("bot.thinking")} — Analysis`; + const text = "Line one\nLine two"; + + const payload = prepareThinkingStreamingPayload( + [{ id: "r1", title: "Analysis", text }], + 3800, + ); + + expect(payload?.parts).toEqual([ + { + text: `${header}\n${text}`, + entities: [{ type: "blockquote", offset: header.length + 1, length: text.length }], + fallbackText: `${header}\n> Line one\n> Line two`, + source: "entities", + }, + ]); + }); + + it("renders all reasoning sections in order", () => { + const payload = prepareThinkingStreamingPayload( + [ + { id: "r1", title: "First", text: "A" }, + { id: "r2", title: "Second", text: "B" }, + ], + 3800, + ); + + expect(payload?.parts.map((part) => part.text)).toEqual([ + `${t("bot.thinking")} — First\nA`, + `${t("bot.thinking")} — Second\nB`, + ]); + }); + + it("splits long thinking content into multiple quoted parts", () => { + const header = t("bot.thinking"); + const payload = prepareThinkingStreamingPayload( + [{ id: "r1", text: "abcdefghij" }], + header.length + 1 + 4, + ); + + expect(payload?.parts.map((part) => part.text)).toEqual([ + `${header}\nabcd`, + `${header}\nefgh`, + `${header}\nij`, + ]); + expect(payload?.parts[0].entities).toEqual([ + { type: "blockquote", offset: header.length + 1, length: 4 }, + ]); + }); +}); diff --git a/tests/config.test.ts b/tests/config.test.ts index e7872a68..fdb91e84 100644 --- a/tests/config.test.ts +++ b/tests/config.test.ts @@ -18,12 +18,14 @@ describe("config boolean env parsing", () => { it("uses false defaults for hide service message flags", async () => { vi.stubEnv("HIDE_THINKING_MESSAGES", ""); + vi.stubEnv("SHOW_THINKING_CONTENT", ""); vi.stubEnv("HIDE_TOOL_CALL_MESSAGES", ""); vi.stubEnv("HIDE_TOOL_FILE_MESSAGES", ""); const config = await loadConfig(); expect(config.bot.hideThinkingMessages).toBe(false); + expect(config.bot.showThinkingContent).toBe(false); expect(config.bot.hideToolCallMessages).toBe(false); expect(config.bot.hideToolFileMessages).toBe(false); }); @@ -54,36 +56,42 @@ describe("config boolean env parsing", () => { it("parses truthy values for hide service message flags", async () => { vi.stubEnv("HIDE_THINKING_MESSAGES", "YES"); + vi.stubEnv("SHOW_THINKING_CONTENT", "1"); vi.stubEnv("HIDE_TOOL_CALL_MESSAGES", "1"); vi.stubEnv("HIDE_TOOL_FILE_MESSAGES", "true"); const config = await loadConfig(); expect(config.bot.hideThinkingMessages).toBe(true); + expect(config.bot.showThinkingContent).toBe(true); expect(config.bot.hideToolCallMessages).toBe(true); expect(config.bot.hideToolFileMessages).toBe(true); }); it("parses falsy values for hide service message flags", async () => { vi.stubEnv("HIDE_THINKING_MESSAGES", "off"); + vi.stubEnv("SHOW_THINKING_CONTENT", "false"); vi.stubEnv("HIDE_TOOL_CALL_MESSAGES", "0"); vi.stubEnv("HIDE_TOOL_FILE_MESSAGES", "false"); const config = await loadConfig(); expect(config.bot.hideThinkingMessages).toBe(false); + expect(config.bot.showThinkingContent).toBe(false); expect(config.bot.hideToolCallMessages).toBe(false); expect(config.bot.hideToolFileMessages).toBe(false); }); it("falls back to defaults on invalid values", async () => { vi.stubEnv("HIDE_THINKING_MESSAGES", "banana"); + vi.stubEnv("SHOW_THINKING_CONTENT", "maybe"); vi.stubEnv("HIDE_TOOL_CALL_MESSAGES", "nope"); vi.stubEnv("HIDE_TOOL_FILE_MESSAGES", "invalid"); const config = await loadConfig(); expect(config.bot.hideThinkingMessages).toBe(false); + expect(config.bot.showThinkingContent).toBe(false); expect(config.bot.hideToolCallMessages).toBe(false); expect(config.bot.hideToolFileMessages).toBe(false); }); From 9a961a435b2e2697263a85c66836a9a09500c5ff Mon Sep 17 00:00:00 2001 From: Ruslan Grinev Date: Sat, 20 Jun 2026 20:08:54 +0300 Subject: [PATCH 2/2] feat: collapse thinking messages after completion --- .../managers/summary-aggregation-manager.ts | 31 ++++++ src/bot/messages/thinking-rendering.ts | 26 ++++- .../services/event-subscription-service.ts | 38 +++++-- .../summary-aggregation-manager.test.ts | 101 ++++++++++++++++++ tests/bot/messages/thinking-rendering.test.ts | 37 ++++++- 5 files changed, 220 insertions(+), 13 deletions(-) diff --git a/src/app/managers/summary-aggregation-manager.ts b/src/app/managers/summary-aggregation-manager.ts index 45d805dc..cacdc0f8 100644 --- a/src/app/managers/summary-aggregation-manager.ts +++ b/src/app/managers/summary-aggregation-manager.ts @@ -96,6 +96,8 @@ type QuestionErrorCallback = () => void; type ThinkingCallback = (update: ThinkingUpdate) => void; +type ThinkingFinishedCallback = (sessionId: string, messageId: string) => void; + export interface TokensInfo { input: number; output: number; @@ -237,6 +239,7 @@ class SummaryAggregator { private onQuestionCallback: QuestionCallback | null = null; private onQuestionErrorCallback: QuestionErrorCallback | null = null; private onThinkingCallback: ThinkingCallback | null = null; + private onThinkingFinishedCallback: ThinkingFinishedCallback | null = null; private onTokensCallback: TokensCallback | null = null; private onCostCallback: CostCallback | null = null; private onSubagentCallback: SubagentCallback | null = null; @@ -250,6 +253,7 @@ class SummaryAggregator { private onClearedCallback: ClearedCallback | null = null; private processedToolStates: Set = new Set(); private thinkingFiredForMessages: Set = new Set(); + private thinkingFinishedForMessages: Set = new Set(); private deliveredExternalUserMessageIds: Set = new Set(); private knownTextPartIds: Map> = new Map(); private bot: Bot | null = null; @@ -303,6 +307,10 @@ class SummaryAggregator { this.onThinkingCallback = callback; } + setOnThinkingFinished(callback: ThinkingFinishedCallback): void { + this.onThinkingFinishedCallback = callback; + } + setOnTokens(callback: TokensCallback): void { this.onTokensCallback = callback; } @@ -477,6 +485,7 @@ class SummaryAggregator { this.knownTextPartIds.clear(); this.processedToolStates.clear(); this.thinkingFiredForMessages.clear(); + this.thinkingFinishedForMessages.clear(); this.deliveredExternalUserMessageIds.clear(); this.trackedSessionParents.clear(); this.subagentStates.clear(); @@ -1221,6 +1230,7 @@ class SummaryAggregator { typeof deltaFromUpdated === "string" && deltaFromUpdated.length > 0 ) { + this.emitThinkingFinishedOnce(part.sessionID, messageID); this.applyTextDelta(part.sessionID, messageID, part.id, deltaFromUpdated, part.text); this.lastUpdated = Date.now(); return; @@ -1272,6 +1282,8 @@ class SummaryAggregator { return; } + this.emitThinkingFinishedOnce(part.sessionID, messageID); + const fullText = this.getCombinedMessageText(messageID); if (messageInfo && messageInfo.role === "assistant") { @@ -1422,6 +1434,7 @@ class SummaryAggregator { } } + this.emitThinkingFinishedOnce(sessionID, messageID); this.applyTextDelta(sessionID, messageID, partID, delta, part?.text); } @@ -1604,6 +1617,22 @@ class SummaryAggregator { }); } + private emitThinkingFinishedOnce(sessionId: string, messageId: string): void { + if ( + !this.onThinkingFinishedCallback || + !this.thinkingFiredForMessages.has(messageId) || + this.thinkingFinishedForMessages.has(messageId) + ) { + return; + } + + this.thinkingFinishedForMessages.add(messageId); + const callback = this.onThinkingFinishedCallback; + setImmediate(() => { + callback(sessionId, messageId); + }); + } + private emitExternalUserInputIfReady(sessionId: string, messageId: string): void { if (sessionId !== this.currentSessionId || this.deliveredExternalUserMessageIds.has(messageId)) { return; @@ -1640,6 +1669,8 @@ class SummaryAggregator { this.messages.delete(messageId); this.partHashes.delete(messageId); this.knownTextPartIds.delete(messageId); + this.thinkingFiredForMessages.delete(messageId); + this.thinkingFinishedForMessages.delete(messageId); if (this.textMessageStates.size === 0) { logger.debug("[Aggregator] No more active messages, stopping typing indicator"); diff --git a/src/bot/messages/thinking-rendering.ts b/src/bot/messages/thinking-rendering.ts index 21007f1a..6973f5b9 100644 --- a/src/bot/messages/thinking-rendering.ts +++ b/src/bot/messages/thinking-rendering.ts @@ -10,6 +10,10 @@ export interface ThinkingSection { text: string; } +interface ThinkingStreamingPayloadOptions { + expandable?: boolean; +} + function formatHeader(title?: string): string { const fallback = t("bot.thinking"); const normalizedTitle = title?.trim(); @@ -35,7 +39,7 @@ function splitText(text: string, maxLength: number): string[] { return chunks; } -function createThinkingPart(header: string, text: string): TelegramRenderedPart { +function createThinkingPart(header: string, text: string, expandable: boolean): TelegramRenderedPart { if (!text) { return { text: header, @@ -46,7 +50,7 @@ function createThinkingPart(header: string, text: string): TelegramRenderedPart const renderedText = `${header}\n${text}`; const entity: MessageEntity = { - type: "blockquote", + type: expandable ? "expandable_blockquote" : "blockquote", offset: header.length + 1, length: text.length, }; @@ -62,8 +66,10 @@ function createThinkingPart(header: string, text: string): TelegramRenderedPart export function prepareThinkingStreamingPayload( sections: ThinkingSection[], maxPartLength: number, + options: ThinkingStreamingPayloadOptions = {}, ): StreamingMessagePayload | null { const parts: TelegramRenderedPart[] = []; + const expandable = options.expandable ?? true; for (const section of sections) { const header = formatHeader(section.title); @@ -72,9 +78,23 @@ export function prepareThinkingStreamingPayload( const chunks = text ? splitText(text, textLimit) : [""]; for (const chunk of chunks) { - parts.push(createThinkingPart(header, chunk)); + parts.push(createThinkingPart(header, chunk, expandable)); } } return parts.length > 0 ? { parts } : null; } + +export function makeThinkingPayloadExpandable( + payload: StreamingMessagePayload, +): StreamingMessagePayload { + return { + ...payload, + parts: payload.parts.map((part) => ({ + ...part, + entities: part.entities?.map((entity) => + entity.type === "blockquote" ? { ...entity, type: "expandable_blockquote" } : entity, + ), + })), + }; +} diff --git a/src/bot/services/event-subscription-service.ts b/src/bot/services/event-subscription-service.ts index f05772de..c1cf4ae6 100644 --- a/src/bot/services/event-subscription-service.ts +++ b/src/bot/services/event-subscription-service.ts @@ -49,7 +49,10 @@ import { prepareAssistantStreamingPayload, renderAssistantFinalPartsSafe, } from "../messages/assistant-rendering.js"; -import { prepareThinkingStreamingPayload } from "../messages/thinking-rendering.js"; +import { + makeThinkingPayloadExpandable, + prepareThinkingStreamingPayload, +} from "../messages/thinking-rendering.js"; import { deliverExternalUserInputNotification } from "../messages/external-user-input-notification.js"; import { backgroundSessionTracker, @@ -578,11 +581,15 @@ class EventSubscriptionService implements BotEventSubscriptionService { }); if (update.isFirstUpdate) { - await this.toolCallStreamer.breakSession(update.sessionId, "thinking_started"); + void this.toolCallStreamer.breakSession(update.sessionId, "thinking_started").catch((error) => { + logger.error("[Bot] Failed to break tool stream before thinking message", error); + }); } if (!config.bot.hideThinkingMessages && config.bot.showThinkingContent) { - const payload = prepareThinkingStreamingPayload(update.sections, RESPONSE_STREAM_TEXT_LIMIT); + const payload = prepareThinkingStreamingPayload(update.sections, RESPONSE_STREAM_TEXT_LIMIT, { + expandable: false, + }); if (payload) { payload.sendOptions = { disable_notification: true }; payload.editOptions = undefined; @@ -608,6 +615,22 @@ class EventSubscriptionService implements BotEventSubscriptionService { } }); + summaryAggregator.setOnThinkingFinished((sessionId, messageId) => { + if (!this.botInstance || !this.chatIdInstance) { + return; + } + + const currentSession = getCurrentSession(); + if (!currentSession || currentSession.id !== sessionId) { + return; + } + + logger.debug("[Bot] Agent thinking finished", { sessionId, messageId }); + void this.completeThinkingStream(sessionId, messageId).catch((error) => { + logger.error("[Bot] Failed to finalize thinking stream early", error); + }); + }); + summaryAggregator.setOnTokens(async (tokens, isCompleted) => { if (!pinnedMessageManager.isInitialized()) { return; @@ -1008,14 +1031,15 @@ class EventSubscriptionService implements BotEventSubscriptionService { private async completeThinkingStream(sessionId: string, messageId: string): Promise { const key = this.getThinkingPayloadKey(sessionId, messageId); const payload = this.thinkingStreamingPayloads.get(key); + const finalPayload = payload ? makeThinkingPayloadExpandable(payload) : undefined; const result = await this.responseStreamer.complete( sessionId, this.getThinkingStreamId(messageId), - payload, + finalPayload, ); this.thinkingStreamingPayloads.delete(key); - if (result.streamed || !payload) { + if (result.streamed || !finalPayload) { return; } @@ -1023,12 +1047,12 @@ class EventSubscriptionService implements BotEventSubscriptionService { return; } - for (const part of payload.parts) { + for (const part of finalPayload.parts) { await sendRenderedBotPart({ api: this.botInstance.api, chatId: this.chatIdInstance, part, - options: payload.sendOptions as Parameters[0]["options"], + options: finalPayload.sendOptions as Parameters[0]["options"], }); } } diff --git a/tests/app/managers/summary-aggregation-manager.test.ts b/tests/app/managers/summary-aggregation-manager.test.ts index 58c989aa..3e4a4e20 100644 --- a/tests/app/managers/summary-aggregation-manager.test.ts +++ b/tests/app/managers/summary-aggregation-manager.test.ts @@ -28,6 +28,7 @@ describe("summary/aggregator", () => { summaryAggregator.setOnPartial(() => {}); summaryAggregator.setOnExternalUserInput(() => {}); summaryAggregator.setOnThinking(() => {}); + summaryAggregator.setOnThinkingFinished(() => {}); summaryAggregator.setOnSubagent(() => {}); summaryAggregator.setOnSessionIdle(() => {}); summaryAggregator.setOnSessionError(() => {}); @@ -715,6 +716,106 @@ describe("summary/aggregator", () => { ); }); + it("signals thinking finished once when assistant text starts after reasoning", async () => { + const onThinkingFinished = vi.fn(); + summaryAggregator.setOnThinkingFinished(onThinkingFinished); + summaryAggregator.setSession("session-1"); + + summaryAggregator.processEvent({ + type: "message.updated", + properties: { + info: { + id: "message-thinking-finished", + sessionID: "session-1", + role: "assistant", + time: { created: Date.now() }, + }, + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.part.updated", + properties: { + part: { + id: "reasoning-part", + sessionID: "session-1", + messageID: "message-thinking-finished", + type: "reasoning", + text: "Thinking...", + }, + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.part.updated", + properties: { + part: { + id: "text-part", + sessionID: "session-1", + messageID: "message-thinking-finished", + type: "text", + text: "Answer start", + }, + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.part.delta", + properties: { + part: { + id: "text-part", + sessionID: "session-1", + messageID: "message-thinking-finished", + type: "text", + }, + delta: " continues", + }, + } as unknown as Event); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(onThinkingFinished).toHaveBeenCalledTimes(1); + expect(onThinkingFinished).toHaveBeenCalledWith( + "session-1", + "message-thinking-finished", + ); + }); + + it("does not signal thinking finished for assistant text without reasoning", async () => { + const onThinkingFinished = vi.fn(); + summaryAggregator.setOnThinkingFinished(onThinkingFinished); + summaryAggregator.setSession("session-1"); + + summaryAggregator.processEvent({ + type: "message.updated", + properties: { + info: { + id: "message-no-thinking-finished", + sessionID: "session-1", + role: "assistant", + time: { created: Date.now() }, + }, + }, + } as unknown as Event); + + summaryAggregator.processEvent({ + type: "message.part.updated", + properties: { + part: { + id: "text-part", + sessionID: "session-1", + messageID: "message-no-thinking-finished", + type: "text", + text: "Answer only", + }, + }, + } as unknown as Event); + + await new Promise((resolve) => setImmediate(resolve)); + + expect(onThinkingFinished).not.toHaveBeenCalled(); + }); + it("streams partial text and passes messageId on completion", () => { const onPartial = vi.fn(); const onComplete = vi.fn(); diff --git a/tests/bot/messages/thinking-rendering.test.ts b/tests/bot/messages/thinking-rendering.test.ts index 013d2094..2bb00daf 100644 --- a/tests/bot/messages/thinking-rendering.test.ts +++ b/tests/bot/messages/thinking-rendering.test.ts @@ -1,6 +1,9 @@ import { describe, expect, it } from "vitest"; -import { prepareThinkingStreamingPayload } from "../../../src/bot/messages/thinking-rendering.js"; +import { + makeThinkingPayloadExpandable, + prepareThinkingStreamingPayload, +} from "../../../src/bot/messages/thinking-rendering.js"; import { t } from "../../../src/i18n/index.js"; describe("bot/messages/thinking-rendering", () => { @@ -16,7 +19,7 @@ describe("bot/messages/thinking-rendering", () => { expect(payload?.parts).toEqual([ { text: `${header}\n${text}`, - entities: [{ type: "blockquote", offset: header.length + 1, length: text.length }], + entities: [{ type: "expandable_blockquote", offset: header.length + 1, length: text.length }], fallbackText: `${header}\n> Line one\n> Line two`, source: "entities", }, @@ -38,6 +41,34 @@ describe("bot/messages/thinking-rendering", () => { ]); }); + it("can render thinking content as a regular quote while streaming", () => { + const header = `${t("bot.thinking")} — Analysis`; + const payload = prepareThinkingStreamingPayload( + [{ id: "r1", title: "Analysis", text: "Line one" }], + 3800, + { expandable: false }, + ); + + expect(payload?.parts[0].entities).toEqual([ + { type: "blockquote", offset: header.length + 1, length: "Line one".length }, + ]); + }); + + it("converts streamed quote payloads to expandable quotes for final delivery", () => { + const header = t("bot.thinking"); + const payload = prepareThinkingStreamingPayload([{ id: "r1", text: "Line one" }], 3800, { + expandable: false, + }); + + expect(payload).not.toBeNull(); + + const finalPayload = makeThinkingPayloadExpandable(payload!); + + expect(finalPayload.parts[0].entities).toEqual([ + { type: "expandable_blockquote", offset: header.length + 1, length: "Line one".length }, + ]); + }); + it("splits long thinking content into multiple quoted parts", () => { const header = t("bot.thinking"); const payload = prepareThinkingStreamingPayload( @@ -51,7 +82,7 @@ describe("bot/messages/thinking-rendering", () => { `${header}\nij`, ]); expect(payload?.parts[0].entities).toEqual([ - { type: "blockquote", offset: header.length + 1, length: 4 }, + { type: "expandable_blockquote", offset: header.length + 1, length: 4 }, ]); }); });