diff --git a/docs/telegram-bridge.md b/docs/telegram-bridge.md index 4cde5d0..f6e2c25 100644 --- a/docs/telegram-bridge.md +++ b/docs/telegram-bridge.md @@ -6,9 +6,13 @@ any phone running Telegram — DM the bot, get a final reply back. Inbound messages route through the **shared session-dispatch pipeline** (`hub/src/dispatch/`) — the same gate ordering (threshold → daily-cost-cap), per-session queue, and offline-grace buffer scheduled tasks and the other subsystems use — with -`store: null` (Telegram writes no run row). Outbound forwards only the *final* -`assistant_message` (no streaming, no `thinking`/`tool_use`/`text_delta` chatter) -via the event bus, NOT a pipeline finalize hook. +`store: null` (Telegram writes no run row). Outbound forwards the final +`assistant_message` plus a **summarized live stream** — an editable "working…" +message that collapses each `tool_use` to a one-liner while the turn runs, then +finalizes to the full assistant text. `thinking` and raw `text_delta` chatter are +never forwarded. All user-facing messages use Telegram **MarkdownV2** with a +400→plain-text fallback. Delivery rides the event buses, NOT a pipeline finalize +hook. A single hub-wide bot serves every user, keyed by `users.telegram_chat_id`. One BotFather bot, one Telegram webhook secret, one redeploy. @@ -206,13 +210,75 @@ When a Telegram-driven Claude session hits a tool **permission/approval** prompt 1. `hub/src/ws/agent.ts` already broadcasts `permission_request` to web subscribers. It now ALSO emits a `permission_request:pending` event on the dedicated bus `hub/src/events/permission-events.ts` (same isolation discipline as the `assistant_message:final` bus — a listener throw can't tear down the WS handler). 2. The bridge subscribes (`onPermissionPendingEvent` in `bridge.ts`). 
For every user whose `telegram_default_session_id` matches the emitting session (reusing `getUsersWithTelegramDefaultSession`), it sends an inline keyboard `[✅ Approve] [🚫 Deny]` with `callback_data` `pa:` / `pd:`, plus a one-line preview of the tool input (e.g. the Bash `command`). -3. It records the pending prompt in `hub/src/telegram/approvals.ts` keyed by `request_id` → `{ sessionId, userId, chatId, messageId, toolName }`. **Why a server-side map and not callback_data:** Telegram caps `callback_data` at 64 bytes — too small for session UUID + request UUID + a user binding. The map keeps `callback_data` tiny AND the entry's `userId` enforces authorization. -4. On a tap, the webhook's `handlePermissionCallback` looks up + removes the prompt (resolved exactly once), verifies it belongs to the tapping user, then forwards `{ type: "permission_response", session_id, request_id, approved }` onto the session's **agent socket** via `getChannel(sessionId).ws.send(...)` — the exact frame the web client sends. It edits the prompt message to `✅ Approved — ` / `🚫 Denied — ` and drops the buttons. `answerCallbackQuery` fires on every branch. - -**Authorization + edge cases (all answer the callback):** foreign user → `Not allowed` (`callback_permission_denied_auth`, nothing sent to the agent); unknown/expired `request_id` → `This prompt already expired or was answered.` (`callback_permission_stale`); session socket gone → `Session is offline — couldn't deliver.` (`callback_permission_offline`). Prompts expire after `PROMPT_TTL_MS` (10 min) and are pruned lazily on each remember. +3. It records the pending prompt in `hub/src/telegram/approvals.ts` keyed by + **`(sessionId, requestId)`**, holding a **map of every authorized + `userId → { chatId, messageId }`** plus `{ sessionId, toolName, createdAtMs }`. + **Why a server-side map and not callback_data:** Telegram caps `callback_data` + at 64 bytes — too small for session UUID + request UUID + a user binding. 
The + map keeps `callback_data` tiny AND the per-user set enforces authorization. + **Shared-session re-keying (MED fast-follow from the #189 review):** the + original design keyed by `requestId` alone, so when two users shared one + default session the second `rememberPendingPrompt` *clobbered* the first — + leaving the prompt bound to whichever user the bridge processed last and + locking out every other (valid) approver with "Not allowed" (fail-closed, but + wrong). Re-keying by `(sessionId, requestId)` and merging each authorized user + into the entry fixes this: any authorized user can resolve, exactly once. +4. On a tap, the webhook's `handlePermissionCallback` calls + `takePendingPrompt(requestId, tappingUserId)` — authorization is now folded + into the take: it returns (and removes) the entry only if the tapping user is + an authorized approver, resolving the whole `(sessionId, requestId)` exactly + once. It then forwards `{ type: "permission_response", session_id, request_id, + approved }` onto the session's **agent socket** via + `getChannel(sessionId).ws.send(...)` — the exact frame the web client sends. It + edits the prompt message to `✅ Approved — ` / `🚫 Denied — ` and + drops the buttons. `answerCallbackQuery` fires on every branch. + +**Authorization + edge cases (all answer the callback):** an unauthorized / +unknown / expired / already-resolved tap → `This prompt already expired or was +answered.` (`callback_permission_stale`, nothing sent to the agent — authorization +is enforced inside `takePendingPrompt`); session socket gone → `Session is +offline — couldn't deliver.` (`callback_permission_offline`). Prompts expire after +`PROMPT_TTL_MS` (10 min) and are pruned lazily on each remember. 
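The registry semantics described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the contents of `approvals.ts` (which this diff shows only as a binary change). The `Entry` shape, the `keyOf` helper, the `approvers` field name, and the scan-by-`requestId` lookup are hypothetical; only the `rememberPendingPrompt(sessionId, requestId, prompt)` and `takePendingPrompt(requestId, userId)` call shapes and the 10-minute `PROMPT_TTL_MS` come from the text and tests in this change.

```typescript
// Hypothetical sketch of the (sessionId, requestId)-keyed approvals registry.
const PROMPT_TTL_MS = 10 * 60 * 1000; // 10 min, as documented

interface PendingPrompt {
  sessionId: string;
  userId: string;
  chatId: number;
  messageId: number;
  toolName: string;
  createdAtMs?: number;
}

type Binding = { chatId: number; messageId: number };

// One entry per (sessionId, requestId); every authorized user is merged in.
interface Entry {
  sessionId: string;
  requestId: string;
  toolName: string;
  createdAtMs: number;
  approvers: Map<string, Binding>;
}

const entries = new Map<string, Entry>();
const keyOf = (sessionId: string, requestId: string): string => `${sessionId}:${requestId}`;

function rememberPendingPrompt(
  sessionId: string,
  requestId: string,
  p: PendingPrompt,
  nowMs: number = Date.now(),
): void {
  // Lazy prune: drop entries that were already expired before this call.
  for (const [key, entry] of Array.from(entries)) {
    if (nowMs - entry.createdAtMs > PROMPT_TTL_MS) entries.delete(key);
  }
  const key = keyOf(sessionId, requestId);
  let entry = entries.get(key);
  if (!entry) {
    entry = {
      sessionId,
      requestId,
      toolName: p.toolName,
      createdAtMs: p.createdAtMs ?? nowMs,
      approvers: new Map<string, Binding>(),
    };
    entries.set(key, entry);
  }
  // Merge instead of overwrite, so a second user on a shared session
  // does not clobber the first user's binding.
  entry.approvers.set(p.userId, { chatId: p.chatId, messageId: p.messageId });
}

function takePendingPrompt(
  requestId: string,
  userId: string,
  nowMs: number = Date.now(),
): PendingPrompt | null {
  for (const [key, entry] of Array.from(entries)) {
    if (entry.requestId !== requestId) continue;
    if (nowMs - entry.createdAtMs > PROMPT_TTL_MS) {
      entries.delete(key); // expired: remove and report nothing
      continue;
    }
    const binding = entry.approvers.get(userId);
    if (!binding) continue; // unauthorized tap: leave the entry for valid approvers
    entries.delete(key); // resolve the whole prompt exactly once
    return {
      sessionId: entry.sessionId,
      userId,
      chatId: binding.chatId,
      messageId: binding.messageId,
      toolName: entry.toolName,
      createdAtMs: entry.createdAtMs,
    };
  }
  return null;
}
```

Folding the authorization check into the take means an unauthorized tap and a stale tap are indistinguishable to the caller, which matches the single `callback_permission_stale` outcome described above.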
**No new dispatch path.** Inline approval forwards a control frame on an existing agent socket; it does NOT route a user→session message and therefore does NOT (and must not) touch the cost-cap dispatch pipeline — it's a response to a runner-initiated prompt, not new traffic. +### Summarized streaming (MarkdownV2 + editable working message) + +A Telegram-driven session no longer goes silent until one final blob lands. While +a turn runs, the bridge maintains **one editable "working…" message per +`(chat, session)`** and keeps a `typing` chat-action alive. + +- **Activity bus.** `hub/src/ws/agent.ts` emits a `session_activity` event (kind + `tool_use` only) on the dedicated bus `hub/src/events/session-activity-events.ts` + — same listener-isolation discipline as `assistant_message:final`. This is an + outbound read-only fanout, **not** a dispatch path (dispatch = inbound + user→session in `hub/src/dispatch/`); the cost-cap gate is untouched. +- **Working message.** On the first `tool_use` of a turn the bridge sends + `⏳ *Working…*` (MarkdownV2) and records `{ messageId, lines }`. Each subsequent + `tool_use` appends a collapsed one-liner — `🔧 Edit hub/src/foo.ts`, + `🔧 Bash ` — and edits the message in place (throttled ~900ms, list capped + at the last 12 lines). `thinking` is omitted entirely. +- **Typing indicator.** `sendChatAction(chat, "typing")` fires immediately and + then every ~4s (Telegram's typing state expires ~5s) until the turn finalizes. +- **Finalize.** On `assistant_message:final` the bridge stops typing and **edits + the same working message** to the full assistant text (escaped MarkdownV2). If + the final text exceeds 4096 chars it leaves the working summary and sends the + full text as a follow-up. When no `tool_use` ran (or streaming is off) there is + no working message and the final text is sent as a fresh MarkdownV2 message. 
+- **Reversible flag.** Gated on `config.telegram.summarizedStreaming` (env + `TELEGRAM_SUMMARIZED_STREAMING`, default **on**; set to `false` to revert to a + single final-blob send). The flag only affects the working-message behavior — + finalization (and MarkdownV2) work regardless. + +**MarkdownV2 safety.** `sendMessageMd` / `editMessageTextMd` send with +`parse_mode: MarkdownV2`; an unescaped reserved char makes Telegram reject the +WHOLE message with 400, so on a 400 they **retry once as plain text** — a session +reply is never silently dropped. `escapeMarkdownV2` escapes the full reserved set +`_ * [ ] ( ) ~ \` > # + - = | { } . !` (and `\\`); callers escape dynamic content +and keep only their own intentional markup (`*bold*`, ```code```). The inline +Approve/Deny prompt is also MarkdownV2 with the same fallback; the keyboard is +unaffected. + ### Pagination edits `safeEditMessageText` swallows Telegram's `400 message is not modified` error (no-op success when the new payload is byte-identical to the previous). All other errors are surfaced via `console.error` instead of `console.warn`, so paginate-not-firing-silently bugs surface in logs. @@ -242,8 +308,13 @@ supervisor → hub WS /ws/agent ← emits assistant_message hub/src/events/assistant-events.ts ← emits assistant_message:final ↓ hub/src/telegram/bridge.ts ← gates on default-session match + → finalize working msg OR fresh send + → sendMessageMd (MarkdownV2 + 400→plain) → splitForTelegram (4096 chunks) - → client.sendMessage (per-chat serial queue) + → per-chat serial queue + ↑ (during the turn) +hub/src/events/session-activity-events.ts ← tool_use one-liners → edit working msg + + typing chat-action (~4s refresh) ↓ Telegram chat ``` @@ -294,8 +365,10 @@ Key boundaries: - 10MB photo cap. Larger photos → polite reject. - One default session per user. Multi-session fan-out is out of scope (v1). - No group chats (1 chat_id → 1 user, enforced by UNIQUE). 
-- No voice / video / stickers / animations / inline keyboards / message editing - / streaming partial replies. All deferred. +- No voice / video / stickers / animations. Deferred. +- Summarized streaming edits one working message per turn; it does NOT stream raw + token deltas (by design — `thinking`/`text_delta` are dropped). Disable via + `TELEGRAM_SUMMARIZED_STREAMING=false`. ## Migration from the legacy per-user post-run telegram path @@ -320,7 +393,8 @@ has soaked. Until then, both code paths coexist. - `hub/src/api/telegram-webhook.ts` — public ingress, URL-path secret, raw-body-before-parse, zod-validated `Update` envelope, audit-row append, command vs dispatch routing. - `hub/src/api/telegram.ts` — authed REST: `GET /status`, `POST /link-code`, `DELETE /link`, `PUT /default-session`. Cookie auth + CSRF double-submit (Phase 07 pattern). -- `hub/src/telegram/client.ts` — `sendMessage` / `sendMessageWithKeyboard` (returns `message_id`) / `editMessageText` / `answerCallbackQuery` / `setMyCommands` / `getFile`, `escapeMarkdownV2`, `splitForTelegram`. 10s `AbortSignal.timeout`. Per-chat outbound serial queue lives in `bridge.ts`. +- `hub/src/telegram/client.ts` — `sendMessage` / `sendMessageMd` (MarkdownV2 + 400→plain fallback, returns `message_id`) / `sendMessageWithKeyboard` (now accepts `parse_mode`) / `editMessageText` / `editMessageTextMd` (MarkdownV2 + 400→plain, swallows "not modified") / `sendChatAction` / `answerCallbackQuery` / `setMyCommands` / `getFile`, `escapeMarkdownV2`, `splitForTelegram`. 10s `AbortSignal.timeout`. Per-chat outbound serial queue lives in `bridge.ts`. +- `hub/src/events/session-activity-events.ts` — internal `EventEmitter` for `session_activity` (`tool_use` one-liners). Outbound read-only fanout for the summarized-streaming working message; **not** a dispatch path. 
**(new — streaming)** - `hub/src/telegram/commands.ts` — `parse(text)` + handlers for `/start` `/session` `/list` `/status` `/doctor` `/help`; `BOT_COMMANDS` (single source of truth for the slash menu + `/help`). - `hub/src/telegram/approvals.ts` — inline-approval registry (`rememberPendingPrompt`/`takePendingPrompt`, TTL-pruned) + `pa:`/`pd:` callback_data codec. **(new — Fix C)** - `hub/src/events/permission-events.ts` — internal `EventEmitter` for `permission_request:pending`. Additive — does not change the WS broadcast path. **(new — Fix C)** @@ -331,12 +405,12 @@ has soaked. Until then, both code paths coexist. ### Modified (hub) -- `hub/src/config.ts` — `config.telegram.{botToken,webhookSecret,botUsername}` (all optional). +- `hub/src/config.ts` — `config.telegram.{botToken,webhookSecret,botUsername,summarizedStreaming}` (all optional; `summarizedStreaming` defaults true, env `TELEGRAM_SUMMARIZED_STREAMING=false` disables). - `hub/src/db/schema.sql` — additive: `users.telegram_chat_id` (BIGINT UNIQUE), `telegram_default_session_id`, `telegram_default_explicit` (BOOLEAN NOT NULL DEFAULT false — distinguishes an explicit `/session`/tap choice from an auto-pin), `telegram_link_code`, `telegram_link_code_expires_at`; new `telegram_inbound_log` table with `(user_id, received_at DESC)` index. - `hub/src/db/dal.ts` — Telegram DAL helpers (folded into the existing dal module, not a separate `telegram-dal.ts` as the plan envisioned — deviation noted in SUMMARY): `getUserByTelegramChatId`, `getUserByLinkCode`, `setLinkCode`, `linkChatId`, `unlinkChatId`, `setDefaultSession`, `getTelegramStatus`, `appendInboundLog`, `trimInboundLog`, `getUsersWithTelegramDefaultSession`. - `hub/src/csrf.ts` — Telegram REST routes covered by the existing double-submit middleware (no new exclusions). - `hub/src/index.ts` — mount `telegram-webhook.ts` AHEAD of JWT + license + CSRF catch-alls; mount `telegram.ts` inside; start the outbound bridge at boot. 
-- `hub/src/ws/agent.ts` — emit `assistant_message:final` on the internal event bus when a session run finalizes; **also emit `permission_request:pending` when a runner raises a permission prompt (Fix C)**. Additive only. +- `hub/src/ws/agent.ts` — emit `assistant_message:final` on the internal event bus when a session run finalizes; **also emit `permission_request:pending` when a runner raises a permission prompt (Fix C)**; **also emit `session_activity` (tool_use) for the summarized-streaming working message**. Additive only. - `hub/src/api/telegram-webhook.ts` — **(Fix C)** `handlePermissionCallback` resolves `pa:`/`pd:` taps → forwards `permission_response` on the agent socket; **(Fix B)** picker re-render uses `PAGE_SIZE` (was a hardcoded `20`). - `hub/src/telegram/bridge.ts` — **(Fix A)** `setMyCommands(BOT_COMMANDS)` on startup; **(Fix C)** subscribes to `permission_request:pending` → sends the Approve/Deny keyboard + records the pending prompt. diff --git a/hub/src/api/telegram-webhook.ts b/hub/src/api/telegram-webhook.ts index 46c6a49..ff9e26e 100644 --- a/hub/src/api/telegram-webhook.ts +++ b/hub/src/api/telegram-webhook.ts @@ -466,17 +466,16 @@ async function handlePermissionCallback( user: TelegramUserRow, perm: { requestId: string; approved: boolean }, ): Promise<{ outcome: string }> { - const pending = takePendingPrompt(perm.requestId); + // Authorization is enforced inside takePendingPrompt: it returns the entry only + // if THIS user is an authorized approver for (requestId). A shared default + // session that fanned the prompt to several users now resolves for whichever + // authorized user taps first (no last-write-wins clobber). + const pending = takePendingPrompt(perm.requestId, user.id); if (!pending) { - // Already resolved, expired, or unknown — tell the user, no state change. + // Already resolved, expired, unknown, or the tapping user isn't authorized. 
await safeAnswerCallback(cb.id, { text: "This prompt already expired or was answered.", show_alert: false }); return { outcome: "callback_permission_stale" }; } - // Authorization: the prompt MUST belong to the tapping user. - if (pending.userId !== user.id) { - await safeAnswerCallback(cb.id, { text: "Not allowed", show_alert: true }); - return { outcome: "callback_permission_denied_auth" }; - } // Forward the decision to the agent socket (same frame the web client sends). const channel = getChannel(pending.sessionId); diff --git a/hub/src/config.ts b/hub/src/config.ts index 8ecbe5b..15d6b8c 100644 --- a/hub/src/config.ts +++ b/hub/src/config.ts @@ -109,6 +109,10 @@ const telegramWebhookSecret = requireMinLenIfSet( 16, ); const telegramBotUsername = process.env.TELEGRAM_BOT_USERNAME || ""; +// Summarized streaming: one editable "working…" message per turn that collapses +// tool-use lines to one-liners and finalizes with the assistant text. Default ON; +// set TELEGRAM_SUMMARIZED_STREAMING=false to revert to a single final-blob send. +const telegramSummarizedStreaming = process.env.TELEGRAM_SUMMARIZED_STREAMING !== "false"; // B2 (obs): owner user_id for the hub's self-capture error_project row. // When unset, self-capture is inert (no row seeded, no hooks installed). @@ -182,6 +186,7 @@ export const config = { botToken: telegramBotToken, webhookSecret: telegramWebhookSecret, botUsername: telegramBotUsername, + summarizedStreaming: telegramSummarizedStreaming, }, // B4: observability bearer token (gates /healthz/deep + /metrics). diff --git a/hub/src/events/session-activity-events.ts b/hub/src/events/session-activity-events.ts new file mode 100644 index 0000000..7b7dac3 --- /dev/null +++ b/hub/src/events/session-activity-events.ts @@ -0,0 +1,69 @@ +/** + * Internal hub event bus for SESSION ACTIVITY (tool-use one-liners). + * + * Mirrors `events/assistant-events.ts` + `events/permission-events.ts`: a + * read-only fanout for server-side consumers. 
The Telegram outbound bridge + * subscribes and folds these into the editable "working…" message it maintains + * per (chat, session) while a turn is in flight. The final assistant text still + * arrives via the separate `assistant_message:final` bus. + * + * Scope (kept deliberately tiny): + * - `tool_use` only. `thinking` and raw `text_delta` are NEVER emitted here — + * the bridge collapses tool calls to one-liners and shows the final text in + * full; intermediate token noise is intentionally dropped. + * + * This is NOT a dispatch path. Dispatch (inbound user→session) lives in + * `hub/src/dispatch/`. This bus is outbound-only and side-effect free for the + * emitter — listener errors are isolated so a bridge throw can't tear down the + * WS handler in `ws/agent.ts`. + */ + +import { EventEmitter } from "node:events"; + +export interface SessionActivityEvent { + /** Session that produced the activity. */ + sessionId: string; + /** Owning user of the session (known at emit time in ws/agent.ts). */ + userId: string; + /** Activity kind. Only "tool_use" is emitted today. */ + kind: "tool_use"; + /** Tool name (e.g. "Edit", "Bash", "Read"). */ + toolName: string; + /** Best-effort one-line target (file path / command / url); may be empty. */ + detail?: string; +} + +export type SessionActivityListener = (e: SessionActivityEvent) => void | Promise<void>; + +const emitter = new EventEmitter(); +emitter.setMaxListeners(50); + +const EVENT = "session_activity"; + +/** Emit a session-activity event. Listener errors are isolated. */ +export function emitSessionActivity(e: SessionActivityEvent): void { + const listeners = emitter.listeners(EVENT) as SessionActivityListener[]; + for (const fn of listeners) { + try { + const ret = fn(e); + if (ret && typeof (ret as Promise<unknown>).catch === "function") { + (ret as Promise<unknown>).catch((err: any) => { + console.warn(`[session-activity-events] async listener error: ${err?.message ?? 
err}`); + }); + } + } catch (err: any) { + console.warn(`[session-activity-events] listener threw: ${err?.message ?? err}`); + } + } +} + +/** Subscribe to session-activity events. Returns an unsubscribe fn. */ +export function onSessionActivity(listener: SessionActivityListener): () => void { + emitter.on(EVENT, listener as any); + return () => emitter.off(EVENT, listener as any); +} + +/** Test-only — remove every listener. */ +export function _resetSessionActivityEventsForTests(): void { + emitter.removeAllListeners(EVENT); +} diff --git a/hub/src/telegram/approvals.ts b/hub/src/telegram/approvals.ts index 2cec293..300a977 100644 Binary files a/hub/src/telegram/approvals.ts and b/hub/src/telegram/approvals.ts differ diff --git a/hub/src/telegram/bridge.ts b/hub/src/telegram/bridge.ts index 24cb190..3d16140 100644 --- a/hub/src/telegram/bridge.ts +++ b/hub/src/telegram/bridge.ts @@ -34,14 +34,66 @@ import { config } from "../config.ts"; import { onAssistantMessageFinal, type AssistantMessageFinalEvent } from "../events/assistant-events.ts"; import { onPermissionPending, type PermissionPendingEvent } from "../events/permission-events.ts"; +import { onSessionActivity, type SessionActivityEvent } from "../events/session-activity-events.ts"; import { getUsersWithTelegramDefaultSession } from "../db/dal.ts"; -import { sendMessage, sendMessageWithKeyboard, setMyCommands } from "./client.ts"; +import { + sendMessageMd, + sendMessageWithKeyboard, + sendChatAction, + editMessageTextMd, + escapeMarkdownV2, + setMyCommands, +} from "./client.ts"; import { BOT_COMMANDS } from "./commands.ts"; import { rememberPendingPrompt, permissionCallbackData } from "./approvals.ts"; let started = false; let unsubscribe: (() => void) | null = null; let unsubscribePermission: (() => void) | null = null; +let unsubscribeActivity: (() => void) | null = null; + +// ── Summarized-streaming state ────────────────────────────────────────────── +// One editable "working…" message per (chat, 
session). tool_use events append +// collapsed one-liners; the final assistant_message edits it to the full text. +const TYPING_REFRESH_MS = 4000; // Telegram typing expires ~5s +const EDIT_THROTTLE_MS = 900; // avoid hammering editMessageText (Telegram rate-limits edits) +const MAX_TOOL_LINES = 12; // cap the collapsed list so the working message stays small + +interface WorkingState { + messageId: number; + lines: string[]; + typingTimer: ReturnType<typeof setInterval> | null; + lastEditAt: number; + pendingEdit: boolean; +} + +const workingByKey = new Map<string, WorkingState>(); + +function workKey(chatId: string | number | bigint, sessionId: string): string { + return `${String(chatId)}:${sessionId}`; +} + +/** Collapse a tool_use into a one-line summary, e.g. "🔧 Edit hub/src/foo.ts". */ +function toolLine(toolName: string, detail?: string): string { + const d = (detail ?? "").trim(); + const short = d.length > 80 ? d.slice(0, 79) + "…" : d; + return `🔧 ${toolName}${short ? " " + short : ""}`; +} + +/** Render the in-progress working message body (escaped MarkdownV2). */ +function renderWorking(lines: string[]): string { + const head = "⏳ *Working…*"; + if (lines.length === 0) return head; + const body = lines.map((l) => escapeMarkdownV2(l)).join("\n"); + return head + "\n\n" + body; +} + +function stopTyping(st: WorkingState): void { + if (st.typingTimer) { + clearInterval(st.typingTimer); + st.typingTimer = null; + } +} // Per-chat serial queue. Key = chat_id (stringified to dodge bigint vs number // equality surprises). Value = the tail Promise; new sends chain onto it. 
@@ -70,6 +122,61 @@ function enqueueForChat(chatId: string | number | bigint, task: () => Promise { + if (!config.telegram.summarizedStreaming) return; + if (e.kind !== "tool_use") return; + let users: Array<{ id: string; telegram_chat_id: string | number }>; + try { + users = await getUsersWithTelegramDefaultSession(e.sessionId); + } catch (err: any) { + console.warn(`[telegram-bridge] activity DAL lookup failed session=${e.sessionId}: ${err?.message ?? err}`); + return; + } + if (users.length === 0) return; + + const line = toolLine(e.toolName, e.detail); + for (const u of users) { + const chatId = u.telegram_chat_id; + if (chatId === null || chatId === undefined) continue; + const key = workKey(chatId, e.sessionId); + void enqueueForChat(chatId, async () => { + let st = workingByKey.get(key); + try { + if (!st) { + // Refresh typing immediately, then on an interval until finalized. + await sendChatAction(chatId as number | string, "typing"); + const sent = await sendMessageMd(chatId as number | string, renderWorking([line])); + const messageId = sent?.message_id ?? 0; + if (!messageId) return; // couldn't anchor an editable message; skip streaming for this turn + const typingTimer = setInterval(() => { + void sendChatAction(chatId as number | string, "typing"); + }, TYPING_REFRESH_MS); + st = { messageId, lines: [line], typingTimer, lastEditAt: Date.now(), pendingEdit: false }; + workingByKey.set(key, st); + return; + } + // Append + edit (throttled). Cap the visible list. + st.lines.push(line); + if (st.lines.length > MAX_TOOL_LINES) st.lines = st.lines.slice(-MAX_TOOL_LINES); + const sinceEdit = Date.now() - st.lastEditAt; + if (sinceEdit >= EDIT_THROTTLE_MS) { + st.lastEditAt = Date.now(); + await editMessageTextMd(chatId as number | string, st.messageId, renderWorking(st.lines)); + } + } catch (err: any) { + console.warn( + `[telegram-bridge] activity edit failed chat=${chatKey(chatId)} session=${e.sessionId}: ${err?.message ?? 
err}`, + ); + } + }); + } +} + async function onFinal(e: AssistantMessageFinalEvent): Promise<void> { let users: Array<{ id: string; telegram_chat_id: string | number }>; try { @@ -80,17 +187,37 @@ async function onFinal(e: AssistantMessageFinalEvent): Promise<void> { } if (users.length === 0) return; + const finalMd = escapeMarkdownV2(e.text); for (const u of users) { const chatId = u.telegram_chat_id; if (chatId === null || chatId === undefined) continue; + const key = workKey(chatId, e.sessionId); // Fire-and-forget — the queue ensures per-chat serialization. We do NOT // await across users; different chats progress in parallel. void enqueueForChat(chatId, async () => { + const st = workingByKey.get(key); try { - await sendMessage(chatId as number | string, e.text); + if (st) { + // Finalize the editable working message with the full assistant text. + stopTyping(st); + workingByKey.delete(key); + if (e.text.length <= 4096) { + await editMessageTextMd(chatId as number | string, st.messageId, finalMd); + return; + } + // Too long to fit one edited message — leave the working summary and + // send the full text as a follow-up (MarkdownV2 → plaintext fallback). + await sendMessageMd(chatId as number | string, finalMd); + return; + } + // No working message (streaming off, or no tool calls this turn) — send + // the final text as a fresh MarkdownV2 message. + await sendMessageMd(chatId as number | string, finalMd); } catch (err: any) { + if (st) stopTyping(st); + workingByKey.delete(key); console.warn( - `[telegram-bridge] sendMessage failed chat=${chatKey(chatId)} session=${e.sessionId}: ${err?.message ?? err}`, + `[telegram-bridge] final send failed chat=${chatKey(chatId)} session=${e.sessionId}: ${err?.message ?? 
err}`, ); } }); @@ -127,9 +254,10 @@ async function onPermissionPendingEvent(e: PermissionPendingEvent): Promise { try { - const sent = await sendMessageWithKeyboard(chatId as number | string, text, keyboard); - // Record the pending prompt so the callback can resolve it. We bind the - // prompt to THIS user so a foreign chat can't resolve it. - rememberPendingPrompt(e.requestId, { + let sent: { message_id: number } | void; + try { + sent = await sendMessageWithKeyboard(chatId as number | string, text, keyboard, { + parse_mode: "MarkdownV2", + }); + } catch (mdErr: any) { + // 400 → markup rejected; resend as plain text so the prompt is never + // silently dropped (the inline keyboard still works). + if (mdErr?.status === 400) { + sent = await sendMessageWithKeyboard(chatId as number | string, text, keyboard); + } else { + throw mdErr; + } + } + // Record the pending prompt so the callback can resolve it. Keyed by + // (sessionId, requestId) with THIS user authorized — a shared default + // session no longer overwrites a sibling user's binding. + rememberPendingPrompt(e.sessionId, e.requestId, { sessionId: e.sessionId, userId: u.id, chatId, @@ -174,6 +316,7 @@ export function startTelegramBridge(): void { } unsubscribe = onAssistantMessageFinal(onFinal); unsubscribePermission = onPermissionPending(onPermissionPendingEvent); + unsubscribeActivity = onSessionActivity(onActivity); started = true; // Register the slash-command menu so typing `/` shows a popup. Best-effort, // fire-and-forget — a transient failure must not block bridge startup. 
@@ -193,6 +336,12 @@ export function _stopTelegramBridgeForTests(): void { unsubscribePermission(); unsubscribePermission = null; } + if (unsubscribeActivity) { + unsubscribeActivity(); + unsubscribeActivity = null; + } + for (const st of workingByKey.values()) stopTyping(st); + workingByKey.clear(); started = false; chatQueues.clear(); } diff --git a/hub/src/telegram/client.ts b/hub/src/telegram/client.ts index 570cb13..3da816b 100644 --- a/hub/src/telegram/client.ts +++ b/hub/src/telegram/client.ts @@ -124,6 +124,95 @@ export async function sendMessage( * way as sendMessage, but the keyboard ONLY attaches to the LAST chunk * (Telegram supports one reply_markup per message). */ +/** + * Send `text` with MarkdownV2 parse_mode. If Telegram rejects the markup with + * a 400 (an unbalanced/unescaped reserved char makes it reject the WHOLE + * message), retry ONCE as plain text so a session is never silently dropped. + * Non-400 errors propagate (caller's queue swallows + logs). + * + * Callers are responsible for escaping any text that should render literally + * via `escapeMarkdownV2`; intentional markup (```code```, *bold*) is passed + * through by the caller assembling the message. + */ +export async function sendMessageMd( + chatId: number | string, + text: string, +): Promise<{ message_id: number } | void> { + try { + return await sendMessageReturningId(chatId, text, { parse_mode: "MarkdownV2" }); + } catch (err) { + if (err instanceof TelegramClientError && err.status === 400) { + // Markup rejected — fall back to plain text (strip nothing; Telegram + // renders the raw chars). Better an unformatted message than none. + return await sendMessageReturningId(chatId, text); + } + throw err; + } +} + +/** + * Like {@link sendMessage} but returns the message_id of the LAST chunk sent + * (parity with sendMessageWithKeyboard's return). Internal — callers use + * sendMessage / sendMessageMd. 
+ */ +async function sendMessageReturningId( + chatId: number | string, + text: string, + opts: SendMessageOptions = {}, +): Promise<{ message_id: number } | void> { + const token = tokenOrThrow(); + const url = `${API_BASE}/bot${token}/sendMessage`; + const chunks = splitForTelegram(text); + let last: { message_id: number } | undefined; + for (const chunk of chunks) { + const res = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + chat_id: chatId, + text: chunk, + ...(opts.parse_mode ? { parse_mode: opts.parse_mode } : {}), + ...(opts.disable_web_page_preview ? { disable_web_page_preview: true } : {}), + }), + signal: AbortSignal.timeout(10_000), + }); + if (!res.ok) { + const body = await res.text().catch(() => ""); + throw new TelegramClientError(res.status, body); + } + try { + const json = (await res.json()) as { result?: { message_id?: number } }; + if (typeof json.result?.message_id === "number") last = { message_id: json.result.message_id }; + } catch { + /* swallow — id is best-effort */ + } + } + return last; +} + +/** + * Send a Telegram chat action (e.g. "typing"). Expires after ~5s on Telegram's + * side, so callers refresh on an interval while a turn is active. Best-effort — + * errors are swallowed (a failed typing indicator must never affect delivery). 
+ */ +export async function sendChatAction( + chatId: number | string, + action: "typing" = "typing", +): Promise<void> { + const token = config.telegram.botToken; + if (!token) return; + try { + await fetch(`${API_BASE}/bot${token}/sendChatAction`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ chat_id: chatId, action }), + signal: AbortSignal.timeout(10_000), + }); + } catch { + /* swallow — typing indicator is non-essential */ + } +} + export async function sendMessageWithKeyboard( chatId: number | string, text: string, @@ -281,6 +370,29 @@ export async function setMyCommands( } } +/** + * MarkdownV2 variant of {@link editMessageText} with a 400→plain-text fallback, + * mirroring {@link sendMessageMd}. Used by the streaming "working…" message which + * is edited repeatedly as a turn progresses. A "message is not modified" 400 + * (identical text) is treated as success — it's a benign no-op edit. + */ +export async function editMessageTextMd( + chatId: number | string, + messageId: number, + text: string, +): Promise<void> { + try { + await editMessageText(chatId, messageId, text, { parse_mode: "MarkdownV2" }); + } catch (err) { + if (err instanceof TelegramClientError && err.status === 400) { + if (err.bodyPreview.includes("message is not modified")) return; + await editMessageText(chatId, messageId, text); + return; + } + throw err; + } +} + export async function getFile(fileId: string): Promise<{ file_id: string; file_path: string; file_size?: number }> { const token = tokenOrThrow(); const url = `${API_BASE}/bot${token}/getFile?file_id=${encodeURIComponent(fileId)}`; diff --git a/hub/src/ws/agent.ts b/hub/src/ws/agent.ts index 518ac3c..434499c 100644 --- a/hub/src/ws/agent.ts +++ b/hub/src/ws/agent.ts @@ -388,6 +388,31 @@ export async function handleAgentMessage(ws: ServerWebSocket, raw: // --- Agent activity events --- if (msg.type === 'thinking' || msg.type === 'tool_use' || msg.type === 'tool_result') { 
 broadcastToSubscribers(sessionId, { ...msg }) + // Fan tool_use out to server-side consumers (Telegram summarized streaming). + // tool_use ONLY — thinking/tool_result are intentionally not surfaced. A + // listener throw is isolated by the emitter so it can't tear down this WS + // handler. Dynamic import mirrors the assistant/permission emits below. + if (msg.type === 'tool_use') { + try { + const { emitSessionActivity } = await import('../events/session-activity-events.ts') + const input = (msg as any).input + let detail = '' + if (input && typeof input === 'object') { + const o = input as Record<string, unknown> + const v = o.command ?? o.file_path ?? o.path ?? o.url ?? o.pattern + if (typeof v === 'string') detail = v + } + emitSessionActivity({ + sessionId, + userId: ws.data.userId!, + kind: 'tool_use', + toolName: (msg as any).tool ?? 'tool', + detail, + }) + } catch (err: any) { + console.warn('[agent] emitSessionActivity failed', err?.message) + } + } } if (msg.type === 'text_delta') { diff --git a/hub/test/telegram-approvals.test.ts b/hub/test/telegram-approvals.test.ts index 94f3b0a..0f569f5 100644 --- a/hub/test/telegram-approvals.test.ts +++ b/hub/test/telegram-approvals.test.ts @@ -28,20 +28,54 @@ beforeEach(() => _resetPendingPromptsForTests()); describe("approvals registry", () => { test("remember then take returns the prompt exactly once", () => { - rememberPendingPrompt("req-1", mkPrompt()); - const first = takePendingPrompt("req-1"); + rememberPendingPrompt("s1", "req-1", mkPrompt()); + const first = takePendingPrompt("req-1", "u1"); expect(first?.sessionId).toBe("s1"); // Resolved exactly once — second take is null.
- expect(takePendingPrompt("req-1")).toBeNull(); + expect(takePendingPrompt("req-1", "u1")).toBeNull(); }); test("unknown requestId returns null", () => { - expect(takePendingPrompt("nope")).toBeNull(); + expect(takePendingPrompt("nope", "u1")).toBeNull(); }); test("expired prompt is not returned", () => { - rememberPendingPrompt("req-old", mkPrompt({ createdAtMs: Date.now() - 11 * 60 * 1000 })); - expect(takePendingPrompt("req-old")).toBeNull(); + rememberPendingPrompt("s1", "req-old", mkPrompt({ createdAtMs: Date.now() - 11 * 60 * 1000 })); + expect(takePendingPrompt("req-old", "u1")).toBeNull(); + }); + + test("unauthorized user cannot take a prompt bound to another user", () => { + rememberPendingPrompt("s1", "req-2", mkPrompt({ userId: "u1" })); + // A different user tapping the same requestId gets nothing... + expect(takePendingPrompt("req-2", "u2")).toBeNull(); + // ...and the rightful user can still resolve it. + expect(takePendingPrompt("req-2", "u1")?.userId).toBe("u1"); + }); + + // MED fast-follow (#189 security review): two users share ONE default session. + // The runner raises a single requestId; the bridge fans the prompt to both. + // The old requestId-only key clobbered the first binding, locking out a valid + // approver. Both bindings must now survive and either user can resolve. + test("two users on the same default session: either can resolve, exactly once", () => { + rememberPendingPrompt("shared-session", "req-3", mkPrompt({ userId: "u1", chatId: 100, messageId: 11 })); + rememberPendingPrompt("shared-session", "req-3", mkPrompt({ userId: "u2", chatId: 200, messageId: 22 })); + + // u1 taps first — resolves with u1's own chat/message context. + const a = takePendingPrompt("req-3", "u1"); + expect(a?.userId).toBe("u1"); + expect(a?.chatId).toBe(100); + expect(a?.messageId).toBe(11); + + // The whole prompt is now resolved — u2's later tap finds nothing. 
+ expect(takePendingPrompt("req-3", "u2")).toBeNull(); + }); + + test("same requestId across DIFFERENT sessions stays isolated", () => { + rememberPendingPrompt("sess-A", "dup", mkPrompt({ userId: "uA", sessionId: "sess-A" })); + rememberPendingPrompt("sess-B", "dup", mkPrompt({ userId: "uB", sessionId: "sess-B" })); + // Explicit sessionId disambiguates. + expect(takePendingPrompt("dup", "uA", "sess-A")?.sessionId).toBe("sess-A"); + expect(takePendingPrompt("dup", "uB", "sess-B")?.sessionId).toBe("sess-B"); }); }); diff --git a/hub/test/telegram-bridge.test.ts b/hub/test/telegram-bridge.test.ts index 99db2d2..a8a1133 100644 --- a/hub/test/telegram-bridge.test.ts +++ b/hub/test/telegram-bridge.test.ts @@ -24,6 +24,10 @@ const state: { keyboardSends: Array<{ chat: number | string; text: string; keyboard: any }>; // Recorded setMyCommands calls. setCommandsCalls: Array>; + // Recorded editMessageTextMd calls (streaming working-message edits). + edits: Array<{ chat: number | string; messageId: number; text: string }>; + // Recorded sendChatAction (typing) calls. + chatActions: Array<number | string>; // Optional behavior knob for sendMessage. sendImpl: ((chat: number | string, text: string) => Promise<void>) | null; } = { @@ -31,6 +35,8 @@ const state: { sends: [], keyboardSends: [], setCommandsCalls: [], + edits: [], + chatActions: [], sendImpl: null, }; @@ -62,6 +68,24 @@ mock.module("../src/telegram/client.ts", () => ({ } state.sends.push({ chat: chatId, text, at: Date.now() }); }, + // onFinal now finalizes via sendMessageMd (MarkdownV2 + 400 fallback). Route it + // into the same recorder so existing assertions hold (text is escaped upstream + // but the bridge tests use reserved-char-free strings, so escaping is a no-op).
+ sendMessageMd: async (chatId: number | string, text: string) => { + if (state.sendImpl) { + await state.sendImpl(chatId, text); + } + state.sends.push({ chat: chatId, text, at: Date.now() }); + return { message_id: 99 }; + }, + // Streaming "working…" message helpers — recorded so the summarized-streaming + // test can assert the working message is created then finalized. + editMessageTextMd: async (chatId: number | string, messageId: number, text: string) => { + state.edits.push({ chat: chatId, messageId, text }); + }, + sendChatAction: async (chatId: number | string) => { + state.chatActions.push(chatId); + }, // Inline-approval prompts go through sendMessageWithKeyboard; stub it so // tests never touch the network and can assert the keyboard payload. sendMessageWithKeyboard: async (chatId: number | string, text: string, keyboard: any) => { @@ -80,6 +104,8 @@ let emitAssistantMessageFinal: typeof import("../src/events/assistant-events.ts" let _resetEvents: typeof import("../src/events/assistant-events.ts")._resetAssistantEventsForTests; let emitPermissionPending: typeof import("../src/events/permission-events.ts").emitPermissionPending; let _resetPermEvents: typeof import("../src/events/permission-events.ts")._resetPermissionEventsForTests; +let emitSessionActivity: typeof import("../src/events/session-activity-events.ts").emitSessionActivity; +let _resetActivity: typeof import("../src/events/session-activity-events.ts")._resetSessionActivityEventsForTests; let takePendingPrompt: typeof import("../src/telegram/approvals.ts").takePendingPrompt; let _resetPending: typeof import("../src/telegram/approvals.ts")._resetPendingPromptsForTests; let startTelegramBridge: typeof import("../src/telegram/bridge.ts").startTelegramBridge; @@ -92,6 +118,9 @@ beforeAll(async () => { ({ emitPermissionPending, _resetPermissionEventsForTests: _resetPermEvents } = await import( "../src/events/permission-events.ts" )); + ({ emitSessionActivity, _resetSessionActivityEventsForTests: 
_resetActivity } = await import( + "../src/events/session-activity-events.ts" + )); ({ takePendingPrompt, _resetPendingPromptsForTests: _resetPending } = await import( "../src/telegram/approvals.ts" )); @@ -105,10 +134,13 @@ beforeEach(() => { state.sends.length = 0; state.keyboardSends.length = 0; state.setCommandsCalls.length = 0; + state.edits.length = 0; + state.chatActions.length = 0; state.sendImpl = null; _stopBridge(); _resetEvents(); _resetPermEvents(); + _resetActivity(); _resetPending(); }); @@ -116,6 +148,7 @@ afterEach(() => { _stopBridge(); _resetEvents(); _resetPermEvents(); + _resetActivity(); _resetPending(); }); @@ -284,12 +317,45 @@ describe("Telegram outbound bridge", () => { expect(buttons.map((b: any) => b.callback_data)).toEqual(["pa:req-123", "pd:req-123"]); // The pending prompt is recorded so the webhook callback can resolve it. - const pending = takePendingPrompt("req-123"); + const pending = takePendingPrompt("req-123", "uP"); expect(pending).not.toBeNull(); expect(pending!.sessionId).toBe("sess_P"); expect(pending!.userId).toBe("uP"); }); + // ── Summarized streaming ────────────────────────────────────────────────── + test("tool_use creates an editable 'working…' message; final edits it to the answer", async () => { + state.sessionUsers.set("sess_S", [{ id: "uS", telegram_chat_id: 9001 }]); + startTelegramBridge(); + + // A tool runs mid-turn → working message sent (via sendMessageMd) + typing. + emitSessionActivity({ + sessionId: "sess_S", + userId: "uS", + kind: "tool_use", + toolName: "Edit", + detail: "hub/src/foo.ts", + }); + await settle(); + + // Working message was sent and a typing indicator fired. + expect(state.sends).toHaveLength(1); + expect(state.sends[0]!.text).toContain("Working"); + expect(state.sends[0]!.text).toContain("Edit"); + expect(state.chatActions).toContain(9001); + + // Final assistant message edits the SAME working message (no new send). 
+ const sendsBefore = state.sends.length; + emitAssistantMessageFinal({ sessionId: "sess_S", userId: "uS", text: "done — all set" }); + await settle(); + + expect(state.sends).toHaveLength(sendsBefore); // finalized via edit, not a new send + expect(state.edits.length).toBeGreaterThanOrEqual(1); + const lastEdit = state.edits[state.edits.length - 1]!; + expect(lastEdit.messageId).toBe(99); + expect(lastEdit.text).toContain("done"); + }); + test("permission_request for a session no user has as default is a no-op", async () => { startTelegramBridge(); emitPermissionPending({ @@ -301,7 +367,7 @@ describe("Telegram outbound bridge", () => { }); await settle(); expect(state.keyboardSends).toHaveLength(0); - expect(takePendingPrompt("req-none")).toBeNull(); + expect(takePendingPrompt("req-none", "uX")).toBeNull(); }); }); diff --git a/hub/test/telegram-client-fallback.test.ts b/hub/test/telegram-client-fallback.test.ts new file mode 100644 index 0000000..2b656de --- /dev/null +++ b/hub/test/telegram-client-fallback.test.ts @@ -0,0 +1,119 @@ +/** + * sendMessageMd / editMessageTextMd MarkdownV2 → plain-text fallback. + * + * Telegram rejects an entire message with HTTP 400 when its MarkdownV2 markup is + * unbalanced. The bridge must never silently drop a session's reply, so a 400 on + * a MarkdownV2 send is retried ONCE as plain text. This test drives that path by + * stubbing global fetch. + */ +import { describe, test, expect, beforeEach, afterAll } from "bun:test"; + +// client.ts reads config.telegram.botToken at CALL time, so we mutate the live +// config singleton (no mock.module — that's process-wide and would leak an +// empty/forced token into sibling test files; mirrors telegram-bridge.test.ts's +// "disabled when token unset" approach). 
+import { config } from "../src/config.ts"; +// Import via a cache-busted query so we always get the REAL client even when a +// sibling test file mock.module()'s "../src/telegram/client.ts" (Bun mock.module +// is process-wide; without this, telegram-bridge.test.ts's stub of sendMessageMd +// leaks in and this file's 400-fallback assertions break in the full-suite run). +const { sendMessageMd, editMessageTextMd } = await import( + `../src/telegram/client.ts?nomock=${Date.now()}` +); + +const _origToken = config.telegram.botToken; +Object.defineProperty(config.telegram, "botToken", { + value: "fake-bot-token-fallback", + writable: true, + configurable: true, +}); + +interface Call { + url: string; + body: any; +} + +const realFetch = globalThis.fetch; +let calls: Call[] = []; + +/** A fetch stub that returns the next queued response and records the call. */ +function installFetch(responder: (call: Call) => { ok: boolean; status: number; result?: any; body?: string }) { + globalThis.fetch = (async (url: any, init: any) => { + const body = init?.body ? JSON.parse(init.body) : undefined; + const call: Call = { url: String(url), body }; + calls.push(call); + const r = responder(call); + return { + ok: r.ok, + status: r.status, + json: async () => ({ ok: r.ok, result: r.result ?? { message_id: 1 } }), + text: async () => r.body ?? "", + } as any; + }) as any; +} + +beforeEach(() => { + calls = []; +}); +afterAll(() => { + globalThis.fetch = realFetch; + Object.defineProperty(config.telegram, "botToken", { + value: _origToken, + writable: true, + configurable: true, + }); +}); + +describe("sendMessageMd 400 → plain-text fallback", () => { + test("first MarkdownV2 send 400s, retries once as plain text and succeeds", async () => { + let n = 0; + installFetch(() => { + n += 1; + // First attempt (MarkdownV2) → 400; second (plain) → 200. 
+ if (n === 1) return { ok: false, status: 400, body: "Bad Request: can't parse entities" }; + return { ok: true, status: 200, result: { message_id: 42 } }; + }); + + const res = await sendMessageMd(123, "oops *unbalanced"); + expect(res).toEqual({ message_id: 42 }); + expect(calls).toHaveLength(2); + // Attempt 1 carried parse_mode; the retry did NOT. + expect(calls[0]!.body.parse_mode).toBe("MarkdownV2"); + expect(calls[1]!.body.parse_mode).toBeUndefined(); + expect(calls[1]!.body.text).toBe("oops *unbalanced"); + }); + + test("MarkdownV2 send that succeeds does NOT retry", async () => { + installFetch(() => ({ ok: true, status: 200, result: { message_id: 7 } })); + const res = await sendMessageMd(123, "all good"); + expect(res).toEqual({ message_id: 7 }); + expect(calls).toHaveLength(1); + expect(calls[0]!.body.parse_mode).toBe("MarkdownV2"); + }); + + test("non-400 error propagates (no plain-text retry)", async () => { + installFetch(() => ({ ok: false, status: 429, body: "Too Many Requests" })); + await expect(sendMessageMd(123, "rate limited")).rejects.toThrow(); + expect(calls).toHaveLength(1); + }); + + test("editMessageTextMd 400 retries as plain text", async () => { + let n = 0; + installFetch(() => { + n += 1; + if (n === 1) return { ok: false, status: 400, body: "Bad Request: can't parse entities" }; + return { ok: true, status: 200 }; + }); + await editMessageTextMd(123, 9, "edit *broken"); + expect(calls).toHaveLength(2); + expect(calls[0]!.body.parse_mode).toBe("MarkdownV2"); + expect(calls[1]!.body.parse_mode).toBeUndefined(); + }); + + test('editMessageTextMd treats "message is not modified" 400 as success (no retry)', async () => { + installFetch(() => ({ ok: false, status: 400, body: "Bad Request: message is not modified" })); + await editMessageTextMd(123, 9, "same text"); + // Single call — the benign 400 is swallowed, no plain-text retry. 
+ expect(calls).toHaveLength(1); + }); +}); diff --git a/hub/test/telegram-webhook.test.ts b/hub/test/telegram-webhook.test.ts index d56b68e..35289ac 100644 --- a/hub/test/telegram-webhook.test.ts +++ b/hub/test/telegram-webhook.test.ts @@ -430,7 +430,7 @@ describe("inline approval callbacks (Fix C)", () => { state.channelSends = []; const { rememberPendingPrompt, _resetPendingPromptsForTests } = await import("../src/telegram/approvals.ts"); _resetPendingPromptsForTests(); - rememberPendingPrompt("req-approve", { + rememberPendingPrompt("sess_perm", "req-approve", { sessionId: "sess_perm", userId: LINKED_USER_ID, chatId: LINKED_CHAT, messageId: 50, toolName: "Bash", createdAtMs: Date.now(), }); @@ -454,7 +454,7 @@ describe("inline approval callbacks (Fix C)", () => { state.user = { id: LINKED_USER_ID, email: LINKED_EMAIL, telegram_chat_id: LINKED_CHAT, telegram_default_session_id: "sess_perm" } as any; state.channelSends = []; const { rememberPendingPrompt } = await import("../src/telegram/approvals.ts"); - rememberPendingPrompt("req-deny", { + rememberPendingPrompt("sess_perm", "req-deny", { sessionId: "sess_perm", userId: LINKED_USER_ID, chatId: LINKED_CHAT, messageId: 50, toolName: "Write", createdAtMs: Date.now(), }); const res = await post(app, `/api/telegram/webhook/${TEST_SECRET}`, mkCallback({ update_id: 701, chatId: LINKED_CHAT, data: "pd:req-deny" })); @@ -477,12 +477,15 @@ describe("inline approval callbacks (Fix C)", () => { state.user = { id: LINKED_USER_ID, email: LINKED_EMAIL, telegram_chat_id: LINKED_CHAT, telegram_default_session_id: "sess_perm" } as any; state.channelSends = []; const { rememberPendingPrompt } = await import("../src/telegram/approvals.ts"); - rememberPendingPrompt("req-foreign", { + rememberPendingPrompt("sess_perm", "req-foreign", { sessionId: "sess_perm", userId: "SOME-OTHER-USER", chatId: LINKED_CHAT, messageId: 50, toolName: "Bash", createdAtMs: Date.now(), }); const res = await post(app, 
`/api/telegram/webhook/${TEST_SECRET}`, mkCallback({ update_id: 703, chatId: LINKED_CHAT, data: "pa:req-foreign" })); const body = await res.json(); - expect(body.outcome).toBe("callback_permission_denied_auth"); + // Authorization now lives inside takePendingPrompt: an unauthorized user + // simply finds no entry bound to them → "stale". Still fails closed (no + // permission_response forwarded), which is the security-relevant assertion. + expect(body.outcome).toBe("callback_permission_stale"); expect(state.channelSends).toHaveLength(0); }); });
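Reviewer note: the test changes above imply a registry keyed by (sessionId, requestId) that holds one binding per authorized user, with the authorization check folded into the take call. The following is a minimal sketch of that shape, written only to make the expected semantics concrete. It is not the code in `hub/src/telegram/approvals.ts`: the names `remember`, `take`, `Binding`, `Entry`, `TakenPrompt` and `keyOf`, the argument order, and the 10-minute TTL value are all assumptions for illustration.

```typescript
// Hypothetical sketch; names, shapes, and the TTL value are assumptions,
// not the real approvals.ts implementation.
type Binding = { chatId: number; messageId: number };

interface Entry {
  sessionId: string;
  requestId: string;
  toolName: string;
  createdAtMs: number;
  users: Map<string, Binding>; // every authorized userId -> their prompt message
}

interface TakenPrompt extends Binding {
  sessionId: string;
  userId: string;
  toolName: string;
}

const PROMPT_TTL_MS = 10 * 60 * 1000; // assumed 10-minute expiry
const pending = new Map<string, Entry>();

// Composite key so the same requestId in two sessions never collides.
const keyOf = (sessionId: string, requestId: string): string =>
  JSON.stringify([sessionId, requestId]);

function remember(
  sessionId: string,
  requestId: string,
  userId: string,
  binding: Binding,
  toolName: string,
  createdAtMs: number = Date.now(),
): void {
  const key = keyOf(sessionId, requestId);
  let entry = pending.get(key);
  if (!entry) {
    entry = { sessionId, requestId, toolName, createdAtMs, users: new Map() };
    pending.set(key, entry);
  }
  // A second user extends the entry instead of overwriting the first binding.
  entry.users.set(userId, binding);
}

function take(
  requestId: string,
  userId: string,
  sessionId?: string,
  now: number = Date.now(),
): TakenPrompt | null {
  for (const [key, entry] of pending) {
    if (entry.requestId !== requestId) continue;
    if (sessionId !== undefined && entry.sessionId !== sessionId) continue;
    if (now - entry.createdAtMs > PROMPT_TTL_MS) {
      pending.delete(key); // expired: prune and keep looking
      continue;
    }
    const binding = entry.users.get(userId);
    if (!binding) continue; // caller is not bound to this entry: fail closed
    pending.delete(key); // resolve the whole prompt exactly once
    return { ...binding, sessionId: entry.sessionId, userId, toolName: entry.toolName };
  }
  return null;
}
```

In this sketch an unauthorized tap and an expired or unknown request are indistinguishable to the caller (both return `null`), which matches the webhook test's switch from `callback_permission_denied_auth` to `callback_permission_stale`. The trade-off is that the bridge can no longer tell a foreign user apart from a stale prompt in its outcome logging.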