Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 74 additions & 82 deletions backend/integrations/discord/bot.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import discord
from discord.ext import commands
import logging
from typing import Dict, Any, Optional
from app.core.orchestration.queue_manager import AsyncQueueManager, QueuePriority
from app.classification.classification_router import ClassificationRouter
import os
import asyncio
from typing import Dict, Optional

from backend.rate_limiter import DiscordRateLimiter
from app.agents.devrel.github.github_toolkit import GitHubToolkit

logger = logging.getLogger(__name__)


class DiscordBot(commands.Bot):
"""Discord bot with LangGraph agent integration"""
"""
DEV MODE Discord Bot
Direct GitHubToolkit execution
Per-channel rate limiting + simple queue (Lock-based)
"""

def __init__(self, queue_manager: AsyncQueueManager, **kwargs):
def __init__(self, **kwargs):
intents = discord.Intents.default()
intents.message_content = True
intents.guilds = True
Expand All @@ -24,89 +32,72 @@ def __init__(self, queue_manager: AsyncQueueManager, **kwargs):
**kwargs
)

self.queue_manager = queue_manager
self.classifier = ClassificationRouter()
self.active_threads: Dict[str, str] = {}
self._register_queue_handlers()
self.channel_locks: Dict[str, asyncio.Lock] = {}

# Redis-enabled per-channel rate limiter
self.rate_limiter = DiscordRateLimiter(
redis_url=os.getenv("REDIS_URL"),
max_retries=3
)
Comment on lines +36 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟑 Minor

channel_locks dict grows unboundedly.

A new asyncio.Lock is allocated for every unique channel ID and never evicted. Over time (especially with thread-per-user model in _get_or_create_thread), this will leak memory. Consider an LRU eviction strategy or periodic cleanup of locks for channels no longer in active_threads.

πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/integrations/discord/bot.py` around lines 36 - 42, channel_locks
currently grows unbounded because a new asyncio.Lock is created per channel in
_get_or_create_thread and never removed; fix by adding eviction/cleanup: either
replace channel_locks with an LRU-backed map (e.g., an OrderedDict LRU with a
fixed max size) or remove the lock entry when a channel/thread is closed, and
ensure any thread-cleanup code (where active_threads is updated) also deletes
self.channel_locks[channel_id]; reference channel_locks and
_get_or_create_thread (and the code path that removes entries from
active_threads) and implement one of these strategies so locks for inactive
channels are evicted.


def _register_queue_handlers(self):
"""Register handlers for queue messages"""
self.queue_manager.register_handler("discord_response", self._handle_agent_response)
def _get_channel_lock(self, channel_id: str) -> asyncio.Lock:
if channel_id not in self.channel_locks:
self.channel_locks[channel_id] = asyncio.Lock()
return self.channel_locks[channel_id]

async def on_ready(self):
"""Bot ready event"""
logger.info(f'Enhanced Discord bot logged in as {self.user}')
logger.info(f'Bot logged in as {self.user}')
print(f'Bot is ready! Logged in as {self.user}')

try:
synced = await self.tree.sync()
print(f"Synced {len(synced)} slash command(s)")
except Exception as e:
print(f"Failed to sync slash commands: {e}")

async def on_message(self, message):
"""Handles regular chat messages, but ignores slash commands."""
if message.author == self.user:
return

if message.interaction_metadata is not None:
return

try:
triage_result = await self.classifier.should_process_message(
message.content,
{
"channel_id": str(message.channel.id),
"user_id": str(message.author.id),
"guild_id": str(message.guild.id) if message.guild else None
}
)

if triage_result.get("needs_devrel", False):
await self._handle_devrel_message(message, triage_result)

except Exception as e:
logger.error(f"Error processing message: {str(e)}")

async def _handle_devrel_message(self, message, triage_result: Dict[str, Any]):
"""This now handles both new requests and follow-ups in threads."""
try:
user_id = str(message.author.id)
thread_id = await self._get_or_create_thread(message, user_id)
thread = self.get_channel(int(thread_id))

agent_message = {
"type": "devrel_request",
"id": f"discord_{message.id}",
"user_id": user_id,
"channel_id": str(message.channel.id),
"thread_id": thread_id,
"memory_thread_id": user_id,
"content": message.content,
"triage": triage_result,
"classification": triage_result,
"platform": "discord",
"timestamp": message.created_at.isoformat(),
"author": {
"username": message.author.name,
"display_name": message.author.display_name,
"avatar_url": str(message.author.avatar.url) if message.author.avatar else None
}
}
priority_map = {"high": QueuePriority.HIGH,
"medium": QueuePriority.MEDIUM,
"low": QueuePriority.LOW
}
priority = priority_map.get(triage_result.get("priority"), QueuePriority.MEDIUM)
await self.queue_manager.enqueue(agent_message, priority)

# --- "PROCESSING" MESSAGE RESTORED ---
if thread_id:
thread = self.get_channel(int(thread_id))
if thread:
await thread.send("I'm processing your request, please hold on...")
# ------------------------------------
if not thread:
return

channel_id = str(thread.id)
lock = self._get_channel_lock(channel_id)

async with lock:

# Send processing message
await self.rate_limiter.execute_with_retry(
thread.send,
channel_id,
"Processing your request..."
)

# Execute toolkit
toolkit = GitHubToolkit()
result = await toolkit.execute(message.content)
response_text = result.get("message", "No response generated.")

# Send response in chunks
for i in range(0, len(response_text), 2000):
await self.rate_limiter.execute_with_retry(
thread.send,
channel_id,
response_text[i:i+2000]
)
Comment on lines +77 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

User receives "Processing…" but no error feedback if toolkit execution fails.

If GitHubToolkit().execute() or the chunked-send loop raises, the broad except Exception at Line 99 silently logs the error. The user has already seen "Processing your request…" and will be left waiting with no follow-up. Consider sending an error message to the thread in the except block.

Also, logger.error(f"Error processing message: {str(e)}") loses the traceback. Use logger.exception(...) to preserve it.

♻️ Proposed fix
         except Exception as e:
-            logger.error(f"Error processing message: {str(e)}")
+            logger.exception("Error processing message")
+            try:
+                thread = self.get_channel(int(thread_id))
+                if thread:
+                    await thread.send("Sorry, something went wrong processing your request.")
+            except Exception:
+                logger.exception("Failed to send error message to user")
πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/integrations/discord/bot.py` around lines 77 - 97, When handling
failures around GitHubToolkit().execute() and the chunked sends, ensure the
except block both logs the full traceback using logger.exception(...) and
informs the user by sending an error reply to the thread; specifically, in the
try/except that surrounds GitHubToolkit, call
self.rate_limiter.execute_with_retry(thread.send, channel_id, "An error occurred
while processing your request.") (or a brief user-facing message) inside the
except before re-raising or returning, and replace logger.error(...) with
logger.exception("Error processing message") to preserve the traceback; this
targets the GitHubToolkit, rate_limiter.execute_with_retry, and thread.send
callsites.


except Exception as e:
logger.error(f"Error handling DevRel message: {str(e)}")
logger.error(f"Error processing message: {str(e)}")

async def _get_or_create_thread(self, message, user_id: str) -> Optional[str]:
try:
Expand All @@ -118,28 +109,29 @@ async def _get_or_create_thread(self, message, user_id: str) -> Optional[str]:
else:
del self.active_threads[user_id]

# This part only runs if it's not a follow-up message in an active thread.
if isinstance(message.channel, discord.TextChannel):
thread_name = f"DevRel Chat - {message.author.display_name}"
thread = await message.create_thread(name=thread_name, auto_archive_duration=60)
thread = await message.create_thread(
name=thread_name,
auto_archive_duration=60
)

self.active_threads[user_id] = str(thread.id)
await thread.send(f"Hello {message.author.mention}! I've created this thread to help you. How can I assist?")

channel_id = str(thread.id)
lock = self._get_channel_lock(channel_id)

async with lock:
await self.rate_limiter.execute_with_retry(
thread.send,
channel_id,
f"Hello {message.author.mention}! "
"I've created this thread to help you."
)

return str(thread.id)

except Exception as e:
logger.error(f"Failed to create thread: {e}")
return str(message.channel.id)

async def _handle_agent_response(self, response_data: Dict[str, Any]):
try:
thread_id = response_data.get("thread_id")
response_text = response_data.get("response", "")
if not thread_id or not response_text:
return
thread = self.get_channel(int(thread_id))
if thread:
for i in range(0, len(response_text), 2000):
await thread.send(response_text[i:i+2000])
else:
logger.error(f"Thread {thread_id} not found for agent response")
except Exception as e:
logger.error(f"Error handling agent response: {str(e)}")
return str(message.channel.id)
87 changes: 87 additions & 0 deletions backend/rate_limiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import asyncio
import random
import logging
import time
import discord
import redis.asyncio as redis


logger = logging.getLogger(__name__)


class DiscordRateLimiter:
def __init__(self, redis_url: str | None = None, max_retries: int = 3):
self.max_retries = max_retries
self.redis = redis.from_url(redis_url) if redis_url else None
Comment on lines +12 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing Redis connection error handling and cleanup.

If Redis becomes unavailable after initialization, _wait_if_limited and _set_limit will raise connection errors that are not caught by the except discord.HTTPException handler in execute_with_retry, causing the entire call to fail even though Redis is optional. Additionally, there's no close() method to cleanly shut down the Redis connection pool.

Consider wrapping Redis operations in try/except and adding a cleanup method:

♻️ Suggested resilience pattern
 class DiscordRateLimiter:
     def __init__(self, redis_url: str | None = None, max_retries: int = 3):
         self.max_retries = max_retries
         self.redis = redis.from_url(redis_url) if redis_url else None
+
+    async def close(self):
+        if self.redis:
+            await self.redis.close()

And in _wait_if_limited / _set_limit, wrap Redis calls:

try:
    reset_time = await self.redis.get(key)
except Exception:
    logger.warning("Redis unavailable, skipping rate-limit check")
    return
πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/rate_limiter.py` around lines 12 - 15, Wrap all Redis interactions in
DiscordRateLimiter (notably in _wait_if_limited and _set_limit) with broad
try/except around await self.redis.* calls so Redis connection errors are
caught, logged as warnings, and the functions return/skip rate-limit logic
instead of propagating; also update execute_with_retry to avoid relying on Redis
exceptions being handled there (i.e., ensure Redis errors are handled inside the
Redis helper methods and do not reach the discord.HTTPException except block).
Finally, add an async close method (e.g., async def close(self):) that checks if
self.redis is set and then closes the connection pool properly (call the
appropriate close/wait_closed semantics on the redis client) to allow clean
shutdown and resource cleanup.


def _calculate_backoff(self, attempt: int, retry_after: float) -> float:
"""
Exponential backoff with jitter:
delay = (2 ** attempt * retry_after) + jitter
"""
jitter = random.uniform(0, 0.3)
return (2 ** attempt * retry_after) + jitter

async def _wait_if_limited(self, bucket: str) -> None:
"""Wait if Redis indicates this bucket is currently rate limited."""
if not self.redis:
return

key = f"discord_ratelimit:{bucket}"
reset_time = await self.redis.get(key)

if reset_time:
delay = float(reset_time) - time.time()
if delay > 0:
logger.warning(
f"Bucket {bucket} rate limited. Waiting {delay:.2f}s"
)
await asyncio.sleep(delay)

async def _set_limit(self, bucket: str, retry_after: float) -> None:
"""Store rate limit reset timestamp in Redis."""
if not self.redis:
return

key = f"discord_ratelimit:{bucket}"
reset_at = time.time() + retry_after

await self.redis.set(
key,
reset_at,
ex=int(retry_after) + 1
)

async def execute_with_retry(self, func, bucket: str, *args, **kwargs):
"""
Execute a Discord API call with automatic retry on 429.
"""

for attempt in range(self.max_retries + 1):
try:
await self._wait_if_limited(bucket)
return await func(*args, **kwargs)

except discord.HTTPException as e:
if e.status != 429:
raise

retry_after = getattr(e, "retry_after", 1)

await self._set_limit(bucket, retry_after)

delay = self._calculate_backoff(attempt, retry_after)

logger.warning(
f"429 hit for bucket {bucket}. "
f"Attempt {attempt + 1}/{self.max_retries}. "
f"Retrying in {delay:.2f}s"
)

await asyncio.sleep(delay)

logger.error(f"Max retries exhausted for bucket {bucket}")
raise discord.HTTPException(
response=None,
message="Discord rate limit exceeded after retries."
Comment on lines +55 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟑 Minor

Wasted sleep on last failed attempt and misleading log message.

Two issues in the retry loop:

  1. Wasted sleep on final attempt: When attempt == max_retries (the last iteration), a 429 triggers asyncio.sleep(delay) at Line 81, but the loop ends immediately after and falls through to the raise at Line 84. This adds unnecessary latency before the final error.

  2. Misleading log: f"Attempt {attempt + 1}/{self.max_retries}" β€” with max_retries=3, the last iteration logs "Attempt 4/3", which is confusing.

♻️ Proposed fix
             except discord.HTTPException as e:
                 if e.status != 429:
                     raise
 
                 retry_after = getattr(e, "retry_after", 1)
 
                 await self._set_limit(bucket, retry_after)
 
+                if attempt >= self.max_retries:
+                    break
+
                 delay = self._calculate_backoff(attempt, retry_after)
 
                 logger.warning(
                     f"429 hit for bucket {bucket}. "
-                    f"Attempt {attempt + 1}/{self.max_retries}. "
+                    f"Attempt {attempt + 1}/{self.max_retries + 1}. "
                     f"Retrying in {delay:.2f}s"
                 )
 
                 await asyncio.sleep(delay)
πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/rate_limiter.py` around lines 55 - 86, In execute_with_retry, avoid
sleeping after the final retry and fix the attempt count in the log: change the
logged total from self.max_retries to self.max_retries + 1 (e.g.
logger.warning(f"Attempt {attempt + 1}/{self.max_retries + 1}...")) and only
call asyncio.sleep(delay) when attempt < self.max_retries (i.e. wrap the sleep
in an if attempt < self.max_retries: ... block); keep the rest of the 429
handling (_set_limit, _calculate_backoff, logger.warning) unchanged so the loop
exits immediately on the last failed attempt and the final raise happens without
extra delay.

)
Comment on lines +83 to +87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | πŸ”΄ Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check discord.HTTPException implementation
pip install discord.py==2.4.0 2>/dev/null
python3 << 'EOF'
import inspect
import discord

