Merged
2 changes: 1 addition & 1 deletion aai_cli/AGENTS.md
@@ -151,7 +151,7 @@ heavily-reworked commands with long bodies; small commands keep the inline
- **`streaming/`** + `client.stream_audio` — v3 realtime API. Event callbacks run on the SDK reader thread and guard against `BrokenPipeError` (`stdio.silence_stdout()`) so a closed pipe never dumps a thread traceback.
- **`core/sync_stt.py`** + **`core/signals.py`** + `commands/dictate/` — `assembly dictate`: headless dictation over the **Sync STT API** (`Environment.sync_base`, one POST `/transcribe` per utterance with the required `X-AAI-Model: u3-sync-pro` header; 80 ms–120 s of PCM/WAV). It needs no terminal: recording starts immediately and `dictate_exec._record` polls `signals.stop_on_terminate` between ~100 ms mic chunks for a SIGTERM, which finishes the utterance (clean exit 0) — so a hotkey tool like Hammerspoon can launch it as a background task and `kill -TERM`/`task:terminate()` to transcribe. SIGINT (Ctrl-C) still cancels (exit 130). Both boundaries (the stop latch, mic, HTTP) are injectable, so the suite never needs a real signal or microphone (`tests/test_dictate_exec.py` scripts the SIGTERM latch). Contrast `signals.terminate_as_interrupt` (used by `stream`/`agent`/`speak`), which routes SIGTERM into the *cancel* path instead.
- **`agent/`** — full-duplex voice agent (mic in, TTS out via `voices.py`).
- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph (`brain.py`); under `-v` (`debuglog.active()`) `brain._run_graph` *streams* that graph instead of `invoke`-ing it and logs each tool call/result/interim line as it lands (reusing `code_agent.events.message_events`), so a spoken turn that stalls mid-tool is debuggable — plain `invoke` runs the whole loop internally and `-v` would otherwise show only the httpx lines.
- **`agent_cascade/`** + `commands/agent_cascade/` — `assembly agent-cascade`: the same live terminal conversation as `assembly agent`, but **client-orchestrated** — `engine.run_cascade` wires Streaming STT → the LLM Gateway → streaming TTS itself instead of talking to the Voice Agent endpoint, mirroring what the `agent-cascade` `assembly init` template does server-side. **Sandbox-only** (streaming TTS has no prod host; guarded via `tts.session.require_available`). Reuses the agent slice's `DuplexAudio`/`AgentRenderer` and `core.client.stream_audio`/`core.llm.complete`/`tts.session.synthesize`; the three network legs are injected through `engine.CascadeDeps` (the `tts/session.py` seam) so the cascade — greeting, per-sentence TTS, barge-in, history window — is unit-tested against fakes with no sockets/mic/speaker. The LLM leg is a deepagents graph (`brain.py`); under `-v` (`debuglog.active()`) `brain._run_graph` *streams* that graph instead of `invoke`-ing it and logs each tool call/result/interim line as it lands (reusing `code_agent.events.message_events`), so a spoken turn that stalls mid-tool is debuggable — plain `invoke` runs the whole loop internally and `-v` would otherwise show only the httpx lines. **Front-end:** an interactive mic session in human mode runs a **voice-only Textual TUI** (`agent_cascade/tui.py`, `LiveAgentApp`) by default — there's no text input (you can't type to it), just a transcript + an animated voice bar tracking listening/thinking/speaking. It shares the `assembly code` TUI's chrome (`code_agent.banner` wordmark, `code_agent.messages` widgets, `code_agent.tui_status.voicebar_markup`/`VOICE_FRAMES`); the blocking `run_cascade` runs on a worker thread and reaches the UI through a `_TuiRenderer` (the `engine.Renderer` protocol) that hops each call onto the UI thread, and a quit calls `DuplexAudio.close` to end the mic iterator and unblock that worker. 
`_exec._should_use_tui` gates it: file/sample input, `--json`/`-o text`, and a non-TTY all fall back to the plain `AgentRenderer` line output.
- **`tts/`** + `commands/speak.py` — `assembly speak` synthesizes text to speech over the sandbox streaming-TTS WebSocket (`streaming-tts.sandbox000.…`). **Sandbox-only:** `session.is_available()` is false in production (empty `Environment.streaming_tts_host`), so the command exits 2 with a `--sandbox` hint. `session.synthesize` drives a Begin→Generate→Flush→Audio→Terminate protocol with an injectable `connect` for hermetic tests (mirrors `agent/session.py`); `audio.py` plays the PCM (default) or writes a WAV (`--out`). The single-voice default-playback path **streams**: `synthesize`'s `on_audio(chunk, sample_rate)` callback is wired to `audio.PcmPlayer.feed`, so speech starts on the first Audio frame (it opens the device lazily, since the rate is only known at Begin) instead of after the whole text — the win for a long `--url` page. `--out` (needs the full buffer) and the multi-voice dialogue path (`synthesize_dialogue` → `_output_audio` → buffered `play_pcm`) stay buffered; `synthesize` still returns the complete PCM for the summary regardless.
- **`code_agent/`** + `commands/code/` — `assembly code`: a terminal coding agent (a bespoke port of langchain-ai/deepagents' `code` agent) that talks **only** to the LLM Gateway. `model.py` pins the model to `ChatOpenAI` against `llm_gateway_base`; `agent.py` builds the deepagents graph over a cwd-scoped `LocalShellBackend` (filesystem + shell tools), plus extra tools: the custom `assembly` CLI tool (`cli_tool.py`, runs `python -m aai_cli` with the key via child env, never argv), a URL `fetch_url` tool (`fetch_tool.py`), Firecrawl web search when `FIRECRAWL_API_KEY` is set (`firecrawl_search.py`, shared with the live voice agent), an `ask_user` tool routed through an `AskBridge` to the front-end (`ask_tool.py`), and best-effort docs MCP tools (`docs_mcp.py`). Middleware adds installed skills (`skills.py`) and long-term memory (`memory.py`), each over its own dedicated backend. Sessions persist via a SQLite checkpointer (`store.py`) keyed by `--session`, so conversations resume. Approval gates the mutating tools (write/edit/execute/`assembly`/`fetch_url`); the general-purpose `task` subagent comes from deepagents by default. `session.py` drives the graph turn-by-turn (interrupt/resume = human approval), emitting framework-agnostic `events.py` to either the Textual TUI (`tui.py`, modeled on deepagents-code: transcript + input + approval/ask modals + clipboard copy) or the Rich fallback (`render.py`). The whole orchestration is tested by driving the **real** graph with a fake `BaseChatModel` (`tests/test_code_agent.py`), so no network/TTY is needed. **Voice is the default front-end in an interactive TTY** (`voice.py` + `_exec._run_voice`): `VoiceSession.listen` captures one spoken turn over Streaming STT (gating the mic shut the instant a turn finalizes) and `VoiceSession.speak` reads each assistant reply back over streaming TTS. It runs the **Rich REPL** loop (not the keyboard TUI) with a voice `read_line` + a reply-speaking sink. 
Readback needs streaming TTS, so it's **sandbox-only** (`tts.session.is_available`); in production the mic input still works and replies stay on screen. A mic-less box degrades to typed input on the first `AUDIO_ERROR_TYPES` `CLIError`; `--no-voice` selects the TUI, and a non-TTY (pipe/CI) the headless loop. Both legs (STT/TTS) are injected like the cascade's, so `tests/test_code_voice.py` drives it with fakes — no mic/speaker/socket.
- **`code_gen/`** — backs `--show-code` on `transcribe`/`stream`/`agent`: builds a ready-to-run Python SDK script from exactly the flags passed (no API key needed; generated code reads `ASSEMBLYAI_API_KEY`).
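
The front-end gate described for `agent-cascade` can be sketched as a pure function. This is a minimal illustration, not the actual `_exec._should_use_tui`: the `LaunchContext` dataclass, its field names, and the `should_use_tui` signature are all hypothetical; only the fallback rules (file/sample input, `--json`/`-o text`, non-TTY) come from the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LaunchContext:
    """Hypothetical bundle of the inputs the gate inspects (names are illustrative)."""

    has_file_input: bool  # a file or sample was passed instead of a live mic
    json_output: bool  # --json
    text_output: bool  # -o text
    stdout_is_tty: bool  # False for a pipe or CI


def should_use_tui(ctx: LaunchContext) -> bool:
    """Return True only for an interactive mic session in human mode."""
    if ctx.has_file_input:
        return False  # file/sample input falls back to plain line output
    if ctx.json_output or ctx.text_output:
        return False  # machine-readable or plain-text output was requested
    return ctx.stdout_is_tty  # a non-TTY also falls back
```

Keeping the decision in a function of plain booleans makes it testable without a terminal, which matches the way the rest of the slice is tested against fakes.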
259 changes: 259 additions & 0 deletions aai_cli/agent_cascade/tui.py
@@ -0,0 +1,259 @@
"""A voice-only Textual UI for `assembly live` (the agent cascade).

Shares the chrome of the `assembly code` TUI — the flat dark canvas, the ASSEMBLY
wordmark splash, the animated voice bar, and the transcript message widgets — but drops
the text prompt: `live` is a hands-free spoken conversation, so there is nothing to type.

The cascade (Streaming STT -> LLM -> streaming TTS) is handed in as a blocking
``run_conversation`` driven on a worker thread; it streams transcript events back through a
:class:`_TuiRenderer` that hops each call onto the UI thread. The voice bar tracks the phase
(listening / thinking / speaking). A quit calls ``on_stop`` to close the audio, which ends the
mic iterator and unblocks that worker.
"""

from __future__ import annotations

import contextlib
import itertools
from typing import TYPE_CHECKING, ClassVar

from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.widgets import Static

from aai_cli.code_agent import banner, tui_status
from aai_cli.code_agent.messages import AssistantMessage, ErrorMessage, Note, UserMessage
from aai_cli.core.errors import CLIError

if TYPE_CHECKING:
    from collections.abc import Callable

    from textual.timer import Timer

    from aai_cli.agent_cascade.engine import Renderer

# Splash intro copy (the code agent's banner copy is code-specific, so `live` carries its own).
_READY_LINE = "Listening… start talking when you're ready."
_TIP_LINE = "Use headphones — the mic stays open while the agent speaks."
# The one-line footer: a hands-free session, so the only control is quit.
_STATUS_LINE = "Ctrl-C to quit"


class _TuiRenderer:
    """Marshals cascade :class:`~aai_cli.agent_cascade.engine.Renderer` calls onto the UI thread.

    The cascade runs on a worker thread; every render call hops back via ``call_from_thread``.
    Once the app has torn down (a quit mid-turn) that call raises ``RuntimeError`` — the event is
    moot then, so it's dropped rather than surfaced as an unhandled worker-thread exception.
    """

    def __init__(self, app: LiveAgentApp) -> None:
        self._app = app

    def connected(self) -> None:
        self._dispatch(self._app.live_connected)

    def user_partial(self, text: str) -> None:
        self._dispatch(self._app.show_user_partial, text)

    def user_final(self, text: str) -> None:
        self._dispatch(self._app.show_user_final, text)

    def reply_started(self) -> None:
        self._dispatch(self._app.begin_reply)

    def agent_transcript(self, text: str, *, interrupted: bool) -> None:
        # Sentences are emitted before any barge-in check, so `interrupted` is always False
        # here (the interrupted state is surfaced on reply_done); accept it for the protocol.
        del interrupted  # pragma: no mutate
        self._dispatch(self._app.show_agent_sentence, text)

    def reply_done(self, *, interrupted: bool) -> None:
        self._dispatch(lambda: self._app.end_reply(interrupted=interrupted))

    def _dispatch(self, fn: Callable[..., None], *args: object) -> None:
        if not self._app.is_running:
            return
        with contextlib.suppress(RuntimeError):
            self._app.call_from_thread(fn, *args)


class LiveAgentApp(App[None]):
    """The hands-free voice TUI: a scrolling transcript above an animated voice bar."""

    # Flat pure-black canvas matching the `code` TUI: a bordered voice bar and a one-line
    # footer, with no text prompt (there's nothing to type into a live voice session).
    CSS = f"""
    Screen {{ background: #000000; }}
    #log {{ height: 1fr; border: none; background: #000000; padding: 1 2; }}
    #voicebar {{ dock: bottom; height: 3; background: #000000; border: round {banner.BRAND_HEX};
                 margin: 1 1; content-align: center middle; }}
    #status {{ dock: bottom; height: 1; background: #000000; padding: 0 1; }}
    """
    TITLE = "AssemblyAI Live"
    ENABLE_COMMAND_PALETTE = False
    # Ctrl-C / Ctrl-Q both stop the session; there is no turn to interrupt and nothing to type,
    # so a single press quits (closing the audio unblocks the cascade worker).
    BINDINGS: ClassVar = [
        ("ctrl+c", "stop", "Quit"),
        ("ctrl+q", "stop", "Quit"),
    ]

    def __init__(
        self,
        *,
        run_conversation: Callable[[Renderer], None],
        on_stop: Callable[[], None],
        web_note: str | None = None,
    ) -> None:
        super().__init__()
        self._run_conversation = run_conversation  # blocking; runs the cascade given a Renderer
        self._on_stop = on_stop  # closes the audio so a quit unblocks the cascade worker
        self._web_note = web_note
        self._voice_phase = "listening"
        self._voice_frames = itertools.cycle(tui_status.VOICE_FRAMES)
        self._voice_timer: Timer | None = None
        self._user_partial: UserMessage | None = None  # the in-place "you: …" widget for a turn
        self._reply_msg: AssistantMessage | None = None  # the reply widget sentences stream into
        self._stopped = False  # guards on_stop against a double teardown (quit + unmount)

    def compose(self) -> ComposeResult:
        yield VerticalScroll(id="log")
        yield Static("", id="voicebar")
        yield Static(f"[dim]{_STATUS_LINE}[/dim]", id="status")

    def on_mount(self) -> None:
        self._write_splash()
        if self._web_note:
            self.notify(self._web_note, title="Web search disabled", severity="warning")
        self._render_voicebar()
        self._voice_timer = self.set_interval(0.3, self._tick_voice)  # pragma: no mutate
        # Defer the first mic open until after the splash has painted (a GIL-holding PortAudio
        # open races Textual's initial render otherwise — same reason as the code TUI).
        self.call_after_refresh(self._start)

    def _start(self) -> None:
        # thread=True: the cascade is a blocking sync call; exclusive=True: one session at a time.
        self.run_worker(self._run, thread=True, exclusive=True, name="cascade")  # pragma: no mutate

    def _run(self) -> None:
        """Drive the cascade on a worker thread, then close the app when it ends."""
        renderer = _TuiRenderer(self)
        try:
            self._run_conversation(renderer)
        except CLIError as exc:
            self._safely(self._show_error, exc.message)
        # The cascade returned (STT closed, a leg failed, or a quit closed the audio) — exit.
        self._safely(self.exit)

    def _safely(self, fn: Callable[..., None], *args: object) -> None:
        """Hop ``fn`` onto the UI thread, dropping the error a torn-down app raises mid-call."""
        if not self.is_running:
            return
        with contextlib.suppress(RuntimeError):
            self.call_from_thread(fn, *args)

    # --- transcript (always called on the UI thread) --------------------------

    def live_connected(self) -> None:
        """The session is live; the splash already shows the listening prompt."""
        self._set_phase("listening")

    def show_user_partial(self, text: str) -> None:
        """Grow the interim user transcript in place while the turn is still being spoken."""
        self._set_phase("listening")
        if self._user_partial is None:
            self._user_partial = UserMessage(text)
            self._mount(self._user_partial)
        else:
            self._user_partial.set_text(text)
        self._scroll_end()

    def show_user_final(self, text: str) -> None:
        """Commit the finalized user turn and move to the thinking phase."""
        if self._user_partial is None:
            self._mount(UserMessage(text))
        else:
            self._user_partial.set_text(text)
        self._user_partial = None  # finalized; the next partial starts a fresh line
        self._set_phase("thinking")
        self._scroll_end()

    def begin_reply(self) -> None:
        """Open a fresh reply widget the agent's sentences stream into; switch to speaking."""
        self._set_phase("speaking")
        self._reply_msg = AssistantMessage()
        self._mount(self._reply_msg)

    def show_agent_sentence(self, text: str) -> None:
        """Append one spoken sentence to the in-flight reply."""
        if self._reply_msg is None:
            self._reply_msg = AssistantMessage()
            self._mount(self._reply_msg)
        self._reply_msg.stream(f"{text} ")
        self._scroll_end()

    def end_reply(self, *, interrupted: bool) -> None:
        """Finalize the reply (rendered as Markdown) and return to listening."""
        if self._reply_msg is not None:
            self._reply_msg.finalize(self._reply_msg.text)
            self._reply_msg = None
        if interrupted:
            self._mount(Note("(interrupted)"))
        self._set_phase("listening")

    def _show_error(self, message: str) -> None:
        self._mount(ErrorMessage(message))

    # --- voice bar ------------------------------------------------------------

    def _set_phase(self, phase: str) -> None:
        self._voice_phase = phase
        self._render_voicebar()

    def _render_voicebar(self) -> None:
        """Paint the voice bar for the current phase (no Ctrl-V hint — input is voice-only)."""
        self.query_one("#voicebar", Static).update(
            tui_status.voicebar_markup(self._voice_phase, next(self._voice_frames))
        )

    def _tick_voice(self) -> None:
        """Advance the voice-bar meter one frame (the animation timer's callback)."""
        self._render_voicebar()

    # --- splash / mounting ----------------------------------------------------

    def _write_splash(self) -> None:
        rows = [f"[bold {banner.BRAND_HEX}]{row}[/]" for row in banner.wordmark()]
        rows += [
            f"[dim]{banner.version()}[/dim]",
            "",
            f"[{banner.BRAND_HEX}]{_READY_LINE}[/]",
            f"[dim]{_TIP_LINE}[/dim]",
        ]
        self._mount(Static("\n".join(rows)))

    def _mount(self, widget: Static) -> None:
        log = self.query_one("#log", VerticalScroll)
        log.mount(widget)
        log.scroll_end(animate=False)  # pragma: no mutate — cosmetic; animate flag is unassertable

    def _scroll_end(self) -> None:
        self.query_one("#log", VerticalScroll).scroll_end(animate=False)  # pragma: no mutate

    # --- quit -----------------------------------------------------------------

    def action_stop(self) -> None:
        """Ctrl-C / Ctrl-Q: stop the audio (unblocking the cascade worker) and exit."""
        self._teardown()
        self.exit()

    def on_unmount(self) -> None:
        """Close the audio on any exit path, in case the worker is still blocked on the mic."""
        self._teardown()

    def _teardown(self) -> None:
        if self._stopped:
            return
        self._stopped = True
        self._on_stop()
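
The two safety patterns in this file, dropping a UI hop once the app has torn down and guarding the stop callback against a double teardown, can be exercised without Textual. The sketch below is a simplified stand-in under stated assumptions: `FakeApp`, `dispatch`, and `StopLatch` are hypothetical names invented for illustration, and `FakeApp.call_from_thread` only imitates the "raises `RuntimeError` after teardown" behaviour the docstrings above rely on.

```python
import contextlib
from collections.abc import Callable


class FakeApp:
    """Hypothetical stand-in for the app: runs calls inline, raises once torn down."""

    def __init__(self) -> None:
        self.is_running = True
        self.torn_down = False
        self.calls: list[str] = []

    def call_from_thread(self, fn: Callable[..., None], *args: object) -> None:
        if self.torn_down:
            raise RuntimeError("app is not running")
        fn(*args)

    def show(self, text: str) -> None:
        self.calls.append(text)


def dispatch(app: FakeApp, fn: Callable[..., None], *args: object) -> None:
    """Hop fn onto the (fake) UI thread; silently drop it if the app is gone."""
    if not app.is_running:
        return
    with contextlib.suppress(RuntimeError):
        app.call_from_thread(fn, *args)


class StopLatch:
    """Runs on_stop at most once, however many exit paths call teardown()."""

    def __init__(self, on_stop: Callable[[], None]) -> None:
        self._on_stop = on_stop
        self._stopped = False

    def teardown(self) -> None:
        if self._stopped:
            return
        self._stopped = True
        self._on_stop()
```

The `is_running` check alone is not enough, because the app can tear down between the check and the call; suppressing `RuntimeError` covers that window.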
11 changes: 10 additions & 1 deletion aai_cli/code_agent/messages.py
@@ -30,13 +30,22 @@ def __init__(self, text: str) -> None:
        super().__init__(Text(text, style=_DIM))


def _user_markup(text: str) -> Text:
    """The styled `» …` prompt echo, built in one place for the constructor and set_text."""
    return Text(f"» {text}", style="bold #38bdf8")


class UserMessage(Static):
    """The echoed user prompt, with a top margin so each turn is visually separated."""

    DEFAULT_CSS = "UserMessage { margin-top: 1; }"

    def __init__(self, text: str) -> None:
        super().__init__(Text(f"» {text}", style="bold #38bdf8"))
        super().__init__(_user_markup(text))

    def set_text(self, text: str) -> None:
        """Replace the shown prompt text — grows an interim voice transcript in place."""
        self.update(_user_markup(text))


class AssistantMessage(Static):