Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 87 additions & 13 deletions docs/telegram-bridge.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ any phone running Telegram — DM the bot, get a final reply back. Inbound
messages route through the **shared session-dispatch pipeline** (`hub/src/dispatch/`)
— the same gate ordering (threshold → daily-cost-cap), per-session queue, and
offline-grace buffer scheduled tasks and the other subsystems use — with
`store: null` (Telegram writes no run row). Outbound forwards only the *final*
`assistant_message` (no streaming, no `thinking`/`tool_use`/`text_delta` chatter)
via the event bus, NOT a pipeline finalize hook.
`store: null` (Telegram writes no run row). Outbound forwards the final
`assistant_message` plus a **summarized live stream** — an editable "working…"
message that collapses each `tool_use` to a one-liner while the turn runs, then
finalizes to the full assistant text. `thinking` and raw `text_delta` chatter are
never forwarded. All user-facing messages use Telegram **MarkdownV2** with a
400→plain-text fallback. Delivery rides the event buses, NOT a pipeline finalize
hook.

A single hub-wide bot serves every user, keyed by `users.telegram_chat_id`.
One BotFather bot, one Telegram webhook secret, one redeploy.
Expand Down Expand Up @@ -206,13 +210,75 @@ When a Telegram-driven Claude session hits a tool **permission/approval** prompt

