Add MCP server support to live agent with Firecrawl web search #245
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -43,6 +43,11 @@ class CascadeConfig: | |||||
| llm_extra: Mapping[str, object] = field(default_factory=dict[str, object]) | ||||||
| # Extra streaming-TTS query params (the --tts-config escape hatch). | ||||||
| tts_extra: Mapping[str, str] = field(default_factory=dict[str, str]) | ||||||
| # MCP servers (name -> launch spec) whose tools the deepagents brain can call. Empty | ||||||
| # by default; populated from --mcp-config files and/or the --demo-tools curated set. | ||||||
|
The comment says `mcp_servers` is populated via `--demo-tools`, but that flag is not part of the current live command flow, so the documented population path is no longer accurate.
||||||
| mcp_servers: Mapping[str, Mapping[str, object]] = field( | ||||||
| default_factory=dict[str, Mapping[str, object]] | ||||||
| ) | ||||||
| # Whether STT formats finalized turns. The reply trigger waits for the formatted | ||||||
| # turn when on; with it off, an unformatted end-of-turn is the cue instead. | ||||||
| format_turns: bool = True | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| """Load tools from user-configured MCP servers for the `assembly live` agent. | ||
|
|
||
| The live voice agent's brain is a deepagents graph, so any Model Context Protocol | ||
| server's tools can be threaded into it through ``langchain-mcp-adapters`` — the same | ||
| adapter `docs_mcp.py` uses for the hosted AssemblyAI docs. This lets a spoken | ||
| conversation reach real tools (clock, weather, memory, a notes folder, …), bringing | ||
| `assembly live` toward Gemini-Live / ChatGPT-voice parity. | ||
|
|
||
| Two entry points feed the brain: | ||
|
|
||
| - :func:`default_servers` returns a curated, zero/low-auth set (time, fetch, memory, | ||
| filesystem, weather) that every live session loads out of the box. | ||
| - :func:`parse_mcp_config` reads one or more standard ``mcpServers`` JSON files — the | ||
| exact shape Claude Desktop / Claude Code use — so an existing config drops in | ||
| unchanged and can extend or override the defaults. | ||
|
|
||
| Launching a server is **best-effort per server**: a missing ``npx``/``uvx`` or an | ||
| offline run skips that one server (the others still load) rather than aborting the | ||
| session — a single broken tool can't sink a live demo. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import json | ||
| from collections.abc import Callable, Mapping, Sequence | ||
| from pathlib import Path | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| from aai_cli.core import jsonshape | ||
| from aai_cli.core.errors import UsageError | ||
|
|
||
| if TYPE_CHECKING: | ||
| from langchain_core.tools import BaseTool | ||
| from langchain_mcp_adapters.sessions import Connection | ||
|
|
||
| # One MCP server's launch spec, as it appears under "mcpServers" in a standard config: | ||
| # stdio servers carry {command, args, env}; remote servers carry {url}. | ||
| ServerSpec = Mapping[str, object] | ||
| # A loader maps (server name, adapter connection dict) -> the server's tools. Injected in | ||
| # tests so the per-server orchestration runs without subprocesses or sockets. | ||
| Loader = Callable[[str, "Connection"], "list[BaseTool]"] | ||
|
|
||
|
|
||
| def default_servers(filesystem_root: Path) -> dict[str, ServerSpec]: | ||
| """The curated server set every live session loads: zero/low-auth, fast, speakable. | ||
|
|
||
| Every entry is a published reference server runnable with no API key: | ||
| ``time``/``fetch`` over ``uvx`` (PyPI), ``memory``/``filesystem`` over ``npx`` (npm), | ||
| and an NWS-backed ``weather`` server. ``filesystem`` is rooted at ``filesystem_root`` | ||
| (the working directory) so "summarize my notes file" stays scoped to one folder. | ||
| """ | ||
| return { | ||
| "time": {"command": "uvx", "args": ["mcp-server-time"]}, | ||
| "fetch": {"command": "uvx", "args": ["mcp-server-fetch"]}, | ||
| "memory": {"command": "npx", "args": ["-y", "@modelcontextprotocol/server-memory"]}, | ||
| "filesystem": { | ||
| "command": "npx", | ||
| "args": ["-y", "@modelcontextprotocol/server-filesystem", str(filesystem_root)], | ||
| }, | ||
| "weather": {"command": "npx", "args": ["-y", "@h1deya/mcp-server-weather"]}, | ||
| } | ||
|
|
||
|
|
||
| def parse_mcp_config(paths: Sequence[Path]) -> dict[str, ServerSpec]: | ||
| """Merge the ``mcpServers`` maps from one or more standard MCP config JSON files. | ||
|
|
||
| Each file must be ``{"mcpServers": {name: spec, …}}`` (the Claude Desktop / Claude | ||
| Code shape). Later files win on a name clash. A malformed file, a missing | ||
| ``mcpServers`` key, or a spec with neither ``command`` nor ``url`` is a usage error, | ||
| surfaced before any audio device opens. | ||
| """ | ||
| servers: dict[str, ServerSpec] = {} | ||
| for path in paths: | ||
| try: | ||
| data = jsonshape.as_mapping(json.loads(path.read_text(encoding="utf-8"))) | ||
| except (OSError, json.JSONDecodeError) as exc: | ||
| raise UsageError(f"Could not read MCP config {str(path)!r}: {exc}") from exc | ||
| entries = jsonshape.as_mapping(data.get("mcpServers")) if data is not None else None | ||
| if entries is None: | ||
| raise UsageError( | ||
| f"MCP config {str(path)!r} has no 'mcpServers' object.", | ||
| suggestion='Expected {"mcpServers": {"name": {"command": "…"}}}.', | ||
| ) | ||
| for name, spec in entries.items(): | ||
| servers[name] = _validate_spec(name, spec) | ||
| return servers | ||
|
|
||
|
|
||
| def _validate_spec(name: str, spec: object) -> dict[str, object]: | ||
| """Return the spec as a mapping, or reject one naming neither a ``command`` nor ``url``.""" | ||
| mapping = jsonshape.as_mapping(spec) | ||
| if mapping is None or ("command" not in mapping and "url" not in mapping): | ||
| raise UsageError( | ||
| f"MCP server {name!r} needs a 'command' or 'url'.", | ||
| suggestion='e.g. {"command": "uvx", "args": ["mcp-server-time"]}.', | ||
| ) | ||
| return mapping | ||
|
|
||
|
|
||
| def _to_connection(spec: ServerSpec) -> Connection: | ||
| """Translate a standard ``mcpServers`` spec into a langchain-mcp-adapters connection. | ||
|
|
||
| A ``url`` spec becomes a ``streamable_http`` transport; otherwise it's a ``stdio`` | ||
| transport launched from ``command``/``args`` (passing ``env`` through when present). | ||
| """ | ||
| if "url" in spec: | ||
| return {"transport": "streamable_http", "url": str(spec["url"])} | ||
| args = [str(arg) for arg in jsonshape.object_list(spec.get("args"))] | ||
| env_map = jsonshape.as_mapping(spec.get("env")) | ||
| env = {str(k): str(v) for k, v in env_map.items()} if env_map is not None else None | ||
| return {"transport": "stdio", "command": str(spec["command"]), "args": args, "env": env} | ||
|
|
||
|
|
||
| def _load_server(name: str, conn: Connection) -> list[BaseTool]: | ||
| """Connect to one MCP server and return its tools (drives the async adapter).""" | ||
| from langchain_mcp_adapters.client import MultiServerMCPClient | ||
|
|
||
| async def _fetch() -> list[BaseTool]: | ||
| client = MultiServerMCPClient({name: conn}) | ||
| return await client.get_tools() | ||
|
|
||
| return asyncio.run(_fetch()) | ||
|
|
||
|
|
||
| def _safe_load(loader: Loader, name: str, spec: ServerSpec) -> list[BaseTool]: | ||
| """One server's tools, or ``[]`` if it won't start — so a failure is never fatal.""" | ||
| try: | ||
| return loader(name, _to_connection(spec)) | ||
| except Exception: | ||
| return [] | ||
|
|
||
|
|
||
| def load_mcp_tools( | ||
| servers: Mapping[str, ServerSpec], *, loader: Loader = _load_server | ||
| ) -> list[BaseTool]: | ||
| """Load the tools from every configured MCP server, skipping any that fail to start. | ||
|
|
||
| Each server is launched independently so one unreachable server (npx not installed, | ||
| an offline host) drops only its own tools — the rest still load. ``loader`` is the | ||
| only network/subprocess seam, injected in tests. | ||
| """ | ||
| tools: list[BaseTool] = [] | ||
| for name, spec in servers.items(): | ||
| tools.extend(_safe_load(loader, name, spec)) | ||
| return tools |
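The merge and best-effort loading behaviour described in the docstrings above can be tried in isolation. The following is a minimal sketch only, not the PR's implementation: it replaces `jsonshape` and `UsageError` with plain `isinstance` checks and `ValueError`, and `merge_configs`, `load_all`, `fake_loader`, the `old-notes-server` package name, and the `example.invalid` URL are all hypothetical names chosen for illustration.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def merge_configs(paths: list[Path]) -> dict[str, dict]:
    """Merge "mcpServers" maps from several files; later files win on a name clash."""
    servers: dict[str, dict] = {}
    for path in paths:
        data = json.loads(path.read_text(encoding="utf-8"))
        entries = data.get("mcpServers") if isinstance(data, dict) else None
        if not isinstance(entries, dict):
            raise ValueError(f"{path} has no 'mcpServers' object")
        for name, spec in entries.items():
            # A usable spec names either a stdio command or a remote URL.
            if not isinstance(spec, dict) or ("command" not in spec and "url" not in spec):
                raise ValueError(f"server {name!r} needs a 'command' or 'url'")
            servers[name] = spec
    return servers


def load_all(servers: dict[str, dict], loader: Callable[[str, dict], list[str]]) -> list[str]:
    """Collect tools per server; a server whose loader raises contributes nothing."""
    tools: list[str] = []
    for name, spec in servers.items():
        try:
            tools.extend(loader(name, spec))
        except Exception:
            continue  # best-effort: skip this server, keep the others
    return tools


def fake_loader(name: str, spec: dict) -> list[str]:
    # Hypothetical stand-in for the real adapter call: remote servers "fail".
    if "url" in spec:
        raise ConnectionError("offline")
    return [f"{name}.tool"]


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp, "base.json")
    override = Path(tmp, "override.json")
    base.write_text(json.dumps({"mcpServers": {
        "time": {"command": "uvx", "args": ["mcp-server-time"]},
        "notes": {"command": "npx", "args": ["-y", "old-notes-server"]},
    }}), encoding="utf-8")
    override.write_text(json.dumps({"mcpServers": {
        "notes": {"url": "https://example.invalid/mcp"},
    }}), encoding="utf-8")
    merged = merge_configs([base, override])

loaded = load_all(merged, fake_loader)
print(sorted(merged), loaded)
```

Here the second file replaces the `notes` entry, and the failing remote `notes` server is dropped while `time` still contributes its tool. Injecting the loader as a parameter is what lets this run without subprocesses or sockets, which is the same seam the PR's `load_mcp_tools` exposes for tests.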
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| """Optional Firecrawl web search for the live voice agent. | ||
|
|
||
| Firecrawl grounds the agent with live web search, enabled when a ``FIRECRAWL_API_KEY`` | ||
| is present in the environment. Search is read-only, so it is *not* gated behind the | ||
| approval flow. With no key set we simply omit the tool (the agent still has its URL | ||
| fetch and the AssemblyAI docs MCP), rather than erroring. | ||
|
|
||
| This mirrors ``web_search.py`` (Tavily) but reuses Firecrawl's official LangChain | ||
| integration; the live agent prefers it as its default search tool. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| from aai_cli.core import env | ||
|
|
||
| if TYPE_CHECKING: | ||
| from langchain_core.tools import BaseTool | ||
|
|
||
| # Firecrawl's SDK reads this from the environment; we gate on its presence so we never | ||
| # hand the agent a search tool that will fail on first use for lack of a key. | ||
| FIRECRAWL_API_KEY_ENV = "FIRECRAWL_API_KEY" | ||
|
|
||
| # The name ``FirecrawlSearch`` registers itself under. The prompt builder detects | ||
| # web-search availability by this name, so a test pins it against the tool. | ||
| WEB_SEARCH_TOOL_NAME = "firecrawl_search" | ||
|
|
||
|
|
||
| def build_web_search_tool() -> BaseTool | None: | ||
| """The Firecrawl web-search tool, or ``None`` when no ``FIRECRAWL_API_KEY`` is set.""" | ||
| if not env.get(FIRECRAWL_API_KEY_ENV): | ||
| return None | ||
|
|
||
| from langchain_firecrawl import FirecrawlSearch | ||
|
|
||
| return FirecrawlSearch() |
Doc text still says web search depends on TAVILY_API_KEY, but this module now uses Firecrawl. This contradicts actual runtime gating and misstates when search is available.
✨ AI Reasoning
The surrounding implementation now uses Firecrawl for web search gating, yet the guidance text still says missing web search is tied to a different API key name. That mismatch makes the documented condition impossible to satisfy as written and can mislead future maintenance and debugging.
🔧 How do I fix it?
Trace execution paths carefully. Ensure precondition checks happen before using values, validate ranges before checking impossible conditions, and don't check for states that the code has already ruled out.
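For reference, the gating that the documentation should describe is the `FIRECRAWL_API_KEY` check in `build_web_search_tool`. A minimal sketch of that pattern follows, assuming a plain mapping in place of `aai_cli.core.env` and a stub class in place of `FirecrawlSearch`; `build_tool`, `StubSearchTool`, and the `fc-example` key value are hypothetical and exist only for illustration.

```python
from typing import Callable, Mapping, Optional

API_KEY_ENV = "FIRECRAWL_API_KEY"


class StubSearchTool:
    """Hypothetical stand-in for the real search tool; it only carries a name."""

    name = "firecrawl_search"


def build_tool(
    environ: Mapping[str, str], factory: Callable[[], object]
) -> Optional[object]:
    """Return factory() only when the key is set and non-empty, otherwise None."""
    if not environ.get(API_KEY_ENV):
        return None
    return factory()


without_key = build_tool({}, StubSearchTool)
empty_key = build_tool({API_KEY_ENV: ""}, StubSearchTool)
with_key = build_tool({API_KEY_ENV: "fc-example"}, StubSearchTool)
print(without_key, empty_key, getattr(with_key, "name", None))
```

An unset key and an empty key both omit the tool, so any guidance text about when search is unavailable would need to name `FIRECRAWL_API_KEY` rather than `TAVILY_API_KEY` to match this behaviour.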