diff --git a/aai_cli/AGENTS.md b/aai_cli/AGENTS.md
index 2a4e848..fd2f352 100644
--- a/aai_cli/AGENTS.md
+++ b/aai_cli/AGENTS.md
@@ -151,7 +151,7 @@ heavily-reworked commands with long bodies; small commands keep the inline
 - **`streaming/`** + `client.stream_audio` — v3 realtime API. Event callbacks run on the SDK reader thread and guard against `BrokenPipeError` (`stdio.silence_stdout()`) so a closed pipe never dumps a thread traceback.
 - **`core/sync_stt.py`** + **`core/signals.py`** + `commands/dictate/` — `assembly dictate`: headless dictation over the **Sync STT API** (`Environment.sync_base`, one POST `/transcribe` per utterance with the required `X-AAI-Model: u3-sync-pro` header; 80 ms–120 s of PCM/WAV). It needs no terminal: recording starts immediately and `dictate_exec._record` polls `signals.stop_on_terminate` between ~100 ms mic chunks for a SIGTERM, which finishes the utterance (clean exit 0) — so a hotkey tool like Hammerspoon can launch it as a background task and `kill -TERM`/`task:terminate()` to transcribe. SIGINT (Ctrl-C) still cancels (exit 130). Both boundaries (the stop latch, mic, HTTP) are injectable, so the suite never needs a real signal or microphone (`tests/test_dictate_exec.py` scripts the SIGTERM latch). Contrast `signals.terminate_as_interrupt` (used by `stream`/`agent`/`speak`), which routes SIGTERM into the *cancel* path instead.
 - **`agent/`** — full-duplex voice agent (mic in, TTS out via `voices.py`).
-- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph (`brain.py`); under `-v` (`debuglog.active()`) `brain._run_graph` *streams* that graph instead of `invoke`-ing it and logs each tool call/result/interim line as it lands (reusing `code_agent.events.message_events`), so a spoken turn that stalls mid-tool is debuggable — plain `invoke` runs the whole loop internally and `-v` would otherwise show only the httpx lines.
+- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph (`brain.py`); under `-v` (`debuglog.active()`) `brain._run_graph` *streams* that graph instead of `invoke`-ing it and logs each tool call/result/interim line as it lands (reusing `code_agent.events.message_events`), so a spoken turn that stalls mid-tool is debuggable — plain `invoke` runs the whole loop internally and `-v` would otherwise show only the httpx lines. **Front-end:** an interactive mic session in human mode runs a **voice-only Textual TUI** (`agent_cascade/tui.py`, `LiveAgentApp`) by default — there's no text input (you can't type to it), just a transcript + an animated voice bar tracking listening/thinking/speaking. It shares the `assembly code` TUI's chrome (`code_agent.banner` wordmark, `code_agent.messages` widgets, `code_agent.tui_status.voicebar_markup`/`VOICE_FRAMES`); the blocking `run_cascade` runs on a worker thread and reaches the UI through a `_TuiRenderer` (the `engine.Renderer` protocol) that hops each call onto the UI thread, and a quit calls `DuplexAudio.close` to end the mic iterator and unblock that worker. `_exec._should_use_tui` gates it: file/sample input, `--json`/`-o text`, and a non-TTY all fall back to the plain `AgentRenderer` line output.
 - **`tts/`** + `commands/speak.py` — `assembly speak` synthesizes text to speech over the sandbox streaming-TTS WebSocket (`streaming-tts.sandbox000.…`). **Sandbox-only:** `session.is_available()` is false in production (empty `Environment.streaming_tts_host`), so the command exits 2 with a `--sandbox` hint. `session.synthesize` drives a Begin→Generate→Flush→Audio→Terminate protocol with an injectable `connect` for hermetic tests (mirrors `agent/session.py`); `audio.py` plays the PCM (default) or writes a WAV (`--out`). The single-voice default-playback path **streams**: `synthesize`'s `on_audio(chunk, sample_rate)` callback is wired to `audio.PcmPlayer.feed`, so speech starts on the first Audio frame (it opens the device lazily, since the rate is only known at Begin) instead of after the whole text — the win for a long `--url` page. `--out` (needs the full buffer) and the multi-voice dialogue path (`synthesize_dialogue` → `_output_audio` → buffered `play_pcm`) stay buffered; `synthesize` still returns the complete PCM for the summary regardless.
 - **`code_agent/`** + `commands/code/` — `assembly code`: a terminal coding agent (a bespoke port of langchain-ai/deepagents' `code` agent) that talks **only** to the LLM Gateway. `model.py` pins the model to `ChatOpenAI` against `llm_gateway_base`; `agent.py` builds the deepagents graph over a cwd-scoped `LocalShellBackend` (filesystem + shell tools), plus extra tools: the custom `assembly` CLI tool (`cli_tool.py`, runs `python -m aai_cli` with the key via child env, never argv), a URL `fetch_url` tool (`fetch_tool.py`), Firecrawl web search when `FIRECRAWL_API_KEY` is set (`firecrawl_search.py`, shared with the live voice agent), an `ask_user` tool routed through an `AskBridge` to the front-end (`ask_tool.py`), and best-effort docs MCP tools (`docs_mcp.py`). Middleware adds installed skills (`skills.py`) and long-term memory (`memory.py`), each over its own dedicated backend. Sessions persist via a SQLite checkpointer (`store.py`) keyed by `--session`, so conversations resume. Approval gates the mutating tools (write/edit/execute/`assembly`/`fetch_url`); the general-purpose `task` subagent comes from deepagents by default. `session.py` drives the graph turn-by-turn (interrupt/resume = human approval), emitting framework-agnostic `events.py` to either the Textual TUI (`tui.py`, modeled on deepagents-code: transcript + input + approval/ask modals + clipboard copy) or the Rich fallback (`render.py`). The whole orchestration is tested by driving the **real** graph with a fake `BaseChatModel` (`tests/test_code_agent.py`), so no network/TTY is needed. **Voice is the default front-end in an interactive TTY** (`voice.py` + `_exec._run_voice`): `VoiceSession.listen` captures one spoken turn over Streaming STT (gating the mic shut the instant a turn finalizes) and `VoiceSession.speak` reads each assistant reply back over streaming TTS. It runs the **Rich REPL** loop (not the keyboard TUI) with a voice `read_line` + a reply-speaking sink. Readback needs streaming TTS, so it's **sandbox-only** (`tts.session.is_available`); in production the mic input still works and replies stay on screen. A mic-less box degrades to typed input on the first `AUDIO_ERROR_TYPES` `CLIError`; `--no-voice` selects the TUI, and a non-TTY (pipe/CI) the headless loop. Both legs (STT/TTS) are injected like the cascade's, so `tests/test_code_voice.py` drives it with fakes — no mic/speaker/socket.
 - **`code_gen/`** — backs `--show-code` on `transcribe`/`stream`/`agent`: builds a ready-to-run Python SDK script from exactly the flags passed (no API key needed; generated code reads `ASSEMBLYAI_API_KEY`).
diff --git a/aai_cli/agent_cascade/tui.py b/aai_cli/agent_cascade/tui.py
new file mode 100644
index 0000000..90607d1
--- /dev/null
+++ b/aai_cli/agent_cascade/tui.py
@@ -0,0 +1,259 @@
+"""A voice-only Textual UI for `assembly live` (the agent cascade).
+
+Shares the chrome of the `assembly code` TUI — the flat dark canvas, the ASSEMBLY
+wordmark splash, the animated voice bar, and the transcript message widgets — but drops
+the text prompt: `live` is a hands-free spoken conversation, so there is nothing to type.
+
+The cascade (Streaming STT -> LLM -> streaming TTS) is handed in as a blocking
+``run_conversation`` driven on a worker thread; it streams transcript events back through a
+:class:`_TuiRenderer` that hops each call onto the UI thread. The voice bar tracks the phase
+(listening / thinking / speaking). A quit calls ``on_stop`` to close the audio, which ends the
+mic iterator and unblocks that worker.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import itertools
+from typing import TYPE_CHECKING, ClassVar
+
+from textual.app import App, ComposeResult
+from textual.containers import VerticalScroll
+from textual.widgets import Static
+
+from aai_cli.code_agent import banner, tui_status
+from aai_cli.code_agent.messages import AssistantMessage, ErrorMessage, Note, UserMessage
+from aai_cli.core.errors import CLIError
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
+    from textual.timer import Timer
+
+    from aai_cli.agent_cascade.engine import Renderer
+
+# Splash intro copy (the code agent's banner copy is code-specific, so `live` carries its own).
+_READY_LINE = "Listening… start talking when you're ready."
+_TIP_LINE = "Use headphones — the mic stays open while the agent speaks."
+# The one-line footer: a hands-free session, so the only control is quit.
+_STATUS_LINE = "Ctrl-C to quit"
+
+
+class _TuiRenderer:
+    """Marshals cascade :class:`~aai_cli.agent_cascade.engine.Renderer` calls onto the UI thread.
+
+    The cascade runs on a worker thread; every render call hops back via ``call_from_thread``.
+    Once the app has torn down (a quit mid-turn) that call raises ``RuntimeError`` — the event is
+    moot then, so it's dropped rather than surfaced as an unhandled worker-thread exception.
+    """
+
+    def __init__(self, app: LiveAgentApp) -> None:
+        self._app = app
+
+    def connected(self) -> None:
+        self._dispatch(self._app.live_connected)
+
+    def user_partial(self, text: str) -> None:
+        self._dispatch(self._app.show_user_partial, text)
+
+    def user_final(self, text: str) -> None:
+        self._dispatch(self._app.show_user_final, text)
+
+    def reply_started(self) -> None:
+        self._dispatch(self._app.begin_reply)
+
+    def agent_transcript(self, text: str, *, interrupted: bool) -> None:
+        # Sentences are emitted before any barge-in check, so `interrupted` is always False
+        # here (the interrupted state is surfaced on reply_done); accept it for the protocol.
+        del interrupted  # pragma: no mutate
+        self._dispatch(self._app.show_agent_sentence, text)
+
+    def reply_done(self, *, interrupted: bool) -> None:
+        self._dispatch(lambda: self._app.end_reply(interrupted=interrupted))
+
+    def _dispatch(self, fn: Callable[..., None], *args: object) -> None:
+        if not self._app.is_running:
+            return
+        with contextlib.suppress(RuntimeError):
+            self._app.call_from_thread(fn, *args)
+
+
+class LiveAgentApp(App[None]):
+    """The hands-free voice TUI: a scrolling transcript above an animated voice bar."""
+
+    # Flat pure-black canvas matching the `code` TUI: a bordered voice bar and a one-line
+    # footer, with no text prompt (there's nothing to type into a live voice session).
+    CSS = f"""
+    Screen {{ background: #000000; }}
+    #log {{ height: 1fr; border: none; background: #000000; padding: 1 2; }}
+    #voicebar {{ dock: bottom; height: 3; background: #000000; border: round {banner.BRAND_HEX};
+                margin: 1 1; content-align: center middle; }}
+    #status {{ dock: bottom; height: 1; background: #000000; padding: 0 1; }}
+    """
+    TITLE = "AssemblyAI Live"
+    ENABLE_COMMAND_PALETTE = False
+    # Ctrl-C / Ctrl-Q both stop the session; there is no turn to interrupt and nothing to type,
+    # so a single press quits (closing the audio unblocks the cascade worker).
+    BINDINGS: ClassVar = [
+        ("ctrl+c", "stop", "Quit"),
+        ("ctrl+q", "stop", "Quit"),
+    ]
+
+    def __init__(
+        self,
+        *,
+        run_conversation: Callable[[Renderer], None],
+        on_stop: Callable[[], None],
+        web_note: str | None = None,
+    ) -> None:
+        super().__init__()
+        self._run_conversation = run_conversation  # blocking; runs the cascade given a Renderer
+        self._on_stop = on_stop  # closes the audio so a quit unblocks the cascade worker
+        self._web_note = web_note
+        self._voice_phase = "listening"
+        self._voice_frames = itertools.cycle(tui_status.VOICE_FRAMES)
+        self._voice_timer: Timer | None = None
+        self._user_partial: UserMessage | None = None  # the in-place "you: …" widget for a turn
+        self._reply_msg: AssistantMessage | None = None  # the reply widget sentences stream into
+        self._stopped = False  # guards on_stop against a double teardown (quit + unmount)
+
+    def compose(self) -> ComposeResult:
+        yield VerticalScroll(id="log")
+        yield Static("", id="voicebar")
+        yield Static(f"[dim]{_STATUS_LINE}[/dim]", id="status")
+
+    def on_mount(self) -> None:
+        self._write_splash()
+        if self._web_note:
+            self.notify(self._web_note, title="Web search disabled", severity="warning")
+        self._render_voicebar()
+        self._voice_timer = self.set_interval(0.3, self._tick_voice)  # pragma: no mutate
+        # Defer the first mic open until after the splash has painted (a GIL-holding PortAudio
+        # open races Textual's initial render otherwise — same reason as the code TUI).
+        self.call_after_refresh(self._start)
+
+    def _start(self) -> None:
+        # thread=True: the cascade is a blocking sync call; exclusive=True: one session at a time.
+        self.run_worker(self._run, thread=True, exclusive=True, name="cascade")  # pragma: no mutate
+
+    def _run(self) -> None:
+        """Drive the cascade on a worker thread, then close the app when it ends."""
+        renderer = _TuiRenderer(self)
+        try:
+            self._run_conversation(renderer)
+        except CLIError as exc:
+            self._safely(self._show_error, exc.message)
+        # The cascade returned (STT closed, a leg failed, or a quit closed the audio) — exit.
+        self._safely(self.exit)
+
+    def _safely(self, fn: Callable[..., None], *args: object) -> None:
+        """Hop ``fn`` onto the UI thread, dropping the error a torn-down app raises mid-call."""
+        if not self.is_running:
+            return
+        with contextlib.suppress(RuntimeError):
+            self.call_from_thread(fn, *args)
+
+    # --- transcript (always called on the UI thread) --------------------------
+
+    def live_connected(self) -> None:
+        """The session is live; the splash already shows the listening prompt."""
+        self._set_phase("listening")
+
+    def show_user_partial(self, text: str) -> None:
+        """Grow the interim user transcript in place while the turn is still being spoken."""
+        self._set_phase("listening")
+        if self._user_partial is None:
+            self._user_partial = UserMessage(text)
+            self._mount(self._user_partial)
+        else:
+            self._user_partial.set_text(text)
+        self._scroll_end()
+
+    def show_user_final(self, text: str) -> None:
+        """Commit the finalized user turn and move to the thinking phase."""
+        if self._user_partial is None:
+            self._mount(UserMessage(text))
+        else:
+            self._user_partial.set_text(text)
+            self._user_partial = None  # finalized; the next partial starts a fresh line
+        self._set_phase("thinking")
+        self._scroll_end()
+
+    def begin_reply(self) -> None:
+        """Open a fresh reply widget the agent's sentences stream into; switch to speaking."""
+        self._set_phase("speaking")
+        self._reply_msg = AssistantMessage()
+        self._mount(self._reply_msg)
+
+    def show_agent_sentence(self, text: str) -> None:
+        """Append one spoken sentence to the in-flight reply."""
+        if self._reply_msg is None:
+            self._reply_msg = AssistantMessage()
+            self._mount(self._reply_msg)
+        self._reply_msg.stream(f"{text} ")
+        self._scroll_end()
+
+    def end_reply(self, *, interrupted: bool) -> None:
+        """Finalize the reply (rendered as Markdown) and return to listening."""
+        if self._reply_msg is not None:
+            self._reply_msg.finalize(self._reply_msg.text)
+            self._reply_msg = None
+        if interrupted:
+            self._mount(Note("(interrupted)"))
+        self._set_phase("listening")
+
+    def _show_error(self, message: str) -> None:
+        self._mount(ErrorMessage(message))
+
+    # --- voice bar ------------------------------------------------------------
+
+    def _set_phase(self, phase: str) -> None:
+        self._voice_phase = phase
+        self._render_voicebar()
+
+    def _render_voicebar(self) -> None:
+        """Paint the voice bar for the current phase (no Ctrl-V hint — input is voice-only)."""
+        self.query_one("#voicebar", Static).update(
+            tui_status.voicebar_markup(self._voice_phase, next(self._voice_frames))
+        )
+
+    def _tick_voice(self) -> None:
+        """Advance the voice-bar meter one frame (the animation timer's callback)."""
+        self._render_voicebar()
+
+    # --- splash / mounting ----------------------------------------------------
+
+    def _write_splash(self) -> None:
+        rows = [f"[bold {banner.BRAND_HEX}]{row}[/]" for row in banner.wordmark()]
+        rows += [
+            f"[dim]{banner.version()}[/dim]",
+            "",
+            f"[{banner.BRAND_HEX}]{_READY_LINE}[/]",
+            f"[dim]{_TIP_LINE}[/dim]",
+        ]
+        self._mount(Static("\n".join(rows)))
+
+    def _mount(self, widget: Static) -> None:
+        log = self.query_one("#log", VerticalScroll)
+        log.mount(widget)
+        log.scroll_end(animate=False)  # pragma: no mutate — cosmetic; animate flag is unassertable
+
+    def _scroll_end(self) -> None:
+        self.query_one("#log", VerticalScroll).scroll_end(animate=False)  # pragma: no mutate
+
+    # --- quit -----------------------------------------------------------------
+
+    def action_stop(self) -> None:
+        """Ctrl-C / Ctrl-Q: stop the audio (unblocking the cascade worker) and exit."""
+        self._teardown()
+        self.exit()
+
+    def on_unmount(self) -> None:
+        """Close the audio on any exit path, in case the worker is still blocked on the mic."""
+        self._teardown()
+
+    def _teardown(self) -> None:
+        if self._stopped:
+            return
+        self._stopped = True
+        self._on_stop()
diff --git a/aai_cli/code_agent/messages.py b/aai_cli/code_agent/messages.py
index 8bb1ad2..afcefdb 100644
--- a/aai_cli/code_agent/messages.py
+++ b/aai_cli/code_agent/messages.py
@@ -30,13 +30,22 @@ def __init__(self, text: str) -> None:
         super().__init__(Text(text, style=_DIM))
 
 
+def _user_markup(text: str) -> Text:
+    """The styled `» …` prompt echo, built in one place for the constructor and set_text."""
+    return Text(f"» {text}", style="bold #38bdf8")
+
+
 class UserMessage(Static):
     """The echoed user prompt, with a top margin so each turn is visually separated."""
 
     DEFAULT_CSS = "UserMessage { margin-top: 1; }"
 
     def __init__(self, text: str) -> None:
-        super().__init__(Text(f"» {text}", style="bold #38bdf8"))
+        super().__init__(_user_markup(text))
+
+    def set_text(self, text: str) -> None:
+        """Replace the shown prompt text — grows an interim voice transcript in place."""
+        self.update(_user_markup(text))
 
 
 class AssistantMessage(Static):
diff --git a/aai_cli/code_agent/tui.py b/aai_cli/code_agent/tui.py
index 5710e3b..f5bb44a 100644
--- a/aai_cli/code_agent/tui.py
+++ b/aai_cli/code_agent/tui.py
@@ -43,7 +43,12 @@
 )
 from aai_cli.code_agent.modals import ApprovalScreen, AskScreen
 from aai_cli.code_agent.session import CodeSession
-from aai_cli.code_agent.tui_status import _spinner_text, _status_text
+from aai_cli.code_agent.tui_status import (
+    VOICE_FRAMES,
+    _spinner_text,
+    _status_text,
+    voicebar_markup,
+)
 from aai_cli.code_agent.voice_ui import _VoiceIO, _VoiceLegs
 
 if TYPE_CHECKING:
@@ -53,14 +58,6 @@
 _SPIN_FRAMES = "✶✷✸✹✺"  # pragma: no mutate
 # Seconds the Ctrl-C "press again to quit" hint stays armed (deepagents-code uses 3s too).
 _QUIT_HINT_SECONDS = 3  # pragma: no mutate
-# Animated meter for the voice bar — a 3-cell block-char pulse (BMP, single-width, no emoji).
-_VOICE_FRAMES = ("▁▃▅", "▃▅▇", "▅▇▆", "▆▇▅", "▇▅▃", "▅▃▁")  # pragma: no mutate
-# The three voice phases the bar distinguishes, each (label, accent color).
-_VOICE_PHASES: dict[str, tuple[str, str]] = {
-    "listening": ("Listening — speak your request", banner.BRAND_HEX),
-    "thinking": ("Thinking…", "#f59e0b"),
-    "speaking": ("Speaking…", "#22c55e"),
-}
 
 
 class CodeAgentApp(_VoiceLegs):
@@ -125,7 +122,7 @@ def __init__(
         self._voice_typed = False  # flips once the mic is ruled out; then input is typed only
         self._voice_paused = False  # user-toggled off via Ctrl-V (distinct from a mic failure)
         self._voice_phase = "listening"  # listening / thinking / speaking, shown in the voice bar
-        self._voice_frames = itertools.cycle(_VOICE_FRAMES)
+        self._voice_frames = itertools.cycle(VOICE_FRAMES)
         self._voice_timer: Timer | None = None  # animates the voice-bar meter while it's shown
         self._streaming_msg: AssistantMessage | None = None  # the reply widget tokens stream into
         self._last_tool_output: ToolOutput | None = None  # the row Ctrl+O expands/collapses
@@ -351,10 +348,11 @@ def _set_voice_phase(self, phase: str) -> None:
 
     def _render_voicebar(self) -> None:
         """Paint the voice bar for the current phase: an animated meter, label, and accent."""
-        label, color = _VOICE_PHASES[self._voice_phase]
-        meter = next(self._voice_frames)
         hint = " [dim](Ctrl-V to type)[/dim]" if self._voice_phase == "listening" else ""
-        self.query_one("#voicebar", Static).update(f"[{color}]{meter}[/] {escape(label)}{hint}")
+        meter = next(self._voice_frames)
+        self.query_one("#voicebar", Static).update(
+            voicebar_markup(self._voice_phase, meter, hint=hint)
+        )
 
     def _tick_voice(self) -> None:
         """Advance the voice-bar meter one frame (the animation timer's callback)."""
diff --git a/aai_cli/code_agent/tui_status.py b/aai_cli/code_agent/tui_status.py
index 5e385b5..e163ea0 100644
--- a/aai_cli/code_agent/tui_status.py
+++ b/aai_cli/code_agent/tui_status.py
@@ -8,6 +8,31 @@
 
 from pathlib import Path
 
+from rich.markup import escape
+
+from aai_cli.ui import theme
+
+# Animated meter for the voice bar — a 3-cell block-char pulse (BMP, single-width, no emoji).
+# Public: both the `code` and `live` TUIs cycle it for their bar animation.
+VOICE_FRAMES = ("▁▃▅", "▃▅▇", "▅▇▆", "▆▇▅", "▇▅▃", "▅▃▁")  # pragma: no mutate
+# The voice phases the bar distinguishes, each (label, accent color). Shared by the `code`
+# and `live` TUIs so both read the same: blue while listening, amber thinking, green speaking.
+_VOICE_PHASES: dict[str, tuple[str, str]] = {
+    "listening": ("Listening — speak your request", theme.BRAND),
+    "thinking": ("Thinking…", "#f59e0b"),
+    "speaking": ("Speaking…", "#22c55e"),
+}
+
+
+def voicebar_markup(phase: str, frame: str, *, hint: str = "") -> str:
+    """The voice bar's content for one phase: an accented meter, the phase label, and a hint.
+
+    ``hint`` is appended verbatim (already-marked-up trailing copy, e.g. a Ctrl-V tip); the
+    label is escaped so a phase string can't inject styling.
+    """
+    label, color = _VOICE_PHASES[phase]
+    return f"[{color}]{frame}[/] {escape(label)}{hint}"
+
 
 def _spinner_text(elapsed_s: int, frame: str) -> str:
     """The working-indicator line: a spinner glyph and the elapsed seconds."""
diff --git a/aai_cli/commands/agent_cascade/_exec.py b/aai_cli/commands/agent_cascade/_exec.py
index 408d147..7a7df44 100644
--- a/aai_cli/commands/agent_cascade/_exec.py
+++ b/aai_cli/commands/agent_cascade/_exec.py
@@ -24,7 +24,7 @@
 from aai_cli.app.agent_shared import validate_voice
 from aai_cli.app.context import AppState
 from aai_cli.code_agent import firecrawl_search
-from aai_cli.core import choices, client, config_builder, env, errors, llm, signals
+from aai_cli.core import choices, client, config_builder, env, errors, llm, signals, stdio
 from aai_cli.core.errors import UsageError
 from aai_cli.streaming import turn_presets
 from aai_cli.streaming.sources import FileSource
@@ -120,17 +120,22 @@ def _parse_tts_config(pairs: tuple[str, ...]) -> dict[str, str]:
     return extra
 
 
-def _warn_without_web_search(*, json_mode: bool) -> None:
-    """Warn that web search is off unless a ``FIRECRAWL_API_KEY`` is set to enable it.
+def _web_search_note() -> str | None:
+    """The "web search is off" notice when no ``FIRECRAWL_API_KEY`` enables it, else ``None``.
 
     The other default tools (URL fetch, AssemblyAI docs, and the MCP servers) need no key; only
     Firecrawl web search does, so its absence is the one worth flagging up front.
     """
-    if not env.get(firecrawl_search.FIRECRAWL_API_KEY_ENV):
-        output.emit_warning(
-            "Web search is off — set FIRECRAWL_API_KEY to enable the agent's web search tool.",
-            json_mode=json_mode,
-        )
+    if env.get(firecrawl_search.FIRECRAWL_API_KEY_ENV):
+        return None
+    return "Web search is off — set FIRECRAWL_API_KEY to enable the agent's web search tool."
+
+
+def _warn_without_web_search(*, json_mode: bool) -> None:
+    """Emit the web-search-off notice (if any) to stderr / the JSON warning channel."""
+    note = _web_search_note()
+    if note is not None:
+        output.emit_warning(note, json_mode=json_mode)
 
 
 def _resolve_mcp_servers(mcp_config: tuple[Path, ...]) -> dict[str, Mapping[str, object]]:
@@ -194,6 +199,46 @@ def _print_show_code(opts: AgentCascadeOptions, system_prompt_text: str) -> None
     output.print_code(code_gen.agent_cascade(config, speech_model=opts.speech_model))
 
 
+def _should_use_tui(*, from_file: bool, json_mode: bool, text_mode: bool) -> bool:
+    """Whether to run the live conversation in the voice-only Textual TUI.
+
+    The TUI is the default for an interactive mic session in human mode. It's skipped for
+    file/sample input (a one-shot run with no live mic), for the machine output modes
+    (``--json`` / ``-o text`` stream to stdout), and when stdout/stdin aren't a TTY (piped or
+    CI) — all of which keep the plain line renderer.
+    """
+    return (
+        not from_file
+        and not json_mode
+        and not text_mode
+        and stdio.stdout_is_tty()
+        and stdio.stdin_is_tty()
+    )
+
+
+def _run_live_tui(api_key: str, opts: AgentCascadeOptions, config: CascadeConfig) -> None:
+    """Run the live conversation inside the voice-only Textual TUI.
+
+    Opens the duplex mic/speaker, wires the cascade legs, and hands the app a blocking
+    ``run_conversation`` (driven on a worker thread) plus an ``on_stop`` that closes the audio
+    so a quit ends the mic iterator and unblocks that worker.
+    """
+    from aai_cli.agent_cascade.tui import LiveAgentApp
+
+    duplex = DuplexAudio(target_rate=SAMPLE_RATE, device=opts.device)
+    stt_params = _build_stt_params(opts, SAMPLE_RATE)
+    deps = engine.CascadeDeps.real(api_key, config, audio=duplex.mic, stt_params=stt_params)
+
+    def run_conversation(renderer: engine.Renderer) -> None:
+        engine.run_cascade(renderer=renderer, player=duplex.player, config=config, deps=deps)
+
+    LiveAgentApp(
+        run_conversation=run_conversation,
+        on_stop=duplex.close,
+        web_note=_web_search_note(),
+    ).run(mouse=False)
+
+
 def run_agent_cascade(opts: AgentCascadeOptions, state: AppState, *, json_mode: bool) -> None:
     """Execute one `assembly agent-cascade` cascade from already-parsed flags."""
     text_mode, json_mode = resolve_output_modes(opts.output_field, json_mode=json_mode)
@@ -206,8 +251,6 @@ def run_agent_cascade(opts: AgentCascadeOptions, state: AppState, *, json_mode:
         _print_show_code(opts, system_prompt_text)
         return
 
-    _warn_without_web_search(json_mode=json_mode)
-
     from_file = bool(opts.source) or opts.sample
     if from_file and opts.device is not None:
         raise UsageError("--device applies only to microphone input.")
@@ -236,6 +279,13 @@ def run_agent_cascade(opts: AgentCascadeOptions, state: AppState, *, json_mode:
         tts_extra=tts_extra,
         mcp_servers=mcp_servers,
     )
+
+    if _should_use_tui(from_file=from_file, json_mode=json_mode, text_mode=text_mode):
+        # The voice-only Textual front-end surfaces the web-search note in-app, not on stderr.
+        _run_live_tui(api_key, opts, config)
+        return
+
+    _warn_without_web_search(json_mode=json_mode)
     renderer = AgentRenderer(json_mode=json_mode, text_mode=text_mode, mic_input=not from_file)
     audio, player, sample_rate = _open_audio(
         renderer, source=opts.source, sample=opts.sample, device=opts.device, from_file=from_file
diff --git a/tests/test_code_messages.py b/tests/test_code_messages.py
index 9a1168d..99e7fe5 100644
--- a/tests/test_code_messages.py
+++ b/tests/test_code_messages.py
@@ -11,7 +11,7 @@
 import asyncio
 
 from aai_cli.code_agent.events import AssistantDelta, AssistantText, ToolResult
-from aai_cli.code_agent.messages import AssistantMessage, ToolOutput
+from aai_cli.code_agent.messages import AssistantMessage, ToolOutput, UserMessage
 from aai_cli.code_agent.tui import CodeAgentApp
 
 
@@ -92,6 +92,22 @@ async def go() -> None:
     _run(go())
 
 
+def test_user_message_prefixes_and_set_text_replaces_in_place() -> None:
+    # The prompt echo carries the "» " prefix; set_text() swaps the body in place (used to grow
+    # an interim voice transcript), keeping the same widget rather than mounting a new line.
+    async def go() -> None:
+        app = CodeAgentApp(agent=FakeAgent([]))
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            msg = UserMessage("hi")
+            await app.query_one("#log").mount(msg)
+            assert "» hi" in str(msg.render())
+            msg.set_text("hi there friend")
+            assert "» hi there friend" in str(msg.render())  # body replaced, not appended
+
+    _run(go())
+
+
 def test_short_tool_output_is_not_expandable() -> None:
     # Output that already fits has no expand affordance and Ctrl-O is a no-op on it.
     async def go() -> None:
diff --git a/tests/test_code_tui_status.py b/tests/test_code_tui_status.py
index f261a51..fc53dbc 100644
--- a/tests/test_code_tui_status.py
+++ b/tests/test_code_tui_status.py
@@ -9,6 +9,7 @@
 from pathlib import Path
 
 from aai_cli.code_agent import tui_status
+from aai_cli.ui import theme
 
 
 def test_spinner_text_formats_frame_and_elapsed() -> None:
@@ -37,6 +38,19 @@ def test_git_branch_and_status(tmp_path: Path) -> None:
     assert "manual" in tui_status._status_text(tmp_path, auto_approve=False)
 
 
+def test_voicebar_markup_per_phase_carries_label_meter_accent_and_hint() -> None:
+    # Each phase renders its own label + accent color; the meter frame and any trailing hint
+    # are passed through verbatim. Assert the literal accents (not the dict value) so a mutated
+    # color literal is caught — reading from the dict would mutate in lockstep and survive.
+    listening = tui_status.voicebar_markup("listening", "▁▃▅", hint=" (Ctrl-V)")
+    assert "Listening" in listening and "▁▃▅" in listening and " (Ctrl-V)" in listening
+    assert theme.BRAND in listening  # blue accent while listening
+    thinking = tui_status.voicebar_markup("thinking", "▃▅▇")
+    assert "Thinking" in thinking and "#f59e0b" in thinking  # amber, no hint
+    speaking = tui_status.voicebar_markup("speaking", "▅▇▆")
+    assert "Speaking" in speaking and "#22c55e" in speaking  # green
+
+
 def test_status_text_renders_voice_badge(tmp_path: Path) -> None:
     # No voice front-end -> no voice badge (the dot glyphs are absent); on/off render the
     # state so the Ctrl-V toggle shows. (Asserts on the dots, not the word — the tmp_path name
diff --git a/tests/test_live_tui.py b/tests/test_live_tui.py
new file mode 100644
index 0000000..ba92a06
--- /dev/null
+++ b/tests/test_live_tui.py
@@ -0,0 +1,342 @@
+"""Tests for the voice-only `assembly live` Textual TUI (``LiveAgentApp``).
+
+Drives the real Textual app headless. Most tests call the transcript/phase methods directly
+(they always run on the UI thread), mirroring the code-TUI suite; two drive the worker leg with
+a scripted ``run_conversation`` through the real ``_TuiRenderer`` to cover the off-thread hop,
+the error path, and teardown — all without a mic, speaker, or socket.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import threading
+import types
+
+import pytest
+from textual.widgets import Static
+
+from aai_cli.agent_cascade import engine
+from aai_cli.agent_cascade.tui import LiveAgentApp, _TuiRenderer
+from aai_cli.app.context import AppState
+from aai_cli.code_agent.messages import AssistantMessage, ErrorMessage, Note, UserMessage
+from aai_cli.commands.agent_cascade import _exec
+from aai_cli.commands.agent_cascade._exec import run_agent_cascade
+from aai_cli.core import config, stdio
+from aai_cli.core.errors import CLIError
+from tests.test_agent_cascade_command import _opts
+
+
+def _run(coro) -> None:
+    asyncio.run(coro)
+
+
+def _wait_until(pilot, predicate):
+    """Pump the event loop until ``predicate`` holds (lets a worker thread land)."""
+
+    async def loop() -> bool:
+        for _ in range(200):
+            await pilot.pause(0.01)
+            if predicate():
+                return True
+        return False
+
+    return loop()
+
+
+def _app(run_conversation=None, on_stop=None, web_note=None):
+    """A LiveAgentApp whose worker stays alive for the test, releasing on teardown.
+
+    The real ``run_conversation`` blocks on the live mic; the default here blocks on an event
+    so the app doesn't auto-exit (an instant return makes the worker close the app). Teardown
+    always sets that event — and still runs any test-supplied ``on_stop`` — so no worker leaks.
+ """ + release = threading.Event() + + def stop() -> None: + release.set() + if on_stop is not None: + on_stop() + + def block(renderer) -> None: + release.wait(30) # block like a live mic; teardown releases it well before this + + return LiveAgentApp( + run_conversation=run_conversation or block, + on_stop=stop, + web_note=web_note, + ) + + +def _voicebar(app) -> str: + return str(app.query_one("#voicebar", Static).render()) + + +def test_splash_and_status_render() -> None: + # The session opens on the ASSEMBLY wordmark + ready line, and the footer shows the only + # control (quit) — there is no text prompt mounted (input is voice-only). + async def go() -> None: + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + splash = str(app.query_one("#log").children[0].render()) + assert "█" in splash and "Listening… start talking" in splash # the wordmark splash + assert "Listening" in _voicebar(app) # opens in the listening phase + assert "Ctrl-C to quit" in str(app.query_one("#status", Static).render()) + assert len(app.query("#prompt")) == 0 # no text input — voice only + assert app.ENABLE_COMMAND_PALETTE is False # the voice UI hides the command palette + + _run(go()) + + +def test_user_partial_grows_then_finalizes_into_thinking() -> None: + async def go() -> None: + app = _app() + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + app.show_user_partial("what is") + app.show_user_partial("what is the weather") + # One growing user line, not two — the partial updates in place. 
+            assert len(app.query(UserMessage)) == 1
+            assert "Listening" in _voicebar(app)
+            app.show_user_final("what is the weather")
+            assert "» what is the weather" in str(app.query_one(UserMessage).render())
+            assert "Thinking" in _voicebar(app)  # a finalized turn -> the LLM is thinking
+
+    _run(go())
+
+
+def test_user_final_without_a_prior_partial_still_shows_the_turn() -> None:
+    async def go() -> None:
+        app = _app()
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            app.show_user_final("hello")  # no partial first (formatted turn arrives whole)
+            assert "» hello" in str(app.query_one(UserMessage).render())
+            assert "Thinking" in _voicebar(app)
+
+    _run(go())
+
+
+def test_reply_streams_sentences_and_finalizes_back_to_listening() -> None:
+    async def go() -> None:
+        app = _app()
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            app.begin_reply()
+            assert "Speaking" in _voicebar(app)
+            app.show_agent_sentence("Hello.")
+            app.show_agent_sentence("How can I help?")
+            reply = app.query_one(AssistantMessage)
+            assert reply.text == "Hello. How can I help? "
+            app.end_reply(interrupted=False)
+            assert "Listening" in _voicebar(app)  # reply done -> back to listening
+            assert len(app.query(Note)) == 0  # not interrupted -> no interrupted aside
+
+    _run(go())
+
+
+def test_agent_sentence_without_begin_reply_mounts_a_reply() -> None:
+    async def go() -> None:
+        app = _app()
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            app.show_agent_sentence("Standalone.")  # defensive: no begin_reply first
+            assert app.query_one(AssistantMessage).text == "Standalone. "
+
+    _run(go())
+
+
+def test_interrupted_reply_notes_the_barge_in() -> None:
+    async def go() -> None:
+        app = _app()
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            app.begin_reply()
+            app.show_agent_sentence("As I was saying")
+            app.end_reply(interrupted=True)  # the user barged in
+            assert any("interrupted" in str(n.render()) for n in app.query(Note))
+            assert "Listening" in _voicebar(app)
+
+    _run(go())
+
+
+def test_end_reply_without_an_active_reply_is_a_safe_noop() -> None:
+    # A reply_done with no open reply widget (e.g. a turn that produced no spoken sentence) must
+    # not touch the absent widget — it just returns to listening.
+    async def go() -> None:
+        app = _app()
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            app.end_reply(interrupted=False)  # no begin_reply first
+            assert len(app.query(AssistantMessage)) == 0  # nothing mounted
+            assert "Listening" in _voicebar(app)
+
+    _run(go())
+
+
+def test_voice_bar_animation_advances_on_tick() -> None:
+    async def go() -> None:
+        app = _app()
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            before = _voicebar(app)
+            app._tick_voice()
+            assert _voicebar(app) != before  # the meter advanced a frame
+
+    _run(go())
+
+
+def test_web_note_is_surfaced_as_a_notification() -> None:
+    async def go() -> None:
+        app = _app(web_note="Web search is off — set FIRECRAWL_API_KEY")
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            assert any("FIRECRAWL_API_KEY" in n.message for n in app._notifications)
+
+    _run(go())
+
+
+def test_action_stop_tears_down_audio_and_exits(monkeypatch) -> None:
+    async def go() -> None:
+        stops: list[bool] = []
+        app = _app(on_stop=lambda: stops.append(True))
+        async with app.run_test(size=(100, 30)) as pilot:
+            await pilot.pause()
+            exited: list[bool] = []
+            monkeypatch.setattr(app, "exit", lambda *a, **k: exited.append(True))
+            app.action_stop()
+            assert stops == [True]  # the audio was closed (unblocks the cascade worker)
+            assert exited == [True]
+            app.action_stop()  # idempotent: a second stop never re-closes the audio
+            assert stops == [True]
+
+    _run(go())
+
+
+def test_worker_drives_the_renderer_and_unmount_closes_audio() -> None:
+    # The blocking run_conversation runs on a worker thread and reaches the UI through the real
+    # _TuiRenderer; tearing the app down fires on_stop, which (in production) ends the mic and
+    # lets the worker return.
+    async def go() -> None:
+        done = threading.Event()
+
+        def run_conversation(renderer) -> None:
+            # A full spoken turn, exercising every _TuiRenderer leg (each hops to the UI thread).
+            renderer.connected()
+            renderer.user_partial("turn it")
+            renderer.user_final("turn it up")
+            renderer.reply_started()
+            renderer.agent_transcript("Done.", interrupted=False)
+            renderer.reply_done(interrupted=False)
+            done.wait(30)  # block until teardown's on_stop fires (timeout is just a leak guard)
+
+        app = _app(run_conversation=run_conversation, on_stop=done.set)
+        async with app.run_test(size=(100, 30)) as pilot:
+            assert await _wait_until(pilot, lambda: bool(app.query(AssistantMessage)))
+            assert "» turn it up" in str(app.query_one(UserMessage).render())
+            assert app.query_one(AssistantMessage).text == "Done. "
+        assert done.is_set()  # leaving the run_test context unmounted -> on_stop released it
+
+    _run(go())
+
+
+def test_worker_surfaces_a_leg_error_in_the_transcript() -> None:
+    async def go() -> None:
+        def boom(renderer) -> None:
+            raise CLIError("gateway down", error_type="api_error", exit_code=1)
+
+        app = _app(run_conversation=boom)
+        async with app.run_test(size=(100, 30)) as pilot:
+            assert await _wait_until(pilot, lambda: bool(app.query(ErrorMessage)))
+            assert "gateway down" in str(app.query_one(ErrorMessage).render())
+
+    _run(go())
+
+
+def test_tui_renderer_drops_calls_after_the_app_stops() -> None:
+    # A renderer call that lands after teardown must be swallowed (the turn is moot), not raised
+    # as an unhandled worker-thread error. This app was never started, so is_running is False.
+    app = _app()
+    assert app.is_running is False
+    renderer = _TuiRenderer(app)
+    renderer.user_final("ignored")  # returns without raising
+    renderer.reply_done(interrupted=False)
+
+
+# --- run_agent_cascade -> TUI selection + wiring -----------------------------
+
+
+def test_should_use_tui_only_for_interactive_human_mic_sessions(monkeypatch) -> None:
+    # The TUI is the default for a live mic session in human mode on a TTY. Each of the four
+    # disqualifiers (file input, --json, -o text, no TTY) falls back to the line renderer.
+    monkeypatch.setattr(stdio, "stdout_is_tty", lambda: True)
+    monkeypatch.setattr(stdio, "stdin_is_tty", lambda: True)
+    assert _exec._should_use_tui(from_file=False, json_mode=False, text_mode=False) is True
+    assert _exec._should_use_tui(from_file=True, json_mode=False, text_mode=False) is False
+    assert _exec._should_use_tui(from_file=False, json_mode=True, text_mode=False) is False
+    assert _exec._should_use_tui(from_file=False, json_mode=False, text_mode=True) is False
+    monkeypatch.setattr(stdio, "stdout_is_tty", lambda: False)
+    assert _exec._should_use_tui(from_file=False, json_mode=False, text_mode=False) is False
+
+
+def test_web_search_note_tracks_the_firecrawl_key(monkeypatch) -> None:
+    monkeypatch.delenv("FIRECRAWL_API_KEY", raising=False)
+    assert "FIRECRAWL_API_KEY" in (_exec._web_search_note() or "")
+    monkeypatch.setenv("FIRECRAWL_API_KEY", "fc-x")
+    assert _exec._web_search_note() is None
+
+
+def _wire_tui(monkeypatch):
+    """Stub auth/audio/deps so run_agent_cascade reaches the TUI launch on an interactive mic run."""
+    monkeypatch.setattr(_exec.tts_session, "require_available", lambda _c: None)
+    monkeypatch.setattr(config, "resolve_api_key", lambda **_: "k")
+    monkeypatch.setattr(stdio, "stdout_is_tty", lambda: True)
+    monkeypatch.setattr(stdio, "stdin_is_tty", lambda: True)
+    fake_duplex = types.SimpleNamespace(mic=object(), player=object(), close=lambda: None)
+    monkeypatch.setattr(_exec, "DuplexAudio", lambda **kwargs: fake_duplex)
+    monkeypatch.setattr(engine.CascadeDeps, "real", lambda *a, **k: "deps")
+    return fake_duplex
+
+
+def test_interactive_human_run_launches_the_tui(monkeypatch) -> None:
+    # A mic session in human mode on a TTY runs the Textual app, not the line renderer.
+    fake_duplex = _wire_tui(monkeypatch)
+    captured: dict[str, object] = {}
+
+    class FakeApp:
+        def __init__(self, *, run_conversation, on_stop, web_note):
+            captured["run_conversation"] = run_conversation
+            captured["on_stop"] = on_stop
+
+        def run(self, **kwargs):
+            captured["ran"] = kwargs
+
+    monkeypatch.setattr("aai_cli.agent_cascade.tui.LiveAgentApp", FakeApp)
+    # AgentRenderer must NOT be built on the TUI path — fail loudly if the line path is taken.
+    monkeypatch.setattr(
+        _exec, "AgentRenderer", lambda **kw: pytest.fail("line renderer used in TUI mode")
+    )
+    run_agent_cascade(_opts(), AppState(), json_mode=False)
+    assert callable(captured["run_conversation"])  # the TUI was launched with a cascade closure
+    assert captured["on_stop"] is fake_duplex.close  # quit closes the audio
+    assert captured["ran"] == {"mouse": False}  # mouse off so transcript text stays selectable
+
+
+def test_tui_run_conversation_drives_the_cascade(monkeypatch) -> None:
+    # The closure handed to the app runs the cascade with the duplex player and the wired deps.
+    fake_duplex = _wire_tui(monkeypatch)
+    captured: dict[str, object] = {}
+    monkeypatch.setattr(engine, "run_cascade", lambda **kw: captured.update(kw))
+
+    class FakeApp:
+        def __init__(self, *, run_conversation, on_stop, web_note):
+            self._rc = run_conversation
+
+        def run(self, **kwargs):
+            self._rc("renderer-sentinel")  # the app would call this on its worker thread
+
+    monkeypatch.setattr("aai_cli.agent_cascade.tui.LiveAgentApp", FakeApp)
+    run_agent_cascade(_opts(), AppState(), json_mode=False)
+    assert captured["player"] is fake_duplex.player
+    assert captured["deps"] == "deps"
+    assert captured["renderer"] == "renderer-sentinel"
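
The selection rule pinned by `test_should_use_tui_only_for_interactive_human_mic_sessions` can be sketched as a single predicate. This is a guess at the shape of `_exec._should_use_tui`, not its actual body. The name `should_use_tui` and the explicit `stdout_is_tty` / `stdin_is_tty` parameters are hypothetical (the real function presumably reads them through `stdio`), and requiring stdin to be a TTY is an assumption: the test only flips stdout.

```python
def should_use_tui(
    *,
    from_file: bool,
    json_mode: bool,
    text_mode: bool,
    stdout_is_tty: bool,
    stdin_is_tty: bool,
) -> bool:
    """Hypothetical stand-in for the TUI gate exercised by the test.

    Returns True only for a live mic session in human mode on a terminal.
    """
    # File input, --json, and -o text each fall back to the line renderer.
    if from_file or json_mode or text_mode:
        return False
    # Assumption: both ends must be a terminal; the test only proves stdout matters.
    return stdout_is_tty and stdin_is_tty
```

Passing the TTY state in as arguments keeps the sketch free of global lookups, so it can be checked without `monkeypatch`; the test in the patch takes the other route and patches `stdio.stdout_is_tty` / `stdio.stdin_is_tty` instead.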