-
-
Notifications
You must be signed in to change notification settings - Fork 144
Add Discord rate limiting with exponential backoff and Redis coordination (#284) #288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,16 +1,24 @@ | ||
| import discord | ||
| from discord.ext import commands | ||
| import logging | ||
| from typing import Dict, Any, Optional | ||
| from app.core.orchestration.queue_manager import AsyncQueueManager, QueuePriority | ||
| from app.classification.classification_router import ClassificationRouter | ||
| import os | ||
| import asyncio | ||
| from typing import Dict, Optional | ||
|
|
||
| from backend.rate_limiter import DiscordRateLimiter | ||
| from app.agents.devrel.github.github_toolkit import GitHubToolkit | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class DiscordBot(commands.Bot): | ||
| """Discord bot with LangGraph agent integration""" | ||
| """ | ||
| DEV MODE Discord Bot | ||
| Direct GitHubToolkit execution | ||
| Per-channel rate limiting + simple queue (Lock-based) | ||
| """ | ||
|
|
||
| def __init__(self, queue_manager: AsyncQueueManager, **kwargs): | ||
| def __init__(self, **kwargs): | ||
| intents = discord.Intents.default() | ||
| intents.message_content = True | ||
| intents.guilds = True | ||
|
|
@@ -24,89 +32,72 @@ def __init__(self, queue_manager: AsyncQueueManager, **kwargs): | |
| **kwargs | ||
| ) | ||
|
|
||
| self.queue_manager = queue_manager | ||
| self.classifier = ClassificationRouter() | ||
| self.active_threads: Dict[str, str] = {} | ||
| self._register_queue_handlers() | ||
| self.channel_locks: Dict[str, asyncio.Lock] = {} | ||
|
|
||
| # Redis-enabled per-channel rate limiter | ||
| self.rate_limiter = DiscordRateLimiter( | ||
| redis_url=os.getenv("REDIS_URL"), | ||
| max_retries=3 | ||
| ) | ||
|
|
||
| def _register_queue_handlers(self): | ||
| """Register handlers for queue messages""" | ||
| self.queue_manager.register_handler("discord_response", self._handle_agent_response) | ||
| def _get_channel_lock(self, channel_id: str) -> asyncio.Lock: | ||
| if channel_id not in self.channel_locks: | ||
| self.channel_locks[channel_id] = asyncio.Lock() | ||
| return self.channel_locks[channel_id] | ||
|
|
||
| async def on_ready(self): | ||
| """Bot ready event""" | ||
| logger.info(f'Enhanced Discord bot logged in as {self.user}') | ||
| logger.info(f'Bot logged in as {self.user}') | ||
| print(f'Bot is ready! Logged in as {self.user}') | ||
|
|
||
| try: | ||
| synced = await self.tree.sync() | ||
| print(f"Synced {len(synced)} slash command(s)") | ||
| except Exception as e: | ||
| print(f"Failed to sync slash commands: {e}") | ||
|
|
||
| async def on_message(self, message): | ||
| """Handles regular chat messages, but ignores slash commands.""" | ||
| if message.author == self.user: | ||
| return | ||
|
|
||
| if message.interaction_metadata is not None: | ||
| return | ||
|
|
||
| try: | ||
| triage_result = await self.classifier.should_process_message( | ||
| message.content, | ||
| { | ||
| "channel_id": str(message.channel.id), | ||
| "user_id": str(message.author.id), | ||
| "guild_id": str(message.guild.id) if message.guild else None | ||
| } | ||
| ) | ||
|
|
||
| if triage_result.get("needs_devrel", False): | ||
| await self._handle_devrel_message(message, triage_result) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error processing message: {str(e)}") | ||
|
|
||
| async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]): | ||
| """This now handles both new requests and follow-ups in threads.""" | ||
| try: | ||
| user_id = str(message.author.id) | ||
| thread_id = await self._get_or_create_thread(message, user_id) | ||
| thread = self.get_channel(int(thread_id)) | ||
|
|
||
| agent_message = { | ||
| "type": "devrel_request", | ||
| "id": f"discord_{message.id}", | ||
| "user_id": user_id, | ||
| "channel_id": str(message.channel.id), | ||
| "thread_id": thread_id, | ||
| "memory_thread_id": user_id, | ||
| "content": message.content, | ||
| "triage": triage_result, | ||
| "classification": triage_result, | ||
| "platform": "discord", | ||
| "timestamp": message.created_at.isoformat(), | ||
| "author": { | ||
| "username": message.author.name, | ||
| "display_name": message.author.display_name, | ||
| "avatar_url": str(message.author.avatar.url) if message.author.avatar else None | ||
| } | ||
| } | ||
| priority_map = {"high": QueuePriority.HIGH, | ||
| "medium": QueuePriority.MEDIUM, | ||
| "low": QueuePriority.LOW | ||
| } | ||
| priority = priority_map.get(triage_result.get("priority"), QueuePriority.MEDIUM) | ||
| await self.queue_manager.enqueue(agent_message, priority) | ||
|
|
||
| # --- "PROCESSING" MESSAGE RESTORED --- | ||
| if thread_id: | ||
| thread = self.get_channel(int(thread_id)) | ||
| if thread: | ||
| await thread.send("I'm processing your request, please hold on...") | ||
| # ------------------------------------ | ||
| if not thread: | ||
| return | ||
|
|
||
| channel_id = str(thread.id) | ||
| lock = self._get_channel_lock(channel_id) | ||
|
|
||
| async with lock: | ||
|
|
||
| # Send processing message | ||
| await self.rate_limiter.execute_with_retry( | ||
| thread.send, | ||
| channel_id, | ||
| "Processing your request..." | ||
| ) | ||
|
|
||
| # Execute toolkit | ||
| toolkit = GitHubToolkit() | ||
| result = await toolkit.execute(message.content) | ||
| response_text = result.get("message", "No response generated.") | ||
|
|
||
| # Send response in chunks | ||
| for i in range(0, len(response_text), 2000): | ||
| await self.rate_limiter.execute_with_retry( | ||
| thread.send, | ||
| channel_id, | ||
| response_text[i:i+2000] | ||
| ) | ||
|
Comment on lines
+77
to
+97
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. User receives "Processingβ¦" but no error feedback if toolkit execution fails. If Also, β»οΈ Proposed fix except Exception as e:
- logger.error(f"Error processing message: {str(e)}")
+ logger.exception("Error processing message")
+ try:
+ thread = self.get_channel(int(thread_id))
+ if thread:
+ await thread.send("Sorry, something went wrong processing your request.")
+ except Exception:
+ logger.exception("Failed to send error message to user")π€ Prompt for AI Agents |
||
|
|
||
| except Exception as e: | ||
| logger.error(f"Error handling DevRel message: {str(e)}") | ||
| logger.error(f"Error processing message: {str(e)}") | ||
|
|
||
| async def _get_or_create_thread(self, message, user_id: str) -> Optional[str]: | ||
| try: | ||
|
|
@@ -118,28 +109,29 @@ async def _get_or_create_thread(self, message, user_id: str) -> Optional[str]: | |
| else: | ||
| del self.active_threads[user_id] | ||
|
|
||
| # This part only runs if it's not a follow-up message in an active thread. | ||
| if isinstance(message.channel, discord.TextChannel): | ||
| thread_name = f"DevRel Chat - {message.author.display_name}" | ||
| thread = await message.create_thread(name=thread_name, auto_archive_duration=60) | ||
| thread = await message.create_thread( | ||
| name=thread_name, | ||
| auto_archive_duration=60 | ||
| ) | ||
|
|
||
| self.active_threads[user_id] = str(thread.id) | ||
| await thread.send(f"Hello {message.author.mention}! I've created this thread to help you. How can I assist?") | ||
|
|
||
| channel_id = str(thread.id) | ||
| lock = self._get_channel_lock(channel_id) | ||
|
|
||
| async with lock: | ||
| await self.rate_limiter.execute_with_retry( | ||
| thread.send, | ||
| channel_id, | ||
| f"Hello {message.author.mention}! " | ||
| "I've created this thread to help you." | ||
| ) | ||
|
|
||
| return str(thread.id) | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to create thread: {e}") | ||
| return str(message.channel.id) | ||
|
|
||
| async def _handle_agent_response(self, response_data: Dict[str, Any]): | ||
| try: | ||
| thread_id = response_data.get("thread_id") | ||
| response_text = response_data.get("response", "") | ||
| if not thread_id or not response_text: | ||
| return | ||
| thread = self.get_channel(int(thread_id)) | ||
| if thread: | ||
| for i in range(0, len(response_text), 2000): | ||
| await thread.send(response_text[i:i+2000]) | ||
| else: | ||
| logger.error(f"Thread {thread_id} not found for agent response") | ||
| except Exception as e: | ||
| logger.error(f"Error handling agent response: {str(e)}") | ||
| return str(message.channel.id) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| import asyncio | ||
| import random | ||
| import logging | ||
| import time | ||
| import discord | ||
| import redis.asyncio as redis | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class DiscordRateLimiter: | ||
| def __init__(self, redis_url: str | None = None, max_retries: int = 3): | ||
| self.max_retries = max_retries | ||
| self.redis = redis.from_url(redis_url) if redis_url else None | ||
|
Comment on lines
+12
to
+15
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Redis connection error handling and cleanup. If Redis becomes unavailable after initialization, Consider wrapping Redis operations in try/except and adding a cleanup method: β»οΈ Suggested resilience pattern class DiscordRateLimiter:
def __init__(self, redis_url: str | None = None, max_retries: int = 3):
self.max_retries = max_retries
self.redis = redis.from_url(redis_url) if redis_url else None
+
+ async def close(self):
+ if self.redis:
+ await self.redis.close()And in try:
reset_time = await self.redis.get(key)
except Exception:
logger.warning("Redis unavailable, skipping rate-limit check")
returnπ€ Prompt for AI Agents |
||
|
|
||
| def _calculate_backoff(self, attempt: int, retry_after: float) -> float: | ||
| """ | ||
| Exponential backoff with jitter: | ||
| delay = (2 ** attempt * retry_after) + jitter | ||
| """ | ||
| jitter = random.uniform(0, 0.3) | ||
| return (2 ** attempt * retry_after) + jitter | ||
|
|
||
| async def _wait_if_limited(self, bucket: str) -> None: | ||
| """Wait if Redis indicates this bucket is currently rate limited.""" | ||
| if not self.redis: | ||
| return | ||
|
|
||
| key = f"discord_ratelimit:{bucket}" | ||
| reset_time = await self.redis.get(key) | ||
|
|
||
| if reset_time: | ||
| delay = float(reset_time) - time.time() | ||
| if delay > 0: | ||
| logger.warning( | ||
| f"Bucket {bucket} rate limited. Waiting {delay:.2f}s" | ||
| ) | ||
| await asyncio.sleep(delay) | ||
|
|
||
| async def _set_limit(self, bucket: str, retry_after: float) -> None: | ||
| """Store rate limit reset timestamp in Redis.""" | ||
| if not self.redis: | ||
| return | ||
|
|
||
| key = f"discord_ratelimit:{bucket}" | ||
| reset_at = time.time() + retry_after | ||
|
|
||
| await self.redis.set( | ||
| key, | ||
| reset_at, | ||
| ex=int(retry_after) + 1 | ||
| ) | ||
|
|
||
| async def execute_with_retry(self, func, bucket: str, *args, **kwargs): | ||
| """ | ||
| Execute a Discord API call with automatic retry on 429. | ||
| """ | ||
|
|
||
| for attempt in range(self.max_retries + 1): | ||
| try: | ||
| await self._wait_if_limited(bucket) | ||
| return await func(*args, **kwargs) | ||
|
|
||
| except discord.HTTPException as e: | ||
| if e.status != 429: | ||
| raise | ||
|
|
||
| retry_after = getattr(e, "retry_after", 1) | ||
|
|
||
| await self._set_limit(bucket, retry_after) | ||
|
|
||
| delay = self._calculate_backoff(attempt, retry_after) | ||
|
|
||
| logger.warning( | ||
| f"429 hit for bucket {bucket}. " | ||
| f"Attempt {attempt + 1}/{self.max_retries}. " | ||
| f"Retrying in {delay:.2f}s" | ||
| ) | ||
|
|
||
| await asyncio.sleep(delay) | ||
|
|
||
| logger.error(f"Max retries exhausted for bucket {bucket}") | ||
| raise discord.HTTPException( | ||
| response=None, | ||
| message="Discord rate limit exceeded after retries." | ||
|
Comment on lines
+55
to
+86
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wasted sleep on last failed attempt and misleading log message. Two issues in the retry loop:
β»οΈ Proposed fix except discord.HTTPException as e:
if e.status != 429:
raise
retry_after = getattr(e, "retry_after", 1)
await self._set_limit(bucket, retry_after)
+ if attempt >= self.max_retries:
+ break
+
delay = self._calculate_backoff(attempt, retry_after)
logger.warning(
f"429 hit for bucket {bucket}. "
- f"Attempt {attempt + 1}/{self.max_retries}. "
+ f"Attempt {attempt + 1}/{self.max_retries + 1}. "
f"Retrying in {delay:.2f}s"
)
await asyncio.sleep(delay)π€ Prompt for AI Agents |
||
| ) | ||
|
Comment on lines
+83
to
+87
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. π§© Analysis chainπ Script executed: #!/bin/bash
# Check discord.HTTPException implementation
pip install discord.py==2.4.0 2>/dev/null
python3 << 'EOF'
import inspect
import discord
# Get the HTTPException class
try:
src = inspect.getsource(discord.HTTPException.__init__)
print("=== discord.HTTPException.__init__ ===")
print(src)
print("\n")
except Exception as e:
print(f"Error getting __init__: {e}")
# Also check the class definition
try:
src = inspect.getsource(discord.HTTPException)
print("=== discord.HTTPException class (first 100 lines) ===")
lines = src.split('\n')[:100]
print('\n'.join(lines))
except Exception as e:
print(f"Error getting class: {e}")
EOFRepository: AOSSIE-Org/Devr.AI Length of output: 5210 π Script executed: # Check the context in backend/rate_limiter.py
cat -n backend/rate_limiter.py | sed -n '75,95p'Repository: AOSSIE-Org/Devr.AI Length of output: 620 π Script executed: # Search for usages of discord.HTTPException in the codebase
rg "discord\.HTTPException" --type py -A 3 -B 3Repository: AOSSIE-Org/Devr.AI Length of output: 1233 Replace Passing Consider creating a mock response object (as done in tests with π€ Prompt for AI Agents |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
channel_locksdict grows unboundedly.A new
asyncio.Lockis allocated for every unique channel ID and never evicted. Over time (especially with thread-per-user model in_get_or_create_thread), this will leak memory. Consider an LRU eviction strategy or periodic cleanup of locks for channels no longer inactive_threads.π€ Prompt for AI Agents