diff --git a/REFERENCE.md b/REFERENCE.md index 801ef66..9288fbb 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -94,7 +94,7 @@ each carrying a `"type"` field to dispatch on: | ------- | ----------- | | `assembly stream --json` | `begin`, `turn`, `termination` (with `--from-stdin`, a `source` event precedes each file's events) | | `assembly agent --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` | -| `assembly live --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` | +| `assembly live --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `tool.use`, `reply.started`, `transcript.agent`, `reply.done` | | `assembly dictate --json` | `utterance` | | `assembly llm --follow --json` | `answer` | | `assembly transcribe --json` | `result` (one per source), then `reduce` if `--llm-reduce` is set | @@ -143,21 +143,17 @@ The two are mutually exclusive. ## Live agent tools (MCP) `assembly live` answers each spoken turn with a tool-using agent, so it can reach -external tools mid-conversation. Out of the box it loads its built-in URL fetch, -the AssemblyAI docs, and a curated, no-auth MCP toolset: `time` and `fetch` -(`uvx`), `memory` and `filesystem` (`npx`, the latter rooted at the working -directory), and an NWS-backed `weather` server. - -Firecrawl web search also loads when a `FIRECRAWL_API_KEY` is set; without it the -session prints a one-line notice and runs without web search (every other default -tool needs no key). - -`--mcp-config FILE` adds your own servers on top of the defaults, from a standard -`mcpServers` JSON file — the same +external tools mid-conversation. Its toolset is deliberately small — a low-latency +spoken turn does best with one obvious tool rather than a large menu to choose +among — so its one built-in tool is Firecrawl web search. It loads when a +`FIRECRAWL_API_KEY` is set; without it the session prints a one-line notice and +runs from the model's own knowledge (no web search). + +`--mcp-config FILE` adds your own MCP servers (none load by default), from a +standard `mcpServers` JSON file — the same `{"mcpServers": {"name": {"command": "…", "args": […]}}}` shape Claude Desktop and -Claude Code use. Repeat the flag to merge several files; a later file (or a config -entry sharing a default's name) wins on a clash. Remote servers use `{"url": "…"}` -instead of `command`/`args`. +Claude Code use. Repeat the flag to merge several files; a later file wins on a +name clash. Remote servers use `{"url": "…"}` instead of `command`/`args`. Each server is launched independently and best-effort: one that won't start (a missing `npx`/`uvx`, an offline host) drops only its own tools, so a single broken diff --git a/aai_cli/agent/events.py b/aai_cli/agent/events.py index 0f917e1..3247bfc 100644 --- a/aai_cli/agent/events.py +++ b/aai_cli/agent/events.py @@ -52,6 +52,13 @@ class ReplyStarted(_Event): type: Literal["reply.started"] = "reply.started" +class ToolUse(_Event): + """The agent invoked a tool mid-reply (``label`` is a short, human description).""" + + type: Literal["tool.use"] = "tool.use" + label: str + + class AgentTranscript(_Event): """The agent's reply transcript (``interrupted`` when the user barged in).""" @@ -67,4 +74,4 @@ class ReplyDone(_Event): interrupted: bool -Event = SessionReady | UserDelta | UserFinal | ReplyStarted | AgentTranscript | ReplyDone +Event = SessionReady | UserDelta | UserFinal | ToolUse | ReplyStarted | AgentTranscript | ReplyDone diff --git a/aai_cli/agent/render.py b/aai_cli/agent/render.py index 98f6a2b..7288fc8 100644 --- a/aai_cli/agent/render.py +++ b/aai_cli/agent/render.py @@ -75,6 +75,19 @@ def user_final(self, text: str) -> None: else: self._finalize_line(_labeled("you: ", text, style="aai.you")) + def tool_call(self, label: str) -> None: + """Surface that the agent is using a tool (e.g. "Searching the web") while it thinks. + + JSON emits a ``tool.use`` event; piped text keeps it off stdout (transcript-only) by + routing to stderr; human mode shows a muted inline line. + """ + if self.json_mode: + self._emit_event(events.ToolUse(label=label)) + elif self.text_mode: + self._status(f"{label}…") + else: + self._line(_labeled("", f"{label}…", style="aai.muted")) + # --- agent ------------------------------------------------------------- def reply_started(self) -> None: if self.json_mode: diff --git a/aai_cli/agent_cascade/brain.py b/aai_cli/agent_cascade/brain.py index 7f9b8d2..e814533 100644 --- a/aai_cli/agent_cascade/brain.py +++ b/aai_cli/agent_cascade/brain.py @@ -1,9 +1,10 @@ """Deepagents-powered reply brain for the live voice cascade. `assembly live` answers each spoken turn with a deepagents graph instead of a single -LLM completion, so the agent can transparently reach for tools — web search, URL -fetch, the AssemblyAI docs — mid-conversation, mimicking a live multimodal assistant -(the "talk to Gemini Live" experience). The graph is built once per session +LLM completion, so the agent can transparently reach for a tool — web search — +mid-conversation, mimicking a live multimodal assistant (the "talk to Gemini Live" +experience). The toolset is deliberately minimal: a low-latency spoken turn does best +with one obvious tool rather than a menu it has to choose among. The graph is built once per session (:func:`build_graph`) and invoked statelessly per turn with the running history the cascade already keeps (:func:`build_completer`); tools are read-only and auto-approved, because a spoken turn can't pause for a keyboard confirmation, and the system prompt @@ -22,9 +23,9 @@ from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.code_agent.agent import CompiledAgent -from aai_cli.code_agent.fetch_tool import FETCH_TOOL_NAME from aai_cli.code_agent.firecrawl_search import WEB_SEARCH_TOOL_NAME from aai_cli.core import debuglog +from aai_cli.core.errors import CLIError if TYPE_CHECKING: from langchain_core.tools import BaseTool @@ -41,6 +42,16 @@ # arbitrary tuning knob — a +-1 shift is behaviorally equivalent, so no test can kill it. _RESULT_LOG_CAP = 500 # pragma: no mutate +# Human, speakable labels for the tool affordance the live UI shows while a tool runs (so a +# spoken turn that pauses to use a tool says *why* it's working, not just spin silently). +_TOOL_LABELS = {WEB_SEARCH_TOOL_NAME: "Searching the web"} + + +def _tool_label(name: str) -> str: + """A short present-tense label for a tool call, shown as the live UI's tool affordance.""" + return _TOOL_LABELS.get(name, f"Using {name}") + + # Closes every guidance variant: the reply is spoken, so it must stay short and plain. _SPOKEN_TAIL = ( "Your reply is read aloud, so keep it short and spoken — no markdown, lists, code, or raw URLs." @@ -70,21 +81,17 @@ def _join_clause(parts: list[str]) -> str: def _tool_capabilities(tools: Sequence[BaseTool]) -> list[str]: - """The spoken-capability phrases backed by an actually-present tool. + """The spoken-capability phrase backed by a present built-in tool. - Derived from the resolved tool names so the prompt never advertises a capability the - agent can't perform: web search is present only with a ``FIRECRAWL_API_KEY``, and the - docs tools are best-effort (absent when the docs host is unreachable). + The live agent's only built-in tool is Firecrawl web search, bound just when a + ``FIRECRAWL_API_KEY`` is set — so the prompt advertises web search only when the agent + can really do it. Advertising a tool it doesn't have made it announce an action ("I'll + search…") it then couldn't take, leaving the turn with no answer. """ names = {tool.name for tool in tools} - capabilities: list[str] = [] if WEB_SEARCH_TOOL_NAME in names: - capabilities.append("search the web for current or unfamiliar facts") - if FETCH_TOOL_NAME in names: - capabilities.append("fetch a specific URL") - if names - {WEB_SEARCH_TOOL_NAME, FETCH_TOOL_NAME}: - capabilities.append("look up the AssemblyAI documentation") - return capabilities + return ["search the web for current or unfamiliar facts"] + return [] def _extra_capability(extra_tools: Sequence[BaseTool]) -> str | None: @@ -128,24 +135,20 @@ def build_system_prompt( def build_live_tools() -> list[BaseTool]: - """The live agent's read-only toolset: URL fetch, web search (if keyed), and docs. - - All three are reused from the coding agent's tool modules. Unlike there they are - *not* approval-gated — a spoken turn can't wait for a keyboard confirmation, so the - live agent only gets read-only tools and runs them automatically. Web search is - present only when ``FIRECRAWL_API_KEY`` is set; the docs MCP is best-effort (an empty - list when the host is unreachable), so neither blocks a session. + """The live agent's single read-only tool: Firecrawl web search (only when keyed). + + Deliberately minimal. A low-latency spoken turn does best with one obvious tool rather + than a large menu it has to choose among — a big toolset made the model narrate "I'll + search…" without ever calling anything, and bloated every request with tool schemas. + Web search is the one capability worth the round-trip; everything else the agent answers + from its own knowledge. The tool is reused (un-approval-gated) from the coding agent and + is present only when ``FIRECRAWL_API_KEY`` is set, so an unkeyed session simply runs + tool-free. Extra tools remain strictly opt-in via ``--mcp-config``. """ - from aai_cli.code_agent.docs_mcp import load_docs_tools - from aai_cli.code_agent.fetch_tool import build_fetch_tool from aai_cli.code_agent.firecrawl_search import build_web_search_tool - tools: list[BaseTool] = [build_fetch_tool()] search = build_web_search_tool() - if search is not None: - tools.append(search) - tools.extend(load_docs_tools()) - return tools + return [search] if search is not None else [] def build_graph( @@ -184,69 +187,112 @@ def build_graph( def build_completer( api_key: str, config: CascadeConfig, *, graph: CompiledAgent | None = None -) -> Callable[[list[ChatCompletionMessageParam]], str]: +) -> Callable[..., str]: """A ``complete_reply`` for the cascade engine backed by the deepagents graph. The cascade prepends its own ``system`` message to the history each turn; the graph - already owns the system prompt, so we drop it before invoking. The graph runs the - full tool loop and we return its final spoken text. Under ``-v`` the loop is streamed - so each tool call/result is logged as it lands (see :func:`_run_graph`). ``graph`` is - injected in tests so the per-turn wiring runs against a fake with no network. + already owns the system prompt, so we drop it before invoking. The graph runs the full + tool loop and we return its final spoken text. ``on_tool`` (when given) is called with a + short label as each tool call lands, so the front-end can show a "Searching the web…" + affordance instead of sitting silent while the agent works; the loop is also streamed — + rather than ``invoke``-d — whenever a sink is wired or under ``-v`` (see :func:`_run_graph`). + ``graph`` is injected in tests so the per-turn wiring runs against a fake with no network. """ resolved = build_graph(api_key, config) if graph is None else graph - def complete_reply(messages: list[ChatCompletionMessageParam]) -> str: + def complete_reply( + messages: list[ChatCompletionMessageParam], + on_tool: Callable[[str], None] | None = None, + ) -> str: conversation = [message for message in messages if message.get("role") != "system"] - return _reply_text(_run_graph(resolved, conversation)) + return _reply_text(_run_graph(resolved, conversation, on_tool)) return complete_reply def _run_graph( - graph: CompiledAgent, conversation: list[ChatCompletionMessageParam] + graph: CompiledAgent, + conversation: list[ChatCompletionMessageParam], + on_tool: Callable[[str], None] | None = None, ) -> dict[str, object]: """Run one turn through the graph, returning its end state. - Normally a single ``invoke`` (the whole tool loop runs internally). Under verbose - mode, and when the graph can stream, drive it as incremental state snapshots instead - so :func:`_log_flow` can surface each tool call/result on stderr as it happens — which - is what makes a stalled spoken turn debuggable. The test fakes only implement - ``invoke``, so they (and the non-verbose path) take the plain branch. + Normally a single ``invoke`` (the whole tool loop runs internally). When a tool sink is + wired (the live UI's affordance) or under verbose mode, and the graph can stream, drive + it as incremental state snapshots instead so :func:`_log_flow` surfaces each tool call as + it happens. The test fakes only implement ``invoke``, so they (and the plain path with no + sink) take the invoke branch. """ - graph_input = {"messages": conversation} - if debuglog.active() and hasattr(graph, "stream"): + try: + return _drive_graph(graph, {"messages": conversation}, on_tool) + except CLIError: + raise + except Exception as exc: + # The graph can fail anywhere in the tool loop — a gateway 4xx/5xx, a tool raising, + # a langgraph recursion limit. Convert it to a CLIError so the cascade records and + # *surfaces* it (the engine shows it in the transcript) instead of the reply worker + # dying silently and the user getting no answer with no clue why. + raise CLIError( + f"the agent couldn't complete the turn: {exc}", error_type="agent_brain_error" + ) from exc + + +def _drive_graph( + graph: CompiledAgent, + graph_input: dict[str, object], + on_tool: Callable[[str], None] | None = None, +) -> dict[str, object]: + """Invoke the graph, or stream it (when a tool sink is wired or under ``-v``) so + :func:`_log_flow` can surface each tool call as it lands.""" + if (on_tool is not None or debuglog.active()) and hasattr(graph, "stream"): last: dict[str, object] = {} seen = 0 for chunk in graph.stream(graph_input, None, stream_mode="values"): - seen = _log_flow(chunk, seen) + seen = _log_flow(chunk, seen, on_tool) last = chunk return last return graph.invoke(graph_input) -def _log_flow(state: dict[str, object], seen: int) -> int: - """Log the tool calls/results added to ``state`` since the first ``seen`` messages. +def _log_flow( + state: dict[str, object], seen: int, on_tool: Callable[[str], None] | None = None +) -> int: + """Surface the tool calls/results added to ``state`` since the first ``seen`` messages. - Reuses the coding agent's message→event vocabulary so the flow log knows the same - AIMessage/ToolMessage shapes the TUI does. Returns the new high-water message count - so the next snapshot only logs what it added. + Feeds ``on_tool`` a speakable label as each tool call lands (the live UI's affordance) and, + under ``-v``, logs the call/result/interim line to stderr. Reuses the coding agent's + message→event vocabulary so it reads the same AIMessage/ToolMessage shapes the TUI does. + Returns the new high-water message count so the next snapshot only re-surfaces what it added. """ - from aai_cli.code_agent.events import AssistantText, ToolCall, ToolResult, message_events + from aai_cli.code_agent.events import message_events messages = state.get("messages") if not isinstance(messages, list): return seen + verbose = debuglog.active() for message in messages[seen:]: for event in message_events(message, announce_calls=True): - if isinstance(event, ToolCall): - _FLOW_LOG.info("tool call %s args=%s", event.name, event.args) - elif isinstance(event, ToolResult): - _FLOW_LOG.info("tool result %s -> %s", event.name, _clip(event.content)) - elif isinstance(event, AssistantText): - _FLOW_LOG.info("llm: %s", event.text) + _surface_event(event, on_tool, verbose=verbose) return len(messages) +def _surface_event(event: object, on_tool: Callable[[str], None] | None, *, verbose: bool) -> None: + """Surface one flow event: feed a tool call's label to ``on_tool``, and (under ``-v``) + log the call/result/interim line to stderr.""" + from aai_cli.code_agent.events import AssistantText, ToolCall, ToolResult + + if isinstance(event, ToolCall) and on_tool is not None: + on_tool(_tool_label(event.name)) + if not verbose: + return + if isinstance(event, ToolCall): + _FLOW_LOG.info("tool call %s args=%s", event.name, event.args) + elif isinstance(event, ToolResult): + _FLOW_LOG.info("tool result %s -> %s", event.name, _clip(event.content)) + elif isinstance(event, AssistantText): + _FLOW_LOG.info("llm: %s", event.text) + + def _clip(text: str) -> str: """Flatten a tool result onto one line and truncate it for the flow log. diff --git a/aai_cli/agent_cascade/config.py b/aai_cli/agent_cascade/config.py index efa5b43..bce18fc 100644 --- a/aai_cli/agent_cascade/config.py +++ b/aai_cli/agent_cascade/config.py @@ -13,10 +13,10 @@ from aai_cli.agent_cascade.voices import DEFAULT_VOICE from aai_cli.core import llm -# `assembly live` defaults to a capable gateway model (override with --model); kept a -# literal rather than llm.DEFAULT_MODEL so the live agent's default is independent of the -# one-shot `assembly llm` default. -DEFAULT_MODEL = "gpt-5.1" +# `assembly live` defaults to a fast, low-latency gateway model (override with --model) — +# a literal rather than llm.DEFAULT_MODEL so the live agent's default is independent of the +# one-shot `assembly llm` default. Latency matters most for a spoken back-and-forth. +DEFAULT_MODEL = "claude-haiku-4-5-20251001" DEFAULT_MAX_TOKENS = llm.DEFAULT_MAX_TOKENS # The realtime model the cascade transcribes with (same as the agent-cascade template). DEFAULT_SPEECH_MODEL = "u3-rt-pro" diff --git a/aai_cli/agent_cascade/engine.py b/aai_cli/agent_cascade/engine.py index af52f15..60c1087 100644 --- a/aai_cli/agent_cascade/engine.py +++ b/aai_cli/agent_cascade/engine.py @@ -58,6 +58,9 @@ def user_partial(self, text: str) -> None: def user_final(self, text: str) -> None: """Show a finalized user transcript.""" + def tool_call(self, label: str) -> None: + """Show that the agent is using a tool (e.g. "Searching the web") while it thinks.""" + def reply_started(self) -> None: """Mark the start of an agent reply.""" @@ -106,7 +109,9 @@ class CascadeDeps: """ run_stt: Callable[[Callable[[object], None]], None] - complete_reply: Callable[[list[ChatCompletionMessageParam]], str] + # complete_reply(messages, on_tool=None) -> spoken text; on_tool is fed a label per tool + # call so the front-end can show a "Searching the web…" affordance (brain.build_completer). + complete_reply: Callable[..., str] synthesize: Callable[[str], bytes] spawn: Callable[[Callable[[], None]], _Worker] = _spawn_thread @@ -194,6 +199,23 @@ def _barge_in(self) -> None: self.player.flush() self._join_reply() + def interrupt_reply(self) -> bool: + """Signal an in-flight reply to stop, without waiting for it; True if one was playing. + + The UI-thread-safe counterpart to a spoken barge-in: the live TUI's Escape/Ctrl-C + calls this to silence the agent mid-reply without the user having to talk over it. + Flushing the queued audio stops speech at once; the reply worker then sees the stop + flag, unwinds on its own, and emits ``reply_done`` so the front-end returns to + listening (the STT loop keeps running, so the next spoken turn is handled normally). + It deliberately does *not* join the worker — a join from the UI thread would deadlock + against the worker's own ``call_from_thread`` render hops. + """ + playing = self._reply is not None and self._reply.is_alive() + if playing: + self._stop.set() + self.player.flush() + return playing + def _join_reply(self) -> None: """Wait for the current reply worker (if any) to unwind, then drop the handle.""" worker = self._reply @@ -213,9 +235,15 @@ def _generate_reply(self) -> None: *self.history, ] try: - reply = self.deps.complete_reply(messages) + reply = self.deps.complete_reply(messages, on_tool=self.renderer.tool_call) except CLIError as exc: + # The reply leg failed (gateway/tool/graph error, now converted to a CLIError in + # brain._run_graph). Show it in the transcript so the turn doesn't just vanish — + # the user sees *why* there was no answer instead of silence. self._record_error(exc) + self.renderer.reply_started() + self.renderer.agent_transcript(f"(error: {exc.message})", interrupted=False) + self.renderer.reply_done(interrupted=False) return self.renderer.reply_started() spoken: list[str] = [] @@ -264,14 +292,24 @@ def _is_final_turn(event: object, *, format_turns: bool) -> bool: def run_cascade( - *, renderer: Renderer, player: Player, config: CascadeConfig, deps: CascadeDeps + *, + renderer: Renderer, + player: Player, + config: CascadeConfig, + deps: CascadeDeps, + on_session: Callable[[CascadeSession], None] | None = None, ) -> None: """Run one terminal cascade conversation until STT closes or the user stops. Greets, then pumps STT turns through the LLM+TTS reply path. A recorded leg - failure is re-raised here so the command exits with the right code. + failure is re-raised here so the command exits with the right code. ``on_session`` is + handed the freshly built session before the conversation starts, so a front-end (the + live TUI) can grab a handle to it — e.g. to wire a keyboard interrupt to + :meth:`CascadeSession.interrupt_reply`. """ session = CascadeSession(deps=deps, renderer=renderer, player=player, config=config) + if on_session is not None: + on_session(session) player.start() try: session.greet() diff --git a/aai_cli/agent_cascade/mcp_tools.py b/aai_cli/agent_cascade/mcp_tools.py index 1086f94..a864a94 100644 --- a/aai_cli/agent_cascade/mcp_tools.py +++ b/aai_cli/agent_cascade/mcp_tools.py @@ -3,16 +3,13 @@ The live voice agent's brain is a deepagents graph, so any Model Context Protocol server's tools can be threaded into it through ``langchain-mcp-adapters`` — the same adapter `docs_mcp.py` uses for the hosted AssemblyAI docs. This lets a spoken -conversation reach real tools (clock, weather, memory, a notes folder, …), bringing -`assembly live` toward Gemini-Live / ChatGPT-voice parity. +conversation reach real tools (a clock, a notes folder, …), bringing `assembly live` +toward Gemini-Live / ChatGPT-voice parity. -Two entry points feed the brain: - -- :func:`default_servers` returns a curated, zero/low-auth set (time, fetch, memory, - filesystem, weather) that every live session loads out of the box. -- :func:`parse_mcp_config` reads one or more standard ``mcpServers`` JSON files — the - exact shape Claude Desktop / Claude Code use — so an existing config drops in - unchanged and can extend or override the defaults. +The live agent ships with only its built-in Firecrawl web-search tool; MCP servers are +**strictly opt-in** (a low-latency spoken turn does best with a small toolset). +:func:`parse_mcp_config` reads one or more standard ``mcpServers`` JSON files — the exact +shape Claude Desktop / Claude Code use — so an existing config drops in unchanged. Launching a server is **best-effort per server**: a missing ``npx``/``uvx`` or an offline run skips that one server (the others still load) rather than aborting the @@ -42,26 +39,6 @@ Loader = Callable[[str, "Connection"], "list[BaseTool]"] -def default_servers(filesystem_root: Path) -> dict[str, ServerSpec]: - """The curated server set every live session loads: zero/low-auth, fast, speakable. - - Every entry is a published reference server runnable with no API key: - ``time``/``fetch`` over ``uvx`` (PyPI), ``memory``/``filesystem`` over ``npx`` (npm), - and an NWS-backed ``weather`` server. ``filesystem`` is rooted at ``filesystem_root`` - (the working directory) so "summarize my notes file" stays scoped to one folder. - """ - return { - "time": {"command": "uvx", "args": ["mcp-server-time"]}, - "fetch": {"command": "uvx", "args": ["mcp-server-fetch"]}, - "memory": {"command": "npx", "args": ["-y", "@modelcontextprotocol/server-memory"]}, - "filesystem": { - "command": "npx", - "args": ["-y", "@modelcontextprotocol/server-filesystem", str(filesystem_root)], - }, - "weather": {"command": "npx", "args": ["-y", "@h1deya/mcp-server-weather"]}, - } - - def parse_mcp_config(paths: Sequence[Path]) -> dict[str, ServerSpec]: """Merge the ``mcpServers`` maps from one or more standard MCP config JSON files. @@ -112,13 +89,23 @@ def _to_connection(spec: ServerSpec) -> Connection: return {"transport": "stdio", "command": str(spec["command"]), "args": args, "env": env} +# A server that hasn't listed its tools within this window is skipped, so a slow or hung +# MCP server (npx/uvx cold-start, an unreachable host) can't block `assembly live` startup. +_LOAD_TIMEOUT_S = 15.0 # pragma: no mutate — a tuning knob; ±a few seconds is equivalent + + def _load_server(name: str, conn: Connection) -> list[BaseTool]: - """Connect to one MCP server and return its tools (drives the async adapter).""" + """Connect to one MCP server and return its tools, bounded by :data:`_LOAD_TIMEOUT_S`. + + The timeout is what keeps a slow/hung server from hanging startup forever — on timeout + the fetch is cancelled, ``asyncio.run`` raises ``TimeoutError``, and :func:`_safe_load` + turns that into an empty toolset (the server is simply skipped). + """ from langchain_mcp_adapters.client import MultiServerMCPClient async def _fetch() -> list[BaseTool]: client = MultiServerMCPClient({name: conn}) - return await client.get_tools() + return await asyncio.wait_for(client.get_tools(), timeout=_LOAD_TIMEOUT_S) return asyncio.run(_fetch()) diff --git a/aai_cli/agent_cascade/tui.py b/aai_cli/agent_cascade/tui.py index 90607d1..007c73d 100644 --- a/aai_cli/agent_cascade/tui.py +++ b/aai_cli/agent_cascade/tui.py @@ -35,8 +35,8 @@ # Splash intro copy (the code agent's banner copy is code-specific, so `live` carries its own). _READY_LINE = "Listening… start talking when you're ready." _TIP_LINE = "Use headphones — the mic stays open while the agent speaks." -# The one-line footer: a hands-free session, so the only control is quit. -_STATUS_LINE = "Ctrl-C to quit" +# The one-line footer: a hands-free session, so the controls are interrupt-and-quit. +_STATUS_LINE = "Esc/Ctrl-C to interrupt · Ctrl-Q to quit" class _TuiRenderer: @@ -59,6 +59,9 @@ def user_partial(self, text: str) -> None: def user_final(self, text: str) -> None: self._dispatch(self._app.show_user_final, text) + def tool_call(self, label: str) -> None: + self._dispatch(self._app.show_tool_call, label) + def reply_started(self) -> None: self._dispatch(self._app.begin_reply) @@ -89,13 +92,19 @@ class LiveAgentApp(App[None]): #voicebar {{ dock: bottom; height: 3; background: #000000; border: round {banner.BRAND_HEX}; margin: 1 1; content-align: center middle; }} #status {{ dock: bottom; height: 1; background: #000000; padding: 0 1; }} + /* Blank line above each agent reply (and the greeting), so turns don't run together. */ + AssistantMessage {{ margin-top: 1; }} """ TITLE = "AssemblyAI Live" ENABLE_COMMAND_PALETTE = False - # Ctrl-C / Ctrl-Q both stop the session; there is no turn to interrupt and nothing to type, - # so a single press quits (closing the audio unblocks the cascade worker). + # Escape and Ctrl-C interrupt a playing reply (silence it and drop back to listening), + # the same as talking over the agent — so you can stop a long answer without speaking. + # When nothing is speaking, Ctrl-C quits; Ctrl-Q always quits (the guaranteed escape + # hatch, so a stuck reply can never trap the session). Quitting closes the audio, which + # unblocks the cascade worker. BINDINGS: ClassVar = [ - ("ctrl+c", "stop", "Quit"), + ("escape", "interrupt", "Interrupt"), + ("ctrl+c", "interrupt_or_quit", "Interrupt / Quit"), ("ctrl+q", "stop", "Quit"), ] @@ -110,6 +119,9 @@ def __init__( self._run_conversation = run_conversation # blocking; runs the cascade given a Renderer self._on_stop = on_stop # closes the audio so a quit unblocks the cascade worker self._web_note = web_note + # The cascade's reply-interrupt, wired once its session exists (see set_interrupt); + # None until then, so an early keypress is a harmless no-op. + self._interrupt: Callable[[], bool] | None = None self._voice_phase = "listening" self._voice_frames = itertools.cycle(tui_status.VOICE_FRAMES) self._voice_timer: Timer | None = None @@ -179,6 +191,15 @@ def show_user_final(self, text: str) -> None: self._set_phase("thinking") self._scroll_end() + def show_tool_call(self, label: str) -> None: + """Surface the agent's tool use inline as it happens (the live tool affordance). + + A spoken turn that pauses to use a tool would otherwise sit silent on "thinking…"; + this drops a dim "Searching the web…" line so the wait reads as progress, not a hang. + """ + self._mount(Note(f"{label}…")) + self._scroll_end() + def begin_reply(self) -> None: """Open a fresh reply widget the agent's sentences stream into; switch to speaking.""" self._set_phase("speaking") @@ -241,10 +262,35 @@ def _mount(self, widget: Static) -> None: def _scroll_end(self) -> None: self.query_one("#log", VerticalScroll).scroll_end(animate=False) # pragma: no mutate - # --- quit ----------------------------------------------------------------- + # --- interrupt / quit ----------------------------------------------------- + + def set_interrupt(self, interrupt: Callable[[], bool]) -> None: + """Wire the session's reply-interrupt once the cascade has built its session. + + Called from the cascade worker thread (via ``run_cascade``'s ``on_session``); it only + stores a callable reference, so no UI hop is needed. + """ + self._interrupt = interrupt + + def action_interrupt(self) -> None: + """Escape: silence a playing reply and return to listening (a no-op when idle).""" + self._do_interrupt() + + def action_interrupt_or_quit(self) -> None: + """Ctrl-C: silence a playing reply and keep listening; quit when nothing is speaking.""" + if not self._do_interrupt(): + self.action_stop() + + def _do_interrupt(self) -> bool: + """Fire the session's reply-interrupt; True if a reply was playing. + + The reply worker then unwinds and emits ``reply_done``, so the renderer is what + returns the voice bar to listening — this only has to signal the stop. + """ + return self._interrupt is not None and self._interrupt() def action_stop(self) -> None: - """Ctrl-C / Ctrl-Q: stop the audio (unblocking the cascade worker) and exit.""" + """Ctrl-Q (or Ctrl-C when idle): stop the audio (unblocking the worker) and exit.""" self._teardown() self.exit() diff --git a/aai_cli/code_agent/firecrawl_search.py b/aai_cli/code_agent/firecrawl_search.py index e66be97..6358e98 100644 --- a/aai_cli/code_agent/firecrawl_search.py +++ b/aai_cli/code_agent/firecrawl_search.py @@ -11,6 +11,7 @@ from __future__ import annotations +import warnings from typing import TYPE_CHECKING from aai_cli.core import env @@ -32,6 +33,13 @@ def build_web_search_tool() -> BaseTool | None: if not env.get(FIRECRAWL_API_KEY_ENV): return None - from langchain_firecrawl import FirecrawlSearch + with warnings.catch_warnings(): + # firecrawl-py's pydantic models name fields ``json``/``schema``, which shadow + # BaseModel attributes and emit noisy UserWarnings on import. They're harmless and + # out of our control, so silence them at runtime (pytest filters them via pyproject). + warnings.filterwarnings( + "ignore", message="Field name .* shadows an attribute", category=UserWarning + ) + from langchain_firecrawl import FirecrawlSearch return FirecrawlSearch() diff --git a/aai_cli/code_agent/modals.py b/aai_cli/code_agent/modals.py index 25c54a7..4e21a97 100644 --- a/aai_cli/code_agent/modals.py +++ b/aai_cli/code_agent/modals.py @@ -76,6 +76,8 @@ class ApprovalScreen(ModalScreen[str]): ("a", "auto", "Auto-approve"), ("n", "reject", "Reject"), ("e", "expand", "Expand"), + # Escape / Ctrl-C dismiss the modal — declining the tool is the safe cancel. + ("escape,ctrl+c", "reject", "Cancel"), ] def __init__( @@ -165,6 +167,8 @@ class AskScreen(ModalScreen[str]): border: round #3a3f55; background: #000000; padding: 0 1; margin: 0 1 1 1; } """ + # Escape / Ctrl-C dismiss the question with no answer. + BINDINGS: ClassVar = [("escape,ctrl+c", "cancel", "Cancel")] def __init__(self, question: str, *, voice: _VoiceIO | None = None) -> None: super().__init__() @@ -198,5 +202,9 @@ def _answer(self, text: str) -> None: self._answered = True self.dismiss(text) + def action_cancel(self) -> None: + """Escape / Ctrl-C: dismiss with no answer (the agent gets an empty reply).""" + self._answer("") + def on_input_submitted(self, event: Input.Submitted) -> None: self._answer(event.value) diff --git a/aai_cli/code_agent/model.py b/aai_cli/code_agent/model.py index 4e1c556..f2b7e50 100644 --- a/aai_cli/code_agent/model.py +++ b/aai_cli/code_agent/model.py @@ -9,7 +9,7 @@ from __future__ import annotations import json -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from typing import TYPE_CHECKING from aai_cli.core import environments @@ -158,6 +158,61 @@ def _is_empty_arguments(arguments: object) -> bool: return isinstance(parsed, dict) and not parsed +# JSON-Schema keywords some gateway-routed models reject on tool definitions. OpenAI ignores +# them, but Gemini's ``function_declarations`` 400 on them ("Unknown name …"), which kills any +# tool-bound turn. These are all validation/metadata keywords — stripping them leaves the +# structural schema (type/properties/items/required/enum/anyOf/description/…) the model needs +# to call the tool, so the call still works; only the unenforced constraints are dropped. +_UNSUPPORTED_SCHEMA_KEYS = ( + "$schema", + "$id", + "$comment", + "title", + "default", + "examples", + "const", + "additionalProperties", + "unevaluatedProperties", + "patternProperties", + "minProperties", + "maxProperties", + "propertyNames", + "exclusiveMinimum", + "exclusiveMaximum", + "multipleOf", + "additionalItems", + "unevaluatedItems", + "contains", +) + + +def _sanitize_tool_schemas(payload: object) -> None: + """Strip model-incompatible JSON-Schema keys from each tool's ``parameters``, in place.""" + if not isinstance(payload, dict): + return + tools = payload.get("tools") + if not isinstance(tools, list): + return + for tool in tools: + function = tool.get("function") if isinstance(tool, dict) else None + if isinstance(function, dict): + _strip_schema_keys(function.get("parameters")) + + +def _strip_schema_keys(node: object) -> None: + """Recursively drop :data:`_UNSUPPORTED_SCHEMA_KEYS` from a JSON-Schema-shaped structure.""" + if isinstance(node, dict): + for key in _UNSUPPORTED_SCHEMA_KEYS: + node.pop(key, None) + children: Iterable[object] = list(node.values()) + elif isinstance(node, list): + children = node + else: + return + for child in children: + _strip_schema_keys(child) + + def build_model( api_key: str, *, @@ -201,6 +256,7 @@ def _get_request_payload( messages = payload.get("messages") _flatten_content(messages) _ensure_tool_call_arguments(messages) + _sanitize_tool_schemas(payload) return payload def _convert_chunk_to_generation_chunk( diff --git a/aai_cli/code_agent/prompt.py b/aai_cli/code_agent/prompt.py index 4e2a7ef..c704607 100644 --- a/aai_cli/code_agent/prompt.py +++ b/aai_cli/code_agent/prompt.py @@ -29,8 +29,12 @@ for API/SDK questions, and web search for anything else. Prefer the docs for AssemblyAI specifics. -Be concise. Make focused edits, explain what you changed, and run commands to verify -your work when it helps. Stop and ask before destructive or far-reaching actions.\ +Be concise — and especially so out loud. Your prose is read aloud by a text-to-speech +engine, so keep replies to a sentence or two of plain, simple spoken language: no +markdown, lists, symbols, URLs, or code in the prose. Put any code in fenced code blocks +(the readback skips them). Make focused edits, briefly say what you changed, and run +commands to verify your work when it helps. Stop and ask before destructive or +far-reaching actions.\ """ diff --git a/aai_cli/code_agent/tui.py b/aai_cli/code_agent/tui.py index 5d5119d..099c64c 100644 --- a/aai_cli/code_agent/tui.py +++ b/aai_cli/code_agent/tui.py @@ -388,23 +388,23 @@ def _cancel_turn(self) -> bool: self._note("cancelling…") return True - def _stop_voice_activity(self) -> bool: - """Stop in-flight voice listening/readback and go idle; True if voice was active. - - In voice mode the agent is usually listening or reading a reply back — neither is a - "running turn", so without this an interrupt key would skip straight to the quit hint. - This cancels the active leg, pauses voice (the text prompt returns, no auto re-listen), - and refreshes the UI, so a first Ctrl-C/Escape gives immediate feedback. Once paused - ``_voice_active`` is False, so a second press falls through to the quit path. + def _stop_voice_activity(self) -> None: + """Stop in-flight voice (a no-op when none is active). + + Interrupting the readback (speaking) stops it and resumes listening — the cancelled + speak() returns and the loop captures the next turn. Interrupting while listening + pauses voice to the text prompt, after which a second press falls through to quit. """ if self._voice is None or not self._voice_active(): - return False + return self._voice.cancel() + if self._voice_phase == "speaking": # stop talking, stay in voice mode -> re-listen + self._note("stopped — listening…") + return self._voice_paused = True self._refresh_status() self._sync_input_mode() # active leg stopped -> bring the text prompt back self._note("voice interrupted (Ctrl-V to talk again)") - return True def action_interrupt(self) -> None: """Escape: interrupt a running agent turn or in-flight voice (a no-op when idle).""" @@ -416,13 +416,13 @@ def action_quit_or_interrupt(self) -> None: if self._cancel_turn(): self._quit_pending = False return - if self._stop_voice_activity(): - self._arm_quit_pending() # idle now; a second Ctrl-C confirms the quit - return + # A second press always quits — checked before stopping voice so a spoken turn can + # never trap you (the first press stops the readback and arms; the second exits). if self._quit_pending: self.exit() - else: - self._arm_quit_pending() + return + self._stop_voice_activity() # stop a readback/listen if one's active (a no-op otherwise) + self._arm_quit_pending() def _arm_quit_pending(self) -> None: """Arm Ctrl-C double-press-to-quit, showing a hint that expires after a few seconds.""" @@ -481,8 +481,7 @@ def _stop_spinner(self) -> None: self.query_one("#spinner", Static).display = False def on_worker_state_changed(self, event: Worker.StateChanged) -> None: - # Guard on is_running: a worker finishing *after* the app tears down (quit / test exit) - # would drive _finish_turn against an unmounted DOM — NoMatches on "#spinner", a flake. + # is_running guard: a worker finishing after teardown would hit an unmounted DOM. if event.worker.is_finished and self.is_running: self._finish_turn() diff --git a/aai_cli/commands/agent_cascade/__init__.py b/aai_cli/commands/agent_cascade/__init__.py index 97fcb8f..afa46c3 100644 --- a/aai_cli/commands/agent_cascade/__init__.py +++ b/aai_cli/commands/agent_cascade/__init__.py @@ -58,7 +58,7 @@ def _emit_voice_list(_state: AppState, json_mode: bool) -> None: 'assembly --sandbox live --system-prompt "You are a terse pirate."', ), ( - "Add your own MCP servers on top of the defaults", + "Add your own MCP servers (none load by default)", "assembly --sandbox live --mcp-config ~/.config/mcp/servers.json", ), ("See available voices", "assembly --sandbox live --list-voices"), @@ -162,7 +162,7 @@ def live( mcp_config: list[Path] | None = typer.Option( None, "--mcp-config", - help='Extra MCP servers config JSON ({"mcpServers": {…}}) on top of the defaults (repeatable)', + help='MCP servers config JSON ({"mcpServers": {…}}) to add (repeatable; none load by default)', exists=True, dir_okay=False, rich_help_panel=_PANEL_TOOLS, @@ -200,12 +200,12 @@ def live( This only runs a conversation in the terminal — it writes no code. To build an agent-cascade app, run 'assembly init agent-cascade' instead. - By default the agent loads a curated, no-auth MCP toolset (time, fetch, - memory, filesystem, weather) alongside its built-in URL fetch and AssemblyAI - docs. Firecrawl web search also loads when a FIRECRAWL_API_KEY is set (you'll - get a one-line notice when it isn't). Add your own servers with --mcp-config, - pointing at any standard mcpServers JSON file. A server that won't start is - skipped, so one broken tool never sinks the session. + The agent keeps a deliberately small toolset for low-latency spoken turns: its + one built-in tool is Firecrawl web search, which loads when a FIRECRAWL_API_KEY + is set (you'll get a one-line notice when it isn't). Add your own MCP servers + with --mcp-config, pointing at any standard mcpServers JSON file (none load by + default). A server that won't start is skipped, so one broken tool never sinks + the session. """ if list_voices: diff --git a/aai_cli/commands/agent_cascade/_exec.py b/aai_cli/commands/agent_cascade/_exec.py index 7a7df44..5d43caa 100644 --- a/aai_cli/commands/agent_cascade/_exec.py +++ b/aai_cli/commands/agent_cascade/_exec.py @@ -74,7 +74,7 @@ class AgentCascadeOptions: # Text-to-speech: language named, any other query param via --tts-config. language: str | None tts_config: tuple[str, ...] - # Tools: extra standard mcpServers JSON config files, on top of the default set. + # Tools: opt-in standard mcpServers JSON config files (none load by default). mcp_config: tuple[Path, ...] # Print the equivalent Python instead of running a conversation. show_code: bool @@ -123,8 +123,9 @@ def _parse_tts_config(pairs: tuple[str, ...]) -> dict[str, str]: def _web_search_note() -> str | None: """The "web search is off" notice when no ``FIRECRAWL_API_KEY`` enables it, else ``None``. - The other default tools (URL fetch, AssemblyAI docs, and the MCP servers) need no - key; only Firecrawl web search does, so its absence is the one worth flagging up front. + Web search (Firecrawl) is the live agent's one built-in tool and the only one needing a + key, so its absence — which leaves the agent answering from its own knowledge alone — is + worth flagging up front. """ if env.get(firecrawl_search.FIRECRAWL_API_KEY_ENV): return None @@ -139,15 +140,13 @@ def _warn_without_web_search(*, json_mode: bool) -> None: def _resolve_mcp_servers(mcp_config: tuple[Path, ...]) -> dict[str, Mapping[str, object]]: - """The MCP servers for this run: the curated default set overlaid with any --mcp-config - files, so an explicit config can extend the defaults or override one by name. + """The MCP servers for this run: only those from ``--mcp-config`` files (none by default). - The default filesystem server is rooted at the working directory, scoping its file - access to one folder. + The live agent ships with just its Firecrawl web-search tool; extra MCP servers are + strictly opt-in, so a low-latency spoken turn isn't handed a large tool menu it has to + choose among. """ - servers: dict[str, Mapping[str, object]] = dict(mcp_tools.default_servers(Path.cwd())) - servers.update(mcp_tools.parse_mcp_config(mcp_config)) - return servers + return dict(mcp_tools.parse_mcp_config(mcp_config)) def _open_audio( @@ -230,13 +229,36 @@ def _run_live_tui(api_key: str, opts: AgentCascadeOptions, config: CascadeConfig deps = engine.CascadeDeps.real(api_key, config, audio=duplex.mic, stt_params=stt_params) def run_conversation(renderer: engine.Renderer) -> None: - engine.run_cascade(renderer=renderer, player=duplex.player, config=config, deps=deps) + # Hand the app the session's reply-interrupt so Escape/Ctrl-C can silence a reply + # mid-sentence and drop back to listening (the session is built inside run_cascade). + engine.run_cascade( + renderer=renderer, + player=duplex.player, + config=config, + deps=deps, + on_session=lambda session: app.set_interrupt(session.interrupt_reply), + ) - LiveAgentApp( + app = LiveAgentApp( run_conversation=run_conversation, on_stop=duplex.close, web_note=_web_search_note(), - ).run(mouse=False) + ) + app.run(mouse=False) + + +def _launch_tui(api_key: str, opts: AgentCascadeOptions, config: CascadeConfig) -> None: + """Run the voice-only TUI, mapping a setup-time Ctrl-C to a clean exit. + + A Ctrl-C during setup — opening the mic, building the graph, loading ``--mcp-config`` + servers — lands before Textual captures the keyboard, so it surfaces as a plain + ``KeyboardInterrupt`` here. Map it to exit 130 (cancel) rather than letting it dump a + half-initialized asyncio/threading traceback. + """ + try: + _run_live_tui(api_key, opts, config) + except KeyboardInterrupt: + raise typer.Exit(code=errors.CANCELLED_EXIT_CODE) from None def run_agent_cascade(opts: AgentCascadeOptions, state: AppState, *, json_mode: bool) -> None: @@ -282,7 +304,7 @@ def run_agent_cascade(opts: AgentCascadeOptions, state: AppState, *, json_mode: if _should_use_tui(from_file=from_file, json_mode=json_mode, text_mode=text_mode): # The voice-only Textual front-end surfaces the web-search note in-app, not on stderr. - _run_live_tui(api_key, opts, config) + _launch_tui(api_key, opts, config) return _warn_without_web_search(json_mode=json_mode) diff --git a/aai_cli/core/llm.py b/aai_cli/core/llm.py index b147e89..f3009a0 100644 --- a/aai_cli/core/llm.py +++ b/aai_cli/core/llm.py @@ -23,18 +23,40 @@ # is supplied. Must be exactly "{{ transcript }}" (spaces included). TRANSCRIPT_TAG = "{{ transcript }}" -# A curated subset for `assembly llm --list-models` and help text. The gateway is the -# source of truth for what's actually accepted, so we don't validate against this. +# The known model ids surfaced by `assembly llm --list-models`, help text, and shell +# completion, grouped by provider. The gateway is the source of truth for what's +# actually accepted, so we don't validate against this — a newer id works even before +# it lands here. KNOWN_MODELS = ( + # Anthropic "claude-opus-4-7", + "claude-opus-4-6", + "claude-opus-4-5-20251101", "claude-sonnet-4-6", + "claude-sonnet-4-5-20250929", "claude-haiku-4-5-20251001", + # OpenAI + "gpt-5.5", + "gpt-5.2", "gpt-5.1", "gpt-5", + "gpt-5-mini", + "gpt-5-nano", "gpt-4.1", + "gpt-oss-120b", + "gpt-oss-20b", + # Google + "gemini-3.5-flash", + "gemini-3-flash-preview", + "gemini-3.1-flash-lite-preview", "gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.5-flash-lite", + # Moonshot AI + "kimi-k2.5", + # Alibaba Cloud + "qwen3-next-80b-a3b", + "qwen3-32B", ) diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index c2334d9..96cca87 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -617,12 +617,13 @@ This only runs a conversation in the terminal — it writes no code. To build an agent-cascade app, run 'assembly init agent-cascade' instead. - By default the agent loads a curated, no-auth MCP toolset (time, fetch, - memory, filesystem, weather) alongside its built-in URL fetch and AssemblyAI - docs. Firecrawl web search also loads when a FIRECRAWL_API_KEY is set (you'll - get a one-line notice when it isn't). Add your own servers with --mcp-config, - pointing at any standard mcpServers JSON file. A server that won't start is - skipped, so one broken tool never sinks the session. + The agent keeps a deliberately small toolset for low-latency spoken turns: its + one built-in tool is Firecrawl web search, which loads when a + FIRECRAWL_API_KEY + is set (you'll get a one-line notice when it isn't). Add your own MCP servers + with --mcp-config, pointing at any standard mcpServers JSON file (none load by + default). A server that won't start is skipped, so one broken tool never sinks + the session. ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ │ source [SOURCE] Audio file path or URL to speak to the agent. Omit │ @@ -667,7 +668,8 @@ ╭─ Language model ─────────────────────────────────────────────────────────────╮ │ --model TEXT LLM Gateway model that powers the │ │ agent's replies │ - │ [default: gpt-5.1] │ + │ [default: │ + │ claude-haiku-4-5-20251001] │ │ --max-tokens INTEGER RANGE [x>=1] Max tokens per reply │ │ [default: 8192] │ │ --llm-config TEXT Set any LLM Gateway request field │ @@ -696,8 +698,8 @@ │ streaming fields │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Tools ──────────────────────────────────────────────────────────────────────╮ - │ --mcp-config FILE Extra MCP servers config JSON ({"mcpServers": │ - │ {…}}) on top of the defaults (repeatable) │ + │ --mcp-config FILE MCP servers config JSON ({"mcpServers": {…}}) to │ + │ add (repeatable; none load by default) │ ╰──────────────────────────────────────────────────────────────────────────────╯ Examples @@ -707,7 +709,7 @@ $ assembly --sandbox live --voice michael --greeting "Hi there" Give the agent a persona $ assembly --sandbox live --system-prompt "You are a terse pirate." - Add your own MCP servers on top of the defaults + Add your own MCP servers (none load by default) $ assembly --sandbox live --mcp-config ~/.config/mcp/servers.json See available voices $ assembly --sandbox live --list-voices diff --git a/tests/_cascade_fakes.py b/tests/_cascade_fakes.py new file mode 100644 index 0000000..4985cf4 --- /dev/null +++ b/tests/_cascade_fakes.py @@ -0,0 +1,105 @@ +"""Shared fakes for the `assembly live` cascade tests (engine/command/TUI). + +The cascade's three network legs and its thread spawner are injected through +``CascadeDeps``, so the suites drive the orchestration against these fakes — no +sockets, mic, or speaker. Kept in one module so the engine, command, and TUI tests +share one set of doubles (and so no single test file grows past the 500-line gate). +""" + +from __future__ import annotations + +import types + +from aai_cli.agent_cascade.config import CascadeConfig +from aai_cli.agent_cascade.engine import CascadeDeps, CascadeSession + + +class FakeRenderer: + def __init__(self): + self.calls = [] + + def connected(self): + self.calls.append(("connected",)) + + def user_partial(self, text): + self.calls.append(("user_partial", text)) + + def user_final(self, text): + self.calls.append(("user_final", text)) + + def tool_call(self, label): + self.calls.append(("tool_call", label)) + + def reply_started(self): + self.calls.append(("reply_started",)) + + def agent_transcript(self, text, *, interrupted): + self.calls.append(("agent_transcript", text, interrupted)) + + def reply_done(self, *, interrupted): + self.calls.append(("reply_done", interrupted)) + + +class FakePlayer: + def __init__(self): + self.enqueued = [] + self.flushed = 0 + self.started = False + self.closed = False + + def start(self): + self.started = True + + def enqueue(self, pcm): + self.enqueued.append(pcm) + + def flush(self): + self.flushed += 1 + + def close(self): + self.closed = True + + +class FakeWorker: + def __init__(self, *, alive): + self._alive = alive + self.joined = 0 + + def is_alive(self): + return self._alive + + def join(self): + self.joined += 1 + self._alive = False + + +def sync_spawn(target): + """Run the reply body inline and hand back a finished worker, so the cascade is + driven deterministically without real threads.""" + target() + return FakeWorker(alive=False) + + +def turn(text, *, end_of_turn=True, turn_is_formatted=True): + return types.SimpleNamespace( + transcript=text, end_of_turn=end_of_turn, turn_is_formatted=turn_is_formatted + ) + + +def make_session( + *, + complete_reply=lambda messages, on_tool=None: "Hello there.", + synthesize=lambda text: b"pcm:" + text.encode(), + spawn=sync_spawn, + run_stt=lambda on_turn: None, + config=None, +): + deps = CascadeDeps( + run_stt=run_stt, complete_reply=complete_reply, synthesize=synthesize, spawn=spawn + ) + renderer = FakeRenderer() + player = FakePlayer() + session = CascadeSession( + deps=deps, renderer=renderer, player=player, config=config or CascadeConfig() + ) + return session, renderer, player diff --git a/tests/test_agent_cascade_brain.py b/tests/test_agent_cascade_brain.py index d8bee24..cf00351 100644 --- a/tests/test_agent_cascade_brain.py +++ b/tests/test_agent_cascade_brain.py @@ -10,6 +10,7 @@ import logging +import pytest from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import AIMessage, ToolMessage from langchain_core.outputs import ChatGeneration, ChatResult @@ -17,6 +18,7 @@ from aai_cli.agent_cascade import brain from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.code_agent import model as model_mod +from aai_cli.core.errors import CLIError class FakeChatModel(BaseChatModel): @@ -56,33 +58,22 @@ def __init__(self, name: str): self.name = name -def test_system_prompt_appends_tool_guidance_for_present_tools(): +def test_system_prompt_advertises_web_search_when_present(): prompt = brain.build_system_prompt( - "You are a pirate.", - tools=[ - _NamedTool(brain.WEB_SEARCH_TOOL_NAME), - _NamedTool("fetch_url"), - _NamedTool("docs_search"), - ], + "You are a pirate.", tools=[_NamedTool(brain.WEB_SEARCH_TOOL_NAME)] ) - # The persona is preserved, and the guidance advertises each capability that a present - # tool backs (the plain cascade persona never mentions tools). + # The persona is preserved, and the guidance advertises the web-search capability the + # present tool backs (the plain cascade persona never mentions tools). assert prompt.startswith("You are a pirate.") assert "search the web" in prompt - assert "fetch a specific URL" in prompt - assert "AssemblyAI documentation" in prompt -def test_system_prompt_omits_web_search_when_no_search_tool(): - # With no TAVILY_API_KEY the search tool is absent — the guidance must NOT promise web - # search, since announcing a missing tool makes the agent narrate "I'll search…" and - # then stall with no answer. The capabilities it *does* have still appear. - prompt = brain.build_system_prompt( - "persona", tools=[_NamedTool("fetch_url"), _NamedTool("docs_search")] - ) +def test_system_prompt_omits_web_search_when_search_tool_absent(): + # Without the Firecrawl search tool the guidance must NOT promise web search — announcing + # a missing tool makes the agent narrate "I'll search…" and then stall with no answer. A + # non-search tool name must not falsely trigger the web-search capability. + prompt = brain.build_system_prompt("persona", tools=[_NamedTool("some_other_tool")]) assert "search the web for current or unfamiliar facts" not in prompt - assert "fetch a specific URL" in prompt - assert "AssemblyAI documentation" in prompt def test_system_prompt_tells_model_not_to_promise_tools_when_none(): @@ -239,7 +230,7 @@ def test_run_graph_streams_and_logs_flow_when_verbose(monkeypatch, caplog, prese def test_run_graph_invokes_when_not_verbose(): - # Default (non-verbose): the graph is invoked once, never streamed, and nothing is logged. + # Default (non-verbose, no tool sink): invoked once, never streamed, nothing logged. graph = _StreamingGraph([{"messages": [AIMessage(content="hi")]}]) completer = brain.build_completer("k", CascadeConfig(), graph=graph) assert completer([{"role": "user", "content": "hi"}]) == "" @@ -247,6 +238,27 @@ def test_run_graph_invokes_when_not_verbose(): assert graph.stream_kwargs is None +def test_on_tool_sink_streams_and_reports_each_tool_call_by_label(): + # A wired tool sink (the live UI affordance) streams the graph — even without -v — and + # reports each tool call by its speakable label, while still returning the final reply. + labels: list[str] = [] + call = AIMessage( + content="", tool_calls=[{"name": brain.WEB_SEARCH_TOOL_NAME, "args": {}, "id": "c1"}] + ) + snapshots = [{"messages": [call]}, {"messages": [call, AIMessage(content="Here's the news.")]}] + graph = _StreamingGraph(snapshots) + completer = brain.build_completer("k", CascadeConfig(), graph=graph) + reply = completer([{"role": "user", "content": "news?"}], on_tool=labels.append) + assert reply == "Here's the news." + assert labels == ["Searching the web"] + assert graph.stream_kwargs == "values" and graph.invoked is False # streamed, not invoked + + +def test_tool_label_maps_web_search_and_falls_back_for_others(): + assert brain._tool_label(brain.WEB_SEARCH_TOOL_NAME) == "Searching the web" + assert brain._tool_label("get_time") == "Using get_time" + + def test_run_graph_invokes_when_graph_cannot_stream(monkeypatch): # Verbose but the (test) graph only implements invoke: fall back to invoke rather than # crashing on a missing .stream — the fakes and any non-streaming graph stay supported. @@ -261,6 +273,33 @@ def invoke(self, graph_input): assert completer([{"role": "user", "content": "hi"}]) == "from invoke" +def test_run_graph_converts_graph_errors_to_cli_error(): + # A graph failure (gateway 4xx/5xx, a tool raising, a recursion limit) must become a + # CLIError so the cascade surfaces it instead of the reply worker dying silently. + class _Boom: + def invoke(self, graph_input): + del graph_input + raise ValueError("bedrock said no") + + completer = brain.build_completer("k", CascadeConfig(), graph=_Boom()) + with pytest.raises(CLIError) as excinfo: + completer([{"role": "user", "content": "hi"}]) + assert "couldn't complete the turn" in excinfo.value.message + assert "bedrock said no" in excinfo.value.message # the cause is preserved for diagnosis + + +def test_run_graph_passes_cli_error_through(): + # A CLIError from the graph is already user-facing -> propagate as-is, not re-wrapped. + class _CliBoom: + def invoke(self, graph_input): + del graph_input + raise CLIError("already clean", error_type="x") + + completer = brain.build_completer("k", CascadeConfig(), graph=_CliBoom()) + with pytest.raises(CLIError, match="already clean"): + completer([{"role": "user", "content": "hi"}]) + + def test_log_flow_ignores_non_list_messages(): # Defensive: a snapshot without a messages list logs nothing and reports no progress. assert brain._log_flow({"messages": None}, 3) == 3 @@ -336,23 +375,17 @@ def test_reply_text_is_empty_without_an_assistant_message(): # --- build_live_tools -------------------------------------------------------- -def test_build_live_tools_includes_search_when_keyed(monkeypatch): +def test_build_live_tools_is_just_web_search_when_keyed(monkeypatch): search = object() - monkeypatch.setattr("aai_cli.code_agent.fetch_tool.build_fetch_tool", lambda: "fetch") monkeypatch.setattr("aai_cli.code_agent.firecrawl_search.build_web_search_tool", lambda: search) - monkeypatch.setattr("aai_cli.code_agent.docs_mcp.load_docs_tools", lambda: ["docs"]) - tools = brain.build_live_tools() - # Fetch + the keyed search + the docs tools, in that order. - assert tools == ["fetch", search, "docs"] + # The live agent's sole built-in tool is Firecrawl web search — no URL fetch, no docs. + assert brain.build_live_tools() == [search] -def test_build_live_tools_omits_search_when_unkeyed(monkeypatch): - monkeypatch.setattr("aai_cli.code_agent.fetch_tool.build_fetch_tool", lambda: "fetch") +def test_build_live_tools_is_empty_without_firecrawl_key(monkeypatch): monkeypatch.setattr("aai_cli.code_agent.firecrawl_search.build_web_search_tool", lambda: None) - monkeypatch.setattr("aai_cli.code_agent.docs_mcp.load_docs_tools", list) - tools = brain.build_live_tools() - # No TAVILY_API_KEY -> no search tool, just the fetch tool. - assert tools == ["fetch"] + # No FIRECRAWL_API_KEY -> no tool at all; the agent then runs tool-free. + assert brain.build_live_tools() == [] # --- build_graph (model construction + compile, with the docs probe skipped) - @@ -393,14 +426,14 @@ def fake_create(*, model, tools, system_prompt): monkeypatch.setattr(deepagents, "create_deep_agent", fake_create) monkeypatch.setattr(model_mod, "build_model", lambda *a, **k: object()) - builtin = [_NamedTool("fetch_url")] + builtin = [_NamedTool(brain.WEB_SEARCH_TOOL_NAME)] extra = [_NamedTool("get_time")] graph = brain.build_graph("k", CascadeConfig(), tools=builtin, mcp_tools=extra) # The model is bound to both tool sets, in built-in-then-MCP order. assert graph == "graph" assert captured["tools"] == builtin + extra - # The prompt advertises the built-in fetch leg AND the MCP tool by name. - assert "fetch a specific URL" in captured["system_prompt"] + # The prompt advertises the built-in web-search leg AND the MCP tool by name. + assert "search the web" in captured["system_prompt"] assert "use your connected tools (get_time)" in captured["system_prompt"] diff --git a/tests/test_agent_cascade_command.py b/tests/test_agent_cascade_command.py index a7c2374..405a508 100644 --- a/tests/test_agent_cascade_command.py +++ b/tests/test_agent_cascade_command.py @@ -215,23 +215,23 @@ def test_open_audio_mic_warns_and_uses_duplex_rate(monkeypatch): # --- MCP servers (resolution unit-tested in test_agent_cascade_mcp.py) ------- -def test_default_mcp_servers_flow_into_cascade_config(monkeypatch): +def test_no_mcp_servers_load_by_default(monkeypatch): monkeypatch.setattr(_exec.tts_session, "require_available", lambda _c: None) monkeypatch.setattr(config, "resolve_api_key", lambda **_: "k") monkeypatch.setattr(_exec, "FileSource", lambda src: types.SimpleNamespace(sample_rate=16000)) monkeypatch.setattr(_exec.client, "resolve_audio_source", lambda source, sample: "clip.wav") captured = {} - # Capture config at the deps seam so the graph (and its npx/uvx servers) never builds. + # Capture config at the deps seam so the graph never builds. def fake_real(api_key, config, *, audio, stt_params): captured["config"] = config return "deps" monkeypatch.setattr(_exec.engine.CascadeDeps, "real", fake_real) monkeypatch.setattr(_exec.engine, "run_cascade", lambda **kwargs: None) - # With no flags, the default servers (e.g. weather) ride into the config the brain reads. + # With no --mcp-config, no MCP servers load — the agent keeps just its web-search tool. run_agent_cascade(_opts(source="clip.wav"), AppState(), json_mode=False) - assert "weather" in captured["config"].mcp_servers + assert captured["config"].mcp_servers == {} # --- run_agent_cascade wiring ---------------------------------------------- diff --git a/tests/test_agent_cascade_config.py b/tests/test_agent_cascade_config.py index 7514ed5..e722fca 100644 --- a/tests/test_agent_cascade_config.py +++ b/tests/test_agent_cascade_config.py @@ -19,7 +19,7 @@ def test_default_config_values(): config = CascadeConfig() assert config.voice == DEFAULT_VOICE - assert config.model == DEFAULT_MODEL == "gpt-5.1" # `assembly live` defaults to gpt-5.1 + assert config.model == DEFAULT_MODEL == "claude-haiku-4-5-20251001" # `assembly live` default assert config.greeting == DEFAULT_GREETING # The sliding-window default keeps the last 40 messages of context. assert config.max_history == 40 diff --git a/tests/test_agent_cascade_engine.py b/tests/test_agent_cascade_engine.py index d113433..23d7ec6 100644 --- a/tests/test_agent_cascade_engine.py +++ b/tests/test_agent_cascade_engine.py @@ -15,95 +15,9 @@ from aai_cli.agent_cascade.config import CascadeConfig from aai_cli.agent_cascade.engine import CascadeDeps, CascadeSession, run_cascade from aai_cli.core.errors import APIError - - -class FakeRenderer: - def __init__(self): - self.calls = [] - - def connected(self): - self.calls.append(("connected",)) - - def user_partial(self, text): - self.calls.append(("user_partial", text)) - - def user_final(self, text): - self.calls.append(("user_final", text)) - - def reply_started(self): - self.calls.append(("reply_started",)) - - def agent_transcript(self, text, *, interrupted): - self.calls.append(("agent_transcript", text, interrupted)) - - def reply_done(self, *, interrupted): - self.calls.append(("reply_done", interrupted)) - - -class FakePlayer: - def __init__(self): - self.enqueued = [] - self.flushed = 0 - self.started = False - self.closed = False - - def start(self): - self.started = True - - def enqueue(self, pcm): - self.enqueued.append(pcm) - - def flush(self): - self.flushed += 1 - - def close(self): - self.closed = True - - -class FakeWorker: - def __init__(self, *, alive): - self._alive = alive - self.joined = 0 - - def is_alive(self): - return self._alive - - def join(self): - self.joined += 1 - self._alive = False - - -def _sync_spawn(target): - """Run the reply body inline and hand back a finished worker, so the cascade is - driven deterministically without real threads.""" - target() - return FakeWorker(alive=False) - - -def _turn(text, *, end_of_turn=True, turn_is_formatted=True): - return types.SimpleNamespace( - transcript=text, end_of_turn=end_of_turn, turn_is_formatted=turn_is_formatted - ) - - -def make_session( - *, - complete_reply=lambda messages: "Hello there.", - synthesize=lambda text: b"pcm:" + text.encode(), - spawn=_sync_spawn, - run_stt=lambda on_turn: None, - config=None, -): - deps = CascadeDeps( - run_stt=run_stt, complete_reply=complete_reply, synthesize=synthesize, spawn=spawn - ) - renderer = FakeRenderer() - player = FakePlayer() - session = CascadeSession( - deps=deps, renderer=renderer, player=player, config=config or CascadeConfig() - ) - return session, renderer, player - +from tests._cascade_fakes import FakePlayer, FakeRenderer, FakeWorker, make_session +from tests._cascade_fakes import sync_spawn as _sync_spawn +from tests._cascade_fakes import turn as _turn # --- greeting ---------------------------------------------------------------- @@ -146,7 +60,7 @@ def test_on_turn_blank_transcript_ignored(): def test_on_turn_final_renders_and_replies(): - session, renderer, player = make_session(complete_reply=lambda m: "Sure thing.") + session, renderer, player = make_session(complete_reply=lambda m, on_tool=None: "Sure thing.") session.on_turn(_turn("what time is it")) assert ("user_final", "what time is it") in renderer.calls assert {"role": "user", "content": "what time is it"} in session.history @@ -155,9 +69,23 @@ def test_on_turn_final_renders_and_replies(): assert ("reply_done", False) in renderer.calls +def test_reply_forwards_tool_calls_to_the_renderer(): + # The reply worker hands complete_reply an on_tool sink; a tool call it makes surfaces on + # the renderer, so the live UI can show a "Searching the web…" affordance mid-turn. + def reply(messages, on_tool): + on_tool("Searching the web") + return "Found it." + + session, renderer, _player = make_session(complete_reply=reply) + session.on_turn(_turn("what's the news")) + assert ("tool_call", "Searching the web") in renderer.calls + + def test_on_turn_interim_shows_partial_and_does_not_reply(): replies = [] - session, renderer, _player = make_session(complete_reply=lambda m: replies.append(m) or "x") + session, renderer, _player = make_session( + complete_reply=lambda m, on_tool=None: replies.append(m) or "x" + ) session.on_turn(_turn("partial words", end_of_turn=False)) assert ("user_partial", "partial words") in renderer.calls assert replies == [] # no reply generated for an interim turn @@ -178,7 +106,7 @@ def test_on_turn_interim_barges_in_on_live_reply(): def test_generate_reply_speaks_each_sentence(): spoken = [] session, renderer, player = make_session( - complete_reply=lambda m: "One. Two! Three?", + complete_reply=lambda m, on_tool=None: "One. Two! Three?", synthesize=lambda text: spoken.append(text) or text.encode(), ) session._generate_reply() @@ -193,7 +121,7 @@ def test_generate_reply_speaks_each_sentence(): def test_generate_reply_threads_system_prompt_and_history(): captured = {} - def capture(messages): + def capture(messages, on_tool=None): captured["messages"] = messages return "Ok." @@ -208,7 +136,7 @@ def capture(messages): def test_generate_reply_trims_history_window(): session, _renderer, _player = make_session( - complete_reply=lambda m: "a. b.", config=CascadeConfig(max_history=1) + complete_reply=lambda m, on_tool=None: "a. b.", config=CascadeConfig(max_history=1) ) session.history.append({"role": "user", "content": "hi"}) session._generate_reply() @@ -219,7 +147,7 @@ def test_generate_reply_trims_history_window(): def test_on_turn_trims_history_window(): # An empty reply adds no assistant turn, so only on_turn's own trim caps the list. session, _renderer, _player = make_session( - complete_reply=lambda m: "", config=CascadeConfig(max_history=1) + complete_reply=lambda m, on_tool=None: "", config=CascadeConfig(max_history=1) ) session.history.append({"role": "assistant", "content": "old"}) session.on_turn(_turn("newest")) @@ -232,7 +160,9 @@ def synth(text): session._stop.set() return text.encode() - session, renderer, player = make_session(complete_reply=lambda m: "One. Two. Three.") + session, renderer, player = make_session( + complete_reply=lambda m, on_tool=None: "One. Two. Three." + ) session.deps.synthesize = synth session._generate_reply() # Only the first sentence finished enqueuing before the barge-in stop landed. @@ -242,7 +172,7 @@ def synth(text): def test_generate_reply_stop_before_first_sentence_speaks_nothing(): - session, renderer, player = make_session(complete_reply=lambda m: "One. Two.") + session, renderer, player = make_session(complete_reply=lambda m, on_tool=None: "One. Two.") session._stop.set() session._generate_reply() assert player.enqueued == [] @@ -251,21 +181,27 @@ def test_generate_reply_stop_before_first_sentence_speaks_nothing(): assert ("reply_done", True) in renderer.calls -def test_generate_reply_llm_failure_is_recorded_and_aborts(): - def boom(messages): +def test_generate_reply_llm_failure_is_recorded_and_surfaced(): + def boom(messages, on_tool=None): + del messages raise APIError("gateway down") - session, renderer, _player = make_session(complete_reply=boom) + session, renderer, player = make_session(complete_reply=boom) session._generate_reply() - assert isinstance(session.error, APIError) - assert ("reply_started",) not in renderer.calls # aborted before speaking + assert isinstance(session.error, APIError) # recorded for the exit path + # Surfaced in the transcript (not swallowed) but nothing is spoken — the turn aborts. + assert ("agent_transcript", "(error: gateway down)", False) in renderer.calls + assert ("reply_done", False) in renderer.calls # the error line is closed off cleanly + assert player.enqueued == [] def test_generate_reply_tts_failure_midway_is_recorded(): def boom(text): raise APIError("tts down") - session, renderer, player = make_session(complete_reply=lambda m: "Hi.", synthesize=boom) + session, renderer, player = make_session( + complete_reply=lambda m, on_tool=None: "Hi.", synthesize=boom + ) session._generate_reply() assert isinstance(session.error, APIError) assert player.enqueued == [] @@ -298,18 +234,36 @@ def test_barge_in_cancels_and_flushes_live_worker(): assert session._reply is None -def test_barge_in_no_worker_does_not_flush(): +def test_barge_in_without_a_live_worker_does_not_flush(): + # No worker, or one that already finished: nothing to cancel, so no flush. session, _renderer, player = make_session() - session._barge_in() + session._barge_in() # no worker + session._reply = FakeWorker(alive=False) + session._barge_in() # finished worker assert player.flushed == 0 + assert session._reply is None + + +def test_interrupt_reply_signals_stop_and_flushes_without_joining(): + # Live TUI Escape/Ctrl-C silences a playing reply: stop flag + flush, but NO join. + session, _renderer, player = make_session() + worker = FakeWorker(alive=True) + session._reply = worker + assert session.interrupt_reply() is True + assert session._stop.is_set() + assert player.flushed == 1 + assert worker.joined == 0 # not joined — the worker unwinds on its own + assert session._reply is worker # still tracked; the next turn's barge-in joins it -def test_barge_in_finished_worker_does_not_flush(): +def test_interrupt_reply_is_a_noop_when_nothing_is_playing(): + # No worker, or one that already finished: nothing to stop, so no flush and no stop flag. session, _renderer, player = make_session() + assert session.interrupt_reply() is False # no worker session._reply = FakeWorker(alive=False) - session._barge_in() + assert session.interrupt_reply() is False # finished worker assert player.flushed == 0 - assert session._reply is None + assert not session._stop.is_set() def test_shutdown_joins_live_worker(): @@ -381,7 +335,7 @@ def run_stt(on_turn): session_box = {} - def complete_reply(messages): + def complete_reply(messages, on_tool=None): session_box["messages"] = messages return "Hi back." @@ -404,6 +358,27 @@ def complete_reply(messages): assert {"role": "assistant", "content": "Welcome."} in session_box["messages"] +def test_run_cascade_hands_the_session_to_on_session_before_greeting(): + # run_cascade hands the session to on_session before the player starts (TUI wires it). + captured = {} + player = FakePlayer() + deps = CascadeDeps( + run_stt=lambda on_turn: None, + complete_reply=lambda m, on_tool=None: "hi", + synthesize=lambda text: b"", + spawn=_sync_spawn, + ) + run_cascade( + renderer=FakeRenderer(), + player=player, + config=CascadeConfig(greeting=""), + deps=deps, + on_session=lambda s: captured.update(session=s, started=player.started), + ) + assert isinstance(captured["session"], CascadeSession) + assert captured["started"] is False + + def test_run_cascade_shuts_down_inflight_worker(): worker = FakeWorker(alive=True) @@ -415,7 +390,10 @@ def run_stt(on_turn): on_turn(_turn("hello")) deps = CascadeDeps( - run_stt=run_stt, complete_reply=lambda m: "hi", synthesize=lambda t: b"", spawn=lazy_spawn + run_stt=run_stt, + complete_reply=lambda m, on_tool=None: "hi", + synthesize=lambda t: b"", + spawn=lazy_spawn, ) run_cascade( renderer=FakeRenderer(), player=FakePlayer(), config=CascadeConfig(greeting=""), deps=deps @@ -427,7 +405,7 @@ def test_run_cascade_reraises_recorded_leg_error(): def run_stt(on_turn): on_turn(_turn("hi")) - def boom(messages): + def boom(messages, on_tool=None): raise APIError("gateway down") deps = CascadeDeps( @@ -448,7 +426,10 @@ def run_stt(on_turn): player = FakePlayer() deps = CascadeDeps( - run_stt=run_stt, complete_reply=lambda m: "", synthesize=lambda t: b"", spawn=_sync_spawn + run_stt=run_stt, + complete_reply=lambda m, on_tool=None: "", + synthesize=lambda t: b"", + spawn=_sync_spawn, ) with pytest.raises(APIError, match="stt failed"): run_cascade( diff --git a/tests/test_agent_cascade_mcp.py b/tests/test_agent_cascade_mcp.py index f7d1fb0..015c22a 100644 --- a/tests/test_agent_cascade_mcp.py +++ b/tests/test_agent_cascade_mcp.py @@ -16,25 +16,6 @@ from aai_cli.commands.agent_cascade import _exec from aai_cli.core.errors import UsageError -# --- default_servers --------------------------------------------------------- - - -def test_default_servers_curated_set_and_filesystem_root(): - root = Path("/notes/dir") - servers = mcp_tools.default_servers(root) - # The five curated, no-auth servers, each with a real launch command. - assert set(servers) == {"time", "fetch", "memory", "filesystem", "weather"} - assert servers["time"] == {"command": "uvx", "args": ["mcp-server-time"]} - assert servers["memory"]["args"] == ["-y", "@modelcontextprotocol/server-memory"] - # The filesystem server is scoped to the passed-in root directory. Compare against - # str(root), not a hardcoded "/notes/dir", so it holds on Windows (backslash paths). - assert servers["filesystem"]["args"] == [ - "-y", - "@modelcontextprotocol/server-filesystem", - str(root), - ] - - # --- parse_mcp_config -------------------------------------------------------- @@ -150,32 +131,41 @@ def boom(name, conn) -> list: assert mcp_tools._safe_load(boom, "s", {"command": "x"}) == [] +def test_load_server_times_out_on_a_slow_server(monkeypatch): + # A server that won't list its tools within the timeout is cancelled, so it can't hang + # `assembly live` startup forever; _safe_load then turns the TimeoutError into []. + import asyncio + + class _SlowClient: + def __init__(self, connections): + del connections + + async def get_tools(self): + await asyncio.sleep(10) # never finishes before the (patched) timeout + return [] + + monkeypatch.setattr("langchain_mcp_adapters.client.MultiServerMCPClient", _SlowClient) + monkeypatch.setattr(mcp_tools, "_LOAD_TIMEOUT_S", 0.05) + with pytest.raises(TimeoutError): + mcp_tools._load_server("slow", mcp_tools._to_connection({"command": "x"})) + + # --- _resolve_mcp_servers (the default set + --mcp-config merge) -------------- -def test_resolve_mcp_servers_defaults_loaded_with_no_config(): - servers = _exec._resolve_mcp_servers(mcp_config=()) - # Every session loads the curated default set out of the box. - assert {"time", "weather", "memory", "fetch", "filesystem"} <= set(servers) +def test_resolve_mcp_servers_empty_with_no_config(): + # No --mcp-config -> no MCP servers; the live agent runs with just its web-search tool. + assert _exec._resolve_mcp_servers(mcp_config=()) == {} -def test_resolve_mcp_servers_config_adds_to_defaults(tmp_path): +def test_resolve_mcp_servers_returns_only_config_servers(tmp_path): path = tmp_path / "servers.json" path.write_text( '{"mcpServers": {"custom": {"command": "uvx", "args": ["x"]}}}', encoding="utf-8" ) servers = _exec._resolve_mcp_servers(mcp_config=(path,)) - # The config server is added alongside (not instead of) the defaults. - assert servers["custom"] == {"command": "uvx", "args": ["x"]} - assert "weather" in servers - - -def test_resolve_mcp_servers_config_overrides_default_by_name(tmp_path): - path = tmp_path / "servers.json" - path.write_text('{"mcpServers": {"time": {"command": "my-time"}}}', encoding="utf-8") - servers = _exec._resolve_mcp_servers(mcp_config=(path,)) - # An explicit config entry overrides the default server of the same name. - assert servers["time"] == {"command": "my-time"} + # Only the opt-in config server is present — there is no curated default set to merge. + assert servers == {"custom": {"command": "uvx", "args": ["x"]}} # --- _warn_without_web_search (the FIRECRAWL_API_KEY notice) ------------------ diff --git a/tests/test_agent_events.py b/tests/test_agent_events.py index 676d8ef..879d1a0 100644 --- a/tests/test_agent_events.py +++ b/tests/test_agent_events.py @@ -18,6 +18,10 @@ (events.UserDelta(text="typing…"), {"type": "transcript.user.delta", "text": "typing…"}), (events.UserFinal(text="hello"), {"type": "transcript.user", "text": "hello"}), (events.ReplyStarted(), {"type": "reply.started"}), + ( + events.ToolUse(label="Searching the web"), + {"type": "tool.use", "label": "Searching the web"}, + ), ( events.AgentTranscript(text="hi back", interrupted=False), {"type": "transcript.agent", "text": "hi back", "interrupted": False}, diff --git a/tests/test_agent_render.py b/tests/test_agent_render.py index e6216af..5cfc9fa 100644 --- a/tests/test_agent_render.py +++ b/tests/test_agent_render.py @@ -124,6 +124,26 @@ def test_human_agent_line_labeled(): assert "the time is noon" in out +def test_json_tool_call_emits_tool_use_event(): + buf = io.StringIO() + AgentRenderer(json_mode=True, out=buf).tool_call("Searching the web") + assert {"type": "tool.use", "label": "Searching the web"} in _json_lines(buf) + + +def test_text_tool_call_goes_to_stderr_not_stdout(): + # The tool affordance is status, so in piped text mode it stays off stdout (transcript-only). + out, err = io.StringIO(), io.StringIO() + AgentRenderer(json_mode=False, text_mode=True, out=out, err=err).tool_call("Searching the web") + assert "Searching the web" in err.getvalue() + assert out.getvalue() == "" + + +def test_human_tool_call_shows_inline_line(): + r, buf = _human() + r.tool_call("Searching the web") + assert "Searching the web" in buf.getvalue() + + def test_human_close_commits_open_partial(): r, buf = _human() r.user_partial("half a sentence") diff --git a/tests/test_code_agent.py b/tests/test_code_agent.py index a1452bf..b9fd1d2 100644 --- a/tests/test_code_agent.py +++ b/tests/test_code_agent.py @@ -27,6 +27,7 @@ ) from aai_cli.code_agent.agent import MUTATING_TOOLS, build_agent from aai_cli.code_agent.events import AssistantText, ErrorText, ToolCall, ToolResult +from aai_cli.code_agent.prompt import build_system_prompt from aai_cli.code_agent.render import RichRenderer, make_approver from aai_cli.code_agent.session import QUIT_COMMANDS, CodeSession, run_repl @@ -124,6 +125,17 @@ def test_run_repl_sends_initial_then_lines_until_quit(tmp_path: Path) -> None: assert texts == ["a", "b"] # initial + "second"; blank skipped, stops at /quit +def test_system_prompt_steers_concise_speech() -> None: + prompt = build_system_prompt("/work") + assert "/work" in prompt # anchored to the working directory + # The prose is read aloud, so the prompt must steer the model to concise, speech-ready + # replies with code kept out of the spoken text. + assert "read aloud" in prompt + assert "fenced code blocks" in prompt + lowered = prompt.lower() + assert "concise" in lowered and "spoken" in lowered + + def test_mutating_tools_include_cli_shell_and_fetch() -> None: assert set(MUTATING_TOOLS) == {"write_file", "edit_file", "execute", "assembly", "fetch_url"} assert "exit" in QUIT_COMMANDS and "/exit" in QUIT_COMMANDS diff --git a/tests/test_code_model.py b/tests/test_code_model.py index 047f9bf..2cc8c70 100644 --- a/tests/test_code_model.py +++ b/tests/test_code_model.py @@ -173,6 +173,61 @@ def test_ensure_tool_call_arguments_guards() -> None: model_mod._ensure_tool_call_arguments([{"tool_calls": 99}]) # tool_calls not a list +def test_sanitize_tool_schemas_strips_model_incompatible_keys() -> None: + # Gemini's function_declarations 400 on these validation/metadata keywords; strip every + # one (recursively) while keeping structural keys, so a tool-bound request works. + denied = [ + "$schema", + "$id", + "$comment", + "title", + "default", + "examples", + "const", + "additionalProperties", + "unevaluatedProperties", + "patternProperties", + "minProperties", + "maxProperties", + "propertyNames", + "exclusiveMinimum", + "exclusiveMaximum", + "multipleOf", + "additionalItems", + "unevaluatedItems", + "contains", + ] + # Pin the shipped denylist against this list: a renamed/dropped key would silently leak an + # unsupported keyword to Gemini (and break a tool-bound turn). + assert set(model_mod._UNSUPPORTED_SCHEMA_KEYS) == set(denied) + + nested: dict[str, object] = {"type": "string", **dict.fromkeys(denied, "x")} + inside_list: dict[str, object] = {"type": "number", **dict.fromkeys(denied, "x")} + params: dict[str, object] = { + "type": "object", + "properties": {"city": nested}, # nested dict + "anyOf": [inside_list], # nested list + **dict.fromkeys(denied, "x"), + } + payload: dict[str, object] = { + "tools": [ + None, # non-dict tool -> skipped + {"type": "function", "function": 7}, # function not a dict -> skipped + {"type": "function", "function": {"name": "get_weather", "parameters": params}}, + ] + } + model_mod._sanitize_tool_schemas(payload) + assert not (set(denied) & set(params)) # every denied key stripped at the top level + assert params["type"] == "object" # structural keys preserved + assert nested == {"type": "string"} # nested dict fully stripped + assert inside_list == {"type": "number"} # nested-in-list fully stripped + + +def test_sanitize_tool_schemas_guards() -> None: + model_mod._sanitize_tool_schemas(None) # not a dict -> early return, no error + model_mod._sanitize_tool_schemas({"tools": 99}) # tools not a list -> early return + + def test_get_request_payload_fills_empty_tool_call_arguments() -> None: from langchain_core.messages import AIMessage, HumanMessage from langchain_openai import ChatOpenAI diff --git a/tests/test_code_tui.py b/tests/test_code_tui.py index b536dec..aa34f4f 100644 --- a/tests/test_code_tui.py +++ b/tests/test_code_tui.py @@ -145,6 +145,22 @@ def test_ask_modal_returns_typed_answer() -> None: assert answer == "8080" +def test_approval_modal_dismisses_on_escape_or_ctrl_c() -> None: + # Escape / Ctrl-C decline the tool (the safe cancel), like pressing "n". + app = CodeAgentApp(agent=FakeAgent([])) + assert _drive_modal(app, lambda: app._approve("execute", {"cmd": "ls"}), ["escape"]) is False + app2 = CodeAgentApp(agent=FakeAgent([])) + assert _drive_modal(app2, lambda: app2._approve("execute", {"cmd": "ls"}), ["ctrl+c"]) is False + + +def test_ask_modal_dismisses_on_escape_or_ctrl_c_with_no_answer() -> None: + # Escape / Ctrl-C cancel the question; the agent gets an empty answer. + app = CodeAgentApp(agent=FakeAgent([])) + assert _drive_modal(app, lambda: app._ask("which port?"), ["escape"]) == "" + app2 = CodeAgentApp(agent=FakeAgent([])) + assert _drive_modal(app2, lambda: app2._ask("which port?"), ["ctrl+c"]) == "" + + def test_full_turn_with_approval_interrupt() -> None: async def go() -> None: agent = FakeAgent( diff --git a/tests/test_code_tui_voice.py b/tests/test_code_tui_voice.py index 2ec6448..ddc3226 100644 --- a/tests/test_code_tui_voice.py +++ b/tests/test_code_tui_voice.py @@ -140,6 +140,30 @@ async def go() -> None: assert calls == [True] # running -> the finished turn is handled +def test_interrupt_during_speaking_stops_readback_and_ctrl_c_can_always_quit(): # untyped: internals + # Both Escape and Ctrl-C stop the readback and re-listen (not text); Ctrl-C also arms the + # quit, and a SECOND Ctrl-C exits even mid-speech — so a spoken turn can never trap you. + async def go(): + app = CodeAgentApp(agent=FakeAgent([]), voice=FakeVoice()) + exited: list[bool] = [] + app.exit = lambda *a, **k: exited.append(True) # capture the quit without tearing down + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app._voice_phase = "speaking" + app.action_interrupt() # Escape + assert app._voice.cancels >= 1 and app._voice_paused is False # stopped, re-listens + assert app._quit_pending is False # Escape never quits + + app._voice_phase = "speaking" + app.action_quit_or_interrupt() # Ctrl-C + assert app._voice.cancels >= 2 and app._quit_pending is True # stopped + armed + assert exited == [] + app.action_quit_or_interrupt() # second Ctrl-C + assert exited == [True] # quits even mid-speech — never trapped + + _run(go()) + + def test_capture_voice_turn_is_a_noop_once_typed() -> None: async def go() -> None: voice = FakeVoice(transcripts=["ignored"]) @@ -403,25 +427,22 @@ async def go() -> None: _run(go()) -def test_ctrl_c_on_active_voice_interrupts_even_when_a_quit_was_pending( +def test_ctrl_c_quits_when_a_quit_is_pending_even_with_active_voice( monkeypatch: pytest.MonkeyPatch, ) -> None: - # Stopping active voice takes priority over a pending quit: a Ctrl-C that lands while the - # agent is listening/speaking interrupts the voice and never quits, even if the quit hint - # was already armed from an earlier press. + # A pending quit takes priority over active voice: a second Ctrl-C (quit already armed) + # exits even while the agent is listening/speaking — otherwise a voice turn could trap + # the user with no way out. async def go() -> None: - voice = FakeVoice() - app = CodeAgentApp(agent=FakeAgent([]), voice=voice) - app._voice_paused = True + app = CodeAgentApp(agent=FakeAgent([]), voice=FakeVoice()) async with app.run_test(size=(100, 30)) as pilot: await pilot.pause() exited: list[bool] = [] monkeypatch.setattr(app, "exit", lambda *a, **k: exited.append(True)) - app._voice_paused = False # voice active (listening) - app._quit_pending = True # a quit hint was already armed - app.action_quit_or_interrupt() # Ctrl-C: interrupt the voice, do NOT quit - assert voice.cancels == 1 - assert exited == [] # active voice is interrupted, never quit + app._voice_paused = False # voice active (listening/speaking) + app._quit_pending = True # a quit hint was already armed by a prior press + app.action_quit_or_interrupt() # Ctrl-C: with quit armed, exit + assert exited == [True] # quits — never trapped _run(go()) @@ -444,20 +465,20 @@ async def go() -> None: def test_stop_voice_activity_is_a_noop_when_voice_inactive() -> None: - # No voice session, or a paused one, is not "active": the interrupt defers to the quit path - # rather than cancelling anything. + # No voice session, or a paused one, is not "active": _stop_voice_activity cancels nothing + # (and doesn't crash on the missing session), so the interrupt defers to the quit path. async def go() -> None: no_voice = CodeAgentApp(agent=FakeAgent([])) async with no_voice.run_test(size=(100, 30)) as pilot: await pilot.pause() - assert no_voice._stop_voice_activity() is False # nothing to stop + no_voice._stop_voice_activity() # no voice session -> no-op, no error voice = FakeVoice() paused = CodeAgentApp(agent=FakeAgent([]), voice=voice) paused._voice_paused = True async with paused.run_test(size=(100, 30)) as pilot: await pilot.pause() - assert paused._stop_voice_activity() is False # paused -> inactive + paused._stop_voice_activity() # paused -> inactive assert voice.cancels == 0 # a paused session is never cancelled _run(go()) diff --git a/tests/test_live_tui.py b/tests/test_live_tui.py index 732f9e4..abe49b9 100644 --- a/tests/test_live_tui.py +++ b/tests/test_live_tui.py @@ -13,6 +13,7 @@ import types import pytest +import typer from textual.widgets import Static from aai_cli.agent_cascade import engine @@ -72,8 +73,8 @@ def _voicebar(app) -> str: def test_splash_and_status_render() -> None: - # The session opens on the ASSEMBLY wordmark + ready line, and the footer shows the only - # control (quit) — there is no text prompt mounted (input is voice-only). + # The session opens on the ASSEMBLY wordmark + ready line, and the footer shows the + # interrupt/quit controls — there is no text prompt mounted (input is voice-only). async def go() -> None: app = _app() async with app.run_test(size=(100, 30)) as pilot: @@ -81,7 +82,8 @@ async def go() -> None: splash = str(app.query_one("#log").children[0].render()) assert "█" in splash and "Listening… start talking" in splash # the wordmark splash assert "Listening" in _voicebar(app) # opens in the listening phase - assert "Ctrl-C to quit" in str(app.query_one("#status", Static).render()) + status = str(app.query_one("#status", Static).render()) + assert "interrupt" in status and "Ctrl-Q to quit" in status assert len(app.query("#prompt")) == 0 # no text input — voice only assert app.ENABLE_COMMAND_PALETTE is False # the voice UI hides the command palette @@ -135,6 +137,20 @@ async def go() -> None: _run(go()) +def test_show_tool_call_mounts_an_inline_affordance() -> None: + # A tool call mid-turn drops a dim "Searching the web…" note, so the thinking pause reads + # as progress rather than a hang (the live tool affordance). + async def go() -> None: + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app.show_tool_call("Searching the web") + notes = [str(n.render()) for n in app.query(Note)] + assert any("Searching the web" in n for n in notes) + + _run(go()) + + def test_agent_sentence_without_begin_reply_mounts_a_reply() -> None: async def go() -> None: app = _app() @@ -196,6 +212,75 @@ async def go() -> None: _run(go()) +def test_escape_interrupts_a_playing_reply_via_the_session_hook() -> None: + # Escape fires the session's reply-interrupt (set once the cascade has a session) and + # never quits — the worker unwinds and the renderer returns the bar to listening. + async def go() -> None: + fired: list[bool] = [] + + def hook() -> bool: + fired.append(True) + return True + + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app.set_interrupt(hook) + app.action_interrupt() + assert fired == [True] + + _run(go()) + + +def test_ctrl_c_interrupts_a_playing_reply_without_quitting(monkeypatch) -> None: + # While a reply is playing (the hook returns True), Ctrl-C interrupts it and stays — it + # must NOT quit, so a long answer can be cut off without ending the session. + async def go() -> None: + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + exited: list[bool] = [] + monkeypatch.setattr(app, "exit", lambda *a, **k: exited.append(True)) + app.set_interrupt(lambda: True) # a reply is playing + app.action_interrupt_or_quit() + assert exited == [] # interrupted, not quit + + _run(go()) + + +def test_ctrl_c_quits_when_nothing_is_playing(monkeypatch) -> None: + # With no reply playing (the hook returns False, or none is wired yet), Ctrl-C quits. + async def go() -> None: + stops: list[bool] = [] + app = _app(on_stop=lambda: stops.append(True)) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + exited: list[bool] = [] + monkeypatch.setattr(app, "exit", lambda *a, **k: exited.append(True)) + app.set_interrupt(lambda: False) # nothing playing + app.action_interrupt_or_quit() + assert stops == [True] and exited == [True] + + _run(go()) + + +def test_interrupt_before_a_session_is_wired_is_a_safe_noop(monkeypatch) -> None: + # A keypress before the cascade has built its session (no interrupt hook yet): Escape is a + # no-op and Ctrl-C falls through to quit, so an early press can never wedge the UI. + async def go() -> None: + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + exited: list[bool] = [] + monkeypatch.setattr(app, "exit", lambda *a, **k: exited.append(True)) + app.action_interrupt() # no hook wired -> nothing happens, no crash + assert exited == [] + app.action_interrupt_or_quit() # nothing to interrupt -> quits + assert exited == [True] + + _run(go()) + + def test_action_stop_tears_down_audio_and_exits(monkeypatch) -> None: async def go() -> None: stops: list[bool] = [] @@ -225,6 +310,7 @@ def run_conversation(renderer) -> None: renderer.connected() renderer.user_partial("turn it") renderer.user_final("turn it up") + renderer.tool_call("Searching the web") renderer.reply_started() renderer.agent_transcript("Done.", interrupted=False) renderer.reply_done(interrupted=False) @@ -244,6 +330,8 @@ def run_conversation(renderer) -> None: ) assert "» turn it up" in str(app.query_one(UserMessage).render()) assert app.query_one(AssistantMessage).text == "Done. " + # The tool_call leg hopped to the UI thread and surfaced the affordance note. + assert any("Searching the web" in str(n.render()) for n in app.query(Note)) assert done.is_set() # leaving the run_test context unmounted -> on_stop released it _run(go()) @@ -331,11 +419,32 @@ def run(self, **kwargs): assert captured["ran"] == {"mouse": False} # mouse off so transcript text stays selectable +def test_tui_setup_keyboard_interrupt_exits_clean(monkeypatch) -> None: + # Ctrl-C during TUI setup (mic open / graph build / --mcp-config load) lands before + # Textual captures the keyboard; it must exit 130, not surface a raw traceback. + _wire_tui(monkeypatch) + + def boom(*_a, **_k): + raise KeyboardInterrupt + + monkeypatch.setattr(_exec, "_run_live_tui", boom) + with pytest.raises(typer.Exit) as exc: + run_agent_cascade(_opts(), AppState(), json_mode=False) + assert exc.value.exit_code == 130 + + def test_tui_run_conversation_drives_the_cascade(monkeypatch) -> None: - # The closure handed to the app runs the cascade with the duplex player and the wired deps. + # The closure handed to the app runs the cascade with the duplex player and the wired + # deps, and the cascade's on_session wires the session's reply-interrupt onto the app. fake_duplex = _wire_tui(monkeypatch) captured: dict[str, object] = {} - monkeypatch.setattr(engine, "run_cascade", lambda **kw: captured.update(kw)) + + def fake_run_cascade(**kw): + captured.update(kw) + # run_cascade hands the freshly built session to on_session before the conversation. + kw["on_session"](types.SimpleNamespace(interrupt_reply="session-interrupt")) + + monkeypatch.setattr(engine, "run_cascade", fake_run_cascade) class FakeApp: def __init__(self, *, run_conversation, on_stop, web_note): @@ -344,8 +453,13 @@ def __init__(self, *, run_conversation, on_stop, web_note): def run(self, **kwargs): self._rc("renderer-sentinel") # the app would call this on its worker thread + def set_interrupt(self, interrupt): + captured["interrupt"] = interrupt + monkeypatch.setattr("aai_cli.agent_cascade.tui.LiveAgentApp", FakeApp) run_agent_cascade(_opts(), AppState(), json_mode=False) assert captured["player"] is fake_duplex.player assert captured["deps"] == "deps" assert captured["renderer"] == "renderer-sentinel" + # The session's interrupt_reply was wired onto the app (so Escape/Ctrl-C can use it). + assert captured["interrupt"] == "session-interrupt" diff --git a/tests/test_llm_command.py b/tests/test_llm_command.py index c9f0032..54257a6 100644 --- a/tests/test_llm_command.py +++ b/tests/test_llm_command.py @@ -37,6 +37,37 @@ def test_llm_help_lists_command(): assert "gateway" in result.output.lower() +def test_known_models_is_the_full_gateway_list(): + # Pin the exact ids so a typo'd/renamed model id is caught (the --list-models + # tests below compare output against KNOWN_MODELS itself, so they can't). + assert KNOWN_MODELS == ( + "claude-opus-4-7", + "claude-opus-4-6", + "claude-opus-4-5-20251101", + "claude-sonnet-4-6", + "claude-sonnet-4-5-20250929", + "claude-haiku-4-5-20251001", + "gpt-5.5", + "gpt-5.2", + "gpt-5.1", + "gpt-5", + "gpt-5-mini", + "gpt-5-nano", + "gpt-4.1", + "gpt-oss-120b", + "gpt-oss-20b", + "gemini-3.5-flash", + "gemini-3-flash-preview", + "gemini-3.1-flash-lite-preview", + "gemini-2.5-pro", + "gemini-2.5-flash", + "gemini-2.5-flash-lite", + "kimi-k2.5", + "qwen3-next-80b-a3b", + "qwen3-32B", + ) + + def test_llm_list_models_exits_without_network(monkeypatch): called = {"ran": False} monkeypatch.setattr(