1. `hub/src/ws/agent.ts` already broadcasts `permission_request` to web subscribers. It now ALSO emits a `permission_request:pending` event on the dedicated bus `hub/src/events/permission-events.ts` (same isolation discipline as the `assistant_message:final` bus — a listener throw can't tear down the WS handler).
2. The bridge subscribes (`onPermissionPendingEvent` in `bridge.ts`). For every user whose `telegram_default_session_id` matches the emitting session (reusing `getUsersWithTelegramDefaultSession`), it sends an inline keyboard `[✅ Approve] [🚫 Deny]` with `callback_data` `pa:<request_id>` / `pd:<request_id>`, plus a one-line preview of the tool input (e.g. the Bash `command`).
3. It records the pending prompt in `hub/src/telegram/approvals.ts` keyed by `request_id` → `{ sessionId, userId, chatId, messageId, toolName }`. **Why a server-side map and not callback_data:** Telegram caps `callback_data` at 64 bytes — too small for session UUID + request UUID + a user binding. The map keeps `callback_data` tiny AND the entry's `userId` enforces authorization.
4. On a tap, the webhook's `handlePermissionCallback` looks up + removes the prompt (resolved exactly once), verifies it belongs to the tapping user, then forwards `{ type: "permission_response", session_id, request_id, approved }` onto the session's **agent socket** via `getChannel(sessionId).ws.send(...)` — the exact frame the web client sends. It edits the prompt message to `✅ Approved — <tool>` / `🚫 Denied — <tool>` and drops the buttons. `answerCallbackQuery` fires on every branch.

**Authorization + edge cases (all answer the callback):** foreign user → `Not allowed` (`callback_permission_denied_auth`, nothing sent to the agent); unknown/expired `request_id` → `This prompt already expired or was answered.` (`callback_permission_stale`); session socket gone → `Session is offline — couldn't deliver.` (`callback_permission_offline`). Prompts expire after `PROMPT_TTL_MS` (10 min) and are pruned lazily on each remember.
3. It records the pending prompt in `hub/src/telegram/approvals.ts` keyed by
**`(sessionId, requestId)`**, holding a **map of every authorized
`userId → { chatId, messageId }`** plus `{ sessionId, toolName, createdAtMs }`.
**Why a server-side map and not callback_data:** Telegram caps `callback_data`
at 64 bytes — too small for session UUID + request UUID + a user binding. The
map keeps `callback_data` tiny AND the per-user set enforces authorization.
**Shared-session re-keying (MED fast-follow from the #189 review):** the
original design keyed by `requestId` alone, so when two users shared one
default session the second `rememberPendingPrompt` *clobbered* the first —
leaving the prompt bound to whichever user the bridge processed last and
locking out every other (valid) approver with "Not allowed" (fail-closed, but
wrong). Re-keying by `(sessionId, requestId)` and merging each authorized user
into the entry fixes this: any authorized user can resolve, exactly once.
4. On a tap, the webhook's `handlePermissionCallback` calls
`takePendingPrompt(requestId, tappingUserId)` — authorization is now folded
into the take: it returns (and removes) the entry only if the tapping user is
an authorized approver, resolving the whole `(sessionId, requestId)` exactly
once. It then forwards `{ type: "permission_response", session_id, request_id,
approved }` onto the session's **agent socket** via
`getChannel(sessionId).ws.send(...)` — the exact frame the web client sends. It
edits the prompt message to `✅ Approved — <tool>` / `🚫 Denied — <tool>` and
drops the buttons. `answerCallbackQuery` fires on every branch.

**Authorization + edge cases (all answer the callback):** an unauthorized /
unknown / expired / already-resolved tap → `This prompt already expired or was
answered.` (`callback_permission_stale`, nothing sent to the agent — authorization
is enforced inside `takePendingPrompt`); session socket gone → `Session is
offline — couldn't deliver.` (`callback_permission_offline`). Prompts expire after
`PROMPT_TTL_MS` (10 min) and are pruned lazily on each remember.

**No new dispatch path.** Inline approval forwards a control frame on an existing agent socket; it does NOT route a user→session message and therefore does NOT (and must not) touch the cost-cap dispatch pipeline — it's a response to a runner-initiated prompt, not new traffic.

### Summarized streaming (MarkdownV2 + editable working message)

A Telegram-driven session no longer goes silent until one final blob lands. While
a turn runs, the bridge maintains **one editable "working…" message per
`(chat, session)`** and keeps a `typing` chat-action alive.

- **Activity bus.** `hub/src/ws/agent.ts` emits a `session_activity` event (kind
`tool_use` only) on the dedicated bus `hub/src/events/session-activity-events.ts`
— same listener-isolation discipline as `assistant_message:final`. This is an
outbound read-only fanout, **not** a dispatch path (dispatch = inbound
user→session in `hub/src/dispatch/`); the cost-cap gate is untouched.
- **Working message.** On the first `tool_use` of a turn the bridge sends
`⏳ *Working…*` (MarkdownV2) and records `{ messageId, lines }`. Each subsequent
`tool_use` appends a collapsed one-liner — `🔧 Edit hub/src/foo.ts`,
`🔧 Bash <cmd>` — and edits the message in place (throttled ~900ms, list capped
at the last 12 lines). `thinking` is omitted entirely.
- **Typing indicator.** `sendChatAction(chat, "typing")` fires immediately and
then every ~4s (Telegram's typing state expires ~5s) until the turn finalizes.
- **Finalize.** On `assistant_message:final` the bridge stops typing and **edits
the same working message** to the full assistant text (escaped MarkdownV2). If
the final text exceeds 4096 chars it leaves the working summary and sends the
full text as a follow-up. When no `tool_use` ran (or streaming is off) there is
no working message and the final text is sent as a fresh MarkdownV2 message.
- **Reversible flag.** Gated on `config.telegram.summarizedStreaming` (env
`TELEGRAM_SUMMARIZED_STREAMING`, default **on**; set to `false` to revert to a
single final-blob send). The flag only affects the working-message behavior —
finalization (and MarkdownV2) work regardless.

**MarkdownV2 safety.** `sendMessageMd` / `editMessageTextMd` send with
`parse_mode: MarkdownV2`; an unescaped reserved char makes Telegram reject the
WHOLE message with 400, so on a 400 they **retry once as plain text** — a session
reply is never silently dropped. `escapeMarkdownV2` escapes the full reserved set
`_ * [ ] ( ) ~ \` > # + - = | { } . !` (and `\\`); callers escape dynamic content
and keep only their own intentional markup (`*bold*`, ```code```). The inline
Approve/Deny prompt is also MarkdownV2 with the same fallback; the keyboard is
unaffected.

### Pagination edits

`safeEditMessageText` swallows Telegram's `400 message is not modified` error (no-op success when the new payload is byte-identical to the previous). All other errors are surfaced via `console.error` instead of `console.warn`, so paginate-not-firing-silently bugs surface in logs.
Expand Down Expand Up @@ -242,8 +308,13 @@ supervisor → hub WS /ws/agent ← emits assistant_message
hub/src/events/assistant-events.ts ← emits assistant_message:final
hub/src/telegram/bridge.ts ← gates on default-session match
→ finalize working msg OR fresh send
→ sendMessageMd (MarkdownV2 + 400→plain)
→ splitForTelegram (4096 chunks)
→ client.sendMessage (per-chat serial queue)
→ per-chat serial queue
↑ (during the turn)
hub/src/events/session-activity-events.ts ← tool_use one-liners → edit working msg
+ typing chat-action (~4s refresh)
Telegram chat
```
Expand Down Expand Up @@ -294,8 +365,10 @@ Key boundaries:
- 10MB photo cap. Larger photos → polite reject.
- One default session per user. Multi-session fan-out is out of scope (v1).
- No group chats (1 chat_id → 1 user, enforced by UNIQUE).
- No voice / video / stickers / animations / inline keyboards / message editing
/ streaming partial replies. All deferred.
- No voice / video / stickers / animations. Deferred.
- Summarized streaming edits one working message per turn; it does NOT stream raw
token deltas (by design — `thinking`/`text_delta` are dropped). Disable via
`TELEGRAM_SUMMARIZED_STREAMING=false`.

## Migration from the legacy per-user post-run telegram path

Expand All @@ -320,7 +393,8 @@ has soaked. Until then, both code paths coexist.

- `hub/src/api/telegram-webhook.ts` — public ingress, URL-path secret, raw-body-before-parse, zod-validated `Update` envelope, audit-row append, command vs dispatch routing.
- `hub/src/api/telegram.ts` — authed REST: `GET /status`, `POST /link-code`, `DELETE /link`, `PUT /default-session`. Cookie auth + CSRF double-submit (Phase 07 pattern).
- `hub/src/telegram/client.ts` — `sendMessage` / `sendMessageWithKeyboard` (returns `message_id`) / `editMessageText` / `answerCallbackQuery` / `setMyCommands` / `getFile`, `escapeMarkdownV2`, `splitForTelegram`. 10s `AbortSignal.timeout`. Per-chat outbound serial queue lives in `bridge.ts`.
- `hub/src/telegram/client.ts` — `sendMessage` / `sendMessageMd` (MarkdownV2 + 400→plain fallback, returns `message_id`) / `sendMessageWithKeyboard` (now accepts `parse_mode`) / `editMessageText` / `editMessageTextMd` (MarkdownV2 + 400→plain, swallows "not modified") / `sendChatAction` / `answerCallbackQuery` / `setMyCommands` / `getFile`, `escapeMarkdownV2`, `splitForTelegram`. 10s `AbortSignal.timeout`. Per-chat outbound serial queue lives in `bridge.ts`.
- `hub/src/events/session-activity-events.ts` — internal `EventEmitter` for `session_activity` (`tool_use` one-liners). Outbound read-only fanout for the summarized-streaming working message; **not** a dispatch path. **(new — streaming)**
- `hub/src/telegram/commands.ts` — `parse(text)` + handlers for `/start` `/session` `/list` `/status` `/doctor` `/help`; `BOT_COMMANDS` (single source of truth for the slash menu + `/help`).
- `hub/src/telegram/approvals.ts` — inline-approval registry (`rememberPendingPrompt`/`takePendingPrompt`, TTL-pruned) + `pa:`/`pd:` callback_data codec. **(new — Fix C)**
- `hub/src/events/permission-events.ts` — internal `EventEmitter` for `permission_request:pending`. Additive — does not change the WS broadcast path. **(new — Fix C)**
Expand All @@ -331,12 +405,12 @@ has soaked. Until then, both code paths coexist.

### Modified (hub)

- `hub/src/config.ts` — `config.telegram.{botToken,webhookSecret,botUsername}` (all optional).
- `hub/src/config.ts` — `config.telegram.{botToken,webhookSecret,botUsername,summarizedStreaming}` (all optional; `summarizedStreaming` defaults true, env `TELEGRAM_SUMMARIZED_STREAMING=false` disables).
- `hub/src/db/schema.sql` — additive: `users.telegram_chat_id` (BIGINT UNIQUE), `telegram_default_session_id`, `telegram_default_explicit` (BOOLEAN NOT NULL DEFAULT false — distinguishes an explicit `/session`/tap choice from an auto-pin), `telegram_link_code`, `telegram_link_code_expires_at`; new `telegram_inbound_log` table with `(user_id, received_at DESC)` index.
- `hub/src/db/dal.ts` — Telegram DAL helpers (folded into the existing dal module, not a separate `telegram-dal.ts` as the plan envisioned — deviation noted in SUMMARY): `getUserByTelegramChatId`, `getUserByLinkCode`, `setLinkCode`, `linkChatId`, `unlinkChatId`, `setDefaultSession`, `getTelegramStatus`, `appendInboundLog`, `trimInboundLog`, `getUsersWithTelegramDefaultSession`.
- `hub/src/csrf.ts` — Telegram REST routes covered by the existing double-submit middleware (no new exclusions).
- `hub/src/index.ts` — mount `telegram-webhook.ts` AHEAD of JWT + license + CSRF catch-alls; mount `telegram.ts` inside; start the outbound bridge at boot.
- `hub/src/ws/agent.ts` — emit `assistant_message:final` on the internal event bus when a session run finalizes; **also emit `permission_request:pending` when a runner raises a permission prompt (Fix C)**. Additive only.
- `hub/src/ws/agent.ts` — emit `assistant_message:final` on the internal event bus when a session run finalizes; **also emit `permission_request:pending` when a runner raises a permission prompt (Fix C)**; **also emit `session_activity` (tool_use) for the summarized-streaming working message**. Additive only.
- `hub/src/api/telegram-webhook.ts` — **(Fix C)** `handlePermissionCallback` resolves `pa:`/`pd:` taps → forwards `permission_response` on the agent socket; **(Fix B)** picker re-render uses `PAGE_SIZE` (was a hardcoded `20`).
- `hub/src/telegram/bridge.ts` — **(Fix A)** `setMyCommands(BOT_COMMANDS)` on startup; **(Fix C)** subscribes to `permission_request:pending` → sends the Approve/Deny keyboard + records the pending prompt.

Expand Down
13 changes: 6 additions & 7 deletions hub/src/api/telegram-webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,17 +466,16 @@ async function handlePermissionCallback(
user: TelegramUserRow,
perm: { requestId: string; approved: boolean },
): Promise<{ outcome: string }> {
const pending = takePendingPrompt(perm.requestId);
// Authorization is enforced inside takePendingPrompt: it returns the entry only
// if THIS user is an authorized approver for (requestId). A shared default
// session that fanned the prompt to several users now resolves for whichever
// authorized user taps first (no last-write-wins clobber).
const pending = takePendingPrompt(perm.requestId, user.id);
if (!pending) {
// Already resolved, expired, or unknown — tell the user, no state change.
// Already resolved, expired, unknown, or the tapping user isn't authorized.
await safeAnswerCallback(cb.id, { text: "This prompt already expired or was answered.", show_alert: false });
return { outcome: "callback_permission_stale" };
}
// Authorization: the prompt MUST belong to the tapping user.
if (pending.userId !== user.id) {
await safeAnswerCallback(cb.id, { text: "Not allowed", show_alert: true });
return { outcome: "callback_permission_denied_auth" };
}

// Forward the decision to the agent socket (same frame the web client sends).
const channel = getChannel(pending.sessionId);
Expand Down
5 changes: 5 additions & 0 deletions hub/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ const telegramWebhookSecret = requireMinLenIfSet(
16,
);
const telegramBotUsername = process.env.TELEGRAM_BOT_USERNAME || "";
// Summarized streaming: one editable "working…" message per turn that collapses
// tool-use lines to one-liners and finalizes with the assistant text. Default ON;
// set TELEGRAM_SUMMARIZED_STREAMING=false to revert to a single final-blob send.
const telegramSummarizedStreaming = process.env.TELEGRAM_SUMMARIZED_STREAMING !== "false";

// B2 (obs): owner user_id for the hub's self-capture error_project row.
// When unset, self-capture is inert (no row seeded, no hooks installed).
Expand Down Expand Up @@ -182,6 +186,7 @@ export const config = {
botToken: telegramBotToken,
webhookSecret: telegramWebhookSecret,
botUsername: telegramBotUsername,
summarizedStreaming: telegramSummarizedStreaming,
},

// B4: observability bearer token (gates /healthz/deep + /metrics).
Expand Down
69 changes: 69 additions & 0 deletions hub/src/events/session-activity-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Internal hub event bus for SESSION ACTIVITY (tool-use one-liners).
*
* Mirrors `events/assistant-events.ts` + `events/permission-events.ts`: a
* read-only fanout for server-side consumers. The Telegram outbound bridge
* subscribes and folds these into the editable "working…" message it maintains
* per (chat, session) while a turn is in flight. The final assistant text still
* arrives via the separate `assistant_message:final` bus.
*
* Scope (kept deliberately tiny):
* - `tool_use` only. `thinking` and raw `text_delta` are NEVER emitted here —
* the bridge collapses tool calls to one-liners and shows the final text in
* full; intermediate token noise is intentionally dropped.
*
* This is NOT a dispatch path. Dispatch (inbound user→session) lives in
* `hub/src/dispatch/`. This bus is outbound-only and side-effect free for the
* emitter — listener errors are isolated so a bridge throw can't tear down the
* WS handler in `ws/agent.ts`.
*/

import { EventEmitter } from "node:events";

export interface SessionActivityEvent {
/** Session that produced the activity. */
sessionId: string;
/** Owning user of the session (known at emit time in ws/agent.ts). */
userId: string;
/** Activity kind. Only "tool_use" is emitted today. */
kind: "tool_use";
/** Tool name (e.g. "Edit", "Bash", "Read"). */
toolName: string;
/** Best-effort one-line target (file path / command / url); may be empty. */
detail?: string;
}

export type SessionActivityListener = (e: SessionActivityEvent) => void | Promise<void>;

const emitter = new EventEmitter();
emitter.setMaxListeners(50);

const EVENT = "session_activity";

/** Emit a session-activity event. Listener errors are isolated. */
export function emitSessionActivity(e: SessionActivityEvent): void {
const listeners = emitter.listeners(EVENT) as SessionActivityListener[];
for (const fn of listeners) {
try {
const ret = fn(e);
if (ret && typeof (ret as Promise<void>).catch === "function") {
(ret as Promise<void>).catch((err: any) => {
console.warn(`[session-activity-events] async listener error: ${err?.message ?? err}`);
});
}
} catch (err: any) {
console.warn(`[session-activity-events] listener threw: ${err?.message ?? err}`);
}
}
}

/** Subscribe to session-activity events. Returns an unsubscribe fn. */
export function onSessionActivity(listener: SessionActivityListener): () => void {
emitter.on(EVENT, listener as any);
return () => emitter.off(EVENT, listener as any);
}

/** Test-only — remove every listener. */
export function _resetSessionActivityEventsForTests(): void {
emitter.removeAllListeners(EVENT);
}
Binary file modified hub/src/telegram/approvals.ts
Binary file not shown.
Loading
Loading