Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions aai_cli/agent_cascade/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ def enqueue(self, pcm: bytes) -> None:
def flush(self) -> None:
"""Drop any queued-but-unplayed audio (used on barge-in)."""

def pending(self) -> int:
"""How many unplayed samples are still queued (>0 while audio is audibly playing)."""
...

def close(self) -> None:
"""Close the output stream."""

Expand Down Expand Up @@ -191,30 +195,40 @@ def on_turn(self, event: object) -> None:
self.renderer.user_partial(text)
self._barge_in()

def _barge_in(self) -> None:
"""Stop a reply that is still playing: flush the queued audio and cancel the
worker (the player flush is what silences the browser-equivalent local buffer)."""
if self._reply is not None and self._reply.is_alive():
def _silence_if_speaking(self) -> bool:
"""Cut the agent off if it's currently audible: signal the worker and flush audio.

"Speaking" is broader than a live reply worker: it also covers the greeting (enqueued
with no worker) and the *tail* of a reply whose worker has already finished enqueuing
but whose audio is still draining from the player. In every case there is sound to
silence, so a barge-in or an interrupt should cut it — a bare ``_reply.is_alive()``
check would leave the greeting (and a reply's last sentence) un-interruptible. Setting
the stop flag is harmless when no worker is running (the next ``_start_reply`` clears
it). Returns whether anything was silenced.
"""
speaking = (self._reply is not None and self._reply.is_alive()) or self.player.pending() > 0
if speaking:
self._stop.set()
self.player.flush()
return speaking

def _barge_in(self) -> None:
"""Stop whatever the agent is saying (reply, greeting, or a draining tail) and join."""
self._silence_if_speaking()
self._join_reply()

def interrupt_reply(self) -> bool:
"""Signal an in-flight reply to stop, without waiting for it; True if one was playing.

The UI-thread-safe counterpart to a spoken barge-in: the live TUI's Escape/Ctrl-C
calls this to silence the agent mid-reply without the user having to talk over it.
Flushing the queued audio stops speech at once; the reply worker then sees the stop
flag, unwinds on its own, and emits ``reply_done`` so the front-end returns to
calls this to silence the agent mid-reply (or mid-greeting) without the user having to
talk over it. Flushing the queued audio stops speech at once; a reply worker then sees
the stop flag, unwinds on its own, and emits ``reply_done`` so the front-end returns to
listening (the STT loop keeps running, so the next spoken turn is handled normally).
It deliberately does *not* join the worker — a join from the UI thread would deadlock
against the worker's own ``call_from_thread`` render hops.
"""
playing = self._reply is not None and self._reply.is_alive()
if playing:
self._stop.set()
self.player.flush()
return playing
return self._silence_if_speaking()

def _join_reply(self) -> None:
"""Wait for the current reply worker (if any) to unwind, then drop the handle."""
Expand Down
28 changes: 24 additions & 4 deletions aai_cli/agent_cascade/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from textual.app import App, ComposeResult
from textual.containers import VerticalScroll
from textual.css.query import NoMatches
from textual.widgets import Static

from aai_cli.code_agent import banner, tui_status
Expand Down Expand Up @@ -128,6 +129,15 @@ def __init__(
self._user_partial: UserMessage | None = None # the in-place "you: …" widget for a turn
self._reply_msg: AssistantMessage | None = None # the reply widget sentences stream into
self._stopped = False # guards on_stop against a double teardown (quit + unmount)
# A fatal cascade error caught on the worker thread, re-raised on the main thread (after
# app.run returns) so the command exits with the error's code instead of a silent 0 —
# the same record-then-re-raise the engine's CascadeSession.error uses across threads.
self._error: CLIError | None = None

@property
def error(self) -> CLIError | None:
"""The fatal cascade error (if any), for the launcher to re-raise after ``run`` returns."""
return self._error

def compose(self) -> ComposeResult:
yield VerticalScroll(id="log")
Expand All @@ -154,6 +164,9 @@ def _run(self) -> None:
try:
self._run_conversation(renderer)
except CLIError as exc:
# Keep the error so the main thread can re-raise it for the right exit code, and show
# it inline too (the post-exit stderr render is the durable copy a torn-down TUI keeps).
self._error = exc
self._safely(self._show_error, exc.message)
# The cascade returned (STT closed, a leg failed, or a quit closed the audio) — exit.
self._safely(self.exit)
Expand Down Expand Up @@ -233,10 +246,17 @@ def _set_phase(self, phase: str) -> None:
self._render_voicebar()

def _render_voicebar(self) -> None:
"""Paint the voice bar for the current phase (no Ctrl-V hint — input is voice-only)."""
self.query_one("#voicebar", Static).update(
tui_status.voicebar_markup(self._voice_phase, next(self._voice_frames))
)
"""Paint the voice bar for the current phase (no Ctrl-V hint — input is voice-only).

A no-op once the bar is gone: the 0.3s animation timer can fire one last ``_tick_voice``
during teardown, after the DOM is dismantled but before the interval is cancelled, so the
query is defensive (a miss only happens on the way out, where the repaint is moot).
"""
try:
bar = self.query_one("#voicebar", Static)
except NoMatches:
return
bar.update(tui_status.voicebar_markup(self._voice_phase, next(self._voice_frames)))

def _tick_voice(self) -> None:
"""Advance the voice-bar meter one frame (the animation timer's callback)."""
Expand Down
20 changes: 10 additions & 10 deletions aai_cli/code_agent/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from rich.markup import escape
from textual.app import ComposeResult
from textual.containers import Horizontal, VerticalScroll
from textual.css.query import NoMatches
from textual.screen import ModalScreen
from textual.widgets import Input, Static
from textual.worker import Worker
Expand Down Expand Up @@ -102,6 +103,8 @@ class CodeAgentApp(_VoiceLegs):
("ctrl+v", "toggle_voice", "Toggle voice"),
("ctrl+o", "toggle_output", "Expand/collapse output"),
]
# The voice-bar meter's animation cadence; a cosmetic value, so it's mutation-exempt.
_TICK_SECONDS: ClassVar[float] = 0.3 # pragma: no mutate