# Get the HTTPException class
try:
    src = inspect.getsource(discord.HTTPException.__init__)
    print("=== discord.HTTPException.__init__ ===")
    print(src)
    print("\n")
except Exception as e:
    print(f"Error getting __init__: {e}")

# Also check the class definition
try:
    src = inspect.getsource(discord.HTTPException)
    print("=== discord.HTTPException class (first 100 lines) ===")
    lines = src.split('\n')[:100]
    print('\n'.join(lines))
except Exception as e:
    print(f"Error getting class: {e}")
EOF

Repository: AOSSIE-Org/Devr.AI

Length of output: 5210


🏁 Script executed:

# Check the context in backend/rate_limiter.py
cat -n backend/rate_limiter.py | sed -n '75,95p'

Repository: AOSSIE-Org/Devr.AI

Length of output: 620


🏁 Script executed:

# Search for usages of discord.HTTPException in the codebase
rg "discord\.HTTPException" --type py -A 3 -B 3

Repository: AOSSIE-Org/Devr.AI

Length of output: 1233


Replace response=None with a valid mock response object or refactor to avoid discord.HTTPException.

Passing response=None to discord.HTTPException will cause an AttributeError at instantiation. The __init__ method immediately accesses response.status on line 2, and the error message formatting accesses both response.status and response.reason. This will crash the code before the exception can be raised.

Consider creating a mock response object (as done in tests with MagicMock() and response.status = 429) or using a custom exception for rate limit exhaustion instead.

πŸ€– Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/rate_limiter.py` around lines 83 - 87, logger.error("Max retries
exhausted for bucket {bucket}") is followed by raising discord.HTTPException
with response=None which will raise an AttributeError because
discord.HTTPException expects response.status and response.reason; fix by
replacing the discord.HTTPException call in the retry-exhausted branch with
either (a) construct and pass a minimal mock response object that provides
.status and .reason (e.g., a simple object with those attributes) so the
discord.HTTPException can be instantiated, or (b) instead raise a custom
exception (e.g., RateLimitExceeded or MaxRetriesExceeded) defined in this module
and use that throughout the rate limiter; update any callers of
discord.HTTPException to handle the new custom exception if you choose option
(b).

Loading