diff --git a/aai_cli/agent_cascade/engine.py b/aai_cli/agent_cascade/engine.py index 60c1087..9c940f8 100644 --- a/aai_cli/agent_cascade/engine.py +++ b/aai_cli/agent_cascade/engine.py @@ -83,6 +83,10 @@ def enqueue(self, pcm: bytes) -> None: def flush(self) -> None: """Drop any queued-but-unplayed audio (used on barge-in).""" + def pending(self) -> int: + """How many unplayed samples are still queued (>0 while audio is audibly playing).""" + ... + def close(self) -> None: """Close the output stream.""" @@ -191,30 +195,40 @@ def on_turn(self, event: object) -> None: self.renderer.user_partial(text) self._barge_in() - def _barge_in(self) -> None: - """Stop a reply that is still playing: flush the queued audio and cancel the - worker (the player flush is what silences the browser-equivalent local buffer).""" - if self._reply is not None and self._reply.is_alive(): + def _silence_if_speaking(self) -> bool: + """Cut the agent off if it's currently audible: signal the worker and flush audio. + + "Speaking" is broader than a live reply worker: it also covers the greeting (enqueued + with no worker) and the *tail* of a reply whose worker has already finished enqueuing + but whose audio is still draining from the player. In every case there is sound to + silence, so a barge-in or an interrupt should cut it — a bare ``_reply.is_alive()`` + check would leave the greeting (and a reply's last sentence) un-interruptible. Setting + the stop flag is harmless when no worker is running (the next ``_start_reply`` clears + it). Returns whether anything was silenced. + """ + speaking = (self._reply is not None and self._reply.is_alive()) or self.player.pending() > 0 + if speaking: self._stop.set() self.player.flush() + return speaking + + def _barge_in(self) -> None: + """Stop whatever the agent is saying (reply, greeting, or a draining tail) and join.""" + self._silence_if_speaking() self._join_reply() def interrupt_reply(self) -> bool: """Signal an in-flight reply to stop, without waiting for it; True if one was playing. The UI-thread-safe counterpart to a spoken barge-in: the live TUI's Escape/Ctrl-C - calls this to silence the agent mid-reply without the user having to talk over it. - Flushing the queued audio stops speech at once; the reply worker then sees the stop - flag, unwinds on its own, and emits ``reply_done`` so the front-end returns to + calls this to silence the agent mid-reply (or mid-greeting) without the user having to + talk over it. Flushing the queued audio stops speech at once; a reply worker then sees + the stop flag, unwinds on its own, and emits ``reply_done`` so the front-end returns to listening (the STT loop keeps running, so the next spoken turn is handled normally). It deliberately does *not* join the worker — a join from the UI thread would deadlock against the worker's own ``call_from_thread`` render hops. """ - playing = self._reply is not None and self._reply.is_alive() - if playing: - self._stop.set() - self.player.flush() - return playing + return self._silence_if_speaking() def _join_reply(self) -> None: """Wait for the current reply worker (if any) to unwind, then drop the handle.""" diff --git a/aai_cli/agent_cascade/tui.py b/aai_cli/agent_cascade/tui.py index 007c73d..2d4a94c 100644 --- a/aai_cli/agent_cascade/tui.py +++ b/aai_cli/agent_cascade/tui.py @@ -19,6 +19,7 @@ from textual.app import App, ComposeResult from textual.containers import VerticalScroll +from textual.css.query import NoMatches from textual.widgets import Static from aai_cli.code_agent import banner, tui_status @@ -128,6 +129,15 @@ def __init__( self._user_partial: UserMessage | None = None # the in-place "you: …" widget for a turn self._reply_msg: AssistantMessage | None = None # the reply widget sentences stream into self._stopped = False # guards on_stop against a double teardown (quit + unmount) + # A fatal cascade error caught on the worker thread, re-raised on the main thread (after + # app.run returns) so the command exits with the error's code instead of a silent 0 — + # the same record-then-re-raise the engine's CascadeSession.error uses across threads. + self._error: CLIError | None = None + + @property + def error(self) -> CLIError | None: + """The fatal cascade error (if any), for the launcher to re-raise after ``run`` returns.""" + return self._error def compose(self) -> ComposeResult: yield VerticalScroll(id="log") @@ -154,6 +164,9 @@ def _run(self) -> None: try: self._run_conversation(renderer) except CLIError as exc: + # Keep the error so the main thread can re-raise it for the right exit code, and show + # it inline too (the post-exit stderr render is the durable copy a torn-down TUI keeps). + self._error = exc self._safely(self._show_error, exc.message) # The cascade returned (STT closed, a leg failed, or a quit closed the audio) — exit. self._safely(self.exit) @@ -233,10 +246,17 @@ def _set_phase(self, phase: str) -> None: self._render_voicebar() def _render_voicebar(self) -> None: - """Paint the voice bar for the current phase (no Ctrl-V hint — input is voice-only).""" - self.query_one("#voicebar", Static).update( - tui_status.voicebar_markup(self._voice_phase, next(self._voice_frames)) - ) + """Paint the voice bar for the current phase (no Ctrl-V hint — input is voice-only). + + A no-op once the bar is gone: the 0.3s animation timer can fire one last ``_tick_voice`` + during teardown, after the DOM is dismantled but before the interval is cancelled, so the + query is defensive (a miss only happens on the way out, where the repaint is moot). + """ + try: + bar = self.query_one("#voicebar", Static) + except NoMatches: + return + bar.update(tui_status.voicebar_markup(self._voice_phase, next(self._voice_frames))) def _tick_voice(self) -> None: """Advance the voice-bar meter one frame (the animation timer's callback).""" diff --git a/aai_cli/code_agent/tui.py b/aai_cli/code_agent/tui.py index 2624f4f..2871038 100644 --- a/aai_cli/code_agent/tui.py +++ b/aai_cli/code_agent/tui.py @@ -18,6 +18,7 @@ from rich.markup import escape from textual.app import ComposeResult from textual.containers import Horizontal, VerticalScroll +from textual.css.query import NoMatches from textual.screen import ModalScreen from textual.widgets import Input, Static from textual.worker import Worker @@ -102,6 +103,8 @@ class CodeAgentApp(_VoiceLegs): ("ctrl+v", "toggle_voice", "Toggle voice"), ("ctrl+o", "toggle_output", "Expand/collapse output"), ] + # The voice-bar meter's animation cadence; a cosmetic value, so it's mutation-exempt. + _TICK_SECONDS: ClassVar[float] = 0.3 # pragma: no mutate def __init__( self, @@ -336,7 +339,7 @@ def _sync_input_mode(self) -> None: if listening: self._render_voicebar() if self._voice_timer is None: # animate the meter only while the bar is shown - self._voice_timer = self.set_interval(0.3, self._tick_voice) # pragma: no mutate + self._voice_timer = self.set_interval(self._TICK_SECONDS, self._render_voicebar) else: if self._voice_timer is not None: self._voice_timer.stop() @@ -349,16 +352,13 @@ def _set_voice_phase(self, phase: str) -> None: self._render_voicebar() def _render_voicebar(self) -> None: - """Paint the voice bar for the current phase: an animated meter, label, and accent.""" + """Paint/advance the voice bar (the 0.3s timer's callback); no-op once the bar is gone.""" + try: + bar = self.query_one("#voicebar", Static) + except NoMatches: + return # a tick can fire during teardown after the bar is removed; the repaint is moot hint = " [dim](Ctrl-V to type)[/dim]" if self._voice_phase == "listening" else "" - meter = next(self._voice_frames) - self.query_one("#voicebar", Static).update( - voicebar_markup(self._voice_phase, meter, hint=hint) - ) - - def _tick_voice(self) -> None: - """Advance the voice-bar meter one frame (the animation timer's callback).""" - self._render_voicebar() + bar.update(voicebar_markup(self._voice_phase, next(self._voice_frames), hint=hint)) def _ask(self, question: str) -> str: """Block the worker on a modal input screen and return the user's answer.""" diff --git a/aai_cli/commands/agent_cascade/_exec.py b/aai_cli/commands/agent_cascade/_exec.py index 5d43caa..d177241 100644 --- a/aai_cli/commands/agent_cascade/_exec.py +++ b/aai_cli/commands/agent_cascade/_exec.py @@ -245,6 +245,11 @@ def run_conversation(renderer: engine.Renderer) -> None: web_note=_web_search_note(), ) app.run(mouse=False) + # A fatal leg failure (STT/LLM/TTS) is caught on the TUI's worker thread; re-raise it now + # that the app has torn down so the command exits with the error's code (and renders it to + # stderr, which survives the restored screen) instead of a silent success. + if app.error is not None: + raise app.error def _launch_tui(api_key: str, opts: AgentCascadeOptions, config: CascadeConfig) -> None: diff --git a/tests/_cascade_fakes.py b/tests/_cascade_fakes.py index 4985cf4..fac7222 100644 --- a/tests/_cascade_fakes.py +++ b/tests/_cascade_fakes.py @@ -46,6 +46,10 @@ def __init__(self): self.flushed = 0 self.started = False self.closed = False + # What pending() reports: tests set it >0 to simulate audio still draining (a + # greeting, or a reply tail whose worker already exited). flush() zeroes it, mirroring + # the real player dropping its queue. + self.pending_samples = 0 def start(self): self.started = True @@ -55,6 +59,10 @@ def enqueue(self, pcm): def flush(self): self.flushed += 1 + self.pending_samples = 0 + + def pending(self): + return self.pending_samples def close(self): self.closed = True diff --git a/tests/test_agent_cascade_engine.py b/tests/test_agent_cascade_engine.py index 23d7ec6..091c64d 100644 --- a/tests/test_agent_cascade_engine.py +++ b/tests/test_agent_cascade_engine.py @@ -266,6 +266,29 @@ def test_interrupt_reply_is_a_noop_when_nothing_is_playing(): assert not session._stop.is_set() +def test_interrupt_reply_silences_the_greeting_with_no_worker(): + # The greeting is enqueued with no reply worker; Escape/Ctrl-C must still cut it. With audio + # queued (pending>0) the interrupt flushes the player and reports that it silenced something, + # so the live TUI interrupts the greeting instead of (for Ctrl-C) quitting the session. + session, _renderer, player = make_session() + player.pending_samples = 1 # even a single queued sample (>0) means sound is still playing + assert session.interrupt_reply() is True + assert player.flushed == 1 + assert player.pending() == 0 # the queued greeting was dropped + + +def test_barge_in_silences_a_draining_reply_tail_after_the_worker_exits(): + # The reply worker enqueues every sentence then exits, but the audio keeps draining. A new + # spoken turn in that window must still cut the tail — a bare is_alive() check would miss it. + session, _renderer, player = make_session() + session._reply = FakeWorker(alive=False) # worker finished enqueuing + player.pending_samples = 9600 # ...but its audio is still playing + session._barge_in() + assert session._stop.is_set() + assert player.flushed == 1 + assert session._reply is None + + def test_shutdown_joins_live_worker(): session, _renderer, _player = make_session() worker = FakeWorker(alive=True) diff --git a/tests/test_code_tui.py b/tests/test_code_tui.py index 9961714..b280ccc 100644 --- a/tests/test_code_tui.py +++ b/tests/test_code_tui.py @@ -63,6 +63,21 @@ async def go() -> None: _run(go()) +def test_voicebar_render_after_the_bar_is_gone_is_a_safe_noop() -> None: + # The 0.3s animation timer drives _render_voicebar and can fire one last tick during teardown, + # after #voicebar is removed but before the interval is cancelled; it must no-op, not raise the + # NoMatches that surfaced as a py3.13 CI flake. + async def go() -> None: + app = CodeAgentApp(agent=FakeAgent([])) + async with app.run_test(size=(100, 30)) as pilot: + await pilot.pause() + await app.query_one("#voicebar", Static).remove() + assert len(app.query("#voicebar")) == 0 + app._render_voicebar() # must not raise now that the bar is gone + + _run(go()) + + def test_initial_prompt_runs_a_turn_on_mount() -> None: async def go() -> None: agent = FakeAgent([{"messages": [HumanMessage("seed"), AIMessage("seeded reply")]}]) diff --git a/tests/test_code_tui_voice.py b/tests/test_code_tui_voice.py index ddc3226..5e6eb61 100644 --- a/tests/test_code_tui_voice.py +++ b/tests/test_code_tui_voice.py @@ -342,7 +342,7 @@ async def go() -> None: running_timer = app._voice_timer assert running_timer is not None before = str(app.query_one("#voicebar", Static).render()) - app._tick_voice() + app._render_voicebar() assert str(app.query_one("#voicebar", Static).render()) != before # meter advanced app.action_toggle_voice() # voice off -> timer stopped await pilot.pause() diff --git a/tests/test_live_tui.py b/tests/test_live_tui.py index abe49b9..fc56e1e 100644 --- a/tests/test_live_tui.py +++ b/tests/test_live_tui.py @@ -190,7 +190,8 @@ async def go() -> None: _run(go()) -def test_voice_bar_animation_advances_on_tick() -> None: +def test_voice_bar_tick_advances_then_survives_teardown() -> None: + # Each tick advances the meter; once #voicebar is gone (a teardown tick) it must no-op, not raise. async def go() -> None: app = _app() async with app.run_test(size=(100, 30)) as pilot: @@ -198,6 +199,9 @@ async def go() -> None: before = _voicebar(app) app._tick_voice() assert _voicebar(app) != before # the meter advanced a frame + await app.query_one("#voicebar", Static).remove() + app._tick_voice() # bar gone -> no-op, no NoMatches + assert len(app.query("#voicebar")) == 0 _run(go()) @@ -339,13 +343,18 @@ def run_conversation(renderer) -> None: def test_worker_surfaces_a_leg_error_in_the_transcript() -> None: async def go() -> None: + boom_error = CLIError("gateway down", error_type="api_error", exit_code=1) + def boom(renderer) -> None: - raise CLIError("gateway down", error_type="api_error", exit_code=1) + raise boom_error app = _app(run_conversation=boom) async with app.run_test(size=(100, 30)) as pilot: assert await _wait_until(pilot, lambda: bool(app.query(ErrorMessage))) assert "gateway down" in str(app.query_one(ErrorMessage).render()) + # The error is also kept on the app so the launcher can re-raise it for the + # right exit code, not just shown inline (where a torn-down TUI would lose it). + assert app.error is boom_error _run(go()) @@ -401,6 +410,8 @@ def test_interactive_human_run_launches_the_tui(monkeypatch) -> None: captured: dict[str, object] = {} class FakeApp: + error = None # no fatal leg failure -> the launcher re-raises nothing + def __init__(self, *, run_conversation, on_stop, web_note): captured["run_conversation"] = run_conversation captured["on_stop"] = on_stop @@ -447,6 +458,8 @@ def fake_run_cascade(**kw): monkeypatch.setattr(engine, "run_cascade", fake_run_cascade) class FakeApp: + error = None # the conversation completes cleanly here + def __init__(self, *, run_conversation, on_stop, web_note): self._rc = run_conversation @@ -463,3 +476,25 @@ def set_interrupt(self, interrupt): assert captured["renderer"] == "renderer-sentinel" # The session's interrupt_reply was wired onto the app (so Escape/Ctrl-C can use it). assert captured["interrupt"] == "session-interrupt" + + +def test_tui_reraises_a_fatal_leg_error_for_the_exit_code(monkeypatch) -> None: + # A fatal leg failure is caught on the TUI worker thread and parked on app.error; the + # launcher must re-raise it after the app tears down so the command exits with the + # error's code (api_error -> exit 1) instead of a silent success. + _wire_tui(monkeypatch) + boom = CLIError("streaming STT closed", error_type="api_error", exit_code=1) + + class FakeApp: + error = boom # the worker thread recorded a fatal cascade error + + def __init__(self, *, run_conversation, on_stop, web_note): + pass + + def run(self, **kwargs): + pass + + monkeypatch.setattr("aai_cli.agent_cascade.tui.LiveAgentApp", FakeApp) + with pytest.raises(CLIError) as exc: + run_agent_cascade(_opts(), AppState(), json_mode=False) + assert exc.value is boom