def __init__(
self,
Expand Down Expand Up @@ -336,7 +339,7 @@ def _sync_input_mode(self) -> None:
if listening:
self._render_voicebar()
if self._voice_timer is None: # animate the meter only while the bar is shown
self._voice_timer = self.set_interval(0.3, self._tick_voice) # pragma: no mutate
self._voice_timer = self.set_interval(self._TICK_SECONDS, self._render_voicebar)
else:
if self._voice_timer is not None:
self._voice_timer.stop()
Expand All @@ -349,16 +352,13 @@ def _set_voice_phase(self, phase: str) -> None:
self._render_voicebar()

def _render_voicebar(self) -> None:
"""Paint the voice bar for the current phase: an animated meter, label, and accent."""
"""Paint/advance the voice bar (the 0.3s timer's callback); no-op once the bar is gone."""
try:
bar = self.query_one("#voicebar", Static)
except NoMatches:
return # a tick can fire during teardown after the bar is removed; the repaint is moot
hint = " [dim](Ctrl-V to type)[/dim]" if self._voice_phase == "listening" else ""
meter = next(self._voice_frames)
self.query_one("#voicebar", Static).update(
voicebar_markup(self._voice_phase, meter, hint=hint)
)

def _tick_voice(self) -> None:
"""Advance the voice-bar meter one frame (the animation timer's callback)."""
self._render_voicebar()
bar.update(voicebar_markup(self._voice_phase, next(self._voice_frames), hint=hint))

def _ask(self, question: str) -> str:
"""Block the worker on a modal input screen and return the user's answer."""
Expand Down
5 changes: 5 additions & 0 deletions aai_cli/commands/agent_cascade/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ def run_conversation(renderer: engine.Renderer) -> None:
web_note=_web_search_note(),
)
app.run(mouse=False)
# A fatal leg failure (STT/LLM/TTS) is caught on the TUI's worker thread; re-raise it now
# that the app has torn down so the command exits with the error's code (and renders it to
# stderr, which survives the restored screen) instead of a silent success.
if app.error is not None:
raise app.error


def _launch_tui(api_key: str, opts: AgentCascadeOptions, config: CascadeConfig) -> None:
Expand Down
8 changes: 8 additions & 0 deletions tests/_cascade_fakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def __init__(self):
self.flushed = 0
self.started = False
self.closed = False
# What pending() reports: tests set it >0 to simulate audio still draining (a
# greeting, or a reply tail whose worker already exited). flush() zeroes it, mirroring
# the real player dropping its queue.
self.pending_samples = 0

def start(self):
self.started = True
Expand All @@ -55,6 +59,10 @@ def enqueue(self, pcm):

def flush(self):
self.flushed += 1
self.pending_samples = 0

def pending(self):
return self.pending_samples

def close(self):
self.closed = True
Expand Down
23 changes: 23 additions & 0 deletions tests/test_agent_cascade_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,29 @@ def test_interrupt_reply_is_a_noop_when_nothing_is_playing():
assert not session._stop.is_set()


def test_interrupt_reply_silences_the_greeting_with_no_worker():
# The greeting is enqueued with no reply worker; Escape/Ctrl-C must still cut it. With audio
# queued (pending>0) the interrupt flushes the player and reports that it silenced something,
# so the live TUI interrupts the greeting instead of (for Ctrl-C) quitting the session.
session, _renderer, player = make_session()
player.pending_samples = 1 # even a single queued sample (>0) means sound is still playing
assert session.interrupt_reply() is True
assert player.flushed == 1
assert player.pending() == 0 # the queued greeting was dropped


def test_barge_in_silences_a_draining_reply_tail_after_the_worker_exits():
# The reply worker enqueues every sentence then exits, but the audio keeps draining. A new
# spoken turn in that window must still cut the tail — a bare is_alive() check would miss it.
session, _renderer, player = make_session()
session._reply = FakeWorker(alive=False) # worker finished enqueuing
player.pending_samples = 9600 # ...but its audio is still playing
session._barge_in()
assert session._stop.is_set()
assert player.flushed == 1
assert session._reply is None


