Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ each carrying a `"type"` field to dispatch on:
| ------- | ----------- |
| `assembly stream --json` | `begin`, `turn`, `termination` (with `--from-stdin`, a `source` event precedes each file's events) |
| `assembly agent --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` |
| `assembly live --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` |
| `assembly live --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `tool.use`, `reply.started`, `transcript.agent`, `reply.done` |
| `assembly dictate --json` | `utterance` |
| `assembly llm --follow --json` | `answer` |
| `assembly transcribe <batch> --json` | `result` (one per source), then `reduce` if `--llm-reduce` is set |
Expand Down Expand Up @@ -143,21 +143,17 @@ The two are mutually exclusive.
## Live agent tools (MCP)

`assembly live` answers each spoken turn with a tool-using agent, so it can reach
external tools mid-conversation. Out of the box it loads its built-in URL fetch,
the AssemblyAI docs, and a curated, no-auth MCP toolset: `time` and `fetch`
(`uvx`), `memory` and `filesystem` (`npx`, the latter rooted at the working
directory), and an NWS-backed `weather` server.

Firecrawl web search also loads when a `FIRECRAWL_API_KEY` is set; without it the
session prints a one-line notice and runs without web search (every other default
tool needs no key).

`--mcp-config FILE` adds your own servers on top of the defaults, from a standard
`mcpServers` JSON file — the same
external tools mid-conversation. Its toolset is deliberately small — a low-latency
spoken turn does best with one obvious tool rather than a large menu to choose
among — so its one built-in tool is Firecrawl web search. It loads when a
`FIRECRAWL_API_KEY` is set; without it the session prints a one-line notice and
runs from the model's own knowledge (no web search).

`--mcp-config FILE` adds your own MCP servers (none load by default), from a
standard `mcpServers` JSON file — the same
`{"mcpServers": {"name": {"command": "…", "args": […]}}}` shape Claude Desktop and
Claude Code use. Repeat the flag to merge several files; a later file (or a config
entry sharing a default's name) wins on a clash. Remote servers use `{"url": "…"}`
instead of `command`/`args`.
Claude Code use. Repeat the flag to merge several files; a later file wins on a
name clash. Remote servers use `{"url": "…"}` instead of `command`/`args`.

Each server is launched independently and best-effort: one that won't start (a
missing `npx`/`uvx`, an offline host) drops only its own tools, so a single broken
Expand Down
9 changes: 8 additions & 1 deletion aai_cli/agent/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class ReplyStarted(_Event):
type: Literal["reply.started"] = "reply.started"


class ToolUse(_Event):
"""The agent invoked a tool mid-reply (``label`` is a short, human description)."""

type: Literal["tool.use"] = "tool.use"
label: str


class AgentTranscript(_Event):
"""The agent's reply transcript (``interrupted`` when the user barged in)."""

Expand All @@ -67,4 +74,4 @@ class ReplyDone(_Event):
interrupted: bool


Event = SessionReady | UserDelta | UserFinal | ReplyStarted | AgentTranscript | ReplyDone
Event = SessionReady | UserDelta | UserFinal | ToolUse | ReplyStarted | AgentTranscript | ReplyDone
13 changes: 13 additions & 0 deletions aai_cli/agent/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ def user_final(self, text: str) -> None:
else:
self._finalize_line(_labeled("you: ", text, style="aai.you"))

def tool_call(self, label: str) -> None:
"""Surface that the agent is using a tool (e.g. "Searching the web") while it thinks.

JSON emits a ``tool.use`` event; piped text keeps it off stdout (transcript-only) by
routing to stderr; human mode shows a muted inline line.
"""
if self.json_mode:
self._emit_event(events.ToolUse(label=label))
elif self.text_mode:
self._status(f"{label}…")
else:
self._line(_labeled("", f"{label}…", style="aai.muted"))

# --- agent -------------------------------------------------------------
def reply_started(self) -> None:
if self.json_mode:
Expand Down
160 changes: 103 additions & 57 deletions aai_cli/agent_cascade/brain.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Deepagents-powered reply brain for the live voice cascade.

`assembly live` answers each spoken turn with a deepagents graph instead of a single
LLM completion, so the agent can transparently reach for tools — web search, URL
fetch, the AssemblyAI docs — mid-conversation, mimicking a live multimodal assistant
(the "talk to Gemini Live" experience). The graph is built once per session
LLM completion, so the agent can transparently reach for a tool — web search —
mid-conversation, mimicking a live multimodal assistant (the "talk to Gemini Live"
experience). The toolset is deliberately minimal: a low-latency spoken turn does best
with one obvious tool rather than a menu it has to choose among. The graph is built once per session
(:func:`build_graph`) and invoked statelessly per turn with the running history the
cascade already keeps (:func:`build_completer`); tools are read-only and auto-approved,
because a spoken turn can't pause for a keyboard confirmation, and the system prompt
Expand All @@ -22,9 +23,9 @@

from aai_cli.agent_cascade.config import CascadeConfig
from aai_cli.code_agent.agent import CompiledAgent
from aai_cli.code_agent.fetch_tool import FETCH_TOOL_NAME
from aai_cli.code_agent.firecrawl_search import WEB_SEARCH_TOOL_NAME
from aai_cli.core import debuglog
from aai_cli.core.errors import CLIError

if TYPE_CHECKING:
from langchain_core.tools import BaseTool
Expand All @@ -41,6 +42,16 @@
# arbitrary tuning knob — a +-1 shift is behaviorally equivalent, so no test can kill it.
_RESULT_LOG_CAP = 500 # pragma: no mutate

# Human, speakable labels for the tool affordance the live UI shows while a tool runs (so a
# spoken turn that pauses to use a tool says *why* it's working, not just spin silently).
_TOOL_LABELS = {WEB_SEARCH_TOOL_NAME: "Searching the web"}


def _tool_label(name: str) -> str:
"""A short present-tense label for a tool call, shown as the live UI's tool affordance."""
return _TOOL_LABELS.get(name, f"Using {name}")


# Closes every guidance variant: the reply is spoken, so it must stay short and plain.
_SPOKEN_TAIL = (
"Your reply is read aloud, so keep it short and spoken — no markdown, lists, code, or raw URLs."
Expand Down Expand Up @@ -70,21 +81,17 @@ def _join_clause(parts: list[str]) -> str:


def _tool_capabilities(tools: Sequence[BaseTool]) -> list[str]:
"""The spoken-capability phrases backed by an actually-present tool.
"""The spoken-capability phrase backed by a present built-in tool.

Derived from the resolved tool names so the prompt never advertises a capability the
agent can't perform: web search is present only with a ``FIRECRAWL_API_KEY``, and the
docs tools are best-effort (absent when the docs host is unreachable).
The live agent's only built-in tool is Firecrawl web search, bound just when a
``FIRECRAWL_API_KEY`` is set — so the prompt advertises web search only when the agent
can really do it. Advertising a tool it doesn't have made it announce an action ("I'll
search…") it then couldn't take, leaving the turn with no answer.
"""
names = {tool.name for tool in tools}
capabilities: list[str] = []
if WEB_SEARCH_TOOL_NAME in names:
capabilities.append("search the web for current or unfamiliar facts")
if FETCH_TOOL_NAME in names:
capabilities.append("fetch a specific URL")
if names - {WEB_SEARCH_TOOL_NAME, FETCH_TOOL_NAME}:
capabilities.append("look up the AssemblyAI documentation")
return capabilities
return ["search the web for current or unfamiliar facts"]
return []


def _extra_capability(extra_tools: Sequence[BaseTool]) -> str | None:
Expand Down Expand Up @@ -128,24 +135,20 @@ def build_system_prompt(


def build_live_tools() -> list[BaseTool]:
"""The live agent's read-only toolset: URL fetch, web search (if keyed), and docs.

All three are reused from the coding agent's tool modules. Unlike there they are
*not* approval-gated — a spoken turn can't wait for a keyboard confirmation, so the
live agent only gets read-only tools and runs them automatically. Web search is
present only when ``FIRECRAWL_API_KEY`` is set; the docs MCP is best-effort (an empty
list when the host is unreachable), so neither blocks a session.
"""The live agent's single read-only tool: Firecrawl web search (only when keyed).

Deliberately minimal. A low-latency spoken turn does best with one obvious tool rather
than a large menu it has to choose among — a big toolset made the model narrate "I'll
search…" without ever calling anything, and bloated every request with tool schemas.
Web search is the one capability worth the round-trip; everything else the agent answers
from its own knowledge. The tool is reused (un-approval-gated) from the coding agent and
is present only when ``FIRECRAWL_API_KEY`` is set, so an unkeyed session simply runs
tool-free. Extra tools remain strictly opt-in via ``--mcp-config``.
"""
from aai_cli.code_agent.docs_mcp import load_docs_tools
from aai_cli.code_agent.fetch_tool import build_fetch_tool
from aai_cli.code_agent.firecrawl_search import build_web_search_tool

tools: list[BaseTool] = [build_fetch_tool()]
search = build_web_search_tool()
if search is not None:
tools.append(search)
tools.extend(load_docs_tools())
return tools
return [search] if search is not None else []


def build_graph(
Expand Down Expand Up @@ -184,69 +187,112 @@ def build_graph(

def build_completer(
api_key: str, config: CascadeConfig, *, graph: CompiledAgent | None = None
) -> Callable[[list[ChatCompletionMessageParam]], str]:
) -> Callable[..., str]:
"""A ``complete_reply`` for the cascade engine backed by the deepagents graph.

The cascade prepends its own ``system`` message to the history each turn; the graph
already owns the system prompt, so we drop it before invoking. The graph runs the
full tool loop and we return its final spoken text. Under ``-v`` the loop is streamed
so each tool call/result is logged as it lands (see :func:`_run_graph`). ``graph`` is
injected in tests so the per-turn wiring runs against a fake with no network.
already owns the system prompt, so we drop it before invoking. The graph runs the full
tool loop and we return its final spoken text. ``on_tool`` (when given) is called with a
short label as each tool call lands, so the front-end can show a "Searching the web…"
affordance instead of sitting silent while the agent works; the loop is also streamed —
rather than ``invoke``-d — whenever a sink is wired or under ``-v`` (see :func:`_run_graph`).
``graph`` is injected in tests so the per-turn wiring runs against a fake with no network.
"""
resolved = build_graph(api_key, config) if graph is None else graph

def complete_reply(messages: list[ChatCompletionMessageParam]) -> str:
def complete_reply(
messages: list[ChatCompletionMessageParam],
on_tool: Callable[[str], None] | None = None,
) -> str:
conversation = [message for message in messages if message.get("role") != "system"]
return _reply_text(_run_graph(resolved, conversation))
return _reply_text(_run_graph(resolved, conversation, on_tool))

return complete_reply


def _run_graph(
graph: CompiledAgent, conversation: list[ChatCompletionMessageParam]
graph: CompiledAgent,
conversation: list[ChatCompletionMessageParam],
on_tool: Callable[[str], None] | None = None,
) -> dict[str, object]:
"""Run one turn through the graph, returning its end state.

Normally a single ``invoke`` (the whole tool loop runs internally). Under verbose
mode, and when the graph can stream, drive it as incremental state snapshots instead
so :func:`_log_flow` can surface each tool call/result on stderr as it happens — which
is what makes a stalled spoken turn debuggable. The test fakes only implement
``invoke``, so they (and the non-verbose path) take the plain branch.
Normally a single ``invoke`` (the whole tool loop runs internally). When a tool sink is
wired (the live UI's affordance) or under verbose mode, and the graph can stream, drive
it as incremental state snapshots instead so :func:`_log_flow` surfaces each tool call as
it happens. The test fakes only implement ``invoke``, so they (and the plain path with no
sink) take the invoke branch.
"""
graph_input = {"messages": conversation}
if debuglog.active() and hasattr(graph, "stream"):
try:
return _drive_graph(graph, {"messages": conversation}, on_tool)
except CLIError:
raise
except Exception as exc:
# The graph can fail anywhere in the tool loop — a gateway 4xx/5xx, a tool raising,
# a langgraph recursion limit. Convert it to a CLIError so the cascade records and
# *surfaces* it (the engine shows it in the transcript) instead of the reply worker
# dying silently and the user getting no answer with no clue why.
raise CLIError(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Embedding the raw exception (f"the agent couldn't complete the turn: {exc}") may expose user/tool data. Redact or sanitize exception text before including it in CLIError messages.

Details

✨ AI Reasoning
​The code now catches all Exceptions from the agent graph and raises a CLIError whose message embeds the original exception's string representation. That original exception may include user-controlled data, tool outputs, or other sensitive content. The CLIError is then recorded and shown to the user/UI by other parts of the cascade, so this change increases the risk of leaking unsanitized user input or external payloads to logs/terminal.

🔧 How do I fix it?
Keep sensitive data such as emails, passwords, and tokens out of logs. When logging values tied to a user, prefer a safe identifier like a user ID over the raw input, and strip line breaks from any user-provided text you do log.

Reply @AikidoSec feedback: [FEEDBACK] to get better review comments in the future.
Reply @AikidoSec ignore: [REASON] to ignore this issue.
More info

f"the agent couldn't complete the turn: {exc}", error_type="agent_brain_error"
) from exc


def _drive_graph(
graph: CompiledAgent,
graph_input: dict[str, object],
on_tool: Callable[[str], None] | None = None,
) -> dict[str, object]:
"""Invoke the graph, or stream it (when a tool sink is wired or under ``-v``) so
:func:`_log_flow` can surface each tool call as it lands."""
if (on_tool is not None or debuglog.active()) and hasattr(graph, "stream"):
last: dict[str, object] = {}
seen = 0
for chunk in graph.stream(graph_input, None, stream_mode="values"):
seen = _log_flow(chunk, seen)
seen = _log_flow(chunk, seen, on_tool)
last = chunk
return last
return graph.invoke(graph_input)


def _log_flow(state: dict[str, object], seen: int) -> int:
"""Log the tool calls/results added to ``state`` since the first ``seen`` messages.
def _log_flow(
state: dict[str, object], seen: int, on_tool: Callable[[str], None] | None = None
) -> int:
"""Surface the tool calls/results added to ``state`` since the first ``seen`` messages.

Reuses the coding agent's message→event vocabulary so the flow log knows the same
AIMessage/ToolMessage shapes the TUI does. Returns the new high-water message count
so the next snapshot only logs what it added.
Feeds ``on_tool`` a speakable label as each tool call lands (the live UI's affordance) and,
under ``-v``, logs the call/result/interim line to stderr. Reuses the coding agent's
message→event vocabulary so it reads the same AIMessage/ToolMessage shapes the TUI does.
Returns the new high-water message count so the next snapshot only re-surfaces what it added.
"""
from aai_cli.code_agent.events import AssistantText, ToolCall, ToolResult, message_events
from aai_cli.code_agent.events import message_events

messages = state.get("messages")
if not isinstance(messages, list):
return seen
verbose = debuglog.active()
for message in messages[seen:]:
for event in message_events(message, announce_calls=True):
if isinstance(event, ToolCall):
_FLOW_LOG.info("tool call %s args=%s", event.name, event.args)
elif isinstance(event, ToolResult):
_FLOW_LOG.info("tool result %s -> %s", event.name, _clip(event.content))
elif isinstance(event, AssistantText):
_FLOW_LOG.info("llm: %s", event.text)
_surface_event(event, on_tool, verbose=verbose)
return len(messages)


def _surface_event(event: object, on_tool: Callable[[str], None] | None, *, verbose: bool) -> None:
"""Surface one flow event: feed a tool call's label to ``on_tool``, and (under ``-v``)
log the call/result/interim line to stderr."""
from aai_cli.code_agent.events import AssistantText, ToolCall, ToolResult

if isinstance(event, ToolCall) and on_tool is not None:
on_tool(_tool_label(event.name))
if not verbose:
return
if isinstance(event, ToolCall):
_FLOW_LOG.info("tool call %s args=%s", event.name, event.args)
elif isinstance(event, ToolResult):
_FLOW_LOG.info("tool result %s -> %s", event.name, _clip(event.content))
elif isinstance(event, AssistantText):
_FLOW_LOG.info("llm: %s", event.text)


def _clip(text: str) -> str:
"""Flatten a tool result onto one line and truncate it for the flow log.

Expand Down
8 changes: 4 additions & 4 deletions aai_cli/agent_cascade/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from aai_cli.agent_cascade.voices import DEFAULT_VOICE
from aai_cli.core import llm

# `assembly live` defaults to a capable gateway model (override with --model); kept a
# literal rather than llm.DEFAULT_MODEL so the live agent's default is independent of the
# one-shot `assembly llm` default.
DEFAULT_MODEL = "gpt-5.1"
# `assembly live` defaults to a fast, low-latency gateway model (override with --model)
# a literal rather than llm.DEFAULT_MODEL so the live agent's default is independent of the
# one-shot `assembly llm` default. Latency matters most for a spoken back-and-forth.
DEFAULT_MODEL = "claude-haiku-4-5-20251001"
DEFAULT_MAX_TOKENS = llm.DEFAULT_MAX_TOKENS
# The realtime model the cascade transcribes with (same as the agent-cascade template).
DEFAULT_SPEECH_MODEL = "u3-rt-pro"
Expand Down
Loading
Loading