
fix: per-topic queues, 429 hardening, and interactive UI reliability#77

Open
LikunYDev wants to merge 18 commits into six-ddc:main from LikunYDev:topic-queue-rate-limit-fixes

Conversation

Contributor

@LikunYDev LikunYDev commented Apr 15, 2026

Overview

This PR hardens the bot against the three classes of failures that show up at scale: cross-topic head-of-line blocking (one noisy topic starving others), Telegram 429 flood control (bursting past rate limits after restart or during parallel runs), and interactive UI races (AskUserQuestion / ExitPlanMode sent twice or not at all). It also lands a few smaller quality-of-life items (/context, SHOW_THINKING / SHOW_TOOLS, feedback-survey auto-dismiss) and cleans up a backwards-compat corner in state.json loading.

1. Per-topic message queues

Before: one queue per user_id. All topics that the user owned shared a single FIFO worker. A burst in one topic blocked delivery in every other topic.

After: queues keyed by (user_id, thread_id_or_0). Each topic has its own worker task, merge chain, tool_use → tool_result mapping, and flood-control window.

  • _QueueKey = tuple[int, int]; thread_id_or_0 = 0 represents the non-topic fallback path.
  • get_message_queue(user_id, thread_id) / get_or_create_queue(bot, user_id, thread_id) take the new thread argument.
  • _tool_msg_ids and _status_msg_info were already keyed by thread; _flood_until moved to the new tuple key.
  • Callers updated: bot.py:handle_new_message now passes thread_id when flushing the queue before an interactive UI; status_polling.py passes thread_id when deciding whether to skip a status update.

Trade-off: more worker tasks per user. Workers are cheap (an idle worker just sits in await queue.get()), so this is fine; memory grows in proportion to the number of active topics.
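A minimal sketch of the keying scheme described above, assuming a module-level registry. The names `queue_key` and `_queues` are illustrative, and the simplified `get_or_create_queue` here omits the `bot` argument and worker startup that the real function handles.

```python
import asyncio
from typing import Optional

# (user_id, thread_id or 0); 0 stands for the non-topic fallback path.
QueueKey = tuple[int, int]

# Hypothetical registry; the PR's actual module state may be organized differently.
_queues: dict[QueueKey, asyncio.Queue] = {}


def queue_key(user_id: int, thread_id: Optional[int]) -> QueueKey:
    """Normalize a missing thread_id to 0 so every caller builds the same key."""
    return (user_id, thread_id or 0)


def get_or_create_queue(user_id: int, thread_id: Optional[int]) -> asyncio.Queue:
    """Return the queue for this (user, topic) pair, creating it on first use."""
    key = queue_key(user_id, thread_id)
    if key not in _queues:
        _queues[key] = asyncio.Queue()
    return _queues[key]
```

Two topics owned by the same user now resolve to distinct queues, so a backlog in one cannot delay the other.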

2. Rate-limit hardening (429 prevention)

Four layered defenses against Telegram's 30/sec global and 20/min per-group limits:

2a. Per-group async lock across topic workers

Independent per-topic workers can still burst against the same group chat if the user is active in several topics at once. A shared _group_process_locks: dict[int, asyncio.Lock] (keyed by chat_id) wraps the per-task send in _message_queue_worker, so workers sending to the same group serialize at the process level — AIORateLimiter handles the remainder.

2b. Pre-fill per-group rate-limiter buckets on restart

post_init already pre-filled the global bucket. The per-group limiter (rate_limiter._group_limiters) was still empty on startup, so the first 20 requests into each group went through instantly, tripping Telegram's server-side per-group counter. Now, for every known group chat in session_manager.group_chat_ids, we construct an AsyncLimiter with _level = max_rate and register it in _group_limiters so the first request after restart waits the correct amount.
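To see why pre-filling matters, here is a toy leaky-bucket model. It is not the limiter class the bot uses; `LeakyBucket` and its methods are hypothetical and only mirror the idea of a bucket whose level starts at `max_rate` instead of zero.

```python
class LeakyBucket:
    """Toy leaky bucket: level rises by 1 per request and drains over time."""

    def __init__(self, max_rate: float, time_period: float, level: float = 0.0) -> None:
        self.max_rate = max_rate        # capacity, e.g. 20 requests
        self.time_period = time_period  # seconds to drain a full bucket, e.g. 60
        self.level = level              # how full the bucket currently is

    def leak(self, elapsed: float) -> None:
        """Drain max_rate / time_period units per elapsed second."""
        drained = elapsed * self.max_rate / self.time_period
        self.level = max(0.0, self.level - drained)

    def wait_needed(self) -> float:
        """Seconds until one more request fits without overflowing."""
        overflow = self.level + 1 - self.max_rate
        if overflow <= 0:
            return 0.0
        return overflow * self.time_period / self.max_rate


# An empty bucket lets a burst through; a pre-filled one paces the first request.
empty = LeakyBucket(max_rate=20, time_period=60)
prefilled = LeakyBucket(max_rate=20, time_period=60, level=20)
```

With the 20-per-minute figures from the text, the pre-filled bucket makes the first request after restart wait 3 seconds, while the empty bucket would admit it immediately.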

2c. Status dedup ignores timing/stats noise

Status lines from Claude Code look like "Unravelling… (45s · ↓ 2.5k tokens · thought for 25s)". The stats change every second even when the verb doesn't, which defeats the existing last_text dedup. Two regexes now strip the parenthetical and any trailing "for 9m 58s" duration before comparison:

Unravelling… (45s · ↓ 2.5k tokens · thought for 25s)  → Unravelling…
Churned for 9m 58s                                    → Churned
Working… (30s · ↓ 897 tokens) Esc to interrupt        → Working…
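One way to get the three results above is sketched below. The patterns and the names `_STATS_RE`, `_DURATION_RE`, and `normalize_status` are assumptions chosen to reproduce these examples; the regexes in the PR may differ.

```python
import re

# Drop the first parenthetical and everything after it (e.g. "Esc to interrupt").
_STATS_RE = re.compile(r"\s*\(.*\).*$")
# Drop a trailing duration such as " for 9m 58s" or " for 25s".
_DURATION_RE = re.compile(r"\s+for\s+(?:\d+[hms]\s*)+$")


def normalize_status(text: str) -> str:
    """Reduce a status line to its stable part for dedup comparison."""
    text = _STATS_RE.sub("", text)
    text = _DURATION_RE.sub("", text)
    return text.strip()
```

Comparing `normalize_status(new)` with `normalize_status(last_text)` means an edit is only issued when the verb itself changes.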

2d. Typing-indicator throttle

Typing indicators were sent on every enqueue. Now throttled per (user_id, thread_id) to TYPING_MIN_INTERVAL = 4.0 seconds via a _last_typing map.
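A minimal sketch of such a throttle, assuming a monotonic clock. The function name `should_send_typing` and the injectable `now` parameter are hypothetical additions for illustration and testing.

```python
import time
from typing import Optional

TYPING_MIN_INTERVAL = 4.0  # seconds between typing indicators per topic

# Last send time per (user_id, thread_id or 0).
_last_typing: dict[tuple[int, int], float] = {}


def should_send_typing(
    user_id: int, thread_id: Optional[int], now: Optional[float] = None
) -> bool:
    """Return True and record the time if the topic's last indicator is old enough."""
    now = time.monotonic() if now is None else now
    key = (user_id, thread_id or 0)
    last = _last_typing.get(key)
    if last is not None and now - last < TYPING_MIN_INTERVAL:
        return False
    _last_typing[key] = now
    return True
```

Because the map is keyed per topic, throttling one topic does not suppress the indicator in another.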

3. Session monitor concurrency

SessionMonitor.check_for_updates is restructured from an interleaved "read → parse → callback" loop into a three-phase pipeline:

  1. Collect: walk sessions, stat each file, skip those whose (mtime, size) haven't advanced, and build a to_read list.
  2. Read (parallel): asyncio.gather(*(self._read_new_lines(...))) with return_exceptions=True. I/O is the dominant cost at scale; parallelizing it roughly halves the poll cycle when many sessions are active.
  3. Parse (sequential): iterate results, update _pending_tools per session. Kept sequential because _pending_tools is keyed by session_id and order matters within a session.

Callback dispatch is fire-and-forget per session group, tracked in self._callback_tasks: set[asyncio.Task[None]] with a discard done-callback. This decouples the poll cadence from slow callbacks (e.g. a Telegram edit that hits a transient 429): a stalled callback in session A no longer blocks the poll from advancing session B. Outstanding tasks are cancelled in stop().
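The read and dispatch phases can be sketched as below. `MiniMonitor` is a hypothetical stand-in, not the real SessionMonitor: the file reads and the callback are simulated, and the collect phase is reduced to a list passed in by the caller.

```python
import asyncio


class MiniMonitor:
    """Toy model of parallel reads plus fire-and-forget callback dispatch."""

    def __init__(self) -> None:
        self._callback_tasks: set[asyncio.Task] = set()
        self.delivered: list[str] = []

    async def _read_new_lines(self, name: str) -> list[str]:
        if name == "broken":
            raise OSError("unreadable")  # simulated I/O failure
        await asyncio.sleep(0)
        return [f"{name}: line"]

    async def _callback(self, line: str) -> None:
        await asyncio.sleep(0)  # stand-in for a slow Telegram call
        self.delivered.append(line)

    async def check_for_updates(self, to_read: list[str]) -> None:
        # Read phase: run all reads concurrently; failures come back as values.
        results = await asyncio.gather(
            *(self._read_new_lines(name) for name in to_read),
            return_exceptions=True,
        )
        # Parse phase: sequential, then dispatch without awaiting the callback.
        for result in results:
            if isinstance(result, BaseException):
                continue
            for line in result:
                task = asyncio.create_task(self._callback(line))
                self._callback_tasks.add(task)  # keep a strong reference
                task.add_done_callback(self._callback_tasks.discard)

    async def stop(self) -> None:
        """Cancel whatever callbacks are still outstanding."""
        pending = list(self._callback_tasks)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
```

Holding each task in the set prevents it from being garbage-collected mid-flight, and the discard callback keeps the set from growing without bound.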

4. Interactive UI reliability

4a. JSONL path: retry pane capture with backoff

When Claude Code writes an AskUserQuestion/ExitPlanMode entry to JSONL, the tmux pane sometimes hasn't rendered the UI yet. The previous code used a single await asyncio.sleep(0.3) before capturing, so on slow renders the pattern match failed and the UI was dropped. Now we retry up to three times with increasing delays (0.3s, 0.7s, 1.0s), breaking on first success.
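The retry loop can be sketched like this, assuming the pane capture and the pattern match are supplied as callables. `capture_with_retry`, `capture`, and `looks_ready` are hypothetical names; only the delay schedule comes from the description above.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence

CAPTURE_DELAYS = (0.3, 0.7, 1.0)  # seconds to wait before each attempt


async def capture_with_retry(
    capture: Callable[[], Awaitable[Optional[str]]],
    looks_ready: Callable[[str], bool],
    delays: Sequence[float] = CAPTURE_DELAYS,
) -> Optional[str]:
    """Capture the pane up to len(delays) times, returning on the first match."""
    for delay in delays:
        await asyncio.sleep(delay)
        pane = await capture()
        if pane is not None and looks_ready(pane):
            return pane
    return None  # UI never rendered within the retry budget
```

In the worst case this waits 2.0 seconds in total before giving up, compared with the single 0.3 second wait used previously.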

4b. Polling path: coordinate with JSONL path to avoid double-send

update_status_message now distinguishes UIs with JSONL entries from UIs without:

  • AskUserQuestion / ExitPlanMode (listed in INTERACTIVE_TOOL_NAMES, session has a JSONL) — the JSONL path in bot.py:handle_new_message is the sole sender; polling path only calls set_interactive_mode to suppress status updates, never handle_interactive_ui.
  • PermissionPrompt / RestoreCheckpoint / Settings / JSONL-less sessions — polling path sends immediately; no ordering concern because these have no JSONL equivalent.

This eliminates the duplicate-message race fixed piecemeal in earlier commits.

4c. Feedback survey auto-dismiss

A new Feedback UI pattern ("How is Claude doing this session…" / "0: Dismiss") is detected, and the poller sends 0 directly to the tmux pane to dismiss it without bothering the user.

4d. Numbered selector fallbacks

Recent Claude Code builds moved ExitPlanMode / AskUserQuestion to numbered selectors (❯ 1. Yes / ❯ 1. [ ] …). Two new fallback UIPatterns match these so the UI is still detected when the old markers are gone.

5. Topic lifecycle probe (silent)

The poller probes each bound topic every cycle to detect deletions. Previously it used unpin_all_forum_topic_messages, which is harmless but technically a write. Now it uses reopen_forum_topic:

  • Open topic → BadRequest("Topic_not_modified"): treated as success, and the poller continues.
  • Deleted topic → BadRequest("Topic_id_invalid") or "thread not found": triggers window kill + state cleanup.

No user-visible effect on healthy topics and no write side-effect.
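The decision on the probe result reduces to classifying the error message. The sketch below works on the message string only; `classify_probe_error` and its return labels are hypothetical, and the case-insensitive substring matching is an assumption about how the messages are compared.

```python
def classify_probe_error(message: str) -> str:
    """Map a BadRequest message from the reopen probe to a topic state."""
    lowered = message.lower()
    if "topic_not_modified" in lowered:
        return "alive"    # topic is open; nothing to do
    if "topic_id_invalid" in lowered or "thread not found" in lowered:
        return "deleted"  # kill the window and clean up state
    return "unknown"      # unrelated error; leave the binding alone
```

Treating unrecognized errors as "unknown" rather than "deleted" avoids tearing down a session because of a transient failure.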

6. New config flags — SHOW_THINKING, SHOW_TOOLS

Both default to true. Setting either to false suppresses the respective content type in:

  • real-time delivery (session_monitor.py — skipped before emitting NewMessage),
  • history rendering (handlers/history.py — skipped during message formatting).

Interactive tools (AskUserQuestion, ExitPlanMode) are exempt from SHOW_TOOLS=false so their UI still surfaces.
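The filtering rule, including the exemption, might look like the sketch below. The function `should_emit` and the content-type strings are assumptions for illustration; only the flag names and the two interactive tool names come from the description.

```python
from typing import Optional

INTERACTIVE_TOOL_NAMES = frozenset({"AskUserQuestion", "ExitPlanMode"})


def should_emit(
    content_type: str,
    tool_name: Optional[str] = None,
    show_thinking: bool = True,
    show_tools: bool = True,
) -> bool:
    """Decide whether a parsed entry should be delivered or rendered."""
    if content_type == "thinking":
        return show_thinking
    if content_type in ("tool_use", "tool_result"):
        # Interactive tools always pass so their UI still surfaces.
        if tool_name in INTERACTIVE_TOOL_NAMES:
            return True
        return show_tools
    return True  # plain text is never suppressed by these flags
```

Using one predicate for both real-time delivery and history rendering would keep the two paths from drifting apart.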

7. New /context command

Added to CC_COMMANDS in bot.py. Forwards to Claude Code via the existing slash-command path to display context-window usage.

8. State migration — old thread_bindings format

Old state.json files stored each binding as a dict: {"window_id": "@4", "chat_id": -1001…}. The new format is a plain string ("@4") with chat_id tracked separately in group_chat_ids (keyed by "{user_id}:{thread_id}"). SessionManager._load_state now detects the old shape per entry and migrates chat_id → group_chat_ids transparently. Fresh installs are unaffected.
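A sketch of the per-entry migration, assuming thread_bindings is nested as user_id → thread_id → binding. That nesting, the function name `migrate_thread_bindings`, and the placeholder chat ID in the usage example are assumptions; the real `_load_state` may differ.

```python
from typing import Any


def migrate_thread_bindings(
    raw: dict[str, dict[str, Any]],
    group_chat_ids: dict[str, int],
) -> dict[str, dict[str, str]]:
    """Convert old dict-shaped bindings to plain window-id strings."""
    migrated: dict[str, dict[str, str]] = {}
    for user_id, bindings in raw.items():
        migrated[user_id] = {}
        for thread_id, binding in bindings.items():
            if isinstance(binding, dict):
                # Old shape: keep the window id, move chat_id out.
                migrated[user_id][thread_id] = binding["window_id"]
                chat_id = binding.get("chat_id")
                if chat_id is not None:
                    group_chat_ids.setdefault(f"{user_id}:{thread_id}", chat_id)
            else:
                # New shape: already a plain string.
                migrated[user_id][thread_id] = binding
    return migrated


# Usage with one old-format and one new-format entry (placeholder IDs).
chat_ids: dict[str, int] = {}
state = {"42": {"7": {"window_id": "@4", "chat_id": -100123}, "8": "@5"}}
result = migrate_thread_bindings(state, chat_ids)
```

Using `setdefault` means a chat ID already recorded in group_chat_ids is not overwritten by a stale value from an old binding.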

9. Tmux cwd sanitation

TmuxManager.list_windows now strips the " (deleted)" suffix that tmux appends when the pane's cwd has been removed; otherwise downstream path comparisons fail on deleted-then-recreated directories.

10. Tests

  • tests/ccbot/handlers/test_message_queue.py (new, 50 lines) — per-topic queue keying, worker lifecycle.
  • tests/ccbot/handlers/test_status_polling.py (+105 lines) — JSONL-vs-polling coordination, feedback dismiss, silent reopen probe.
  • tests/ccbot/handlers/test_interactive_ui.py (+54 lines) — numbered selector detection, AskUserQuestion/ExitPlanMode routing.
  • tests/ccbot/test_terminal_parser.py (+17 lines) — status stats stripping regex.
  • tests/ccbot/conftest.py (+24 lines) — shared fixtures for session/state manager mocks.

Test plan

  • uv run ruff check src/ tests/
  • uv run ruff format --check src/ tests/
  • uv run pyright src/ccbot/
  • uv run pytest tests/ (250 tests, ~0.3s)
  • Drive messages across 3+ topics in parallel; confirm one topic's burst does not stall others
  • Restart the bot while the user is active in several groups; confirm no 429 trip on the first requests
  • Trigger AskUserQuestion and ExitPlanMode under both normal and slow-render conditions; confirm exactly one UI is sent
  • Let Claude Code render a feedback survey; confirm auto-dismiss without user action
  • Delete a topic in the Telegram client; confirm window kill and state cleanup on next poll
  • Send /context in a bound topic; confirm forwarding works
  • Toggle SHOW_THINKING=false and SHOW_TOOLS=false; confirm real-time and history suppression (and that interactive tool UIs still surface)
  • Start with an old-format state.json; confirm bindings and group chat IDs are recovered

🤖 Generated with Claude Code

LikunYDev and others added 17 commits February 21, 2026 15:25
Adds restart-service.sh that restarts ccbot via systemctl --user,
keeping existing Claude Code tmux sessions alive.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Per-topic message queues (was per-user) so sessions in different topics
no longer block each other. Fire-and-forget monitor callbacks grouped by
session_id for concurrent cross-session dispatch. Status polling hoists
list_windows() to one call per cycle with parallel pane captures.
Session file reads parallelized via asyncio.gather.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Per-group-chat processing lock serializes sends across topic workers,
typing indicators throttled to 1/4s, redundant _check_and_send_status
removed (status polling loop already covers this), and group rate
limiter buckets pre-filled on restart to account for Telegram's
persisted server-side counter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- _load_state now handles both old (dict with window_id+chat_id) and
  new (plain string) thread_bindings format, migrating chat_id to
  group_chat_ids automatically.
- restart.sh now uses uv tool install + systemctl instead of running
  uv run ccbot in a tmux pane, ensuring code changes are picked up.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Suppress tool_use/tool_result messages when SHOW_TOOLS=false (interactive
tools like AskUserQuestion/ExitPlanMode still pass through). Strip timer
stats from status lines before dedup comparison so edits only fire when
the action name changes, not every second.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# Conflicts:
#	src/ccbot/handlers/message_queue.py
#	src/ccbot/session_monitor.py
…eractive UIs

- Add fallback UIPattern for numbered selector format (❯ 1. Yes / 2. No)
  that Claude Code now uses for ExitPlanMode
- Defer ExitPlanMode/AskUserQuestion to JSONL queue path when session exists,
  ensuring content messages arrive before the interactive UI
- Improve _STATUS_STATS_RE to handle minutes format and trailing text
  (e.g. "Working… (2m 9s · …) Esc to interrupt")
- Add comprehensive tests for new detection and routing logic

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…up improvements

- Add Feedback UIPattern detection and auto-dismiss by sending "0" key
- Replace sendMessage(".") topic probe with silent reopen_forum_topic
  (no more visible "." flash in topics)
- Strip trailing time durations ("Churned for 9m 58s") for status dedup,
  preventing repeated edits as timer counts up
- Strip " (deleted)" suffix from tmux pane cwd to fix session matching
  when a directory is deleted

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d selector support

Pass thread_id to get_message_queue for correct per-topic routing. Add
fallback in status polling to send interactive UI when JSONL path misses
the pane capture. Add numbered selector pattern for AskUserQuestion. Pin
telegramify-markdown to <1.0.0.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove the fallback sender in status_polling that raced with the JSONL
path, causing the interactive UI to be sent twice. Instead, make the
JSONL path (the sole sender) retry pane capture with increasing delays
(0.3s, 0.7s, 1.0s) to handle slow UI rendering.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
fix: prevent duplicate interactive UI messages
# Conflicts:
#	pyproject.toml
#	src/ccbot/handlers/message_queue.py
#	src/ccbot/handlers/status_polling.py
Removes scripts/restart-service.sh and restores scripts/restart.sh to
upstream/main to keep this PR focused on core queue/rate-limit logic.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@LikunYDev LikunYDev changed the title Topic queue rate limit fixes fix: per-topic queues, 429 hardening, and interactive UI reliability Apr 15, 2026
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@LikunYDev LikunYDev marked this pull request as ready for review April 15, 2026 02:41