Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions aai_cli/code_agent/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ class CodeAgentApp(_VoiceLegs):
TITLE = "AssemblyAI Code"
# Ctrl-C quits (in addition to Ctrl-Q); the built-in command palette is removed.
ENABLE_COMMAND_PALETTE = False
# Interrupt/quit keys follow deepagents-code: Escape interrupts the running turn, and
# Ctrl-C interrupts a running turn or — when idle — quits only on a confirmed double-press.
# Interrupt/quit keys follow deepagents-code: Escape interrupts the running turn (or, in
# voice mode, the active listen/readback), and Ctrl-C interrupts a running turn or active
# voice, or — when idle — quits only on a confirmed double-press.
BINDINGS: ClassVar = [
("escape", "interrupt", "Interrupt"),
("ctrl+c", "quit_or_interrupt", "Interrupt / Quit"),
Expand Down Expand Up @@ -389,15 +390,37 @@ def _cancel_turn(self) -> bool:
self._note("cancelling…")
return True

def _stop_voice_activity(self) -> bool:
    """Halt the current voice leg (listening or readback) and drop back to idle.

    Listening and readback are not a "running turn", so ``_cancel_turn`` does not cover
    them; without this step an interrupt key pressed in voice mode would go straight to
    the quit hint. When voice is active this cancels the in-flight leg, marks voice as
    paused (the text prompt comes back and nothing re-listens automatically), refreshes
    the UI and posts a notice. Because the session is paused afterwards,
    ``_voice_active`` reports False and the next press falls through to the quit path.

    Returns:
        True if voice was active and has been stopped; False if there is no voice
        session or it is not currently active (in which case nothing is touched).
    """
    session = self._voice
    if session is None:
        return False
    if not self._voice_active():
        return False

    session.cancel()
    self._voice_paused = True
    self._refresh_status()
    # The active leg is gone, so the text prompt has to be restored.
    self._sync_input_mode()
    self._note("voice interrupted (Ctrl-V to talk again)")
    return True

def action_interrupt(self) -> None:
    """Escape: interrupt a running agent turn or in-flight voice (a no-op when idle).

    A running turn takes priority: ``_cancel_turn`` is tried exactly once, and only when
    it reports that nothing was running is the active voice leg (listen/readback)
    stopped instead. Unlike the Ctrl-C handler, this never arms or confirms a quit, so
    Escape on its own can never exit the app.
    """
    # Single call on purpose: cancelling the turn is a side effect, so it must not be
    # repeated just to decide whether the voice leg needs stopping.
    if not self._cancel_turn():
        self._stop_voice_activity()

def action_quit_or_interrupt(self) -> None:
"""Ctrl-C: interrupt a running turn, else quit on a confirmed second press."""
"""Ctrl-C: interrupt a running turn or active voice, else quit on a second press."""
if self._cancel_turn():
self._quit_pending = False
return
if self._stop_voice_activity():
self._arm_quit_pending() # idle now; a second Ctrl-C confirms the quit
return
if self._quit_pending:
self.exit()
else:
Expand Down
64 changes: 55 additions & 9 deletions aai_cli/code_agent/voice.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import re
import threading
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass
from typing import TYPE_CHECKING, Protocol
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, NoReturn, Protocol

from aai_cli.core import client, config_builder
from aai_cli.core import client, config_builder, errors
from aai_cli.core.microphone import MicrophoneSource
from aai_cli.tts import session as tts_session
from aai_cli.tts.audio import PcmPlayer
Expand Down Expand Up @@ -46,6 +46,24 @@
_ALL_CODE_READBACK = "I've updated the code — see the transcript for the details."


class _ReadbackInterrupted(errors.CLIError):
    """Private control-flow signal: ``cancel()`` fired while a readback was playing.

    The readback feed raises it mid-playback. It derives from ``CLIError`` because
    streaming TTS (``synthesize``) lets a ``CLIError`` propagate unchanged, which is what
    allows ``speak`` to abort the player and stop promptly rather than drain the rest of
    the clip. ``speak`` always catches it, so it is never shown to the user.
    """

    def __init__(self) -> None:
        # exit_code is deliberately left at the inherited default: speak() swallows this
        # exception every time, so that default can never surface.
        super().__init__(
            "readback interrupted",
            error_type="readback_interrupted",
        )


def _abort_readback() -> NoReturn:
    """Signal a cancelled readback by raising ``_ReadbackInterrupted``; never returns.

    Invoked by the feed callback inside ``speak`` once a ``cancel()`` has been observed
    mid-playback.
    """
    raise _ReadbackInterrupted()


def spoken_summary(text: str) -> str:
"""Reduce an assistant reply to the prose worth reading aloud.

Expand Down Expand Up @@ -138,15 +156,31 @@ class VoiceSession:
stream_fn: StreamFn = client.stream_audio
synth_fn: SynthFn = tts_session.synthesize
player_factory: Callable[[], Player] = PcmPlayer
_cancel: threading.Event = field(
default_factory=threading.Event,
init=False, # pragma: no mutate
)

def cancel(self) -> None:
    """Ask the in-flight ``listen``/``speak`` leg to end promptly.

    Intended to be called from a different thread than the one running the leg (the
    TUI's Ctrl-C / Escape handlers, since the legs block on a daemon thread). Both the
    microphone gate in :meth:`listen` and the readback feed in :meth:`speak` poll this
    flag between chunks, so capture or playback stops within one chunk instead of
    running to completion. Every leg clears the flag when it starts, which keeps a
    stale cancel from preempting the next turn.
    """
    stop_flag = self._cancel
    stop_flag.set()

def listen(self) -> str | None:
"""Capture one spoken turn and return its finalized transcript.

Returns the text of the first end-of-turn the server finalizes, or ``None`` when
the microphone stream ends without one (EOF — e.g. a finite source in tests). The
microphone is gated shut the moment a turn finalizes, so exactly one utterance is
captured per call; a real mic blocks until you speak (Ctrl-C to quit).
the microphone stream ends without one (EOF — e.g. a finite source in tests, or a
:meth:`cancel` mid-capture). The microphone is gated shut the moment a turn finalizes,
so exactly one utterance is captured per call; a real mic blocks until you speak.
"""
self._cancel.clear()
mic = self.mic_factory()
done = threading.Event()
captured: list[str] = []
Expand All @@ -159,7 +193,7 @@ def on_turn(event: object) -> None:

def gated() -> Iterator[bytes]:
for chunk in mic:
if done.is_set():
if done.is_set() or self._cancel.is_set():
return
yield chunk

Expand All @@ -171,13 +205,25 @@ def speak(self, text: str) -> None:

A no-op when readback is off (production, where streaming TTS has no host) or the
text is blank — so the caller can route every assistant reply here unconditionally.
A :meth:`cancel` from another thread stops playback promptly: the feed raises an
internal sentinel that aborts the player (discarding buffered audio) and ends synthesis.
"""
text = text.strip()
if not self.readback or not text:
return
self._cancel.clear()
config = SpeakConfig(text=text, sample_rate=_TTS_SAMPLE_RATE)
with self.player_factory() as player:
self.synth_fn(self.api_key, config, on_audio=player.feed)
try:
with self.player_factory() as player:

def feed(pcm: bytes, sample_rate: int) -> None:
if self._cancel.is_set():
_abort_readback()
player.feed(pcm, sample_rate)

self.synth_fn(self.api_key, config, on_audio=feed)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

False positive — not actioning this. tts.session.synthesize is synthesize(api_key, config, *, connect=None, on_warning=None, on_audio=None) (session.py:234-241), so self.synth_fn(self.api_key, config, on_audio=feed) matches its arity exactly. synth_fn is the SynthFn Protocol attribute that defaults to synthesize; CodeQL appears to be misresolving it to a 1-arg target. The suggested self.synth_fn(config) would drop api_key and on_audio, breaking authentication and incremental playback. This call shape (previously on_audio=player.feed) predates this PR and is unchanged in behavior — only the callback was wrapped to honor cancel().


Generated by Claude Code

except _ReadbackInterrupted:
pass # cancel() asked us to stop; the player aborted on the way out


def build_voice_session(api_key: str) -> VoiceSession:
Expand Down
3 changes: 3 additions & 0 deletions aai_cli/code_agent/voice_ui.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ def listen(self) -> str | None:
def speak(self, text: str) -> None:
    """Read ``text`` back aloud (a no-op when readback is unavailable).

    Interface stub: the body is intentionally empty and returns nothing.
    """

def cancel(self) -> None:
    """Stop an in-flight listen/readback so the current voice activity ends promptly.

    Interface stub: the body is intentionally empty. The TUI's interrupt path
    (``_stop_voice_activity``) calls this on the active voice session.
    """


class _VoiceLegs(App[None]):
"""Mixin holding the off-thread voice capture/readback legs for ``CodeAgentApp``.
Expand Down
3 changes: 3 additions & 0 deletions tests/test_code_modals.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def listen(self) -> str | None:
raise self._error
return self._transcript

def cancel(self) -> None:
    """No-op: the modal voice path never interrupts an in-flight leg.

    NOTE(review): presumably present only so this fake offers the same ``cancel``
    method as the real voice session -- confirm against the voice interface.
    """


def _run(coro) -> None:
    """Run the coroutine ``coro`` to completion with ``asyncio.run``, discarding its result."""
    asyncio.run(coro)
Expand Down
90 changes: 90 additions & 0 deletions tests/test_code_tui_voice.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, transcripts: list[str] | None = None, *, error: CLIError | No
self._error = error
self.spoken: list[str] = []
self.listens = 0
self.cancels = 0

def listen(self) -> str | None:
self.listens += 1
Expand All @@ -49,6 +50,9 @@ def listen(self) -> str | None:
def speak(self, text: str) -> None:
    """Record ``text`` in ``self.spoken`` instead of playing any audio."""
    self.spoken.append(text)

def cancel(self) -> None:
    """Count the cancel request in ``self.cancels``; nothing is actually interrupted."""
    self.cancels += 1


def _run(coro) -> None:
    """Run the coroutine ``coro`` to completion with ``asyncio.run``, discarding its result."""
    asyncio.run(coro)
Expand Down Expand Up @@ -351,6 +355,92 @@ def boom() -> None:
_run(go())


def test_ctrl_c_interrupts_active_voice_then_quits_on_second_press(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    # Listening/speaking is not a "running turn" in voice mode. The first Ctrl-C must
    # therefore stop the voice activity and leave the app idle; only the second Ctrl-C
    # confirms the quit.
    async def scenario() -> None:
        fake_voice = FakeVoice()
        app = CodeAgentApp(agent=FakeAgent([]), voice=fake_voice)
        # Start paused so on_mount cannot race a real listen thread.
        app._voice_paused = True
        async with app.run_test(size=(100, 30)) as pilot:
            await pilot.pause()
            exit_calls: list[bool] = []
            monkeypatch.setattr(app, "exit", lambda *a, **k: exit_calls.append(True))
            # Un-pause: voice now counts as active (listening).
            app._voice_paused = False

            # First press -- stops the voice and goes idle.
            app.action_quit_or_interrupt()
            assert fake_voice.cancels == 1  # in-flight listen/readback was cancelled
            assert app._voice_paused is True  # paused -> idle, text prompt is back
            assert app._quit_pending is True  # armed, so the next press confirms
            assert exit_calls == []  # no quit on the first press

            # Second press -- the app is idle now, so this one quits.
            app.action_quit_or_interrupt()
            assert exit_calls == [True]
            assert fake_voice.cancels == 1  # the idle press did not cancel again

    _run(scenario())


def test_ctrl_c_on_active_voice_interrupts_even_when_a_quit_was_pending(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    # Interrupting active voice outranks a pending quit. If Ctrl-C arrives while the
    # agent is listening/speaking, the voice is interrupted and the app does not exit,
    # even though an earlier press had already armed the quit hint.
    async def scenario() -> None:
        fake_voice = FakeVoice()
        app = CodeAgentApp(agent=FakeAgent([]), voice=fake_voice)
        app._voice_paused = True
        async with app.run_test(size=(100, 30)) as pilot:
            await pilot.pause()
            exit_calls: list[bool] = []
            monkeypatch.setattr(app, "exit", lambda *a, **k: exit_calls.append(True))
            # Arrange: voice is active (listening) and a quit hint is already armed.
            app._voice_paused = False
            app._quit_pending = True

            # Ctrl-C must interrupt the voice rather than quit.
            app.action_quit_or_interrupt()
            assert fake_voice.cancels == 1
            assert exit_calls == []  # active voice is interrupted, never quit

    _run(scenario())


def test_escape_interrupts_active_voice_without_arming_quit() -> None:
    # Escape stops in-flight voice exactly like Ctrl-C does, with one difference: it is
    # not a quit key, so the quit hint stays unarmed.
    async def scenario() -> None:
        fake_voice = FakeVoice()
        app = CodeAgentApp(agent=FakeAgent([]), voice=fake_voice)
        app._voice_paused = True
        async with app.run_test(size=(100, 30)) as pilot:
            await pilot.pause()
            app._voice_paused = False  # voice is active

            app.action_interrupt()  # the Escape binding
            assert fake_voice.cancels == 1  # voice was stopped
            assert app._voice_paused is True  # back to idle
            assert app._quit_pending is False  # Escape never arms a quit

    _run(scenario())


def test_stop_voice_activity_is_a_noop_when_voice_inactive() -> None:
    # Neither a missing voice session nor a paused one counts as "active". In both
    # cases the interrupt cancels nothing and leaves the decision to the quit path.
    async def scenario() -> None:
        # Case 1: the app was built without any voice session.
        voiceless_app = CodeAgentApp(agent=FakeAgent([]))
        async with voiceless_app.run_test(size=(100, 30)) as pilot:
            await pilot.pause()
            assert voiceless_app._stop_voice_activity() is False  # nothing to stop

        # Case 2: a voice session exists but is paused.
        fake_voice = FakeVoice()
        paused_app = CodeAgentApp(agent=FakeAgent([]), voice=fake_voice)
        paused_app._voice_paused = True
        async with paused_app.run_test(size=(100, 30)) as pilot:
            await pilot.pause()
            assert paused_app._stop_voice_activity() is False  # paused -> inactive
            assert fake_voice.cancels == 0  # a paused session is never cancelled

    _run(scenario())


def test_toggle_voice_without_session_notifies_and_stays_off() -> None:
# With no voice front-end the toggle is a no-op (notice only) and never marks a pause.
async def go() -> None:
Expand Down
Loading
Loading