From 9d7bac82a25aba16359b5943332965d3b59af3b3 Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 14:26:35 -0700 Subject: [PATCH] Stop streaming commands cleanly on SIGTERM, like Ctrl-C The realtime commands treat Ctrl-C (SIGINT -> KeyboardInterrupt) as a clean "user stopped" signal that flushes closing state and exits 0. An external supervisor that stops the command from outside the terminal -- a Hammerspoon hotkey, a service manager, a wrapper script's `kill` -- sends SIGTERM, whose default action aborts the process before that flush runs, and delivering SIGINT instead means signalling the whole process group under a shell wrapper. Add `core.signals.terminate_as_interrupt`, a context manager that re-raises SIGTERM as KeyboardInterrupt (main-thread-only, restores the prior handler), and wrap the interactive run of `stream` (single + --from-stdin batch), `agent`, `agent-cascade`, and `speak` with it. SIGTERM now routes through each command's existing clean-stop path, so a controller can stop a recording with a plain SIGTERM. No new flag; behavior is transparent. Co-Authored-By: Claude Opus 4.8 (1M context) --- aai_cli/commands/agent/_exec.py | 7 +++- aai_cli/commands/agent_cascade/_exec.py | 7 +++- aai_cli/commands/speak/_exec.py | 53 +++++++++++++----------- aai_cli/commands/stream/_exec.py | 10 +++-- aai_cli/core/signals.py | 45 ++++++++++++++++++++ tests/test_agent_cascade_command.py | 17 ++++++++ tests/test_agent_command.py | 20 +++++++++ tests/test_signals.py | 44 ++++++++++++++++++++ tests/test_speak.py | 18 ++++++++ tests/test_stream_sigterm.py | 55 +++++++++++++++++++++++++ 10 files changed, 244 insertions(+), 32 deletions(-) create mode 100644 aai_cli/core/signals.py create mode 100644 tests/test_signals.py create mode 100644 tests/test_stream_sigterm.py diff --git a/aai_cli/commands/agent/_exec.py b/aai_cli/commands/agent/_exec.py index 60c16441..7bd2ebe0 100644 --- a/aai_cli/commands/agent/_exec.py +++ b/aai_cli/commands/agent/_exec.py @@ -22,7 +22,7 @@ from aai_cli.agent.voices import VOICE_NAMES from aai_cli.app.agent_shared import resolve_system_prompt as _resolve_system_prompt from aai_cli.app.context import AppState -from aai_cli.core import choices, client +from aai_cli.core import choices, client, signals from aai_cli.core.errors import UsageError from aai_cli.streaming.session import resolve_output_modes from aai_cli.streaming.sources import FileSource @@ -130,7 +130,10 @@ def run_agent(opts: AgentOptions, state: AppState, *, json_mode: bool) -> None: exit_after_reply=from_file, ) try: - run_session(api_key, renderer=renderer, player=player, mic=audio, config=run_config) + # SIGTERM stops the agent as cleanly as Ctrl-C, so an external supervisor + # (Hammerspoon, a service manager, a wrapper's `kill`) can end the session. + with signals.terminate_as_interrupt(): + run_session(api_key, renderer=renderer, player=player, mic=audio, config=run_config) except KeyboardInterrupt: renderer.stopped() except BrokenPipeError as exc: diff --git a/aai_cli/commands/agent_cascade/_exec.py b/aai_cli/commands/agent_cascade/_exec.py index d13cdf1e..bff21eff 100644 --- a/aai_cli/commands/agent_cascade/_exec.py +++ b/aai_cli/commands/agent_cascade/_exec.py @@ -22,7 +22,7 @@ from aai_cli.agent_cascade.config import DEFAULT_MAX_HISTORY, CascadeConfig from aai_cli.app.agent_shared import resolve_system_prompt as _resolve_system_prompt from aai_cli.app.context import AppState -from aai_cli.core import choices, client, config_builder, llm +from aai_cli.core import choices, client, config_builder, llm, signals from aai_cli.core.errors import UsageError from aai_cli.streaming import turn_presets from aai_cli.streaming.session import resolve_output_modes @@ -213,7 +213,10 @@ def run_agent_cascade(opts: AgentCascadeOptions, state: AppState, *, json_mode: stt_params = _build_stt_params(opts, sample_rate) deps = engine.CascadeDeps.real(api_key, config, audio=audio, stt_params=stt_params) try: - engine.run_cascade(renderer=renderer, player=player, config=config, deps=deps) + # SIGTERM stops the cascade as cleanly as Ctrl-C, so an external supervisor + # (Hammerspoon, a service manager, a wrapper's `kill`) can end the session. + with signals.terminate_as_interrupt(): + engine.run_cascade(renderer=renderer, player=player, config=config, deps=deps) except KeyboardInterrupt: renderer.stopped() except BrokenPipeError as exc: diff --git a/aai_cli/commands/speak/_exec.py b/aai_cli/commands/speak/_exec.py index 82febd89..441306c5 100644 --- a/aai_cli/commands/speak/_exec.py +++ b/aai_cli/commands/speak/_exec.py @@ -12,7 +12,7 @@ from pathlib import Path from aai_cli.app.context import AppState -from aai_cli.core import stdio +from aai_cli.core import signals, stdio from aai_cli.core.errors import UsageError from aai_cli.tts import audio, dialogue, session, voices from aai_cli.ui import output @@ -182,29 +182,32 @@ def run_speak(opts: SpeakOptions, state: AppState, *, json_mode: bool) -> None: spoken = _read_text(opts.text) api_key = state.resolve_api_key() bare_voice, overrides = dialogue.parse_voice_overrides(opts.voice) - if dialogue.looks_like_speaker_labeled(spoken): - _speak_dialogue( - api_key, - spoken, - bare_voice, - overrides, - opts, - json_mode=json_mode, - quiet=state.quiet, - ) - else: - if overrides: - # Mirror the inverse warning in _speak_dialogue: never drop a - # requested voice mapping silently. - output.emit_warning( - "Ignoring --voice SPEAKER=VOICE mappings; input has no speaker labels.", + # SIGTERM aborts synthesis/playback the same way Ctrl-C does, so an external + # supervisor (Hammerspoon, a service manager, a wrapper's `kill`) can stop it. + with signals.terminate_as_interrupt(): + if dialogue.looks_like_speaker_labeled(spoken): + _speak_dialogue( + api_key, + spoken, + bare_voice, + overrides, + opts, json_mode=json_mode, + quiet=state.quiet, + ) + else: + if overrides: + # Mirror the inverse warning in _speak_dialogue: never drop a + # requested voice mapping silently. + output.emit_warning( + "Ignoring --voice SPEAKER=VOICE mappings; input has no speaker labels.", + json_mode=json_mode, + ) + _speak_single( + api_key, + spoken, + bare_voice or voices.default_voice(opts.language), + opts, + json_mode=json_mode, + quiet=state.quiet, ) - _speak_single( - api_key, - spoken, - bare_voice or voices.default_voice(opts.language), - opts, - json_mode=json_mode, - quiet=state.quiet, - ) diff --git a/aai_cli/commands/stream/_exec.py b/aai_cli/commands/stream/_exec.py index af0e7a44..22d7a62c 100644 --- a/aai_cli/commands/stream/_exec.py +++ b/aai_cli/commands/stream/_exec.py @@ -19,7 +19,7 @@ from aai_cli import code_gen from aai_cli.app.context import AppState -from aai_cli.core import choices, client, config_builder, stdio, youtube +from aai_cli.core import choices, client, config_builder, signals, stdio, youtube from aai_cli.core.errors import UsageError, mutually_exclusive from aai_cli.core.microphone import MicrophoneSource from aai_cli.streaming import record, turn_presets @@ -305,7 +305,10 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None """Execute one `assembly stream` invocation from already-parsed flags.""" text_mode, json_mode = resolve_output_modes(opts.output_field, json_mode=json_mode) if opts.from_stdin: - _run_batch(opts, state, json_mode=json_mode, text_mode=text_mode) + # SIGTERM stops the stream as cleanly as Ctrl-C, so an external supervisor + # (Hammerspoon, a service manager, a wrapper's `kill`) can end a recording. + with signals.terminate_as_interrupt(): + _run_batch(opts, state, json_mode=json_mode, text_mode=text_mode) return sources = opts.source_options() base_flags = opts.base_flags() @@ -348,4 +351,5 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None save_audio=opts.save_audio, llm_interval=opts.llm_interval, ) - _dispatch(session, sources) + with signals.terminate_as_interrupt(): + _dispatch(session, sources) diff --git a/aai_cli/core/signals.py b/aai_cli/core/signals.py new file mode 100644 index 00000000..449cc4b8 --- /dev/null +++ b/aai_cli/core/signals.py @@ -0,0 +1,45 @@ +"""Signal plumbing for the long-running interactive commands. + +`stream` (and the other realtime commands) treat Ctrl-C as a clean "user stopped" +signal: SIGINT arrives as a ``KeyboardInterrupt`` that the streaming lifecycle catches +to flush its closing state and exit 0. This module lets an *external* stop reach that +same path — see ``terminate_as_interrupt``. +""" + +from __future__ import annotations + +import contextlib +import signal +import threading +from collections.abc import Generator +from types import FrameType + + +@contextlib.contextmanager +def terminate_as_interrupt() -> Generator[None]: + """Map SIGTERM onto ``KeyboardInterrupt`` for the duration of the block. + + A supervisor that stops the command from outside the terminal — a Hammerspoon + hotkey, a service manager, a wrapper script's ``kill`` — sends SIGTERM, whose + default action aborts the process before the graceful flush runs. Re-raising it + as ``KeyboardInterrupt`` routes SIGTERM through the same clean-stop path as + Ctrl-C, so a controller can stop a recording with a plain SIGTERM and never has + to deliver SIGINT (which, under a ``just``/shell wrapper, means signalling the + whole process group). + + No-op off the main thread, where ``signal.signal`` refuses to install a handler; + the previous handler is always restored on exit. + """ + if threading.current_thread() is not threading.main_thread(): + yield + return + + def _raise_interrupt(signum: int, frame: FrameType | None) -> None: + del signum, frame # handler signature is fixed; the values are unused + raise KeyboardInterrupt + + previous = signal.signal(signal.SIGTERM, _raise_interrupt) + try: + yield + finally: + signal.signal(signal.SIGTERM, previous) diff --git a/tests/test_agent_cascade_command.py b/tests/test_agent_cascade_command.py index 7b89efde..d66b17b5 100644 --- a/tests/test_agent_cascade_command.py +++ b/tests/test_agent_cascade_command.py @@ -7,6 +7,7 @@ from __future__ import annotations import dataclasses +import signal import types import pytest @@ -264,6 +265,22 @@ def boom(**kwargs): assert rendered["r"].closed is True +def test_installs_sigterm_handler_around_run(monkeypatch): + captured: dict[str, object] = {} + + def capture(**kwargs): + captured["handler"] = signal.getsignal(signal.SIGTERM) + + _wire_run(monkeypatch, capture) + run_agent_cascade(_opts(source="clip.wav"), AppState(), json_mode=False) + handler = captured["handler"] + # While the cascade runs, SIGTERM maps to KeyboardInterrupt (the same clean stop + # as Ctrl-C); without the wrapper this would be the default, non-callable SIG_DFL. + assert callable(handler) + with pytest.raises(KeyboardInterrupt): + handler(signal.SIGTERM, None) + + def test_broken_pipe_exits_zero(monkeypatch): def boom(**kwargs): raise BrokenPipeError diff --git a/tests/test_agent_command.py b/tests/test_agent_command.py index 8c7a2ce8..c5abce4e 100644 --- a/tests/test_agent_command.py +++ b/tests/test_agent_command.py @@ -1,5 +1,7 @@ import json +import signal +import pytest from typer.testing import CliRunner from aai_cli.agent.voices import VOICES, format_voice_list @@ -136,6 +138,24 @@ def raise_kbd(*a, **k): assert result.exit_code == 0 +def test_agent_installs_sigterm_handler_around_session(monkeypatch): + config.set_api_key("default", "sk_live") + captured: dict[str, object] = {} + + def fake_run_session(api_key, *, renderer, player, mic, config): + captured["handler"] = signal.getsignal(signal.SIGTERM) + + monkeypatch.setattr("aai_cli.commands.agent._exec.run_session", fake_run_session) + result = runner.invoke(app, ["agent"]) + assert result.exit_code == 0 + handler = captured["handler"] + # While the session runs, SIGTERM maps to KeyboardInterrupt (the same clean stop + # as Ctrl-C); without the wrapper this would be the default, non-callable SIG_DFL. + assert callable(handler) + with pytest.raises(KeyboardInterrupt): + handler(signal.SIGTERM, None) + + def test_agent_unknown_voice_exits_2(monkeypatch): config.set_api_key("default", "sk_live") monkeypatch.setattr("aai_cli.commands.agent._exec.run_session", lambda *a, **k: None) diff --git a/tests/test_signals.py b/tests/test_signals.py new file mode 100644 index 00000000..d95d05e2 --- /dev/null +++ b/tests/test_signals.py @@ -0,0 +1,44 @@ +"""Unit tests for aai_cli.core.signals.terminate_as_interrupt.""" + +from __future__ import annotations + +import signal +import threading + +import pytest + +from aai_cli.core import signals + + +def test_terminate_as_interrupt_installs_and_restores_handler(): + before = signal.getsignal(signal.SIGTERM) + with signals.terminate_as_interrupt(): + handler = signal.getsignal(signal.SIGTERM) + # A new handler is installed for the block... + assert handler is not before + assert callable(handler) + # ...and it turns a SIGTERM into the clean-stop KeyboardInterrupt. + with pytest.raises(KeyboardInterrupt): + handler(signal.SIGTERM, None) + # The previous handler is restored on exit. + assert signal.getsignal(signal.SIGTERM) is before + + +def test_terminate_as_interrupt_is_noop_off_main_thread(): + before = signal.getsignal(signal.SIGTERM) + observed: dict[str, object] = {} + + def worker() -> None: + with signals.terminate_as_interrupt(): + # Off the main thread no handler may be installed, so the disposition + # is untouched and the block still runs to completion. + observed["handler"] = signal.getsignal(signal.SIGTERM) + observed["ran"] = True + + thread = threading.Thread(target=worker) + thread.start() + thread.join() + + assert observed["ran"] is True + assert observed["handler"] is before + assert signal.getsignal(signal.SIGTERM) is before diff --git a/tests/test_speak.py b/tests/test_speak.py index bb4d059c..c56ce182 100644 --- a/tests/test_speak.py +++ b/tests/test_speak.py @@ -2,6 +2,7 @@ import json import re +import signal import pytest from typer.testing import CliRunner @@ -81,6 +82,23 @@ def test_out_writes_wav_and_does_not_play(monkeypatch, tmp_path, fake_synthesize assert "played" not in result.stderr +def test_installs_sigterm_handler_around_synthesis(monkeypatch): + captured: dict[str, object] = {} + + def fake_single(*a, **k): + captured["handler"] = signal.getsignal(signal.SIGTERM) + + monkeypatch.setattr("aai_cli.commands.speak._exec._speak_single", fake_single) + result = runner.invoke(app, ["--sandbox", "speak", "Hi"]) + assert result.exit_code == 0 + handler = captured["handler"] + # During synthesis/playback, SIGTERM maps to KeyboardInterrupt — the same abort + # path as Ctrl-C; without the wrapper this would be the default, non-callable SIG_DFL. + assert callable(handler) + with pytest.raises(KeyboardInterrupt): + handler(signal.SIGTERM, None) + + def test_reads_text_from_stdin_when_arg_omitted(monkeypatch, fake_synthesize): monkeypatch.setattr("aai_cli.commands.speak._exec.audio.play_pcm", lambda *a, **k: None) result = runner.invoke(app, ["--sandbox", "speak"], input="piped text\n") diff --git a/tests/test_stream_sigterm.py b/tests/test_stream_sigterm.py new file mode 100644 index 00000000..f2a6f31b --- /dev/null +++ b/tests/test_stream_sigterm.py @@ -0,0 +1,55 @@ +"""`assembly stream` wires SIGTERM into the clean-stop path (see core.signals). + +These assert the *wiring*: that ``run_stream`` installs the SIGTERM->KeyboardInterrupt +handler around the streaming body for both the single-source and ``--from-stdin`` batch +paths. The handler's own behavior is covered in test_signals.py; the graceful stop the +KeyboardInterrupt then triggers is covered in test_stream_session.py / test_stream_batch.py. +""" + +from __future__ import annotations + +import signal + +import pytest +from typer.testing import CliRunner + +from aai_cli.core import config +from aai_cli.main import app + +runner = CliRunner() + + +def test_stream_installs_sigterm_handler_around_dispatch(monkeypatch): + config.set_api_key("default", "sk_live") + captured: dict[str, object] = {} + + def fake_dispatch(session, sources): + captured["handler"] = signal.getsignal(signal.SIGTERM) + + monkeypatch.setattr("aai_cli.commands.stream._exec._dispatch", fake_dispatch) + result = runner.invoke(app, ["stream"]) + + assert result.exit_code == 0 + handler = captured["handler"] + # While streaming, SIGTERM raises KeyboardInterrupt — without the wrapper this + # would be the default disposition (SIG_DFL), which is not callable. + assert callable(handler) + with pytest.raises(KeyboardInterrupt): + handler(signal.SIGTERM, None) + + +def test_stream_batch_installs_sigterm_handler_around_run(monkeypatch): + config.set_api_key("default", "sk_live") + captured: dict[str, object] = {} + + def fake_run_batch(opts, state, *, json_mode, text_mode): + captured["handler"] = signal.getsignal(signal.SIGTERM) + + monkeypatch.setattr("aai_cli.commands.stream._exec._run_batch", fake_run_batch) + result = runner.invoke(app, ["stream", "--from-stdin"]) + + assert result.exit_code == 0 + handler = captured["handler"] + assert callable(handler) + with pytest.raises(KeyboardInterrupt): + handler(signal.SIGTERM, None)