# Provenance: this module was recovered from the git patch below. The patch
# header is kept verbatim as a comment.
#
#   From 213db91b25fa90d5cf83e7dffc5c1ab1ca44a50a Mon Sep 17 00:00:00 2001
#   From: NanoClaw Setup
#   Date: Mon, 30 Mar 2026 22:05:43 +0200
#   Subject: [PATCH] Add Markdown-to-Telegram HTML formatting for chat bridge
#    responses
#
#   Agent responses were sent as plain text without parse_mode, so Markdown
#   formatting (bold, italic, code, links, etc.) was never rendered by
#   Telegram clients. This adds a conversion pipeline in
#   helpers/format_telegram.py that transforms standard Markdown into
#   Telegram-compatible HTML before sending.
#
#   The converter handles fenced code blocks, inline code, tables, headings,
#   blockquotes, lists, bold/italic/strikethrough, and links. All raw HTML
#   entities are escaped. Three safety layers protect against bad output:
#   conversion fallback to escaped plain text, HTML-aware message splitting
#   with tag balancing across chunk boundaries, and a send-level fallback
#   that retries without parse_mode if Telegram rejects the HTML.
#
#   Made-with: Cursor
#   ---
#    helpers/format_telegram.py | 233 +++++++++++++++++++++++++++++++++++++
#    helpers/telegram_bridge.py |  30 +++--
#    2 files changed, 256 insertions(+), 7 deletions(-)
#    create mode 100644 helpers/format_telegram.py
#
#   diff --git a/helpers/format_telegram.py b/helpers/format_telegram.py
#   new file mode 100644
#   index 0000000..6dcbc6a
#   --- /dev/null
#   +++ b/helpers/format_telegram.py
#   @@ -0,0 +1,233 @@

"""Markdown -> Telegram HTML conversion with tag-aware message splitting.

Telegram supports a limited HTML subset: <b>, <i>, <u>, <s>, <code>,
<pre>, <a>, <blockquote>. Everything else is approximated.

Conversion map:
  Fenced code blocks  -> <pre><code>     Inline code (`x`)   -> <code>
  Tables (|...|)      -> <pre>           Headings (# ...)    -> <b>
  Blockquotes (>)     -> <blockquote>    HR (---)            -> ———
  Unordered lists     -> bullet char     **bold**            -> <b>
  *italic*            -> <i>             ~~strike~~          -> <s>
  [text](url)         -> <a href>        ![alt](url)         -> link

All raw <, >, & are HTML-escaped. Falls back to plain text on failure.
"""

from __future__ import annotations

import logging
import re

__all__ = ["markdown_to_telegram_html", "split_html_message", "strip_html"]

_log = logging.getLogger("format_telegram")

_TELEGRAM_TAGS = frozenset(
    {"b", "i", "u", "s", "code", "pre", "a", "blockquote", "tg-spoiler"}
)
_TAG_RE = re.compile(r"<(/?)(\w[\w-]*)([^>]*)>")

# Characters held back from every chunk for the closing/reopening tags that
# _balance_tags() adds around chunk boundaries.
_TAG_RESERVE = 80
# The longest entity this module emits is "&quot;" (6 chars); look back a
# little further than that when checking whether a cut lands inside one.
_ENTITY_LOOKBACK = 8


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def markdown_to_telegram_html(text: str) -> str:
    """Convert Markdown to Telegram HTML on a best-effort basis.

    Returns the HTML produced by the conversion pipeline. If the pipeline
    raises, or if a stash placeholder (marked by NUL bytes) survives into the
    result, the input is returned HTML-escaped instead, so callers always get
    a string that is safe to send with ``parse_mode="HTML"``.
    """
    try:
        result = _convert(text)
    except Exception:
        _log.exception(
            "Markdown->HTML conversion failed, falling back to plain text"
        )
        return _escape_html(text)
    # A leftover NUL means a placeholder was not restored; do not leak it.
    if "\x00" in result:
        return _escape_html(text)
    return result


