From a8c09a036be07f111e1849d0fe082b7ff211e04a Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 16:31:16 +0000 Subject: [PATCH 1/2] Surface the live agent's tool-call flow under -v MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `assembly -v live` only showed the httpx request lines because the deepagents brain answers each spoken turn with a single blocking `graph.invoke(...)` that runs the whole tool loop internally — so a turn that stalled mid-tool ("Let me search…" then nothing) was undebuggable. When verbose logging is active (`debuglog.active()`) and the graph can stream, `brain._run_graph` now drives it as incremental state snapshots and logs each tool call (name + args), tool result (truncated), and interim assistant line as it lands, reusing the coding agent's `events.message_events` vocabulary. The non-verbose path and the invoke-only test fakes keep the plain `invoke` branch. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01T3YQmYYiZDEZkDvWqXYk6D --- aai_cli/AGENTS.md | 2 +- aai_cli/agent_cascade/brain.py | 72 +++++++++++++++++- tests/test_agent_cascade_brain.py | 120 +++++++++++++++++++++++++++++- 3 files changed, 188 insertions(+), 6 deletions(-) diff --git a/aai_cli/AGENTS.md b/aai_cli/AGENTS.md index d63d451..8a27f3c 100644 --- a/aai_cli/AGENTS.md +++ b/aai_cli/AGENTS.md @@ -151,7 +151,7 @@ heavily-reworked commands with long bodies; small commands keep the inline - **`streaming/`** + `client.stream_audio` — v3 realtime API. Event callbacks run on the SDK reader thread and guard against `BrokenPipeError` (`stdio.silence_stdout()`) so a closed pipe never dumps a thread traceback. - **`core/sync_stt.py`** + **`core/signals.py`** + `commands/dictate/` — `assembly dictate`: headless dictation over the **Sync STT API** (`Environment.sync_base`, one POST `/transcribe` per utterance with the required `X-AAI-Model: u3-sync-pro` header; 80 ms–120 s of PCM/WAV). It needs no terminal: recording starts immediately and `dictate_exec._record` polls `signals.stop_on_terminate` between ~100 ms mic chunks for a SIGTERM, which finishes the utterance (clean exit 0) — so a hotkey tool like Hammerspoon can launch it as a background task and `kill -TERM`/`task:terminate()` to transcribe. SIGINT (Ctrl-C) still cancels (exit 130). Both boundaries (the stop latch, mic, HTTP) are injectable, so the suite never needs a real signal or microphone (`tests/test_dictate_exec.py` scripts the SIGTERM latch). Contrast `signals.terminate_as_interrupt` (used by `stream`/`agent`/`speak`), which routes SIGTERM into the *cancel* path instead. - **`agent/`** — full-duplex voice agent (mic in, TTS out via `voices.py`). -- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. +- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph (`brain.py`); under `-v` (`debuglog.active()`) `brain._run_graph` *streams* that graph instead of `invoke`-ing it and logs each tool call/result/interim line as it lands (reusing `code_agent.events.message_events`), so a spoken turn that stalls mid-tool is debuggable — plain `invoke` runs the whole loop internally and `-v` would otherwise show only the httpx lines. - **`tts/`** + `commands/speak.py` — `assembly speak` synthesizes text to speech over the sandbox streaming-TTS WebSocket (`streaming-tts.sandbox000.…`). **Sandbox-only:** `session.is_available()` is false in production (empty `Environment.streaming_tts_host`), so the command exits 2 with a `--sandbox` hint. `session.synthesize` drives a Begin→Generate→Flush→Audio→Terminate protocol with an injectable `connect` for hermetic tests (mirrors `agent/session.py`); `audio.py` plays the PCM (default) or writes a WAV (`--out`). The single-voice default-playback path **streams**: `synthesize`'s `on_audio(chunk, sample_rate)` callback is wired to `audio.PcmPlayer.feed`, so speech starts on the first Audio frame (it opens the device lazily, since the rate is only known at Begin) instead of after the whole text — the win for a long `--url` page. `--out` (needs the full buffer) and the multi-voice dialogue path (`synthesize_dialogue` → `_output_audio` → buffered `play_pcm`) stay buffered; `synthesize` still returns the complete PCM for the summary regardless. - **`code_agent/`** + `commands/code/` — `assembly code`: a terminal coding agent (a bespoke port of langchain-ai/deepagents' `code` agent) that talks **only** to the LLM Gateway. `model.py` pins the model to `ChatOpenAI` against `llm_gateway_base`; `agent.py` builds the deepagents graph over a cwd-scoped `LocalShellBackend` (filesystem + shell tools), plus extra tools: the custom `assembly` CLI tool (`cli_tool.py`, runs `python -m aai_cli` with the key via child env, never argv), a URL `fetch_url` tool (`fetch_tool.py`), Tavily web search when `TAVILY_API_KEY` is set (`web_search.py`), an `ask_user` tool routed through an `AskBridge` to the front-end (`ask_tool.py`), and best-effort docs MCP tools (`docs_mcp.py`). Middleware adds installed skills (`skills.py`) and long-term memory (`memory.py`), each over its own dedicated backend. Sessions persist via a SQLite checkpointer (`store.py`) keyed by `--session`, so conversations resume. Approval gates the mutating tools (write/edit/execute/`assembly`/`fetch_url`); the general-purpose `task` subagent comes from deepagents by default. `session.py` drives the graph turn-by-turn (interrupt/resume = human approval), emitting framework-agnostic `events.py` to either the Textual TUI (`tui.py`, modeled on deepagents-code: transcript + input + approval/ask modals + clipboard copy) or the Rich fallback (`render.py`). The whole orchestration is tested by driving the **real** graph with a fake `BaseChatModel` (`tests/test_code_agent.py`), so no network/TTY is needed. **Voice is the default front-end in an interactive TTY** (`voice.py` + `_exec._run_voice`): `VoiceSession.listen` captures one spoken turn over Streaming STT (gating the mic shut the instant a turn finalizes) and `VoiceSession.speak` reads each assistant reply back over streaming TTS. It runs the **Rich REPL** loop (not the keyboard TUI) with a voice `read_line` + a reply-speaking sink. Readback needs streaming TTS, so it's **sandbox-only** (`tts.session.is_available`); in production the mic input still works and replies stay on screen. A mic-less box degrades to typed input on the first `AUDIO_ERROR_TYPES` `CLIError`; `--no-voice` selects the TUI, and a non-TTY (pipe/CI) the headless loop. Both legs (STT/TTS) are injected like the cascade's, so `tests/test_code_voice.py` drives it with fakes — no mic/speaker/socket. - **`code_gen/`** — backs `--show-code` on `transcribe`/`stream`/`agent`: builds a ready-to-run Python SDK script from exactly the flags passed (no API key needed; generated code reads `ASSEMBLYAI_API_KEY`). diff --git a/aai_cli/agent_cascade/brain.py b/aai_cli/agent_cascade/brain.py index 966e3e6..210c6c0 100644 --- a/aai_cli/agent_cascade/brain.py +++ b/aai_cli/agent_cascade/brain.py @@ -16,6 +16,7 @@ from __future__ import annotations +import logging from collections.abc import Callable, Sequence from typing import TYPE_CHECKING @@ -23,11 +24,23 @@ from aai_cli.code_agent.agent import CompiledAgent from aai_cli.code_agent.fetch_tool import FETCH_TOOL_NAME from aai_cli.code_agent.web_search import WEB_SEARCH_TOOL_NAME +from aai_cli.core import debuglog if TYPE_CHECKING: from langchain_core.tools import BaseTool from openai.types.chat import ChatCompletionMessageParam +# Verbose (`-v`) flow logging for the agent's tool loop. `invoke` runs the whole loop +# internally, so without this `-v` only shows the httpx request lines and never which +# tools the agent reached for or what they returned — exactly what you need to see when +# a spoken turn stalls mid-tool. Logged at INFO so plain `-v` surfaces it. +_FLOW_LOG = logging.getLogger("aai_cli.agent_cascade.brain") + +# Tool outputs (a fetched page, a search payload) can be huge; cap what we log per result +# so a single tool call doesn't bury the rest of the flow in stderr. The exact cap is an +# arbitrary tuning knob — a +-1 shift is behaviorally equivalent, so no test can kill it. +_RESULT_LOG_CAP = 500 # pragma: no mutate + # Closes every guidance variant: the reply is spoken, so it must stay short and plain. _SPOKEN_TAIL = ( "Your reply is read aloud, so keep it short and spoken — no markdown, lists, code, or raw URLs." @@ -147,18 +160,71 @@ def build_completer( The cascade prepends its own ``system`` message to the history each turn; the graph already owns the system prompt, so we drop it before invoking. The graph runs the - full tool loop and we return its final spoken text. ``graph`` is injected in tests - so the per-turn wiring runs against a fake with no network. + full tool loop and we return its final spoken text. Under ``-v`` the loop is streamed + so each tool call/result is logged as it lands (see :func:`_run_graph`). ``graph`` is + injected in tests so the per-turn wiring runs against a fake with no network. """ resolved = build_graph(api_key, config) if graph is None else graph def complete_reply(messages: list[ChatCompletionMessageParam]) -> str: conversation = [message for message in messages if message.get("role") != "system"] - return _reply_text(resolved.invoke({"messages": conversation})) + return _reply_text(_run_graph(resolved, conversation)) return complete_reply +def _run_graph( + graph: CompiledAgent, conversation: list[ChatCompletionMessageParam] +) -> dict[str, object]: + """Run one turn through the graph, returning its end state. + + Normally a single ``invoke`` (the whole tool loop runs internally). Under verbose + mode, and when the graph can stream, drive it as incremental state snapshots instead + so :func:`_log_flow` can surface each tool call/result on stderr as it happens — which + is what makes a stalled spoken turn debuggable. The test fakes only implement + ``invoke``, so they (and the non-verbose path) take the plain branch. + """ + graph_input = {"messages": conversation} + if debuglog.active() and hasattr(graph, "stream"): + last: dict[str, object] = {} + seen = 0 + for chunk in graph.stream(graph_input, None, stream_mode="values"): + seen = _log_flow(chunk, seen) + last = chunk + return last + return graph.invoke(graph_input) + + +def _log_flow(state: dict[str, object], seen: int) -> int: + """Log the tool calls/results added to ``state`` since the first ``seen`` messages. + + Reuses the coding agent's message→event vocabulary so the flow log knows the same + AIMessage/ToolMessage shapes the TUI does. Returns the new high-water message count + so the next snapshot only logs what it added. + """ + from aai_cli.code_agent.events import AssistantText, ToolCall, ToolResult, message_events + + messages = state.get("messages") + if not isinstance(messages, list): + return seen + for message in messages[seen:]: + for event in message_events(message, announce_calls=True): + if isinstance(event, ToolCall): + _FLOW_LOG.info("tool call %s args=%s", event.name, event.args) + elif isinstance(event, ToolResult): + _FLOW_LOG.info("tool result %s -> %s", event.name, _clip(event.content)) + elif isinstance(event, AssistantText): + _FLOW_LOG.info("llm: %s", event.text) + return len(messages) + + +def _clip(text: str) -> str: + """Truncate a tool result for the flow log, marking that it was shortened.""" + if len(text) <= _RESULT_LOG_CAP: + return text + return f"{text[:_RESULT_LOG_CAP]}… ({len(text)} chars)" + + def _reply_text(result: dict[str, object]) -> str: """The agent's final spoken reply: the last assistant message that carries text. diff --git a/tests/test_agent_cascade_brain.py b/tests/test_agent_cascade_brain.py index 9f0509a..7c116a3 100644 --- a/tests/test_agent_cascade_brain.py +++ b/tests/test_agent_cascade_brain.py @@ -8,8 +8,10 @@ from __future__ import annotations +import logging + from langchain_core.language_models.chat_models import BaseChatModel -from langchain_core.messages import AIMessage +from langchain_core.messages import AIMessage, ToolMessage from langchain_core.outputs import ChatGeneration, ChatResult from aai_cli.agent_cascade import brain @@ -131,6 +133,121 @@ def invoke(self, value): assert roles == ["user"] +# --- _run_graph / _log_flow (verbose tool-call flow) ------------------------- + + +class _StreamingGraph: + """A graph that streams scripted state snapshots (the shape the real graph yields). + + Records the kwargs it was streamed with so a test can prove ``_run_graph`` asked for + incremental value snapshots, and exposes an ``invoke`` that must never run on the + verbose path.""" + + def __init__(self, snapshots): + self.snapshots = snapshots + self.stream_kwargs = None + self.invoked = False + + def stream(self, graph_input, config, *, stream_mode): + del graph_input, config + self.stream_kwargs = stream_mode + yield from self.snapshots + + def invoke(self, graph_input): + del graph_input + self.invoked = True + return {"messages": []} + + +def _search_call_message(): + return AIMessage( + content="Let me search.", + tool_calls=[{"name": "tavily_search", "args": {"query": "weather"}, "id": "c1"}], + ) + + +def test_run_graph_streams_and_logs_flow_when_verbose(monkeypatch, caplog, preserve_logging_state): + # Verbose mode streams the loop and logs each step — the assistant's interim line, the + # tool call (name + args), and the tool result — so a stalled spoken turn is debuggable. + monkeypatch.setattr(brain.debuglog, "active", lambda: True) + call = _search_call_message() + snapshots = [ + {"messages": [call]}, + { + "messages": [ + call, + ToolMessage(content="rainy, 52F", name="tavily_search", tool_call_id="c1"), + ] + }, + { + "messages": [ + call, + ToolMessage(content="rainy, 52F", name="tavily_search", tool_call_id="c1"), + AIMessage(content="It's rainy and 52 degrees in Portland."), + ] + }, + ] + graph = _StreamingGraph(snapshots) + completer = brain.build_completer("k", CascadeConfig(), graph=graph) + with caplog.at_level(logging.INFO, logger="aai_cli.agent_cascade.brain"): + reply = completer([{"role": "user", "content": "weather?"}]) + # The streamed final state still yields the spoken reply, and the graph was streamed + # for incremental value snapshots (not invoked). + assert reply == "It's rainy and 52 degrees in Portland." + assert graph.stream_kwargs == "values" + assert graph.invoked is False + # The flow log carries the tool call (with its args), the tool result, and the interim + # assistant line — each logged exactly once despite the growing snapshots. + messages = [record.getMessage() for record in caplog.records] + assert messages == [ + "llm: Let me search.", + "tool call tavily_search args={'query': 'weather'}", + "tool result tavily_search -> rainy, 52F", + "llm: It's rainy and 52 degrees in Portland.", + ] + + +def test_run_graph_invokes_when_not_verbose(): + # Default (non-verbose): the graph is invoked once, never streamed, and nothing is logged. + graph = _StreamingGraph([{"messages": [AIMessage(content="hi")]}]) + completer = brain.build_completer("k", CascadeConfig(), graph=graph) + assert completer([{"role": "user", "content": "hi"}]) == "" + assert graph.invoked is True + assert graph.stream_kwargs is None + + +def test_run_graph_invokes_when_graph_cannot_stream(monkeypatch): + # Verbose but the (test) graph only implements invoke: fall back to invoke rather than + # crashing on a missing .stream — the fakes and any non-streaming graph stay supported. + monkeypatch.setattr(brain.debuglog, "active", lambda: True) + + class _InvokeOnly: + def invoke(self, graph_input): + del graph_input + return {"messages": [AIMessage(content="from invoke")]} + + completer = brain.build_completer("k", CascadeConfig(), graph=_InvokeOnly()) + assert completer([{"role": "user", "content": "hi"}]) == "from invoke" + + +def test_log_flow_ignores_non_list_messages(): + # Defensive: a snapshot without a messages list logs nothing and reports no progress. + assert brain._log_flow({"messages": None}, 3) == 3 + + +def test_clip_passes_short_text_and_truncates_long_text(): + assert brain._clip("short") == "short" + # A result exactly at the cap is left whole (the boundary is inclusive). + at_cap = "y" * brain._RESULT_LOG_CAP + assert brain._clip(at_cap) == at_cap + long = "x" * (brain._RESULT_LOG_CAP + 5000) + clipped = brain._clip(long) + # Only the first _RESULT_LOG_CAP chars survive, with a marker noting the full length — + # so a multi-KB tool payload can't bury the rest of the flow in stderr. + assert clipped == "x" * brain._RESULT_LOG_CAP + f"… ({len(long)} chars)" + assert len(clipped) < len(long) + + # --- _reply_text / _content_text --------------------------------------------- @@ -153,7 +270,6 @@ def test_reply_text_joins_list_content_blocks(): def test_reply_text_skips_non_assistant_messages(): - from langchain_core.messages import ToolMessage # Scanning from the end, a trailing non-assistant message (e.g. a tool result) is # skipped — the spoken reply is the AIMessage before it. From eb60cff3b577687db473fdf4d6f201c273d57ca5 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 17:50:03 +0000 Subject: [PATCH 2/2] Flatten tool-result whitespace in the -v flow log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Untrusted tool output (a fetched page, a search payload) is logged in the verbose flow log; embedded CR/LF could forge fake `[aai_cli.…]` log lines. `_clip` now collapses all whitespace to single spaces before truncating, so each result stays on one line and can't inject log lines. (Secrets remain masked separately by the debuglog formatter.) Addresses the Aikido review note on PR #243. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01T3YQmYYiZDEZkDvWqXYk6D --- aai_cli/agent_cascade/brain.py | 16 ++++++++++++---- tests/test_agent_cascade_brain.py | 10 ++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/aai_cli/agent_cascade/brain.py b/aai_cli/agent_cascade/brain.py index 210c6c0..5f8b2b3 100644 --- a/aai_cli/agent_cascade/brain.py +++ b/aai_cli/agent_cascade/brain.py @@ -219,10 +219,18 @@ def _log_flow(state: dict[str, object], seen: int) -> int: def _clip(text: str) -> str: - """Truncate a tool result for the flow log, marking that it was shortened.""" - if len(text) <= _RESULT_LOG_CAP: - return text - return f"{text[:_RESULT_LOG_CAP]}… ({len(text)} chars)" + """Flatten a tool result onto one line and truncate it for the flow log. + + Tool output is untrusted external content (a fetched page, a search payload), so its + whitespace — newlines especially — is collapsed before logging: a result can't then + forge extra ``[aai_cli.…]`` log lines, and each result stays on one readable line. The + length is capped so a multi-KB payload can't bury the rest of the flow. (Secrets are + separately masked by the debuglog formatter across every record.) + """ + flattened = " ".join(text.split()) + if len(flattened) <= _RESULT_LOG_CAP: + return flattened + return f"{flattened[:_RESULT_LOG_CAP]}… ({len(flattened)} chars)" def _reply_text(result: dict[str, object]) -> str: diff --git a/tests/test_agent_cascade_brain.py b/tests/test_agent_cascade_brain.py index 7c116a3..3a446a7 100644 --- a/tests/test_agent_cascade_brain.py +++ b/tests/test_agent_cascade_brain.py @@ -248,6 +248,16 @@ def test_clip_passes_short_text_and_truncates_long_text(): assert len(clipped) < len(long) +def test_clip_flattens_whitespace_so_tool_output_cant_forge_log_lines(): + # Tool output is untrusted: a result with embedded CR/LF could otherwise inject fake + # "[aai_cli.…]" log lines. _clip collapses all whitespace runs to single spaces, so the + # result stays on one line. + forged = "ok\n[aai_cli.agent_cascade.brain] tool call rm_rf args={}\r\nmore" + assert brain._clip(forged) == "ok [aai_cli.agent_cascade.brain] tool call rm_rf args={} more" + assert "\n" not in brain._clip(forged) + assert "\r" not in brain._clip(forged) + + # --- _reply_text / _content_text ---------------------------------------------