diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index f8270732..0387a9b7 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -154,6 +154,7 @@ "help": "↗ Show Claude Code help", "memory": "↗ Edit CLAUDE.md", "model": "↗ Switch AI model", + "context": "↗ Show context window usage", } @@ -1741,12 +1742,17 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: # Mark interactive mode BEFORE sleeping so polling skips this window set_interactive_mode(user_id, wid, thread_id) # Flush pending messages (e.g. plan content) before sending interactive UI - queue = get_message_queue(user_id) + queue = get_message_queue(user_id, thread_id) if queue: await queue.join() - # Wait briefly for Claude Code to render the question UI - await asyncio.sleep(0.3) - handled = await handle_interactive_ui(bot, user_id, wid, thread_id) + # Retry pane capture with increasing delays — Claude Code may need + # time to render the interactive UI after the JSONL entry is written. + handled = False + for delay in (0.3, 0.7, 1.0): + await asyncio.sleep(delay) + handled = await handle_interactive_ui(bot, user_id, wid, thread_id) + if handled: + break if handled: # Update user's read offset session = await session_manager.resolve_session_for_window(wid) @@ -1768,7 +1774,10 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: await clear_interactive_msg(user_id, bot, thread_id) # Skip tool call notifications when CCBOT_SHOW_TOOL_CALLS=false - if not config.show_tool_calls and msg.content_type in ("tool_use", "tool_result"): + if not config.show_tool_calls and msg.content_type in ( + "tool_use", + "tool_result", + ): continue parts = build_response_parts( @@ -1841,6 +1850,25 @@ async def post_init(application: Application) -> None: if rate_limiter and rate_limiter._base_limiter: rate_limiter._base_limiter._level = rate_limiter._base_limiter.max_rate logger.info("Pre-filled global rate limiter bucket") + # Also pre-fill per-group limiters for known chat IDs. + # Without this, the group limiter allows a burst of 20 requests on restart, + # which can exceed Telegram's persisted server-side per-group counter. + if hasattr(rate_limiter, "_group_limiters"): + from aiolimiter import AsyncLimiter + + group_rate = getattr(rate_limiter, "_group_max_rate", 20) + group_period = getattr(rate_limiter, "_group_time_period", 60) + seen_chat_ids: set[int] = set() + for chat_id in session_manager.group_chat_ids.values(): + if chat_id < 0 and chat_id not in seen_chat_ids: + seen_chat_ids.add(chat_id) + limiter = AsyncLimiter(group_rate, group_period) + limiter._level = limiter.max_rate + rate_limiter._group_limiters[chat_id] = limiter + if seen_chat_ids: + logger.info( + "Pre-filled %d group rate limiter bucket(s)", len(seen_chat_ids) + ) monitor = SessionMonitor() diff --git a/src/ccbot/config.py b/src/ccbot/config.py index 22d1de76..84711a68 100644 --- a/src/ccbot/config.py +++ b/src/ccbot/config.py @@ -112,6 +112,22 @@ def __init__(self) -> None: for var in SENSITIVE_ENV_VARS: os.environ.pop(var, None) + # Display thinking messages in real-time and history + # Set SHOW_THINKING=false to skip thinking blocks (reduces Telegram API calls) + self.show_thinking = os.getenv("SHOW_THINKING", "true").lower() in ( + "true", + "1", + "yes", + ) + + # Display tool_use/tool_result messages in real-time and history + # Set SHOW_TOOLS=false to skip tool messages (reduces Telegram API calls) + self.show_tools = os.getenv("SHOW_TOOLS", "true").lower() in ( + "true", + "1", + "yes", + ) + logger.debug( "Config initialized: dir=%s, token=%s..., allowed_users=%d, " "tmux_session=%s, claude_projects_path=%s", diff --git a/src/ccbot/handlers/history.py b/src/ccbot/handlers/history.py index 9094107e..f5203e33 100644 --- a/src/ccbot/handlers/history.py +++ b/src/ccbot/handlers/history.py @@ -160,6 +160,16 @@ async def send_history( lines = [header] for msg in messages: + # Skip thinking messages unless show_thinking is enabled + if msg.get("content_type") == "thinking" and not config.show_thinking: + continue + # Skip tool messages unless show_tools is enabled + if ( + msg.get("content_type") in ("tool_use", "tool_result") + and not config.show_tools + ): + continue + # Format timestamp as HH:MM ts = msg.get("timestamp") if ts: diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index bdd28038..106ff719 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -19,6 +19,7 @@ import asyncio import logging +import re import time from dataclasses import dataclass, field from typing import Literal @@ -29,8 +30,6 @@ from ..markdown_v2 import convert_markdown from ..session import session_manager -from ..terminal_parser import parse_status_line -from ..tmux_manager import tmux_manager from .message_sender import ( NO_LINK_PREVIEW, PARSE_MODE, @@ -66,10 +65,11 @@ class MessageTask: image_data: list[tuple[str, bytes]] | None = None # From tool_result images -# Per-user message queues and worker tasks -_message_queues: dict[int, asyncio.Queue[MessageTask]] = {} -_queue_workers: dict[int, asyncio.Task[None]] = {} -_queue_locks: dict[int, asyncio.Lock] = {} # Protect drain/refill operations +# Per-topic message queues and worker tasks — keyed by (user_id, thread_id_or_0) +_QueueKey = tuple[int, int] +_message_queues: dict[_QueueKey, asyncio.Queue[MessageTask]] = {} +_queue_workers: dict[_QueueKey, asyncio.Task[None]] = {} +_queue_locks: dict[_QueueKey, asyncio.Lock] = {} # Protect drain/refill operations # Map (tool_use_id, user_id, thread_id_or_0) -> telegram message_id # for editing tool_use messages with results @@ -78,28 +78,61 @@ class MessageTask: # Status message tracking: (user_id, thread_id_or_0) -> (message_id, window_id, last_text) _status_msg_info: dict[tuple[int, int], tuple[int, str, str]] = {} -# Flood control: user_id -> monotonic time when ban expires -_flood_until: dict[int, float] = {} +# Flood control: (user_id, thread_id_or_0) -> monotonic time when ban expires +_flood_until: dict[_QueueKey, float] = {} # Max seconds to wait for flood control before dropping tasks FLOOD_CONTROL_MAX_WAIT = 10 +# Per-group-chat processing lock — serializes API calls across topic workers +# sending to the same Telegram group to prevent rate limit bursts +_group_process_locks: dict[int, asyncio.Lock] = {} -def get_message_queue(user_id: int) -> asyncio.Queue[MessageTask] | None: - """Get the message queue for a user (if exists).""" - return _message_queues.get(user_id) +# Typing indicator throttle: (user_id, thread_id_or_0) -> monotonic time of last send +_last_typing: dict[tuple[int, int], float] = {} +# Minimum interval between typing indicators to same topic (seconds) +TYPING_MIN_INTERVAL = 4.0 + + +# Regex to strip the stats parenthetical (and any trailing text) from status lines, e.g.: +# "Unravelling… (45s · ↓ 2.5k tokens · thought for 25s)" → "Unravelling…" +# "Enchanting… (2m 9s · ↓ 8.1k tokens · thought for 49s)" → "Enchanting…" +# "Working… (30s · ↓ 897 tokens) Esc to interrupt" → "Working…" +_STATUS_STATS_RE = re.compile(r"\s*\([\dhms ]+·.*\).*$") + +# Regex to strip trailing time durations embedded directly in the status text, e.g.: +# "Churned for 9m 58s" → "Churned" +# "Responding for 30s" → "Responding" +_STATUS_TIME_RE = re.compile(r"\s+(?:for\s+)?(?:\d+[hms]\s*)+$") + + +def _strip_status_stats(text: str) -> str: + """Strip timing/stats from a status line for dedup.""" + text = _STATUS_STATS_RE.sub("", text) + text = _STATUS_TIME_RE.sub("", text) + return text -def get_or_create_queue(bot: Bot, user_id: int) -> asyncio.Queue[MessageTask]: - """Get or create message queue and worker for a user.""" - if user_id not in _message_queues: - _message_queues[user_id] = asyncio.Queue() - _queue_locks[user_id] = asyncio.Lock() - # Start worker task for this user - _queue_workers[user_id] = asyncio.create_task( - _message_queue_worker(bot, user_id) - ) - return _message_queues[user_id] + +def get_message_queue( + user_id: int, thread_id: int | None = None +) -> asyncio.Queue[MessageTask] | None: + """Get the message queue for a user+topic (if exists).""" + key: _QueueKey = (user_id, thread_id or 0) + return _message_queues.get(key) + + +def get_or_create_queue( + bot: Bot, user_id: int, thread_id: int | None = None +) -> asyncio.Queue[MessageTask]: + """Get or create message queue and worker for a user+topic.""" + key: _QueueKey = (user_id, thread_id or 0) + if key not in _message_queues: + _message_queues[key] = asyncio.Queue() + _queue_locks[key] = asyncio.Lock() + # Start worker task for this topic + _queue_workers[key] = asyncio.create_task(_message_queue_worker(bot, key)) + return _message_queues[key] def _inspect_queue(queue: asyncio.Queue[MessageTask]) -> list[MessageTask]: @@ -197,18 +230,27 @@ async def _merge_content_tasks( ) -async def _message_queue_worker(bot: Bot, user_id: int) -> None: - """Process message tasks for a user sequentially.""" - queue = _message_queues[user_id] - lock = _queue_locks[user_id] - logger.info(f"Message queue worker started for user {user_id}") +async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: + """Process message tasks for a user+topic sequentially.""" + user_id, _thread_id = key + queue = _message_queues[key] + lock = _queue_locks[key] + + # Shared per-group-chat lock — serializes sends across topic workers + # to stay within Telegram's per-group rate limit + chat_id = session_manager.resolve_chat_id(user_id, _thread_id or None) + if chat_id not in _group_process_locks: + _group_process_locks[chat_id] = asyncio.Lock() + group_lock = _group_process_locks[chat_id] + + logger.info("Message queue worker started for %s", key) while True: try: task = await queue.get() try: # Flood control: drop status, wait for content - flood_end = _flood_until.get(user_id, 0) + flood_end = _flood_until.get(key, 0) if flood_end > 0: remaining = flood_end - time.monotonic() if remaining > 0: @@ -217,30 +259,33 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: continue # Content is actual Claude output — wait then send logger.debug( - "Flood controlled: waiting %.0fs for content (user %d)", + "Flood controlled: waiting %.0fs for content (%s)", remaining, - user_id, + key, ) await asyncio.sleep(remaining) # Ban expired - _flood_until.pop(user_id, None) - logger.info("Flood control lifted for user %d", user_id) - - if task.task_type == "content": - # Try to merge consecutive content tasks - merged_task, merge_count = await _merge_content_tasks( - queue, task, lock - ) - if merge_count > 0: - logger.debug(f"Merged {merge_count} tasks for user {user_id}") - # Mark merged tasks as done - for _ in range(merge_count): - queue.task_done() - await _process_content_task(bot, user_id, merged_task) - elif task.task_type == "status_update": - await _process_status_update_task(bot, user_id, task) - elif task.task_type == "status_clear": - await _do_clear_status_message(bot, user_id, task.thread_id or 0) + _flood_until.pop(key, None) + logger.info("Flood control lifted for %s", key) + + async with group_lock: + if task.task_type == "content": + # Try to merge consecutive content tasks + merged_task, merge_count = await _merge_content_tasks( + queue, task, lock + ) + if merge_count > 0: + logger.debug("Merged %d tasks for %s", merge_count, key) + # Mark merged tasks as done + for _ in range(merge_count): + queue.task_done() + await _process_content_task(bot, user_id, merged_task) + elif task.task_type == "status_update": + await _process_status_update_task(bot, user_id, task) + elif task.task_type == "status_clear": + await _do_clear_status_message( + bot, user_id, task.thread_id or 0 + ) except RetryAfter as e: retry_secs = ( e.retry_after @@ -248,29 +293,29 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: else int(e.retry_after.total_seconds()) ) if retry_secs > FLOOD_CONTROL_MAX_WAIT: - _flood_until[user_id] = time.monotonic() + retry_secs + _flood_until[key] = time.monotonic() + retry_secs logger.warning( - "Flood control for user %d: retry_after=%ds, " + "Flood control for %s: retry_after=%ds, " "pausing queue until ban expires", - user_id, + key, retry_secs, ) else: logger.warning( - "Flood control for user %d: waiting %ds", - user_id, + "Flood control for %s: waiting %ds", + key, retry_secs, ) await asyncio.sleep(retry_secs) except Exception as e: - logger.error(f"Error processing message task for user {user_id}: {e}") + logger.error("Error processing message task for %s: %s", key, e) finally: queue.task_done() except asyncio.CancelledError: - logger.info(f"Message queue worker cancelled for user {user_id}") + logger.info("Message queue worker cancelled for %s", key) break except Exception as e: - logger.error(f"Unexpected error in queue worker for user {user_id}: {e}") + logger.error("Unexpected error in queue worker for %s: %s", key, e) def _send_kwargs(thread_id: int | None) -> dict[str, int]: @@ -321,7 +366,6 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No link_preview_options=NO_LINK_PREVIEW, ) await _send_task_images(bot, chat_id, task) - await _check_and_send_status(bot, user_id, wid, task.thread_id) return except RetryAfter: raise @@ -336,7 +380,6 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No link_preview_options=NO_LINK_PREVIEW, ) await _send_task_images(bot, chat_id, task) - await _check_and_send_status(bot, user_id, wid, task.thread_id) return except RetryAfter: raise @@ -381,9 +424,6 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No # 4. Send images if present (from tool_result with base64 image blocks) await _send_task_images(bot, chat_id, task) - # 5. After content, check and send status - await _check_and_send_status(bot, user_id, wid, task.thread_id) - async def _convert_status_to_content( bot: Bot, @@ -466,21 +506,24 @@ async def _process_status_update_task( # Window changed - delete old and send new await _do_clear_status_message(bot, user_id, tid) await _do_send_status_message(bot, user_id, tid, wid, status_text) - elif status_text == last_text: - # Same content, skip edit + elif _strip_status_stats(status_text) == _strip_status_stats(last_text): + # Same action (ignoring timer changes), skip edit return else: # Same window, text changed - edit in place - # Send typing indicator when Claude is working + # Send typing indicator when Claude is working (throttled) if "esc to interrupt" in status_text.lower(): - try: - await bot.send_chat_action( - chat_id=chat_id, action=ChatAction.TYPING - ) - except RetryAfter: - raise - except Exception: - pass + now = time.monotonic() + if now - _last_typing.get(skey, 0) >= TYPING_MIN_INTERVAL: + try: + await bot.send_chat_action( + chat_id=chat_id, action=ChatAction.TYPING + ) + _last_typing[skey] = now + except RetryAfter: + raise + except Exception: + pass try: await bot.edit_message_text( chat_id=chat_id, @@ -531,14 +574,17 @@ async def _do_send_status_message( await bot.delete_message(chat_id=chat_id, message_id=old[0]) except Exception: pass - # Send typing indicator when Claude is working + # Send typing indicator when Claude is working (throttled) if "esc to interrupt" in text.lower(): - try: - await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) - except RetryAfter: - raise - except Exception: - pass + now = time.monotonic() + if now - _last_typing.get(skey, 0) >= TYPING_MIN_INTERVAL: + try: + await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) + _last_typing[skey] = now + except RetryAfter: + raise + except Exception: + pass sent = await send_with_fallback( bot, chat_id, @@ -566,31 +612,6 @@ async def _do_clear_status_message( logger.debug(f"Failed to delete status message {msg_id}: {e}") -async def _check_and_send_status( - bot: Bot, - user_id: int, - window_id: str, - thread_id: int | None = None, -) -> None: - """Check terminal for status line and send status message if present.""" - # Skip if there are more messages pending in the queue - queue = _message_queues.get(user_id) - if queue and not queue.empty(): - return - w = await tmux_manager.find_window_by_id(window_id) - if not w: - return - - pane_text = await tmux_manager.capture_pane(w.window_id) - if not pane_text: - return - - tid = thread_id or 0 - status_line = parse_status_line(pane_text) - if status_line: - await _do_send_status_message(bot, user_id, tid, window_id, status_line) - - async def enqueue_content_message( bot: Bot, user_id: int, @@ -609,7 +630,7 @@ async def enqueue_content_message( window_id, content_type, ) - queue = get_or_create_queue(bot, user_id) + queue = get_or_create_queue(bot, user_id, thread_id) task = MessageTask( task_type="content", @@ -632,21 +653,26 @@ async def enqueue_status_update( thread_id: int | None = None, ) -> None: """Enqueue status update. Skipped if text unchanged or during flood control.""" + tid = thread_id or 0 + key: _QueueKey = (user_id, tid) + # Don't enqueue during flood control — they'd just be dropped - flood_end = _flood_until.get(user_id, 0) + flood_end = _flood_until.get(key, 0) if flood_end > time.monotonic(): return - tid = thread_id or 0 - - # Deduplicate: skip if text matches what's already displayed + # Deduplicate: skip if action text matches (ignoring timer/stats changes) if status_text: skey = (user_id, tid) info = _status_msg_info.get(skey) - if info and info[1] == window_id and info[2] == status_text: + if ( + info + and info[1] == window_id + and _strip_status_stats(info[2]) == _strip_status_stats(status_text) + ): return - queue = get_or_create_queue(bot, user_id) + queue = get_or_create_queue(bot, user_id, thread_id) if status_text: task = MessageTask( @@ -692,4 +718,6 @@ async def shutdown_workers() -> None: _queue_workers.clear() _message_queues.clear() _queue_locks.clear() + _group_process_locks.clear() + _last_typing.clear() logger.info("Message queue workers stopped") diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index c4de1c6e..2d4e6155 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -24,12 +24,14 @@ from telegram.error import BadRequest from ..session import session_manager -from ..terminal_parser import is_interactive_ui, parse_status_line +from ..terminal_parser import extract_interactive_content, parse_status_line from ..tmux_manager import tmux_manager from .interactive_ui import ( + INTERACTIVE_TOOL_NAMES, clear_interactive_msg, get_interactive_window, handle_interactive_ui, + set_interactive_mode, ) from .cleanup import clear_topic_state from .message_queue import enqueue_status_update, get_message_queue @@ -78,7 +80,7 @@ async def update_status_message( if interactive_window == window_id: # User is in interactive mode for THIS window - if is_interactive_ui(pane_text): + if extract_interactive_content(pane_text) is not None: # Interactive UI still showing — skip status update (user is interacting) return # Interactive UI gone — clear interactive mode, fall through to status check. @@ -90,17 +92,27 @@ async def update_status_message( # Clear stale interactive mode await clear_interactive_msg(user_id, bot, thread_id) - # Check for permission prompt (interactive UI not triggered via JSONL) - # ALWAYS check UI, regardless of skip_status - if should_check_new_ui and is_interactive_ui(pane_text): - logger.debug( - "Interactive UI detected in polling (user=%d, window=%s, thread=%s)", - user_id, - window_id, - thread_id, - ) - await handle_interactive_ui(bot, user_id, window_id, thread_id) - return + if should_check_new_ui: + content = extract_interactive_content(pane_text) + if content is not None: + if content.name == "Feedback": + # Auto-dismiss feedback survey by pressing "0" (Dismiss) + await tmux_manager.send_keys(window_id, "0", enter=False, literal=False) + logger.info("Auto-dismissed feedback survey in window %s", window_id) + return + if ( + content.name in INTERACTIVE_TOOL_NAMES + and session_manager.get_window_state(window_id).session_id + ): + # ExitPlanMode/AskUserQuestion have JSONL entries — the JSONL path + # (bot.py handle_new_message) is the sole sender for these UIs. + # Just set interactive mode here to suppress status updates. + set_interactive_mode(user_id, window_id, thread_id) + else: + # PermissionPrompt/RestoreCheckpoint/Settings or no JSONL session: + # send immediately (no ordering concern). + await handle_interactive_ui(bot, user_id, window_id, thread_id) + return # Normal status line check — skip if queue is non-empty if skip_status: @@ -133,13 +145,21 @@ async def status_poll_loop(bot: Bot) -> None: session_manager.iter_thread_bindings() ): try: - await bot.unpin_all_forum_topic_messages( - chat_id=session_manager.resolve_chat_id(user_id, thread_id), + chat_id = session_manager.resolve_chat_id(user_id, thread_id) + # reopen_forum_topic is a silent probe: + # - open topic → BadRequest("Topic_not_modified") + # - deleted topic → BadRequest("Topic_id_invalid") + await bot.reopen_forum_topic( + chat_id=chat_id, message_thread_id=thread_id, ) except BadRequest as e: - if "Topic_id_invalid" in str(e): - # Topic deleted — kill window, unbind, and clean up state + err = str(e) + if "not_modified" in err.lower(): + # Topic exists and is open — no-op + continue + if "thread not found" in err or "Topic_id_invalid" in err: + # Topic deleted — kill window, unbind, and clean up w = await tmux_manager.find_window_by_id(wid) if w: await tmux_manager.kill_window(w.window_id) @@ -180,12 +200,11 @@ async def status_poll_loop(bot: Bot) -> None: ) continue + queue = get_message_queue(user_id, thread_id) # UI detection happens unconditionally in update_status_message. # Status enqueue is skipped inside update_status_message when # interactive UI is detected (returns early) or when queue is non-empty. - queue = get_message_queue(user_id) skip_status = queue is not None and not queue.empty() - await update_status_message( bot, user_id, diff --git a/src/ccbot/session.py b/src/ccbot/session.py index 173293b1..b5abb30b 100644 --- a/src/ccbot/session.py +++ b/src/ccbot/session.py @@ -151,10 +151,21 @@ def _load_state(self) -> None: int(uid): offsets for uid, offsets in state.get("user_window_offsets", {}).items() } - self.thread_bindings = { - int(uid): {int(tid): wid for tid, wid in bindings.items()} - for uid, bindings in state.get("thread_bindings", {}).items() - } + self.thread_bindings = {} + for uid, bindings in state.get("thread_bindings", {}).items(): + parsed: dict[int, str] = {} + for tid, wid in bindings.items(): + if isinstance(wid, dict): + # Old format: {"window_id": "@4", "chat_id": ...} + parsed[int(tid)] = wid["window_id"] + # Migrate chat_id to group_chat_ids + chat_id = wid.get("chat_id") + if chat_id and int(chat_id) < 0: + key = f"{uid}:{tid}" + self.group_chat_ids[key] = int(chat_id) + else: + parsed[int(tid)] = wid + self.thread_bindings[int(uid)] = parsed self.window_display_names = state.get("window_display_names", {}) self.group_chat_ids = { k: int(v) for k, v in state.get("group_chat_ids", {}).items() diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 0a1b3186..ef685560 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -21,6 +21,7 @@ import aiofiles from .config import config +from .handlers.interactive_ui import INTERACTIVE_TOOL_NAMES from .monitor_state import MonitorState, TrackedSession from .tmux_manager import tmux_manager from .transcript_parser import TranscriptParser @@ -77,6 +78,8 @@ def __init__( self._running = False self._task: asyncio.Task | None = None self._message_callback: Callable[[NewMessage], Awaitable[None]] | None = None + # Fire-and-forget callback tasks (one per session group per poll cycle) + self._callback_tasks: set[asyncio.Task[None]] = set() # Per-session pending tool_use state carried across poll cycles self._pending_tools: dict[str, dict[str, Any]] = {} # session_id -> pending # Track last known session_map for detecting changes @@ -269,18 +272,22 @@ async def _read_new_lines( async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessage]: """Check all sessions for new assistant messages. - Reads from last byte offset. Emits both intermediate - (stop_reason=null) and complete messages. + Uses a collect → read → parse pipeline: + 1. Collect: identify sessions that need reading (mtime/size changed) + 2. Read: parallel async file reads via asyncio.gather + 3. Parse: sequential per-session parsing (safe — _pending_tools keyed by session) Args: active_session_ids: Set of session IDs currently in session_map """ - new_messages = [] + new_messages: list[NewMessage] = [] # Scan projects to get available session files sessions = await self.scan_projects() - # Only process sessions that are in session_map + # Phase 1: Collect — identify sessions needing reads + to_read: list[tuple[SessionInfo, TrackedSession, float]] = [] + for session_info in sessions: if session_info.session_id not in active_session_ids: continue @@ -319,55 +326,91 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa current_mtime <= last_mtime and current_size <= tracked.last_byte_offset ): - # File hasn't changed, skip reading continue - # File changed, read new content from last offset - new_entries = await self._read_new_lines( - tracked, session_info.file_path + to_read.append((session_info, tracked, current_mtime)) + + except OSError as e: + logger.debug(f"Error collecting session {session_info.session_id}: {e}") + + if not to_read: + self.state.save_if_dirty() + return new_messages + + # Phase 2: Read — parallel file reads + read_results: list[list[dict[str, Any]] | BaseException] = await asyncio.gather( + *( + self._read_new_lines(tracked, si.file_path) + for si, tracked, _mtime in to_read + ), + return_exceptions=True, + ) + + # Phase 3: Parse — sequential per session + for (session_info, tracked, current_mtime), result in zip( + to_read, read_results, strict=True + ): + if isinstance(result, BaseException): + logger.debug( + "Error reading session %s: %s", session_info.session_id, result ) - self._file_mtimes[session_info.session_id] = current_mtime + continue - if new_entries: - logger.debug( - f"Read {len(new_entries)} new entries for " - f"session {session_info.session_id}" - ) + new_entries: list[dict[str, Any]] = result + self._file_mtimes[session_info.session_id] = current_mtime - # Parse new entries using the shared logic, carrying over pending tools - carry = self._pending_tools.get(session_info.session_id, {}) - parsed_entries, remaining = TranscriptParser.parse_entries( - new_entries, - pending_tools=carry, + if new_entries: + logger.debug( + "Read %d new entries for session %s", + len(new_entries), + session_info.session_id, ) - if remaining: - self._pending_tools[session_info.session_id] = remaining - else: - self._pending_tools.pop(session_info.session_id, None) - for entry in parsed_entries: - if not entry.text and not entry.image_data: - continue - # Skip user messages unless show_user_messages is enabled - if entry.role == "user" and not config.show_user_messages: + # Parse new entries using the shared logic, carrying over pending tools + carry = self._pending_tools.get(session_info.session_id, {}) + parsed_entries, remaining = TranscriptParser.parse_entries( + new_entries, + pending_tools=carry, + ) + if remaining: + self._pending_tools[session_info.session_id] = remaining + else: + self._pending_tools.pop(session_info.session_id, None) + + for entry in parsed_entries: + if not entry.text and not entry.image_data: + continue + # Skip user messages unless show_user_messages is enabled + if entry.role == "user" and not config.show_user_messages: + continue + # Skip thinking messages unless show_thinking is enabled + if entry.content_type == "thinking" and not config.show_thinking: + continue + # Skip tool messages unless show_tools is enabled + # Exception: interactive tools (AskUserQuestion, ExitPlanMode) must pass through + if ( + entry.content_type in ("tool_use", "tool_result") + and not config.show_tools + ): + if not ( + entry.content_type == "tool_use" + and entry.tool_name in INTERACTIVE_TOOL_NAMES + ): continue - new_messages.append( - NewMessage( - session_id=session_info.session_id, - text=entry.text, - is_complete=True, - content_type=entry.content_type, - tool_use_id=entry.tool_use_id, - role=entry.role, - tool_name=entry.tool_name, - image_data=entry.image_data, - ) + new_messages.append( + NewMessage( + session_id=session_info.session_id, + text=entry.text, + is_complete=True, + content_type=entry.content_type, + tool_use_id=entry.tool_use_id, + role=entry.role, + tool_name=entry.tool_name, + image_data=entry.image_data, ) + ) - self.state.update_session(tracked) - - except OSError as e: - logger.debug(f"Error processing session {session_info.session_id}: {e}") + self.state.update_session(tracked) self.state.save_if_dirty() return new_messages @@ -466,6 +509,17 @@ async def _detect_and_cleanup_changes(self) -> dict[str, str]: return current_map + async def _dispatch_session_messages( + self, session_id: str, messages: list[NewMessage] + ) -> None: + """Dispatch messages for one session sequentially (fire-and-forget task).""" + for msg in messages: + try: + if self._message_callback: + await self._message_callback(msg) + except Exception as e: + logger.error("Message callback error (session %s): %s", session_id, e) + async def _monitor_loop(self) -> None: """Background loop for checking session updates. @@ -493,15 +547,23 @@ async def _monitor_loop(self) -> None: # Check for new messages (all I/O is async) new_messages = await self.check_for_updates(active_session_ids) - for msg in new_messages: - status = "complete" if msg.is_complete else "streaming" - preview = msg.text[:80] + ("..." if len(msg.text) > 80 else "") - logger.info("[%s] session=%s: %s", status, msg.session_id, preview) - if self._message_callback: - try: - await self._message_callback(msg) - except Exception as e: - logger.error(f"Message callback error: {e}") + if new_messages and self._message_callback: + # Group messages by session_id for concurrent dispatch + groups: dict[str, list[NewMessage]] = {} + for msg in new_messages: + status = "complete" if msg.is_complete else "streaming" + preview = msg.text[:80] + ("..." if len(msg.text) > 80 else "") + logger.info( + "[%s] session=%s: %s", status, msg.session_id, preview + ) + groups.setdefault(msg.session_id, []).append(msg) + + for session_id, msgs in groups.items(): + task = asyncio.create_task( + self._dispatch_session_messages(session_id, msgs) + ) + self._callback_tasks.add(task) + task.add_done_callback(self._callback_tasks.discard) except Exception as e: logger.error(f"Monitor loop error: {e}") @@ -522,5 +584,9 @@ def stop(self) -> None: if self._task: self._task.cancel() self._task = None + # Cancel outstanding fire-and-forget callback tasks + for task in list(self._callback_tasks): + task.cancel() + self._callback_tasks.clear() self.state.save() logger.info("Session monitor stopped and state saved") diff --git a/src/ccbot/terminal_parser.py b/src/ccbot/terminal_parser.py index 1afefed0..0b5cbb4e 100644 --- a/src/ccbot/terminal_parser.py +++ b/src/ccbot/terminal_parser.py @@ -59,6 +59,13 @@ class UIPattern: re.compile(r"^\s*Esc to (cancel|exit)"), ), ), + # Fallback: numbered selector UI (❯ 1. Yes / 2. No) without old markers + UIPattern( + name="ExitPlanMode", + top=(re.compile(r"^\s*❯\s+\d+\.\s+Yes"),), + bottom=(), # extends to last non-empty line + min_gap=1, + ), UIPattern( name="AskUserQuestion", top=(re.compile(r"^\s*←\s+[☐✔☒]"),), # Multi-tab: no bottom needed @@ -71,6 +78,13 @@ class UIPattern: bottom=(re.compile(r"^\s*Enter to select"),), min_gap=1, ), + # Numbered selector: "❯ 1. [ ] ..." or " 1. [x] ..." (tab bar scrolled off) + UIPattern( + name="AskUserQuestion", + top=(re.compile(r"^\s*(?:❯\s+)?\d+\.\s+\["),), + bottom=(re.compile(r"^\s*Enter to select"),), + min_gap=1, + ), UIPattern( name="PermissionPrompt", top=( @@ -102,6 +116,12 @@ class UIPattern: top=(re.compile(r"^\s*Restore the code"),), bottom=(re.compile(r"^\s*Enter to continue"),), ), + UIPattern( + name="Feedback", + top=(re.compile(r"How is Claude doing this session"),), + bottom=(re.compile(r"0:\s*Dismiss"),), + min_gap=1, + ), UIPattern( name="Settings", top=( diff --git a/src/ccbot/tmux_manager.py b/src/ccbot/tmux_manager.py index f05b4f3a..2a7dcbeb 100644 --- a/src/ccbot/tmux_manager.py +++ b/src/ccbot/tmux_manager.py @@ -117,6 +117,9 @@ def _sync_list_windows() -> list[TmuxWindow]: pane = window.active_pane if pane: cwd = pane.pane_current_path or "" + # tmux appends " (deleted)" when cwd no longer exists + if cwd.endswith(" (deleted)"): + cwd = cwd.removesuffix(" (deleted)") pane_cmd = pane.pane_current_command or "" else: cwd = "" diff --git a/tests/ccbot/conftest.py b/tests/ccbot/conftest.py index f274f9c6..9b8bb2f1 100644 --- a/tests/ccbot/conftest.py +++ b/tests/ccbot/conftest.py @@ -163,6 +163,30 @@ def sample_pane_settings(): ) +@pytest.fixture +def sample_pane_exit_plan_numbered(): + """Realistic pane showing numbered ExitPlanMode selector (current format). + + The old markers (Would you like to proceed?, ctrl-g to edit in) are NOT + present — only the ❯ 1. Yes / 2. No selector with plan context above. + """ + return ( + " I've written a plan to the plan file.\n" + "\n" + " Here's a summary of what I'll do:\n" + " 1. Update the config parser\n" + " 2. Add validation logic\n" + "\n" + " ❯ 1. Yes\n" + " 2. No\n" + "\n" + "──────────────────────────────────────\n" + "❯ \n" + "──────────────────────────────────────\n" + " [Opus 4.6] Context: 34%\n" + ) + + @pytest.fixture def sample_pane_no_ui(): return "$ echo hello\nhello\n$\n" diff --git a/tests/ccbot/handlers/test_interactive_ui.py b/tests/ccbot/handlers/test_interactive_ui.py index 8d6a98e4..6698c844 100644 --- a/tests/ccbot/handlers/test_interactive_ui.py +++ b/tests/ccbot/handlers/test_interactive_ui.py @@ -92,6 +92,60 @@ async def test_handle_no_ui_returns_false(self, mock_bot: AsyncMock): assert result is False mock_bot.send_message.assert_not_called() + @pytest.mark.asyncio + async def test_handle_exit_plan_numbered_sends_keyboard( + self, mock_bot: AsyncMock, sample_pane_exit_plan_numbered: str + ): + """New numbered ExitPlanMode format triggers keyboard send.""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + with ( + patch("ccbot.handlers.interactive_ui.tmux_manager") as mock_tmux, + patch("ccbot.handlers.interactive_ui.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock( + return_value=sample_pane_exit_plan_numbered + ) + mock_sm.resolve_chat_id.return_value = 100 + + result = await handle_interactive_ui( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + assert result is True + mock_bot.send_message.assert_called_once() + call_kwargs = mock_bot.send_message.call_args + assert call_kwargs.kwargs["reply_markup"] is not None + + @pytest.mark.asyncio + async def test_exit_plan_old_format_sends_keyboard( + self, mock_bot: AsyncMock, sample_pane_exit_plan: str + ): + """Old ExitPlanMode format still triggers keyboard send (backward compat).""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + with ( + patch("ccbot.handlers.interactive_ui.tmux_manager") as mock_tmux, + patch("ccbot.handlers.interactive_ui.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock(return_value=sample_pane_exit_plan) + mock_sm.resolve_chat_id.return_value = 100 + + result = await handle_interactive_ui( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + assert result is True + mock_bot.send_message.assert_called_once() + call_kwargs = mock_bot.send_message.call_args + assert call_kwargs.kwargs["reply_markup"] is not None + class TestKeyboardLayoutForSettings: def test_settings_keyboard_includes_all_nav_keys(self): diff --git a/tests/ccbot/handlers/test_message_queue.py b/tests/ccbot/handlers/test_message_queue.py new file mode 100644 index 00000000..48952be5 --- /dev/null +++ b/tests/ccbot/handlers/test_message_queue.py @@ -0,0 +1,50 @@ +"""Tests for message_queue — status stats stripping for dedup.""" + +import pytest + +from ccbot.handlers.message_queue import _strip_status_stats + + +class TestStripStatusStats: + @pytest.mark.parametrize( + ("input_text", "expected"), + [ + pytest.param( + "Thinking… (45s · ↓ 2.5k tokens · thought for 25s)", + "Thinking…", + id="seconds_only", + ), + pytest.param( + "Enchanting… (2m 9s · ↓ 8.1k tokens · thought for 49s)", + "Enchanting…", + id="minutes_and_seconds", + ), + pytest.param( + "Working… (1h 2m 3s · ↓ 50k tokens)", + "Working…", + id="hours_minutes_seconds", + ), + pytest.param( + "Just text without stats", + "Just text without stats", + id="no_parenthetical", + ), + pytest.param( + "Idle…", + "Idle…", + id="no_stats", + ), + pytest.param( + "Germinating… (30s · ↓ 897 tokens · thought for 2s) Esc to interrupt", + "Germinating…", + id="with_trailing_esc", + ), + pytest.param( + "Thinking… (2m 9s · ↓ 8.1k tokens) Esc to interrupt", + "Thinking…", + id="minutes_with_trailing_esc", + ), + ], + ) + def test_strip_status_stats(self, input_text: str, expected: str): + assert _strip_status_stats(input_text) == expected diff --git a/tests/ccbot/handlers/test_status_polling.py b/tests/ccbot/handlers/test_status_polling.py index 9c0f04f7..93f67ca9 100644 --- a/tests/ccbot/handlers/test_status_polling.py +++ b/tests/ccbot/handlers/test_status_polling.py @@ -139,3 +139,108 @@ async def test_settings_ui_end_to_end_sends_telegram_keyboard( assert keyboard is not None # Verify the message text contains model picker content assert "Select model" in call_kwargs["text"] + + +@pytest.mark.usefixtures("_clear_interactive_state") +class TestStatusPollerExitPlanDetection: + """Simulate the status poller detecting ExitPlanMode UI (numbered format).""" + + @pytest.mark.asyncio + async def test_exit_plan_with_session_defers_to_jsonl( + self, mock_bot: AsyncMock, sample_pane_exit_plan_numbered: str + ): + """ExitPlanMode with active session → set_interactive_mode, no handle_interactive_ui.""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + mock_ws = MagicMock() + mock_ws.session_id = "uuid-xxx" + + with ( + patch("ccbot.handlers.status_polling.tmux_manager") as mock_tmux, + patch( + "ccbot.handlers.status_polling.handle_interactive_ui", + new_callable=AsyncMock, + ) as mock_handle_ui, + patch( + "ccbot.handlers.status_polling.set_interactive_mode", + ) as mock_set_mode, + patch("ccbot.handlers.status_polling.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock( + return_value=sample_pane_exit_plan_numbered + ) + mock_sm.get_window_state.return_value = mock_ws + + await update_status_message( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + mock_set_mode.assert_called_once_with(1, window_id, 42) + mock_handle_ui.assert_not_called() + + @pytest.mark.asyncio + async def test_exit_plan_without_session_sends_immediately( + self, mock_bot: AsyncMock, sample_pane_exit_plan_numbered: str + ): + """ExitPlanMode with no session_id → handle_interactive_ui called (fallback).""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + mock_ws = MagicMock() + mock_ws.session_id = "" + + with ( + patch("ccbot.handlers.status_polling.tmux_manager") as mock_tmux, + patch( + "ccbot.handlers.status_polling.handle_interactive_ui", + new_callable=AsyncMock, + ) as mock_handle_ui, + patch("ccbot.handlers.status_polling.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock( + return_value=sample_pane_exit_plan_numbered + ) + mock_sm.get_window_state.return_value = mock_ws + mock_handle_ui.return_value = True + + await update_status_message( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + mock_handle_ui.assert_called_once_with(mock_bot, 1, window_id, 42) + + @pytest.mark.asyncio + async def test_permission_prompt_always_sends_immediately( + self, mock_bot: AsyncMock, sample_pane_permission: str + ): + """PermissionPrompt → handle_interactive_ui called regardless of session_id.""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + mock_ws = MagicMock() + mock_ws.session_id = "uuid-xxx" + + with ( + patch("ccbot.handlers.status_polling.tmux_manager") as mock_tmux, + patch( + "ccbot.handlers.status_polling.handle_interactive_ui", + new_callable=AsyncMock, + ) as mock_handle_ui, + patch("ccbot.handlers.status_polling.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock(return_value=sample_pane_permission) + mock_sm.get_window_state.return_value = mock_ws + mock_handle_ui.return_value = True + + await update_status_message( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + mock_handle_ui.assert_called_once_with(mock_bot, 1, window_id, 42) diff --git a/tests/ccbot/test_terminal_parser.py b/tests/ccbot/test_terminal_parser.py index 08118430..c6c07277 100644 --- a/tests/ccbot/test_terminal_parser.py +++ b/tests/ccbot/test_terminal_parser.py @@ -83,6 +83,23 @@ def test_exit_plan_mode_variant(self): assert result.name == "ExitPlanMode" assert "Claude has written up a plan" in result.content + def test_exit_plan_mode_numbered_selector( + self, sample_pane_exit_plan_numbered: str + ): + """New numbered ❯ 1. Yes / 2. No format is detected as ExitPlanMode.""" + result = extract_interactive_content(sample_pane_exit_plan_numbered) + assert result is not None + assert result.name == "ExitPlanMode" + assert "❯" in result.content + assert "Yes" in result.content + + def test_exit_plan_mode_old_format_still_works(self, sample_pane_exit_plan: str): + """Backward compat: old ExitPlanMode format still detected.""" + result = extract_interactive_content(sample_pane_exit_plan) + assert result is not None + assert result.name == "ExitPlanMode" + assert "Would you like to proceed?" in result.content + def test_ask_user_multi_tab(self, sample_pane_ask_user_multi_tab: str): result = extract_interactive_content(sample_pane_ask_user_multi_tab) assert result is not None