def split_html_message(html: str, max_length: int = 4096) -> list[str]:
    """Split *html* into chunks of at most *max_length* characters.

    Tags left open at the end of a chunk are closed there and reopened at
    the start of the next one, so every chunk is balanced on its own. Cuts
    prefer newlines, then spaces, and never land inside a tag or entity
    unless no other progress is possible.

    A fixed reserve is held back for the balancing tags. If the carried-over
    tags turn out to be longer than the reserve (for example a long
    ``<a href=...>``), the split is retried with a larger reserve.
    """
    if not html or len(html) <= max_length:
        return [html]

    reserve = _TAG_RESERVE
    while True:
        limit = max(max_length - reserve, 1)
        chunks = _balance_tags(_split_raw(html, limit))
        overflow = max(len(chunk) for chunk in chunks) - max_length
        if overflow <= 0 or limit == 1:
            return chunks
        # Grow geometrically so pathological inputs converge in a few rounds.
        reserve = max(reserve * 2, reserve + overflow)


def strip_html(html: str) -> str:
    """Remove tags and unescape entities -> plain text (for send fallback)."""
    text = re.sub(r"<[^>]+>", "", html)
    # "&amp;" must be decoded last so "&amp;lt;" becomes "&lt;", not "<".
    return (
        text.replace("&lt;", "<")
        .replace("&gt;", ">")
        .replace("&quot;", '"')
        .replace("&amp;", "&")
    )


# ---------------------------------------------------------------------------
# Escaping
# ---------------------------------------------------------------------------

def _escape_html(text: str) -> str:
    """Escape the three characters Telegram requires: ``&``, ``<``, ``>``."""
    # "&" first, otherwise the entities produced below get double-escaped.
    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")


# ---------------------------------------------------------------------------
# Conversion pipeline
# ---------------------------------------------------------------------------

