From fb02b5f4cc510a05a5cee92077a90505fb37bfd6 Mon Sep 17 00:00:00 2001 From: frdel <38891707+frdel@users.noreply.github.com> Date: Mon, 30 Mar 2026 17:15:50 +0200 Subject: [PATCH 1/6] enable api caching enable websocket and api caching params --- helpers/api.py | 22 ++++------------------ helpers/ws.py | 2 +- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/helpers/api.py b/helpers/api.py index f3b0318616..8d4f64b083 100644 --- a/helpers/api.py +++ b/helpers/api.py @@ -24,7 +24,7 @@ ThreadLockType = Union[threading.Lock, threading.RLock] CACHE_AREA = "api_handlers(api)" -cache.toggle_area(CACHE_AREA, False) # cache off for now +# cache.toggle_area(CACHE_AREA, False) # cache off for now Input = dict Output = Union[Dict[str, Any], Response] @@ -236,10 +236,13 @@ async def call_handler() -> BaseResponse: def register_watchdogs(): from helpers import watchdog + from helpers.ws import CACHE_AREA as WS_CACHE_AREA + def on_api_change(items: list[watchdog.WatchItem]): PrintStyle.debug("API endpoint watchdog triggered:", items) cache.clear(CACHE_AREA) + cache.clear(WS_CACHE_AREA) watchdog.add_watchdog( "api_handlers", @@ -250,20 +253,3 @@ def on_api_change(items: list[watchdog.WatchItem]): patterns=["*.py"], handler=on_api_change, ) - - # WS handler cache shares the same watched directories (api/, usr/api/) - from helpers.ws import CACHE_AREA as WS_CACHE_AREA - - def on_ws_change(items: list[watchdog.WatchItem]): - PrintStyle.debug("WS handler watchdog triggered:", items) - cache.clear(WS_CACHE_AREA) - - watchdog.add_watchdog( - "ws_handlers", - roots=[ - files.get_abs_path(files.API_DIR), - files.get_abs_path(files.USER_DIR, files.API_DIR), - ], - patterns=["ws_*.py"], - handler=on_ws_change, - ) diff --git a/helpers/ws.py b/helpers/ws.py index 41f42e7e53..79a299b912 100644 --- a/helpers/ws.py +++ b/helpers/ws.py @@ -159,7 +159,7 @@ def validate_ws_origin(environ: dict[str, Any]) -> tuple[bool, str | None]: NAMESPACE = "/ws" CACHE_AREA = 
"ws_handlers(api)(plugins)" -cache.toggle_area(CACHE_AREA, False) # cache off for now +# cache.toggle_area(CACHE_AREA, False) # cache off for now @dataclass From 561980b77748381cecb0ab968a97e7fa6337ce10 Mon Sep 17 00:00:00 2001 From: Alexander Vaagan <2428222+vaaale@users.noreply.github.com> Date: Tue, 31 Mar 2026 15:05:18 +0200 Subject: [PATCH 2/6] Implemented User Elicitation from MCP server --- .../webui_ws_event/_20_mcp_elicitation.py | 67 ++++ helpers/mcp_elicitation.py | 235 ++++++++++++++ helpers/mcp_handler.py | 5 + tests/mcp_elicitation_test_server.py | 144 +++++++++ .../chat/elicitation/elicitation-panel.html | 295 ++++++++++++++++++ .../chat/elicitation/elicitation-store.js | 149 +++++++++ webui/index.html | 2 + webui/js/messages.js | 37 +++ 8 files changed, 934 insertions(+) create mode 100644 extensions/python/webui_ws_event/_20_mcp_elicitation.py create mode 100644 helpers/mcp_elicitation.py create mode 100644 tests/mcp_elicitation_test_server.py create mode 100644 webui/components/chat/elicitation/elicitation-panel.html create mode 100644 webui/components/chat/elicitation/elicitation-store.js diff --git a/extensions/python/webui_ws_event/_20_mcp_elicitation.py b/extensions/python/webui_ws_event/_20_mcp_elicitation.py new file mode 100644 index 0000000000..75b20d2695 --- /dev/null +++ b/extensions/python/webui_ws_event/_20_mcp_elicitation.py @@ -0,0 +1,67 @@ +from helpers.extension import Extension +from helpers.mcp_elicitation import ElicitationManager +from helpers.print_style import PrintStyle + + +class McpElicitationWsHandler(Extension): + """Handle elicitation response events from the frontend.""" + + async def execute( + self, + instance=None, + sid: str = "", + event_type: str = "", + data: dict | None = None, + response_data: dict | None = None, + **kwargs, + ): + if instance is None or data is None: + return + + if event_type == "mcp_elicitation_response": + await self._handle_elicitation_response(data, response_data) + elif event_type == 
"mcp_elicitation_list_pending": + await self._handle_list_pending(response_data) + + async def _handle_elicitation_response( + self, + data: dict, + response_data: dict | None, + ): + request_id = data.get("request_id", "") + action = data.get("action", "") + content = data.get("content", None) + + if not request_id: + PrintStyle(font_color="orange", padding=True).print( + "MCP Elicitation WS: Received response with no request_id" + ) + if response_data is not None: + response_data["ok"] = False + response_data["error"] = "Missing request_id" + return + + if not action: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Elicitation WS: Received response with no action for request '{request_id}'" + ) + if response_data is not None: + response_data["ok"] = False + response_data["error"] = "Missing action" + return + + manager = ElicitationManager.get_instance() + resolved = manager.resolve(request_id, action, content) + + if response_data is not None: + response_data["ok"] = resolved + if not resolved: + response_data["error"] = f"Request '{request_id}' not found or already resolved" + + async def _handle_list_pending(self, response_data: dict | None): + manager = ElicitationManager.get_instance() + pending = manager.get_all_pending() + + if response_data is not None: + response_data["ok"] = True + response_data["pending"] = pending diff --git a/helpers/mcp_elicitation.py b/helpers/mcp_elicitation.py new file mode 100644 index 0000000000..7f850cb4c3 --- /dev/null +++ b/helpers/mcp_elicitation.py @@ -0,0 +1,235 @@ +import asyncio +import threading +import uuid +from typing import Any, Optional + +from mcp.shared.context import RequestContext +from mcp.client.session import ClientSession +import mcp.types as types + +from helpers.print_style import PrintStyle + + +class _PendingElicitation: + """Tracks a single in-flight elicitation request awaiting a frontend response.""" + + def __init__( + self, + request_id: str, + message: str, + requested_schema: 
dict[str, Any], + server_name: str, + loop: asyncio.AbstractEventLoop, + ): + self.request_id = request_id + self.message = message + self.requested_schema = requested_schema + self.server_name = server_name + self.loop = loop + self.event = asyncio.Event() + self.result: Optional[types.ElicitResult] = None + + +class ElicitationManager: + """Singleton that bridges MCP elicitation callbacks with the WebSocket frontend. + + Flow: + 1. MCP SDK invokes the elicitation callback during a tool call session. + 2. The callback registers a _PendingElicitation and broadcasts the request + to the frontend via WebSocket. + 3. The callback awaits the asyncio.Event until the frontend responds. + 4. The WS extension receives the frontend response, resolves the pending + elicitation, and sets the Event. + 5. The callback returns the ElicitResult to the MCP SDK. + """ + + _instance: Optional["ElicitationManager"] = None + _lock = threading.Lock() + + def __init__(self): + self._pending: dict[str, _PendingElicitation] = {} + self._pending_lock = threading.Lock() + + @classmethod + def get_instance(cls) -> "ElicitationManager": + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def create_elicitation_callback(self, server_name: str): + """Create an elicitation callback bound to a specific MCP server name.""" + + async def elicitation_callback( + context: RequestContext[ClientSession, Any], + params: types.ElicitRequestParams, + ) -> types.ElicitResult | types.ErrorData: + request_id = str(uuid.uuid4()) + + loop = asyncio.get_running_loop() + pending = _PendingElicitation( + request_id=request_id, + message=params.message, + requested_schema=params.requestedSchema, + server_name=server_name, + loop=loop, + ) + + with self._pending_lock: + self._pending[request_id] = pending + + PrintStyle(font_color="cyan", padding=True).print( + f"MCP Elicitation: Server '{server_name}' requests input: {params.message}" + ) + + 
self._log_to_contexts( + request_id=request_id, + heading=f"icon://input MCP Server '{server_name}' requests input", + content=params.message, + kvps={ + "server_name": server_name, + "request_id": request_id, + "status": "pending", + }, + ) + + try: + await self._broadcast_elicitation_request(pending) + + # Wait for the frontend to respond (timeout after 5 minutes) + try: + await asyncio.wait_for(pending.event.wait(), timeout=300) + except asyncio.TimeoutError: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Elicitation: Request '{request_id}' timed out after 5 minutes" + ) + return types.ElicitResult( + action="cancel", + content=None, + ) + + if pending.result is not None: + return pending.result + + return types.ElicitResult(action="cancel", content=None) + finally: + with self._pending_lock: + self._pending.pop(request_id, None) + + return elicitation_callback + + def resolve( + self, + request_id: str, + action: str, + content: dict[str, Any] | None = None, + ) -> bool: + """Resolve a pending elicitation with the frontend's response. + + Returns True if the request was found and resolved, False otherwise. + """ + with self._pending_lock: + pending = self._pending.get(request_id) + + if pending is None: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Elicitation: No pending request found for id '{request_id}'" + ) + return False + + if action not in ("accept", "decline", "cancel"): + PrintStyle(font_color="red", padding=True).print( + f"MCP Elicitation: Invalid action '{action}' for request '{request_id}'" + ) + return False + + pending.result = types.ElicitResult( + action=action, + content=content if action == "accept" else None, + ) + # Signal the event on its originating loop for thread-safety. 
+ pending.loop.call_soon_threadsafe(pending.event.set) + + PrintStyle(font_color="green", padding=True).print( + f"MCP Elicitation: Request '{request_id}' resolved with action='{action}'" + ) + + self._log_to_contexts( + request_id=request_id, + heading=f"icon://input MCP Elicitation '{pending.server_name}': {action}", + content=f"User responded with: {action}", + kvps={ + "server_name": pending.server_name, + "request_id": request_id, + "action": action, + "status": "resolved", + }, + ) + + return True + + def get_pending(self, request_id: str) -> Optional[_PendingElicitation]: + """Get a pending elicitation by request_id.""" + with self._pending_lock: + return self._pending.get(request_id) + + def get_all_pending(self) -> list[dict[str, Any]]: + """Get all pending elicitation requests as serializable dicts.""" + with self._pending_lock: + return [ + { + "request_id": p.request_id, + "message": p.message, + "requested_schema": p.requested_schema, + "server_name": p.server_name, + } + for p in self._pending.values() + ] + + @staticmethod + def _log_to_contexts( + request_id: str, + heading: str, + content: str, + kvps: dict[str, Any], + ): + """Log an elicitation event to all active agent contexts.""" + try: + from agent import AgentContext + AgentContext.log_to_all( + type="mcp_elicitation", + heading=heading, + content=content, + kvps=kvps, + id=request_id, + ) + except Exception as e: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Elicitation: Failed to log to contexts: {e}" + ) + + async def _broadcast_elicitation_request(self, pending: _PendingElicitation): + """Broadcast an elicitation request to all connected WebSocket clients.""" + from helpers.ws_manager import get_shared_ws_manager + from helpers.ws import NAMESPACE + + payload = { + "request_id": pending.request_id, + "message": pending.message, + "requested_schema": pending.requested_schema, + "server_name": pending.server_name, + } + + try: + manager = get_shared_ws_manager() + await 
manager.broadcast( + NAMESPACE, + "mcp_elicitation_request", + payload, + handler_id="helpers.mcp_elicitation.ElicitationManager", + ) + except Exception as e: + PrintStyle(font_color="red", padding=True).print( + f"MCP Elicitation: Failed to broadcast request: {e}" + ) diff --git a/helpers/mcp_handler.py b/helpers/mcp_handler.py index 439127b218..22541d48fd 100644 --- a/helpers/mcp_handler.py +++ b/helpers/mcp_handler.py @@ -24,6 +24,7 @@ from helpers import errors from helpers import settings from helpers.log import LogItem +from helpers.mcp_elicitation import ElicitationManager import httpx @@ -852,6 +853,9 @@ async def _execute_with_session( stdio, write = await self._create_stdio_transport(temp_stack) # PrintStyle(font_color="cyan").print(f"MCPClientBase ({self.server.name} - {operation_name}): Transport created. Initializing session...") + elicitation_cb = ElicitationManager.get_instance().create_elicitation_callback( + self.server.name + ) session = await temp_stack.enter_async_context( ClientSession( stdio, # type: ignore @@ -859,6 +863,7 @@ async def _execute_with_session( read_timeout_seconds=timedelta( seconds=read_timeout_seconds ), + elicitation_callback=elicitation_cb, ) ) await session.initialize() diff --git a/tests/mcp_elicitation_test_server.py b/tests/mcp_elicitation_test_server.py new file mode 100644 index 0000000000..71698b6705 --- /dev/null +++ b/tests/mcp_elicitation_test_server.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +""" +Simple MCP server for end-to-end testing of the elicitation feature. + +Usage: + Start the server: + python tests/mcp_elicitation_test_server.py + + Add to Agent Zero MCP config as: + { + "name": "elicitation-test", + "type": "streamable-http", + "url": "http://localhost:8100/mcp" + } + +Tools provided: + - greet_user: Elicits user's name and greeting style, returns a personalized greeting. + - create_task: Elicits task details (title, priority, description), returns summary. 
+ - confirm_action: Elicits a yes/no confirmation before proceeding. + - simple_echo: No elicitation, just echoes input (control test). +""" + +from enum import Enum +from typing import Optional + +from fastmcp import FastMCP, Context +from fastmcp.server.elicitation import AcceptedElicitation +from pydantic import BaseModel, Field + + +mcp = FastMCP( + name="elicitation-test", + instructions="A test server for MCP elicitation. Use the tools to test human-in-the-loop input gathering.", +) + + +# --- Elicitation response models --- + +class GreetingInfo(BaseModel): + name: str = Field(description="Your name") + style: str = Field(description="Greeting style: formal, casual, or pirate") + + +class Priority(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +class TaskInfo(BaseModel): + title: str = Field(description="Task title") + priority: Priority = Field(default=Priority.MEDIUM, description="Task priority level") + description: str = Field(default="", description="Optional task description") + + +class Confirmation(BaseModel): + confirmed: bool = Field(description="Do you want to proceed?") + + +# --- Tools --- + +@mcp.tool() +async def greet_user(ctx: Context, reason: str = "general") -> str: + """Generate a personalized greeting. Will ask for the user's name and preferred greeting style. + + Args: + reason: Why the greeting is being generated (e.g. 'welcome', 'farewell', 'general'). + """ + result = await ctx.elicit( + message="I'd like to greet you! Please provide your name and preferred greeting style.", + response_type=GreetingInfo, + ) + + if isinstance(result, AcceptedElicitation): + name = result.data.name + style = result.data.style.lower() + if style == "formal": + return f"Good day, {name}. It is a pleasure to make your acquaintance." + elif style == "pirate": + return f"Ahoy, {name}! Welcome aboard, ye scallywag!" + else: + return f"Hey {name}! What's up?" 
+ else: + return f"Greeting cancelled (action: {result.action})." + + +@mcp.tool() +async def create_task(ctx: Context, project: str = "default") -> str: + """Create a new task. Will ask for task details via elicitation. + + Args: + project: The project to create the task in. + """ + result = await ctx.elicit( + message=f"Please provide details for the new task in project '{project}'.", + response_type=TaskInfo, + ) + + if isinstance(result, AcceptedElicitation): + task = result.data + return ( + f"Task created in '{project}':\n" + f" Title: {task.title}\n" + f" Priority: {task.priority.value}\n" + f" Description: {task.description or '(none)'}" + ) + else: + return f"Task creation cancelled (action: {result.action})." + + +@mcp.tool() +async def confirm_action(action_description: str, ctx: Context) -> str: + """Ask for user confirmation before performing an action. + + Args: + action_description: Description of the action that needs confirmation. + """ + result = await ctx.elicit( + message=f"Please confirm: {action_description}", + response_type=Confirmation, + ) + + if isinstance(result, AcceptedElicitation): + if result.data.confirmed: + return f"Action confirmed: {action_description}. Proceeding." + else: + return f"User explicitly declined via the form for: {action_description}." + else: + return f"Confirmation cancelled (action: {result.action})." + + +@mcp.tool() +async def simple_echo(message: str) -> str: + """Echo the input message back. No elicitation involved (control test). + + Args: + message: The message to echo. + """ + return f"Echo: {message}" + + +if __name__ == "__main__": + mcp.run(transport="streamable-http", host="0.0.0.0", port=8100) diff --git a/webui/components/chat/elicitation/elicitation-panel.html b/webui/components/chat/elicitation/elicitation-panel.html new file mode 100644 index 0000000000..a8358ad5bf --- /dev/null +++ b/webui/components/chat/elicitation/elicitation-panel.html @@ -0,0 +1,295 @@ + + + + + +
+ +
+ + + + diff --git a/webui/components/chat/elicitation/elicitation-store.js b/webui/components/chat/elicitation/elicitation-store.js new file mode 100644 index 0000000000..6250896338 --- /dev/null +++ b/webui/components/chat/elicitation/elicitation-store.js @@ -0,0 +1,149 @@ +import { createStore } from "/js/AlpineStore.js"; +import { getNamespacedClient } from "/js/websocket.js"; + +const stateSocket = getNamespacedClient("/ws"); + +export const store = createStore("elicitation", { + /** @type {Array<{request_id: string, message: string, requested_schema: object, server_name: string, formData: object, status: string}>} */ + pending: [], + + initialized: false, + + async init() { + if (this.initialized) return; + this.initialized = true; + + await stateSocket.on("mcp_elicitation_request", (envelope) => { + const data = envelope?.data || envelope; + this._handleRequest(data); + }); + }, + + _handleRequest(data) { + if (!data || !data.request_id) return; + + const existing = this.pending.find((p) => p.request_id === data.request_id); + if (existing) return; + + const formData = this._buildDefaultFormData(data.requested_schema); + + this.pending = [ + ...this.pending, + { + request_id: data.request_id, + message: data.message || "", + requested_schema: data.requested_schema || {}, + server_name: data.server_name || "", + formData, + status: "pending", + }, + ]; + }, + + _buildDefaultFormData(schema) { + const formData = {}; + if (!schema || !schema.properties) return formData; + + for (const [key, prop] of Object.entries(schema.properties)) { + if (prop.default !== undefined) { + formData[key] = prop.default; + } else if (prop.type === "boolean") { + formData[key] = false; + } else if (prop.type === "number" || prop.type === "integer") { + formData[key] = prop.minimum ?? 
0; + } else { + formData[key] = ""; + } + } + return formData; + }, + + getFields(schema) { + if (!schema || !schema.properties) return []; + const required = new Set(schema.required || []); + return Object.entries(schema.properties).map(([key, prop]) => ({ + key, + label: prop.title || key, + description: prop.description || "", + type: prop.type || "string", + required: required.has(key), + enum: prop.enum || null, + minimum: prop.minimum, + maximum: prop.maximum, + })); + }, + + async submit(requestId) { + const item = this.pending.find((p) => p.request_id === requestId); + if (!item) return; + + item.status = "submitting"; + this.pending = [...this.pending]; + + try { + await stateSocket.emit("mcp_elicitation_response", { + request_id: requestId, + action: "accept", + content: { ...item.formData }, + }); + this.pending = this.pending.filter((p) => p.request_id !== requestId); + } catch (error) { + console.error("[elicitation] submit failed:", error); + item.status = "pending"; + this.pending = [...this.pending]; + } + }, + + async decline(requestId) { + const item = this.pending.find((p) => p.request_id === requestId); + if (!item) return; + + item.status = "declining"; + this.pending = [...this.pending]; + + try { + await stateSocket.emit("mcp_elicitation_response", { + request_id: requestId, + action: "decline", + content: null, + }); + this.pending = this.pending.filter((p) => p.request_id !== requestId); + } catch (error) { + console.error("[elicitation] decline failed:", error); + item.status = "pending"; + this.pending = [...this.pending]; + } + }, + + async cancel(requestId) { + const item = this.pending.find((p) => p.request_id === requestId); + if (!item) return; + + item.status = "cancelling"; + this.pending = [...this.pending]; + + try { + await stateSocket.emit("mcp_elicitation_response", { + request_id: requestId, + action: "cancel", + content: null, + }); + this.pending = this.pending.filter((p) => p.request_id !== requestId); + } catch (error) { 
+ console.error("[elicitation] cancel failed:", error); + item.status = "pending"; + this.pending = [...this.pending]; + } + }, + + updateField(requestId, key, value) { + const item = this.pending.find((p) => p.request_id === requestId); + if (!item) return; + item.formData = { ...item.formData, [key]: value }; + this.pending = [...this.pending]; + }, + + hasPending() { + return this.pending.length > 0; + }, +}); diff --git a/webui/index.html b/webui/index.html index abb457f125..339986ea34 100644 --- a/webui/index.html +++ b/webui/index.html @@ -133,6 +133,8 @@
+ + diff --git a/webui/js/messages.js b/webui/js/messages.js index d949e69dce..acf25a80c4 100644 --- a/webui/js/messages.js +++ b/webui/js/messages.js @@ -99,6 +99,8 @@ export async function getMessageHandler(type) { return drawMessageProgress; case "mcp": return drawMessageMcp; + case "mcp_elicitation": + return drawMessageMcpElicitation; case "subagent": return drawMessageSubagent; case "warning": @@ -1192,6 +1194,41 @@ export function drawMessageToolSimple({ * @param {MessageHandlerArgs & Record} param0 * @returns {MessageHandlerResult} */ +export function drawMessageMcpElicitation({ + id, + type, + heading, + content, + kvps, + timestamp, + agentno = 0, + ...additional +}) { + const title = cleanStepTitle(heading); + let displayKvps = { ...kvps }; + const contentText = String(content ?? ""); + const actionButtons = contentText.trim() + ? [ + createActionButton("detail", "", () => + stepDetailStore.showStepDetail( + buildDetailPayload(arguments[0], { headerLabels: [] }), + ), + ), + ].filter(Boolean) + : []; + + return drawProcessStep({ + id, + title, + code: "ELC", + classes: undefined, + kvps: displayKvps, + content, + actionButtons, + log: arguments[0], + }); +} + export function drawMessageMcp({ id, type, From 2e5a126a18867558b95c30102a19b5fb59f558fd Mon Sep 17 00:00:00 2001 From: Alexander Vaagan <2428222+vaaale@users.noreply.github.com> Date: Tue, 31 Mar 2026 15:35:19 +0200 Subject: [PATCH 3/6] Implemented Sampling --- .../python/webui_ws_event/_21_mcp_sampling.py | 66 ++++ helpers/mcp_handler.py | 5 + helpers/mcp_sampling.py | 357 ++++++++++++++++++ tests/mcp_elicitation_test_server.py | 45 +++ .../chat/sampling/sampling-panel.html | 292 ++++++++++++++ .../chat/sampling/sampling-store.js | 88 +++++ webui/index.html | 2 + webui/js/messages.js | 37 ++ 8 files changed, 892 insertions(+) create mode 100644 extensions/python/webui_ws_event/_21_mcp_sampling.py create mode 100644 helpers/mcp_sampling.py create mode 100644 
webui/components/chat/sampling/sampling-panel.html create mode 100644 webui/components/chat/sampling/sampling-store.js diff --git a/extensions/python/webui_ws_event/_21_mcp_sampling.py b/extensions/python/webui_ws_event/_21_mcp_sampling.py new file mode 100644 index 0000000000..91ea76f013 --- /dev/null +++ b/extensions/python/webui_ws_event/_21_mcp_sampling.py @@ -0,0 +1,66 @@ +from helpers.extension import Extension +from helpers.mcp_sampling import SamplingManager +from helpers.print_style import PrintStyle + + +class McpSamplingWsHandler(Extension): + """Handle sampling response events from the frontend.""" + + async def execute( + self, + instance=None, + sid: str = "", + event_type: str = "", + data: dict | None = None, + response_data: dict | None = None, + **kwargs, + ): + if instance is None or data is None: + return + + if event_type == "mcp_sampling_response": + await self._handle_sampling_response(data, response_data) + elif event_type == "mcp_sampling_list_pending": + await self._handle_list_pending(response_data) + + async def _handle_sampling_response( + self, + data: dict, + response_data: dict | None, + ): + request_id = data.get("request_id", "") + action = data.get("action", "") + + if not request_id: + PrintStyle(font_color="orange", padding=True).print( + "MCP Sampling WS: Received response with no request_id" + ) + if response_data is not None: + response_data["ok"] = False + response_data["error"] = "Missing request_id" + return + + if not action: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Sampling WS: Received response with no action for request '{request_id}'" + ) + if response_data is not None: + response_data["ok"] = False + response_data["error"] = "Missing action" + return + + manager = SamplingManager.get_instance() + resolved = manager.resolve(request_id, action) + + if response_data is not None: + response_data["ok"] = resolved + if not resolved: + response_data["error"] = f"Request '{request_id}' not found or 
already resolved" + + async def _handle_list_pending(self, response_data: dict | None): + manager = SamplingManager.get_instance() + pending = manager.get_all_pending() + + if response_data is not None: + response_data["ok"] = True + response_data["pending"] = pending diff --git a/helpers/mcp_handler.py b/helpers/mcp_handler.py index 22541d48fd..e7f99cf58d 100644 --- a/helpers/mcp_handler.py +++ b/helpers/mcp_handler.py @@ -25,6 +25,7 @@ from helpers import settings from helpers.log import LogItem from helpers.mcp_elicitation import ElicitationManager +from helpers.mcp_sampling import SamplingManager import httpx @@ -856,6 +857,9 @@ async def _execute_with_session( elicitation_cb = ElicitationManager.get_instance().create_elicitation_callback( self.server.name ) + sampling_cb = SamplingManager.get_instance().create_sampling_callback( + self.server.name + ) session = await temp_stack.enter_async_context( ClientSession( stdio, # type: ignore @@ -864,6 +868,7 @@ async def _execute_with_session( seconds=read_timeout_seconds ), elicitation_callback=elicitation_cb, + sampling_callback=sampling_cb, ) ) await session.initialize() diff --git a/helpers/mcp_sampling.py b/helpers/mcp_sampling.py new file mode 100644 index 0000000000..57f3763c29 --- /dev/null +++ b/helpers/mcp_sampling.py @@ -0,0 +1,357 @@ +import asyncio +import threading +import uuid +from typing import Any, Optional + +from mcp.shared.context import RequestContext +from mcp.client.session import ClientSession +import mcp.types as types + +from helpers.print_style import PrintStyle + + +class _PendingSampling: + """Tracks a single in-flight sampling request awaiting user approval.""" + + def __init__( + self, + request_id: str, + server_name: str, + messages: list[dict[str, Any]], + system_prompt: str | None, + max_tokens: int, + temperature: float | None, + model_preferences: dict[str, Any] | None, + stop_sequences: list[str] | None, + metadata: dict[str, Any] | None, + include_context: str | None, + loop: 
asyncio.AbstractEventLoop, + ): + self.request_id = request_id + self.server_name = server_name + self.messages = messages + self.system_prompt = system_prompt + self.max_tokens = max_tokens + self.temperature = temperature + self.model_preferences = model_preferences + self.stop_sequences = stop_sequences + self.metadata = metadata + self.include_context = include_context + self.loop = loop + self.event = asyncio.Event() + # Set by resolve(): "approve", "reject", or "cancel" + self.action: str | None = None + + +class SamplingManager: + """Singleton that bridges MCP sampling callbacks with the WebSocket frontend. + + Flow: + 1. MCP SDK invokes the sampling callback during a tool call session. + 2. The callback registers a _PendingSampling and broadcasts the request + to the frontend via WebSocket for human-in-the-loop approval. + 3. The callback awaits the asyncio.Event until the user responds. + 4. On approval, the manager calls LiteLLM with the provided messages + using the configured utility model. + 5. The callback returns the CreateMessageResult to the MCP SDK. 
+ """ + + _instance: Optional["SamplingManager"] = None + _lock = threading.Lock() + + def __init__(self): + self._pending: dict[str, _PendingSampling] = {} + self._pending_lock = threading.Lock() + + @classmethod + def get_instance(cls) -> "SamplingManager": + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def create_sampling_callback(self, server_name: str): + """Create a sampling callback bound to a specific MCP server name.""" + + async def sampling_callback( + context: RequestContext[ClientSession, Any], + params: types.CreateMessageRequestParams, + ) -> types.CreateMessageResult | types.ErrorData: + request_id = str(uuid.uuid4()) + + # Serialize SamplingMessage list to plain dicts for WS transport + messages = [] + for msg in params.messages: + m: dict[str, Any] = {"role": msg.role} + if isinstance(msg.content, types.TextContent): + m["content"] = msg.content.text + m["content_type"] = "text" + elif isinstance(msg.content, types.ImageContent): + m["content"] = msg.content.data + m["content_type"] = "image" + m["mime_type"] = msg.content.mimeType + elif isinstance(msg.content, types.AudioContent): + m["content"] = msg.content.data + m["content_type"] = "audio" + m["mime_type"] = msg.content.mimeType + else: + m["content"] = str(msg.content) + m["content_type"] = "unknown" + messages.append(m) + + model_prefs = None + if params.modelPreferences: + model_prefs = params.modelPreferences.model_dump(exclude_none=True) + + loop = asyncio.get_running_loop() + pending = _PendingSampling( + request_id=request_id, + server_name=server_name, + messages=messages, + system_prompt=params.systemPrompt, + max_tokens=params.maxTokens, + temperature=params.temperature, + model_preferences=model_prefs, + stop_sequences=params.stopSequences, + metadata=params.metadata, + include_context=params.includeContext, + loop=loop, + ) + + with self._pending_lock: + self._pending[request_id] = pending + + 
PrintStyle(font_color="cyan", padding=True).print( + f"MCP Sampling: Server '{server_name}' requests LLM sampling " + f"({len(messages)} messages, max_tokens={params.maxTokens})" + ) + + self._log_to_contexts( + request_id=request_id, + heading=f"icon://smart_toy MCP Server '{server_name}' requests LLM sampling", + content=f"{len(messages)} message(s), max_tokens={params.maxTokens}", + kvps={ + "server_name": server_name, + "request_id": request_id, + "status": "pending_approval", + "max_tokens": params.maxTokens, + "message_count": len(messages), + }, + ) + + try: + await self._broadcast_sampling_request(pending) + + # Wait for the user to approve/reject (timeout after 5 minutes) + try: + await asyncio.wait_for(pending.event.wait(), timeout=300) + except asyncio.TimeoutError: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Sampling: Request '{request_id}' timed out after 5 minutes" + ) + return types.ErrorData( + code=types.INVALID_REQUEST, + message="Sampling request timed out waiting for user approval", + ) + + if pending.action == "approve": + return await self._execute_sampling(pending) + elif pending.action == "reject": + return types.ErrorData( + code=types.INVALID_REQUEST, + message="User rejected the sampling request", + ) + else: + return types.ErrorData( + code=types.INVALID_REQUEST, + message="Sampling request was cancelled", + ) + finally: + with self._pending_lock: + self._pending.pop(request_id, None) + + return sampling_callback + + def resolve(self, request_id: str, action: str) -> bool: + """Resolve a pending sampling request with the user's decision. + + Returns True if the request was found and resolved, False otherwise. 
+ """ + with self._pending_lock: + pending = self._pending.get(request_id) + + if pending is None: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Sampling: No pending request found for id '{request_id}'" + ) + return False + + if action not in ("approve", "reject", "cancel"): + PrintStyle(font_color="red", padding=True).print( + f"MCP Sampling: Invalid action '{action}' for request '{request_id}'" + ) + return False + + pending.action = action + # Signal the event on its originating loop for thread-safety. + pending.loop.call_soon_threadsafe(pending.event.set) + + PrintStyle(font_color="green", padding=True).print( + f"MCP Sampling: Request '{request_id}' resolved with action='{action}'" + ) + + self._log_to_contexts( + request_id=request_id, + heading=f"icon://smart_toy MCP Sampling '{pending.server_name}': {action}", + content=f"User responded with: {action}", + kvps={ + "server_name": pending.server_name, + "request_id": request_id, + "action": action, + "status": "resolved", + }, + ) + + return True + + def get_all_pending(self) -> list[dict[str, Any]]: + """Get all pending sampling requests as serializable dicts.""" + with self._pending_lock: + return [ + { + "request_id": p.request_id, + "server_name": p.server_name, + "messages": p.messages, + "system_prompt": p.system_prompt, + "max_tokens": p.max_tokens, + "temperature": p.temperature, + "model_preferences": p.model_preferences, + "stop_sequences": p.stop_sequences, + "include_context": p.include_context, + } + for p in self._pending.values() + ] + + async def _execute_sampling( + self, pending: _PendingSampling + ) -> types.CreateMessageResult | types.ErrorData: + """Call the utility model with the sampling request messages.""" + try: + from plugins._model_config.helpers.model_config import build_utility_model + + model = build_utility_model() + model_name = model.model_name + + # Build the user message from the sampling messages + user_parts = [] + for msg in pending.messages: + role = 
msg.get("role", "user") + content = msg.get("content", "") + if role == "user": + user_parts.append(content) + else: + user_parts.append(f"[{role}]: {content}") + user_message = "\n\n".join(user_parts) + + call_kwargs: dict[str, Any] = {} + if pending.max_tokens: + call_kwargs["max_tokens"] = pending.max_tokens + if pending.temperature is not None: + call_kwargs["temperature"] = pending.temperature + if pending.stop_sequences: + call_kwargs["stop"] = pending.stop_sequences + + PrintStyle(font_color="cyan", padding=True).print( + f"MCP Sampling: Calling utility model '{model_name}' " + f"with {len(pending.messages)} messages" + ) + + response_text, _reasoning = await model.unified_call( + system_message=pending.system_prompt or "", + user_message=user_message, + **call_kwargs, + ) + + PrintStyle(font_color="green", padding=True).print( + f"MCP Sampling: LLM response received ({len(response_text)} chars)" + ) + + self._log_to_contexts( + request_id=pending.request_id, + heading=f"icon://smart_toy MCP Sampling '{pending.server_name}': completed", + content=response_text[:200] + ("..." 
if len(response_text) > 200 else ""), + kvps={ + "server_name": pending.server_name, + "request_id": pending.request_id, + "model": model_name, + "status": "completed", + }, + ) + + return types.CreateMessageResult( + role="assistant", + content=types.TextContent(type="text", text=response_text), + model=model_name, + stopReason="endTurn", + ) + except Exception as e: + PrintStyle(font_color="red", padding=True).print( + f"MCP Sampling: LLM call failed: {e}" + ) + return types.ErrorData( + code=types.INTERNAL_ERROR, + message=f"LLM call failed: {e}", + ) + + @staticmethod + def _log_to_contexts( + request_id: str, + heading: str, + content: str, + kvps: dict[str, Any], + ): + """Log a sampling event to all active agent contexts.""" + try: + from agent import AgentContext + AgentContext.log_to_all( + type="mcp_sampling", + heading=heading, + content=content, + kvps=kvps, + id=request_id, + ) + except Exception as e: + PrintStyle(font_color="orange", padding=True).print( + f"MCP Sampling: Failed to log to contexts: {e}" + ) + + async def _broadcast_sampling_request(self, pending: _PendingSampling): + """Broadcast a sampling request to all connected WebSocket clients.""" + from helpers.ws_manager import get_shared_ws_manager + from helpers.ws import NAMESPACE + + payload = { + "request_id": pending.request_id, + "server_name": pending.server_name, + "messages": pending.messages, + "system_prompt": pending.system_prompt, + "max_tokens": pending.max_tokens, + "temperature": pending.temperature, + "model_preferences": pending.model_preferences, + "stop_sequences": pending.stop_sequences, + "include_context": pending.include_context, + } + + try: + manager = get_shared_ws_manager() + await manager.broadcast( + NAMESPACE, + "mcp_sampling_request", + payload, + handler_id="helpers.mcp_sampling.SamplingManager", + ) + except Exception as e: + PrintStyle(font_color="red", padding=True).print( + f"MCP Sampling: Failed to broadcast request: {e}" + ) diff --git 
a/tests/mcp_elicitation_test_server.py b/tests/mcp_elicitation_test_server.py index 71698b6705..48723ed372 100644 --- a/tests/mcp_elicitation_test_server.py +++ b/tests/mcp_elicitation_test_server.py @@ -25,6 +25,7 @@ from fastmcp import FastMCP, Context from fastmcp.server.elicitation import AcceptedElicitation +from mcp.types import TextContent, SamplingMessage from pydantic import BaseModel, Field @@ -140,5 +141,49 @@ async def simple_echo(message: str) -> str: return f"Echo: {message}" +# --- Sampling tools --- + +@mcp.tool() +async def summarize_text(ctx: Context, text: str) -> str: + """Summarize a piece of text using the client's LLM via MCP sampling. + + Args: + text: The text to summarize. + """ + result = await ctx.sample( + messages=[ + SamplingMessage( + role="user", + content=TextContent(type="text", text=f"Please summarize the following text in 2-3 sentences:\n\n{text}"), + ) + ], + system_prompt="You are a concise summarizer. Respond only with the summary.", + max_tokens=256, + temperature=0.3, + ) + return f"Summary: {result.text}" + + +@mcp.tool() +async def analyze_sentiment(ctx: Context, text: str) -> str: + """Analyze the sentiment of text using the client's LLM via MCP sampling. + + Args: + text: The text to analyze. + """ + result = await ctx.sample( + messages=[ + SamplingMessage( + role="user", + content=TextContent(type="text", text=f"Analyze the sentiment of this text and respond with one word (positive, negative, or neutral) followed by a brief explanation:\n\n{text}"), + ) + ], + system_prompt="You are a sentiment analysis expert. 
Be concise.", + max_tokens=128, + temperature=0.0, + ) + return f"Sentiment analysis: {result.text}" + + if __name__ == "__main__": mcp.run(transport="streamable-http", host="0.0.0.0", port=8100) diff --git a/webui/components/chat/sampling/sampling-panel.html b/webui/components/chat/sampling/sampling-panel.html new file mode 100644 index 0000000000..89abce5604 --- /dev/null +++ b/webui/components/chat/sampling/sampling-panel.html @@ -0,0 +1,292 @@ + + + + + +
+ +
+ + + + diff --git a/webui/components/chat/sampling/sampling-store.js b/webui/components/chat/sampling/sampling-store.js new file mode 100644 index 0000000000..3fb4fa58b2 --- /dev/null +++ b/webui/components/chat/sampling/sampling-store.js @@ -0,0 +1,88 @@ +import { createStore } from "/js/AlpineStore.js"; +import { getNamespacedClient } from "/js/websocket.js"; + +const stateSocket = getNamespacedClient("/ws"); + +export const store = createStore("sampling", { + /** @type {Array<{request_id: string, server_name: string, messages: Array, system_prompt: string|null, max_tokens: number, temperature: number|null, model_preferences: object|null, stop_sequences: Array|null, include_context: string|null, status: string}>} */ + pending: [], + + initialized: false, + + async init() { + if (this.initialized) return; + this.initialized = true; + + await stateSocket.on("mcp_sampling_request", (envelope) => { + const data = envelope?.data || envelope; + this._handleRequest(data); + }); + }, + + _handleRequest(data) { + if (!data || !data.request_id) return; + + const existing = this.pending.find((p) => p.request_id === data.request_id); + if (existing) return; + + this.pending = [ + ...this.pending, + { + request_id: data.request_id, + server_name: data.server_name || "", + messages: data.messages || [], + system_prompt: data.system_prompt || null, + max_tokens: data.max_tokens || 0, + temperature: data.temperature, + model_preferences: data.model_preferences || null, + stop_sequences: data.stop_sequences || null, + include_context: data.include_context || null, + status: "pending", + }, + ]; + }, + + async approve(requestId) { + const item = this.pending.find((p) => p.request_id === requestId); + if (!item) return; + + item.status = "approving"; + this.pending = [...this.pending]; + + try { + await stateSocket.emit("mcp_sampling_response", { + request_id: requestId, + action: "approve", + }); + this.pending = this.pending.filter((p) => p.request_id !== requestId); + } catch 
(error) { + console.error("[sampling] approve failed:", error); + item.status = "pending"; + this.pending = [...this.pending]; + } + }, + + async reject(requestId) { + const item = this.pending.find((p) => p.request_id === requestId); + if (!item) return; + + item.status = "rejecting"; + this.pending = [...this.pending]; + + try { + await stateSocket.emit("mcp_sampling_response", { + request_id: requestId, + action: "reject", + }); + this.pending = this.pending.filter((p) => p.request_id !== requestId); + } catch (error) { + console.error("[sampling] reject failed:", error); + item.status = "pending"; + this.pending = [...this.pending]; + } + }, + + hasPending() { + return this.pending.length > 0; + }, +}); diff --git a/webui/index.html b/webui/index.html index 339986ea34..fafe7a1456 100644 --- a/webui/index.html +++ b/webui/index.html @@ -135,6 +135,8 @@
+ + diff --git a/webui/js/messages.js b/webui/js/messages.js index acf25a80c4..b8323fba5f 100644 --- a/webui/js/messages.js +++ b/webui/js/messages.js @@ -101,6 +101,8 @@ export async function getMessageHandler(type) { return drawMessageMcp; case "mcp_elicitation": return drawMessageMcpElicitation; + case "mcp_sampling": + return drawMessageMcpSampling; case "subagent": return drawMessageSubagent; case "warning": @@ -1194,6 +1196,41 @@ export function drawMessageToolSimple({ * @param {MessageHandlerArgs & Record} param0 * @returns {MessageHandlerResult} */ +export function drawMessageMcpSampling({ + id, + type, + heading, + content, + kvps, + timestamp, + agentno = 0, + ...additional +}) { + const title = cleanStepTitle(heading); + let displayKvps = { ...kvps }; + const contentText = String(content ?? ""); + const actionButtons = contentText.trim() + ? [ + createActionButton("detail", "", () => + stepDetailStore.showStepDetail( + buildDetailPayload(arguments[0], { headerLabels: [] }), + ), + ), + ].filter(Boolean) + : []; + + return drawProcessStep({ + id, + title, + code: "SMP", + classes: undefined, + kvps: displayKvps, + content, + actionButtons, + log: arguments[0], + }); +} + export function drawMessageMcpElicitation({ id, type, From 002b8dbf229a397392ee3500650b6f3db5c1fc47 Mon Sep 17 00:00:00 2001 From: Alexander Vaagan <2428222+vaaale@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:50:29 +0200 Subject: [PATCH 4/6] Implemented support for mcp-app --- agent.py | 5 +- helpers/mcp_handler.py | 142 +++++++++++++++++- tests/mcp_elicitation_test_server.py | 213 +++++++++++++++++++++++++++ 3 files changed, 352 insertions(+), 8 deletions(-) diff --git a/agent.py b/agent.py index 68a5e13695..855b431103 100644 --- a/agent.py +++ b/agent.py @@ -920,6 +920,7 @@ async def process_tools(self, msg: str): self, response=response, tool_name=tool_name, + tool_args=tool_args or {}, ) await tool.after_execution(response) @@ -954,8 +955,8 @@ async def 
validate_tool_request(self, tool_request: Any): raise ValueError("Tool request must be a dictionary") if not tool_request.get("tool_name") or not isinstance(tool_request.get("tool_name"), str): raise ValueError("Tool request must have a tool_name (type string) field") - if not tool_request.get("tool_args") or not isinstance(tool_request.get("tool_args"), dict): - raise ValueError("Tool request must have a tool_args (type dictionary) field") + # if not tool_request.get("tool_args") or not isinstance(tool_request.get("tool_args"), dict): + # raise ValueError("Tool request must have a tool_args (type dictionary) field") diff --git a/helpers/mcp_handler.py b/helpers/mcp_handler.py index e7f99cf58d..5b9f4a0c76 100644 --- a/helpers/mcp_handler.py +++ b/helpers/mcp_handler.py @@ -34,7 +34,7 @@ from mcp.client.sse import sse_client from mcp.client.streamable_http import streamablehttp_client from mcp.shared.message import SessionMessage -from mcp.types import CallToolResult, ListToolsResult +from mcp.types import CallToolResult, ListToolsResult, ReadResourceResult from anyio.streams.memory import ( MemoryObjectReceiveStream, MemoryObjectSendStream, @@ -267,6 +267,14 @@ async def call_tool( # We already run in an event loop, dont believe Pylance return await self.__client.call_tool(tool_name, input_data) # type: ignore + def get_tool_ui_meta(self, tool_name: str) -> dict[str, Any] | None: + with self.__lock: + return self.__client.get_tool_ui_meta(tool_name) # type: ignore + + async def read_resource(self, uri: str) -> ReadResourceResult: + with self.__lock: + return await self.__client.read_resource(uri) # type: ignore + def update(self, config: dict[str, Any]) -> "MCPServerRemote": with self.__lock: for key, value in config.items(): @@ -345,6 +353,14 @@ async def call_tool( # We already run in an event loop, dont believe Pylance return await self.__client.call_tool(tool_name, input_data) # type: ignore + def get_tool_ui_meta(self, tool_name: str) -> dict[str, Any] | None: 
+ with self.__lock: + return self.__client.get_tool_ui_meta(tool_name) # type: ignore + + async def read_resource(self, uri: str) -> ReadResourceResult: + with self.__lock: + return await self.__client.read_resource(uri) # type: ignore + def update(self, config: dict[str, Any]) -> "MCPServerLocal": with self.__lock: for key, value in config.items(): @@ -793,6 +809,25 @@ def get_tool(self, agent: Any, tool_name: str) -> MCPTool | None: return None return MCPTool(agent=agent, name=tool_name, method=None, args={}, message="", loop_data=None) + def get_tool_ui_meta(self, tool_name: str) -> dict[str, Any] | None: + """Return the _meta.ui dict for a qualified tool name (server.tool), or None.""" + if "." not in tool_name: + return None + server_name_part, tool_name_part = tool_name.split(".", 1) + with self.__lock: + for server in self.servers: + if server.name == server_name_part: + return server.get_tool_ui_meta(tool_name_part) + return None + + async def read_resource(self, server_name: str, uri: str) -> ReadResourceResult: + """Read a resource by URI from a specific server.""" + with self.__lock: + for server in self.servers: + if server.name == server_name: + return await server.read_resource(uri) + raise ValueError(f"Server '{server_name}' not found") + async def call_tool( self, tool_name: str, input_data: Dict[str, Any] ) -> CallToolResult: @@ -810,6 +845,63 @@ async def call_tool( T = TypeVar("T") +async def _initialize_with_ui_ext(session: ClientSession): + """Initialize a ClientSession with the io.modelcontextprotocol/ui extension capability. + + Replicates the SDK's initialize() logic but injects an extensions field + into ClientCapabilities to advertise MCP Apps support. 
+ """ + from mcp import types as _t + from mcp.client.session import ( + SUPPORTED_PROTOCOL_VERSIONS, + _default_sampling_callback, + _default_elicitation_callback, + _default_list_roots_callback, + ) + + sampling = _t.SamplingCapability() if session._sampling_callback is not _default_sampling_callback else None + elicitation = _t.ElicitationCapability() if session._elicitation_callback is not _default_elicitation_callback else None + roots = ( + _t.RootsCapability(listChanged=True) + if session._list_roots_callback is not _default_list_roots_callback + else None + ) + + capabilities = _t.ClientCapabilities( + sampling=sampling, + elicitation=elicitation, + experimental=None, + roots=roots, + ) + # Inject MCP Apps extension capability (ClientCapabilities allows extra fields) + capabilities.extensions = { # type: ignore[attr-defined] + "io.modelcontextprotocol/ui": { + "mimeTypes": ["text/html;profile=mcp-app"] + } + } + + result = await session.send_request( + _t.ClientRequest( + _t.InitializeRequest( + params=_t.InitializeRequestParams( + protocolVersion=_t.LATEST_PROTOCOL_VERSION, + capabilities=capabilities, + clientInfo=session._client_info, + ), + ) + ), + _t.InitializeResult, + ) + + if result.protocolVersion not in SUPPORTED_PROTOCOL_VERSIONS: + raise RuntimeError(f"Unsupported protocol version from the server: {result.protocolVersion}") + + session._server_capabilities = result.capabilities + await session.send_notification(_t.ClientNotification(_t.InitializedNotification())) + + return result + + class MCPClientBase(ABC): # server: Union[MCPServerLocal, MCPServerRemote] # Defined in __init__ # tools: List[dict[str, Any]] # Defined in __init__ @@ -871,7 +963,7 @@ async def _execute_with_session( sampling_callback=sampling_cb, ) ) - await session.initialize() + await _initialize_with_ui_ext(session) result = await coro_func(session) @@ -912,14 +1004,25 @@ async def update_tools(self) -> "MCPClientBase": async def list_tools_op(current_session: 
ClientSession): response: ListToolsResult = await current_session.list_tools() with self.__lock: - self.tools = [ - { + self.tools = [] + for tool in response.tools: + tool_dict: dict[str, Any] = { "name": tool.name, "description": tool.description, "input_schema": tool.inputSchema, } - for tool in response.tools - ] + # Preserve _meta.ui for MCP Apps support + PrintStyle(font_color="yellow", padding=True).print( + f"DEBUG update_tools: tool '{tool.name}' meta={tool.meta}" + ) + if tool.meta and isinstance(tool.meta, dict): + ui_meta = tool.meta.get("ui") + if ui_meta and isinstance(ui_meta, dict): + tool_dict["ui_meta"] = ui_meta + PrintStyle(font_color="green", padding=True).print( + f"DEBUG update_tools: captured ui_meta for '{tool.name}': {ui_meta}" + ) + self.tools.append(tool_dict) PrintStyle(font_color="green").print( f"MCPClientBase ({self.server.name}): Tools updated. Found {len(self.tools)} tools." ) @@ -958,6 +1061,33 @@ def get_tools(self) -> List[dict[str, Any]]: with self.__lock: return self.tools + def get_tool_ui_meta(self, tool_name: str) -> dict[str, Any] | None: + """Return the _meta.ui dict for a tool, or None.""" + with self.__lock: + for tool in self.tools: + if tool["name"] == tool_name: + return tool.get("ui_meta") + return None + + async def read_resource(self, uri: str) -> "ReadResourceResult": + """Read a resource by URI (e.g. 
ui:// resources for MCP Apps).""" + from pydantic import AnyUrl + + async def read_resource_op(current_session: ClientSession): + return await current_session.read_resource(AnyUrl(uri)) + + try: + return await self._execute_with_session(read_resource_op) + except Exception as e: + PrintStyle( + background_color="#AA4455", font_color="white", padding=True + ).print( + f"MCPClientBase ({self.server.name}): 'read_resource' for '{uri}' failed: {type(e).__name__}: {e}" + ) + raise ConnectionError( + f"Failed to read resource '{uri}' on server '{self.server.name}': {type(e).__name__}: {e}" + ) + async def call_tool( self, tool_name: str, input_data: Dict[str, Any] ) -> CallToolResult: diff --git a/tests/mcp_elicitation_test_server.py b/tests/mcp_elicitation_test_server.py index 48723ed372..c6b7f270e4 100644 --- a/tests/mcp_elicitation_test_server.py +++ b/tests/mcp_elicitation_test_server.py @@ -20,6 +20,8 @@ - simple_echo: No elicitation, just echoes input (control test). """ +import json +import time from enum import Enum from typing import Optional @@ -185,5 +187,216 @@ async def analyze_sentiment(ctx: Context, text: str) -> str: return f"Sentiment analysis: {result.text}" +# --- MCP Apps tools --- + +DASHBOARD_HTML = """ + + + +Server Dashboard + + + +

📊 Server Dashboard

+
+
+

Server Time

+
Loading...
+
Last updated
+
+
+

Status

+
●
+
Checking...
+
+
+

Uptime

+
--
+
Hours
+
+
+

Requests

+
--
+
Total served
+
+
+ +
+ + + +""" + +_server_start = time.time() +_request_count = 0 + + +@mcp.resource( + "ui://elicitation-test/dashboard", + name="Server Dashboard", + description="Interactive server monitoring dashboard", + mime_type="text/html", +) +def get_dashboard_html() -> str: + """Serve the dashboard HTML for the MCP App.""" + return DASHBOARD_HTML + + +@mcp.tool( + meta={ + "ui": { + "resourceUri": "ui://elicitation-test/dashboard", + "visibility": ["model", "app"], + } + } +) +async def show_dashboard(ctx: Context, title: str = "Server Dashboard") -> str: + """Show an interactive server monitoring dashboard. Returns live server statistics. + + This tool demonstrates MCP Apps โ€” it renders an interactive UI in the host. + """ + global _request_count + _request_count += 1 + uptime = (time.time() - _server_start) / 3600 + data = { + "time": time.strftime("%Y-%m-%d %H:%M:%S"), + "healthy": True, + "uptime_hours": round(uptime, 2), + "total_requests": _request_count, + } + return json.dumps(data) + + +@mcp.tool( + meta={ + "ui": { + "resourceUri": "ui://elicitation-test/dashboard", + "visibility": ["app"], + } + } +) +async def get_server_stats(name: str = "Server") -> str: + """Get current server statistics. This is an app-only tool (hidden from model). + + Called by the dashboard UI's refresh button. 
+ """ + global _request_count + _request_count += 1 + uptime = (time.time() - _server_start) / 3600 + data = { + "time": time.strftime("%Y-%m-%d %H:%M:%S"), + "healthy": True, + "uptime_hours": round(uptime, 2), + "total_requests": _request_count, + } + return json.dumps(data) + + if __name__ == "__main__": mcp.run(transport="streamable-http", host="0.0.0.0", port=8100) From 5e4082bd89b7efa05e65e51bc336baf6bfc5f1de Mon Sep 17 00:00:00 2001 From: Alexander Vaagan <2428222+vaaale@users.noreply.github.com> Date: Wed, 1 Apr 2026 07:59:58 +0200 Subject: [PATCH 5/6] Moved test MCP server --- tests/mcp_elicitation_test_server.py | 402 --------------------------- 1 file changed, 402 deletions(-) delete mode 100644 tests/mcp_elicitation_test_server.py diff --git a/tests/mcp_elicitation_test_server.py b/tests/mcp_elicitation_test_server.py deleted file mode 100644 index c6b7f270e4..0000000000 --- a/tests/mcp_elicitation_test_server.py +++ /dev/null @@ -1,402 +0,0 @@ -#!/usr/bin/env python3 -""" -Simple MCP server for end-to-end testing of the elicitation feature. - -Usage: - Start the server: - python tests/mcp_elicitation_test_server.py - - Add to Agent Zero MCP config as: - { - "name": "elicitation-test", - "type": "streamable-http", - "url": "http://localhost:8100/mcp" - } - -Tools provided: - - greet_user: Elicits user's name and greeting style, returns a personalized greeting. - - create_task: Elicits task details (title, priority, description), returns summary. - - confirm_action: Elicits a yes/no confirmation before proceeding. - - simple_echo: No elicitation, just echoes input (control test). -""" - -import json -import time -from enum import Enum -from typing import Optional - -from fastmcp import FastMCP, Context -from fastmcp.server.elicitation import AcceptedElicitation -from mcp.types import TextContent, SamplingMessage -from pydantic import BaseModel, Field - - -mcp = FastMCP( - name="elicitation-test", - instructions="A test server for MCP elicitation. 
Use the tools to test human-in-the-loop input gathering.", -) - - -# --- Elicitation response models --- - -class GreetingInfo(BaseModel): - name: str = Field(description="Your name") - style: str = Field(description="Greeting style: formal, casual, or pirate") - - -class Priority(str, Enum): - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - CRITICAL = "critical" - - -class TaskInfo(BaseModel): - title: str = Field(description="Task title") - priority: Priority = Field(default=Priority.MEDIUM, description="Task priority level") - description: str = Field(default="", description="Optional task description") - - -class Confirmation(BaseModel): - confirmed: bool = Field(description="Do you want to proceed?") - - -# --- Tools --- - -@mcp.tool() -async def greet_user(ctx: Context, reason: str = "general") -> str: - """Generate a personalized greeting. Will ask for the user's name and preferred greeting style. - - Args: - reason: Why the greeting is being generated (e.g. 'welcome', 'farewell', 'general'). - """ - result = await ctx.elicit( - message="I'd like to greet you! Please provide your name and preferred greeting style.", - response_type=GreetingInfo, - ) - - if isinstance(result, AcceptedElicitation): - name = result.data.name - style = result.data.style.lower() - if style == "formal": - return f"Good day, {name}. It is a pleasure to make your acquaintance." - elif style == "pirate": - return f"Ahoy, {name}! Welcome aboard, ye scallywag!" - else: - return f"Hey {name}! What's up?" - else: - return f"Greeting cancelled (action: {result.action})." - - -@mcp.tool() -async def create_task(ctx: Context, project: str = "default") -> str: - """Create a new task. Will ask for task details via elicitation. - - Args: - project: The project to create the task in. 
- """ - result = await ctx.elicit( - message=f"Please provide details for the new task in project '{project}'.", - response_type=TaskInfo, - ) - - if isinstance(result, AcceptedElicitation): - task = result.data - return ( - f"Task created in '{project}':\n" - f" Title: {task.title}\n" - f" Priority: {task.priority.value}\n" - f" Description: {task.description or '(none)'}" - ) - else: - return f"Task creation cancelled (action: {result.action})." - - -@mcp.tool() -async def confirm_action(action_description: str, ctx: Context) -> str: - """Ask for user confirmation before performing an action. - - Args: - action_description: Description of the action that needs confirmation. - """ - result = await ctx.elicit( - message=f"Please confirm: {action_description}", - response_type=Confirmation, - ) - - if isinstance(result, AcceptedElicitation): - if result.data.confirmed: - return f"Action confirmed: {action_description}. Proceeding." - else: - return f"User explicitly declined via the form for: {action_description}." - else: - return f"Confirmation cancelled (action: {result.action})." - - -@mcp.tool() -async def simple_echo(message: str) -> str: - """Echo the input message back. No elicitation involved (control test). - - Args: - message: The message to echo. - """ - return f"Echo: {message}" - - -# --- Sampling tools --- - -@mcp.tool() -async def summarize_text(ctx: Context, text: str) -> str: - """Summarize a piece of text using the client's LLM via MCP sampling. - - Args: - text: The text to summarize. - """ - result = await ctx.sample( - messages=[ - SamplingMessage( - role="user", - content=TextContent(type="text", text=f"Please summarize the following text in 2-3 sentences:\n\n{text}"), - ) - ], - system_prompt="You are a concise summarizer. 
Respond only with the summary.", - max_tokens=256, - temperature=0.3, - ) - return f"Summary: {result.text}" - - -@mcp.tool() -async def analyze_sentiment(ctx: Context, text: str) -> str: - """Analyze the sentiment of text using the client's LLM via MCP sampling. - - Args: - text: The text to analyze. - """ - result = await ctx.sample( - messages=[ - SamplingMessage( - role="user", - content=TextContent(type="text", text=f"Analyze the sentiment of this text and respond with one word (positive, negative, or neutral) followed by a brief explanation:\n\n{text}"), - ) - ], - system_prompt="You are a sentiment analysis expert. Be concise.", - max_tokens=128, - temperature=0.0, - ) - return f"Sentiment analysis: {result.text}" - - -# --- MCP Apps tools --- - -DASHBOARD_HTML = """ - - - -Server Dashboard - - - -

📊 Server Dashboard

-
-
-

Server Time

-
Loading...
-
Last updated
-
-
-

Status

-
●
-
Checking...
-
-
-

Uptime

-
--
-
Hours
-
-
-

Requests

-
--
-
Total served
-
-
- -
- - - -""" - -_server_start = time.time() -_request_count = 0 - - -@mcp.resource( - "ui://elicitation-test/dashboard", - name="Server Dashboard", - description="Interactive server monitoring dashboard", - mime_type="text/html", -) -def get_dashboard_html() -> str: - """Serve the dashboard HTML for the MCP App.""" - return DASHBOARD_HTML - - -@mcp.tool( - meta={ - "ui": { - "resourceUri": "ui://elicitation-test/dashboard", - "visibility": ["model", "app"], - } - } -) -async def show_dashboard(ctx: Context, title: str = "Server Dashboard") -> str: - """Show an interactive server monitoring dashboard. Returns live server statistics. - - This tool demonstrates MCP Apps โ€” it renders an interactive UI in the host. - """ - global _request_count - _request_count += 1 - uptime = (time.time() - _server_start) / 3600 - data = { - "time": time.strftime("%Y-%m-%d %H:%M:%S"), - "healthy": True, - "uptime_hours": round(uptime, 2), - "total_requests": _request_count, - } - return json.dumps(data) - - -@mcp.tool( - meta={ - "ui": { - "resourceUri": "ui://elicitation-test/dashboard", - "visibility": ["app"], - } - } -) -async def get_server_stats(name: str = "Server") -> str: - """Get current server statistics. This is an app-only tool (hidden from model). - - Called by the dashboard UI's refresh button. 
- """ - global _request_count - _request_count += 1 - uptime = (time.time() - _server_start) / 3600 - data = { - "time": time.strftime("%Y-%m-%d %H:%M:%S"), - "healthy": True, - "uptime_hours": round(uptime, 2), - "total_requests": _request_count, - } - return json.dumps(data) - - -if __name__ == "__main__": - mcp.run(transport="streamable-http", host="0.0.0.0", port=8100) From fae2a49326ae0266de024a8a98b3b3501d3cd050 Mon Sep 17 00:00:00 2001 From: Alexander Vaagan <2428222+vaaale@users.noreply.github.com> Date: Wed, 1 Apr 2026 08:47:13 +0200 Subject: [PATCH 6/6] Added MCP Apps Plugin --- .../_10_mcp_apps_intercept.py | 108 +++++ .../python/webui_ws_event/_22_mcp_apps.py | 103 +++++ .../get_message_handler/mcp_app_handler.js | 41 ++ .../webui/initFw_end/mcp_apps_init.js | 5 + .../set_messages_after_loop/mcp_app_inject.js | 72 +++ plugins/_mcp_apps/helpers/__init__.py | 0 plugins/_mcp_apps/helpers/mcp_apps_manager.py | 433 ++++++++++++++++++ plugins/_mcp_apps/plugin.yaml | 8 + plugins/_mcp_apps/test_mcp/__init__.py | 0 .../test_mcp/mcp_elicitation_test_server.py | 402 ++++++++++++++++ plugins/_mcp_apps/webui/mcp-app-bridge.js | 402 ++++++++++++++++ plugins/_mcp_apps/webui/mcp-app-renderer.html | 115 +++++ plugins/_mcp_apps/webui/mcp-app-sandbox.html | 149 ++++++ plugins/_mcp_apps/webui/mcp-app-store.js | 134 ++++++ 14 files changed, 1972 insertions(+) create mode 100644 plugins/_mcp_apps/extensions/python/tool_execute_after/_10_mcp_apps_intercept.py create mode 100644 plugins/_mcp_apps/extensions/python/webui_ws_event/_22_mcp_apps.py create mode 100644 plugins/_mcp_apps/extensions/webui/get_message_handler/mcp_app_handler.js create mode 100644 plugins/_mcp_apps/extensions/webui/initFw_end/mcp_apps_init.js create mode 100644 plugins/_mcp_apps/extensions/webui/set_messages_after_loop/mcp_app_inject.js create mode 100644 plugins/_mcp_apps/helpers/__init__.py create mode 100644 plugins/_mcp_apps/helpers/mcp_apps_manager.py create mode 100644 
plugins/_mcp_apps/plugin.yaml create mode 100644 plugins/_mcp_apps/test_mcp/__init__.py create mode 100644 plugins/_mcp_apps/test_mcp/mcp_elicitation_test_server.py create mode 100644 plugins/_mcp_apps/webui/mcp-app-bridge.js create mode 100644 plugins/_mcp_apps/webui/mcp-app-renderer.html create mode 100644 plugins/_mcp_apps/webui/mcp-app-sandbox.html create mode 100644 plugins/_mcp_apps/webui/mcp-app-store.js diff --git a/plugins/_mcp_apps/extensions/python/tool_execute_after/_10_mcp_apps_intercept.py b/plugins/_mcp_apps/extensions/python/tool_execute_after/_10_mcp_apps_intercept.py new file mode 100644 index 0000000000..f11c733056 --- /dev/null +++ b/plugins/_mcp_apps/extensions/python/tool_execute_after/_10_mcp_apps_intercept.py @@ -0,0 +1,108 @@ +""" +Extension that runs after MCP tool execution. If the tool has _meta.ui metadata, +fetches the UI resource and registers an app instance, then broadcasts an +mcp_app message to the frontend via the agent context log. +""" + +from helpers.extension import Extension +from helpers.print_style import PrintStyle + + +class McpAppsToolIntercept(Extension): + async def execute(self, **kwargs): + tool_name = kwargs.get("tool_name", "") + response = kwargs.get("response", None) + + PrintStyle(font_color="yellow", padding=True).print( + f"DEBUG McpAppsToolIntercept: called with tool_name='{tool_name}'" + ) + + if not tool_name or "." 
not in tool_name: + PrintStyle(font_color="yellow", padding=True).print( + f"DEBUG McpAppsToolIntercept: skipping (no dot in tool_name)" + ) + return + + try: + import helpers.mcp_handler as mcp_handler + + mcp_config = mcp_handler.MCPConfig.get_instance() + ui_meta = mcp_config.get_tool_ui_meta(tool_name) + PrintStyle(font_color="yellow", padding=True).print( + f"DEBUG McpAppsToolIntercept: ui_meta for '{tool_name}' = {ui_meta}" + ) + if not ui_meta: + return + + resource_uri = ui_meta.get("resourceUri") + if not resource_uri: + return + + server_name = tool_name.split(".", 1)[0] + + PrintStyle(font_color="cyan", padding=True).print( + f"MCP Apps: Tool '{tool_name}' has UI resource '{resource_uri}', fetching..." + ) + + from usr.plugins.mcp_apps.helpers.mcp_apps_manager import MCPAppsManager + + manager = MCPAppsManager.get_instance() + html_content = await manager.fetch_ui_resource(server_name, resource_uri) + + tool_result_text = response.message if response else "" + tool_args = kwargs.get("tool_args", {}) + + # Look up tool description and input schema from MCP tool cache + short_tool_name = tool_name.split(".", 1)[1] + tool_description = "" + tool_input_schema = None + for srv in mcp_config.servers: + if srv.name == server_name: + for t in srv.get_tools(): + if t.get("name") == short_tool_name: + tool_description = t.get("description", "") + tool_input_schema = t.get("input_schema") + break + break + + app_id = manager.register_app( + server_name=server_name, + tool_name=short_tool_name, + resource_uri=resource_uri, + html_content=html_content, + tool_args=tool_args, + tool_result={"content": [{"type": "text", "text": tool_result_text}]}, + ui_meta=ui_meta, + tool_description=tool_description, + tool_input_schema=tool_input_schema, + ) + + if self.agent and self.agent.context: + csp = ui_meta.get("csp", {}) + permissions = ui_meta.get("permissions", {}) + prefers_border = ui_meta.get("prefersBorder", True) + + self.agent.context.log.log( + type="mcp_app", + 
heading=f"icon://widgets MCP App: {tool_name}", + content="", + kvps={ + "app_id": app_id, + "server_name": server_name, + "tool_name": tool_name, + "resource_uri": resource_uri, + "csp": csp, + "permissions": permissions, + "prefers_border": prefers_border, + }, + ) + + PrintStyle(font_color="green", padding=True).print( + f"MCP Apps: App '{app_id}' ready for '{tool_name}' " + f"({len(html_content)} bytes HTML)" + ) + + except Exception as e: + PrintStyle(font_color="red", padding=True).print( + f"MCP Apps: Failed to set up app for tool '{tool_name}': {e}" + ) diff --git a/plugins/_mcp_apps/extensions/python/webui_ws_event/_22_mcp_apps.py b/plugins/_mcp_apps/extensions/python/webui_ws_event/_22_mcp_apps.py new file mode 100644 index 0000000000..56abfcb8b7 --- /dev/null +++ b/plugins/_mcp_apps/extensions/python/webui_ws_event/_22_mcp_apps.py @@ -0,0 +1,103 @@ +""" +WebSocket extension handling MCP Apps events from the frontend iframe bridge. + +Events handled: +- mcp_app_tool_call: Proxy a tools/call from iframe to MCP server +- mcp_app_resource_read: Proxy a resources/read from iframe to MCP server +- mcp_app_get_data: Retrieve app data (HTML, tool result, etc.) 
for an app_id +- mcp_app_teardown: Clean up an app instance +""" + +from helpers.extension import Extension + + +class McpAppsWsExtension(Extension): + async def execute(self, **kwargs): + event_type = kwargs.get("event_type", "") + data = kwargs.get("data", {}) + response_data = kwargs.get("response_data", {}) + + if event_type == "mcp_app_tool_call": + await self._handle_tool_call(data, response_data) + elif event_type == "mcp_app_resource_read": + await self._handle_resource_read(data, response_data) + elif event_type == "mcp_app_get_data": + await self._handle_get_data(data, response_data) + elif event_type == "mcp_app_teardown": + await self._handle_teardown(data, response_data) + + async def _handle_tool_call(self, data: dict, response_data: dict): + import asyncio + from helpers.print_style import PrintStyle + from usr.plugins.mcp_apps.helpers.mcp_apps_manager import MCPAppsManager + + app_id = data.get("app_id", "") + tool_name = data.get("tool_name", "") + arguments = data.get("arguments", {}) + + if not app_id or not tool_name: + response_data["error"] = "Missing app_id or tool_name" + return + + PrintStyle(font_color="cyan", padding=True).print( + f"MCP Apps WS: tool_call app_id={app_id} tool={tool_name}" + ) + + manager = MCPAppsManager.get_instance() + try: + result = await asyncio.wait_for( + manager.proxy_tool_call(app_id, tool_name, arguments), + timeout=60, + ) + response_data.update(result) + except asyncio.TimeoutError: + PrintStyle(font_color="red", padding=True).print( + f"MCP Apps WS: tool_call TIMEOUT for {tool_name}" + ) + response_data["error"] = {"code": -32000, "message": f"Tool call '{tool_name}' timed out"} + except Exception as e: + PrintStyle(font_color="red", padding=True).print( + f"MCP Apps WS: tool_call ERROR for {tool_name}: {e}" + ) + response_data["error"] = {"code": -32000, "message": str(e)} + + async def _handle_resource_read(self, data: dict, response_data: dict): + from usr.plugins.mcp_apps.helpers.mcp_apps_manager import 
MCPAppsManager + + app_id = data.get("app_id", "") + uri = data.get("uri", "") + + if not app_id or not uri: + response_data["error"] = "Missing app_id or uri" + return + + manager = MCPAppsManager.get_instance() + result = await manager.proxy_resource_read(app_id, uri) + response_data.update(result) + + async def _handle_get_data(self, data: dict, response_data: dict): + from usr.plugins.mcp_apps.helpers.mcp_apps_manager import MCPAppsManager + + app_id = data.get("app_id", "") + if not app_id: + response_data["error"] = "Missing app_id" + return + + manager = MCPAppsManager.get_instance() + app_data = manager.get_app_data(app_id) + if app_data: + response_data.update(app_data) + else: + response_data["error"] = f"App '{app_id}' not found" + + async def _handle_teardown(self, data: dict, response_data: dict): + from usr.plugins.mcp_apps.helpers.mcp_apps_manager import MCPAppsManager + + app_id = data.get("app_id", "") + if not app_id: + response_data["error"] = "Missing app_id" + return + + manager = MCPAppsManager.get_instance() + manager.remove_app(app_id) + response_data["ok"] = True diff --git a/plugins/_mcp_apps/extensions/webui/get_message_handler/mcp_app_handler.js b/plugins/_mcp_apps/extensions/webui/get_message_handler/mcp_app_handler.js new file mode 100644 index 0000000000..944cccc1d3 --- /dev/null +++ b/plugins/_mcp_apps/extensions/webui/get_message_handler/mcp_app_handler.js @@ -0,0 +1,41 @@ +/** + * JS extension for get_message_handler โ€” registers the mcp_app message type handler. + * Renders a compact APP process step. The iframe is injected separately + * by the set_messages_after_loop extension once all messages are in the DOM. 
+ */ +import { drawProcessStep } from "/js/messages.js"; + +export default async function(extData) { + if (extData.type !== "mcp_app") return; + + extData.handler = drawMessageMcpApp; +} + +function drawMessageMcpApp({ id, type, heading, content, kvps, timestamp, agentno = 0, ...additional }) { + const toolName = kvps?.tool_name || "MCP App"; + const serverName = kvps?.server_name || ""; + const resourceUri = kvps?.resource_uri || ""; + + const cleanTitle = heading + ? heading.replace(/^icon:\/\/\S+\s*/, "") + : `MCP App: ${toolName}`; + + const result = drawProcessStep({ + id, + title: cleanTitle, + code: "APP", + classes: ["mcp-app-step"], + kvps: { server: serverName, tool: toolName }, + content: resourceUri, + actionButtons: [], + log: { id, type, heading, content, kvps, timestamp, agentno, ...additional }, + allowCompletedGroup: true, + }); + + // Store kvps on the step element so the after-loop extension can find it + if (result.step) { + result.step.setAttribute("data-mcp-app-kvps-json", JSON.stringify(kvps || {})); + } + + return result; +} diff --git a/plugins/_mcp_apps/extensions/webui/initFw_end/mcp_apps_init.js b/plugins/_mcp_apps/extensions/webui/initFw_end/mcp_apps_init.js new file mode 100644 index 0000000000..aaca2eb2e2 --- /dev/null +++ b/plugins/_mcp_apps/extensions/webui/initFw_end/mcp_apps_init.js @@ -0,0 +1,5 @@ +import { store } from "/usr/plugins/mcp_apps/webui/mcp-app-store.js"; + +export default async function mcpAppsInit(ctx) { + // Import is enough to register the Alpine store +} diff --git a/plugins/_mcp_apps/extensions/webui/set_messages_after_loop/mcp_app_inject.js b/plugins/_mcp_apps/extensions/webui/set_messages_after_loop/mcp_app_inject.js new file mode 100644 index 0000000000..c2da98a734 --- /dev/null +++ b/plugins/_mcp_apps/extensions/webui/set_messages_after_loop/mcp_app_inject.js @@ -0,0 +1,72 @@ +/** + * set_messages_after_loop extension โ€” injects MCP App iframes into + * the .process-group-response container, above the 
response .message div. + * + * Runs after ALL messages are rendered, so the DOM is stable and + * .process-group-response is guaranteed to exist (if a response was sent). + */ + +export default async function(context) { + // Find all MCP App steps that have stored kvps + const appSteps = document.querySelectorAll(".mcp-app-step[data-mcp-app-kvps-json]"); + + for (const step of appSteps) { + const stepId = step.getAttribute("data-step-id"); + const frameId = `mcp-app-frame-${stepId}`; + + // Already injected + if (document.getElementById(frameId)) continue; + + // Find the process group this step belongs to + const processGroup = step.closest(".process-group"); + if (!processGroup) continue; + + // Find the response container in this process group + const responseContainer = processGroup.querySelector(".process-group-response"); + if (!responseContainer) continue; + + // Parse the stored kvps + let kvps; + try { + kvps = JSON.parse(step.getAttribute("data-mcp-app-kvps-json")); + } catch (e) { + continue; + } + + // Find the .message.message-agent-response div inside the response container + const messageDiv = responseContainer.querySelector(".message.message-agent-response"); + if (!messageDiv) continue; + + // Create the iframe container and prepend it inside the message div (before .message-body) + const frameContainer = document.createElement("div"); + frameContainer.id = frameId; + frameContainer.className = "mcp-app-frame-container"; + frameContainer.style.cssText = "margin-bottom: 12px;"; + frameContainer.setAttribute("data-mcp-app-kvps", ""); + frameContainer.__mcp_app_kvps = kvps; + + messageDiv.prepend(frameContainer); + + // Load the renderer component + await loadRendererComponent(frameContainer, kvps); + } +} + +async function loadRendererComponent(mountEl, kvps) { + try { + const resp = await fetch("/usr/plugins/mcp_apps/webui/mcp-app-renderer.html"); + if (!resp.ok) { + mountEl.innerHTML = `
Failed to load MCP App renderer
`; + return; + } + const html = await resp.text(); + mountEl.innerHTML = html; + + if (window.Alpine) { + window.Alpine.initTree(mountEl); + } + } catch (e) { + console.error("[mcp-apps] Failed to load renderer:", e); + mountEl.innerHTML = `
Error: ${e.message}
`; + } +} diff --git a/plugins/_mcp_apps/helpers/__init__.py b/plugins/_mcp_apps/helpers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/_mcp_apps/helpers/mcp_apps_manager.py b/plugins/_mcp_apps/helpers/mcp_apps_manager.py new file mode 100644 index 0000000000..046144e8ff --- /dev/null +++ b/plugins/_mcp_apps/helpers/mcp_apps_manager.py @@ -0,0 +1,433 @@ +""" +MCP Apps Manager โ€” singleton that tracks UI-enabled tools, fetches UI resources, +manages active app sessions, and proxies tool calls from iframes back to MCP servers. + +Proxy calls from iframes use persistent MCP sessions to avoid the per-call overhead +of creating new connections and running the MCP handshake each time. +""" + +import asyncio +import threading +import uuid +from contextlib import AsyncExitStack +from datetime import timedelta +from typing import Any, Optional + +from helpers.print_style import PrintStyle + + +class _ActiveApp: + """Tracks a single active MCP App iframe instance.""" + + def __init__( + self, + app_id: str, + server_name: str, + tool_name: str, + resource_uri: str, + html_content: str, + tool_args: dict[str, Any], + tool_result: dict[str, Any] | None, + ui_meta: dict[str, Any], + tool_description: str = "", + tool_input_schema: dict[str, Any] | None = None, + ): + self.app_id = app_id + self.server_name = server_name + self.tool_name = tool_name + self.resource_uri = resource_uri + self.html_content = html_content + self.tool_args = tool_args + self.tool_result = tool_result + self.ui_meta = ui_meta + self.tool_description = tool_description + self.tool_input_schema = tool_input_schema or {"type": "object"} + + +class _ProxySession: + """Maintains a persistent MCP ClientSession in a dedicated background task. + + anyio cancel scopes (used by streamablehttp_client) require enter/exit from + the same asyncio Task. We satisfy this by running the entire session + lifecycle inside a single long-lived task and communicating via a queue. 
+ """ + + def __init__(self): + self.session = None + self._queue: asyncio.Queue | None = None + self._task: asyncio.Task | None = None + self._ready = asyncio.Event() + self._open_error: BaseException | None = None + + async def open(self, server): + """Start the background task that owns the transport + session.""" + self._ready = asyncio.Event() + self._open_error = None + self._queue = asyncio.Queue() + self._task = asyncio.create_task(self._run(server)) + await self._ready.wait() + if self._open_error: + raise self._open_error + + async def _run(self, server): + """Background task โ€” owns the full async-context-manager stack.""" + from mcp import ClientSession + from helpers.mcp_handler import ( + MCPServerRemote, + MCPServerLocal, + _initialize_with_ui_ext, + _is_streaming_http_type, + CustomHTTPClientFactory, + ) + from helpers import settings + + try: + async with AsyncExitStack() as stack: + if isinstance(server, MCPServerRemote): + set_ = settings.get_settings() + init_timeout = server.init_timeout or set_["mcp_client_init_timeout"] or 5 + tool_timeout = server.tool_timeout or set_["mcp_client_tool_timeout"] or 60 + client_factory = CustomHTTPClientFactory(verify=server.verify) + + if _is_streaming_http_type(server.type): + from mcp.client.streamable_http import streamablehttp_client + + read_stream, write_stream, _ = await stack.enter_async_context( + streamablehttp_client( + url=server.url, + headers=server.headers, + timeout=timedelta(seconds=init_timeout), + sse_read_timeout=timedelta(seconds=tool_timeout), + httpx_client_factory=client_factory, + ) + ) + else: + from mcp.client.sse import sse_client + + read_stream, write_stream = await stack.enter_async_context( + sse_client( + url=server.url, + headers=server.headers, + timeout=init_timeout, + sse_read_timeout=tool_timeout, + httpx_client_factory=client_factory, + ) + ) + elif isinstance(server, MCPServerLocal): + from mcp import StdioServerParameters + from mcp.client.stdio import stdio_client + 
from shutil import which + + if not server.command or not which(server.command): + raise ValueError(f"Command '{server.command}' not found") + + params = StdioServerParameters( + command=server.command, + args=server.args, + env=server.env, + encoding=server.encoding, + encoding_error_handler=server.encoding_error_handler, + ) + read_stream, write_stream = await stack.enter_async_context( + stdio_client(params) + ) + else: + raise TypeError(f"Unsupported server type: {type(server)}") + + self.session = await stack.enter_async_context( + ClientSession( + read_stream, + write_stream, + read_timeout_seconds=timedelta(seconds=120), + ) + ) + await _initialize_with_ui_ext(self.session) + + # Signal that we are ready to accept requests + self._ready.set() + + # Process requests until a None sentinel arrives + while True: + item = await self._queue.get() + if item is None: + break + coro_factory, future = item + try: + result = await coro_factory(self.session) + if not future.done(): + future.set_result(result) + except Exception as exc: + if not future.done(): + future.set_exception(exc) + except Exception as exc: + # If we haven't signalled ready yet, store the error so open() can raise it + if not self._ready.is_set(): + self._open_error = exc + self._ready.set() + else: + PrintStyle(font_color="red", padding=True).print( + f"MCP Apps: Proxy session background task error: {exc}" + ) + finally: + self.session = None + + async def execute(self, coro_factory): + """Submit work to the background task and wait for the result.""" + loop = asyncio.get_running_loop() + future = loop.create_future() + await self._queue.put((coro_factory, future)) + return await future + + async def close(self): + """Signal the background task to shut down and wait for it.""" + self.session = None + if self._queue: + try: + await self._queue.put(None) + except Exception: + pass + if self._task and not self._task.done(): + try: + await asyncio.wait_for(self._task, timeout=5) + except 
(asyncio.TimeoutError, Exception): + self._task.cancel() + self._task = None + self._queue = None + + +class MCPAppsManager: + """Singleton managing MCP App lifecycle and communication.""" + + _instance: Optional["MCPAppsManager"] = None + _lock = threading.Lock() + + def __init__(self): + self._apps: dict[str, _ActiveApp] = {} + self._resource_cache: dict[str, str] = {} + self._proxy_sessions: dict[str, _ProxySession] = {} + + @classmethod + def get_instance(cls) -> "MCPAppsManager": + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = cls() + return cls._instance + + def register_app( + self, + server_name: str, + tool_name: str, + resource_uri: str, + html_content: str, + tool_args: dict[str, Any], + tool_result: dict[str, Any] | None, + ui_meta: dict[str, Any], + tool_description: str = "", + tool_input_schema: dict[str, Any] | None = None, + ) -> str: + """Register a new active app instance. Returns the app_id.""" + app_id = str(uuid.uuid4()) + app = _ActiveApp( + app_id=app_id, + server_name=server_name, + tool_name=tool_name, + resource_uri=resource_uri, + html_content=html_content, + tool_args=tool_args, + tool_result=tool_result, + ui_meta=ui_meta, + tool_description=tool_description, + tool_input_schema=tool_input_schema, + ) + with self._lock: + self._apps[app_id] = app + PrintStyle(font_color="cyan", padding=True).print( + f"MCP Apps: Registered app '{app_id}' for tool '{server_name}.{tool_name}'" + ) + return app_id + + def get_app(self, app_id: str) -> _ActiveApp | None: + with self._lock: + return self._apps.get(app_id) + + def remove_app(self, app_id: str) -> None: + with self._lock: + self._apps.pop(app_id, None) + + def get_app_data(self, app_id: str) -> dict[str, Any] | None: + """Return serializable app data for the frontend.""" + app = self.get_app(app_id) + if not app: + return None + return { + "app_id": app.app_id, + "server_name": app.server_name, + "tool_name": app.tool_name, + "resource_uri": 
app.resource_uri, + "html_content": app.html_content, + "tool_args": app.tool_args, + "tool_result": app.tool_result, + "ui_meta": app.ui_meta, + "tool_description": app.tool_description, + "tool_input_schema": app.tool_input_schema, + } + + def cache_resource(self, uri: str, html: str) -> None: + with self._lock: + self._resource_cache[uri] = html + + def get_cached_resource(self, uri: str) -> str | None: + with self._lock: + return self._resource_cache.get(uri) + + @staticmethod + def _find_mcp_server(server_name: str, tool_name: str | None = None): + """Find an MCP server (and optionally verify it has a tool). + Returns the server reference so callers can invoke async methods + without holding MCPConfig's threading lock.""" + import helpers.mcp_handler as mcp_handler + + mcp_config = mcp_handler.MCPConfig.get_instance() + for server in mcp_config.servers: + if server.name == server_name: + if tool_name is None or server.has_tool(tool_name): + return server + return None + + async def fetch_ui_resource(self, server_name: str, resource_uri: str) -> str: + """Fetch a ui:// resource from an MCP server. 
Uses cache if available.""" + cached = self.get_cached_resource(resource_uri) + if cached: + return cached + + import helpers.mcp_handler as mcp_handler + + mcp_config = mcp_handler.MCPConfig.get_instance() + result = await mcp_config.read_resource(server_name, resource_uri) + + html_content = "" + for content in result.contents: + if hasattr(content, "text") and content.text: + html_content = content.text + break + elif hasattr(content, "blob") and content.blob: + import base64 + html_content = base64.b64decode(content.blob).decode("utf-8") + break + + if not html_content: + raise ValueError( + f"UI resource '{resource_uri}' from server '{server_name}' returned no content" + ) + + self.cache_resource(resource_uri, html_content) + return html_content + + async def _get_proxy_session(self, server_name: str) -> _ProxySession: + """Get or create a persistent proxy session for the given server.""" + ps = self._proxy_sessions.get(server_name) + if ps and ps.session is not None: + return ps + + # Create a new persistent session + server = self._find_mcp_server(server_name) + if not server: + raise ValueError(f"MCP server '{server_name}' not found") + + ps = _ProxySession() + await ps.open(server) + self._proxy_sessions[server_name] = ps + PrintStyle(font_color="cyan", padding=True).print( + f"MCP Apps: Opened persistent proxy session for '{server_name}'" + ) + return ps + + async def _close_proxy_session(self, server_name: str): + """Close and discard a persistent proxy session.""" + ps = self._proxy_sessions.pop(server_name, None) + if ps: + await ps.close() + PrintStyle(font_color="cyan", padding=True).print( + f"MCP Apps: Closed proxy session for '{server_name}'" + ) + + async def _proxy_with_retry(self, server_name: str, coro_factory): + """Run coro_factory(session) via the background task, with one retry on failure.""" + for attempt in range(2): + try: + ps = await self._get_proxy_session(server_name) + return await ps.execute(coro_factory) + except Exception: + if 
attempt == 0: + PrintStyle(font_color="yellow", padding=True).print( + f"MCP Apps: Proxy session error for '{server_name}', recreating..." + ) + await self._close_proxy_session(server_name) + else: + raise + + async def proxy_tool_call( + self, app_id: str, tool_name: str, arguments: dict[str, Any] + ) -> dict[str, Any]: + """Proxy a tools/call request from an iframe back to the MCP server.""" + app = self.get_app(app_id) + if not app: + return {"error": {"code": -32000, "message": f"App '{app_id}' not found"}} + + try: + from mcp.types import CallToolResult + + async def do_call(session): + return await session.call_tool(tool_name, arguments) + + result: CallToolResult = await self._proxy_with_retry(app.server_name, do_call) + content_list = [] + for item in result.content: + if item.type == "text": + content_list.append({"type": "text", "text": item.text}) + elif item.type == "image": + content_list.append({ + "type": "image", + "data": item.data, + "mimeType": item.mimeType, + }) + response = {"content": content_list, "isError": result.isError} + if hasattr(result, "structuredContent") and result.structuredContent: + response["structuredContent"] = result.structuredContent + return response + except Exception as e: + PrintStyle(font_color="red", padding=True).print( + f"MCP Apps: Proxy tool call failed for '{app.server_name}.{tool_name}': {e}" + ) + return {"error": {"code": -32000, "message": str(e)}} + + async def proxy_resource_read( + self, app_id: str, uri: str + ) -> dict[str, Any]: + """Proxy a resources/read request from an iframe back to the MCP server.""" + app = self.get_app(app_id) + if not app: + return {"error": {"code": -32000, "message": f"App '{app_id}' not found"}} + + try: + async def do_read(session): + return await session.read_resource(uri) + + result = await self._proxy_with_retry(app.server_name, do_read) + contents = [] + for c in result.contents: + entry: dict[str, Any] = {"uri": str(c.uri)} + if hasattr(c, "mimeType") and c.mimeType: + 
entry["mimeType"] = c.mimeType + if hasattr(c, "text") and c.text: + entry["text"] = c.text + elif hasattr(c, "blob") and c.blob: + entry["blob"] = c.blob + contents.append(entry) + return {"contents": contents} + except Exception as e: + PrintStyle(font_color="red", padding=True).print( + f"MCP Apps: Proxy resource read failed for '{uri}': {e}" + ) + return {"error": {"code": -32000, "message": str(e)}} diff --git a/plugins/_mcp_apps/plugin.yaml b/plugins/_mcp_apps/plugin.yaml new file mode 100644 index 0000000000..8eef78d095 --- /dev/null +++ b/plugins/_mcp_apps/plugin.yaml @@ -0,0 +1,8 @@ +name: mcp_apps +title: MCP Apps +description: Renders interactive UI applications from MCP servers in sandboxed iframes within chat messages. Implements the MCP Apps extension (SEP-1865). +version: 0.1.0 +settings_sections: [] +per_project_config: false +per_agent_config: false +always_enabled: false diff --git a/plugins/_mcp_apps/test_mcp/__init__.py b/plugins/_mcp_apps/test_mcp/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/_mcp_apps/test_mcp/mcp_elicitation_test_server.py b/plugins/_mcp_apps/test_mcp/mcp_elicitation_test_server.py new file mode 100644 index 0000000000..c6b7f270e4 --- /dev/null +++ b/plugins/_mcp_apps/test_mcp/mcp_elicitation_test_server.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +""" +Simple MCP server for end-to-end testing of the elicitation feature. + +Usage: + Start the server: + python tests/mcp_elicitation_test_server.py + + Add to Agent Zero MCP config as: + { + "name": "elicitation-test", + "type": "streamable-http", + "url": "http://localhost:8100/mcp" + } + +Tools provided: + - greet_user: Elicits user's name and greeting style, returns a personalized greeting. + - create_task: Elicits task details (title, priority, description), returns summary. + - confirm_action: Elicits a yes/no confirmation before proceeding. + - simple_echo: No elicitation, just echoes input (control test). 
+""" + +import json +import time +from enum import Enum +from typing import Optional + +from fastmcp import FastMCP, Context +from fastmcp.server.elicitation import AcceptedElicitation +from mcp.types import TextContent, SamplingMessage +from pydantic import BaseModel, Field + + +mcp = FastMCP( + name="elicitation-test", + instructions="A test server for MCP elicitation. Use the tools to test human-in-the-loop input gathering.", +) + + +# --- Elicitation response models --- + +class GreetingInfo(BaseModel): + name: str = Field(description="Your name") + style: str = Field(description="Greeting style: formal, casual, or pirate") + + +class Priority(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + +class TaskInfo(BaseModel): + title: str = Field(description="Task title") + priority: Priority = Field(default=Priority.MEDIUM, description="Task priority level") + description: str = Field(default="", description="Optional task description") + + +class Confirmation(BaseModel): + confirmed: bool = Field(description="Do you want to proceed?") + + +# --- Tools --- + +@mcp.tool() +async def greet_user(ctx: Context, reason: str = "general") -> str: + """Generate a personalized greeting. Will ask for the user's name and preferred greeting style. + + Args: + reason: Why the greeting is being generated (e.g. 'welcome', 'farewell', 'general'). + """ + result = await ctx.elicit( + message="I'd like to greet you! Please provide your name and preferred greeting style.", + response_type=GreetingInfo, + ) + + if isinstance(result, AcceptedElicitation): + name = result.data.name + style = result.data.style.lower() + if style == "formal": + return f"Good day, {name}. It is a pleasure to make your acquaintance." + elif style == "pirate": + return f"Ahoy, {name}! Welcome aboard, ye scallywag!" + else: + return f"Hey {name}! What's up?" + else: + return f"Greeting cancelled (action: {result.action})." 
+ + +@mcp.tool() +async def create_task(ctx: Context, project: str = "default") -> str: + """Create a new task. Will ask for task details via elicitation. + + Args: + project: The project to create the task in. + """ + result = await ctx.elicit( + message=f"Please provide details for the new task in project '{project}'.", + response_type=TaskInfo, + ) + + if isinstance(result, AcceptedElicitation): + task = result.data + return ( + f"Task created in '{project}':\n" + f" Title: {task.title}\n" + f" Priority: {task.priority.value}\n" + f" Description: {task.description or '(none)'}" + ) + else: + return f"Task creation cancelled (action: {result.action})." + + +@mcp.tool() +async def confirm_action(action_description: str, ctx: Context) -> str: + """Ask for user confirmation before performing an action. + + Args: + action_description: Description of the action that needs confirmation. + """ + result = await ctx.elicit( + message=f"Please confirm: {action_description}", + response_type=Confirmation, + ) + + if isinstance(result, AcceptedElicitation): + if result.data.confirmed: + return f"Action confirmed: {action_description}. Proceeding." + else: + return f"User explicitly declined via the form for: {action_description}." + else: + return f"Confirmation cancelled (action: {result.action})." + + +@mcp.tool() +async def simple_echo(message: str) -> str: + """Echo the input message back. No elicitation involved (control test). + + Args: + message: The message to echo. + """ + return f"Echo: {message}" + + +# --- Sampling tools --- + +@mcp.tool() +async def summarize_text(ctx: Context, text: str) -> str: + """Summarize a piece of text using the client's LLM via MCP sampling. + + Args: + text: The text to summarize. + """ + result = await ctx.sample( + messages=[ + SamplingMessage( + role="user", + content=TextContent(type="text", text=f"Please summarize the following text in 2-3 sentences:\n\n{text}"), + ) + ], + system_prompt="You are a concise summarizer. 
Respond only with the summary.", + max_tokens=256, + temperature=0.3, + ) + return f"Summary: {result.text}" + + +@mcp.tool() +async def analyze_sentiment(ctx: Context, text: str) -> str: + """Analyze the sentiment of text using the client's LLM via MCP sampling. + + Args: + text: The text to analyze. + """ + result = await ctx.sample( + messages=[ + SamplingMessage( + role="user", + content=TextContent(type="text", text=f"Analyze the sentiment of this text and respond with one word (positive, negative, or neutral) followed by a brief explanation:\n\n{text}"), + ) + ], + system_prompt="You are a sentiment analysis expert. Be concise.", + max_tokens=128, + temperature=0.0, + ) + return f"Sentiment analysis: {result.text}" + + +# --- MCP Apps tools --- + +DASHBOARD_HTML = """ + + + +Server Dashboard + + + +

๐Ÿ“Š Server Dashboard

+
+
+

Server Time

+
Loading...
+
Last updated
+
+
+

Status

+
●
+
Checking...
+
+
+

Uptime

+
--
+
Hours
+
+
+

Requests

+
--
+
Total served
+
+
+ +
+ + + +""" + +_server_start = time.time() +_request_count = 0 + + +@mcp.resource( + "ui://elicitation-test/dashboard", + name="Server Dashboard", + description="Interactive server monitoring dashboard", + mime_type="text/html", +) +def get_dashboard_html() -> str: + """Serve the dashboard HTML for the MCP App.""" + return DASHBOARD_HTML + + +@mcp.tool( + meta={ + "ui": { + "resourceUri": "ui://elicitation-test/dashboard", + "visibility": ["model", "app"], + } + } +) +async def show_dashboard(ctx: Context, title: str = "Server Dashboard") -> str: + """Show an interactive server monitoring dashboard. Returns live server statistics. + + This tool demonstrates MCP Apps โ€” it renders an interactive UI in the host. + """ + global _request_count + _request_count += 1 + uptime = (time.time() - _server_start) / 3600 + data = { + "time": time.strftime("%Y-%m-%d %H:%M:%S"), + "healthy": True, + "uptime_hours": round(uptime, 2), + "total_requests": _request_count, + } + return json.dumps(data) + + +@mcp.tool( + meta={ + "ui": { + "resourceUri": "ui://elicitation-test/dashboard", + "visibility": ["app"], + } + } +) +async def get_server_stats(name: str = "Server") -> str: + """Get current server statistics. This is an app-only tool (hidden from model). + + Called by the dashboard UI's refresh button. 
+ """ + global _request_count + _request_count += 1 + uptime = (time.time() - _server_start) / 3600 + data = { + "time": time.strftime("%Y-%m-%d %H:%M:%S"), + "healthy": True, + "uptime_hours": round(uptime, 2), + "total_requests": _request_count, + } + return json.dumps(data) + + +if __name__ == "__main__": + mcp.run(transport="streamable-http", host="0.0.0.0", port=8100) diff --git a/plugins/_mcp_apps/webui/mcp-app-bridge.js b/plugins/_mcp_apps/webui/mcp-app-bridge.js new file mode 100644 index 0000000000..69ef4fde5b --- /dev/null +++ b/plugins/_mcp_apps/webui/mcp-app-bridge.js @@ -0,0 +1,402 @@ +/** + * MCP App Bridge โ€” PostMessage JSON-RPC bridge between sandboxed iframes and the host. + * + * Implements the host side of the MCP Apps communication protocol (SEP-1865). + * Handles ui/initialize, tools/call, resources/read, notifications/message, + * and sends tool-input/tool-result notifications to the iframe. + */ + +const PROTOCOL_VERSION = "2025-06-18"; + +export class McpAppBridge { + /** + * @param {HTMLIFrameElement} iframe - The sandbox iframe element + * @param {object} options + * @param {string} options.appId - Unique app instance ID + * @param {string} options.serverName - MCP server name + * @param {string} options.toolName - Tool name (without server prefix) + * @param {object} options.toolArgs - Tool call arguments + * @param {object|null} options.toolResult - Tool call result + * @param {object} options.uiMeta - UI metadata from tool definition + * @param {Function} options.onMessage - Callback for ui/message requests + * @param {Function} options.onSizeChanged - Callback for size change notifications + * @param {Function} options.onTeardownRequest - Callback for teardown request + * @param {Function} options.wsEmit - WebSocket emit function for proxying + */ + constructor(iframe, options) { + this.iframe = iframe; + this.appId = options.appId; + this.serverName = options.serverName; + this.toolName = options.toolName; + this.toolArgs = 
options.toolArgs || {}; + this.toolResult = options.toolResult || null; + this.toolDescription = options.toolDescription || ""; + this.toolInputSchema = options.toolInputSchema || { type: "object" }; + this.uiMeta = options.uiMeta || {}; + this.onMessage = options.onMessage || (() => {}); + this.onSizeChanged = options.onSizeChanged || (() => {}); + this.onTeardownRequest = options.onTeardownRequest || (() => {}); + this.wsRequest = options.wsRequest; + + this._initialized = false; + this._pendingRequests = new Map(); + this._requestDebounce = new Map(); + this._nextHostId = 1; + this._messageHandler = this._handleMessage.bind(this); + + window.addEventListener("message", this._messageHandler); + } + + destroy() { + window.removeEventListener("message", this._messageHandler); + this._pendingRequests.clear(); + for (const entry of this._requestDebounce.values()) clearTimeout(entry.timer); + this._requestDebounce.clear(); + } + + /** + * Send a JSON-RPC notification to the iframe. + */ + _sendNotification(method, params) { + if (!this.iframe?.contentWindow) return; + this.iframe.contentWindow.postMessage( + { jsonrpc: "2.0", method, params }, + "*" + ); + } + + /** + * Send a JSON-RPC response to the iframe. + */ + _sendResponse(id, result) { + if (!this.iframe?.contentWindow) return; + this.iframe.contentWindow.postMessage( + { jsonrpc: "2.0", id, result }, + "*" + ); + } + + /** + * Send a JSON-RPC error response to the iframe. + */ + _sendError(id, code, message) { + if (!this.iframe?.contentWindow) return; + this.iframe.contentWindow.postMessage( + { jsonrpc: "2.0", id, error: { code, message } }, + "*" + ); + } + + /** + * Send tool input notification after initialization. + */ + _sendToolInput() { + this._sendNotification("ui/notifications/tool-input", { + arguments: this.toolArgs, + }); + } + + /** + * Send tool result notification. 
+ */ + _sendToolResult() { + if (this.toolResult) { + this._sendNotification("ui/notifications/tool-result", this.toolResult); + } + } + + /** + * Handle incoming postMessage events from the iframe. + */ + _handleMessage(event) { + if (event.source !== this.iframe?.contentWindow) return; + + const data = event.data; + if (!data || data.jsonrpc !== "2.0") return; + + // It's a request (has id and method) + if (data.id !== undefined && data.method) { + this._handleRequest(data); + return; + } + + // It's a notification (has method but no id) + if (data.method && data.id === undefined) { + this._handleNotification(data); + return; + } + + // It's a response to a host-initiated request (has id but no method) + if (data.id !== undefined && !data.method) { + const pending = this._pendingRequests.get(data.id); + if (pending) { + this._pendingRequests.delete(data.id); + if (data.error) { + pending.reject(new Error(data.error.message || "Unknown error")); + } else { + pending.resolve(data.result); + } + } + } + } + + /** + * Handle JSON-RPC requests from the iframe. + * + * Uses a short debounce (per method) so that duplicate requests fired in + * rapid succession (e.g. from the MCP Apps SDK re-connecting) are coalesced + * into a single bridge operation (last-write-wins). + */ + _handleRequest(msg) { + const { method } = msg; + + const DEBOUNCE_METHODS = new Set([ + "ui/initialize", "tools/call", "resources/read", + ]); + + if (DEBOUNCE_METHODS.has(method)) { + const existing = this._requestDebounce.get(method); + if (existing) { + clearTimeout(existing.timer); + } + + this._requestDebounce.set(method, { + msg, + timer: setTimeout(() => { + this._requestDebounce.delete(method); + this._dispatchRequest(msg); + }, 15), + }); + return; + } + + this._dispatchRequest(msg); + } + + /** + * Dispatch a (possibly debounced) JSON-RPC request to its handler. 
+ */ + async _dispatchRequest(msg) { + const { id, method, params } = msg; + + switch (method) { + case "ui/initialize": + this._handleInitialize(id, params); + break; + + case "tools/call": + await this._handleToolsCall(id, params); + break; + + case "resources/read": + await this._handleResourcesRead(id, params); + break; + + case "ui/open-link": + this._handleOpenLink(id, params); + break; + + case "ui/message": + this._handleUiMessage(id, params); + break; + + case "ui/update-model-context": + this._sendResponse(id, {}); + break; + + case "ui/request-display-mode": + this._sendResponse(id, { mode: "inline" }); + break; + + case "ping": + this._sendResponse(id, {}); + break; + + default: + this._sendError(id, -32601, `Method not found: ${method}`); + } + } + + /** + * Handle JSON-RPC notifications from the iframe. + */ + _handleNotification(msg) { + const { method, params } = msg; + + switch (method) { + case "ui/notifications/initialized": + if (!this._initialized) { + this._initialized = true; + this._sendToolInput(); + this._sendToolResult(); + } + break; + + case "ui/notifications/size-changed": + if (params) { + this.onSizeChanged(params); + } + break; + + case "ui/notifications/request-teardown": + this.onTeardownRequest(); + break; + + case "notifications/cancelled": + // Advisory per MCP spec โ€” acknowledged but no action needed. + break; + + case "notifications/message": + // Log message from app โ€” just consume silently + break; + } + } + + /** + * Handle ui/initialize request. + */ + _handleInitialize(id, params) { + // Each ui/initialize starts a fresh session โ€” reset so that + // tool-input / tool-result are re-sent after the next initialized notification. 
+ this._initialized = false; + + const toolInfo = { + tool: { + name: this.toolName, + description: this.toolDescription || "", + inputSchema: this.toolInputSchema || { type: "object" }, + }, + }; + + const hostCapabilities = { + serverTools: { listChanged: false }, + serverResources: { listChanged: false }, + logging: {}, + }; + + const hostContext = { + toolInfo, + theme: document.documentElement.classList.contains("dark") ? "dark" : "light", + displayMode: "inline", + availableDisplayModes: ["inline"], + platform: "web", + }; + + this._sendResponse(id, { + protocolVersion: PROTOCOL_VERSION, + hostCapabilities, + hostInfo: { name: "agent-zero", version: "1.0.0" }, + hostContext, + }); + } + + /** + * Proxy tools/call to the backend via WebSocket. + */ + async _handleToolsCall(id, params) { + if (!params?.name) { + this._sendError(id, -32602, "Missing tool name"); + return; + } + + try { + const response = await this.wsRequest("mcp_app_tool_call", { + app_id: this.appId, + tool_name: params.name, + arguments: params.arguments || {}, + }); + + const first = response && Array.isArray(response.results) ? response.results[0] : null; + const result = first?.data; + + if (result?.error) { + this._sendError(id, result.error.code || -32000, result.error.message); + } else { + this._sendResponse(id, result || {}); + } + } catch (e) { + this._sendError(id, -32000, e.message || "Tool call failed"); + } + } + + /** + * Proxy resources/read to the backend via WebSocket. + */ + async _handleResourcesRead(id, params) { + if (!params?.uri) { + this._sendError(id, -32602, "Missing resource URI"); + return; + } + + try { + const response = await this.wsRequest("mcp_app_resource_read", { + app_id: this.appId, + uri: params.uri, + }); + + const first = response && Array.isArray(response.results) ? 
response.results[0] : null; + const result = first?.data; + + if (result?.error) { + this._sendError(id, result.error.code || -32000, result.error.message); + } else { + this._sendResponse(id, result || {}); + } + } catch (e) { + this._sendError(id, -32000, e.message || "Resource read failed"); + } + } + + /** + * Handle ui/open-link โ€” open URL in new tab. + */ + _handleOpenLink(id, params) { + if (params?.url) { + window.open(params.url, "_blank", "noopener,noreferrer"); + this._sendResponse(id, {}); + } else { + this._sendError(id, -32602, "Missing URL"); + } + } + + /** + * Handle ui/message โ€” forward to host chat. + */ + _handleUiMessage(id, params) { + this.onMessage(params); + this._sendResponse(id, {}); + } + + /** + * Send host context change notification to the iframe. + */ + sendHostContextChanged(context) { + this._sendNotification("ui/notifications/host-context-changed", context); + } + + /** + * Initiate graceful teardown of the app. + */ + async teardown(reason = "host") { + const id = this._nextHostId++; + return new Promise((resolve) => { + this._pendingRequests.set(id, { + resolve: () => resolve(true), + reject: () => resolve(false), + }); + if (this.iframe?.contentWindow) { + this.iframe.contentWindow.postMessage( + { jsonrpc: "2.0", id, method: "ui/resource-teardown", params: { reason } }, + "*" + ); + } + // Timeout: don't wait forever + setTimeout(() => { + if (this._pendingRequests.has(id)) { + this._pendingRequests.delete(id); + resolve(false); + } + }, 3000); + }); + } +} diff --git a/plugins/_mcp_apps/webui/mcp-app-renderer.html b/plugins/_mcp_apps/webui/mcp-app-renderer.html new file mode 100644 index 0000000000..fce9b373f6 --- /dev/null +++ b/plugins/_mcp_apps/webui/mcp-app-renderer.html @@ -0,0 +1,115 @@ + +
+ + + + + + + +
+
+ +
+
+ +
+
+
+ + diff --git a/plugins/_mcp_apps/webui/mcp-app-sandbox.html b/plugins/_mcp_apps/webui/mcp-app-sandbox.html new file mode 100644 index 0000000000..03da51b2a6 --- /dev/null +++ b/plugins/_mcp_apps/webui/mcp-app-sandbox.html @@ -0,0 +1,149 @@ + + + + +MCP App Sandbox + + + + + + diff --git a/plugins/_mcp_apps/webui/mcp-app-store.js b/plugins/_mcp_apps/webui/mcp-app-store.js new file mode 100644 index 0000000000..09c8f30a74 --- /dev/null +++ b/plugins/_mcp_apps/webui/mcp-app-store.js @@ -0,0 +1,134 @@ +/** + * MCP Apps Alpine.js store โ€” manages active app instances and their iframe bridges. + */ +import { createStore } from "/js/AlpineStore.js"; +import { getNamespacedClient } from "/js/websocket.js"; +import { McpAppBridge } from "/usr/plugins/mcp_apps/webui/mcp-app-bridge.js"; + +const stateSocket = getNamespacedClient("/ws"); + +export const store = createStore("mcpApps", { + /** @type {Map} */ + _bridges: new Map(), + + initialized: false, + + async init() { + if (this.initialized) return; + this.initialized = true; + }, + + /** + * Initialize an app iframe with its bridge. + * Called from the mcp_app message renderer when the DOM element is ready. + * + * @param {string} appId + * @param {HTMLIFrameElement} sandboxIframe - The outer sandbox iframe + * @param {object} appData - App data from the backend + */ + async setupApp(appId, sandboxIframe, appData) { + if (this._bridges.has(appId)) return; + + const bridge = new McpAppBridge(sandboxIframe, { + appId: appData.app_id, + serverName: appData.server_name, + toolName: appData.tool_name, + toolArgs: appData.tool_args || {}, + toolResult: appData.tool_result || null, + toolDescription: appData.tool_description || "", + toolInputSchema: appData.tool_input_schema || { type: "object" }, + uiMeta: appData.ui_meta || {}, + onMessage: (params) => { + console.log("[mcp-apps] ui/message from app:", params); + }, + onSizeChanged: (params) => { + // Only auto-size height; width is controlled by the layout container. 
+ // Applying the app's reported width causes an infinite resize loop. + // Add buffer (+24px) and hysteresis (ignore deltas โ‰ค20px) to prevent + // resize feedback loops between the app's ResizeObserver and the iframe. + if (params.height != null) { + const target = Math.min(params.height + 24, 800); + const current = parseFloat(sandboxIframe.style.height) || 400; + if (Math.abs(target - current) > 20) { + sandboxIframe.style.height = `${target}px`; + } + } + }, + onTeardownRequest: () => { + this.teardownApp(appId); + }, + wsRequest: (event, data) => stateSocket.request(event, data), + }); + + this._bridges.set(appId, bridge); + + // Wait for sandbox proxy ready, then send the HTML resource + const onSandboxReady = (event) => { + if (event.source !== sandboxIframe.contentWindow) return; + const data = event.data; + if (!data || data.method !== "ui/notifications/sandbox-proxy-ready") return; + + window.removeEventListener("message", onSandboxReady); + + sandboxIframe.contentWindow.postMessage({ + jsonrpc: "2.0", + method: "ui/notifications/sandbox-resource-ready", + params: { + html: appData.html_content, + csp: appData.ui_meta?.csp || null, + permissions: appData.ui_meta?.permissions || null, + }, + }, "*"); + }; + + window.addEventListener("message", onSandboxReady); + }, + + /** + * Fetch app data from the backend for a given app_id. + * @param {string} appId + * @returns {Promise} + */ + async fetchAppData(appId) { + try { + const response = await stateSocket.request("mcp_app_get_data", { app_id: appId }); + const first = response && Array.isArray(response.results) ? 
response.results[0] : null; + if (!first || first.ok !== true || !first.data) { + const errMsg = first?.data?.error || first?.error?.message || "No data returned"; + console.error("[mcp-apps] fetchAppData error:", errMsg); + return { error: errMsg }; + } + return first.data; + } catch (e) { + console.error("[mcp-apps] fetchAppData failed:", e); + return null; + } + }, + + /** + * Tear down an app and clean up its bridge. + * @param {string} appId + */ + async teardownApp(appId) { + const bridge = this._bridges.get(appId); + if (bridge) { + bridge.destroy(); + this._bridges.delete(appId); + } + + try { + await stateSocket.request("mcp_app_teardown", { app_id: appId }); + } catch (e) { + console.warn("[mcp-apps] teardown notify failed:", e); + } + }, + + /** + * Check if an app bridge exists. + * @param {string} appId + * @returns {boolean} + */ + hasApp(appId) { + return this._bridges.has(appId); + }, +});