diff --git a/.gitignore b/.gitignore index 716b5d33..6aaea051 100644 --- a/.gitignore +++ b/.gitignore @@ -88,7 +88,7 @@ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: -# .python-version +.python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. @@ -108,7 +108,6 @@ ipython_config.py # This is especially recommended for binary packages to ensure reproducibility, and is more # commonly ignored for libraries. # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control -#poetry.lock # pdm # Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. diff --git a/backend/app/agents/devrel/nodes/gather_context.py b/backend/app/agents/devrel/nodes/gather_context.py index ec0eb208..ff9ac62d 100644 --- a/backend/app/agents/devrel/nodes/gather_context.py +++ b/backend/app/agents/devrel/nodes/gather_context.py @@ -1,5 +1,5 @@ import logging -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict from app.agents.state import AgentState @@ -27,7 +27,7 @@ async def gather_context_node(state: AgentState) -> Dict[str, Any]: new_message = { "role": "user", "content": original_message, - "timestamp": datetime.now().isoformat() + "timestamp": datetime.now(timezone.utc).isoformat() } profile_data: Dict[str, Any] = dict(state.user_profile or {}) diff --git a/backend/app/core/config/settings.py b/backend/app/core/config/settings.py index 1349a02f..4d3eff9c 100644 --- a/backend/app/core/config/settings.py +++ b/backend/app/core/config/settings.py @@ -15,6 +15,7 @@ class Settings(BaseSettings): # Platforms github_token: str = "" discord_bot_token: str = "" + bot_owner_id: Optional[int] = None # DB configuration supabase_url: str diff --git a/backend/app/database/supabase/services.py b/backend/app/database/supabase/services.py index 44755ba0..bfccc633 100644 --- a/backend/app/database/supabase/services.py +++ b/backend/app/database/supabase/services.py @@ -1,6 +1,6 @@ import logging from typing import Dict, Any, Optional -from datetime import datetime +from datetime import datetime, timezone import uuid from app.database.supabase.client import get_supabase_client @@ -43,7 +43,7 @@ async def ensure_user_exists( # Update last_active timestamp last_active_column = f"last_active_{platform}" await supabase.table("users").update({ - last_active_column: datetime.now().isoformat() + last_active_column: datetime.now(timezone.utc).isoformat() }).eq("id", user_uuid).execute() return user_uuid @@ -64,7 +64,7 @@ async def ensure_user_exists( # Set last_active timestamp last_active_column = f"last_active_{platform}" - new_user[last_active_column] = datetime.now().isoformat() + new_user[last_active_column] = datetime.now(timezone.utc).isoformat() insert_response = await supabase.table("users").insert(new_user).execute() diff --git a/backend/app/models/database/weaviate.py b/backend/app/models/database/weaviate.py index a685a091..390c03dd 100644 --- a/backend/app/models/database/weaviate.py +++ b/backend/app/models/database/weaviate.py @@ -29,6 +29,32 @@ class WeaviatePullRequest(BaseModel): labels: List[str] = Field(default_factory=list, description="Labels associated with the pull request.") url: str = Field(..., description="The URL of the pull request.") + +class WeaviateCodeChunk(BaseModel): + """Represents a code chunk stored in Weaviate for semantic code search.""" + supabase_chunk_id: str = Field(..., description="The unique identifier linking to Supabase code chunk record.") + code_content: str = Field(..., description="Raw code content for the chunk.") + language: str = Field(..., description="Programming language of the code chunk.") + function_names: List[str] = Field(default_factory=list, description="Function names detected in the code chunk.") + embedding: List[float] = Field(..., description="Vector embedding representation for semantic search.") + created_at: datetime = Field(default_factory=datetime.now, description="Creation timestamp for the chunk record.") + last_updated: datetime = Field(default_factory=datetime.now, description="Last update timestamp for the chunk record.") + + model_config = ConfigDict(from_attributes=True) + + +class WeaviateInteraction(BaseModel): + """Represents an interaction summary stored in Weaviate for semantic retrieval.""" + supabase_interaction_id: str = Field(..., description="The unique identifier linking to Supabase interaction record.") + conversation_summary: str = Field(..., description="Summarized interaction content.") + platform: str = Field(..., description="Origin platform of the interaction (e.g., Discord, Web).") + topics: List[str] = Field(default_factory=list, description="Topics extracted from the interaction.") + embedding: List[float] = Field(..., description="Vector embedding representation for semantic search.") + created_at: datetime = Field(default_factory=datetime.now, description="Creation timestamp for the interaction record.") + last_updated: datetime = Field(default_factory=datetime.now, description="Last update timestamp for the interaction record.") + + model_config = ConfigDict(from_attributes=True) + class WeaviateUserProfile(BaseModel): """ Represents a user's profile data to be stored and indexed in Weaviate. @@ -125,4 +151,13 @@ class WeaviateUserProfile(BaseModel): } } - ) \ No newline at end of file + ) + + +__all__ = [ + "WeaviateRepository", + "WeaviatePullRequest", + "WeaviateCodeChunk", + "WeaviateInteraction", + "WeaviateUserProfile", +] \ No newline at end of file diff --git a/backend/app/services/admin/__init__.py b/backend/app/services/admin/__init__.py new file mode 100644 index 00000000..e70ec3b0 --- /dev/null +++ b/backend/app/services/admin/__init__.py @@ -0,0 +1,15 @@ +from .bot_stats_service import BotStatsService +from .health_check_service import HealthCheckService +from .user_info_service import UserInfoService +from .queue_service import QueueService +from .cache_service import CacheService +from .user_management_service import UserManagementService + +__all__ = [ + "BotStatsService", + "HealthCheckService", + "UserInfoService", + "QueueService", + "CacheService", + "UserManagementService", +] diff --git a/backend/app/services/admin/bot_stats_service.py b/backend/app/services/admin/bot_stats_service.py new file mode 100644 index 00000000..a0a04633 --- /dev/null +++ b/backend/app/services/admin/bot_stats_service.py @@ -0,0 +1,111 @@ +import logging +import psutil +import os +from datetime import datetime, timedelta +from typing import Dict, Any, Optional +from dataclasses import dataclass + +from app.database.supabase.client import get_supabase_client + +logger = logging.getLogger(__name__) + + +@dataclass +class BotStats: + guild_count: int + total_members: int + active_threads: int + latency_ms: int + uptime_seconds: float + memory_mb: float + messages_today: int + messages_week: int + queue_high: int + queue_medium: int + queue_low: int + + +class BotStatsService: + def __init__(self, bot=None, queue_manager=None): + self.bot = bot + self.queue_manager = queue_manager + self._start_time = datetime.now() + + async def get_message_stats(self) -> Dict[str, int]: + try: + supabase = get_supabase_client() + now = datetime.now() + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + week_start = today_start - timedelta(days=7) + + today_res = await supabase.table("message_logs").select( + "id", count="exact" + ).gte("created_at", today_start.isoformat()).execute() + + week_res = await supabase.table("message_logs").select( + "id", count="exact" + ).gte("created_at", week_start.isoformat()).execute() + + return { + "today": today_res.count or 0, + "week": week_res.count or 0, + } + except Exception as e: + logger.warning(f"Could not get message stats: {e}") + return {"today": 0, "week": 0} + + async def get_queue_stats(self) -> Dict[str, int]: + try: + if not self.queue_manager or not self.queue_manager.channel: + return {"high": 0, "medium": 0, "low": 0} + + stats = {} + for priority, queue_name in self.queue_manager.queues.items(): + try: + queue = await self.queue_manager.channel.declare_queue( + queue_name, durable=True, passive=True + ) + stats[priority.value] = queue.declaration_result.message_count + except Exception: + stats[priority.value] = 0 + return stats + except Exception as e: + logger.warning(f"Could not get queue stats: {e}") + return {"high": 0, "medium": 0, "low": 0} + + def get_system_stats(self) -> Dict[str, Any]: + try: + process = psutil.Process(os.getpid()) + memory_mb = process.memory_info().rss / 1024 / 1024 + uptime = (datetime.now() - self._start_time).total_seconds() + return { + "memory_mb": round(memory_mb, 2), + "uptime_seconds": uptime, + } + except Exception as e: + logger.warning(f"Could not get system stats: {e}") + return {"memory_mb": 0, "uptime_seconds": 0} + + async def get_all_stats(self) -> BotStats: + message_stats = await self.get_message_stats() + queue_stats = await self.get_queue_stats() + system_stats = self.get_system_stats() + + guild_count = len(self.bot.guilds) if self.bot else 0 + total_members = sum(g.member_count or 0 for g in self.bot.guilds) if self.bot else 0 + active_threads = len(self.bot.active_threads) if self.bot else 0 + latency_ms = round(self.bot.latency * 1000) if self.bot else 0 + + return BotStats( + guild_count=guild_count, + total_members=total_members, + active_threads=active_threads, + latency_ms=latency_ms, + uptime_seconds=system_stats["uptime_seconds"], + memory_mb=system_stats["memory_mb"], + messages_today=message_stats["today"], + messages_week=message_stats["week"], + queue_high=queue_stats.get("high", 0), + queue_medium=queue_stats.get("medium", 0), + queue_low=queue_stats.get("low", 0), + ) diff --git a/backend/app/services/admin/cache_service.py b/backend/app/services/admin/cache_service.py new file mode 100644 index 00000000..ae6e6cd1 --- /dev/null +++ b/backend/app/services/admin/cache_service.py @@ -0,0 +1,60 @@ +import logging +from typing import Dict, Any, Optional +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + + +@dataclass +class CacheInfo: + name: str + size: int + cleared: bool + + +class CacheService: + def __init__(self, bot=None): + self.bot = bot + + async def get_cache_sizes(self) -> Dict[str, int]: + sizes = { + "active_threads": 0, + "embeddings": 0, + "memories": 0, + } + + if self.bot: + sizes["active_threads"] = len(self.bot.active_threads) + + return sizes + + async def clear_active_threads(self) -> int: + if not self.bot: + return 0 + + count = len(self.bot.active_threads) + self.bot.active_threads.clear() + logger.info(f"Cleared {count} active threads from cache") + return count + + async def clear_embeddings(self) -> int: + logger.info("Embeddings cache clear requested (no-op for now)") + return 0 + + async def clear_memories(self) -> int: + logger.info("Memories cache clear requested (no-op for now)") + return 0 + + async def clear_cache(self, cache_type: str = "all") -> Dict[str, int]: + cleared = {} + + if cache_type in ("all", "active_threads"): + cleared["active_threads"] = await self.clear_active_threads() + + if cache_type in ("all", "embeddings"): + cleared["embeddings"] = await self.clear_embeddings() + + if cache_type in ("all", "memories"): + cleared["memories"] = await self.clear_memories() + + return cleared diff --git a/backend/app/services/admin/health_check_service.py b/backend/app/services/admin/health_check_service.py new file mode 100644 index 00000000..3edc9f6b --- /dev/null +++ b/backend/app/services/admin/health_check_service.py @@ -0,0 +1,175 @@ +import logging +import asyncio +import aiohttp +from urllib.parse import urlparse +from datetime import datetime, timezone +from typing import Dict, Any, List +from dataclasses import dataclass, field + +from app.core.config import settings +from app.database.supabase.client import get_supabase_client + +logger = logging.getLogger(__name__) + + +@dataclass +class ServiceHealth: + name: str + status: str # healthy, unhealthy, degraded + latency_ms: float + error: str = "" + + +@dataclass +class SystemHealth: + overall_status: str + services: List[ServiceHealth] = field(default_factory=list) + timestamp: str = "" + + def __post_init__(self): + if not self.timestamp: + self.timestamp = datetime.now(timezone.utc).isoformat() + + +class HealthCheckService: + def __init__(self, queue_manager=None): + self.queue_manager = queue_manager + self.timeout = 10.0 + + async def check_supabase(self) -> ServiceHealth: + start = datetime.now() + try: + supabase = get_supabase_client() + await asyncio.wait_for( + supabase.table("users").select("id").limit(1).execute(), + timeout=self.timeout + ) + latency = (datetime.now() - start).total_seconds() * 1000 + return ServiceHealth("Supabase", "healthy", round(latency, 2)) + except asyncio.TimeoutError: + return ServiceHealth("Supabase", "unhealthy", self.timeout * 1000, "Timeout") + except Exception as e: + latency = (datetime.now() - start).total_seconds() * 1000 + return ServiceHealth("Supabase", "unhealthy", round(latency, 2), str(e)[:100]) + + async def check_rabbitmq(self) -> ServiceHealth: + start = datetime.now() + try: + if not self.queue_manager or not self.queue_manager.connection: + return ServiceHealth("RabbitMQ", "unhealthy", 0, "Not connected") + + if self.queue_manager.connection.is_closed: + return ServiceHealth("RabbitMQ", "unhealthy", 0, "Connection closed") + + latency = (datetime.now() - start).total_seconds() * 1000 + return ServiceHealth("RabbitMQ", "healthy", round(latency, 2)) + except Exception as e: + latency = (datetime.now() - start).total_seconds() * 1000 + return ServiceHealth("RabbitMQ", "unhealthy", round(latency, 2), str(e)[:100]) + + async def check_weaviate(self) -> ServiceHealth: + start = datetime.now() + try: + weaviate_url = getattr(settings, 'weaviate_url', 'http://localhost:8080') + async with aiohttp.ClientSession() as session: + async with session.get( + f"{weaviate_url}/v1/.well-known/ready", + timeout=aiohttp.ClientTimeout(total=self.timeout) + ) as resp: + latency = (datetime.now() - start).total_seconds() * 1000 + if resp.status == 200: + return ServiceHealth("Weaviate", "healthy", round(latency, 2)) + return ServiceHealth("Weaviate", "unhealthy", round(latency, 2), f"Status {resp.status}") + except asyncio.TimeoutError: + return ServiceHealth("Weaviate", "unhealthy", self.timeout * 1000, "Timeout") + except Exception as e: + latency = (datetime.now() - start).total_seconds() * 1000 + return ServiceHealth("Weaviate", "unhealthy", round(latency, 2), str(e)[:100]) + + async def check_falkordb(self) -> ServiceHealth: + start = datetime.now() + client = None + try: + import redis.asyncio as redis + + falkor_url = getattr(settings, 'falkordb_url', None) or "redis://localhost:6379" + + # Normalize non-redis URLs (e.g., http://localhost:6379) to redis:// + parsed = urlparse(falkor_url) + if parsed.scheme not in ("redis", "rediss"): + host = parsed.hostname or "localhost" + port = parsed.port or 6379 + falkor_url = f"redis://{host}:{port}" + + client = redis.from_url(falkor_url, decode_responses=True) + pong = await asyncio.wait_for(client.ping(), timeout=self.timeout) + + latency = (datetime.now() - start).total_seconds() * 1000 + if pong is True or pong == "PONG": + return ServiceHealth("FalkorDB", "healthy", round(latency, 2)) + return ServiceHealth("FalkorDB", "degraded", round(latency, 2), f"Unexpected ping reply: {pong}") + except asyncio.TimeoutError: + return ServiceHealth("FalkorDB", "degraded", self.timeout * 1000, "Timeout") + except Exception as e: + latency = (datetime.now() - start).total_seconds() * 1000 + return ServiceHealth("FalkorDB", "degraded", round(latency, 2), str(e)[:50]) + finally: + if client is not None: + try: + await client.aclose() + except Exception: + try: + await client.close() + except Exception: + pass + + async def check_gemini_api(self) -> ServiceHealth: + start = datetime.now() + try: + if not getattr(settings, 'gemini_api_key', None): + return ServiceHealth("Gemini API", "unhealthy", 0, "No API key") + + async with aiohttp.ClientSession() as session: + async with session.get( + "https://generativelanguage.googleapis.com/v1/models", + params={"key": settings.gemini_api_key}, + timeout=aiohttp.ClientTimeout(total=self.timeout) + ) as resp: + latency = (datetime.now() - start).total_seconds() * 1000 + if resp.status == 200: + return ServiceHealth("Gemini API", "healthy", round(latency, 2)) + return ServiceHealth("Gemini API", "unhealthy", round(latency, 2), f"Status {resp.status}") + except asyncio.TimeoutError: + return ServiceHealth("Gemini API", "degraded", self.timeout * 1000, "Timeout") + except Exception as e: + latency = (datetime.now() - start).total_seconds() * 1000 + return ServiceHealth("Gemini API", "unhealthy", round(latency, 2), str(e)[:100]) + + async def get_all_health(self) -> SystemHealth: + checks = await asyncio.gather( + self.check_supabase(), + self.check_rabbitmq(), + self.check_weaviate(), + self.check_falkordb(), + self.check_gemini_api(), + return_exceptions=True + ) + + services = [] + for check in checks: + if isinstance(check, Exception): + services.append(ServiceHealth("Unknown", "unhealthy", 0, str(check)[:100])) + else: + services.append(check) + + unhealthy = sum(1 for s in services if s.status == "unhealthy") + degraded = sum(1 for s in services if s.status == "degraded") + + if unhealthy > 0: + overall = "unhealthy" + elif degraded > 0: + overall = "degraded" + else: + overall = "healthy" + + return SystemHealth(overall_status=overall, services=services) diff --git a/backend/app/services/admin/queue_service.py b/backend/app/services/admin/queue_service.py new file mode 100644 index 00000000..66bade8b --- /dev/null +++ b/backend/app/services/admin/queue_service.py @@ -0,0 +1,98 @@ +import logging +from typing import Dict, Any, List +from dataclasses import dataclass +from datetime import datetime + +from app.core.orchestration.queue_manager import QueuePriority + +logger = logging.getLogger(__name__) + + +@dataclass +class QueueStats: + priority: str + pending: int + consumers: int + + +@dataclass +class FullQueueStatus: + high: QueueStats + medium: QueueStats + low: QueueStats + total_pending: int + + +class QueueService: + def __init__(self, queue_manager=None): + self.queue_manager = queue_manager + + async def get_queue_stats(self) -> FullQueueStatus: + high = QueueStats("high", 0, 0) + medium = QueueStats("medium", 0, 0) + low = QueueStats("low", 0, 0) + + if not self.queue_manager or not self.queue_manager.channel: + return FullQueueStatus(high, medium, low, 0) + + try: + for priority, queue_name in self.queue_manager.queues.items(): + try: + queue = await self.queue_manager.channel.declare_queue( + queue_name, durable=True, passive=True + ) + count = queue.declaration_result.message_count + consumers = queue.declaration_result.consumer_count + + if priority == QueuePriority.HIGH: + high = QueueStats("high", count, consumers) + elif priority == QueuePriority.MEDIUM: + medium = QueueStats("medium", count, consumers) + elif priority == QueuePriority.LOW: + low = QueueStats("low", count, consumers) + except Exception as e: + logger.warning(f"Could not get stats for {queue_name}: {e}") + + total = high.pending + medium.pending + low.pending + return FullQueueStatus(high, medium, low, total) + except Exception as e: + logger.error(f"Error getting queue stats: {e}") + return FullQueueStatus(high, medium, low, 0) + + async def clear_queue(self, priority: str = "all") -> Dict[str, int]: + if not self.queue_manager or not self.queue_manager.channel: + raise RuntimeError("Queue manager not available") + + cleared = {} + + try: + priorities_to_clear = [] + if priority == "all": + priorities_to_clear = list(self.queue_manager.queues.keys()) + else: + priority_map = { + "high": QueuePriority.HIGH, + "medium": QueuePriority.MEDIUM, + "low": QueuePriority.LOW, + } + if priority in priority_map: + priorities_to_clear = [priority_map[priority]] + + for p in priorities_to_clear: + queue_name = self.queue_manager.queues[p] + try: + queue = await self.queue_manager.channel.declare_queue( + queue_name, durable=True, passive=True + ) + count = queue.declaration_result.message_count + await queue.purge() + cleared[p.value] = count + logger.info(f"Cleared {count} messages from {queue_name}") + except Exception as e: + logger.error(f"Failed to clear {queue_name}: {e}") + cleared[p.value] = 0 + + return cleared + except Exception as e: + logger.error(f"Error clearing queues: {e}") + raise diff --git a/backend/app/services/admin/user_info_service.py b/backend/app/services/admin/user_info_service.py new file mode 100644 index 00000000..e627aee7 --- /dev/null +++ b/backend/app/services/admin/user_info_service.py @@ -0,0 +1,127 @@ +import logging +from typing import Dict, Any, Optional +from dataclasses import dataclass +from datetime import datetime + +from app.database.supabase.client import get_supabase_client + +logger = logging.getLogger(__name__) + + +@dataclass +class UserInfo: + discord_id: str + discord_username: str + display_name: str + avatar_url: Optional[str] + created_at: str + is_verified: bool + github_username: Optional[str] + message_count: int + has_active_thread: bool + roles_count: int + last_message_at: Optional[str] + + +class UserInfoService: + def __init__(self, bot=None): + self.bot = bot + + async def _get_internal_user_id(self, discord_id: str) -> Optional[str]: + """Resolve internal users.id UUID from a Discord snowflake ID.""" + try: + supabase = get_supabase_client() + res = await supabase.table("users").select("id").eq( + "discord_id", discord_id + ).limit(1).execute() + + if res.data: + return str(res.data[0]["id"]) + return None + except Exception as e: + logger.warning(f"Could not resolve internal user id for discord_id={discord_id}: {e}") + return None + + async def get_user_profile(self, discord_id: str) -> Optional[Dict[str, Any]]: + try: + supabase = get_supabase_client() + res = await supabase.table("users").select("*").eq( + "discord_id", discord_id + ).limit(1).execute() + if res.data: + return res.data[0] + return None + except Exception as e: + logger.error(f"Error getting user profile: {e}") + return None + + async def get_user_message_count(self, discord_id: str) -> int: + try: + internal_user_id = await self._get_internal_user_id(discord_id) + if not internal_user_id: + return 0 + + supabase = get_supabase_client() + res = await supabase.table("interactions").select( + "id", count="exact" + ).eq("user_id", internal_user_id).execute() + return res.count or 0 + except Exception as e: + logger.warning(f"Could not get message count: {e}") + return 0 + + async def get_last_message(self, discord_id: str) -> Optional[str]: + try: + internal_user_id = await self._get_internal_user_id(discord_id) + if not internal_user_id: + return None + + supabase = get_supabase_client() + res = await supabase.table("interactions").select( + "created_at" + ).eq("user_id", internal_user_id).order( + "created_at", desc=True + ).limit(1).execute() + if res.data: + return res.data[0]["created_at"] + return None + except Exception as e: + logger.warning(f"Could not get last message: {e}") + return None + + def has_active_thread(self, discord_id: str) -> bool: + if not self.bot: + return False + return discord_id in self.bot.active_threads + + async def get_full_user_info( + self, + discord_user, + member=None + ) -> UserInfo: + discord_id = str(discord_user.id) + profile = await self.get_user_profile(discord_id) + message_count = await self.get_user_message_count(discord_id) + last_message = await self.get_last_message(discord_id) + + is_verified = False + github_username = None + if profile: + is_verified = profile.get("is_verified", False) + github_username = profile.get("github_username") + + roles_count = len(member.roles) - 1 if member else 0 + + return UserInfo( + discord_id=discord_id, + discord_username=discord_user.name, + display_name=discord_user.display_name if hasattr(discord_user, 'display_name') else discord_user.name, + avatar_url=str(discord_user.avatar.url) if discord_user.avatar else None, + created_at=discord_user.created_at.strftime("%Y-%m-%d"), + is_verified=is_verified, + github_username=github_username, + message_count=message_count, + has_active_thread=self.has_active_thread(discord_id), + roles_count=roles_count, + last_message_at=last_message, + ) diff --git a/backend/app/services/admin/user_management_service.py b/backend/app/services/admin/user_management_service.py new file mode 100644 index 00000000..2c914f69 --- /dev/null +++ b/backend/app/services/admin/user_management_service.py @@ -0,0 +1,117 @@ +import logging +from typing import Dict, Any, List, Optional +from dataclasses import dataclass +from datetime import datetime, timezone + +from app.database.supabase.client import get_supabase_client +from app.core.orchestration.queue_manager import AsyncQueueManager, QueuePriority + +logger = logging.getLogger(__name__) + + +@dataclass +class ResetResult: + user_id: str + memory_cleared: bool + thread_closed: bool + verification_reset: bool + errors: List[str] + + +class UserManagementService: + def __init__(self, bot=None, queue_manager: AsyncQueueManager = None): + self.bot = bot + self.queue_manager = queue_manager + + async def clear_user_memory(self, user_id: str) -> bool: + try: + if not self.queue_manager: + logger.warning("Queue manager not available for memory clear") + return False + + cleanup_msg = { + "type": "clear_thread_memory", + "memory_thread_id": user_id, + "user_id": user_id, + "cleanup_reason": "admin_reset" + } + await self.queue_manager.enqueue(cleanup_msg, QueuePriority.HIGH) + logger.info(f"Queued memory clear for user {user_id}") + return True + except Exception as e: + logger.error(f"Failed to clear memory for {user_id}: {e}") + return False + + async def close_user_thread(self, user_id: str) -> bool: + try: + if not self.bot: + return False + + if user_id not in self.bot.active_threads: + return True + + thread_id = self.bot.active_threads.pop(user_id, None) + if thread_id: + try: + thread = self.bot.get_channel(int(thread_id)) + if thread: + await thread.edit(archived=True) + logger.info(f"Archived thread {thread_id} for user {user_id}") + except Exception as e: + logger.warning(f"Could not archive thread {thread_id}: {e}") + + return True + except Exception as e: + logger.error(f"Failed to close thread for {user_id}: {e}") + return False + + async def reset_verification(self, user_id: str) -> bool: + try: + supabase = get_supabase_client() + await supabase.table("users").update({ + "github_id": None, + "github_username": None, + "is_verified": False, + "verification_token": None, + "updated_at": datetime.now(timezone.utc).isoformat() + }).eq("discord_id", user_id).execute() + logger.info(f"Reset verification for user {user_id}") + return True + except Exception as e: + logger.error(f"Failed to reset verification for {user_id}: {e}") + return False + + async def reset_user( + self, + user_id: str, + reset_memory: bool = False, + reset_thread: bool = False, + reset_verification: bool = False + ) -> ResetResult: + errors = [] + memory_cleared = False + thread_closed = False + verification_reset = False + + if reset_memory: + memory_cleared = await self.clear_user_memory(user_id) + if not memory_cleared: + errors.append("Failed to clear memory") + + if reset_thread: + thread_closed = await self.close_user_thread(user_id) + if not thread_closed: + errors.append("Failed to close thread") + + if reset_verification: + verification_reset = await self.reset_verification(user_id) + if not verification_reset: + errors.append("Failed to reset verification") + + return ResetResult( + user_id=user_id, + memory_cleared=memory_cleared, + thread_closed=thread_closed, + verification_reset=verification_reset, + errors=errors + ) diff --git a/backend/app/services/auth/management.py b/backend/app/services/auth/management.py index 4d0f35d8..f23d91b1 100644 --- a/backend/app/services/auth/management.py +++ b/backend/app/services/auth/management.py @@ -1,5 +1,5 @@ import uuid -from datetime import datetime +from datetime import datetime, timezone from typing import Optional from app.database.supabase.client import get_supabase_client from app.models.database.supabase import User @@ -29,8 +29,8 @@ async def get_or_create_user_by_discord( "discord_username": discord_username, "avatar_url": avatar_url, "preferred_languages": [], - "created_at": datetime.now().isoformat(), - "updated_at": datetime.now().isoformat() + "created_at": datetime.now(timezone.utc).isoformat(), + "updated_at": datetime.now(timezone.utc).isoformat() } insert_res = await supabase.table("users").insert(new_user_data).execute() @@ -80,7 +80,7 @@ async def update_user_profile(user_id: str, **updates) -> Optional[User]: try: # Add updated_at timestamp - updates["updated_at"] = datetime.now().isoformat() + updates["updated_at"] = datetime.now(timezone.utc).isoformat() update_res = await supabase.table("users").update(updates).eq("id", user_id).execute() diff --git a/backend/app/services/auth/verification.py b/backend/app/services/auth/verification.py index cbfa156c..03a154d5 100644 --- a/backend/app/services/auth/verification.py +++ b/backend/app/services/auth/verification.py @@ -1,5 +1,5 @@ import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Optional, Dict, Tuple from app.database.supabase.client import get_supabase_client from app.models.database.supabase import User @@ -16,7 +16,7 @@ def _cleanup_expired_sessions(): """ Remove expired verification sessions. """ - current_time = datetime.now() + current_time = datetime.now(timezone.utc) expired_sessions = [ session_id for session_id, (discord_id, expiry_time) in _verification_sessions.items() if current_time > expiry_time @@ -40,13 +40,13 @@ async def create_verification_session(discord_id: str) -> Optional[str]: token = str(uuid.uuid4()) session_id = str(uuid.uuid4()) - expiry_time = datetime.now() + timedelta(minutes=SESSION_EXPIRY_MINUTES) + expiry_time = datetime.now(timezone.utc) + timedelta(minutes=SESSION_EXPIRY_MINUTES) try: update_res = await supabase.table("users").update({ "verification_token": token, "verification_token_expires_at": expiry_time.isoformat(), - "updated_at": datetime.now().isoformat() + "updated_at": datetime.now(timezone.utc).isoformat() }).eq("discord_id", discord_id).execute() if update_res.data: @@ -79,7 +79,7 @@ async def find_user_by_session_and_verify( discord_id, expiry_time = session_data - current_time = datetime.now().isoformat() + current_time = datetime.now(timezone.utc).isoformat() user_res = await supabase.table("users").select("*").eq( "discord_id", discord_id ).neq( @@ -106,7 +106,7 @@ async def find_user_by_session_and_verify( await supabase.table("users").update({ "verification_token": None, "verification_token_expires_at": None, - "updated_at": datetime.now().isoformat() + "updated_at": datetime.now(timezone.utc).isoformat() }).eq("id", user_to_verify['id']).execute() raise Exception(f"GitHub account {github_username} is already linked to another Discord user") @@ -115,10 +115,10 @@ async def find_user_by_session_and_verify( "github_username": github_username, "email": user_to_verify.get('email') or email, "is_verified": True, - "verified_at": datetime.now().isoformat(), + "verified_at": datetime.now(timezone.utc).isoformat(), "verification_token": None, "verification_token_expires_at": None, - "updated_at": datetime.now().isoformat() + "updated_at": datetime.now(timezone.utc).isoformat() } await supabase.table("users").update(update_data).eq("id", user_to_verify['id']).execute() @@ -139,7 +139,7 @@ async def cleanup_expired_tokens(): Clean up expired verification tokens from database. """ supabase = get_supabase_client() - current_time = datetime.now().isoformat() + current_time = datetime.now(timezone.utc).isoformat() try: cleanup_res = await supabase.table("users").update({ @@ -165,12 +165,12 @@ async def get_verification_session_info(session_id: str) -> Optional[Dict[str, s discord_id, expiry_time = session_data - if datetime.now() > expiry_time: + if datetime.now(timezone.utc) > expiry_time: del _verification_sessions[session_id] return None return { "discord_id": discord_id, "expiry_time": expiry_time.isoformat(), - "time_remaining": str(expiry_time - datetime.now()) + "time_remaining": str(expiry_time - datetime.now(timezone.utc)) } diff --git a/backend/app/services/codegraph/repo_service.py b/backend/app/services/codegraph/repo_service.py index eba4fca8..835a7760 100644 --- a/backend/app/services/codegraph/repo_service.py +++ b/backend/app/services/codegraph/repo_service.py @@ -2,7 +2,7 @@ import aiohttp import re from typing import Dict, Any -from datetime import datetime +from datetime import datetime, timezone from app.database.supabase.client import get_supabase_client import os @@ -80,7 +80,7 @@ async def index_repo(self, repo_input: str, discord_id: str) -> Dict[str, Any]: await self.supabase.table("indexed_repositories").update({ "indexing_status": "pending", "last_error": None, - "updated_at": datetime.now().isoformat() + "updated_at": datetime.now(timezone.utc).isoformat() }).eq("id", repo_data['id']).execute() else: # Insert new record @@ -118,7 +118,7 @@ async def index_repo(self, repo_input: str, discord_id: str) -> Dict[str, Any]: await self.supabase.table("indexed_repositories").update({ "indexing_status": "completed", - "indexed_at": datetime.now().isoformat(), + "indexed_at": datetime.now(timezone.utc).isoformat(), "node_count": data.get("node_count", 0), "edge_count": data.get("edge_count", 0), "last_error": None @@ -269,7 +269,7 @@ async def delete_repo(self, repo_full_name: str, discord_id: str) -> Dict[str, A await self.supabase.table("indexed_repositories").update({ "is_deleted": True, - "updated_at": datetime.now().isoformat() + "updated_at": datetime.now(timezone.utc).isoformat() }).eq("repository_full_name", repo_full_name).eq("is_deleted", False).execute() return {"status": "success", "repo": repo_full_name, "graph_name": graph_name} diff --git a/backend/app/services/integration_service.py b/backend/app/services/integration_service.py index 21f87e48..87550f94 100644 --- a/backend/app/services/integration_service.py +++ b/backend/app/services/integration_service.py @@ -1,6 +1,6 @@ import logging from uuid import UUID, uuid4 -from datetime import datetime +from datetime import datetime, timezone from typing import Optional, List from app.database.supabase.client import get_supabase_client from app.models.integration import ( @@ -49,8 +49,8 @@ async def create_integration( "organization_name": request.organization_name, "is_active": True, "config": request.config or {}, - "created_at": datetime.now().isoformat(), - "updated_at": datetime.now().isoformat() + "created_at": datetime.now(timezone.utc).isoformat(), + "updated_at": datetime.now(timezone.utc).isoformat() } # Store organization link if provided @@ -134,7 +134,7 @@ async def update_integration( ) -> IntegrationResponse: """Update an existing integration.""" try: - update_data = {"updated_at": datetime.now().isoformat()} + update_data = {"updated_at": datetime.now(timezone.utc).isoformat()} if request.organization_name is not None: update_data["organization_name"] = request.organization_name diff --git a/backend/app/utils/admin_logger.py b/backend/app/utils/admin_logger.py new file mode 100644 index 00000000..8cfb363f --- /dev/null +++ b/backend/app/utils/admin_logger.py @@ -0,0 +1,262 @@ +import logging +from typing import Dict, Any, Optional, List +from datetime import datetime, timezone +import uuid + +from app.database.supabase.client import get_supabase_client + +logger = logging.getLogger(__name__) + +_admin_logs_table_ready: Optional[bool] = None + + +async def ensure_admin_logs_table() -> bool: + """Check whether the admin_logs table is available in the current environment.""" + global _admin_logs_table_ready + + if _admin_logs_table_ready is True: + return True + + try: + supabase = get_supabase_client() + await supabase.table("admin_logs").select("id").limit(1).execute() + _admin_logs_table_ready = True + return True + except Exception as e: + logger.error( + "admin_logs table is unavailable. Apply migration at " + "backend/database/02_create_admin_logs_table.sql. Details: %s", + str(e), + ) + return False + + +async def log_admin_action( + executor_id: str, + executor_username: str, + command_name: str, + server_id: str, + action_result: str = "success", + command_args: Optional[Dict[str, Any]] = None, + target_id: Optional[str] = None, + error_message: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> Optional[str]: + """Log admin command execution to database. Returns log UUID or None if failed.""" + try: + if not await ensure_admin_logs_table(): + return None + + supabase = get_supabase_client() + + # Validate action_result + if action_result not in ["success", "failure", "error"]: + logger.warning(f"Invalid action_result '{action_result}', defaulting to 'error'") + action_result = "error" + + # Prepare log entry + log_entry = { + "id": str(uuid.uuid4()), + "timestamp": datetime.now(timezone.utc).isoformat(), + "executor_id": executor_id, + "executor_username": executor_username, + "command_name": command_name, + "command_args": command_args or {}, + "target_id": target_id, + "action_result": action_result, + "error_message": error_message, + "server_id": server_id, + "metadata": metadata or {}, + } + + # Insert log entry + response = await supabase.table("admin_logs").insert(log_entry).execute() + + if response.data: + log_id = response.data[0]["id"] + logger.info( + f"Admin action logged: id={log_id}, executor={executor_username}, " + f"command={command_name}, result={action_result}" + ) + return log_id + else: + logger.error(f"Failed to log admin action: {response}") + return None + + except Exception as e: + logger.error(f"Error logging admin action: {str(e)}", exc_info=True) + return None + + +async def get_admin_logs( + executor_id: Optional[str] = None, + command_name: Optional[str] = None, + server_id: Optional[str] = None, + action_result: Optional[str] = None, + target_id: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + limit: int = 100, + offset: int = 0, +) -> List[Dict[str, Any]]: + """Get admin logs with optional filtering. Supports pagination via limit/offset.""" + try: + supabase = get_supabase_client() + + # Enforce maximum limit + limit = min(limit, 1000) + + # Build query + query = supabase.table("admin_logs").select("*") + + # Apply filters + if executor_id: + query = query.eq("executor_id", executor_id) + + if command_name: + query = query.eq("command_name", command_name) + + if server_id: + query = query.eq("server_id", server_id) + + if action_result: + if action_result not in ["success", "failure", "error"]: + logger.warning(f"Invalid action_result filter: {action_result}") + else: + query = query.eq("action_result", action_result) + + if target_id: + query = query.eq("target_id", target_id) + + if start_time: + query = query.gte("timestamp", start_time.isoformat()) + + if end_time: + query = query.lte("timestamp", end_time.isoformat()) + + # Order by timestamp (newest first) and apply pagination + query = query.order("timestamp", desc=True).range(offset, offset + limit - 1) + + # Execute query + response = await query.execute() + + if response.data: + logger.info( + f"Retrieved {len(response.data)} admin logs " + f"(limit={limit}, offset={offset})" + ) + return response.data + else: + logger.info("No admin logs found matching the criteria") + return [] + + except Exception as e: + logger.error(f"Error retrieving admin logs: {str(e)}", exc_info=True) + return [] + + +async def get_admin_log_stats( + server_id: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, +) -> Dict[str, Any]: + """Get usage stats for admin commands including success rates and top executors.""" + try: + supabase = get_supabase_client() + + # Paginate through matching logs to avoid unbounded result sets + batch_size = 1000 + offset = 0 + + total_commands = 0 + success_count = 0 + failure_count = 0 + error_count = 0 + commands_by_type: Dict[str, int] = {} + executor_counts: Dict[str, int] = {} + + while True: + query = supabase.table("admin_logs").select( + "command_name,action_result,executor_username" + ) + + if server_id: + query = query.eq("server_id", server_id) + + if start_time: + query = query.gte("timestamp", start_time.isoformat()) + + if end_time: + query = query.lte("timestamp", end_time.isoformat()) + + query = query.order("timestamp", desc=True).range(offset, offset + batch_size - 1) + + response = await query.execute() + logs = response.data or [] + + if not logs: + break + + total_commands += len(logs) + + for log in logs: + action_result = log.get("action_result") + if action_result == "success": + success_count += 1 + elif action_result == "failure": + failure_count += 1 + elif action_result == "error": + error_count += 1 + + cmd = log.get("command_name") or "unknown" + commands_by_type[cmd] = commands_by_type.get(cmd, 0) + 1 + + executor = log.get("executor_username") or "unknown" + executor_counts[executor] = executor_counts.get(executor, 0) + 1 + + if len(logs) < batch_size: + break + + offset += batch_size + + if total_commands == 0: + return { + "total_commands": 0, + "success_count": 0, + "failure_count": 0, + "error_count": 0, + "commands_by_type": {}, + "top_executors": [], + } + + # Sort executors by count + top_executors = [ + {"username": username, "count": count} + for username, count in sorted( + executor_counts.items(), key=lambda x: x[1], reverse=True + )[:10] + ] + + stats = { + "total_commands": total_commands, + "success_count": success_count, + "failure_count": failure_count, + "error_count": error_count, + "success_rate": round((success_count / total_commands * 100), 2) if total_commands > 0 else 0, + "commands_by_type": commands_by_type, + "top_executors": top_executors, + } + + logger.info(f"Generated admin log statistics: {total_commands} total commands") + return stats + + except Exception as e: + logger.error(f"Error getting admin log stats: {str(e)}", exc_info=True) + return { + "total_commands": 0, + "success_count": 0, + "failure_count": 0, + "error_count": 0, + "commands_by_type": {}, + "top_executors": [], + } diff --git a/backend/database/02_create_admin_logs_table.sql b/backend/database/02_create_admin_logs_table.sql new file mode 100644 index 00000000..7a79284a --- /dev/null +++ b/backend/database/02_create_admin_logs_table.sql @@ -0,0 +1,52 @@ +-- Table for storing admin action logs +CREATE TABLE IF NOT EXISTS admin_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(), + executor_id TEXT NOT NULL, -- Discord user ID + executor_username TEXT NOT NULL, + command_name TEXT NOT NULL, + command_args JSONB DEFAULT '{}', -- Store command parameters + target_id TEXT, -- Target user/queue if applicable + action_result TEXT NOT NULL CHECK (action_result IN ('success', 'failure', 'error')), + error_message TEXT, + server_id TEXT NOT NULL, -- Discord server/guild ID + metadata JSONB DEFAULT '{}' -- Additional context (e.g., queue state, bot status, etc.) +); + +-- Create indexes for better query performance +CREATE INDEX IF NOT EXISTS idx_admin_logs_timestamp ON admin_logs(timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_admin_logs_executor_id ON admin_logs(executor_id); +CREATE INDEX IF NOT EXISTS idx_admin_logs_command_name ON admin_logs(command_name); +CREATE INDEX IF NOT EXISTS idx_admin_logs_action_result ON admin_logs(action_result); +CREATE INDEX IF NOT EXISTS idx_admin_logs_server_id ON admin_logs(server_id); +CREATE INDEX IF NOT EXISTS idx_admin_logs_target_id ON admin_logs(target_id) WHERE target_id IS NOT NULL; + +-- Composite index for common query patterns +CREATE INDEX IF NOT EXISTS idx_admin_logs_executor_timestamp ON admin_logs(executor_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_admin_logs_command_timestamp ON admin_logs(command_name, timestamp DESC); + +-- Enable Row Level Security (RLS) +ALTER TABLE admin_logs ENABLE ROW LEVEL SECURITY; + +-- Create RLS policies for admin_logs +-- Only authenticated users can view logs (typically admin dashboard access) +CREATE POLICY "Authenticated users can view admin logs" + ON admin_logs + FOR SELECT + USING (auth.role() = 'authenticated'); + +-- Service role can insert logs (for bot to write logs) +CREATE POLICY "Service role can insert admin logs" + ON admin_logs + FOR INSERT + WITH CHECK (auth.role() = 'service_role'); + +-- Add helpful comments +COMMENT ON TABLE admin_logs IS 'Tracks all admin command executions for audit trail'; +COMMENT ON COLUMN admin_logs.executor_id IS 'Discord user ID of the admin who executed the command'; +COMMENT ON COLUMN admin_logs.command_name IS 'Name of the admin command (e.g., /admin stats, /admin pause_queue)'; +COMMENT ON COLUMN admin_logs.command_args IS 'JSON object containing command parameters and arguments'; +COMMENT ON COLUMN admin_logs.target_id IS 'ID of the target entity if applicable (user ID, queue name, etc.)'; +COMMENT ON COLUMN admin_logs.action_result IS 'Result of the command execution: success, failure, or error'; +COMMENT ON COLUMN admin_logs.server_id IS 'Discord server/guild ID where the command was executed'; +COMMENT ON COLUMN admin_logs.metadata IS 'Additional contextual information (e.g., system state before/after)'; diff --git a/backend/integrations/discord/admin_cog.py b/backend/integrations/discord/admin_cog.py new file mode 100644 index 00000000..821a655e --- /dev/null +++ b/backend/integrations/discord/admin_cog.py @@ -0,0 +1,411 @@ +import logging +import discord +from discord import app_commands, Interaction +from discord.ext import commands + +from integrations.discord.bot import DiscordBot +from integrations.discord.permissions import require_admin +from integrations.discord.views import ConfirmActionView +from app.core.orchestration.queue_manager import AsyncQueueManager +from app.services.admin import ( + BotStatsService, + HealthCheckService, + UserInfoService, + QueueService, + CacheService, + UserManagementService, +) + +logger = logging.getLogger(__name__) + + +class AdminCommands(commands.GroupCog, name="admin"): + """Bot management and monitoring commands""" + + def __init__(self, bot: DiscordBot, queue_manager: AsyncQueueManager): + self.bot = bot + self.queue = queue_manager + self.stats_service = BotStatsService(bot=bot, queue_manager=queue_manager) + self.health_service = HealthCheckService(queue_manager=queue_manager) + self.user_info_service = UserInfoService(bot=bot) + self.queue_service = QueueService(queue_manager=queue_manager) + self.cache_service = CacheService(bot=bot) + self.user_management_service = UserManagementService(bot=bot, queue_manager=queue_manager) + super().__init__() + + async def _confirm_action( + self, + interaction: Interaction, + *, + title: str, + description: str, + timeout: float = 30.0, + ) -> bool: + """Show a confirmation dialog and return True only when confirmed.""" + embed = discord.Embed( + title=title, + description=description, + color=discord.Color.orange(), + ) + embed.set_footer(text=f"This action times out in {int(timeout)} seconds.") + + view = ConfirmActionView(timeout=timeout) + await interaction.response.send_message(embed=embed, view=view, ephemeral=True) + await view.wait() + + if view.interaction is None: + timeout_embed = discord.Embed( + title="Action Timed Out", + description="No confirmation received. Operation cancelled.", + color=discord.Color.light_grey(), + ) + await interaction.edit_original_response(embed=timeout_embed, view=None) + return False + + if not view.confirmed: + cancelled_embed = discord.Embed( + title="Action Cancelled", + description="No changes were made.", + color=discord.Color.light_grey(), + ) + await interaction.edit_original_response(embed=cancelled_embed, view=None) + return False + + return True + + async def cog_app_command_error(self, interaction: Interaction, error: Exception): + """Handle errors for admin slash commands.""" + logger.error(f"Admin command error: {error}", exc_info=True) + + if isinstance(error, app_commands.MissingPermissions): + error_message = "You don't have permission to use this command." + elif isinstance(error, app_commands.CommandInvokeError): + error_message = f"Command failed: {str(error.original)}" + else: + error_message = "Something went wrong executing that command." + + try: + if interaction.response.is_done(): + await interaction.followup.send(error_message, ephemeral=True) + else: + await interaction.response.send_message(error_message, ephemeral=True) + except Exception as e: + logger.error(f"Failed to send error message: {e}") + + @app_commands.command(name="stats", description="Display bot statistics and metrics") + @require_admin + async def stats(self, interaction: Interaction): + """Show current bot stats.""" + await interaction.response.defer(ephemeral=True) + + try: + stats = await self.stats_service.get_all_stats() + + embed = discord.Embed(title="Bot Statistics", color=discord.Color.blue()) + embed.add_field(name="Servers", value=str(stats.guild_count), inline=True) + embed.add_field(name="Total Members", value=str(stats.total_members), inline=True) + embed.add_field(name="Active Threads", value=str(stats.active_threads), inline=True) + embed.add_field(name="Latency", value=f"{stats.latency_ms}ms", inline=True) + embed.add_field(name="Memory Usage", value=f"{stats.memory_mb} MB", inline=True) + embed.add_field(name="Uptime", value=f"{int(stats.uptime_seconds)}s", inline=True) + embed.add_field(name="Messages Today", value=str(stats.messages_today), inline=True) + embed.add_field(name="Messages (7d)", value=str(stats.messages_week), inline=True) + embed.add_field( + name="Queue", + value=( + f"High: {stats.queue_high}\n" + f"Medium: {stats.queue_medium}\n" + f"Low: {stats.queue_low}" + ), + inline=False, + ) + + await interaction.followup.send(embed=embed, ephemeral=True) + + except Exception as e: + logger.error(f"Stats command failed: {e}", exc_info=True) + await interaction.followup.send(f"Failed to get stats: {str(e)}", ephemeral=True) + + @app_commands.command(name="health", description="Check system health and service status") + @require_admin + async def health(self, interaction: Interaction): + """Check if all services are running.""" + await interaction.response.defer(ephemeral=True) + + try: + health = await self.health_service.get_all_health() + + status_color = { + "healthy": discord.Color.green(), + "degraded": discord.Color.orange(), + "unhealthy": discord.Color.red(), + } + status_emoji = { + "healthy": "✅", + "degraded": "⚠️", + "unhealthy": "❌", + } + + embed = discord.Embed( + title="System Health", + color=status_color.get(health.overall_status, discord.Color.orange()), + ) + embed.add_field( + name="Overall", + value=f"{status_emoji.get(health.overall_status, '⚠️')} {health.overall_status.title()}", + inline=False, + ) + + for service in health.services: + details = f"{service.latency_ms}ms" + if service.error: + details += f"\n{service.error}" + embed.add_field( + name=f"{status_emoji.get(service.status, '⚠️')} {service.name}", + value=details, + inline=True, + ) + + embed.set_footer(text=f"Checked at {health.timestamp}") + + await interaction.followup.send(embed=embed, ephemeral=True) + + except Exception as e: + logger.error(f"Health check error: {e}", exc_info=True) + await interaction.followup.send(f"Health check failed: {str(e)}", ephemeral=True) + + @app_commands.command(name="user_info", description="Get info about a user") + @app_commands.describe(user="User to look up") + @require_admin + async def user_info(self, interaction: Interaction, user: discord.User): + """Look up user details.""" + await interaction.response.defer(ephemeral=True) + + try: + member = interaction.guild.get_member(user.id) if interaction.guild else None + info = await self.user_info_service.get_full_user_info(user, member) + + embed = discord.Embed(title="User Information", color=discord.Color.blue()) + embed.set_thumbnail(url=user.display_avatar.url) + embed.add_field(name="Username", value=info.discord_username, inline=True) + embed.add_field(name="ID", value=info.discord_id, inline=True) + embed.add_field(name="Created", value=info.created_at, inline=True) + embed.add_field(name="Verified", value="Yes" if info.is_verified else "No", inline=True) + embed.add_field(name="GitHub", value=info.github_username or "Not linked", inline=True) + embed.add_field(name="Messages", value=str(info.message_count), inline=True) + embed.add_field(name="Active Thread", value="Yes" if info.has_active_thread else "No", inline=True) + embed.add_field(name="Roles", value=str(info.roles_count), inline=True) + embed.add_field(name="Last Message", value=info.last_message_at or "Never", inline=False) + + await interaction.followup.send(embed=embed, ephemeral=True) + + except Exception as e: + logger.error(f"User info lookup failed: {e}", exc_info=True) + await interaction.followup.send(f"Couldn't get user info: {str(e)}", ephemeral=True) + + @app_commands.command(name="user_reset", description="Reset user state") + @app_commands.describe( + user="User to reset", + reset_memory="Clear conversation memory", + reset_thread="Close active thread", + reset_verification="Clear GitHub verification" + ) + @require_admin + async def user_reset( + self, + interaction: Interaction, + user: discord.User, + reset_memory: bool = False, + reset_thread: bool = False, + reset_verification: bool = False + ): + """Reset various aspects of user state.""" + if not any([reset_memory, reset_thread, reset_verification]): + await interaction.response.send_message("Select at least one thing to reset.", ephemeral=True) + return + + try: + actions = [] + if reset_memory: + actions.append("memory") + if reset_thread: + actions.append("thread") + if reset_verification: + actions.append("verification") + + confirmed = await self._confirm_action( + interaction, + title="Confirm User Reset", + description=( + f"You are about to reset **{', '.join(actions)}** for {user.mention}.\n" + "This action may be destructive and cannot be fully undone." + ), + ) + if not confirmed: + return + + result = await self.user_management_service.reset_user( + user_id=str(user.id), + reset_memory=reset_memory, + reset_thread=reset_thread, + reset_verification=reset_verification, + ) + + success = len(result.errors) == 0 + + embed = discord.Embed( + title="User Reset Complete" if success else "User Reset Completed with Issues", + description=f"Target: {user.mention}", + color=discord.Color.green() if success else discord.Color.orange(), + ) + embed.add_field(name="Memory Cleared", value="Yes" if result.memory_cleared else "No", inline=True) + embed.add_field(name="Thread Closed", value="Yes" if result.thread_closed else "No", inline=True) + embed.add_field(name="Verification Reset", value="Yes" if result.verification_reset else "No", inline=True) + + if result.errors: + embed.add_field(name="Errors", value="\n".join(result.errors), inline=False) + + await interaction.edit_original_response(embed=embed, view=None) + + except Exception as e: + logger.error(f"User reset failed: {e}", exc_info=True) + await interaction.edit_original_response(content=f"Reset failed: {str(e)}", embed=None, view=None) + + @app_commands.command(name="queue_status", description="Check queue status") + @require_admin + async def queue_status(self, interaction: Interaction): + """See what's in the queue.""" + await interaction.response.defer(ephemeral=True) + + try: + status = await self.queue_service.get_queue_stats() + + total = status.total_pending + if total == 0: + color = discord.Color.green() + elif total < 10: + color = discord.Color.blue() + elif total < 50: + color = discord.Color.orange() + else: + color = discord.Color.red() + + embed = discord.Embed(title="Queue Status", color=discord.Color.blue()) + embed.color = color + embed.add_field( + name="High", + value=f"{status.high.pending} pending\n{status.high.consumers} consumers", + inline=True, + ) + embed.add_field( + name="Medium", + value=f"{status.medium.pending} pending\n{status.medium.consumers} consumers", + inline=True, + ) + embed.add_field( + name="Low", + value=f"{status.low.pending} pending\n{status.low.consumers} consumers", + inline=True, + ) + embed.add_field(name="Total Pending", value=str(status.total_pending), inline=False) + + await interaction.followup.send(embed=embed, ephemeral=True) + + except Exception as e: + logger.error(f"Queue status check failed: {e}", exc_info=True) + await interaction.followup.send(f"Couldn't get queue status: {str(e)}", ephemeral=True) + + @app_commands.command(name="queue_clear", description="Clear queue messages") + @app_commands.describe(priority="Which queue to clear") + @app_commands.choices(priority=[ + app_commands.Choice(name="All", value="all"), + app_commands.Choice(name="High", value="high"), + app_commands.Choice(name="Medium", value="medium"), + app_commands.Choice(name="Low", value="low") + ]) + @require_admin + async def queue_clear(self, interaction: Interaction, priority: str = "all"): + """Clear stuck messages from queue.""" + try: + confirmed = await self._confirm_action( + interaction, + title="Confirm Queue Clear", + description=( + f"You are about to clear the **{priority}** queue scope.\n" + "This action is destructive and cannot be undone." + ), + ) + if not confirmed: + return + + cleared = await self.queue_service.clear_queue(priority=priority) + total = sum(cleared.values()) + + embed = discord.Embed( + title="Queue Cleared", + description=f"Cleared queue scope: {priority}", + color=discord.Color.orange(), + ) + embed.add_field(name="High", value=str(cleared.get("high", 0)), inline=True) + embed.add_field(name="Medium", value=str(cleared.get("medium", 0)), inline=True) + embed.add_field(name="Low", value=str(cleared.get("low", 0)), inline=True) + embed.add_field(name="Total Cleared", value=str(total), inline=False) + + await interaction.edit_original_response(embed=embed, view=None) + + except Exception as e: + logger.error(f"Queue clear failed: {e}", exc_info=True) + await interaction.edit_original_response(content=f"Clear failed: {str(e)}", embed=None, view=None) + + @app_commands.command(name="cache_clear", description="Clear cached data") + @app_commands.describe(cache_type="What to clear") + @app_commands.choices(cache_type=[ + app_commands.Choice(name="All", value="all"), + app_commands.Choice(name="Active Threads", value="active_threads"), + app_commands.Choice(name="Embeddings", value="embeddings"), + app_commands.Choice(name="Memories", value="memories") + ]) + @require_admin + async def cache_clear(self, interaction: Interaction, cache_type: str = "all"): + """Clear various caches.""" + try: + if cache_type == "all": + confirmed = await self._confirm_action( + interaction, + title="Confirm Cache Clear", + description=( + "You are about to clear **all** caches.\n" + "This action may affect active sessions and performance." + ), + ) + if not confirmed: + return + else: + await interaction.response.defer(ephemeral=True) + + cleared = await self.cache_service.clear_cache(cache_type=cache_type) + + embed = discord.Embed( + title="Cache Clear", + description=f"Cleared cache type: {cache_type}", + color=discord.Color.blue(), + ) + for name, count in cleared.items(): + embed.add_field(name=name.replace("_", " ").title(), value=str(count), inline=True) + + if cache_type == "all": + await interaction.edit_original_response(embed=embed, view=None) + else: + await interaction.followup.send(embed=embed, ephemeral=True) + + except Exception as e: + logger.error(f"Cache clear failed: {e}", exc_info=True) + if interaction.response.is_done(): + await interaction.followup.send(f"Clear failed: {str(e)}", ephemeral=True) + else: + await interaction.response.send_message(f"Clear failed: {str(e)}", ephemeral=True) + + +async def setup(bot: DiscordBot): + """Register the admin cog.""" + await bot.add_cog(AdminCommands(bot, bot.queue_manager)) diff --git a/backend/integrations/discord/bot.py b/backend/integrations/discord/bot.py index dbb7c3a4..36551ff9 100644 --- a/backend/integrations/discord/bot.py +++ b/backend/integrations/discord/bot.py @@ -36,12 +36,23 @@ def _register_queue_handlers(self): async def on_ready(self): """Bot ready event""" logger.info(f'Enhanced Discord bot logged in as {self.user}') - print(f'Bot is ready! Logged in as {self.user}') + logger.info(f'Bot is ready! Logged in as {self.user}') try: - synced = await self.tree.sync() - print(f"Synced {len(synced)} slash command(s)") + # Sync globally + synced = await self.sync_commands() + synced_count = len(synced) if synced is not None else 0 + logger.info(f"Synced {synced_count} global slash command(s)") + + # Also sync to each guild for instant availability + for guild in self.guilds: + try: + guild_synced = await self.sync_commands(guild_ids=[guild.id]) + guild_synced_count = len(guild_synced) if guild_synced is not None else 0 + logger.info(f"Synced {guild_synced_count} commands to guild {guild.name}") + except Exception as e: + logger.warning(f"Failed to sync to guild {guild.name}: {e}") except Exception as e: - print(f"Failed to sync slash commands: {e}") + logger.error(f"Failed to sync slash commands: {e}") async def on_message(self, message): """Handles regular chat messages, but ignores slash commands.""" diff --git a/backend/integrations/discord/cogs.py b/backend/integrations/discord/cogs.py index 64fd0b7f..f74a83a5 100644 --- a/backend/integrations/discord/cogs.py +++ b/backend/integrations/discord/cogs.py @@ -38,9 +38,9 @@ def cog_unload(self): async def cleanup_expired_tokens(self): """Periodic cleanup of expired verification tokens""" try: - print("--> Running token cleanup task...") + logger.debug("Running token cleanup task") await cleanup_expired_tokens() - print("--> Token cleanup task finished.") + logger.debug("Token cleanup task finished") except Exception as e: logger.error(f"Error during token cleanup: {e}") diff --git a/backend/integrations/discord/permissions.py b/backend/integrations/discord/permissions.py new file mode 100644 index 00000000..bdd06883 --- /dev/null +++ b/backend/integrations/discord/permissions.py @@ -0,0 +1,274 @@ +from functools import wraps +from typing import Optional, Tuple, Dict, Any +import discord +from discord import Interaction +import logging +import inspect + +from app.core.config.settings import settings +from app.utils.admin_logger import log_admin_action + +logger = logging.getLogger(__name__) + + +def is_bot_owner(user_id: int) -> bool: + return settings.bot_owner_id is not None and user_id == settings.bot_owner_id + + +def is_admin(interaction: Interaction) -> bool: + if not interaction.guild: + return False + member = interaction.guild.get_member(interaction.user.id) + if not member: + return False + return member.guild_permissions.administrator + + +def get_permission_embed( + title: str = "Permission Denied", + description: str = "You don't have permission to use this command.", + required_permission: str = "Administrator or Bot Owner" +) -> discord.Embed: + embed = discord.Embed( + title=title, + description=description, + color=discord.Color.red() + ) + embed.add_field(name="Required Permission", value=required_permission, inline=False) + embed.add_field(name="Contact", value="Contact the bot owner if you believe this is an error.", inline=False) + return embed + + +def require_admin(func): + @wraps(func) + async def wrapper(self, interaction: Interaction, *args, **kwargs): + has_permission, reason = check_permissions(interaction) + + if not has_permission: + logger.warning( + f"Admin command denied: user={interaction.user.id} " + f"command={interaction.command.name if interaction.command else 'unknown'} " + f"reason={reason}" + ) + + # Log failed permission check + await log_admin_action( + executor_id=str(interaction.user.id), + executor_username=interaction.user.name, + command_name=interaction.command.name if interaction.command else 'unknown', + server_id=str(interaction.guild.id) if interaction.guild else 'dm', + action_result='failure', + error_message=f"Permission denied: {reason}", + metadata={ + 'is_owner': is_bot_owner(interaction.user.id), + 'is_admin': is_admin(interaction), + } + ) + + embed = get_permission_embed() + + if interaction.response.is_done(): + await interaction.followup.send(embed=embed, ephemeral=True) + else: + await interaction.response.send_message(embed=embed, ephemeral=True) + return None + + logger.info( + f"Admin command authorized: user={interaction.user.id} " + f"is_owner={is_bot_owner(interaction.user.id)} " + f"command={interaction.command.name if interaction.command else 'unknown'}" + ) + + # Extract command arguments for logging + command_args = _extract_command_args(func, args, kwargs) + + # Execute the command and track success/failure + action_result = 'success' + error_message = None + metadata = { + 'is_owner': is_bot_owner(interaction.user.id), + 'is_admin': is_admin(interaction), + } + + try: + result = await func(self, interaction, *args, **kwargs) + + # Log successful execution + await log_admin_action( + executor_id=str(interaction.user.id), + executor_username=interaction.user.name, + command_name=interaction.command.name if interaction.command else 'unknown', + server_id=str(interaction.guild.id) if interaction.guild else 'dm', + action_result=action_result, + command_args=command_args, + metadata=metadata + ) + + return result + + except Exception as e: + action_result = 'error' + error_message = str(e) + logger.error( + f"Admin command error: user={interaction.user.id} " + f"command={interaction.command.name if interaction.command else 'unknown'} " + f"error={error_message}" + ) + + # Log error + await log_admin_action( + executor_id=str(interaction.user.id), + executor_username=interaction.user.name, + command_name=interaction.command.name if interaction.command else 'unknown', + server_id=str(interaction.guild.id) if interaction.guild else 'dm', + action_result=action_result, + command_args=command_args, + error_message=error_message, + metadata=metadata + ) + + # Re-raise the exception + raise + + return wrapper + + +def require_bot_owner(func): + @wraps(func) + async def wrapper(self, interaction: Interaction, *args, **kwargs): + if not is_bot_owner(interaction.user.id): + logger.warning( + f"Bot owner command denied: user={interaction.user.id} " + f"command={interaction.command.name if interaction.command else 'unknown'}" + ) + + # Log failed permission check + await log_admin_action( + executor_id=str(interaction.user.id), + executor_username=interaction.user.name, + command_name=interaction.command.name if interaction.command else 'unknown', + server_id=str(interaction.guild.id) if interaction.guild else 'dm', + action_result='failure', + error_message="Permission denied: Bot owner only", + metadata={ + 'is_owner': False, + 'required_permission': 'bot_owner', + } + ) + + embed = get_permission_embed( + title="Bot Owner Only", + description="This command can only be used by the bot owner.", + required_permission="Bot Owner" + ) + + if interaction.response.is_done(): + await interaction.followup.send(embed=embed, ephemeral=True) + else: + await interaction.response.send_message(embed=embed, ephemeral=True) + return None + + logger.info( + f"Bot owner command authorized: user={interaction.user.id} " + f"command={interaction.command.name if interaction.command else 'unknown'}" + ) + + # Extract command arguments for logging + command_args = _extract_command_args(func, args, kwargs) + + # Execute the command and track success/failure + action_result = 'success' + error_message = None + metadata = { + 'is_owner': True, + 'required_permission': 'bot_owner', + } + + try: + result = await func(self, interaction, *args, **kwargs) + + # Log successful execution + await log_admin_action( + executor_id=str(interaction.user.id), + executor_username=interaction.user.name, + command_name=interaction.command.name if interaction.command else 'unknown', + server_id=str(interaction.guild.id) if interaction.guild else 'dm', + action_result=action_result, + command_args=command_args, + metadata=metadata + ) + + return result + + except Exception as e: + action_result = 'error' + error_message = str(e) + logger.error( + f"Bot owner command error: user={interaction.user.id} " + f"command={interaction.command.name if interaction.command else 'unknown'} " + f"error={error_message}" + ) + + # Log error + await log_admin_action( + executor_id=str(interaction.user.id), + executor_username=interaction.user.name, + command_name=interaction.command.name if interaction.command else 'unknown', + server_id=str(interaction.guild.id) if interaction.guild else 'dm', + action_result=action_result, + command_args=command_args, + error_message=error_message, + metadata=metadata + ) + + # Re-raise the exception + raise + + return wrapper + + +def check_permissions(interaction: Interaction) -> Tuple[bool, Optional[str]]: + if is_bot_owner(interaction.user.id): + return True, None + + if is_admin(interaction): + return True, None + + return False, f"User {interaction.user.id} is neither bot owner nor server administrator" + + +def _extract_command_args(func, args: tuple, kwargs: dict) -> Dict[str, Any]: + """Extract command arguments from function call for logging.""" + try: + sig = inspect.signature(func) + param_names = list(sig.parameters.keys()) + + # Skip 'self' and 'interaction' parameters + param_names = [p for p in param_names if p not in ['self', 'interaction']] + + # Build argument dictionary + command_args = {} + + # Add positional arguments + for i, value in enumerate(args): + if i < len(param_names): + # Convert to string for JSON serialization + serializable_types = (str, int, float, bool, type(None)) + command_args[param_names[i]] = ( + value if isinstance(value, serializable_types) else str(value) + ) + + # Add keyword arguments + for key, value in kwargs.items(): + if key not in ['self', 'interaction']: + # Convert to string for JSON serialization + serializable_types = (str, int, float, bool, type(None)) + command_args[key] = ( + value if isinstance(value, serializable_types) else str(value) + ) + + return command_args + + except Exception as e: + logger.warning(f"Failed to extract command arguments: {e}") + return {} diff --git a/backend/integrations/discord/views.py b/backend/integrations/discord/views.py index 807d6dd7..fdf490da 100644 --- a/backend/integrations/discord/views.py +++ b/backend/integrations/discord/views.py @@ -49,6 +49,35 @@ def __init__(self, oauth_url: str, provider_name: str): self.add_item(button) +class ConfirmActionView(discord.ui.View): + def __init__(self, timeout: float = 30.0): + super().__init__(timeout=timeout) + self.confirmed = False + self.interaction = None + + @discord.ui.button(label="Confirm", style=discord.ButtonStyle.danger) + async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button): + self.confirmed = True + self.interaction = interaction + for item in self.children: + item.disabled = True + await interaction.response.edit_message(view=self) + self.stop() + + @discord.ui.button(label="Cancel", style=discord.ButtonStyle.secondary) + async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button): + self.confirmed = False + self.interaction = interaction + for item in self.children: + item.disabled = True + await interaction.response.edit_message(view=self) + self.stop() + + async def on_timeout(self): + for item in self.children: + item.disabled = True + + class OnboardingView(discord.ui.View): """View shown in onboarding DM with optional GitHub connect link and Skip button.""" diff --git a/backend/main.py b/backend/main.py index b7ad80a6..b8baba37 100644 --- a/backend/main.py +++ b/backend/main.py @@ -12,6 +12,7 @@ from app.core.orchestration.agent_coordinator import AgentCoordinator from app.core.orchestration.queue_manager import AsyncQueueManager from app.database.weaviate.client import get_weaviate_client +from app.utils.admin_logger import ensure_admin_logs_table from integrations.discord.bot import DiscordBot from discord.ext import commands # DevRel commands are now loaded dynamically (commented out below) @@ -45,12 +46,23 @@ async def start_background_tasks(self): await self.queue_manager.start(num_workers=3) + if await ensure_admin_logs_table(): + logger.info("Admin logs table check passed") + else: + logger.warning("Admin logs table check failed; admin action logs will be skipped") + # --- Load commands inside the async startup function --- try: await self.discord_bot.load_extension("integrations.discord.cogs") except (ImportError, commands.ExtensionError) as e: logger.error("Failed to load Discord cog extension: %s", e) + try: + await self.discord_bot.load_extension("integrations.discord.admin_cog") + logger.info("Admin cog loaded successfully") + except Exception as e: + logger.error("Failed to load admin cog extension: %s", e, exc_info=True) + # Start the bot as a background task. asyncio.create_task( self.discord_bot.start(settings.discord_bot_token) diff --git a/docs/ADMIN_COMMANDS.md b/docs/ADMIN_COMMANDS.md new file mode 100644 index 00000000..09ce4d01 --- /dev/null +++ b/docs/ADMIN_COMMANDS.md @@ -0,0 +1,238 @@ +# Admin Commands Documentation + +## Overview + +The admin commands provide server administrators and bot owners with tools to manage, monitor, and troubleshoot the Devr.AI bot. All admin commands are grouped under the `/admin` command prefix. + +## Permissions + +Admin commands require one of the following: + +- **Server Administrator**: Users with the Discord `ADMINISTRATOR` permission +- **Bot Owner**: The user ID configured in `BOT_OWNER_ID` environment variable + +All admin command executions are logged for audit purposes. + +## Setup + +### Configure Bot Owner + +Add your Discord user ID to `.env`: + +```env +BOT_OWNER_ID=your_discord_user_id_here +``` + +To find your Discord user ID: +1. Enable Developer Mode in Discord (Settings > App Settings > Advanced) +2. Right-click your profile and select "Copy User ID" + +### Run Database Migration + +Execute the admin logs table migration: + +```sql +-- Run in Supabase SQL Editor or via psql +\i backend/database/02_create_admin_logs_table.sql +``` + +Optional verification query: + +```sql +SELECT to_regclass('public.admin_logs'); +``` + +Expected result: `admin_logs` + +> Note: On startup, the backend checks whether `admin_logs` exists. If missing, +> admin commands still run, but admin-action logging is skipped until migration is applied. + +## Commands + +### /admin stats + +Display bot statistics and metrics. + +**Usage:** `/admin stats` + +**Shows:** +- Server count and total members +- Active threads +- Bot latency and uptime +- Memory usage +- Messages processed (today and 7-day) +- Queue status by priority + +### /admin health + +Check system health and service status. + +**Usage:** `/admin health` + +**Checks:** +- Supabase database connection +- RabbitMQ message queue +- Weaviate vector store +- FalkorDB availability +- Gemini API availability + +**Status Indicators:** +- Green: Healthy +- Orange: Degraded +- Red: Unhealthy + +### /admin user_info + +Get detailed information about a user. + +**Usage:** `/admin user_info user:<@user>` + +**Parameters:** +- `user`: The Discord user to look up (mention or ID) + +**Shows:** +- Discord profile (username, ID, account creation date) +- GitHub verification status +- Linked GitHub username (if verified) +- Message count +- Active thread status +- Role count + +### /admin user_reset + +Reset user state with confirmation. + +**Usage:** `/admin user_reset user:<@user> [options]` + +**Parameters:** +- `user`: The Discord user to reset +- `reset_memory`: Clear conversation memory (default: False) +- `reset_thread`: Close active thread (default: False) +- `reset_verification`: Clear GitHub verification (default: False) + +**Requires Confirmation:** Yes + +### /admin queue_status + +Check message queue status. + +**Usage:** `/admin queue_status` + +**Shows:** +- Pending messages by priority (High, Medium, Low) +- Consumer count per queue +- Total pending messages + +**Color Indicators:** +- Green: 0 pending +- Blue: 1-9 pending +- Orange: 10-49 pending +- Red: 50+ pending + +### /admin queue_clear + +Clear messages from the queue. + +**Usage:** `/admin queue_clear [priority]` + +**Parameters:** +- `priority`: Which queue to clear + - `all` (default): Clear all queues + - `high`: Clear high priority only + - `medium`: Clear medium priority only + - `low`: Clear low priority only + +**Requires Confirmation:** Yes + +### /admin cache_clear + +Clear cached data. + +**Usage:** `/admin cache_clear [cache_type]` + +**Parameters:** +- `cache_type`: What to clear + - `all` (default): Clear all caches + - `active_threads`: Clear tracked threads + - `embeddings`: Clear embedding cache + - `memories`: Clear memory cache + +**Requires Confirmation:** Yes (for `all` only) + +## Logging + +All admin command executions are logged to the `admin_logs` table with: + +- Timestamp +- Executor ID and username +- Command name and arguments +- Target user (if applicable) +- Result (success/failure/error) +- Error message (if failed) +- Server ID +- Additional metadata + +### Viewing Logs + +Query logs via Supabase dashboard or API: + +```python +from app.utils.admin_logger import get_admin_logs + +# Get recent logs +logs = await get_admin_logs(limit=50) + +# Filter by executor +logs = await get_admin_logs(executor_id="123456789") + +# Filter by command +logs = await get_admin_logs(command_name="queue_clear") + +# Get statistics +from app.utils.admin_logger import get_admin_log_stats +stats = await get_admin_log_stats(server_id="987654321") +``` + +## Best Practices + +### When to Use Queue Clear +- Stuck messages that won't process +- After system errors that corrupted messages +- During maintenance windows + +### When to Use User Reset +- User reports conversation issues +- Memory corruption +- Re-verification needed + +### When to Use Cache Clear +- After configuration changes +- Memory pressure issues +- Stale data problems + +## Troubleshooting + +### Commands Not Appearing + +1. Wait for Discord to sync slash commands (can take up to 1 hour) +2. Try `/admin` and check autocomplete +3. Restart the bot to force sync + +### Permission Denied + +1. Verify you have `ADMINISTRATOR` permission in the server +2. Check if `BOT_OWNER_ID` is set correctly in `.env` +3. Verify the bot was restarted after config changes + +### Health Check Failures + +- **Supabase**: Check credentials in `.env` +- **RabbitMQ**: Verify Docker container is running +- **Weaviate**: Check Docker container and port 8080 +- **Gemini API**: Verify API key is valid + +### Queue Not Clearing + +1. Check RabbitMQ connection with `/admin health` +2. Verify Docker containers are running +3. Check RabbitMQ management console (port 15672) diff --git a/env.example b/env.example index 6ed55bcc..bda25ebf 100644 --- a/env.example +++ b/env.example @@ -5,6 +5,7 @@ TAVILY_API_KEY=your_tavily_api_key_here # Platform Integrations DISCORD_BOT_TOKEN=your_discord_bot_token_here GITHUB_TOKEN=your_github_token_here +BOT_OWNER_ID=your_discord_user_id_here # Database SUPABASE_URL=your_supabase_url_here diff --git a/tests/test_admin_integration.py b/tests/test_admin_integration.py new file mode 100644 index 00000000..12b71986 --- /dev/null +++ b/tests/test_admin_integration.py @@ -0,0 +1,222 @@ +import asyncio +from unittest.mock import Mock, AsyncMock, patch +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + + +async def test_full_workflow(): + from app.services.admin.bot_stats_service import BotStatsService + from app.services.admin.health_check_service import HealthCheckService, ServiceHealth + from app.services.admin.queue_service import QueueService, FullQueueStatus, QueueStats + + mock_bot = Mock() + mock_bot.guilds = [Mock(member_count=100)] + mock_bot.active_threads = {} + mock_bot.latency = 0.05 + + stats_service = BotStatsService(mock_bot, None) + health_service = HealthCheckService(None) + queue_service = QueueService(None) + + with patch.object(stats_service, 'get_message_stats', new_callable=AsyncMock) as m1, \ + patch.object(stats_service, 'get_queue_stats', new_callable=AsyncMock) as m2: + m1.return_value = {"today": 5, "week": 20} + m2.return_value = {"high": 0, "medium": 0, "low": 0} + + stats = await stats_service.get_all_stats() + assert stats.guild_count == 1 + + with patch.object(health_service, 'check_supabase', new_callable=AsyncMock) as h1, \ + patch.object(health_service, 'check_rabbitmq', new_callable=AsyncMock) as h2, \ + patch.object(health_service, 'check_weaviate', new_callable=AsyncMock) as h3, \ + patch.object(health_service, 'check_falkordb', new_callable=AsyncMock) as h4, \ + patch.object(health_service, 'check_gemini_api', new_callable=AsyncMock) as h5: + + h1.return_value = ServiceHealth("Supabase", "healthy", 10) + h2.return_value = ServiceHealth("RabbitMQ", "healthy", 5) + h3.return_value = ServiceHealth("Weaviate", "healthy", 15) + h4.return_value = ServiceHealth("FalkorDB", "healthy", 12) + h5.return_value = ServiceHealth("Gemini", "healthy", 100) + + health = await health_service.get_all_health() + assert health.overall_status == "healthy" + + with patch.object(queue_service, 'get_queue_stats', new_callable=AsyncMock) as q1: + q1.return_value = FullQueueStatus( + high=QueueStats("high", 0, 1), + medium=QueueStats("medium", 0, 1), + low=QueueStats("low", 0, 1), + total_pending=0 + ) + + status = await queue_service.get_queue_stats() + assert status.total_pending == 0 + + print("PASS: test_full_workflow") + + +async def test_permission_denies_regular_user(): + from integrations.discord.permissions import require_admin + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock): + + mock_settings.bot_owner_id = 123456789 + + interaction = Mock() + interaction.user = Mock() + interaction.user.id = 999999999 + interaction.user.name = "regular_user" + interaction.guild = Mock() + interaction.guild.id = 1234567890 + + member = Mock() + member.guild_permissions = Mock() + member.guild_permissions.administrator = False + interaction.guild.get_member = Mock(return_value=member) + + interaction.command = Mock() + interaction.command.name = "stats" + + interaction.response = Mock() + interaction.response.is_done = Mock(return_value=False) + interaction.response.send_message = AsyncMock() + + called = False + + class Cog: + pass + + @require_admin + async def cmd(self, interaction): + nonlocal called + called = True + + await cmd(Cog(), interaction) + + assert not called + assert interaction.response.send_message.called + print("PASS: test_permission_denies_regular_user") + + +async def test_permission_allows_admin(): + from integrations.discord.permissions import require_admin + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock): + + mock_settings.bot_owner_id = 123456789 + + interaction = Mock() + interaction.user = Mock() + interaction.user.id = 999999999 + interaction.user.name = "admin_user" + interaction.guild = Mock() + interaction.guild.id = 1234567890 + + member = Mock() + member.guild_permissions = Mock() + member.guild_permissions.administrator = True + interaction.guild.get_member = Mock(return_value=member) + + interaction.command = Mock() + interaction.command.name = "stats" + + interaction.response = Mock() + interaction.response.is_done = Mock(return_value=False) + + called = False + + class Cog: + pass + + @require_admin + async def cmd(self, interaction): + nonlocal called + called = True + + await cmd(Cog(), interaction) + + assert called + print("PASS: test_permission_allows_admin") + + +async def test_logging_works(): + from integrations.discord.permissions import require_admin + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock) as mock_log: + + mock_settings.bot_owner_id = 123456789 + + interaction = Mock() + interaction.user = Mock() + interaction.user.id = 123456789 + interaction.user.name = "bot_owner" + interaction.guild = Mock() + interaction.guild.id = 1234567890 + + member = Mock() + member.guild_permissions = Mock() + member.guild_permissions.administrator = False + interaction.guild.get_member = Mock(return_value=member) + + interaction.command = Mock() + interaction.command.name = "queue_clear" + + interaction.response = Mock() + interaction.response.is_done = Mock(return_value=False) + + class Cog: + pass + + @require_admin + async def cmd(self, interaction, priority="all"): + pass + + await cmd(Cog(), interaction, priority="high") + + assert mock_log.called + kwargs = mock_log.call_args.kwargs + assert kwargs["executor_id"] == "123456789" + assert kwargs["command_name"] == "queue_clear" + print("PASS: test_logging_works") + + +async def run_all_tests(): + print("\n" + "=" * 60) + print("Running Admin Integration Tests") + print("=" * 60 + "\n") + + passed = 0 + failed = 0 + + tests = [ + test_full_workflow, + test_permission_denies_regular_user, + test_permission_allows_admin, + test_logging_works, + ] + + for test in tests: + try: + await test() + passed += 1 + except Exception as e: + failed += 1 + print(f"FAIL: {test.__name__}: {e}") + + print("\n" + "=" * 60) + print(f"Results: {passed} passed, {failed} failed") + print("=" * 60) + + return failed == 0 + + +if __name__ == "__main__": + success = asyncio.run(run_all_tests()) + sys.exit(0 if success else 1) diff --git a/tests/test_admin_management.py b/tests/test_admin_management.py new file mode 100644 index 00000000..0310c001 --- /dev/null +++ b/tests/test_admin_management.py @@ -0,0 +1,190 @@ +import asyncio +from unittest.mock import Mock, AsyncMock, patch +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + +async def test_queue_status(): + from app.services.admin.queue_service import QueueService, FullQueueStatus, QueueStats + + service = QueueService(None) + + with patch.object(service, 'get_queue_stats', new_callable=AsyncMock) as mock_stats: + mock_stats.return_value = FullQueueStatus( + high=QueueStats("high", 5, 2), + medium=QueueStats("medium", 10, 3), + low=QueueStats("low", 3, 1), + total_pending=18 + ) + + status = await service.get_queue_stats() + + assert status.total_pending == 18 + assert status.high.pending == 5 + print("PASS: test_queue_status") + + +async def test_queue_clear(): + from app.services.admin.queue_service import QueueService + from app.core.orchestration.queue_manager import QueuePriority + + mock_channel = Mock() + mock_queue_manager = Mock() + mock_queue_manager.channel = mock_channel + mock_queue_manager.queues = { + QueuePriority.HIGH: "high_task_queue", + QueuePriority.MEDIUM: "medium_task_queue", + QueuePriority.LOW: "low_task_queue" + } + + mock_queue = Mock() + mock_queue.declaration_result = Mock() + mock_queue.declaration_result.message_count = 5 + mock_queue.purge = AsyncMock() + + mock_channel.declare_queue = AsyncMock(return_value=mock_queue) + + service = QueueService(mock_queue_manager) + cleared = await service.clear_queue("all") + + assert "high" in cleared + assert "medium" in cleared + assert "low" in cleared + print("PASS: test_queue_clear") + + +async def test_user_reset_memory(): + from app.services.admin.user_management_service import UserManagementService + + mock_queue = Mock() + mock_queue.enqueue = AsyncMock() + + service = UserManagementService(None, mock_queue) + + result = await service.reset_user( + "123456", + reset_memory=True, + reset_thread=False, + reset_verification=False + ) + + assert result.memory_cleared is True + assert result.thread_closed is False + mock_queue.enqueue.assert_called_once() + print("PASS: test_user_reset_memory") + + +async def test_user_reset_full(): + from app.services.admin.user_management_service import UserManagementService + + mock_bot = Mock() + mock_bot.active_threads = {"123456": "thread123"} + mock_bot.get_channel = Mock(return_value=None) + + mock_queue = Mock() + mock_queue.enqueue = AsyncMock() + + service = UserManagementService(mock_bot, mock_queue) + + with patch('app.services.admin.user_management_service.get_supabase_client') as mock_supa: + mock_client = Mock() + mock_client.table = Mock(return_value=mock_client) + mock_client.update = Mock(return_value=mock_client) + mock_client.eq = Mock(return_value=mock_client) + mock_client.execute = AsyncMock() + mock_supa.return_value = mock_client + + result = await service.reset_user( + "123456", + reset_memory=True, + reset_thread=True, + reset_verification=True + ) + + assert result.memory_cleared is True + assert result.thread_closed is True + assert result.verification_reset is True + print("PASS: test_user_reset_full") + + +async def test_cache_clear(): + from app.services.admin.cache_service import CacheService + + mock_bot = Mock() + mock_bot.active_threads = {"1": "t1", "2": "t2", "3": "t3"} + + service = CacheService(mock_bot) + cleared = await service.clear_cache("all") + + assert cleared["active_threads"] == 3 + assert len(mock_bot.active_threads) == 0 + print("PASS: test_cache_clear") + + +async def test_cache_sizes(): + from app.services.admin.cache_service import CacheService + + mock_bot = Mock() + mock_bot.active_threads = {"1": "t1"} + + service = CacheService(mock_bot) + sizes = await service.get_cache_sizes() + + assert sizes["active_threads"] == 1 + print("PASS: test_cache_sizes") + + +def test_confirm_view(): + from integrations.discord.views import ConfirmActionView + + view = ConfirmActionView(timeout=30.0) + assert view.confirmed is False + assert view.timeout == 30.0 + print("PASS: test_confirm_view") + + +async def run_all_tests(): + print("\n" + "=" * 60) + print("Running Admin Management Tests") + print("=" * 60 + "\n") + + passed = 0 + failed = 0 + + async_tests = [ + test_queue_status, + test_queue_clear, + test_user_reset_memory, + test_user_reset_full, + test_cache_clear, + test_cache_sizes, + ] + + for test in async_tests: + try: + await test() + passed += 1 + except Exception as e: + failed += 1 + print(f"FAIL: {test.__name__}: {e}") + + try: + test_confirm_view() + passed += 1 + except Exception as e: + failed += 1 + print(f"FAIL: test_confirm_view: {e}") + + print("\n" + "=" * 60) + print(f"Results: {passed} passed, {failed} failed") + print("=" * 60) + + return failed == 0 + + +if __name__ == "__main__": + success = asyncio.run(run_all_tests()) + sys.exit(0 if success else 1) diff --git a/tests/test_admin_monitoring.py b/tests/test_admin_monitoring.py new file mode 100644 index 00000000..b95aeb75 --- /dev/null +++ b/tests/test_admin_monitoring.py @@ -0,0 +1,202 @@ +import asyncio +from unittest.mock import Mock, AsyncMock, patch +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + + +async def test_stats_service(): + from app.services.admin.bot_stats_service import BotStatsService + + mock_bot = Mock() + mock_bot.guilds = [Mock(member_count=100), Mock(member_count=200)] + mock_bot.active_threads = {"1": "thread1"} + mock_bot.latency = 0.05 + + service = BotStatsService(mock_bot, None) + + with patch.object(service, 'get_message_stats', new_callable=AsyncMock) as mock_msg, \ + patch.object(service, 'get_queue_stats', new_callable=AsyncMock) as mock_queue: + mock_msg.return_value = {"today": 10, "week": 50} + mock_queue.return_value = {"high": 1, "medium": 2, "low": 3} + + stats = await service.get_all_stats() + + assert stats.guild_count == 2 + assert stats.total_members == 300 + assert stats.active_threads == 1 + assert stats.messages_today == 10 + print("PASS: test_stats_service") + + +async def test_stats_handles_errors(): + from app.services.admin.bot_stats_service import BotStatsService + + mock_bot = Mock() + mock_bot.guilds = [] + mock_bot.active_threads = {} + mock_bot.latency = 0.1 + + service = BotStatsService(mock_bot, None) + + with patch('app.services.admin.bot_stats_service.get_supabase_client') as mock_supa: + mock_supa.side_effect = Exception("Database error") + stats = await service.get_all_stats() + assert stats.messages_today == 0 + print("PASS: test_stats_handles_errors") + + +async def test_health_all_healthy(): + from app.services.admin.health_check_service import HealthCheckService, ServiceHealth + + service = HealthCheckService(None) + + with patch.object(service, 'check_supabase', new_callable=AsyncMock) as m1, \ + patch.object(service, 'check_rabbitmq', new_callable=AsyncMock) as m2, \ + patch.object(service, 'check_weaviate', new_callable=AsyncMock) as m3, \ + patch.object(service, 'check_falkordb', new_callable=AsyncMock) as m4, \ + patch.object(service, 'check_gemini_api', new_callable=AsyncMock) as m5: + + m1.return_value = ServiceHealth("Supabase", "healthy", 10) + m2.return_value = ServiceHealth("RabbitMQ", "healthy", 5) + m3.return_value = ServiceHealth("Weaviate", "healthy", 15) + m4.return_value = ServiceHealth("FalkorDB", "healthy", 12) + m5.return_value = ServiceHealth("Gemini", "healthy", 100) + + health = await service.get_all_health() + + assert health.overall_status == "healthy" + assert len(health.services) == 5 + print("PASS: test_health_all_healthy") + + +async def test_health_service_down(): + from app.services.admin.health_check_service import HealthCheckService, ServiceHealth + + service = HealthCheckService(None) + + with patch.object(service, 'check_supabase', new_callable=AsyncMock) as m1, \ + patch.object(service, 'check_rabbitmq', new_callable=AsyncMock) as m2, \ + patch.object(service, 'check_weaviate', new_callable=AsyncMock) as m3, \ + patch.object(service, 'check_falkordb', new_callable=AsyncMock) as m4, \ + patch.object(service, 'check_gemini_api', new_callable=AsyncMock) as m5: + + m1.return_value = ServiceHealth("Supabase", "unhealthy", 0, "Connection failed") + m2.return_value = ServiceHealth("RabbitMQ", "healthy", 5) + m3.return_value = ServiceHealth("Weaviate", "healthy", 15) + m4.return_value = ServiceHealth("FalkorDB", "healthy", 12) + m5.return_value = ServiceHealth("Gemini", "healthy", 100) + + health = await service.get_all_health() + + assert health.overall_status == "unhealthy" + print("PASS: test_health_service_down") + + +async def test_user_info_verified(): + from app.services.admin.user_info_service import UserInfoService + + mock_bot = Mock() + mock_bot.active_threads = {"123456": "thread1"} + + service = UserInfoService(mock_bot) + + mock_user = Mock() + mock_user.id = 123456 + mock_user.name = "testuser" + mock_user.display_name = "Test User" + mock_user.avatar = Mock() + mock_user.avatar.url = "https://example.com/avatar.png" + mock_user.created_at = Mock() + mock_user.created_at.strftime = Mock(return_value="2020-01-01") + + mock_member = Mock() + mock_member.roles = [Mock(), Mock(), Mock()] + + with patch.object(service, 'get_user_profile', new_callable=AsyncMock) as m1, \ + patch.object(service, 'get_user_message_count', new_callable=AsyncMock) as m2, \ + patch.object(service, 'get_last_message', new_callable=AsyncMock) as m3: + + m1.return_value = {"is_verified": True, "github_username": "testuser_gh"} + m2.return_value = 42 + m3.return_value = "2024-01-15T10:00:00" + + info = await service.get_full_user_info(mock_user, mock_member) + + assert info.is_verified is True + assert info.github_username == "testuser_gh" + assert info.message_count == 42 + assert info.has_active_thread is True + print("PASS: test_user_info_verified") + + +async def test_user_info_unverified(): + from app.services.admin.user_info_service import UserInfoService + + mock_bot = Mock() + mock_bot.active_threads = {} + + service = UserInfoService(mock_bot) + + mock_user = Mock() + mock_user.id = 999999 + mock_user.name = "newuser" + mock_user.display_name = "New User" + mock_user.avatar = None + mock_user.created_at = Mock() + mock_user.created_at.strftime = Mock(return_value="2024-01-01") + + with patch.object(service, 'get_user_profile', new_callable=AsyncMock) as m1, \ + patch.object(service, 'get_user_message_count', new_callable=AsyncMock) as m2, \ + patch.object(service, 'get_last_message', new_callable=AsyncMock) as m3: + + m1.return_value = None + m2.return_value = 0 + m3.return_value = None + + info = await service.get_full_user_info(mock_user, None) + + assert info.is_verified is False + assert info.github_username is None + assert info.message_count == 0 + print("PASS: test_user_info_unverified") + + +async def run_all_tests(): + print("\n" + "=" * 60) + print("Running Admin Monitoring Tests") + print("=" * 60 + "\n") + + passed = 0 + failed = 0 + + tests = [ + test_stats_service, + test_stats_handles_errors, + test_health_all_healthy, + test_health_service_down, + test_user_info_verified, + test_user_info_unverified, + ] + + for test in tests: + try: + await test() + passed += 1 + except Exception as e: + failed += 1 + print(f"FAIL: {test.__name__}: {e}") + + print("\n" + "=" * 60) + print(f"Results: {passed} passed, {failed} failed") + print("=" * 60) + + return failed == 0 + + +if __name__ == "__main__": + success = asyncio.run(run_all_tests()) + sys.exit(0 if success else 1) diff --git a/tests/test_admin_permissions.py b/tests/test_admin_permissions.py new file mode 100644 index 00000000..fdec677c --- /dev/null +++ b/tests/test_admin_permissions.py @@ -0,0 +1,236 @@ +import asyncio +import discord +from unittest.mock import Mock, AsyncMock, patch +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + + +class MockCog: + def __init__(self): + self.bot = Mock() + + +def create_mock_interaction(user_id=999999999, guild_id=987654321, command_name="test_command", is_admin_user=False): + interaction = Mock() + interaction.user = Mock() + interaction.user.id = user_id + interaction.user.name = "test_user" + + interaction.guild = Mock() + interaction.guild.id = guild_id + + member = Mock() + member.guild_permissions = Mock() + member.guild_permissions.administrator = is_admin_user + interaction.guild.get_member = Mock(return_value=member) + + interaction.command = Mock() + interaction.command.name = command_name + + interaction.response = Mock() + interaction.response.is_done = Mock(return_value=False) + interaction.response.send_message = AsyncMock() + + interaction.followup = Mock() + interaction.followup.send = AsyncMock() + + return interaction + + +async def test_admin_decorator_allows_administrator(): + from integrations.discord.permissions import require_admin + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock): + + mock_settings.bot_owner_id = 123456789 + interaction = create_mock_interaction(user_id=999999999, is_admin_user=True) + cog = MockCog() + + called = False + + @require_admin + async def test_command(self, interaction): + nonlocal called + called = True + + await test_command(cog, interaction) + + assert called, "Command should execute for admin user" + print("PASS: test_admin_decorator_allows_administrator") + + +async def test_admin_decorator_allows_bot_owner(): + from integrations.discord.permissions import require_admin + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock): + + mock_settings.bot_owner_id = 123456789 + interaction = create_mock_interaction(user_id=123456789, is_admin_user=False) + cog = MockCog() + + called = False + + @require_admin + async def test_command(self, interaction): + nonlocal called + called = True + + await test_command(cog, interaction) + + assert called, "Command should execute for bot owner" + print("PASS: test_admin_decorator_allows_bot_owner") + + +async def test_admin_decorator_denies_regular_user(): + from integrations.discord.permissions import require_admin + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock): + + mock_settings.bot_owner_id = 123456789 + interaction = create_mock_interaction(user_id=999999999, is_admin_user=False) + cog = MockCog() + + called = False + + @require_admin + async def test_command(self, interaction): + nonlocal called + called = True + + await test_command(cog, interaction) + + assert not called, "Command should not execute for regular user" + assert interaction.response.send_message.called, "Should send error message" + print("PASS: test_admin_decorator_denies_regular_user") + + +async def test_bot_owner_decorator_denies_admin(): + from integrations.discord.permissions import require_bot_owner + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock): + + mock_settings.bot_owner_id = 123456789 + interaction = create_mock_interaction(user_id=999999999, is_admin_user=True) + cog = MockCog() + + called = False + + @require_bot_owner + async def test_command(self, interaction): + nonlocal called + called = True + + await test_command(cog, interaction) + + assert not called, "Command should not execute for non-owner admin" + print("PASS: test_bot_owner_decorator_denies_admin") + + +async def test_bot_owner_decorator_allows_owner(): + from integrations.discord.permissions import require_bot_owner + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock): + + mock_settings.bot_owner_id = 123456789 + interaction = create_mock_interaction(user_id=123456789, is_admin_user=False) + cog = MockCog() + + called = False + + @require_bot_owner + async def test_command(self, interaction): + nonlocal called + called = True + + await test_command(cog, interaction) + + assert called, "Command should execute for bot owner" + print("PASS: test_bot_owner_decorator_allows_owner") + + +async def test_permission_check_logging(): + from integrations.discord.permissions import require_admin + + with patch("integrations.discord.permissions.settings") as mock_settings, \ + patch("integrations.discord.permissions.log_admin_action", new_callable=AsyncMock) as mock_log: + + mock_settings.bot_owner_id = 123456789 + interaction = create_mock_interaction(user_id=999999999, is_admin_user=False, command_name="stats") + cog = MockCog() + + @require_admin + async def test_command(self, interaction): + pass + + await test_command(cog, interaction) + + assert mock_log.called, "Should log permission check" + call_kwargs = mock_log.call_args.kwargs + assert call_kwargs["executor_id"] == "999999999" + assert call_kwargs["command_name"] == "stats" + assert call_kwargs["action_result"] == "failure" + print("PASS: test_permission_check_logging") + + +def test_error_embed_generation(): + from integrations.discord.permissions import get_permission_embed + + embed = get_permission_embed() + + assert embed.title == "Permission Denied" + assert "don't have permission" in embed.description + assert len(embed.fields) >= 2 + print("PASS: test_error_embed_generation") + + +async def run_all_tests(): + print("\n" + "=" * 60) + print("Running Admin Permissions Tests") + print("=" * 60 + "\n") + + passed = 0 + failed = 0 + + tests = [ + test_admin_decorator_allows_administrator, + test_admin_decorator_allows_bot_owner, + test_admin_decorator_denies_regular_user, + test_bot_owner_decorator_denies_admin, + test_bot_owner_decorator_allows_owner, + test_permission_check_logging, + ] + + for test in tests: + try: + await test() + passed += 1 + except Exception as e: + failed += 1 + print(f"FAIL: {test.__name__}: {e}") + + try: + test_error_embed_generation() + passed += 1 + except Exception as e: + failed += 1 + print(f"FAIL: test_error_embed_generation: {e}") + + print("\n" + "=" * 60) + print(f"Results: {passed} passed, {failed} failed") + print("=" * 60) + + return failed == 0 + + +if __name__ == "__main__": + success = asyncio.run(run_all_tests()) + sys.exit(0 if success else 1) diff --git a/tests/test_embedding_service.py b/tests/test_embedding_service.py index 9794bebb..3fa8e507 100644 --- a/tests/test_embedding_service.py +++ b/tests/test_embedding_service.py @@ -1,9 +1,15 @@ import sys import os -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) -from backend.app.services.embedding_service.service import EmbeddingService -import unittest +from pathlib import Path + from sklearn.metrics.pairwise import cosine_similarity +import unittest + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + +from app.services.embedding_service.service import EmbeddingService class TestEmbeddingService(unittest.IsolatedAsyncioTestCase): @@ -14,44 +20,45 @@ async def test_get_embedding(self): text = "Hi, this seems to be great!" embedding = await self.embedding_service.get_embedding(text) self.assertTrue(len(embedding) == 384) - + async def test_similarity(self): texts = ["Hi, this seems to be great!", "This is good!"] embeddings = await self.embedding_service.get_embeddings(texts) similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0] self.assertTrue(similarity > 0.5) - + def test_get_model_info(self): # Access model once to initialize it _ = self.embedding_service.model - + info = self.embedding_service.get_model_info() - + # Check that all expected keys are present self.assertIn("model_name", info) self.assertIn("device", info) self.assertIn("embedding_size", info) - + # Verify values self.assertEqual(info["model_name"], self.embedding_service.model_name) self.assertEqual(info["device"], self.embedding_service.device) self.assertEqual(info["embedding_size"], 384) # For BGE-small model - + def test_clear_cache(self): # Access model first to ensure it's loaded _ = self.embedding_service.model self.assertIsNotNone(self.embedding_service._model) - + # Clear the cache self.embedding_service.clear_cache() - + # Verify model is cleared self.assertIsNone(self.embedding_service._model) - + # Verify model loads again after clearing _ = self.embedding_service.model self.assertIsNotNone(self.embedding_service._model) + # run the tests if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() diff --git a/tests/test_supabase.py b/tests/test_supabase.py index 55b98671..71e8b4aa 100644 --- a/tests/test_supabase.py +++ b/tests/test_supabase.py @@ -1,7 +1,16 @@ -from backend.app.models.database.supabase import User, Interaction, CodeChunk, Repository -from uuid import uuid4 -from backend.app.database.supabase.client import get_supabase_client +import sys +from pathlib import Path + from datetime import datetime # Your User model import +from uuid import uuid4 + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + +from app.database.supabase.client import get_supabase_client +from app.models.database.supabase import User, Interaction, CodeChunk, Repository + client = get_supabase_client() diff --git a/tests/test_weaviate.py b/tests/test_weaviate.py index ff8fd863..f36915af 100644 --- a/tests/test_weaviate.py +++ b/tests/test_weaviate.py @@ -1,10 +1,18 @@ -from app.db.weaviate.weaviate_client import get_client -from datetime import datetime +import sys +from pathlib import Path + from uuid import uuid4 -from app.model.weaviate.models import ( +from datetime import datetime + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + +from app.database.weaviate.client import get_client +from app.models.database.weaviate import ( WeaviateUserProfile, WeaviateCodeChunk, - WeaviateInteraction + WeaviateInteraction, ) diff --git a/tests/tests_db.py b/tests/tests_db.py index 36cffa61..1865a54d 100644 --- a/tests/tests_db.py +++ b/tests/tests_db.py @@ -1,10 +1,14 @@ +import logging +import asyncio import sys import os -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) -from backend.app.services.vector_db.service import EmbeddingItem, VectorDBService -import asyncio -import logging -from backend.app.services.vector_db.service import EmbeddingItem, VectorDBService +from pathlib import Path + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) +sys.path.insert(0, str(ROOT / "backend")) + +from app.services.vector_db.service import EmbeddingItem, VectorDBService logging.basicConfig(level=logging.INFO)