def _convert(text: str) -> str:
    """Run the Markdown -> HTML pipeline; may raise on unexpected input.

    Finished HTML fragments are parked in a stash and replaced by opaque
    NUL-delimited keys, so later phases (escaping, emphasis regexes) cannot
    touch them. Keys are swapped back in the final phase.
    """
    stash: list[tuple[str, str]] = []

    def _put(html: str) -> str:
        key = f"\x00\x02{len(stash)}\x03\x00"
        stash.append((key, html))
        return key

    def _tail(m: re.Match) -> str:
        # The block regexes swallow the newline after the last line; hand it
        # back so the following line is not glued to the closing tag.
        return "\n" if m.group(0).endswith("\n") else ""

    # -- Phase 1: protect verbatim regions (must come before escaping) ------

    def _fenced(m: re.Match) -> str:
        lang = (m.group(1) or "").strip()
        code = _escape_html(m.group(2).strip("\n"))
        if lang:
            return _put(
                f'<pre><code class="language-{lang}">{code}</code></pre>'
            )
        return _put(f"<pre>{code}</pre>")

    text = re.sub(r"```(\w*)\n(.*?)```", _fenced, text, flags=re.DOTALL)

    # Tables are stashed BEFORE inline code: a table cell containing `code`
    # would otherwise embed an inline-code key inside the table's stashed
    # HTML, and <pre> must not contain other entities anyway.
    def _table(m: re.Match) -> str:
        rows = m.group(0).strip().split("\n")
        # Drop the |---|---| separator row; it is noise in a <pre> block.
        kept = [
            row for row in rows
            if not re.match(r"^\s*\|[-:\s|]+\|\s*$", row)
        ]
        body = _escape_html("\n".join(kept))
        return _put(f"<pre>{body}</pre>") + _tail(m)

    text = re.sub(
        r"(?:^[ \t]*\|.+\|[ \t]*$\n?){2,}",
        _table,
        text,
        flags=re.MULTILINE,
    )

    def _inline_code(m: re.Match) -> str:
        return _put(f"<code>{_escape_html(m.group(1))}</code>")

    text = re.sub(r"`([^`\n]+)`", _inline_code, text)

    # -- Phase 2: escape HTML in remaining text ------------------------------
    text = _escape_html(text)

    # -- Phase 3: block constructs -------------------------------------------
    # [ \t] rather than \s throughout: \s also matches "\n" and would let a
    # pattern spill over into the next line.

    # Headings -> bold
    text = re.sub(
        r"^#{1,6}[ \t]+(.+)$", r"<b>\1</b>", text, flags=re.MULTILINE
    )

    # Blockquotes (consecutive > lines); ">" is "&gt;" after phase 2.
    def _blockquote(m: re.Match) -> str:
        lines = m.group(0).strip().split("\n")
        inner = "\n".join(re.sub(r"^&gt;[ \t]?", "", ln) for ln in lines)
        return f"<blockquote>{inner}</blockquote>" + _tail(m)

    text = re.sub(
        r"(?:^&gt;[ \t]?.+$\n?)+", _blockquote, text, flags=re.MULTILINE
    )

    # Horizontal rules
    text = re.sub(r"^[-*_]{3,}[ \t]*$", "———", text, flags=re.MULTILINE)

    # Unordered lists
    text = re.sub(r"^([ \t]*)[-*+] ", r"\1• ", text, flags=re.MULTILINE)

    # -- Phase 4: inline constructs (order matters) --------------------------

    def _open_anchor(target: str) -> str | None:
        """Stash ``<a href=...>`` for a Markdown link target, or None."""
        parts = target.split()
        if not parts:
            return None
        # parts[0] drops an optional Markdown title ([t](url "title")).
        # Quotes are escaped so the URL cannot break out of the attribute.
        href = parts[0].replace('"', "&quot;")
        # Stashing keeps the emphasis regexes below away from the URL
        # (e.g. "__init__" inside a path must not become <b>).
        return _put(f'<a href="{href}">')

    def _image(m: re.Match) -> str:
        opener = _open_anchor(m.group(2))
        if opener is None:
            return m.group(0)
        # Fall back to the (stashed) URL as link text when alt is empty.
        label = m.group(1) or _put(m.group(2).split()[0])
        return f"🖼 {opener}{label}</a>"

    def _link(m: re.Match) -> str:
        opener = _open_anchor(m.group(2))
        if opener is None:
            return m.group(0)
        return f"{opener}{m.group(1)}</a>"

    # Images before links so ![alt](url) isn't caught as a link
    text = re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", _image, text)
    text = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", _link, text)

    # Bold+italic (*** / ___) before bold (** / __) before italic (* / _).
    # Underscore forms require word boundaries so identifiers such as
    # variable_name or my__var are left alone.
    text = re.sub(r"\*{3}(.+?)\*{3}", r"<b><i>\1</i></b>", text)
    text = re.sub(r"(?<!\w)_{3}(.+?)_{3}(?!\w)", r"<b><i>\1</i></b>", text)
    text = re.sub(r"\*{2}(.+?)\*{2}", r"<b>\1</b>", text)
    text = re.sub(r"(?<!\w)__(.+?)__(?!\w)", r"<b>\1</b>", text)
    # Single * / _ must hug the text ("2 * 3 * 4" is arithmetic, not italic).
    text = re.sub(
        r"(?<!\w)\*(?!\s)(.+?)(?<!\s)\*(?!\w)", r"<i>\1</i>", text
    )
    text = re.sub(
        r"(?<!\w)_(?!\s)(.+?)(?<!\s)_(?!\w)", r"<i>\1</i>", text
    )

    text = re.sub(r"~~(.+?)~~", r"<s>\1</s>", text)

    # -- Phase 5: restore stashed content ------------------------------------
    # Newest first: a later fragment may contain the key of an earlier one,
    # so the container has to be expanded before what it contains.
    for key, html in reversed(stash):
        text = text.replace(key, html)

    return text.strip()


# ---------------------------------------------------------------------------
# HTML-aware splitting helpers
# ---------------------------------------------------------------------------

def _split_raw(html: str, limit: int) -> list[str]:
    """Cut *html* into pieces of at most *limit* chars, ignoring balance."""
    pieces: list[str] = []
    rest = html
    while rest:
        if len(rest) <= limit:
            pieces.append(rest)
            break
        cut = _find_safe_cut(rest, limit)
        pieces.append(rest[:cut])
        rest = rest[cut:].lstrip("\n")
    return pieces


def _retreat_from_markup(text: str, pos: int) -> int:
    """Move *pos* left until it is outside any HTML tag or entity."""
    last_open = text.rfind("<", 0, pos)
    if last_open > text.rfind(">", 0, pos):
        pos = last_open
    # Every "&" in the generated HTML starts an entity; with no ";" between
    # it and the cut, the cut would split that entity.
    amp = text.rfind("&", max(pos - _ENTITY_LOOKBACK, 0), pos)
    if amp != -1 and ";" not in text[amp:pos]:
        pos = amp
    return pos