def test_shutdown_joins_live_worker():
session, _renderer, _player = make_session()
worker = FakeWorker(alive=True)
Expand Down
15 changes: 15 additions & 0 deletions tests/test_code_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ async def go() -> None:
_run(go())


def test_voicebar_render_after_the_bar_is_gone_is_a_safe_noop() -> None:
# The 0.3s animation timer drives _render_voicebar and can fire one last tick during teardown,
# after #voicebar is removed but before the interval is cancelled; it must no-op, not raise the
# NoMatches that surfaced as a py3.13 CI flake.
async def go() -> None:
app = CodeAgentApp(agent=FakeAgent([]))
async with app.run_test(size=(100, 30)) as pilot:
await pilot.pause()
await app.query_one("#voicebar", Static).remove()
assert len(app.query("#voicebar")) == 0
app._render_voicebar() # must not raise now that the bar is gone

_run(go())


def test_initial_prompt_runs_a_turn_on_mount() -> None:
async def go() -> None:
agent = FakeAgent([{"messages": [HumanMessage("seed"), AIMessage("seeded reply")]}])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_code_tui_voice.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ async def go() -> None:
running_timer = app._voice_timer
assert running_timer is not None
before = str(app.query_one("#voicebar", Static).render())
app._tick_voice()
app._render_voicebar()
assert str(app.query_one("#voicebar", Static).render()) != before # meter advanced
app.action_toggle_voice() # voice off -> timer stopped
await pilot.pause()
Expand Down
39 changes: 37 additions & 2 deletions tests/test_live_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,18 @@ async def go() -> None:
_run(go())


def test_voice_bar_animation_advances_on_tick() -> None:
def test_voice_bar_tick_advances_then_survives_teardown() -> None:
# Each tick advances the meter; once #voicebar is gone (a teardown tick) it must no-op, not raise.
async def go() -> None:
app = _app()
async with app.run_test(size=(100, 30)) as pilot:
await pilot.pause()
before = _voicebar(app)
app._tick_voice()
assert _voicebar(app) != before # the meter advanced a frame
await app.query_one("#voicebar", Static).remove()
app._tick_voice() # bar gone -> no-op, no NoMatches
assert len(app.query("#voicebar")) == 0

_run(go())

Expand Down Expand Up @@ -339,13 +343,18 @@ def run_conversation(renderer) -> None:

def test_worker_surfaces_a_leg_error_in_the_transcript() -> None:
async def go() -> None:
boom_error = CLIError("gateway down", error_type="api_error", exit_code=1)

def boom(renderer) -> None:
raise CLIError("gateway down", error_type="api_error", exit_code=1)
raise boom_error

app = _app(run_conversation=boom)
async with app.run_test(size=(100, 30)) as pilot:
assert await _wait_until(pilot, lambda: bool(app.query(ErrorMessage)))
assert "gateway down" in str(app.query_one(ErrorMessage).render())
# The error is also kept on the app so the launcher can re-raise it for the
# right exit code, not just shown inline (where a torn-down TUI would lose it).
assert app.error is boom_error

_run(go())

Expand Down Expand Up @@ -401,6 +410,8 @@ def test_interactive_human_run_launches_the_tui(monkeypatch) -> None:
captured: dict[str, object] = {}

class FakeApp:
error = None # no fatal leg failure -> the launcher re-raises nothing

def __init__(self, *, run_conversation, on_stop, web_note):
captured["run_conversation"] = run_conversation
captured["on_stop"] = on_stop
Expand Down Expand Up @@ -447,6 +458,8 @@ def fake_run_cascade(**kw):
monkeypatch.setattr(engine, "run_cascade", fake_run_cascade)

class FakeApp:
error = None # the conversation completes cleanly here

def __init__(self, *, run_conversation, on_stop, web_note):
self._rc = run_conversation

Expand All @@ -463,3 +476,25 @@ def set_interrupt(self, interrupt):
assert captured["renderer"] == "renderer-sentinel"
# The session's interrupt_reply was wired onto the app (so Escape/Ctrl-C can use it).
assert captured["interrupt"] == "session-interrupt"


def test_tui_reraises_a_fatal_leg_error_for_the_exit_code(monkeypatch) -> None:
# A fatal leg failure is caught on the TUI worker thread and parked on app.error; the
# launcher must re-raise it after the app tears down so the command exits with the
# error's code (api_error -> exit 1) instead of a silent success.
_wire_tui(monkeypatch)
boom = CLIError("streaming STT closed", error_type="api_error", exit_code=1)

class FakeApp:
error = boom # the worker thread recorded a fatal cascade error

def __init__(self, *, run_conversation, on_stop, web_note):
pass

def run(self, **kwargs):
pass

monkeypatch.setattr("aai_cli.agent_cascade.tui.LiveAgentApp", FakeApp)
with pytest.raises(CLIError) as exc:
run_agent_cascade(_opts(), AppState(), json_mode=False)
assert exc.value is boom
Loading