diff --git a/Releases/v5.0.0/.claude/PAI/PULSE/lib/threadStore.ts b/Releases/v5.0.0/.claude/PAI/PULSE/lib/threadStore.ts new file mode 100644 index 000000000..38930444b --- /dev/null +++ b/Releases/v5.0.0/.claude/PAI/PULSE/lib/threadStore.ts @@ -0,0 +1,205 @@ +/** + * Thread-aware conversation persistence for Telegram-style chat bots. + * + * Each thread is rooted at a user's top-level message. Replies to bot messages + * resume the thread that produced the replied-to message. Top-level messages + * always start a new thread, even from the same chat — context never bleeds + * across topics. + * + * On-disk shape (single file, atomic write via tmp+rename): + * { + * "threads": { "": { sessionId, history[], topic, lastBotMessageId, updated } }, + * "botMessageToThread": { "": } + * } + * + * threadId = the Telegram message_id of the user's root message (stable per chat). + */ + +import { join } from "path" + +export interface ThreadMessage { + role: "user" | "assistant" + content: string + timestamp: number +} + +export interface Thread { + sessionId?: string + history: ThreadMessage[] + topic: string + lastBotMessageId?: number + updated: number + created: number +} + +interface PersistShape { + threads: Record + botMessageToThread: Record +} + +export class ThreadStore { + private threads: Map = new Map() + private botMessageToThread: Map = new Map() + + constructor( + private readonly path: string, + private readonly maxHistoryPerThread = 40, + private readonly maxBotMessageEntries = 2000, + private readonly threadStaleMs = 30 * 24 * 60 * 60 * 1000, + ) {} + + async load(): Promise { + try { + const file = Bun.file(this.path) + if (!(await file.exists())) return + const raw = (await file.json()) as PersistShape + this.threads = new Map( + Object.entries(raw.threads ?? {}).map(([k, v]) => [Number(k), v]), + ) + this.botMessageToThread = new Map( + Object.entries(raw.botMessageToThread ?? {}).map(([k, v]) => [Number(k), Number(v)]), + ) + this.pruneStale() + } catch { + this.threads = new Map() + this.botMessageToThread = new Map() + } + } + + /** + * Resolve which thread a new incoming message belongs to. + * - If user replied to a bot message we recognize → that thread. + * - Otherwise → new thread rooted at this message_id. + * + * Returns the threadId and a flag indicating whether the thread was just created. + */ + resolveThread(args: { + incomingMessageId: number + incomingText: string + replyToBotMessageId?: number + botId?: number + replyToFromIsBot?: boolean + replyToFromId?: number + }): { threadId: number; created: boolean } { + const { incomingMessageId, incomingText, replyToBotMessageId, replyToFromIsBot, replyToFromId, botId } = args + + if (replyToBotMessageId && replyToFromIsBot && (!botId || replyToFromId === botId)) { + const existingThread = this.botMessageToThread.get(replyToBotMessageId) + if (existingThread !== undefined && this.threads.has(existingThread)) { + return { threadId: existingThread, created: false } + } + } + + const threadId = incomingMessageId + const now = Date.now() + if (!this.threads.has(threadId)) { + this.threads.set(threadId, { + history: [], + topic: incomingText.slice(0, 80), + updated: now, + created: now, + }) + return { threadId, created: true } + } + return { threadId, created: false } + } + + getThread(threadId: number): Thread | undefined { + return this.threads.get(threadId) + } + + /** Last N exchanges for prompt prefix, scoped to one thread. */ + getHistory(threadId: number, limit = 10): Array<{ role: "user" | "assistant"; content: string }> { + const t = this.threads.get(threadId) + if (!t) return [] + return t.history.slice(-limit).map((m) => ({ role: m.role, content: m.content })) + } + + setSessionId(threadId: number, sessionId: string): void { + const t = this.threads.get(threadId) + if (!t) return + t.sessionId = sessionId + t.updated = Date.now() + } + + recordBotMessage(threadId: number, botMessageId: number): void { + const t = this.threads.get(threadId) + if (!t) return + t.lastBotMessageId = botMessageId + t.updated = Date.now() + this.botMessageToThread.set(botMessageId, threadId) + + if (this.botMessageToThread.size > this.maxBotMessageEntries) { + const overflow = this.botMessageToThread.size - this.maxBotMessageEntries + const it = this.botMessageToThread.keys() + for (let i = 0; i < overflow; i++) { + const k = it.next().value + if (k !== undefined) this.botMessageToThread.delete(k) + } + } + } + + async addExchange(threadId: number, userContent: string, assistantContent: string): Promise { + const t = this.threads.get(threadId) + if (!t) return + const now = Date.now() + t.history.push( + { role: "user", content: userContent, timestamp: now }, + { role: "assistant", content: assistantContent, timestamp: now }, + ) + if (t.history.length > this.maxHistoryPerThread) { + t.history = t.history.slice(-this.maxHistoryPerThread) + } + t.updated = now + await this.persist() + } + + listThreads(): Array<{ threadId: number; topic: string; messageCount: number; updated: number; created: number }> { + return [...this.threads.entries()] + .map(([threadId, t]) => ({ + threadId, + topic: t.topic, + messageCount: t.history.length, + updated: t.updated, + created: t.created, + })) + .sort((a, b) => b.updated - a.updated) + } + + async clearAll(): Promise { + this.threads.clear() + this.botMessageToThread.clear() + await this.persist() + } + + size(): { threads: number; botMessages: number } { + return { threads: this.threads.size, botMessages: this.botMessageToThread.size } + } + + private pruneStale(): void { + const cutoff = Date.now() - this.threadStaleMs + for (const [id, t] of this.threads) { + if (t.updated < cutoff) { + this.threads.delete(id) + if (t.lastBotMessageId !== undefined) this.botMessageToThread.delete(t.lastBotMessageId) + } + } + } + + async persist(): Promise { + const shape: PersistShape = { + threads: Object.fromEntries([...this.threads.entries()].map(([k, v]) => [String(k), v])), + botMessageToThread: Object.fromEntries( + [...this.botMessageToThread.entries()].map(([k, v]) => [String(k), v]), + ), + } + const tmp = this.path + ".tmp" + await Bun.write(tmp, JSON.stringify(shape, null, 2)) + const fs = await import("fs/promises") + await fs.rename(tmp, this.path) + } +} + +export function defaultThreadStorePath(homeDir: string): string { + return join(homeDir, ".claude", "PAI", "PULSE", "state", "telegram", "threads.json") +} diff --git a/Releases/v5.0.0/.claude/PAI/PULSE/modules/telegram.ts b/Releases/v5.0.0/.claude/PAI/PULSE/modules/telegram.ts index 22b0fea2e..8cd83fdd3 100644 --- a/Releases/v5.0.0/.claude/PAI/PULSE/modules/telegram.ts +++ b/Releases/v5.0.0/.claude/PAI/PULSE/modules/telegram.ts @@ -5,15 +5,23 @@ * Does NOT create its own HTTP server — health is reported via the * parent's /health endpoint using telegramHealth(). * - * Architecture: grammY polling → auth → SDK session → stream → Telegram + * Architecture: grammY polling → auth → thread resolve → SDK session (per thread) → stream → Telegram + * + * Threading model (v2 — 2026-05-18): + * - Each top-level user message starts a fresh thread. + * - A reply to a bot message resumes THAT thread's SDK session. + * - /new clears all thread state. /threads lists active threads. + * - Empty SDK responses retry once with a fresh session; second failure + * returns a specific diagnosis (timeout / max_turns / empty) and is + * NOT written into thread history. */ import { Bot } from "grammy" import { query } from "@anthropic-ai/claude-agent-sdk" -import { ConversationStore } from "../lib/conversation" +import { ThreadStore, defaultThreadStorePath } from "../lib/threadStore" import { sanitize, analyzeForInjection } from "../lib/sanitize" import { join } from "path" -import { appendFile, mkdir } from "fs/promises" +import { appendFile, mkdir, readdir, readFile } from "fs/promises" // BILLING: Strip ANTHROPIC_API_KEY before any SDK query() call. Bun auto-loads // ~/.claude/.env into this process; if the key is present, @anthropic-ai/claude-agent-sdk @@ -39,18 +47,19 @@ const HOME = process.env.HOME ?? "" const CWD = join(HOME, ".claude") const STATE_DIR = join(HOME, ".claude", "PAI", "PULSE", "state", "telegram") const LOGS_DIR = join(HOME, ".claude", "PAI", "PULSE", "logs", "telegram") +const WORK_DIR = join(HOME, ".claude", "PAI", "MEMORY", "WORK") const MAX_TELEGRAM_LENGTH = 4096 const CURSOR = " ▌" // ── Module State ── let bot: Bot | null = null -let conversationStore: ConversationStore | null = null +let threadStore: ThreadStore | null = null +let botUserId: number | undefined let processing = false let startedAt = 0 let messagesReceived = 0 let messagesResponded = 0 -let lastSessionId: string | undefined let activeConfig: TelegramConfig | null = null // ── Logging ── @@ -68,16 +77,33 @@ function log(level: "info" | "warn" | "error", msg: string, data?: unknown) { // ── Chat Log ── -async function appendChatLog(userMsg: string, botMsg: string) { +async function appendChatLog(threadId: number, userMsg: string, botMsg: string) { const chatLogPath = join(LOGS_DIR, "chat-log.md") const ts = new Date().toLocaleString("en-US", { timeZone: "America/Los_Angeles", month: "short", day: "numeric", hour: "2-digit", minute: "2-digit", }) - const entry = `\n### ${ts}\n**{{PRINCIPAL_NAME}}:** ${userMsg}\n\n**{{DA_NAME}}:** ${botMsg}\n\n---\n` + const entry = `\n### ${ts} · thread ${threadId}\n**{{PRINCIPAL_NAME}}:** ${userMsg}\n\n**{{DA_NAME}}:** ${botMsg}\n\n---\n` await appendFile(chatLogPath, entry).catch(() => {}) } +// ── Tool → friendly status mapping ── + +function friendlyToolStatus(toolName: string | undefined): string | null { + if (!toolName) return null + const n = toolName.toLowerCase() + if (n.includes("gmail") || n.includes("mail")) return "📬 Reading email…" + if (n.includes("calendar")) return "📅 Checking calendar…" + if (n.includes("read")) return "📖 Reading file…" + if (n.includes("write") || n.includes("edit")) return "✏️ Editing…" + if (n.includes("bash") || n.includes("shell")) return "⚡ Running command…" + if (n.includes("grep") || n.includes("search")) return "🔍 Searching…" + if (n.includes("skill")) return "🛠️ Invoking skill…" + if (n.includes("agent") || n.includes("task")) return "🤖 Delegating…" + if (n.includes("fetch") || n.includes("http")) return "🌐 Fetching…" + return `🔧 ${toolName}…` +} + // ── Exports ── /** @@ -119,9 +145,9 @@ export async function startTelegram(config: TelegramConfig): Promise { await mkdir(STATE_DIR, { recursive: true }) await mkdir(LOGS_DIR, { recursive: true }) - // Initialize conversation store - conversationStore = new ConversationStore(join(STATE_DIR, "conversations.json")) - await conversationStore.load() + // Initialize thread store + threadStore = new ThreadStore(defaultThreadStorePath(HOME)) + await threadStore.load() // Create bot activeConfig = config @@ -129,11 +155,11 @@ export async function startTelegram(config: TelegramConfig): Promise { messagesReceived = 0 messagesResponded = 0 processing = false - lastSessionId = undefined + botUserId = undefined bot = new Bot(token) - // Auth middleware + // Auth middleware — applies to commands AND messages bot.use(async (ctx, next) => { const userId = ctx.from?.id if (!userId || !allowedUsers.has(userId)) { @@ -143,18 +169,91 @@ export async function startTelegram(config: TelegramConfig): Promise { await next() }) - // Message handler — sequential processing + // ── Commands ── (registered BEFORE the catch-all message handler) + + bot.command("new", async (ctx) => { + if (!threadStore) return + const { threads } = threadStore.size() + await threadStore.clearAll() + await ctx.reply(`🧹 Cleared ${threads} thread(s). Next message starts fresh.`) + log("info", "Thread state cleared via /new", { threadsCleared: threads }) + }) + + bot.command("help", async (ctx) => { + const lines = [ + "*I'm your DA over Telegram.*", + "", + "Threading:", + "• Top-level message = new thread", + "• Reply to one of my messages = continue that thread", + "", + "Commands:", + "• `/new` — clear all threads, start fresh", + "• `/status` — current config + counts", + "• `/threads` — list active threads", + "• `/help` — this message", + "", + "Otherwise just talk to me — I have full PAI access.", + ] + await ctx.reply(lines.join("\n"), { parse_mode: "Markdown" }).catch(() => ctx.reply(lines.join("\n"))) + }) + + bot.command("status", async (ctx) => { + if (!threadStore) return + const sizes = threadStore.size() + const uptimeMin = Math.round((Date.now() - startedAt) / 60_000) + const lines = [ + "*Status*", + `Uptime: ${uptimeMin}m`, + `Threads: ${sizes.threads}`, + `Bot-message map: ${sizes.botMessages}`, + `Received: ${messagesReceived} · Responded: ${messagesResponded}`, + `Processing: ${processing ? "yes" : "no"}`, + `Max turns: ${maxTurns} · Timeout: ${Math.round(sdkTimeoutMs / 1000)}s`, + ] + await ctx.reply(lines.join("\n"), { parse_mode: "Markdown" }).catch(() => ctx.reply(lines.join("\n"))) + }) + + bot.command("threads", async (ctx) => { + if (!threadStore) return + const list = threadStore.listThreads().slice(0, 20) + if (list.length === 0) { + await ctx.reply("No active threads.") + return + } + const now = Date.now() + const lines = ["*Active threads:*", ""] + for (const t of list) { + const ageMin = Math.round((now - t.updated) / 60_000) + const ageLabel = ageMin < 60 ? `${ageMin}m` : ageMin < 1440 ? `${Math.round(ageMin / 60)}h` : `${Math.round(ageMin / 1440)}d` + const topic = t.topic.replace(/[*_`\[\]]/g, "").slice(0, 60) + lines.push(`• #${t.threadId} · ${t.messageCount}msg · ${ageLabel} · ${topic}`) + } + await ctx.reply(lines.join("\n"), { parse_mode: "Markdown" }).catch(() => ctx.reply(lines.join("\n"))) + }) + + // Message handler — sequential processing, thread-aware bot.on("message:text", async (ctx) => { const text = ctx.message.text const userId = ctx.from.id const chatId = ctx.chat.id + // Skip unknown slash commands rather than feeding them to the SDK + if (text.startsWith("/")) { + log("info", "Unknown command ignored", { text: text.slice(0, 32) }) + return + } + messagesReceived++ - log("info", "Message received", { userId, chatId, textLength: text.length }) + log("info", "Message received", { userId, chatId, messageId: ctx.message.message_id, textLength: text.length }) // Sanitize input const sanitized = sanitize(text) - if (!sanitized) return + if (!sanitized.trim()) { + log("warn", "Empty message after sanitize", { userId, originalLength: text.length }) + await ctx.reply("⚠️ Leere Nachricht. Bitte etwas Text schicken — oder /help für Befehle.").catch(() => {}) + return + } const injection = analyzeForInjection(sanitized) if (injection.riskLevel === "CRITICAL") { @@ -172,33 +271,154 @@ export async function startTelegram(config: TelegramConfig): Promise { processing = true const startTime = Date.now() + // ── Resolve which thread this message belongs to ── + const replyTo = ctx.message.reply_to_message + const { threadId, created: threadCreated } = threadStore!.resolveThread({ + incomingMessageId: ctx.message.message_id, + incomingText: sanitized, + replyToBotMessageId: replyTo?.message_id, + replyToFromIsBot: replyTo?.from?.is_bot, + replyToFromId: replyTo?.from?.id, + botId: botUserId, + }) + const thread = threadStore!.getThread(threadId) + log("info", "Thread resolved", { threadId, created: threadCreated, sessionId: thread?.sessionId }) + try { // Typing indicator await ctx.api.sendChatAction(chatId, "typing").catch(() => {}) - // Build prompt with conversation history context - const history = conversationStore!.getHistory() + // Build prompt with per-thread history + const history = threadStore!.getHistory(threadId, 10) let prompt = sanitized if (history.length > 0) { const historyText = history - .slice(-10) // Last 5 exchanges for context .map(m => `${m.role === "user" ? "Principal" : "DA"}: ${m.content}`) .join("\n") prompt = `Previous conversation:\n${historyText}\n\nPrincipal's new message: ${sanitized}` } - const sdkOptions: Record = { - cwd: CWD, - tools: { type: "preset", preset: "claude_code" }, - settingSources: ["user", "project"], // NO "local" — skip CLAUDE.md to avoid Algorithm/format/voice curls + const result = await runSdk({ + prompt, + resumeSessionId: thread?.sessionId, + ctx, + chatId, + threadId, maxTurns, - includePartialMessages: true, - permissionMode: "bypassPermissions", - allowDangerouslySkipPermissions: true, - systemPrompt: { - type: "preset", - preset: "claude_code", - append: `\n\n## TELEGRAM MODE OVERRIDE (highest priority — overrides CLAUDE.md format rules) + sdkTimeoutMs, + editIntervalMs, + }) + + // Empty → retry ONCE with fresh session (no resume) and prefix hint. + // We do NOT write the empty/diagnostic into thread history. + if (!result.text) { + log("warn", "Empty response — retrying once with fresh session", { threadId, subtype: result.subtype }) + const retryPrompt = `[Previous attempt produced no output — try a more direct approach this time.]\n\n${sanitized}` + const retry = await runSdk({ + prompt: retryPrompt, + resumeSessionId: undefined, + ctx, + chatId, + threadId, + maxTurns, + sdkTimeoutMs, + editIntervalMs, + existingMessageId: result.messageId ?? undefined, + }) + result.text = retry.text + result.subtype = retry.subtype ?? result.subtype + result.sessionId = retry.sessionId ?? result.sessionId + result.messageId = retry.messageId ?? result.messageId + result.timedOut = retry.timedOut || result.timedOut + } + + // Final outcome + if (result.text) { + // Persist the new sessionId on this thread + if (result.sessionId) threadStore!.setSessionId(threadId, result.sessionId) + + // Send / edit final clean message and capture bot message id for reply-chain + const finalMessageId = await sendFinal(ctx, chatId, result.text, result.messageId) + if (finalMessageId !== null) { + threadStore!.recordBotMessage(threadId, finalMessageId) + } + + await threadStore!.addExchange(threadId, sanitized, result.text) + await appendChatLog(threadId, sanitized, result.text) + messagesResponded++ + + log("info", "Response sent", { + threadId, + durationMs: Date.now() - startTime, + responseLength: result.text.length, + sessionId: result.sessionId, + }) + } else { + // Both attempts empty → specific diagnostic, NOT written into history + const reason = diagnoseFailure(result.subtype, result.timedOut) + log("error", "Empty response after retry", { threadId, subtype: result.subtype, timedOut: result.timedOut }) + const finalMessageId = await sendFinal(ctx, chatId, reason, result.messageId) + if (finalMessageId !== null) { + // Map this diagnostic msg back to the thread so the user can still reply-to-continue + threadStore!.recordBotMessage(threadId, finalMessageId) + } + } + } catch (err) { + log("error", "Message processing failed", { error: String(err), threadId }) + await ctx.reply("Something went wrong processing your message. Try again?").catch(() => {}) + } finally { + processing = false + } + }) + + // Start polling + log("info", "Starting Telegram polling", { allowedUsers: [...allowedUsers] }) + + await bot.start({ + onStart: (info) => { + botUserId = info.id + log("info", `Bot started: @${info.username}`, { botId: info.id }) + }, + }) +} + +// ── SDK runner with live status streaming ── + +interface SdkRunArgs { + prompt: string + resumeSessionId: string | undefined + ctx: any + chatId: number + threadId: number + maxTurns: number + sdkTimeoutMs: number + editIntervalMs: number + existingMessageId?: number +} + +interface SdkRunResult { + text: string + sessionId?: string + subtype?: string + timedOut: boolean + messageId: number | null +} + +async function runSdk(args: SdkRunArgs): Promise { + const { prompt, resumeSessionId, ctx, chatId, maxTurns, sdkTimeoutMs, editIntervalMs, existingMessageId } = args + + const sdkOptions: Record = { + cwd: CWD, + tools: { type: "preset", preset: "claude_code" }, + settingSources: ["user", "project"], + maxTurns, + includePartialMessages: true, + permissionMode: "bypassPermissions", + allowDangerouslySkipPermissions: true, + systemPrompt: { + type: "preset", + preset: "claude_code", + append: `\n\n## TELEGRAM MODE OVERRIDE (highest priority — overrides CLAUDE.md format rules) You are {{DA_NAME}}, responding via Telegram. {{PRINCIPAL_NAME}} is messaging you from his phone. @@ -212,141 +432,148 @@ CRITICAL RULES FOR TELEGRAM MODE: - NEVER use voice notification curls (no http://localhost:31337/notify calls) - You have ALL PAI capabilities — skills, email, calendar, lights, everything - When doing tasks, do them and confirm briefly what you did`, - }, - } - - // Resume previous session for context continuity - if (lastSessionId) { - sdkOptions.resume = lastSessionId - } + }, + } - const conversation = query({ prompt, options: sdkOptions as any }) + if (resumeSessionId) sdkOptions.resume = resumeSessionId - // Collect response with timeout - let fullText = "" - let messageId: number | null = null - let lastEditTime = 0 + const conversation = query({ prompt, options: sdkOptions as any }) - const timeoutController = new AbortController() - const timeout = setTimeout(() => timeoutController.abort(), sdkTimeoutMs) + let fullText = "" + let messageId: number | null = existingMessageId ?? null + let lastEditTime = 0 + let lastStatus: string | null = null + let sessionId: string | undefined + let subtype: string | undefined + let timedOut = false - try { - for await (const message of conversation) { - if (timeoutController.signal.aborted) break + const timeoutController = new AbortController() + const timeout = setTimeout(() => { + timedOut = true + timeoutController.abort() + }, sdkTimeoutMs) - const msg = message as any + const editStatus = async (statusText: string) => { + if (statusText === lastStatus) return + lastStatus = statusText + try { + if (!messageId) { + const sent = await ctx.reply(statusText) + messageId = sent.message_id + } else { + await ctx.api.editMessageText(chatId, messageId, statusText).catch(() => {}) + } + } catch { /* non-critical */ } + } - // Capture session ID for resume - if (msg.type === "system" && msg.subtype === "init" && msg.session_id) { - lastSessionId = msg.session_id - log("info", "Session initialized", { sessionId: lastSessionId }) - } + try { + for await (const message of conversation) { + if (timeoutController.signal.aborted) break + const msg = message as any - // Streaming text deltas (progressive updates) - if (msg.type === "stream_event" && msg.event?.type === "content_block_delta" && - msg.event?.delta?.type === "text_delta" && msg.event.delta.text) { - fullText += msg.event.delta.text - } + if (msg.type === "system" && msg.subtype === "init" && msg.session_id) { + sessionId = msg.session_id + } - // Full assistant message (fallback if streaming not available) - if (msg.type === "assistant" && Array.isArray(msg.message?.content)) { - for (const block of msg.message.content) { - if (block.type === "text" && block.text) { - if (!fullText) fullText = block.text - } - } - } + // Streaming text deltas + if (msg.type === "stream_event" && msg.event?.type === "content_block_delta" && + msg.event?.delta?.type === "text_delta" && msg.event.delta.text) { + fullText += msg.event.delta.text + } - // Final result - if (msg.type === "result") { - if (msg.subtype === "success" && msg.result) { - fullText = msg.result - } - if (msg.session_id) lastSessionId = msg.session_id - log("info", "SDK session complete", { - durationMs: Date.now() - startTime, - numTurns: msg.num_turns, - cost: msg.total_cost_usd, - sessionId: lastSessionId, - }) + // Tool-use detection for status updates + if (msg.type === "stream_event" && msg.event?.type === "content_block_start" && + msg.event?.content_block?.type === "tool_use" && !fullText) { + const status = friendlyToolStatus(msg.event.content_block.name) + if (status) await editStatus(status) + } + if (msg.type === "assistant" && Array.isArray(msg.message?.content) && !fullText) { + for (const block of msg.message.content) { + if (block.type === "tool_use") { + const status = friendlyToolStatus(block.name) + if (status) await editStatus(status) } + } + } - // Live edit updates in Telegram - const now = Date.now() - if (fullText && now - lastEditTime >= editIntervalMs) { - const displayText = fullText.slice(0, MAX_TELEGRAM_LENGTH - 10) + CURSOR - try { - if (!messageId) { - const sent = await ctx.reply(displayText) - messageId = sent.message_id - } else { - await ctx.api.editMessageText(chatId, messageId, displayText).catch(() => {}) - } - lastEditTime = now - } catch { /* edit failures are non-critical */ } + // Full assistant message (fallback if streaming text isn't available) + if (msg.type === "assistant" && Array.isArray(msg.message?.content)) { + for (const block of msg.message.content) { + if (block.type === "text" && block.text && !fullText) { + fullText = block.text } } - } finally { - clearTimeout(timeout) } - if (!fullText) { - fullText = "Sorry, I wasn't able to generate a response. Try again?" - log("error", "Empty response from SDK") + // Final result + if (msg.type === "result") { + if (msg.subtype === "success" && msg.result) fullText = msg.result + subtype = msg.subtype + if (msg.session_id) sessionId = msg.session_id } - // Final clean message - if (fullText.length <= MAX_TELEGRAM_LENGTH) { - if (messageId) { - await ctx.api.editMessageText(chatId, messageId, fullText).catch(() => {}) - } else { - await ctx.reply(fullText) - } - } else { - // Split long messages - const chunks: string[] = [] - let remaining = fullText - while (remaining.length > 0) { - chunks.push(remaining.slice(0, MAX_TELEGRAM_LENGTH)) - remaining = remaining.slice(MAX_TELEGRAM_LENGTH) - } - if (messageId) { - await ctx.api.editMessageText(chatId, messageId, chunks[0]!).catch(() => {}) - for (const chunk of chunks.slice(1)) { - await ctx.reply(chunk) - } - } else { - for (const chunk of chunks) { - await ctx.reply(chunk) + // Live text edits + const now = Date.now() + if (fullText && now - lastEditTime >= editIntervalMs) { + const display = fullText.slice(0, MAX_TELEGRAM_LENGTH - 10) + CURSOR + try { + if (!messageId) { + const sent = await ctx.reply(display) + messageId = sent.message_id + } else { + await ctx.api.editMessageText(chatId, messageId, display).catch(() => {}) } - } + lastEditTime = now + } catch { /* non-critical */ } } + } + } finally { + clearTimeout(timeout) + } - messagesResponded++ - log("info", "Response sent", { durationMs: Date.now() - startTime, responseLength: fullText.length }) + return { text: fullText, sessionId, subtype, timedOut, messageId } +} - // Persist conversation - await conversationStore!.addExchange(sanitized, fullText) - await appendChatLog(sanitized, fullText) +function diagnoseFailure(subtype: string | undefined, timedOut: boolean): string { + if (timedOut) return "⏱️ Timeout nach 120s. Bitte den Task kleiner schneiden oder mit /new neu starten." + if (subtype === "error_max_turns") return "🔁 Hat zu viele Schritte gebraucht (Max-Turns erreicht). Bitte konkretere Frage stellen." + if (subtype === "error_during_execution") return "⚠️ Fehler in der Ausführung. Nochmal versuchen, ggf. /new." + if (subtype && subtype !== "success") return `⚠️ Konnte keine Antwort erzeugen (subtype=${subtype}).` + return "⚠️ Leere Antwort. Bitte nochmal — oder /new für frischen Kontext." +} - } catch (err) { - log("error", "Message processing failed", { error: String(err) }) - await ctx.reply("Something went wrong processing your message. Try again?").catch(() => {}) - } finally { - processing = false +async function sendFinal(ctx: any, chatId: number, text: string, existingMessageId: number | null): Promise { + if (text.length <= MAX_TELEGRAM_LENGTH) { + if (existingMessageId) { + const ok = await ctx.api.editMessageText(chatId, existingMessageId, text).catch(() => null) + return ok ? existingMessageId : null + } else { + const sent = await ctx.reply(text).catch(() => null) + return sent ? sent.message_id : null } - }) - - // Start polling — await keeps startTelegram() alive until bot.stop() is called. - // Without await, the supervisor thinks the function exited and restarts it, - // causing a grammY 409 conflict (two polling loops on the same bot token). - log("info", "Starting Telegram polling", { allowedUsers: [...allowedUsers] }) - - await bot.start({ - onStart: (info) => { - log("info", `Bot started: @${info.username}`, { botId: info.id }) - }, - }) + } + // Split long messages + const chunks: string[] = [] + let remaining = text + while (remaining.length > 0) { + chunks.push(remaining.slice(0, MAX_TELEGRAM_LENGTH)) + remaining = remaining.slice(MAX_TELEGRAM_LENGTH) + } + let lastId: number | null = null + if (existingMessageId && chunks[0]) { + await ctx.api.editMessageText(chatId, existingMessageId, chunks[0]).catch(() => {}) + lastId = existingMessageId + for (const chunk of chunks.slice(1)) { + const sent = await ctx.reply(chunk).catch(() => null) + if (sent) lastId = sent.message_id + } + } else { + for (const chunk of chunks) { + const sent = await ctx.reply(chunk).catch(() => null) + if (sent) lastId = sent.message_id + } + } + return lastId } /** @@ -370,7 +597,7 @@ export function telegramHealth(): { messages_received: number messages_responded: number processing: boolean - last_session_id?: string + active_threads?: number } { if (!bot) { return { @@ -388,6 +615,6 @@ export function telegramHealth(): { messages_received: messagesReceived, messages_responded: messagesResponded, processing, - last_session_id: lastSessionId, + active_threads: threadStore?.size().threads, } }