def _find_safe_cut(text: str, max_len: int) -> int:
    """Pick a split position <= *max_len* that avoids tags and entities.

    Preference order: last newline, last space, hard cut at *max_len*. A
    newline or space is only used if it lies in the last three quarters of
    the window, so chunks do not become needlessly short. Always returns at
    least 1 so the caller makes progress.
    """
    floor = max_len // 4
    for separator in ("\n", " "):
        pos = text.rfind(separator, 0, max_len)
        if pos >= floor:
            safe = _retreat_from_markup(text, pos)
            if safe > 0:
                return safe

    safe = _retreat_from_markup(text, max_len)
    # Retreating to 0 would cut nothing; a hard cut is the lesser evil.
    return safe if safe > 0 else max(max_len, 1)


def _balance_tags(chunks: list[str]) -> list[str]:
    """Close unclosed tags at end of each chunk and reopen them in the next."""
    result: list[str] = []
    carry: list[tuple[str, str]] = []  # (tag_name, full_opening_tag)

    for chunk in chunks:
        if carry:
            chunk = "".join(tag for _, tag in carry) + chunk

        carry = _unclosed_tags(chunk)

        if carry:
            # Innermost tag is closed first.
            chunk += "".join(f"</{name}>" for name, _ in reversed(carry))

        result.append(chunk)

    return result


def _unclosed_tags(html: str) -> list[tuple[str, str]]:
    """Return (tag_name, full_opening_tag) for tags opened but not closed."""
    stack: list[tuple[str, str]] = []
    for m in _TAG_RE.finditer(html):
        is_close = m.group(1) == "/"
        name = m.group(2).lower()
        if name not in _TELEGRAM_TAGS:
            continue
        if is_close:
            # Pop the nearest matching opener; tolerate mis-nested input.
            for i in range(len(stack) - 1, -1, -1):
                if stack[i][0] == name:
                    stack.pop(i)
                    break
        else:
            stack.append((name, m.group(0)))
    return stack


# ---------------------------------------------------------------------------
# The patch this module was recovered from continues with changes to
# helpers/telegram_bridge.py:
#
#   diff --git a/helpers/telegram_bridge.py b/helpers/telegram_bridge.py
#   index 9249723..d9cac29 100644
#   --- a/helpers/telegram_bridge.py
#   +++ b/helpers/telegram_bridge.py
#   @@ -641,16 +641,21 @@
# ---------------------------------------------------------------------------
async def _get_agent_response_http(self, chat_id: str, text: str) -> str: # ------------------------------------------------------------------ async def _send_response(self, message, text: str): - """Send a response to Telegram, splitting long messages.""" + """Send a response to Telegram with Markdown->HTML formatting.""" if not text: text = "(No response)" - chunks = _split_message(text) + from usr.plugins.telegram.helpers.format_telegram import ( + markdown_to_telegram_html, + split_html_message, + strip_html, + ) + + html = markdown_to_telegram_html(text) + chunks = split_html_message(html) + for i, chunk in enumerate(chunks): - if i == 0: - sent = await message.reply_text(chunk) - else: - sent = await message.chat.send_message(chunk) + sent = await self._send_chunk(message, chunk, i) # Store bot response for telegram_read tool try: @@ -662,7 +667,7 @@ async def _send_response(self, message, text: str): "title": getattr(sent.chat, "title", ""), "first_name": getattr(sent.chat, "first_name", ""), "username": getattr(sent.chat, "username", "")}, - "text": sent.text or chunk, + "text": sent.text or strip_html(chunk), "from": { "id": self._bot_user.id if self._bot_user else 0, "first_name": self._bot_user.first_name if self._bot_user else "Bot", @@ -674,6 +679,17 @@ async def _send_response(self, message, text: str): except Exception: pass + async def _send_chunk(self, message, html_chunk: str, index: int): + """Send one chunk as HTML, falling back to plain text on parse error.""" + from telegram.error import BadRequest + from usr.plugins.telegram.helpers.format_telegram import strip_html + + send_fn = message.reply_text if index == 0 else message.chat.send_message + try: + return await send_fn(html_chunk, parse_mode="HTML") + except BadRequest: + return await send_fn(strip_html(html_chunk)) + def _split_message(content: str, max_length: int = 4096) -> list[str]: if len(content) <= max_length: