|
| 1 | +""" |
| 2 | +MCP Activity API — SSE streaming + REST endpoints for live MCP monitoring. |
| 3 | +
|
| 4 | +The MCP dashboard connects here to see real-time tool calls, |
| 5 | +active client sessions, and aggregate stats. |
| 6 | +""" |
| 7 | + |
| 8 | +import asyncio |
| 9 | +import json |
| 10 | +import logging |
| 11 | + |
| 12 | +from fastapi import APIRouter, Depends |
| 13 | +from fastapi.responses import StreamingResponse |
| 14 | + |
| 15 | +from app.core.auth import AuthUser, require_admin |
| 16 | +from app.mcp.activity import tracker |
| 17 | + |
| 18 | +logger = logging.getLogger(__name__) |
| 19 | + |
| 20 | +router = APIRouter(prefix="/api/mcp/activity", tags=["mcp-activity"]) |
| 21 | + |
| 22 | + |
| 23 | +@router.get("/stream") |
| 24 | +async def stream_activity(user: AuthUser = Depends(require_admin)): |
| 25 | + """ |
| 26 | + SSE endpoint — streams MCP tool call events in real-time. |
| 27 | + Each event is a JSON-encoded McpEvent. |
| 28 | + """ |
| 29 | + org_id = user.org_id |
| 30 | + queue = tracker.subscribe(org_id) |
| 31 | + |
| 32 | + async def event_generator(): |
| 33 | + try: |
| 34 | + # Send initial connection event |
| 35 | + yield f"data: {json.dumps({'type': 'connected', 'org_id': org_id})}\n\n" |
| 36 | + |
| 37 | + while True: |
| 38 | + try: |
| 39 | + event = await asyncio.wait_for(queue.get(), timeout=25.0) |
| 40 | + payload = event.to_dict() |
| 41 | + payload["type"] = "tool_call" |
| 42 | + yield f"data: {json.dumps(payload)}\n\n" |
| 43 | + except asyncio.TimeoutError: |
| 44 | + # Send keepalive to prevent connection timeout |
| 45 | + yield ": keepalive\n\n" |
| 46 | + except asyncio.CancelledError: |
| 47 | + pass |
| 48 | + finally: |
| 49 | + tracker.unsubscribe(org_id, queue) |
| 50 | + |
| 51 | + return StreamingResponse( |
| 52 | + event_generator(), |
| 53 | + media_type="text/event-stream", |
| 54 | + headers={ |
| 55 | + "Cache-Control": "no-cache", |
| 56 | + "Connection": "keep-alive", |
| 57 | + "X-Accel-Buffering": "no", |
| 58 | + }, |
| 59 | + ) |
| 60 | + |
| 61 | + |
| 62 | +@router.get("/recent") |
| 63 | +async def get_recent_activity( |
| 64 | + limit: int = 50, |
| 65 | + user: AuthUser = Depends(require_admin), |
| 66 | +): |
| 67 | + """Get recent MCP tool call events.""" |
| 68 | + events = tracker.get_recent_events(user.org_id, limit=limit) |
| 69 | + return [e.to_dict() for e in events] |
| 70 | + |
| 71 | + |
| 72 | +@router.get("/sessions") |
| 73 | +async def get_active_sessions(user: AuthUser = Depends(require_admin)): |
| 74 | + """Get currently active MCP client sessions.""" |
| 75 | + return tracker.get_active_sessions(user.org_id) |
| 76 | + |
| 77 | + |
| 78 | +@router.get("/stats") |
| 79 | +async def get_activity_stats(user: AuthUser = Depends(require_admin)): |
| 80 | + """Get aggregate MCP activity statistics.""" |
| 81 | + return tracker.get_stats(user.org_id) |
0 commit comments