diff --git a/aai_cli/AGENTS.md b/aai_cli/AGENTS.md index d63d451..8a27f3c 100644 --- a/aai_cli/AGENTS.md +++ b/aai_cli/AGENTS.md @@ -151,7 +151,7 @@ heavily-reworked commands with long bodies; small commands keep the inline - **`streaming/`** + `client.stream_audio` — v3 realtime API. Event callbacks run on the SDK reader thread and guard against `BrokenPipeError` (`stdio.silence_stdout()`) so a closed pipe never dumps a thread traceback. - **`core/sync_stt.py`** + **`core/signals.py`** + `commands/dictate/` — `assembly dictate`: headless dictation over the **Sync STT API** (`Environment.sync_base`, one POST `/transcribe` per utterance with the required `X-AAI-Model: u3-sync-pro` header; 80 ms–120 s of PCM/WAV). It needs no terminal: recording starts immediately and `dictate_exec._record` polls `signals.stop_on_terminate` between ~100 ms mic chunks for a SIGTERM, which finishes the utterance (clean exit 0) — so a hotkey tool like Hammerspoon can launch it as a background task and `kill -TERM`/`task:terminate()` to transcribe. SIGINT (Ctrl-C) still cancels (exit 130). Both boundaries (the stop latch, mic, HTTP) are injectable, so the suite never needs a real signal or microphone (`tests/test_dictate_exec.py` scripts the SIGTERM latch). Contrast `signals.terminate_as_interrupt` (used by `stream`/`agent`/`speak`), which routes SIGTERM into the *cancel* path instead. - **`agent/`** — full-duplex voice agent (mic in, TTS out via `voices.py`). -- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. +- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph (`brain.py`); under `-v` (`debuglog.active()`) `brain._run_graph` *streams* that graph instead of `invoke`-ing it and logs each tool call/result/interim line as it lands (reusing `code_agent.events.message_events`), so a spoken turn that stalls mid-tool is debuggable — plain `invoke` runs the whole loop internally and `-v` would otherwise show only the httpx lines. - **`tts/`** + `commands/speak.py` — `assembly speak` synthesizes text to speech over the sandbox streaming-TTS WebSocket (`streaming-tts.sandbox000.…`). **Sandbox-only:** `session.is_available()` is false in production (empty `Environment.streaming_tts_host`), so the command exits 2 with a `--sandbox` hint. `session.synthesize` drives a Begin→Generate→Flush→Audio→Terminate protocol with an injectable `connect` for hermetic tests (mirrors `agent/session.py`); `audio.py` plays the PCM (default) or writes a WAV (`--out`). The single-voice default-playback path **streams**: `synthesize`'s `on_audio(chunk, sample_rate)` callback is wired to `audio.PcmPlayer.feed`, so speech starts on the first Audio frame (it opens the device lazily, since the rate is only known at Begin) instead of after the whole text — the win for a long `--url` page. `--out` (needs the full buffer) and the multi-voice dialogue path (`synthesize_dialogue` → `_output_audio` → buffered `play_pcm`) stay buffered; `synthesize` still returns the complete PCM for the summary regardless. - **`code_agent/`** + `commands/code/` — `assembly code`: a terminal coding agent (a bespoke port of langchain-ai/deepagents' `code` agent) that talks **only** to the LLM Gateway. `model.py` pins the model to `ChatOpenAI` against `llm_gateway_base`; `agent.py` builds the deepagents graph over a cwd-scoped `LocalShellBackend` (filesystem + shell tools), plus extra tools: the custom `assembly` CLI tool (`cli_tool.py`, runs `python -m aai_cli` with the key via child env, never argv), a URL `fetch_url` tool (`fetch_tool.py`), Tavily web search when `TAVILY_API_KEY` is set (`web_search.py`), an `ask_user` tool routed through an `AskBridge` to the front-end (`ask_tool.py`), and best-effort docs MCP tools (`docs_mcp.py`). Middleware adds installed skills (`skills.py`) and long-term memory (`memory.py`), each over its own dedicated backend. Sessions persist via a SQLite checkpointer (`store.py`) keyed by `--session`, so conversations resume. Approval gates the mutating tools (write/edit/execute/`assembly`/`fetch_url`); the general-purpose `task` subagent comes from deepagents by default. `session.py` drives the graph turn-by-turn (interrupt/resume = human approval), emitting framework-agnostic `events.py` to either the Textual TUI (`tui.py`, modeled on deepagents-code: transcript + input + approval/ask modals + clipboard copy) or the Rich fallback (`render.py`). The whole orchestration is tested by driving the **real** graph with a fake `BaseChatModel` (`tests/test_code_agent.py`), so no network/TTY is needed. **Voice is the default front-end in an interactive TTY** (`voice.py` + `_exec._run_voice`): `VoiceSession.listen` captures one spoken turn over Streaming STT (gating the mic shut the instant a turn finalizes) and `VoiceSession.speak` reads each assistant reply back over streaming TTS. It runs the **Rich REPL** loop (not the keyboard TUI) with a voice `read_line` + a reply-speaking sink. Readback needs streaming TTS, so it's **sandbox-only** (`tts.session.is_available`); in production the mic input still works and replies stay on screen. A mic-less box degrades to typed input on the first `AUDIO_ERROR_TYPES` `CLIError`; `--no-voice` selects the TUI, and a non-TTY (pipe/CI) the headless loop. Both legs (STT/TTS) are injected like the cascade's, so `tests/test_code_voice.py` drives it with fakes — no mic/speaker/socket. - **`code_gen/`** — backs `--show-code` on `transcribe`/`stream`/`agent`: builds a ready-to-run Python SDK script from exactly the flags passed (no API key needed; generated code reads `ASSEMBLYAI_API_KEY`). diff --git a/aai_cli/agent_cascade/brain.py b/aai_cli/agent_cascade/brain.py index 966e3e6..5f8b2b3 100644 --- a/aai_cli/agent_cascade/brain.py +++ b/aai_cli/agent_cascade/brain.py @@ -16,6 +16,7 @@ from __future__ import annotations +import logging from collections.abc import Callable, Sequence from typing import TYPE_CHECKING @@ -23,11 +24,23 @@ from aai_cli.code_agent.agent import CompiledAgent from aai_cli.code_agent.fetch_tool import FETCH_TOOL_NAME from aai_cli.code_agent.web_search import WEB_SEARCH_TOOL_NAME +from aai_cli.core import debuglog if TYPE_CHECKING: from langchain_core.tools import BaseTool from openai.types.chat import ChatCompletionMessageParam +# Verbose (`-v`) flow logging for the agent's tool loop. `invoke` runs the whole loop +# internally, so without this `-v` only shows the httpx request lines and never which +# tools the agent reached for or what they returned — exactly what you need to see when +# a spoken turn stalls mid-tool. Logged at INFO so plain `-v` surfaces it. +_FLOW_LOG = logging.getLogger("aai_cli.agent_cascade.brain") + +# Tool outputs (a fetched page, a search payload) can be huge; cap what we log per result +# so a single tool call doesn't bury the rest of the flow in stderr. The exact cap is an +# arbitrary tuning knob — a +-1 shift is behaviorally equivalent, so no test can kill it. +_RESULT_LOG_CAP = 500 # pragma: no mutate + # Closes every guidance variant: the reply is spoken, so it must stay short and plain. _SPOKEN_TAIL = ( "Your reply is read aloud, so keep it short and spoken — no markdown, lists, code, or raw URLs." @@ -147,18 +160,79 @@ def build_completer( The cascade prepends its own ``system`` message to the history each turn; the graph already owns the system prompt, so we drop it before invoking. The graph runs the - full tool loop and we return its final spoken text. ``graph`` is injected in tests - so the per-turn wiring runs against a fake with no network. + full tool loop and we return its final spoken text. Under ``-v`` the loop is streamed + so each tool call/result is logged as it lands (see :func:`_run_graph`). ``graph`` is + injected in tests so the per-turn wiring runs against a fake with no network. """ resolved = build_graph(api_key, config) if graph is None else graph def complete_reply(messages: list[ChatCompletionMessageParam]) -> str: conversation = [message for message in messages if message.get("role") != "system"] - return _reply_text(resolved.invoke({"messages": conversation})) + return _reply_text(_run_graph(resolved, conversation)) return complete_reply +def _run_graph( + graph: CompiledAgent, conversation: list[ChatCompletionMessageParam] +) -> dict[str, object]: + """Run one turn through the graph, returning its end state. + + Normally a single ``invoke`` (the whole tool loop runs internally). Under verbose + mode, and when the graph can stream, drive it as incremental state snapshots instead + so :func:`_log_flow` can surface each tool call/result on stderr as it happens — which + is what makes a stalled spoken turn debuggable. The test fakes only implement + ``invoke``, so they (and the non-verbose path) take the plain branch. + """ + graph_input = {"messages": conversation} + if debuglog.active() and hasattr(graph, "stream"): + last: dict[str, object] = {} + seen = 0 + for chunk in graph.stream(graph_input, None, stream_mode="values"): + seen = _log_flow(chunk, seen) + last = chunk + return last + return graph.invoke(graph_input) + + +def _log_flow(state: dict[str, object], seen: int) -> int: + """Log the tool calls/results added to ``state`` since the first ``seen`` messages. + + Reuses the coding agent's message→event vocabulary so the flow log knows the same + AIMessage/ToolMessage shapes the TUI does. Returns the new high-water message count + so the next snapshot only logs what it added. + """ + from aai_cli.code_agent.events import AssistantText, ToolCall, ToolResult, message_events + + messages = state.get("messages") + if not isinstance(messages, list): + return seen + for message in messages[seen:]: + for event in message_events(message, announce_calls=True): + if isinstance(event, ToolCall): + _FLOW_LOG.info("tool call %s args=%s", event.name, event.args) + elif isinstance(event, ToolResult): + _FLOW_LOG.info("tool result %s -> %s", event.name, _clip(event.content)) + elif isinstance(event, AssistantText): + _FLOW_LOG.info("llm: %s", event.text) + return len(messages) + + +def _clip(text: str) -> str: + """Flatten a tool result onto one line and truncate it for the flow log. + + Tool output is untrusted external content (a fetched page, a search payload), so its + whitespace — newlines especially — is collapsed before logging: a result can't then + forge extra ``[aai_cli.…]`` log lines, and each result stays on one readable line. The + length is capped so a multi-KB payload can't bury the rest of the flow. (Secrets are + separately masked by the debuglog formatter across every record.) + """ + flattened = " ".join(text.split()) + if len(flattened) <= _RESULT_LOG_CAP: + return flattened + return f"{flattened[:_RESULT_LOG_CAP]}… ({len(flattened)} chars)" + + def _reply_text(result: dict[str, object]) -> str: """The agent's final spoken reply: the last assistant message that carries text. diff --git a/tests/test_agent_cascade_brain.py b/tests/test_agent_cascade_brain.py index 9f0509a..3a446a7 100644 --- a/tests/test_agent_cascade_brain.py +++ b/tests/test_agent_cascade_brain.py @@ -8,8 +8,10 @@ from __future__ import annotations +import logging + from langchain_core.language_models.chat_models import BaseChatModel -from langchain_core.messages import AIMessage +from langchain_core.messages import AIMessage, ToolMessage from langchain_core.outputs import ChatGeneration, ChatResult from aai_cli.agent_cascade import brain @@ -131,6 +133,131 @@ def invoke(self, value): assert roles == ["user"] +# --- _run_graph / _log_flow (verbose tool-call flow) ------------------------- + + +class _StreamingGraph: + """A graph that streams scripted state snapshots (the shape the real graph yields). + + Records the kwargs it was streamed with so a test can prove ``_run_graph`` asked for + incremental value snapshots, and exposes an ``invoke`` that must never run on the + verbose path.""" + + def __init__(self, snapshots): + self.snapshots = snapshots + self.stream_kwargs = None + self.invoked = False + + def stream(self, graph_input, config, *, stream_mode): + del graph_input, config + self.stream_kwargs = stream_mode + yield from self.snapshots + + def invoke(self, graph_input): + del graph_input + self.invoked = True + return {"messages": []} + + +def _search_call_message(): + return AIMessage( + content="Let me search.", + tool_calls=[{"name": "tavily_search", "args": {"query": "weather"}, "id": "c1"}], + ) + + +def test_run_graph_streams_and_logs_flow_when_verbose(monkeypatch, caplog, preserve_logging_state): + # Verbose mode streams the loop and logs each step — the assistant's interim line, the + # tool call (name + args), and the tool result — so a stalled spoken turn is debuggable. + monkeypatch.setattr(brain.debuglog, "active", lambda: True) + call = _search_call_message() + snapshots = [ + {"messages": [call]}, + { + "messages": [ + call, + ToolMessage(content="rainy, 52F", name="tavily_search", tool_call_id="c1"), + ] + }, + { + "messages": [ + call, + ToolMessage(content="rainy, 52F", name="tavily_search", tool_call_id="c1"), + AIMessage(content="It's rainy and 52 degrees in Portland."), + ] + }, + ] + graph = _StreamingGraph(snapshots) + completer = brain.build_completer("k", CascadeConfig(), graph=graph) + with caplog.at_level(logging.INFO, logger="aai_cli.agent_cascade.brain"): + reply = completer([{"role": "user", "content": "weather?"}]) + # The streamed final state still yields the spoken reply, and the graph was streamed + # for incremental value snapshots (not invoked). + assert reply == "It's rainy and 52 degrees in Portland." + assert graph.stream_kwargs == "values" + assert graph.invoked is False + # The flow log carries the tool call (with its args), the tool result, and the interim + # assistant line — each logged exactly once despite the growing snapshots. + messages = [record.getMessage() for record in caplog.records] + assert messages == [ + "llm: Let me search.", + "tool call tavily_search args={'query': 'weather'}", + "tool result tavily_search -> rainy, 52F", + "llm: It's rainy and 52 degrees in Portland.", + ] + + +def test_run_graph_invokes_when_not_verbose(): + # Default (non-verbose): the graph is invoked once, never streamed, and nothing is logged. + graph = _StreamingGraph([{"messages": [AIMessage(content="hi")]}]) + completer = brain.build_completer("k", CascadeConfig(), graph=graph) + assert completer([{"role": "user", "content": "hi"}]) == "" + assert graph.invoked is True + assert graph.stream_kwargs is None + + +def test_run_graph_invokes_when_graph_cannot_stream(monkeypatch): + # Verbose but the (test) graph only implements invoke: fall back to invoke rather than + # crashing on a missing .stream — the fakes and any non-streaming graph stay supported. + monkeypatch.setattr(brain.debuglog, "active", lambda: True) + + class _InvokeOnly: + def invoke(self, graph_input): + del graph_input + return {"messages": [AIMessage(content="from invoke")]} + + completer = brain.build_completer("k", CascadeConfig(), graph=_InvokeOnly()) + assert completer([{"role": "user", "content": "hi"}]) == "from invoke" + + +def test_log_flow_ignores_non_list_messages(): + # Defensive: a snapshot without a messages list logs nothing and reports no progress. + assert brain._log_flow({"messages": None}, 3) == 3 + + +def test_clip_passes_short_text_and_truncates_long_text(): + assert brain._clip("short") == "short" + # A result exactly at the cap is left whole (the boundary is inclusive). + at_cap = "y" * brain._RESULT_LOG_CAP + assert brain._clip(at_cap) == at_cap + long = "x" * (brain._RESULT_LOG_CAP + 5000) + clipped = brain._clip(long) + # Only the first _RESULT_LOG_CAP chars survive, with a marker noting the full length — + # so a multi-KB tool payload can't bury the rest of the flow in stderr. + assert clipped == "x" * brain._RESULT_LOG_CAP + f"… ({len(long)} chars)" + assert len(clipped) < len(long) + + +def test_clip_flattens_whitespace_so_tool_output_cant_forge_log_lines(): + # Tool output is untrusted: a result with embedded CR/LF could otherwise inject fake + # "[aai_cli.…]" log lines. _clip collapses all whitespace runs to single spaces, so the + # result stays on one line. + forged = "ok\n[aai_cli.agent_cascade.brain] tool call rm_rf args={}\r\nmore" + assert brain._clip(forged) == "ok [aai_cli.agent_cascade.brain] tool call rm_rf args={} more" + assert "\n" not in brain._clip(forged) + assert "\r" not in brain._clip(forged) + + # --- _reply_text / _content_text --------------------------------------------- @@ -153,7 +280,6 @@ def test_reply_text_joins_list_content_blocks(): def test_reply_text_skips_non_assistant_messages(): - from langchain_core.messages import ToolMessage # Scanning from the end, a trailing non-assistant message (e.g. a tool result) is # skipped — the spoken reply is the AIMessage before it.