From bdb5ea97f1d4a083bad49f26600a91e53b5ed3fd Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 21 Feb 2026 15:25:40 +0000 Subject: [PATCH 01/15] feat: add systemd service restart script Adds restart-service.sh that restarts ccbot via systemctl --user, keeping existing Claude Code tmux sessions alive. Co-Authored-By: Claude Opus 4.6 --- scripts/restart-service.sh | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100755 scripts/restart-service.sh diff --git a/scripts/restart-service.sh b/scripts/restart-service.sh new file mode 100755 index 00000000..63a7d9fb --- /dev/null +++ b/scripts/restart-service.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Restart ccbot via systemd user service. +# Claude Code sessions live in tmux windows and are unaffected by this restart. +# The bot reconnects to existing sessions via state.json and session_map.json on startup. + +echo "Restarting ccbot service..." +systemctl --user restart ccbot + +sleep 2 + +if systemctl --user is-active --quiet ccbot; then + echo "ccbot restarted successfully." + echo "----------------------------------------" + journalctl --user -u ccbot --no-pager -n 20 + echo "----------------------------------------" +else + echo "Error: ccbot failed to start." + echo "----------------------------------------" + journalctl --user -u ccbot --no-pager -n 30 + echo "----------------------------------------" + exit 1 +fi From 6b6374cd7cb055c466ab1dca2f8f8988bbf439e3 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 21 Feb 2026 15:25:41 +0000 Subject: [PATCH 02/15] feat: add /context command to show context window usage Co-Authored-By: Claude Opus 4.6 --- src/ccbot/bot.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index 1b218934..f6beaa9c 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -143,6 +143,7 @@ "help": "↗ Show Claude Code help", "memory": "↗ Edit CLAUDE.md", "model": "↗ Switch AI model", + "context": "↗ Show context window usage", } From 17ff3ee6fdb7f71d61181a8ff8d8a30fa3831c53 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 21 Feb 2026 16:06:10 +0000 Subject: [PATCH 03/15] fix: resolve multi-session concurrency bottlenecks Per-topic message queues (was per-user) so sessions in different topics no longer block each other. Fire-and-forget monitor callbacks grouped by session_id for concurrent cross-session dispatch. Status polling hoists list_windows() to one call per cycle with parallel pane captures. Session file reads parallelized via asyncio.gather. Co-Authored-By: Claude Opus 4.6 --- src/ccbot/bot.py | 2 +- src/ccbot/handlers/message_queue.py | 96 ++++++++-------- src/ccbot/handlers/status_polling.py | 40 +++++-- src/ccbot/session_monitor.py | 157 ++++++++++++++++++--------- 4 files changed, 187 insertions(+), 108 deletions(-) diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index f6beaa9c..bea96624 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -1430,7 +1430,7 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: # Mark interactive mode BEFORE sleeping so polling skips this window set_interactive_mode(user_id, wid, thread_id) # Flush pending messages (e.g. plan content) before sending interactive UI - queue = get_message_queue(user_id) + queue = get_message_queue(user_id, thread_id) if queue: await queue.join() # Wait briefly for Claude Code to render the question UI diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index b5f24971..584498ea 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -55,10 +55,11 @@ class MessageTask: image_data: list[tuple[str, bytes]] | None = None # From tool_result images -# Per-user message queues and worker tasks -_message_queues: dict[int, asyncio.Queue[MessageTask]] = {} -_queue_workers: dict[int, asyncio.Task[None]] = {} -_queue_locks: dict[int, asyncio.Lock] = {} # Protect drain/refill operations +# Per-topic message queues and worker tasks — keyed by (user_id, thread_id_or_0) +_QueueKey = tuple[int, int] +_message_queues: dict[_QueueKey, asyncio.Queue[MessageTask]] = {} +_queue_workers: dict[_QueueKey, asyncio.Task[None]] = {} +_queue_locks: dict[_QueueKey, asyncio.Lock] = {} # Protect drain/refill operations # Map (tool_use_id, user_id, thread_id_or_0) -> telegram message_id # for editing tool_use messages with results @@ -67,28 +68,32 @@ class MessageTask: # Status message tracking: (user_id, thread_id_or_0) -> (message_id, window_id, last_text) _status_msg_info: dict[tuple[int, int], tuple[int, str, str]] = {} -# Flood control: user_id -> monotonic time when ban expires -_flood_until: dict[int, float] = {} +# Flood control: (user_id, thread_id_or_0) -> monotonic time when ban expires +_flood_until: dict[_QueueKey, float] = {} # Max seconds to wait for flood control before dropping tasks FLOOD_CONTROL_MAX_WAIT = 10 -def get_message_queue(user_id: int) -> asyncio.Queue[MessageTask] | None: - """Get the message queue for a user (if exists).""" - return _message_queues.get(user_id) +def get_message_queue( + user_id: int, thread_id: int | None = None +) -> asyncio.Queue[MessageTask] | None: + """Get the message queue for a user+topic (if exists).""" + key: _QueueKey = (user_id, thread_id or 0) + return _message_queues.get(key) -def get_or_create_queue(bot: Bot, user_id: int) -> asyncio.Queue[MessageTask]: - """Get or create message queue and worker for a user.""" - if user_id not in _message_queues: - _message_queues[user_id] = asyncio.Queue() - _queue_locks[user_id] = asyncio.Lock() - # Start worker task for this user - _queue_workers[user_id] = asyncio.create_task( - _message_queue_worker(bot, user_id) - ) - return _message_queues[user_id] +def get_or_create_queue( + bot: Bot, user_id: int, thread_id: int | None = None +) -> asyncio.Queue[MessageTask]: + """Get or create message queue and worker for a user+topic.""" + key: _QueueKey = (user_id, thread_id or 0) + if key not in _message_queues: + _message_queues[key] = asyncio.Queue() + _queue_locks[key] = asyncio.Lock() + # Start worker task for this topic + _queue_workers[key] = asyncio.create_task(_message_queue_worker(bot, key)) + return _message_queues[key] def _inspect_queue(queue: asyncio.Queue[MessageTask]) -> list[MessageTask]: @@ -186,18 +191,19 @@ async def _merge_content_tasks( ) -async def _message_queue_worker(bot: Bot, user_id: int) -> None: - """Process message tasks for a user sequentially.""" - queue = _message_queues[user_id] - lock = _queue_locks[user_id] - logger.info(f"Message queue worker started for user {user_id}") +async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: + """Process message tasks for a user+topic sequentially.""" + user_id, _thread_id = key + queue = _message_queues[key] + lock = _queue_locks[key] + logger.info("Message queue worker started for %s", key) while True: try: task = await queue.get() try: # Flood control: drop status, wait for content - flood_end = _flood_until.get(user_id, 0) + flood_end = _flood_until.get(key, 0) if flood_end > 0: remaining = flood_end - time.monotonic() if remaining > 0: @@ -206,14 +212,14 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: continue # Content is actual Claude output — wait then send logger.debug( - "Flood controlled: waiting %.0fs for content (user %d)", + "Flood controlled: waiting %.0fs for content (%s)", remaining, - user_id, + key, ) await asyncio.sleep(remaining) # Ban expired - _flood_until.pop(user_id, None) - logger.info("Flood control lifted for user %d", user_id) + _flood_until.pop(key, None) + logger.info("Flood control lifted for %s", key) if task.task_type == "content": # Try to merge consecutive content tasks @@ -221,7 +227,7 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: queue, task, lock ) if merge_count > 0: - logger.debug(f"Merged {merge_count} tasks for user {user_id}") + logger.debug("Merged %d tasks for %s", merge_count, key) # Mark merged tasks as done for _ in range(merge_count): queue.task_done() @@ -237,29 +243,29 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: else int(e.retry_after.total_seconds()) ) if retry_secs > FLOOD_CONTROL_MAX_WAIT: - _flood_until[user_id] = time.monotonic() + retry_secs + _flood_until[key] = time.monotonic() + retry_secs logger.warning( - "Flood control for user %d: retry_after=%ds, " + "Flood control for %s: retry_after=%ds, " "pausing queue until ban expires", - user_id, + key, retry_secs, ) else: logger.warning( - "Flood control for user %d: waiting %ds", - user_id, + "Flood control for %s: waiting %ds", + key, retry_secs, ) await asyncio.sleep(retry_secs) except Exception as e: - logger.error(f"Error processing message task for user {user_id}: {e}") + logger.error("Error processing message task for %s: %s", key, e) finally: queue.task_done() except asyncio.CancelledError: - logger.info(f"Message queue worker cancelled for user {user_id}") + logger.info("Message queue worker cancelled for %s", key) break except Exception as e: - logger.error(f"Unexpected error in queue worker for user {user_id}: {e}") + logger.error("Unexpected error in queue worker for %s: %s", key, e) def _send_kwargs(thread_id: int | None) -> dict[str, int]: @@ -569,7 +575,8 @@ async def _check_and_send_status( ) -> None: """Check terminal for status line and send status message if present.""" # Skip if there are more messages pending in the queue - queue = _message_queues.get(user_id) + key: _QueueKey = (user_id, thread_id or 0) + queue = _message_queues.get(key) if queue and not queue.empty(): return w = await tmux_manager.find_window_by_id(window_id) @@ -604,7 +611,7 @@ async def enqueue_content_message( window_id, content_type, ) - queue = get_or_create_queue(bot, user_id) + queue = get_or_create_queue(bot, user_id, thread_id) task = MessageTask( task_type="content", @@ -627,13 +634,14 @@ async def enqueue_status_update( thread_id: int | None = None, ) -> None: """Enqueue status update. Skipped if text unchanged or during flood control.""" + tid = thread_id or 0 + key: _QueueKey = (user_id, tid) + # Don't enqueue during flood control — they'd just be dropped - flood_end = _flood_until.get(user_id, 0) + flood_end = _flood_until.get(key, 0) if flood_end > time.monotonic(): return - tid = thread_id or 0 - # Deduplicate: skip if text matches what's already displayed if status_text: skey = (user_id, tid) @@ -641,7 +649,7 @@ async def enqueue_status_update( if info and info[1] == window_id and info[2] == status_text: return - queue = get_or_create_queue(bot, user_id) + queue = get_or_create_queue(bot, user_id, thread_id) if status_text: task = MessageTask( diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index 143b8ba3..e6e10b40 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -25,7 +25,7 @@ from ..session import session_manager from ..terminal_parser import is_interactive_ui, parse_status_line -from ..tmux_manager import tmux_manager +from ..tmux_manager import TmuxWindow, tmux_manager from .interactive_ui import ( clear_interactive_msg, get_interactive_window, @@ -48,13 +48,17 @@ async def update_status_message( user_id: int, window_id: str, thread_id: int | None = None, + window: TmuxWindow | None = None, ) -> None: """Poll terminal and enqueue status update for user's active window. Also detects permission prompt UIs (not triggered via JSONL) and enters interactive mode when found. + + Args: + window: Pre-resolved TmuxWindow to skip redundant list_windows call. """ - w = await tmux_manager.find_window_by_id(window_id) + w = window or await tmux_manager.find_window_by_id(window_id) if not w: # Window gone, enqueue clear await enqueue_status_update(bot, user_id, window_id, None, thread_id=thread_id) @@ -107,6 +111,10 @@ async def status_poll_loop(bot: Bot) -> None: last_topic_check = 0.0 while True: try: + # Single list_windows call per cycle, build lookup map + all_windows = await tmux_manager.list_windows() + window_map: dict[str, TmuxWindow] = {w.window_id: w for w in all_windows} + # Periodic topic existence probe now = time.monotonic() if now - last_topic_check >= TOPIC_CHECK_INTERVAL: @@ -122,7 +130,7 @@ async def status_poll_loop(bot: Bot) -> None: except BadRequest as e: if "Topic_id_invalid" in str(e): # Topic deleted — kill window, unbind, and clean up state - w = await tmux_manager.find_window_by_id(wid) + w = window_map.get(wid) if w: await tmux_manager.kill_window(w.window_id) session_manager.unbind_thread(user_id, thread_id) @@ -147,10 +155,12 @@ async def status_poll_loop(bot: Bot) -> None: e, ) + # Collect status update coroutines to run in parallel + status_coros = [] for user_id, thread_id, wid in list(session_manager.iter_thread_bindings()): try: # Clean up stale bindings (window no longer exists) - w = await tmux_manager.find_window_by_id(wid) + w = window_map.get(wid) if not w: session_manager.unbind_thread(user_id, thread_id) await clear_topic_state(user_id, thread_id, bot) @@ -162,20 +172,30 @@ async def status_poll_loop(bot: Bot) -> None: ) continue - queue = get_message_queue(user_id) + queue = get_message_queue(user_id, thread_id) if queue and not queue.empty(): continue - await update_status_message( - bot, - user_id, - wid, - thread_id=thread_id, + status_coros.append( + update_status_message( + bot, + user_id, + wid, + thread_id=thread_id, + window=w, + ) ) except Exception as e: logger.debug( f"Status update error for user {user_id} " f"thread {thread_id}: {e}" ) + + # Parallelize independent pane captures across windows + if status_coros: + results = await asyncio.gather(*status_coros, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logger.debug("Status update error: %s", result) except Exception as e: logger.error(f"Status poll loop error: {e}") diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 0a1b3186..44f774bc 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -77,6 +77,8 @@ def __init__( self._running = False self._task: asyncio.Task | None = None self._message_callback: Callable[[NewMessage], Awaitable[None]] | None = None + # Fire-and-forget callback tasks (one per session group per poll cycle) + self._callback_tasks: set[asyncio.Task[None]] = set() # Per-session pending tool_use state carried across poll cycles self._pending_tools: dict[str, dict[str, Any]] = {} # session_id -> pending # Track last known session_map for detecting changes @@ -269,18 +271,22 @@ async def _read_new_lines( async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessage]: """Check all sessions for new assistant messages. - Reads from last byte offset. Emits both intermediate - (stop_reason=null) and complete messages. + Uses a collect → read → parse pipeline: + 1. Collect: identify sessions that need reading (mtime/size changed) + 2. Read: parallel async file reads via asyncio.gather + 3. Parse: sequential per-session parsing (safe — _pending_tools keyed by session) Args: active_session_ids: Set of session IDs currently in session_map """ - new_messages = [] + new_messages: list[NewMessage] = [] # Scan projects to get available session files sessions = await self.scan_projects() - # Only process sessions that are in session_map + # Phase 1: Collect — identify sessions needing reads + to_read: list[tuple[SessionInfo, TrackedSession, float]] = [] + for session_info in sessions: if session_info.session_id not in active_session_ids: continue @@ -319,55 +325,77 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa current_mtime <= last_mtime and current_size <= tracked.last_byte_offset ): - # File hasn't changed, skip reading continue - # File changed, read new content from last offset - new_entries = await self._read_new_lines( - tracked, session_info.file_path + to_read.append((session_info, tracked, current_mtime)) + + except OSError as e: + logger.debug(f"Error collecting session {session_info.session_id}: {e}") + + if not to_read: + self.state.save_if_dirty() + return new_messages + + # Phase 2: Read — parallel file reads + read_results: list[list[dict[str, Any]] | BaseException] = await asyncio.gather( + *( + self._read_new_lines(tracked, si.file_path) + for si, tracked, _mtime in to_read + ), + return_exceptions=True, + ) + + # Phase 3: Parse — sequential per session + for (session_info, tracked, current_mtime), result in zip( + to_read, read_results, strict=True + ): + if isinstance(result, BaseException): + logger.debug( + "Error reading session %s: %s", session_info.session_id, result ) - self._file_mtimes[session_info.session_id] = current_mtime + continue - if new_entries: - logger.debug( - f"Read {len(new_entries)} new entries for " - f"session {session_info.session_id}" - ) + new_entries: list[dict[str, Any]] = result + self._file_mtimes[session_info.session_id] = current_mtime - # Parse new entries using the shared logic, carrying over pending tools - carry = self._pending_tools.get(session_info.session_id, {}) - parsed_entries, remaining = TranscriptParser.parse_entries( - new_entries, - pending_tools=carry, + if new_entries: + logger.debug( + "Read %d new entries for session %s", + len(new_entries), + session_info.session_id, ) - if remaining: - self._pending_tools[session_info.session_id] = remaining - else: - self._pending_tools.pop(session_info.session_id, None) - for entry in parsed_entries: - if not entry.text and not entry.image_data: - continue - # Skip user messages unless show_user_messages is enabled - if entry.role == "user" and not config.show_user_messages: - continue - new_messages.append( - NewMessage( - session_id=session_info.session_id, - text=entry.text, - is_complete=True, - content_type=entry.content_type, - tool_use_id=entry.tool_use_id, - role=entry.role, - tool_name=entry.tool_name, - image_data=entry.image_data, - ) - ) + # Parse new entries using the shared logic, carrying over pending tools + carry = self._pending_tools.get(session_info.session_id, {}) + parsed_entries, remaining = TranscriptParser.parse_entries( + new_entries, + pending_tools=carry, + ) + if remaining: + self._pending_tools[session_info.session_id] = remaining + else: + self._pending_tools.pop(session_info.session_id, None) - self.state.update_session(tracked) + for entry in parsed_entries: + if not entry.text and not entry.image_data: + continue + # Skip user messages unless show_user_messages is enabled + if entry.role == "user" and not config.show_user_messages: + continue + new_messages.append( + NewMessage( + session_id=session_info.session_id, + text=entry.text, + is_complete=True, + content_type=entry.content_type, + tool_use_id=entry.tool_use_id, + role=entry.role, + tool_name=entry.tool_name, + image_data=entry.image_data, + ) + ) - except OSError as e: - logger.debug(f"Error processing session {session_info.session_id}: {e}") + self.state.update_session(tracked) self.state.save_if_dirty() return new_messages @@ -466,6 +494,17 @@ async def _detect_and_cleanup_changes(self) -> dict[str, str]: return current_map + async def _dispatch_session_messages( + self, session_id: str, messages: list[NewMessage] + ) -> None: + """Dispatch messages for one session sequentially (fire-and-forget task).""" + for msg in messages: + try: + if self._message_callback: + await self._message_callback(msg) + except Exception as e: + logger.error("Message callback error (session %s): %s", session_id, e) + async def _monitor_loop(self) -> None: """Background loop for checking session updates. @@ -493,15 +532,23 @@ async def _monitor_loop(self) -> None: # Check for new messages (all I/O is async) new_messages = await self.check_for_updates(active_session_ids) - for msg in new_messages: - status = "complete" if msg.is_complete else "streaming" - preview = msg.text[:80] + ("..." if len(msg.text) > 80 else "") - logger.info("[%s] session=%s: %s", status, msg.session_id, preview) - if self._message_callback: - try: - await self._message_callback(msg) - except Exception as e: - logger.error(f"Message callback error: {e}") + if new_messages and self._message_callback: + # Group messages by session_id for concurrent dispatch + groups: dict[str, list[NewMessage]] = {} + for msg in new_messages: + status = "complete" if msg.is_complete else "streaming" + preview = msg.text[:80] + ("..." if len(msg.text) > 80 else "") + logger.info( + "[%s] session=%s: %s", status, msg.session_id, preview + ) + groups.setdefault(msg.session_id, []).append(msg) + + for session_id, msgs in groups.items(): + task = asyncio.create_task( + self._dispatch_session_messages(session_id, msgs) + ) + self._callback_tasks.add(task) + task.add_done_callback(self._callback_tasks.discard) except Exception as e: logger.error(f"Monitor loop error: {e}") @@ -522,5 +569,9 @@ def stop(self) -> None: if self._task: self._task.cancel() self._task = None + # Cancel outstanding fire-and-forget callback tasks + for task in list(self._callback_tasks): + task.cancel() + self._callback_tasks.clear() self.state.save() logger.info("Session monitor stopped and state saved") From bfa9a43b49c04324049716744c05852268c3a55a Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 21 Feb 2026 16:30:14 +0000 Subject: [PATCH 04/15] fix: reduce Telegram API call rate to prevent 429 flood control Per-group-chat processing lock serializes sends across topic workers, typing indicators throttled to 1/4s, redundant _check_and_send_status removed (status polling loop already covers this), and group rate limiter buckets pre-filled on restart to account for Telegram's persisted server-side counter. Co-Authored-By: Claude Opus 4.6 --- src/ccbot/bot.py | 19 +++++ src/ccbot/handlers/message_queue.py | 124 ++++++++++++++-------------- 2 files changed, 79 insertions(+), 64 deletions(-) diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index bea96624..63c9e8b6 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -1526,6 +1526,25 @@ async def post_init(application: Application) -> None: if rate_limiter and rate_limiter._base_limiter: rate_limiter._base_limiter._level = rate_limiter._base_limiter.max_rate logger.info("Pre-filled global rate limiter bucket") + # Also pre-fill per-group limiters for known chat IDs. + # Without this, the group limiter allows a burst of 20 requests on restart, + # which can exceed Telegram's persisted server-side per-group counter. + if hasattr(rate_limiter, "_group_limiters"): + from aiolimiter import AsyncLimiter + + group_rate = getattr(rate_limiter, "_group_max_rate", 20) + group_period = getattr(rate_limiter, "_group_time_period", 60) + seen_chat_ids: set[int] = set() + for chat_id in session_manager.group_chat_ids.values(): + if chat_id < 0 and chat_id not in seen_chat_ids: + seen_chat_ids.add(chat_id) + limiter = AsyncLimiter(group_rate, group_period) + limiter._level = limiter.max_rate + rate_limiter._group_limiters[chat_id] = limiter + if seen_chat_ids: + logger.info( + "Pre-filled %d group rate limiter bucket(s)", len(seen_chat_ids) + ) monitor = SessionMonitor() diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index 584498ea..37f2dfba 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -30,8 +30,6 @@ from ..markdown_v2 import convert_markdown from ..session import session_manager from ..transcript_parser import TranscriptParser -from ..terminal_parser import parse_status_line -from ..tmux_manager import tmux_manager from .message_sender import NO_LINK_PREVIEW, send_photo, send_with_fallback logger = logging.getLogger(__name__) @@ -74,6 +72,16 @@ class MessageTask: # Max seconds to wait for flood control before dropping tasks FLOOD_CONTROL_MAX_WAIT = 10 +# Per-group-chat processing lock — serializes API calls across topic workers +# sending to the same Telegram group to prevent rate limit bursts +_group_process_locks: dict[int, asyncio.Lock] = {} + +# Typing indicator throttle: (user_id, thread_id_or_0) -> monotonic time of last send +_last_typing: dict[tuple[int, int], float] = {} + +# Minimum interval between typing indicators to same topic (seconds) +TYPING_MIN_INTERVAL = 4.0 + def get_message_queue( user_id: int, thread_id: int | None = None @@ -196,6 +204,14 @@ async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: user_id, _thread_id = key queue = _message_queues[key] lock = _queue_locks[key] + + # Shared per-group-chat lock — serializes sends across topic workers + # to stay within Telegram's per-group rate limit + chat_id = session_manager.resolve_chat_id(user_id, _thread_id or None) + if chat_id not in _group_process_locks: + _group_process_locks[chat_id] = asyncio.Lock() + group_lock = _group_process_locks[chat_id] + logger.info("Message queue worker started for %s", key) while True: @@ -221,21 +237,24 @@ async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: _flood_until.pop(key, None) logger.info("Flood control lifted for %s", key) - if task.task_type == "content": - # Try to merge consecutive content tasks - merged_task, merge_count = await _merge_content_tasks( - queue, task, lock - ) - if merge_count > 0: - logger.debug("Merged %d tasks for %s", merge_count, key) - # Mark merged tasks as done - for _ in range(merge_count): - queue.task_done() - await _process_content_task(bot, user_id, merged_task) - elif task.task_type == "status_update": - await _process_status_update_task(bot, user_id, task) - elif task.task_type == "status_clear": - await _do_clear_status_message(bot, user_id, task.thread_id or 0) + async with group_lock: + if task.task_type == "content": + # Try to merge consecutive content tasks + merged_task, merge_count = await _merge_content_tasks( + queue, task, lock + ) + if merge_count > 0: + logger.debug("Merged %d tasks for %s", merge_count, key) + # Mark merged tasks as done + for _ in range(merge_count): + queue.task_done() + await _process_content_task(bot, user_id, merged_task) + elif task.task_type == "status_update": + await _process_status_update_task(bot, user_id, task) + elif task.task_type == "status_clear": + await _do_clear_status_message( + bot, user_id, task.thread_id or 0 + ) except RetryAfter as e: retry_secs = ( e.retry_after @@ -316,7 +335,6 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No link_preview_options=NO_LINK_PREVIEW, ) await _send_task_images(bot, chat_id, task) - await _check_and_send_status(bot, user_id, wid, task.thread_id) return except RetryAfter: raise @@ -335,7 +353,6 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No link_preview_options=NO_LINK_PREVIEW, ) await _send_task_images(bot, chat_id, task) - await _check_and_send_status(bot, user_id, wid, task.thread_id) return except RetryAfter: raise @@ -380,9 +397,6 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No # 4. Send images if present (from tool_result with base64 image blocks) await _send_task_images(bot, chat_id, task) - # 5. After content, check and send status - await _check_and_send_status(bot, user_id, wid, task.thread_id) - async def _convert_status_to_content( bot: Bot, @@ -472,16 +486,19 @@ async def _process_status_update_task( return else: # Same window, text changed - edit in place - # Send typing indicator when Claude is working + # Send typing indicator when Claude is working (throttled) if "esc to interrupt" in status_text.lower(): - try: - await bot.send_chat_action( - chat_id=chat_id, action=ChatAction.TYPING - ) - except RetryAfter: - raise - except Exception: - pass + now = time.monotonic() + if now - _last_typing.get(skey, 0) >= TYPING_MIN_INTERVAL: + try: + await bot.send_chat_action( + chat_id=chat_id, action=ChatAction.TYPING + ) + _last_typing[skey] = now + except RetryAfter: + raise + except Exception: + pass try: await bot.edit_message_text( chat_id=chat_id, @@ -532,14 +549,17 @@ async def _do_send_status_message( await bot.delete_message(chat_id=chat_id, message_id=old[0]) except Exception: pass - # Send typing indicator when Claude is working + # Send typing indicator when Claude is working (throttled) if "esc to interrupt" in text.lower(): - try: - await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) - except RetryAfter: - raise - except Exception: - pass + now = time.monotonic() + if now - _last_typing.get(skey, 0) >= TYPING_MIN_INTERVAL: + try: + await bot.send_chat_action(chat_id=chat_id, action=ChatAction.TYPING) + _last_typing[skey] = now + except RetryAfter: + raise + except Exception: + pass sent = await send_with_fallback( bot, chat_id, @@ -567,32 +587,6 @@ async def _do_clear_status_message( logger.debug(f"Failed to delete status message {msg_id}: {e}") -async def _check_and_send_status( - bot: Bot, - user_id: int, - window_id: str, - thread_id: int | None = None, -) -> None: - """Check terminal for status line and send status message if present.""" - # Skip if there are more messages pending in the queue - key: _QueueKey = (user_id, thread_id or 0) - queue = _message_queues.get(key) - if queue and not queue.empty(): - return - w = await tmux_manager.find_window_by_id(window_id) - if not w: - return - - pane_text = await tmux_manager.capture_pane(w.window_id) - if not pane_text: - return - - tid = thread_id or 0 - status_line = parse_status_line(pane_text) - if status_line: - await _do_send_status_message(bot, user_id, tid, window_id, status_line) - - async def enqueue_content_message( bot: Bot, user_id: int, @@ -695,4 +689,6 @@ async def shutdown_workers() -> None: _queue_workers.clear() _message_queues.clear() _queue_locks.clear() + _group_process_locks.clear() + _last_typing.clear() logger.info("Message queue workers stopped") From d80862b686d79f150f8f0ff94fc7315b656218a8 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 21 Feb 2026 16:33:09 +0000 Subject: [PATCH 05/15] Revert "fix: resolve multi-session concurrency bottlenecks" This reverts commit 17ff3ee6fdb7f71d61181a8ff8d8a30fa3831c53. --- src/ccbot/bot.py | 2 +- src/ccbot/handlers/message_queue.py | 96 ++++++++-------- src/ccbot/handlers/status_polling.py | 40 ++----- src/ccbot/session_monitor.py | 157 +++++++++------------------ 4 files changed, 108 insertions(+), 187 deletions(-) diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index bea96624..f6beaa9c 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -1430,7 +1430,7 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: # Mark interactive mode BEFORE sleeping so polling skips this window set_interactive_mode(user_id, wid, thread_id) # Flush pending messages (e.g. plan content) before sending interactive UI - queue = get_message_queue(user_id, thread_id) + queue = get_message_queue(user_id) if queue: await queue.join() # Wait briefly for Claude Code to render the question UI diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index 584498ea..b5f24971 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -55,11 +55,10 @@ class MessageTask: image_data: list[tuple[str, bytes]] | None = None # From tool_result images -# Per-topic message queues and worker tasks — keyed by (user_id, thread_id_or_0) -_QueueKey = tuple[int, int] -_message_queues: dict[_QueueKey, asyncio.Queue[MessageTask]] = {} -_queue_workers: dict[_QueueKey, asyncio.Task[None]] = {} -_queue_locks: dict[_QueueKey, asyncio.Lock] = {} # Protect drain/refill operations +# Per-user message queues and worker tasks +_message_queues: dict[int, asyncio.Queue[MessageTask]] = {} +_queue_workers: dict[int, asyncio.Task[None]] = {} +_queue_locks: dict[int, asyncio.Lock] = {} # Protect drain/refill operations # Map (tool_use_id, user_id, thread_id_or_0) -> telegram message_id # for editing tool_use messages with results @@ -68,32 +67,28 @@ class MessageTask: # Status message tracking: (user_id, thread_id_or_0) -> (message_id, window_id, last_text) _status_msg_info: dict[tuple[int, int], tuple[int, str, str]] = {} -# Flood control: (user_id, thread_id_or_0) -> monotonic time when ban expires -_flood_until: dict[_QueueKey, float] = {} +# Flood control: user_id -> monotonic time when ban expires +_flood_until: dict[int, float] = {} # Max seconds to wait for flood control before dropping tasks FLOOD_CONTROL_MAX_WAIT = 10 -def get_message_queue( - user_id: int, thread_id: int | None = None -) -> asyncio.Queue[MessageTask] | None: - """Get the message queue for a user+topic (if exists).""" - key: _QueueKey = (user_id, thread_id or 0) - return _message_queues.get(key) +def get_message_queue(user_id: int) -> asyncio.Queue[MessageTask] | None: + """Get the message queue for a user (if exists).""" + return _message_queues.get(user_id) -def get_or_create_queue( - bot: Bot, user_id: int, thread_id: int | None = None -) -> asyncio.Queue[MessageTask]: - """Get or create message queue and worker for a user+topic.""" - key: _QueueKey = (user_id, thread_id or 0) - if key not in _message_queues: - _message_queues[key] = asyncio.Queue() - _queue_locks[key] = asyncio.Lock() - # Start worker task for this topic - _queue_workers[key] = asyncio.create_task(_message_queue_worker(bot, key)) - return _message_queues[key] +def get_or_create_queue(bot: Bot, user_id: int) -> asyncio.Queue[MessageTask]: + """Get or create message queue and worker for a user.""" + if user_id not in _message_queues: + _message_queues[user_id] = asyncio.Queue() + _queue_locks[user_id] = asyncio.Lock() + # Start worker task for this user + _queue_workers[user_id] = asyncio.create_task( + _message_queue_worker(bot, user_id) + ) + return _message_queues[user_id] def _inspect_queue(queue: asyncio.Queue[MessageTask]) -> list[MessageTask]: @@ -191,19 +186,18 @@ async def _merge_content_tasks( ) -async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: - """Process message tasks for a user+topic sequentially.""" - user_id, _thread_id = key - queue = _message_queues[key] - lock = _queue_locks[key] - logger.info("Message queue worker started for %s", key) +async def _message_queue_worker(bot: Bot, user_id: int) -> None: + """Process message tasks for a user sequentially.""" + queue = _message_queues[user_id] + lock = _queue_locks[user_id] + logger.info(f"Message queue worker started for user {user_id}") while True: try: task = await queue.get() try: # Flood control: drop status, wait for content - flood_end = _flood_until.get(key, 0) + flood_end = _flood_until.get(user_id, 0) if flood_end > 0: remaining = flood_end - time.monotonic() if remaining > 0: @@ -212,14 +206,14 @@ async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: continue # Content is actual Claude output — wait then send logger.debug( - "Flood controlled: waiting %.0fs for content (%s)", + "Flood controlled: waiting %.0fs for content (user %d)", remaining, - key, + user_id, ) await asyncio.sleep(remaining) # Ban expired - _flood_until.pop(key, None) - logger.info("Flood control lifted for %s", key) + _flood_until.pop(user_id, None) + logger.info("Flood control lifted for user %d", user_id) if task.task_type == "content": # Try to merge consecutive content tasks @@ -227,7 +221,7 @@ async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: queue, task, lock ) if merge_count > 0: - logger.debug("Merged %d tasks for %s", merge_count, key) + logger.debug(f"Merged {merge_count} tasks for user {user_id}") # Mark merged tasks as done for _ in range(merge_count): queue.task_done() @@ -243,29 +237,29 @@ async def _message_queue_worker(bot: Bot, key: _QueueKey) -> None: else int(e.retry_after.total_seconds()) ) if retry_secs > FLOOD_CONTROL_MAX_WAIT: - _flood_until[key] = time.monotonic() + retry_secs + _flood_until[user_id] = time.monotonic() + retry_secs logger.warning( - "Flood control for %s: retry_after=%ds, " + "Flood control for user %d: retry_after=%ds, " "pausing queue until ban expires", - key, + user_id, retry_secs, ) else: logger.warning( - "Flood control for %s: waiting %ds", - key, + "Flood control for user %d: waiting %ds", + user_id, retry_secs, ) await asyncio.sleep(retry_secs) except Exception as e: - logger.error("Error processing message task for %s: %s", key, e) + logger.error(f"Error processing message task for user {user_id}: {e}") finally: queue.task_done() except asyncio.CancelledError: - logger.info("Message queue worker cancelled for %s", key) + logger.info(f"Message queue worker cancelled for user {user_id}") break except Exception as e: - logger.error("Unexpected error in queue worker for %s: %s", key, e) + logger.error(f"Unexpected error in queue worker for user {user_id}: {e}") def _send_kwargs(thread_id: int | None) -> dict[str, int]: @@ -575,8 +569,7 @@ async def _check_and_send_status( ) -> None: """Check terminal for status line and send status message if present.""" # Skip if there are more messages pending in the queue - key: _QueueKey = (user_id, thread_id or 0) - queue = _message_queues.get(key) + queue = _message_queues.get(user_id) if queue and not queue.empty(): return w = await tmux_manager.find_window_by_id(window_id) @@ -611,7 +604,7 @@ async def enqueue_content_message( window_id, content_type, ) - queue = get_or_create_queue(bot, user_id, thread_id) + queue = get_or_create_queue(bot, user_id) task = MessageTask( task_type="content", @@ -634,14 +627,13 @@ async def enqueue_status_update( thread_id: int | None = None, ) -> None: """Enqueue status update. Skipped if text unchanged or during flood control.""" - tid = thread_id or 0 - key: _QueueKey = (user_id, tid) - # Don't enqueue during flood control — they'd just be dropped - flood_end = _flood_until.get(key, 0) + flood_end = _flood_until.get(user_id, 0) if flood_end > time.monotonic(): return + tid = thread_id or 0 + # Deduplicate: skip if text matches what's already displayed if status_text: skey = (user_id, tid) @@ -649,7 +641,7 @@ async def enqueue_status_update( if info and info[1] == window_id and info[2] == status_text: return - queue = get_or_create_queue(bot, user_id, thread_id) + queue = get_or_create_queue(bot, user_id) if status_text: task = MessageTask( diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index e6e10b40..143b8ba3 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -25,7 +25,7 @@ from ..session import session_manager from ..terminal_parser import is_interactive_ui, parse_status_line -from ..tmux_manager import TmuxWindow, tmux_manager +from ..tmux_manager import tmux_manager from .interactive_ui import ( clear_interactive_msg, get_interactive_window, @@ -48,17 +48,13 @@ async def update_status_message( user_id: int, window_id: str, thread_id: int | None = None, - window: TmuxWindow | None = None, ) -> None: """Poll terminal and enqueue status update for user's active window. Also detects permission prompt UIs (not triggered via JSONL) and enters interactive mode when found. - - Args: - window: Pre-resolved TmuxWindow to skip redundant list_windows call. """ - w = window or await tmux_manager.find_window_by_id(window_id) + w = await tmux_manager.find_window_by_id(window_id) if not w: # Window gone, enqueue clear await enqueue_status_update(bot, user_id, window_id, None, thread_id=thread_id) @@ -111,10 +107,6 @@ async def status_poll_loop(bot: Bot) -> None: last_topic_check = 0.0 while True: try: - # Single list_windows call per cycle, build lookup map - all_windows = await tmux_manager.list_windows() - window_map: dict[str, TmuxWindow] = {w.window_id: w for w in all_windows} - # Periodic topic existence probe now = time.monotonic() if now - last_topic_check >= TOPIC_CHECK_INTERVAL: @@ -130,7 +122,7 @@ async def status_poll_loop(bot: Bot) -> None: except BadRequest as e: if "Topic_id_invalid" in str(e): # Topic deleted — kill window, unbind, and clean up state - w = window_map.get(wid) + w = await tmux_manager.find_window_by_id(wid) if w: await tmux_manager.kill_window(w.window_id) session_manager.unbind_thread(user_id, thread_id) @@ -155,12 +147,10 @@ async def status_poll_loop(bot: Bot) -> None: e, ) - # Collect status update coroutines to run in parallel - status_coros = [] for user_id, thread_id, wid in list(session_manager.iter_thread_bindings()): try: # Clean up stale bindings (window no longer exists) - w = window_map.get(wid) + w = await tmux_manager.find_window_by_id(wid) if not w: session_manager.unbind_thread(user_id, thread_id) await clear_topic_state(user_id, thread_id, bot) @@ -172,30 +162,20 @@ async def status_poll_loop(bot: Bot) -> None: ) continue - queue = get_message_queue(user_id, thread_id) + queue = get_message_queue(user_id) if queue and not queue.empty(): continue - status_coros.append( - update_status_message( - bot, - user_id, - wid, - thread_id=thread_id, - window=w, - ) + await update_status_message( + bot, + user_id, + wid, + thread_id=thread_id, ) except Exception as e: logger.debug( f"Status update error for user {user_id} " f"thread {thread_id}: {e}" ) - - # Parallelize independent pane captures across windows - if status_coros: - results = await asyncio.gather(*status_coros, return_exceptions=True) - for result in results: - if isinstance(result, Exception): - logger.debug("Status update error: %s", result) except Exception as e: logger.error(f"Status poll loop error: {e}") diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 44f774bc..0a1b3186 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -77,8 +77,6 @@ def __init__( self._running = False self._task: asyncio.Task | None = None self._message_callback: Callable[[NewMessage], Awaitable[None]] | None = None - # Fire-and-forget callback tasks (one per session group per poll cycle) - self._callback_tasks: set[asyncio.Task[None]] = set() # Per-session pending tool_use state carried across poll cycles self._pending_tools: dict[str, dict[str, Any]] = {} # session_id -> pending # Track last known session_map for detecting changes @@ -271,22 +269,18 @@ async def _read_new_lines( async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessage]: """Check all sessions for new assistant messages. - Uses a collect → read → parse pipeline: - 1. Collect: identify sessions that need reading (mtime/size changed) - 2. Read: parallel async file reads via asyncio.gather - 3. Parse: sequential per-session parsing (safe — _pending_tools keyed by session) + Reads from last byte offset. Emits both intermediate + (stop_reason=null) and complete messages. Args: active_session_ids: Set of session IDs currently in session_map """ - new_messages: list[NewMessage] = [] + new_messages = [] # Scan projects to get available session files sessions = await self.scan_projects() - # Phase 1: Collect — identify sessions needing reads - to_read: list[tuple[SessionInfo, TrackedSession, float]] = [] - + # Only process sessions that are in session_map for session_info in sessions: if session_info.session_id not in active_session_ids: continue @@ -325,77 +319,55 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa current_mtime <= last_mtime and current_size <= tracked.last_byte_offset ): + # File hasn't changed, skip reading continue - to_read.append((session_info, tracked, current_mtime)) - - except OSError as e: - logger.debug(f"Error collecting session {session_info.session_id}: {e}") - - if not to_read: - self.state.save_if_dirty() - return new_messages - - # Phase 2: Read — parallel file reads - read_results: list[list[dict[str, Any]] | BaseException] = await asyncio.gather( - *( - self._read_new_lines(tracked, si.file_path) - for si, tracked, _mtime in to_read - ), - return_exceptions=True, - ) - - # Phase 3: Parse — sequential per session - for (session_info, tracked, current_mtime), result in zip( - to_read, read_results, strict=True - ): - if isinstance(result, BaseException): - logger.debug( - "Error reading session %s: %s", session_info.session_id, result + # File changed, read new content from last offset + new_entries = await self._read_new_lines( + tracked, session_info.file_path ) - continue + self._file_mtimes[session_info.session_id] = current_mtime - new_entries: list[dict[str, Any]] = result - self._file_mtimes[session_info.session_id] = current_mtime + if new_entries: + logger.debug( + f"Read {len(new_entries)} new entries for " + f"session {session_info.session_id}" + ) - if new_entries: - logger.debug( - "Read %d new entries for session %s", - len(new_entries), - session_info.session_id, + # Parse new entries using the shared logic, carrying over pending tools + carry = self._pending_tools.get(session_info.session_id, {}) + parsed_entries, remaining = TranscriptParser.parse_entries( + new_entries, + pending_tools=carry, ) + if remaining: + self._pending_tools[session_info.session_id] = remaining + else: + self._pending_tools.pop(session_info.session_id, None) - # Parse new entries using the shared logic, carrying over pending tools - carry = self._pending_tools.get(session_info.session_id, {}) - parsed_entries, remaining = TranscriptParser.parse_entries( - new_entries, - pending_tools=carry, - ) - if remaining: - self._pending_tools[session_info.session_id] = remaining - else: - self._pending_tools.pop(session_info.session_id, None) - - for entry in parsed_entries: - if not entry.text and not entry.image_data: - continue - # Skip user messages unless show_user_messages is enabled - if entry.role == "user" and not config.show_user_messages: - continue - new_messages.append( - NewMessage( - session_id=session_info.session_id, - text=entry.text, - is_complete=True, - content_type=entry.content_type, - tool_use_id=entry.tool_use_id, - role=entry.role, - tool_name=entry.tool_name, - image_data=entry.image_data, + for entry in parsed_entries: + if not entry.text and not entry.image_data: + continue + # Skip user messages unless show_user_messages is enabled + if entry.role == "user" and not config.show_user_messages: + continue + new_messages.append( + NewMessage( + session_id=session_info.session_id, + text=entry.text, + is_complete=True, + content_type=entry.content_type, + tool_use_id=entry.tool_use_id, + role=entry.role, + tool_name=entry.tool_name, + image_data=entry.image_data, + ) ) - ) - self.state.update_session(tracked) + self.state.update_session(tracked) + + except OSError as e: + logger.debug(f"Error processing session {session_info.session_id}: {e}") self.state.save_if_dirty() return new_messages @@ -494,17 +466,6 @@ async def _detect_and_cleanup_changes(self) -> dict[str, str]: return current_map - async def _dispatch_session_messages( - self, session_id: str, messages: list[NewMessage] - ) -> None: - """Dispatch messages for one session sequentially (fire-and-forget task).""" - for msg in messages: - try: - if self._message_callback: - await self._message_callback(msg) - except Exception as e: - logger.error("Message callback error (session %s): %s", session_id, e) - async def _monitor_loop(self) -> None: """Background loop for checking session updates. @@ -532,23 +493,15 @@ async def _monitor_loop(self) -> None: # Check for new messages (all I/O is async) new_messages = await self.check_for_updates(active_session_ids) - if new_messages and self._message_callback: - # Group messages by session_id for concurrent dispatch - groups: dict[str, list[NewMessage]] = {} - for msg in new_messages: - status = "complete" if msg.is_complete else "streaming" - preview = msg.text[:80] + ("..." if len(msg.text) > 80 else "") - logger.info( - "[%s] session=%s: %s", status, msg.session_id, preview - ) - groups.setdefault(msg.session_id, []).append(msg) - - for session_id, msgs in groups.items(): - task = asyncio.create_task( - self._dispatch_session_messages(session_id, msgs) - ) - self._callback_tasks.add(task) - task.add_done_callback(self._callback_tasks.discard) + for msg in new_messages: + status = "complete" if msg.is_complete else "streaming" + preview = msg.text[:80] + ("..." if len(msg.text) > 80 else "") + logger.info("[%s] session=%s: %s", status, msg.session_id, preview) + if self._message_callback: + try: + await self._message_callback(msg) + except Exception as e: + logger.error(f"Message callback error: {e}") except Exception as e: logger.error(f"Monitor loop error: {e}") @@ -569,9 +522,5 @@ def stop(self) -> None: if self._task: self._task.cancel() self._task = None - # Cancel outstanding fire-and-forget callback tasks - for task in list(self._callback_tasks): - task.cancel() - self._callback_tasks.clear() self.state.save() logger.info("Session monitor stopped and state saved") From 0a55f35fc1b087714073f937c853e638f8e38ad4 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 21 Feb 2026 16:37:59 +0000 Subject: [PATCH 06/15] fix: handle old state.json format and update restart script - _load_state now handles both old (dict with window_id+chat_id) and new (plain string) thread_bindings format, migrating chat_id to group_chat_ids automatically. - restart.sh now uses uv tool install + systemctl instead of running uv run ccbot in a tmux pane, ensuring code changes are picked up. Co-Authored-By: Claude Opus 4.6 --- scripts/restart.sh | 78 +++++++------------------------------------- src/ccbot/session.py | 19 ++++++++--- 2 files changed, 27 insertions(+), 70 deletions(-) diff --git a/scripts/restart.sh b/scripts/restart.sh index 62715a4f..9b4d4531 100755 --- a/scripts/restart.sh +++ b/scripts/restart.sh @@ -1,81 +1,27 @@ #!/usr/bin/env bash set -euo pipefail -TMUX_SESSION="ccbot" -TMUX_WINDOW="__main__" -TARGET="${TMUX_SESSION}:${TMUX_WINDOW}" PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)" -MAX_WAIT=10 # seconds to wait for process to exit -# Check if tmux session and window exist -if ! tmux has-session -t "$TMUX_SESSION" 2>/dev/null; then - echo "Error: tmux session '$TMUX_SESSION' does not exist" - exit 1 -fi - -if ! tmux list-windows -t "$TMUX_SESSION" -F '#{window_name}' 2>/dev/null | grep -qx "$TMUX_WINDOW"; then - echo "Error: window '$TMUX_WINDOW' not found in session '$TMUX_SESSION'" - exit 1 -fi - -# Get the pane PID and check if uv run ccbot is running -PANE_PID=$(tmux list-panes -t "$TARGET" -F '#{pane_pid}') - -is_ccbot_running() { - pstree -a "$PANE_PID" 2>/dev/null | grep -q 'uv.*run ccbot\|ccbot.*\.venv/bin/ccbot' -} - -# Stop existing process if running -if is_ccbot_running; then - echo "Found running ccbot process, sending Ctrl-C..." - tmux send-keys -t "$TARGET" C-c - - # Wait for process to exit - waited=0 - while is_ccbot_running && [ "$waited" -lt "$MAX_WAIT" ]; do - sleep 1 - waited=$((waited + 1)) - echo " Waiting for process to exit... (${waited}s/${MAX_WAIT}s)" - done - - if is_ccbot_running; then - echo "Process did not exit after ${MAX_WAIT}s, sending SIGTERM..." - # Kill the uv process directly - UV_PID=$(pstree -ap "$PANE_PID" 2>/dev/null | grep -oP 'uv,\K\d+' | head -1) - if [ -n "$UV_PID" ]; then - kill "$UV_PID" 2>/dev/null || true - sleep 2 - fi - if is_ccbot_running; then - echo "Process still running, sending SIGKILL..." - kill -9 "$UV_PID" 2>/dev/null || true - sleep 1 - fi - fi - - echo "Process stopped." -else - echo "No ccbot process running in $TARGET" -fi - -# Brief pause to let the shell settle -sleep 1 +# Reinstall the package so the systemd service picks up code changes +echo "Installing ccbot from ${PROJECT_DIR}..." +uv tool install --force --reinstall "$PROJECT_DIR" 2>&1 -# Start ccbot -echo "Starting ccbot in $TARGET..." -tmux send-keys -t "$TARGET" "cd ${PROJECT_DIR} && uv run ccbot" Enter +# Restart the systemd user service +echo "Restarting ccbot service..." +systemctl --user restart ccbot -# Verify startup and show logs -sleep 3 -if is_ccbot_running; then +# Wait for startup and verify +sleep 2 +if systemctl --user is-active --quiet ccbot; then echo "ccbot restarted successfully. Recent logs:" echo "----------------------------------------" - tmux capture-pane -t "$TARGET" -p | tail -20 + journalctl --user -u ccbot --since "5 sec ago" --no-pager | tail -20 echo "----------------------------------------" else - echo "Warning: ccbot may not have started. Pane output:" + echo "Warning: ccbot service failed to start. Logs:" echo "----------------------------------------" - tmux capture-pane -t "$TARGET" -p | tail -30 + journalctl --user -u ccbot --since "10 sec ago" --no-pager | tail -30 echo "----------------------------------------" exit 1 fi diff --git a/src/ccbot/session.py b/src/ccbot/session.py index 5121e59e..cfe17ccd 100644 --- a/src/ccbot/session.py +++ b/src/ccbot/session.py @@ -150,10 +150,21 @@ def _load_state(self) -> None: int(uid): offsets for uid, offsets in state.get("user_window_offsets", {}).items() } - self.thread_bindings = { - int(uid): {int(tid): wid for tid, wid in bindings.items()} - for uid, bindings in state.get("thread_bindings", {}).items() - } + self.thread_bindings = {} + for uid, bindings in state.get("thread_bindings", {}).items(): + parsed: dict[int, str] = {} + for tid, wid in bindings.items(): + if isinstance(wid, dict): + # Old format: {"window_id": "@4", "chat_id": ...} + parsed[int(tid)] = wid["window_id"] + # Migrate chat_id to group_chat_ids + chat_id = wid.get("chat_id") + if chat_id and int(chat_id) < 0: + key = f"{uid}:{tid}" + self.group_chat_ids[key] = int(chat_id) + else: + parsed[int(tid)] = wid + self.thread_bindings[int(uid)] = parsed self.window_display_names = state.get("window_display_names", {}) self.group_chat_ids = { k: int(v) for k, v in state.get("group_chat_ids", {}).items() From 41205f9316aeb63aa38ef4d080e4a40bdcc8f6bc Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 21 Feb 2026 17:30:35 +0000 Subject: [PATCH 07/15] fix: add SHOW_TOOLS config and status dedup to reduce Telegram API calls Suppress tool_use/tool_result messages when SHOW_TOOLS=false (interactive tools like AskUserQuestion/ExitPlanMode still pass through). Strip timer stats from status lines before dedup comparison so edits only fire when the action name changes, not every second. Co-Authored-By: Claude Opus 4.6 --- src/ccbot/config.py | 16 ++++++++++++++++ src/ccbot/handlers/history.py | 10 ++++++++++ src/ccbot/handlers/message_queue.py | 23 +++++++++++++++++++---- src/ccbot/session_monitor.py | 15 +++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/ccbot/config.py b/src/ccbot/config.py index deab8786..bf4117c8 100644 --- a/src/ccbot/config.py +++ b/src/ccbot/config.py @@ -74,6 +74,22 @@ def __init__(self) -> None: # When True, user messages are shown with a 👤 prefix self.show_user_messages = True + # Display thinking messages in real-time and history + # Set SHOW_THINKING=false to skip thinking blocks (reduces Telegram API calls) + self.show_thinking = os.getenv("SHOW_THINKING", "true").lower() in ( + "true", + "1", + "yes", + ) + + # Display tool_use/tool_result messages in real-time and history + # Set SHOW_TOOLS=false to skip tool messages (reduces Telegram API calls) + self.show_tools = os.getenv("SHOW_TOOLS", "true").lower() in ( + "true", + "1", + "yes", + ) + logger.debug( "Config initialized: dir=%s, token=%s..., allowed_users=%d, " "tmux_session=%s", diff --git a/src/ccbot/handlers/history.py b/src/ccbot/handlers/history.py index 9094107e..f5203e33 100644 --- a/src/ccbot/handlers/history.py +++ b/src/ccbot/handlers/history.py @@ -160,6 +160,16 @@ async def send_history( lines = [header] for msg in messages: + # Skip thinking messages unless show_thinking is enabled + if msg.get("content_type") == "thinking" and not config.show_thinking: + continue + # Skip tool messages unless show_tools is enabled + if ( + msg.get("content_type") in ("tool_use", "tool_result") + and not config.show_tools + ): + continue + # Format timestamp as HH:MM ts = msg.get("timestamp") if ts: diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index 37f2dfba..43c914ef 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -19,6 +19,7 @@ import asyncio import logging +import re import time from dataclasses import dataclass, field from typing import Literal @@ -83,6 +84,16 @@ class MessageTask: TYPING_MIN_INTERVAL = 4.0 +# Regex to strip the stats parenthetical from status lines, e.g.: +# "Unravelling… (45s · ↓ 2.5k tokens · thought for 25s)" → "Unravelling…" +_STATUS_STATS_RE = re.compile(r"\s*\([\d]+s\s*·.*\)\s*$") + + +def _strip_status_stats(text: str) -> str: + """Strip the timing/stats parenthetical from a status line for dedup.""" + return _STATUS_STATS_RE.sub("", text) + + def get_message_queue( user_id: int, thread_id: int | None = None ) -> asyncio.Queue[MessageTask] | None: @@ -481,8 +492,8 @@ async def _process_status_update_task( # Window changed - delete old and send new await _do_clear_status_message(bot, user_id, tid) await _do_send_status_message(bot, user_id, tid, wid, status_text) - elif status_text == last_text: - # Same content, skip edit + elif _strip_status_stats(status_text) == _strip_status_stats(last_text): + # Same action (ignoring timer changes), skip edit return else: # Same window, text changed - edit in place @@ -636,11 +647,15 @@ async def enqueue_status_update( if flood_end > time.monotonic(): return - # Deduplicate: skip if text matches what's already displayed + # Deduplicate: skip if action text matches (ignoring timer/stats changes) if status_text: skey = (user_id, tid) info = _status_msg_info.get(skey) - if info and info[1] == window_id and info[2] == status_text: + if ( + info + and info[1] == window_id + and _strip_status_stats(info[2]) == _strip_status_stats(status_text) + ): return queue = get_or_create_queue(bot, user_id, thread_id) diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 44f774bc..ef685560 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -21,6 +21,7 @@ import aiofiles from .config import config +from .handlers.interactive_ui import INTERACTIVE_TOOL_NAMES from .monitor_state import MonitorState, TrackedSession from .tmux_manager import tmux_manager from .transcript_parser import TranscriptParser @@ -382,6 +383,20 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa # Skip user messages unless show_user_messages is enabled if entry.role == "user" and not config.show_user_messages: continue + # Skip thinking messages unless show_thinking is enabled + if entry.content_type == "thinking" and not config.show_thinking: + continue + # Skip tool messages unless show_tools is enabled + # Exception: interactive tools (AskUserQuestion, ExitPlanMode) must pass through + if ( + entry.content_type in ("tool_use", "tool_result") + and not config.show_tools + ): + if not ( + entry.content_type == "tool_use" + and entry.tool_name in INTERACTIVE_TOOL_NAMES + ): + continue new_messages.append( NewMessage( session_id=session_info.session_id, From faf286d7a8ddc5556289fbae83f5a24b3680b8c0 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sat, 28 Feb 2026 03:33:33 +0000 Subject: [PATCH 08/15] fix: support numbered ExitPlanMode selector and defer JSONL-based interactive UIs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add fallback UIPattern for numbered selector format (❯ 1. Yes / 2. No) that Claude Code now uses for ExitPlanMode - Defer ExitPlanMode/AskUserQuestion to JSONL queue path when session exists, ensuring content messages arrive before the interactive UI - Improve _STATUS_STATS_RE to handle minutes format and trailing text (e.g. "Working… (2m 9s · …) Esc to interrupt") - Add comprehensive tests for new detection and routing logic Co-Authored-By: Claude Opus 4.6 --- tests/ccbot/conftest.py | 24 +++++ tests/ccbot/handlers/test_interactive_ui.py | 54 ++++++++++ tests/ccbot/handlers/test_message_queue.py | 50 ++++++++++ tests/ccbot/handlers/test_status_polling.py | 105 ++++++++++++++++++++ tests/ccbot/test_terminal_parser.py | 17 ++++ 5 files changed, 250 insertions(+) create mode 100644 tests/ccbot/handlers/test_message_queue.py diff --git a/tests/ccbot/conftest.py b/tests/ccbot/conftest.py index f274f9c6..9b8bb2f1 100644 --- a/tests/ccbot/conftest.py +++ b/tests/ccbot/conftest.py @@ -163,6 +163,30 @@ def sample_pane_settings(): ) +@pytest.fixture +def sample_pane_exit_plan_numbered(): + """Realistic pane showing numbered ExitPlanMode selector (current format). + + The old markers (Would you like to proceed?, ctrl-g to edit in) are NOT + present — only the ❯ 1. Yes / 2. No selector with plan context above. + """ + return ( + " I've written a plan to the plan file.\n" + "\n" + " Here's a summary of what I'll do:\n" + " 1. Update the config parser\n" + " 2. Add validation logic\n" + "\n" + " ❯ 1. Yes\n" + " 2. No\n" + "\n" + "──────────────────────────────────────\n" + "❯ \n" + "──────────────────────────────────────\n" + " [Opus 4.6] Context: 34%\n" + ) + + @pytest.fixture def sample_pane_no_ui(): return "$ echo hello\nhello\n$\n" diff --git a/tests/ccbot/handlers/test_interactive_ui.py b/tests/ccbot/handlers/test_interactive_ui.py index 8d6a98e4..6698c844 100644 --- a/tests/ccbot/handlers/test_interactive_ui.py +++ b/tests/ccbot/handlers/test_interactive_ui.py @@ -92,6 +92,60 @@ async def test_handle_no_ui_returns_false(self, mock_bot: AsyncMock): assert result is False mock_bot.send_message.assert_not_called() + @pytest.mark.asyncio + async def test_handle_exit_plan_numbered_sends_keyboard( + self, mock_bot: AsyncMock, sample_pane_exit_plan_numbered: str + ): + """New numbered ExitPlanMode format triggers keyboard send.""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + with ( + patch("ccbot.handlers.interactive_ui.tmux_manager") as mock_tmux, + patch("ccbot.handlers.interactive_ui.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock( + return_value=sample_pane_exit_plan_numbered + ) + mock_sm.resolve_chat_id.return_value = 100 + + result = await handle_interactive_ui( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + assert result is True + mock_bot.send_message.assert_called_once() + call_kwargs = mock_bot.send_message.call_args + assert call_kwargs.kwargs["reply_markup"] is not None + + @pytest.mark.asyncio + async def test_exit_plan_old_format_sends_keyboard( + self, mock_bot: AsyncMock, sample_pane_exit_plan: str + ): + """Old ExitPlanMode format still triggers keyboard send (backward compat).""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + with ( + patch("ccbot.handlers.interactive_ui.tmux_manager") as mock_tmux, + patch("ccbot.handlers.interactive_ui.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock(return_value=sample_pane_exit_plan) + mock_sm.resolve_chat_id.return_value = 100 + + result = await handle_interactive_ui( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + assert result is True + mock_bot.send_message.assert_called_once() + call_kwargs = mock_bot.send_message.call_args + assert call_kwargs.kwargs["reply_markup"] is not None + class TestKeyboardLayoutForSettings: def test_settings_keyboard_includes_all_nav_keys(self): diff --git a/tests/ccbot/handlers/test_message_queue.py b/tests/ccbot/handlers/test_message_queue.py new file mode 100644 index 00000000..48952be5 --- /dev/null +++ b/tests/ccbot/handlers/test_message_queue.py @@ -0,0 +1,50 @@ +"""Tests for message_queue — status stats stripping for dedup.""" + +import pytest + +from ccbot.handlers.message_queue import _strip_status_stats + + +class TestStripStatusStats: + @pytest.mark.parametrize( + ("input_text", "expected"), + [ + pytest.param( + "Thinking… (45s · ↓ 2.5k tokens · thought for 25s)", + "Thinking…", + id="seconds_only", + ), + pytest.param( + "Enchanting… (2m 9s · ↓ 8.1k tokens · thought for 49s)", + "Enchanting…", + id="minutes_and_seconds", + ), + pytest.param( + "Working… (1h 2m 3s · ↓ 50k tokens)", + "Working…", + id="hours_minutes_seconds", + ), + pytest.param( + "Just text without stats", + "Just text without stats", + id="no_parenthetical", + ), + pytest.param( + "Idle…", + "Idle…", + id="no_stats", + ), + pytest.param( + "Germinating… (30s · ↓ 897 tokens · thought for 2s) Esc to interrupt", + "Germinating…", + id="with_trailing_esc", + ), + pytest.param( + "Thinking… (2m 9s · ↓ 8.1k tokens) Esc to interrupt", + "Thinking…", + id="minutes_with_trailing_esc", + ), + ], + ) + def test_strip_status_stats(self, input_text: str, expected: str): + assert _strip_status_stats(input_text) == expected diff --git a/tests/ccbot/handlers/test_status_polling.py b/tests/ccbot/handlers/test_status_polling.py index 9c0f04f7..93f67ca9 100644 --- a/tests/ccbot/handlers/test_status_polling.py +++ b/tests/ccbot/handlers/test_status_polling.py @@ -139,3 +139,108 @@ async def test_settings_ui_end_to_end_sends_telegram_keyboard( assert keyboard is not None # Verify the message text contains model picker content assert "Select model" in call_kwargs["text"] + + +@pytest.mark.usefixtures("_clear_interactive_state") +class TestStatusPollerExitPlanDetection: + """Simulate the status poller detecting ExitPlanMode UI (numbered format).""" + + @pytest.mark.asyncio + async def test_exit_plan_with_session_defers_to_jsonl( + self, mock_bot: AsyncMock, sample_pane_exit_plan_numbered: str + ): + """ExitPlanMode with active session → set_interactive_mode, no handle_interactive_ui.""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + mock_ws = MagicMock() + mock_ws.session_id = "uuid-xxx" + + with ( + patch("ccbot.handlers.status_polling.tmux_manager") as mock_tmux, + patch( + "ccbot.handlers.status_polling.handle_interactive_ui", + new_callable=AsyncMock, + ) as mock_handle_ui, + patch( + "ccbot.handlers.status_polling.set_interactive_mode", + ) as mock_set_mode, + patch("ccbot.handlers.status_polling.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock( + return_value=sample_pane_exit_plan_numbered + ) + mock_sm.get_window_state.return_value = mock_ws + + await update_status_message( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + mock_set_mode.assert_called_once_with(1, window_id, 42) + mock_handle_ui.assert_not_called() + + @pytest.mark.asyncio + async def test_exit_plan_without_session_sends_immediately( + self, mock_bot: AsyncMock, sample_pane_exit_plan_numbered: str + ): + """ExitPlanMode with no session_id → handle_interactive_ui called (fallback).""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + mock_ws = MagicMock() + mock_ws.session_id = "" + + with ( + patch("ccbot.handlers.status_polling.tmux_manager") as mock_tmux, + patch( + "ccbot.handlers.status_polling.handle_interactive_ui", + new_callable=AsyncMock, + ) as mock_handle_ui, + patch("ccbot.handlers.status_polling.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock( + return_value=sample_pane_exit_plan_numbered + ) + mock_sm.get_window_state.return_value = mock_ws + mock_handle_ui.return_value = True + + await update_status_message( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + mock_handle_ui.assert_called_once_with(mock_bot, 1, window_id, 42) + + @pytest.mark.asyncio + async def test_permission_prompt_always_sends_immediately( + self, mock_bot: AsyncMock, sample_pane_permission: str + ): + """PermissionPrompt → handle_interactive_ui called regardless of session_id.""" + window_id = "@5" + mock_window = MagicMock() + mock_window.window_id = window_id + + mock_ws = MagicMock() + mock_ws.session_id = "uuid-xxx" + + with ( + patch("ccbot.handlers.status_polling.tmux_manager") as mock_tmux, + patch( + "ccbot.handlers.status_polling.handle_interactive_ui", + new_callable=AsyncMock, + ) as mock_handle_ui, + patch("ccbot.handlers.status_polling.session_manager") as mock_sm, + ): + mock_tmux.find_window_by_id = AsyncMock(return_value=mock_window) + mock_tmux.capture_pane = AsyncMock(return_value=sample_pane_permission) + mock_sm.get_window_state.return_value = mock_ws + mock_handle_ui.return_value = True + + await update_status_message( + mock_bot, user_id=1, window_id=window_id, thread_id=42 + ) + + mock_handle_ui.assert_called_once_with(mock_bot, 1, window_id, 42) diff --git a/tests/ccbot/test_terminal_parser.py b/tests/ccbot/test_terminal_parser.py index 08118430..c6c07277 100644 --- a/tests/ccbot/test_terminal_parser.py +++ b/tests/ccbot/test_terminal_parser.py @@ -83,6 +83,23 @@ def test_exit_plan_mode_variant(self): assert result.name == "ExitPlanMode" assert "Claude has written up a plan" in result.content + def test_exit_plan_mode_numbered_selector( + self, sample_pane_exit_plan_numbered: str + ): + """New numbered ❯ 1. Yes / 2. No format is detected as ExitPlanMode.""" + result = extract_interactive_content(sample_pane_exit_plan_numbered) + assert result is not None + assert result.name == "ExitPlanMode" + assert "❯" in result.content + assert "Yes" in result.content + + def test_exit_plan_mode_old_format_still_works(self, sample_pane_exit_plan: str): + """Backward compat: old ExitPlanMode format still detected.""" + result = extract_interactive_content(sample_pane_exit_plan) + assert result is not None + assert result.name == "ExitPlanMode" + assert "Would you like to proceed?" in result.content + def test_ask_user_multi_tab(self, sample_pane_ask_user_multi_tab: str): result = extract_interactive_content(sample_pane_ask_user_multi_tab) assert result is not None From f29e8c85408fdf97c7e49679391138810dc9cbcb Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Sun, 1 Mar 2026 07:45:49 +0000 Subject: [PATCH 09/15] fix: auto-dismiss feedback survey, silent topic probe, and status dedup improvements - Add Feedback UIPattern detection and auto-dismiss by sending "0" key - Replace sendMessage(".") topic probe with silent reopen_forum_topic (no more visible "." flash in topics) - Strip trailing time durations ("Churned for 9m 58s") for status dedup, preventing repeated edits as timer counts up - Strip " (deleted)" suffix from tmux pane cwd to fix session matching when a directory is deleted Co-Authored-By: Claude Opus 4.6 --- src/ccbot/handlers/message_queue.py | 17 +++++++--- src/ccbot/handlers/status_polling.py | 48 ++++++++++++++++++++++------ src/ccbot/terminal_parser.py | 13 ++++++++ src/ccbot/tmux_manager.py | 3 ++ 4 files changed, 67 insertions(+), 14 deletions(-) diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index 43c914ef..a72482fb 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -84,14 +84,23 @@ class MessageTask: TYPING_MIN_INTERVAL = 4.0 -# Regex to strip the stats parenthetical from status lines, e.g.: +# Regex to strip the stats parenthetical (and any trailing text) from status lines, e.g.: # "Unravelling… (45s · ↓ 2.5k tokens · thought for 25s)" → "Unravelling…" -_STATUS_STATS_RE = re.compile(r"\s*\([\d]+s\s*·.*\)\s*$") +# "Enchanting… (2m 9s · ↓ 8.1k tokens · thought for 49s)" → "Enchanting…" +# "Working… (30s · ↓ 897 tokens) Esc to interrupt" → "Working…" +_STATUS_STATS_RE = re.compile(r"\s*\([\dhms ]+·.*\).*$") + +# Regex to strip trailing time durations embedded directly in the status text, e.g.: +# "Churned for 9m 58s" → "Churned" +# "Responding for 30s" → "Responding" +_STATUS_TIME_RE = re.compile(r"\s+(?:for\s+)?(?:\d+[hms]\s*)+$") def _strip_status_stats(text: str) -> str: - """Strip the timing/stats parenthetical from a status line for dedup.""" - return _STATUS_STATS_RE.sub("", text) + """Strip timing/stats from a status line for dedup.""" + text = _STATUS_STATS_RE.sub("", text) + text = _STATUS_TIME_RE.sub("", text) + return text def get_message_queue( diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index 143b8ba3..90e249f2 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -24,12 +24,14 @@ from telegram.error import BadRequest from ..session import session_manager -from ..terminal_parser import is_interactive_ui, parse_status_line +from ..terminal_parser import extract_interactive_content, parse_status_line from ..tmux_manager import tmux_manager from .interactive_ui import ( + INTERACTIVE_TOOL_NAMES, clear_interactive_msg, get_interactive_window, handle_interactive_ui, + set_interactive_mode, ) from .cleanup import clear_topic_state from .message_queue import enqueue_status_update, get_message_queue @@ -70,7 +72,7 @@ async def update_status_message( if interactive_window == window_id: # User is in interactive mode for THIS window - if is_interactive_ui(pane_text): + if extract_interactive_content(pane_text) is not None: # Interactive UI still showing — skip status update (user is interacting) return # Interactive UI gone — clear interactive mode, fall through to status check. @@ -82,10 +84,28 @@ async def update_status_message( # Clear stale interactive mode await clear_interactive_msg(user_id, bot, thread_id) - # Check for permission prompt (interactive UI not triggered via JSONL) - if should_check_new_ui and is_interactive_ui(pane_text): - await handle_interactive_ui(bot, user_id, window_id, thread_id) - return + # Check for interactive UI in terminal + if should_check_new_ui: + content = extract_interactive_content(pane_text) + if content is not None: + if content.name == "Feedback": + # Auto-dismiss feedback survey by pressing "0" (Dismiss) + await tmux_manager.send_keys(window_id, "0", enter=False, literal=False) + logger.info("Auto-dismissed feedback survey in window %s", window_id) + return + if ( + content.name in INTERACTIVE_TOOL_NAMES + and session_manager.get_window_state(window_id).session_id + ): + # ExitPlanMode/AskUserQuestion have JSONL entries — the JSONL path + # will send the UI via queue.join() ordering (content before UI). + # Just set interactive mode to suppress status updates. + set_interactive_mode(user_id, window_id, thread_id) + else: + # PermissionPrompt/RestoreCheckpoint/Settings or no JSONL session: + # send immediately (no ordering concern). + await handle_interactive_ui(bot, user_id, window_id, thread_id) + return # Normal status line check status_line = parse_status_line(pane_text) @@ -115,13 +135,21 @@ async def status_poll_loop(bot: Bot) -> None: session_manager.iter_thread_bindings() ): try: - await bot.unpin_all_forum_topic_messages( - chat_id=session_manager.resolve_chat_id(user_id, thread_id), + chat_id = session_manager.resolve_chat_id(user_id, thread_id) + # reopen_forum_topic is a silent probe: + # - open topic → BadRequest("Topic_not_modified") + # - deleted topic → BadRequest("Topic_id_invalid") + await bot.reopen_forum_topic( + chat_id=chat_id, message_thread_id=thread_id, ) except BadRequest as e: - if "Topic_id_invalid" in str(e): - # Topic deleted — kill window, unbind, and clean up state + err = str(e) + if "not_modified" in err.lower(): + # Topic exists and is open — no-op + continue + if "thread not found" in err or "Topic_id_invalid" in err: + # Topic deleted — kill window, unbind, and clean up w = await tmux_manager.find_window_by_id(wid) if w: await tmux_manager.kill_window(w.window_id) diff --git a/src/ccbot/terminal_parser.py b/src/ccbot/terminal_parser.py index 288f8635..d851391c 100644 --- a/src/ccbot/terminal_parser.py +++ b/src/ccbot/terminal_parser.py @@ -59,6 +59,13 @@ class UIPattern: re.compile(r"^\s*Esc to (cancel|exit)"), ), ), + # Fallback: numbered selector UI (❯ 1. Yes / 2. No) without old markers + UIPattern( + name="ExitPlanMode", + top=(re.compile(r"^\s*❯\s+\d+\.\s+Yes"),), + bottom=(), # extends to last non-empty line + min_gap=1, + ), UIPattern( name="AskUserQuestion", top=(re.compile(r"^\s*←\s+[☐✔☒]"),), # Multi-tab: no bottom needed @@ -84,6 +91,12 @@ class UIPattern: top=(re.compile(r"^\s*Restore the code"),), bottom=(re.compile(r"^\s*Enter to continue"),), ), + UIPattern( + name="Feedback", + top=(re.compile(r"How is Claude doing this session"),), + bottom=(re.compile(r"0:\s*Dismiss"),), + min_gap=1, + ), UIPattern( name="Settings", top=( diff --git a/src/ccbot/tmux_manager.py b/src/ccbot/tmux_manager.py index eae2702a..b1354993 100644 --- a/src/ccbot/tmux_manager.py +++ b/src/ccbot/tmux_manager.py @@ -102,6 +102,9 @@ def _sync_list_windows() -> list[TmuxWindow]: pane = window.active_pane if pane: cwd = pane.pane_current_path or "" + # tmux appends " (deleted)" when cwd no longer exists + if cwd.endswith(" (deleted)"): + cwd = cwd.removesuffix(" (deleted)") pane_cmd = pane.pane_current_command or "" else: cwd = "" From 9fd58ab94a7054a94dbe4f90783847c326679698 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Wed, 1 Apr 2026 03:29:01 +0000 Subject: [PATCH 10/15] fix: thread-aware message queue, interactive UI fallback, and numbered selector support Pass thread_id to get_message_queue for correct per-topic routing. Add fallback in status polling to send interactive UI when JSONL path misses the pane capture. Add numbered selector pattern for AskUserQuestion. Pin telegramify-markdown to <1.0.0. Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 2 +- src/ccbot/bot.py | 2 +- src/ccbot/handlers/status_polling.py | 7 ++++++- src/ccbot/terminal_parser.py | 7 +++++++ 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 55aa8cf6..4b64bb96 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dependencies = [ "httpx>=0.27.0", "libtmux>=0.37.0", "Pillow>=10.0.0", - "telegramify-markdown>=0.5.0", + "telegramify-markdown>=0.5.0,<1.0.0", "aiofiles>=24.0.0", ] diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index e07458d4..63c9e8b6 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -1430,7 +1430,7 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: # Mark interactive mode BEFORE sleeping so polling skips this window set_interactive_mode(user_id, wid, thread_id) # Flush pending messages (e.g. plan content) before sending interactive UI - queue = get_message_queue(user_id) + queue = get_message_queue(user_id, thread_id) if queue: await queue.join() # Wait briefly for Claude Code to render the question UI diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index 90e249f2..4504151a 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -29,6 +29,7 @@ from .interactive_ui import ( INTERACTIVE_TOOL_NAMES, clear_interactive_msg, + get_interactive_msg_id, get_interactive_window, handle_interactive_ui, set_interactive_mode, @@ -101,6 +102,10 @@ async def update_status_message( # will send the UI via queue.join() ordering (content before UI). # Just set interactive mode to suppress status updates. set_interactive_mode(user_id, window_id, thread_id) + # Fallback: if JSONL path failed to send the UI (e.g. pane capture + # missed the top marker), send it now from the status poll. + if not get_interactive_msg_id(user_id, thread_id): + await handle_interactive_ui(bot, user_id, window_id, thread_id) else: # PermissionPrompt/RestoreCheckpoint/Settings or no JSONL session: # send immediately (no ordering concern). @@ -190,7 +195,7 @@ async def status_poll_loop(bot: Bot) -> None: ) continue - queue = get_message_queue(user_id) + queue = get_message_queue(user_id, thread_id) if queue and not queue.empty(): continue await update_status_message( diff --git a/src/ccbot/terminal_parser.py b/src/ccbot/terminal_parser.py index d851391c..2f05697c 100644 --- a/src/ccbot/terminal_parser.py +++ b/src/ccbot/terminal_parser.py @@ -78,6 +78,13 @@ class UIPattern: bottom=(re.compile(r"^\s*Enter to select"),), min_gap=1, ), + # Numbered selector: "❯ 1. [ ] ..." or " 1. [x] ..." (tab bar scrolled off) + UIPattern( + name="AskUserQuestion", + top=(re.compile(r"^\s*(?:❯\s+)?\d+\.\s+\["),), + bottom=(re.compile(r"^\s*Enter to select"),), + min_gap=1, + ), UIPattern( name="PermissionPrompt", top=( From 0643bfec0c8d690728b1be0f086eecc0ca9dc414 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Wed, 1 Apr 2026 05:32:22 +0000 Subject: [PATCH 11/15] fix: prevent duplicate AskUserQuestion/ExitPlanMode UI in Telegram Remove the fallback sender in status_polling that raced with the JSONL path, causing the interactive UI to be sent twice. Instead, make the JSONL path (the sole sender) retry pane capture with increasing delays (0.3s, 0.7s, 1.0s) to handle slow UI rendering. Co-Authored-By: Claude Opus 4.6 --- src/ccbot/bot.py | 11 ++++++++--- src/ccbot/handlers/status_polling.py | 9 ++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index 63c9e8b6..8d0306dd 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -1433,9 +1433,14 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: queue = get_message_queue(user_id, thread_id) if queue: await queue.join() - # Wait briefly for Claude Code to render the question UI - await asyncio.sleep(0.3) - handled = await handle_interactive_ui(bot, user_id, wid, thread_id) + # Retry pane capture with increasing delays — Claude Code may need + # time to render the interactive UI after the JSONL entry is written. + handled = False + for delay in (0.3, 0.7, 1.0): + await asyncio.sleep(delay) + handled = await handle_interactive_ui(bot, user_id, wid, thread_id) + if handled: + break if handled: # Update user's read offset session = await session_manager.resolve_session_for_window(wid) diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index 4504151a..723f74b7 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -29,7 +29,6 @@ from .interactive_ui import ( INTERACTIVE_TOOL_NAMES, clear_interactive_msg, - get_interactive_msg_id, get_interactive_window, handle_interactive_ui, set_interactive_mode, @@ -99,13 +98,9 @@ async def update_status_message( and session_manager.get_window_state(window_id).session_id ): # ExitPlanMode/AskUserQuestion have JSONL entries — the JSONL path - # will send the UI via queue.join() ordering (content before UI). - # Just set interactive mode to suppress status updates. + # (bot.py handle_new_message) is the sole sender for these UIs. + # Just set interactive mode here to suppress status updates. set_interactive_mode(user_id, window_id, thread_id) - # Fallback: if JSONL path failed to send the UI (e.g. pane capture - # missed the top marker), send it now from the status poll. - if not get_interactive_msg_id(user_id, thread_id): - await handle_interactive_ui(bot, user_id, window_id, thread_id) else: # PermissionPrompt/RestoreCheckpoint/Settings or no JSONL session: # send immediately (no ordering concern). From a5a3bb879c366f6848ec6929a0c51379f4dbd2d8 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Wed, 15 Apr 2026 11:25:06 +0900 Subject: [PATCH 12/15] docs: add macOS launchd service workflow --- CLAUDE.md | 6 +++++- README.md | 31 +++++++++++++++++++++++++++++++ deploy/macos/com.ccbot.plist | 36 ++++++++++++++++++++++++++++++++++++ scripts/restart-macos.sh | 28 ++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 deploy/macos/com.ccbot.plist create mode 100755 scripts/restart-macos.sh diff --git a/CLAUDE.md b/CLAUDE.md index b9d91576..38401a44 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,7 +10,9 @@ Tech stack: Python, python-telegram-bot, tmux, uv. uv run ruff check src/ tests/ # Lint — MUST pass before committing uv run ruff format src/ tests/ # Format — auto-fix, then verify with --check uv run pyright src/ccbot/ # Type check — MUST be 0 errors before committing -./scripts/restart.sh # Restart the ccbot service after code changes +uv run python -m pytest # Run full test suite in repo-local .venv +./scripts/restart.sh # Linux: reinstall + restart systemd service +./scripts/restart-macos.sh # macOS: reload launchd agent ccbot hook --install # Auto-install Claude Code SessionStart hook ``` @@ -34,6 +36,8 @@ ccbot hook --install # Auto-install Claude Code SessionStart ho - Config directory: `~/.ccbot/` by default, override with `CCBOT_DIR` env var. - `.env` loading priority: local `.env` > config dir `.env`. - State files: `state.json` (thread bindings), `session_map.json` (hook-generated), `monitor_state.json` (byte offsets). +- Service management is platform-specific: Linux uses `systemd`; macOS uses `launchd`. +- For macOS, keep a local agent plist in `~/Library/LaunchAgents/` and use `deploy/macos/com.ccbot.plist` as the template. ## Hook Configuration diff --git a/README.md b/README.md index e7ee01dc..0a9ca4e9 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,12 @@ cd ccmux uv sync ``` +For development and testing, install dev dependencies into the repo-local `.venv`: + +```bash +uv sync --extra dev +``` + ## Configuration **1. Create a Telegram bot and enable Threaded Mode:** @@ -141,6 +147,31 @@ ccbot uv run ccbot ``` +### Service Management + +Linux/systemd users can use the existing restart helpers: + +```bash +./scripts/restart.sh +``` + +macOS users should use `launchd` instead of `systemd`: + +1. Copy [deploy/macos/com.ccbot.plist](/Users/lkyao/projects/ccbot/deploy/macos/com.ccbot.plist) to `~/Library/LaunchAgents/com.ccbot.plist` +2. Replace the placeholder repo path and username values +3. Load it with `launchctl bootstrap gui/$(id -u) ~/Library/LaunchAgents/com.ccbot.plist` +4. Restart with: + +```bash +./scripts/restart-macos.sh +``` + +For local verification, prefer: + +```bash +uv run python -m pytest +``` + ### Commands **Bot commands:** diff --git a/deploy/macos/com.ccbot.plist b/deploy/macos/com.ccbot.plist new file mode 100644 index 00000000..d3991df1 --- /dev/null +++ b/deploy/macos/com.ccbot.plist @@ -0,0 +1,36 @@ + + + + + Label + com.ccbot + + ProgramArguments + + /opt/homebrew/bin/uv + run + --project + /ABSOLUTE/PATH/TO/ccbot + ccbot + + + WorkingDirectory + /ABSOLUTE/PATH/TO/ccbot + + RunAtLoad + + KeepAlive + + + StandardOutPath + /Users/USERNAME/.ccbot/ccbot.stdout.log + StandardErrorPath + /Users/USERNAME/.ccbot/ccbot.stderr.log + + EnvironmentVariables + + PATH + /opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin + + + diff --git a/scripts/restart-macos.sh b/scripts/restart-macos.sh new file mode 100755 index 00000000..1c4bfab7 --- /dev/null +++ b/scripts/restart-macos.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +set -euo pipefail + +LABEL="${CCBOT_LAUNCHD_LABEL:-com.ccbot}" +PLIST="${CCBOT_LAUNCHD_PLIST:-$HOME/Library/LaunchAgents/${LABEL}.plist}" + +if [[ ! -f "$PLIST" ]]; then + echo "Error: launchd plist not found: $PLIST" + exit 1 +fi + +echo "Reloading launchd agent: $LABEL" +launchctl bootout "gui/$(id -u)" "$PLIST" >/dev/null 2>&1 || true +launchctl bootstrap "gui/$(id -u)" "$PLIST" +launchctl kickstart -k "gui/$(id -u)/$LABEL" + +sleep 2 + +if launchctl print "gui/$(id -u)/$LABEL" >/dev/null 2>&1; then + echo "ccbot restarted successfully." + echo "----------------------------------------" + tail -n 20 "$HOME/.ccbot/ccbot.stdout.log" 2>/dev/null || true + tail -n 20 "$HOME/.ccbot/ccbot.stderr.log" 2>/dev/null || true + echo "----------------------------------------" +else + echo "Error: ccbot failed to start." + exit 1 +fi From 542566108796f17f6d0766285dcb9c5afc60d2b4 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Wed, 15 Apr 2026 11:27:55 +0900 Subject: [PATCH 13/15] Revert "docs: add macOS launchd service workflow" This reverts commit a5a3bb879c366f6848ec6929a0c51379f4dbd2d8. --- CLAUDE.md | 6 +----- README.md | 31 ------------------------------- deploy/macos/com.ccbot.plist | 36 ------------------------------------ scripts/restart-macos.sh | 28 ---------------------------- 4 files changed, 1 insertion(+), 100 deletions(-) delete mode 100644 deploy/macos/com.ccbot.plist delete mode 100755 scripts/restart-macos.sh diff --git a/CLAUDE.md b/CLAUDE.md index 38401a44..b9d91576 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,9 +10,7 @@ Tech stack: Python, python-telegram-bot, tmux, uv. uv run ruff check src/ tests/ # Lint — MUST pass before committing uv run ruff format src/ tests/ # Format — auto-fix, then verify with --check uv run pyright src/ccbot/ # Type check — MUST be 0 errors before committing -uv run python -m pytest # Run full test suite in repo-local .venv -./scripts/restart.sh # Linux: reinstall + restart systemd service -./scripts/restart-macos.sh # macOS: reload launchd agent +./scripts/restart.sh # Restart the ccbot service after code changes ccbot hook --install # Auto-install Claude Code SessionStart hook ``` @@ -36,8 +34,6 @@ ccbot hook --install # Auto-install Claude Code SessionStart ho - Config directory: `~/.ccbot/` by default, override with `CCBOT_DIR` env var. - `.env` loading priority: local `.env` > config dir `.env`. - State files: `state.json` (thread bindings), `session_map.json` (hook-generated), `monitor_state.json` (byte offsets). -- Service management is platform-specific: Linux uses `systemd`; macOS uses `launchd`. -- For macOS, keep a local agent plist in `~/Library/LaunchAgents/` and use `deploy/macos/com.ccbot.plist` as the template. ## Hook Configuration diff --git a/README.md b/README.md index 0a9ca4e9..e7ee01dc 100644 --- a/README.md +++ b/README.md @@ -61,12 +61,6 @@ cd ccmux uv sync ``` -For development and testing, install dev dependencies into the repo-local `.venv`: - -```bash -uv sync --extra dev -``` - ## Configuration **1. Create a Telegram bot and enable Threaded Mode:** @@ -147,31 +141,6 @@ ccbot uv run ccbot ``` -### Service Management - -Linux/systemd users can use the existing restart helpers: - -```bash -./scripts/restart.sh -``` - -macOS users should use `launchd` instead of `systemd`: - -1. Copy [deploy/macos/com.ccbot.plist](/Users/lkyao/projects/ccbot/deploy/macos/com.ccbot.plist) to `~/Library/LaunchAgents/com.ccbot.plist` -2. Replace the placeholder repo path and username values -3. Load it with `launchctl bootstrap gui/$(id -u) ~/Library/LaunchAgents/com.ccbot.plist` -4. Restart with: - -```bash -./scripts/restart-macos.sh -``` - -For local verification, prefer: - -```bash -uv run python -m pytest -``` - ### Commands **Bot commands:** diff --git a/deploy/macos/com.ccbot.plist b/deploy/macos/com.ccbot.plist deleted file mode 100644 index d3991df1..00000000 --- a/deploy/macos/com.ccbot.plist +++ /dev/null @@ -1,36 +0,0 @@ - - - - - Label - com.ccbot - - ProgramArguments - - /opt/homebrew/bin/uv - run - --project - /ABSOLUTE/PATH/TO/ccbot - ccbot - - - WorkingDirectory - /ABSOLUTE/PATH/TO/ccbot - - RunAtLoad - - KeepAlive - - - StandardOutPath - /Users/USERNAME/.ccbot/ccbot.stdout.log - StandardErrorPath - /Users/USERNAME/.ccbot/ccbot.stderr.log - - EnvironmentVariables - - PATH - /opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin - - - diff --git a/scripts/restart-macos.sh b/scripts/restart-macos.sh deleted file mode 100755 index 1c4bfab7..00000000 --- a/scripts/restart-macos.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -LABEL="${CCBOT_LAUNCHD_LABEL:-com.ccbot}" -PLIST="${CCBOT_LAUNCHD_PLIST:-$HOME/Library/LaunchAgents/${LABEL}.plist}" - -if [[ ! -f "$PLIST" ]]; then - echo "Error: launchd plist not found: $PLIST" - exit 1 -fi - -echo "Reloading launchd agent: $LABEL" -launchctl bootout "gui/$(id -u)" "$PLIST" >/dev/null 2>&1 || true -launchctl bootstrap "gui/$(id -u)" "$PLIST" -launchctl kickstart -k "gui/$(id -u)/$LABEL" - -sleep 2 - -if launchctl print "gui/$(id -u)/$LABEL" >/dev/null 2>&1; then - echo "ccbot restarted successfully." - echo "----------------------------------------" - tail -n 20 "$HOME/.ccbot/ccbot.stdout.log" 2>/dev/null || true - tail -n 20 "$HOME/.ccbot/ccbot.stderr.log" 2>/dev/null || true - echo "----------------------------------------" -else - echo "Error: ccbot failed to start." - exit 1 -fi From 412b7b69d6722e6c34966311e5044e58dd1ea44e Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Wed, 15 Apr 2026 11:33:12 +0900 Subject: [PATCH 14/15] revert: drop service restart script changes from PR Removes scripts/restart-service.sh and restores scripts/restart.sh to upstream/main to keep this PR focused on core queue/rate-limit logic. Co-Authored-By: Claude Opus 4.6 (1M context) --- scripts/restart-service.sh | 24 ------------ scripts/restart.sh | 78 ++++++++++++++++++++++++++++++++------ 2 files changed, 66 insertions(+), 36 deletions(-) delete mode 100755 scripts/restart-service.sh diff --git a/scripts/restart-service.sh b/scripts/restart-service.sh deleted file mode 100755 index 63a7d9fb..00000000 --- a/scripts/restart-service.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Restart ccbot via systemd user service. -# Claude Code sessions live in tmux windows and are unaffected by this restart. -# The bot reconnects to existing sessions via state.json and session_map.json on startup. - -echo "Restarting ccbot service..." -systemctl --user restart ccbot - -sleep 2 - -if systemctl --user is-active --quiet ccbot; then - echo "ccbot restarted successfully." - echo "----------------------------------------" - journalctl --user -u ccbot --no-pager -n 20 - echo "----------------------------------------" -else - echo "Error: ccbot failed to start." - echo "----------------------------------------" - journalctl --user -u ccbot --no-pager -n 30 - echo "----------------------------------------" - exit 1 -fi diff --git a/scripts/restart.sh b/scripts/restart.sh index 9b4d4531..62715a4f 100755 --- a/scripts/restart.sh +++ b/scripts/restart.sh @@ -1,27 +1,81 @@ #!/usr/bin/env bash set -euo pipefail +TMUX_SESSION="ccbot" +TMUX_WINDOW="__main__" +TARGET="${TMUX_SESSION}:${TMUX_WINDOW}" PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)" +MAX_WAIT=10 # seconds to wait for process to exit -# Reinstall the package so the systemd service picks up code changes -echo "Installing ccbot from ${PROJECT_DIR}..." -uv tool install --force --reinstall "$PROJECT_DIR" 2>&1 +# Check if tmux session and window exist +if ! tmux has-session -t "$TMUX_SESSION" 2>/dev/null; then + echo "Error: tmux session '$TMUX_SESSION' does not exist" + exit 1 +fi + +if ! tmux list-windows -t "$TMUX_SESSION" -F '#{window_name}' 2>/dev/null | grep -qx "$TMUX_WINDOW"; then + echo "Error: window '$TMUX_WINDOW' not found in session '$TMUX_SESSION'" + exit 1 +fi + +# Get the pane PID and check if uv run ccbot is running +PANE_PID=$(tmux list-panes -t "$TARGET" -F '#{pane_pid}') + +is_ccbot_running() { + pstree -a "$PANE_PID" 2>/dev/null | grep -q 'uv.*run ccbot\|ccbot.*\.venv/bin/ccbot' +} + +# Stop existing process if running +if is_ccbot_running; then + echo "Found running ccbot process, sending Ctrl-C..." + tmux send-keys -t "$TARGET" C-c + + # Wait for process to exit + waited=0 + while is_ccbot_running && [ "$waited" -lt "$MAX_WAIT" ]; do + sleep 1 + waited=$((waited + 1)) + echo " Waiting for process to exit... (${waited}s/${MAX_WAIT}s)" + done + + if is_ccbot_running; then + echo "Process did not exit after ${MAX_WAIT}s, sending SIGTERM..." + # Kill the uv process directly + UV_PID=$(pstree -ap "$PANE_PID" 2>/dev/null | grep -oP 'uv,\K\d+' | head -1) + if [ -n "$UV_PID" ]; then + kill "$UV_PID" 2>/dev/null || true + sleep 2 + fi + if is_ccbot_running; then + echo "Process still running, sending SIGKILL..." + kill -9 "$UV_PID" 2>/dev/null || true + sleep 1 + fi + fi + + echo "Process stopped." +else + echo "No ccbot process running in $TARGET" +fi + +# Brief pause to let the shell settle +sleep 1 -# Restart the systemd user service -echo "Restarting ccbot service..." -systemctl --user restart ccbot +# Start ccbot +echo "Starting ccbot in $TARGET..." +tmux send-keys -t "$TARGET" "cd ${PROJECT_DIR} && uv run ccbot" Enter -# Wait for startup and verify -sleep 2 -if systemctl --user is-active --quiet ccbot; then +# Verify startup and show logs +sleep 3 +if is_ccbot_running; then echo "ccbot restarted successfully. Recent logs:" echo "----------------------------------------" - journalctl --user -u ccbot --since "5 sec ago" --no-pager | tail -20 + tmux capture-pane -t "$TARGET" -p | tail -20 echo "----------------------------------------" else - echo "Warning: ccbot service failed to start. Logs:" + echo "Warning: ccbot may not have started. Pane output:" echo "----------------------------------------" - journalctl --user -u ccbot --since "10 sec ago" --no-pager | tail -30 + tmux capture-pane -t "$TARGET" -p | tail -30 echo "----------------------------------------" exit 1 fi From e71e9b1dddba4f3680a1ff99a10bdbd062eb5f28 Mon Sep 17 00:00:00 2001 From: LikunYDev Date: Wed, 15 Apr 2026 11:36:05 +0900 Subject: [PATCH 15/15] chore: remove unused imports and apply ruff format Co-Authored-By: Claude Opus 4.6 (1M context) --- src/ccbot/bot.py | 5 ++++- src/ccbot/handlers/message_queue.py | 2 -- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index b3956110..0387a9b7 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -1774,7 +1774,10 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: await clear_interactive_msg(user_id, bot, thread_id) # Skip tool call notifications when CCBOT_SHOW_TOOL_CALLS=false - if not config.show_tool_calls and msg.content_type in ("tool_use", "tool_result"): + if not config.show_tool_calls and msg.content_type in ( + "tool_use", + "tool_result", + ): continue parts = build_response_parts( diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index f33aec34..106ff719 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -30,8 +30,6 @@ from ..markdown_v2 import convert_markdown from ..session import session_manager -from ..terminal_parser import parse_status_line -from ..tmux_manager import tmux_manager from .message_sender import ( NO_LINK_PREVIEW, PARSE_MODE,