diff --git a/scripts/mutation_gate.py b/scripts/mutation_gate.py index ef55fa19..1a7c281d 100644 --- a/scripts/mutation_gate.py +++ b/scripts/mutation_gate.py @@ -16,6 +16,8 @@ from __future__ import annotations import ast +import contextlib +import importlib.util import re import subprocess import sys @@ -235,15 +237,32 @@ def _run_tests(nodeids: list[str]) -> bool: return proc.returncode != 0 +def _invalidate_bytecode(path: Path) -> None: + """Drop the module's cached ``.pyc`` so the test subprocess recompiles from the + source we just wrote. + + Consecutive mutants ``ast.unparse`` to files that differ by a single token, so + they're usually byte-for-byte the same length and can be written within the same + mtime-second. CPython's default timestamp-based cache validates a ``.pyc`` by + exact (mtime, size) match, so without this it can serve the previous mutant's + (or the original's) bytecode and run *unmutated* code — a false survivor. + """ + cached = importlib.util.cache_from_source(str(path)) + with contextlib.suppress(OSError): + Path(cached).unlink() + + def _survives( path: Path, tree: ast.Module, src: str, mutant: _Mutant, data: coverage.CoverageData ) -> bool: mutant.apply() try: path.write_text(ast.unparse(tree), encoding="utf-8") + _invalidate_bytecode(path) killed = _run_tests(_covering_tests(data, path, mutant.linenos)) finally: path.write_text(src, encoding="utf-8") + _invalidate_bytecode(path) mutant.undo() return not killed diff --git a/tests/test_account_command.py b/tests/test_account_command.py index da1dc4e8..9fae8fb8 100644 --- a/tests/test_account_command.py +++ b/tests/test_account_command.py @@ -74,6 +74,12 @@ def fake_usage(jwt, start, end, window): # (AMS rejects naive datetimes with a 400). for bound in (captured["start"], captured["end"]): assert bound.endswith("+00:00") and "T" in bound, bound + # The default range spans exactly the last 30 days (pins `today - timedelta(days=30)`). + from datetime import datetime as _dt + + start_day = _dt.fromisoformat(captured["start"]).date() + end_day = _dt.fromisoformat(captured["end"]).date() + assert (end_day - start_day).days == 30 data = json.loads(result.output) assert data["usage_items"][0]["total"] == 12.5 @@ -117,6 +123,20 @@ def test_usage_helpers_format_windows_and_line_items(): ) == "2026-01-01 to 2026-01-03" ) + # Exactly one parseable bound falls back to the single start-day label (pins the + # `start is None or end is None` guard; an `and` would dereference the None end). + assert account._window_label({"start_timestamp": "2026-01-01T00:00:00Z"}) == "2026-01-01" + # A one-day window (end == start + 1 day) collapses to a single day, not a range + # (pins the `start.date() + timedelta(days=1)`). + assert ( + account._window_label( + { + "start_timestamp": "2026-01-01T00:00:00Z", + "end_timestamp": "2026-01-02T00:00:00Z", + } + ) + == "2026-01-01" + ) assert account._line_item_label({"name": "minutes", "total": "12.500"}) == "minutes: 12.5" assert account._line_item_label({"product": "streaming"}) == "streaming" assert account._line_item_label({"quantity": 3}) == "3" diff --git a/tests/test_agent_audio.py b/tests/test_agent_audio.py index 3856178a..c966b91e 100644 --- a/tests/test_agent_audio.py +++ b/tests/test_agent_audio.py @@ -33,15 +33,32 @@ def test_duplex_opens_at_device_rate_and_closes(): def factory(*, rate, blocksize, callback, device): seen["rate"] = rate seen["device"] = device + seen["blocksize"] = blocksize return fake d = DuplexAudio(device=3, device_rate=48000, stream_factory=factory) d.player.start() assert seen["rate"] == 48000 and seen["device"] == 3 # one stream at device rate + assert seen["blocksize"] == 4800 # ~100 ms at 48 kHz (device_rate // 10) d.close() assert fake.stopped and fake.closed +def test_duplex_restart_after_close_reopens_stream(): + calls = {"n": 0} + + def factory(**_k): + calls["n"] += 1 + return FakeStream() + + d = DuplexAudio(device_rate=16000, stream_factory=factory) + d.start() + assert calls["n"] == 1 + d.close() + d.start() # close() cleared the started flag, so this reopens the stream + assert calls["n"] == 2 + + def test_duplex_callback_captures_input_and_zero_fills_idle_output(): cb = {} @@ -78,6 +95,26 @@ def factory(*, rate, blocksize, callback, device): d.close() +def test_duplex_callback_partial_buffer_zero_fills_exact_remainder(): + cb = {} + + def factory(*, rate, blocksize, callback, device): + cb["fn"] = callback + return FakeStream() + + # device == target so playback bytes pass through unresampled and are easy to count. + d = DuplexAudio(target_rate=16000, device_rate=16000, stream_factory=factory) + d.player.start() + d.player.enqueue(b"\x01\x02" * 5) # 10 bytes buffered + outdata = bytearray(20) # request 20 bytes -> 10 real + 10 zero-filled + cb["fn"](b"\x00\x00" * 5, outdata, 5, None, None) + # The shortfall is filled with exactly `need - len(take)` zero bytes: the buffer + # plays out first, then silence, and the output stays exactly `need` bytes long. + assert len(outdata) == 20 + assert bytes(outdata) == b"\x01\x02" * 5 + b"\x00" * 10 + d.close() + + def test_duplex_mic_ends_after_close(): d = DuplexAudio(target_rate=16000, device_rate=16000, stream_factory=lambda **k: FakeStream()) d.player.start() @@ -102,8 +139,8 @@ def test_duplex_player_facade_flush_and_close(): fake = FakeStream() d = DuplexAudio(target_rate=16000, device_rate=16000, stream_factory=lambda **k: fake) d.player.start() - d.player.enqueue(b"\x01\x02" * 8) - assert d.player.pending() > 0 + d.player.enqueue(b"\x01\x02" * 8) # 16 bytes, no resample (device == target) + assert d.player.pending() == 8 # pending() reports samples = bytes // 2 d.player.flush() assert d.player.pending() == 0 d.player.close() @@ -161,3 +198,4 @@ def boom(**kw): with pytest.raises(CLIError) as exc: _default_duplex_stream(rate=24000, blocksize=2400, callback=lambda *a: None, device=None) assert exc.value.error_type == "audio_output_error" + assert exc.value.exit_code == 1 diff --git a/tests/test_agent_session.py b/tests/test_agent_session.py index 01a0be86..1bd32c7b 100644 --- a/tests/test_agent_session.py +++ b/tests/test_agent_session.py @@ -112,9 +112,12 @@ def test_transcripts_routed_to_renderer(): s.dispatch({"type": "transcript.user.delta", "text": "what"}) s.dispatch({"type": "transcript.user", "text": "what time"}) s.dispatch({"type": "transcript.agent", "text": "noon", "interrupted": False}) + # An agent transcript with no "interrupted" key defaults to False (pins the default). + s.dispatch({"type": "transcript.agent", "text": "later"}) assert ("user_partial", "what") in s.renderer.calls assert ("user_final", "what time") in s.renderer.calls assert ("agent_transcript", "noon", False) in s.renderer.calls + assert ("agent_transcript", "later", False) in s.renderer.calls def test_unauthorized_error_raises_cli_error_exit_2(): @@ -122,6 +125,7 @@ def test_unauthorized_error_raises_cli_error_exit_2(): with pytest.raises(CLIError) as excinfo: s.dispatch({"type": "session.error", "code": "UNAUTHORIZED", "message": "bad key"}) assert excinfo.value.exit_code == 2 + assert "bad key" in str(excinfo.value) # the server message wins over code/fallback def test_other_session_error_raises_api_error(): @@ -285,6 +289,26 @@ def close(self): assert exc.value.exit_code == 1 # the real mic failure reaches the user, not a hang +def test_run_session_does_not_close_player_that_failed_to_open(): + # If opening the speaker stream raises, the cleanup must NOT call close() on a + # player that never started (pins the player_started=False initializer). + class _FailingPlayer(FakePlayer): + def start(self): + raise CLIError("speaker busy", error_type="audio_output_error", exit_code=1) + + player = _FailingPlayer() + with pytest.raises(CLIError): + run_session( + "sk", + renderer=FakeRenderer(), + player=player, + mic=[], + config=AgentRunConfig(voice="ivy", system_prompt="x", greeting="hi"), + connect=lambda url, **kwargs: _RecordingWS(), + ) + assert player.closed is False # never opened, so never closed + + def test_run_session_non_auth_failure_stays_api_error(): def boom(url, **kwargs): raise RuntimeError("network unreachable") diff --git a/tests/test_auth_ams.py b/tests/test_auth_ams.py index a3e9baf4..d669fed2 100644 --- a/tests/test_auth_ams.py +++ b/tests/test_auth_ams.py @@ -56,6 +56,9 @@ def handler(request: httpx.Request) -> httpx.Response: with pytest.raises(APIError) as exc: ams.discover("x") assert "Something went wrong" in str(exc.value) + # The "detail" field is extracted, not the raw JSON body: the field name and its + # braces must not leak (pins `mapping is not None and "detail" in mapping`). + assert "detail" not in str(exc.value) def test_error_with_non_json_body_falls_back_to_text(monkeypatch): diff --git a/tests/test_auth_loopback.py b/tests/test_auth_loopback.py index d5cde6e3..6be0d088 100644 --- a/tests/test_auth_loopback.py +++ b/tests/test_auth_loopback.py @@ -1,7 +1,7 @@ +import http.client import socket import threading import time -import urllib.request import pytest @@ -9,15 +9,27 @@ from aai_cli.errors import APIError -def _hit(path: str) -> None: - url = f"http://{endpoints.LOOPBACK_HOST}:{endpoints.LOOPBACK_PORT}{path}" +def _hit(path: str) -> int | None: + """Request `path` against the loopback server, returning the HTTP status code. + + Uses http.client (not urllib) so a 404 comes back as a normal response status + rather than a raised HTTPError, and so no urllib audit suppression is needed. + """ # Retry briefly until the server thread is bound. for _ in range(50): + conn = http.client.HTTPConnection( + endpoints.LOOPBACK_HOST, endpoints.LOOPBACK_PORT, timeout=2 + ) try: - urllib.request.urlopen(url, timeout=2).read() # noqa: S310 - fixed localhost URL - return + conn.request("GET", path) + resp = conn.getresponse() + resp.read() + return resp.status except OSError: time.sleep(0.05) + finally: + conn.close() + return None def test_capture_returns_token_and_type(): @@ -28,9 +40,10 @@ def run(): t = threading.Thread(target=run) t.start() - _hit("/callback?stytch_token_type=discovery_oauth&token=tok_abc") + status = _hit("/callback?stytch_token_type=discovery_oauth&token=tok_abc") t.join(timeout=5) + assert status == 200 # the callback is acknowledged with 200 OK result = result_box["result"] assert result.token == "tok_abc" assert result.token_type == "discovery_oauth" @@ -47,7 +60,7 @@ def run(): t = threading.Thread(target=run) t.start() - _hit("/favicon.ico") # unknown path -> 404, capture stays open + assert _hit("/favicon.ico") == 404 # unknown path -> 404, capture stays open _hit("/callback?stytch_token_type=discovery_oauth&token=tok_late") t.join(timeout=5) diff --git a/tests/test_client.py b/tests/test_client.py index 6b9b07ce..d101f7af 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -22,6 +22,9 @@ def test_validate_key_true_on_success(): with patch.object(client.aai, "Transcriber") as T: T.return_value.list_transcripts.return_value = MagicMock() assert client.validate_key("sk_good") is True + # The probe asks for a single row — it only needs to confirm the key authenticates. + params = T.return_value.list_transcripts.call_args.args[0] + assert params.limit == 1 def test_validate_key_false_on_auth_error(): @@ -126,6 +129,23 @@ def test_transcribe_raises_on_error_status(): with pytest.raises(APIError) as exc: client.transcribe("sk", "audio.mp3", config=aai.TranscriptionConfig()) assert exc.value.transcript_id == "t_err" + assert exc.value.message == "decode failed" # surfaces the SDK's error verbatim + + +def test_transcribe_error_status_without_message_uses_fallback(): + # When the SDK reports an error status but no error text, fall back to a generic + # message (pins the `transcript.error or "Transcription failed."`). + fake_transcript = MagicMock() + fake_transcript.status = client.aai.TranscriptStatus.error + fake_transcript.error = None + fake_transcript.id = "t_err" + fake_transcriber = MagicMock() + fake_transcriber.transcribe.return_value = fake_transcript + + with patch.object(client.aai, "Transcriber", return_value=fake_transcriber): + with pytest.raises(APIError) as exc: + client.transcribe("sk", "audio.mp3", config=aai.TranscriptionConfig()) + assert exc.value.message == "Transcription failed." def test_select_transcript_field_utterances_formats_speakers(): @@ -272,6 +292,26 @@ def test_stream_audio_wires_handlers_and_streams(monkeypatch): assert last.terminate is True # graceful flush requested +def test_stream_audio_registers_begin_handler_when_provided(monkeypatch): + # A provided on_begin must actually be wired to the Begin event (pins + # `if on_begin is not None`); inverting it would leave Begin unhandled. + class BeginClient(_FakeStreamingClient): + def stream(self, source): + from assemblyai.streaming.v3 import StreamingEvents + + self.handlers[StreamingEvents.Begin](self, _types.SimpleNamespace(id="sess_1")) + + monkeypatch.setattr(client, "StreamingClient", BeginClient) + begins = [] + client.stream_audio( + "sk", + [b"\x00"], + params=_stream_params(), + on_begin=lambda e: begins.append(e.id), + ) + assert begins == ["sess_1"] + + def test_stream_audio_raises_on_error_event(monkeypatch): class ErrClient(_FakeStreamingClient): def stream(self, source): diff --git a/tests/test_code_gen.py b/tests/test_code_gen.py index 39968f49..0dcf984c 100644 --- a/tests/test_code_gen.py +++ b/tests/test_code_gen.py @@ -154,6 +154,8 @@ def test_transcribe_render_parses_and_uses_env_key(): assert "https://assembly.ai/wildfires.mp3" in code assert "transcript.utterances" in code # result handling for speaker_labels assert "{{API_KEY}}" not in code # never echo a real key + # config kwargs are rendered 4-space indented inside the TranscriptionConfig call + assert "aai.TranscriptionConfig(\n speaker_labels=True,\n)" in code def test_transcribe_render_no_config_is_minimal(): diff --git a/tests/test_config_builder.py b/tests/test_config_builder.py index 7c7ccbb4..3d571c75 100644 --- a/tests/test_config_builder.py +++ b/tests/test_config_builder.py @@ -99,10 +99,18 @@ def test_split_csv(): def test_parse_auth_header(): assert cb.parse_auth_header("Authorization:Bearer x") == ("Authorization", "Bearer x") assert cb.parse_auth_header(None) is None + # Only the first ':' separates NAME from VALUE; colons in the value are preserved. + assert cb.parse_auth_header("X-Auth:Bearer a:b:c") == ("X-Auth", "Bearer a:b:c") with pytest.raises(UsageError): cb.parse_auth_header("no-colon") +def test_parse_config_overrides_splits_on_first_equals_only(): + # A value may itself contain '='; only the first '=' separates key from value. + out = cb.parse_config_overrides(cb.TRANSCRIBE_FIELDS, ["keyterms_prompt=a=b,c"]) + assert out["keyterms_prompt"] == ["a=b", "c"] + + def test_load_custom_spelling(tmp_path): p = tmp_path / "spell.json" p.write_text('{"AssemblyAI": ["assembly ai", "assemblyai"]}') @@ -398,6 +406,21 @@ def test_derive_kind_dict_origin_is_json(): assert cb._derive_kind(dict[str, int]) == "json" +def test_derive_kind_unwraps_optional_and_classifies_bare_scalars(): + import typing + + # A bare scalar is classified by its type, not treated as a dict/json value (pins + # the `origin is dict` check) and a list origin -> "list". + assert cb._derive_kind(int) == "int" + assert cb._derive_kind(list[str]) == "list" + # Optional[int] must unwrap to its single inner type (pins the `a is not None` + # filter). Build the Union via typing.__dict__ so ruff's UP007 ("use X | Y") + # stays quiet — the unwrap path specifically keys on the typing.Union origin, + # which `int | None` doesn't share. + optional_int = typing.__dict__["Union"][int, None] + assert cb._derive_kind(optional_int) == "int" + + def test_coerce_table_unknown_field_defaults_to_str(): # A curated name the SDK model doesn't expose passes through as a string # rather than crashing at import time. diff --git a/tests/test_context.py b/tests/test_context.py index 225238e7..1ad2261b 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -181,6 +181,56 @@ def test_resolve_session_raises_when_no_session(): resolve_session(AppState()) +def test_resolve_session_raises_when_only_account_id_missing(monkeypatch): + # A stored session JWT but no account id is still incomplete and must raise, + # pinning the `session is None or account_id is None` guard (an `and` there would + # fall through and return a None account id instead of failing cleanly). + import pytest + + from aai_cli.context import AppState, resolve_session + from aai_cli.errors import NotAuthenticated + + monkeypatch.setattr(config, "get_session", lambda _profile: {"jwt": "j", "token": "t"}) + monkeypatch.setattr(config, "get_account_id", lambda _profile: None) + with pytest.raises(NotAuthenticated): + resolve_session(AppState()) + + +def test_resolve_session_raises_when_only_jwt_missing(monkeypatch): + # The mirror case: an account id but no stored session must also raise. + import pytest + + from aai_cli.context import AppState, resolve_session + from aai_cli.errors import NotAuthenticated + + monkeypatch.setattr(config, "get_session", lambda _profile: None) + monkeypatch.setattr(config, "get_account_id", lambda _profile: 42) + with pytest.raises(NotAuthenticated): + resolve_session(AppState()) + + +def test_run_command_auto_logs_in_when_env_key_set_but_error_is_not_a_rejection(monkeypatch): + # ENV key present but the failure is a generic NotAuthenticated (not a key + # rejection): a browser login can still fix it, so we DO auto-login. This pins + # the `and` in _should_auto_login — an `or` would wrongly skip the retry here. + monkeypatch.setenv(config.ENV_API_KEY, "sk_env") + ran = {"login": 0} + + def fake_login(): + ran["login"] += 1 + return LoginResult(api_key="sk_auto", session_jwt="j", session_token="t", account_id=7) + + monkeypatch.setattr("aai_cli.context.run_login_flow", fake_login) + + def body(state, json_mode): + raise NotAuthenticated() # message != REJECTED_KEY_MESSAGE + + result = runner.invoke(_make_app(body), ["go"]) + assert ran["login"] == 1 # auto-login was attempted despite the env key + assert result.exit_code == 2 + assert "Run the same command again" in result.output + + def test_appstate_methods_are_the_single_source_of_truth(): # The module-level resolve_* helpers are thin adapters over the AppState methods; # both must agree, and the precedence (default profile + default env) must hold. diff --git a/tests/test_errors.py b/tests/test_errors.py index 0b2e881a..c81c6fa9 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -25,6 +25,16 @@ def test_to_dict_includes_suggestion_when_present(): } +def test_cli_error_defaults(): + # A bare CLIError must default to exit_code 1 and error_type "error" (nothing + # else relies on these defaults, so pin them here). + err = CLIError("nope") + assert err.exit_code == 1 + assert err.error_type == "error" + assert err.transcript_id is None + assert err.suggestion is None + + def test_to_dict_omits_none_transcript_id_and_suggestion(): err = CLIError("nope", error_type="generic", exit_code=1) assert err.to_dict() == {"error": {"type": "generic", "message": "nope"}} @@ -79,15 +89,24 @@ class _Frame: class _ClosedRcvd(Exception): rcvd = _Frame() - class _Resp: + class _Resp403: status_code = 403 - class _HTTPish(Exception): - response = _Resp() + class _Forbidden(Exception): + response = _Resp403() + + class _Resp401: + status_code = 401 + + class _Unauthorized(Exception): + response = _Resp401() assert is_auth_failure(_Closed("policy violation")) # no auth word in the text assert is_auth_failure(_ClosedRcvd("connection closed")) - assert is_auth_failure(_HTTPish("nope")) + # Both HTTP rejection codes must be matched structurally, with text that carries + # no auth hint so only the status_code path can flag it (kills 401-vs-403 mutants). + assert is_auth_failure(_Forbidden("nope")) + assert is_auth_failure(_Unauthorized("nope")) def test_is_auth_failure_ignores_unrelated_status_codes(): diff --git a/tests/test_follow.py b/tests/test_follow.py index 0ea996f6..9fab657a 100644 --- a/tests/test_follow.py +++ b/tests/test_follow.py @@ -23,23 +23,36 @@ def test_json_mode_does_not_start_a_live_region(): class _FakeLive: def __init__(self, *args, **kwargs): + self.init_kwargs = kwargs self.started = False self.stopped = False self.updates = [] + self.refreshes = [] def start(self): self.started = True def update(self, renderable, refresh): self.updates.append(renderable) + self.refreshes.append(refresh) def stop(self): self.stopped = True +def _patch_live(monkeypatch, fake): + """Patch follow.Live so the constructor kwargs land on the fake instance.""" + + def factory(*args, **kwargs): + fake.init_kwargs = kwargs + return fake + + monkeypatch.setattr(follow, "Live", factory) + + def test_terminal_mode_renders_panels_and_prints_final(monkeypatch): fake = _FakeLive() - monkeypatch.setattr(follow, "Live", lambda *a, **k: fake) + _patch_live(monkeypatch, fake) printed = [] monkeypatch.setattr(output.console, "print", lambda renderable: printed.append(renderable)) @@ -52,6 +65,11 @@ def test_terminal_mode_renders_panels_and_prints_final(monkeypatch): assert fake.stopped is True # the final panel is reprinted to the normal screen as scrollback assert printed == [fake.updates[-1]] + # The live region must own an isolated alternate screen and never auto-refresh + # (we drive refresh explicitly), and each update must force a synchronous redraw. + assert fake.init_kwargs["screen"] is True + assert fake.init_kwargs["auto_refresh"] is False + assert fake.refreshes == [True, True] def test_terminal_mode_panel_title_pluralizes_turns(monkeypatch): diff --git a/tests/test_init_command.py b/tests/test_init_command.py index 85482f1a..2a5895b0 100644 --- a/tests/test_init_command.py +++ b/tests/test_init_command.py @@ -31,8 +31,10 @@ def test_init_scaffold_only_creates_project(tmp_path, monkeypatch): def test_init_writes_key_from_env(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) monkeypatch.setenv("ASSEMBLYAI_API_KEY", "sk-from-env") - runner.invoke(app, ["init", TEMPLATE, "myapp", "--no-install"]) + result = runner.invoke(app, ["init", TEMPLATE, "myapp", "--no-install"]) assert "ASSEMBLYAI_API_KEY=sk-from-env" in (tmp_path / "myapp" / ".env").read_text() + # A resolved key means no skipped-key row (pins `if api_key is None`). + assert "no API key found" not in result.output def test_init_logged_out_installs_but_skips_launch_with_hint(tmp_path, monkeypatch): @@ -53,6 +55,9 @@ def test_init_logged_out_installs_but_skips_launch_with_hint(tmp_path, monkeypat assert result.exit_code == 0, result.output assert launched["v"] is False assert "aai login" in result.output + # Deps installed but no key -> a launch-skipped row with the manual run command + # (pins `not no_install and api_key is None`). + assert "uvicorn api.index" in result.output def test_init_writes_base_url_for_active_env(tmp_path, monkeypatch): @@ -66,9 +71,12 @@ def test_init_writes_base_url_for_active_env(tmp_path, monkeypatch): def test_init_placeholder_key_when_logged_out(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) - runner.invoke(app, ["init", TEMPLATE, "myapp", "--no-install"]) + result = runner.invoke(app, ["init", TEMPLATE, "myapp", "--no-install"]) env = (tmp_path / "myapp" / ".env").read_text() assert "your_assemblyai_api_key_here" in env + # --no-install means no deps were installed, so there's no launch-skipped row even + # without a key (pins the `not no_install` half of the launch guard). + assert "uvicorn api.index" not in result.output def test_init_unknown_template_errors(tmp_path, monkeypatch): diff --git a/tests/test_llm_command.py b/tests/test_llm_command.py index 7230fd57..5b5627ba 100644 --- a/tests/test_llm_command.py +++ b/tests/test_llm_command.py @@ -67,6 +67,18 @@ def fake_complete(api_key, *, model, messages, max_tokens, transcript_id=None): assert seen["messages"][0]["content"] == "What is 2+2?" +def test_llm_output_json_forces_json_for_human(monkeypatch): + _auth() + # Simulate an interactive human (not piped/agentic); `-o json` must still emit + # JSON, pinning the `output_field == "json"` that forces machine output. + monkeypatch.setattr("aai_cli.output._is_agentic", lambda: False) + monkeypatch.setattr("aai_cli.commands.llm.gateway.complete", lambda *a, **k: _payload("4")) + result = runner.invoke(app, ["llm", "hi", "-o", "json"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert data["output"] == "4" + + def test_llm_transcript_id_injected(monkeypatch): _auth() seen = {} diff --git a/tests/test_login.py b/tests/test_login.py index 999214a1..67aece52 100644 --- a/tests/test_login.py +++ b/tests/test_login.py @@ -14,10 +14,13 @@ def _fake_login_result(key="sk_from_oauth"): def test_login_with_api_key_flag_stores_key(): + import json + with patch("aai_cli.commands.login.client.validate_key", return_value=True): - result = runner.invoke(app, ["login", "--api-key", "sk_flag"]) + result = runner.invoke(app, ["login", "--api-key", "sk_flag", "--json"]) assert result.exit_code == 0 assert config.get_api_key("default") == "sk_flag" + assert json.loads(result.output)["authenticated"] is True # pins the success flag def test_login_rejects_invalid_key(): @@ -70,10 +73,13 @@ def test_whoami_unauthenticated_runs_login(monkeypatch): def test_logout_clears_key(): + import json + config.set_api_key("default", "sk_1234567890") - result = runner.invoke(app, ["logout"]) + result = runner.invoke(app, ["logout", "--json"]) assert result.exit_code == 0 assert config.get_api_key("default") is None + assert json.loads(result.output)["logged_out"] is True # pins the success flag def test_login_oauth_flow_stores_returned_key(monkeypatch): @@ -167,6 +173,34 @@ def test_whoami_reports_env(): assert data["env"] == "production" +def test_root_callback_keeps_profile_env_without_sandbox(): + # Without --sandbox the profile's own env must stand (pins `sandbox and env is + # None`: an `or` would force sandbox000 onto every default invocation). + import json + + config.set_api_key("default", "sk_1234567890") + config.set_profile_env("default", "production") + with patch("aai_cli.commands.login.client.validate_key", return_value=True): + result = runner.invoke(app, ["whoami", "--json"]) + assert result.exit_code == 0 + assert json.loads(result.output)["env"] == "production" + + +def test_root_callback_sandbox_overrides_profile_env(): + # --sandbox forces sandbox000 even when the profile is bound elsewhere (pins the + # `env is None` arm: an `is not None` would leave the profile env in place). + import json + + config.set_api_key("default", "sk_1234567890") + config.set_profile_env("default", "production") + with patch("aai_cli.commands.login.client.validate_key", return_value=True): + result = runner.invoke(app, ["--sandbox", "whoami", "--json"]) + assert result.exit_code == 0 + # A profile/env mismatch warning prints to stderr first; the JSON is the last line. + payload = json.loads(result.output.strip().splitlines()[-1]) + assert payload["env"] == "sandbox000" + + def test_unknown_env_exits_2(): result = runner.invoke(app, ["--env", "bogus", "whoami"]) assert result.exit_code == 2 diff --git a/tests/test_microphone.py b/tests/test_microphone.py index 8d27c45a..b06872de 100644 --- a/tests/test_microphone.py +++ b/tests/test_microphone.py @@ -169,6 +169,19 @@ def test_device_default_rate_reads_device(monkeypatch): assert _device_default_rate(2) == 44100 +def test_resample_pcm16_uses_16bit_mono_params(): + # resample_pcm16 must treat the buffer as 16-bit (2-byte) mono (1-channel) PCM. + # Compare against audioop driven with those exact params; a mutated width/channel + # count yields different bytes (or rejects the frame count), killing the mutant. + import aai_cli.microphone as m + + chunk = bytes(range(256)) # 128 little-endian 16-bit mono samples (a ramp) + expected, _ = m.audioop.ratecv(chunk, 2, 1, 48000, 24000, None) + out, _ = m.resample_pcm16(chunk, None, src_rate=48000, dst_rate=24000) + assert out == expected + assert out != chunk # 48k -> 24k actually changes the data + + def test_device_default_rate_falls_back_on_query_error(monkeypatch): fake_sd: Any = types.ModuleType("sounddevice") @@ -213,6 +226,8 @@ def raw_input_stream(**kwargs): assert created["samplerate"] == 16000 assert created["device"] == 2 assert created["blocksize"] == 1600 # ~100 ms at 16 kHz + assert created["channels"] == 1 # mono capture + assert created["dtype"] == "int16" # PCM16 assert next(iter(stream)) == b"\x01\x02" diff --git a/tests/test_output.py b/tests/test_output.py index db003e6f..12b265ec 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -155,6 +155,7 @@ def test_data_table_is_minimal_and_themed(): # brand heading style — so every listing command renders identically. assert table.box is box.SIMPLE_HEAD assert table.header_style == "aai.heading" + assert table.pad_edge is False # no leading/trailing pad column -> flush-left listing assert [str(col.header) for col in table.columns] == ["id", "status"] @@ -164,3 +165,28 @@ def test_detail_table_is_borderless_label_value_grid(): assert table.box is None assert len(table.columns) == 2 assert table.columns[0].style == "aai.muted" + # padding=(0, 3): no vertical pad, 3 cols of horizontal gap between label/value. + assert table.padding == (0, 3, 0, 3) + + +def test_emit_ndjson_writes_one_flushed_line(monkeypatch): + import sys + + class _RecordingStdout: + def __init__(self): + self.text = "" + self.flushed = 0 + + def write(self, s): + self.text += s + return len(s) + + def flush(self): + self.flushed += 1 + + rec = _RecordingStdout() + monkeypatch.setattr(sys, "stdout", rec) + output.emit_ndjson({"a": 1}) + # One newline-terminated JSON record, explicitly flushed so live pipelines see it. + assert rec.text == '{"a": 1}\n' + assert rec.flushed >= 1 diff --git a/tests/test_samples.py b/tests/test_samples.py index b0a08e6c..f6cbe45e 100644 --- a/tests/test_samples.py +++ b/tests/test_samples.py @@ -15,6 +15,16 @@ def test_samples_list_shows_transcribe(): assert "transcribe" in result.output +def test_samples_list_human_mode_renders_bullets(monkeypatch): + # Force human (non-agentic) rendering so the bullet-list branch runs; pins the + # string concatenation in the human renderer (a `-` there would raise TypeError). + monkeypatch.setattr("aai_cli.output._is_agentic", lambda: False) + result = runner.invoke(app, ["samples", "list"]) + assert result.exit_code == 0 + assert "Available samples:" in result.output + assert "- transcribe" in result.output + + def test_samples_list_shows_templates(): result = runner.invoke(app, ["samples", "list"]) assert result.exit_code == 0 @@ -47,6 +57,7 @@ def test_samples_create_stream_uses_env_key(tmp_path, monkeypatch): body = Path(tmp_path, "stream", "stream.py").read_text() assert _ENV_KEY in body assert "MicrophoneStream" in body + assert "format_turns=True" in body # the stream sample requests formatted turns def test_samples_create_transcribe_uses_env_key(tmp_path, monkeypatch): diff --git a/tests/test_sessions_command.py b/tests/test_sessions_command.py index fef08153..e3fa9e13 100644 --- a/tests/test_sessions_command.py +++ b/tests/test_sessions_command.py @@ -69,6 +69,10 @@ def test_sessions_list_renders_table_human(monkeypatch): result = runner.invoke(app, ["sessions", "list"]) assert result.exit_code == 0 assert "s_1" in result.output and "universal" in result.output + # The created/duration columns must render their values (pins `value or ""`: an + # `and` there would blank a present value). + assert "2026-06-01" in result.output + assert "12.0" in result.output def test_sessions_list_passes_status_filter(): diff --git a/tests/test_setup.py b/tests/test_setup.py index 22120eee..cc6fecf6 100644 --- a/tests/test_setup.py +++ b/tests/test_setup.py @@ -18,6 +18,17 @@ def _isolate_home(tmp_path, monkeypatch): monkeypatch.delenv("CLAUDE_CONFIG_DIR", raising=False) +def test_proc_detail_prefers_stderr_then_falls_back_to_stdout(): + from aai_cli.commands import setup + + # stderr wins when present (pins `proc.stderr or proc.stdout`); stdout is the + # fallback when stderr is empty. + both = subprocess.CompletedProcess([], 1, stdout="out text", stderr="err text") + assert setup._proc_detail(both) == "err text" + only_out = subprocess.CompletedProcess([], 1, stdout="only out", stderr="") + assert setup._proc_detail(only_out) == "only out" + + def _skill_path() -> Path: return Path.home() / ".claude" / "skills" / "assemblyai" diff --git a/tests/test_stream_command.py b/tests/test_stream_command.py index 7bfdf393..a14e8cf7 100644 --- a/tests/test_stream_command.py +++ b/tests/test_stream_command.py @@ -29,6 +29,68 @@ def _login_result(): ) +def test_stream_session_listening_notice_latches(monkeypatch): + # _listening_once must announce "Listening…" exactly once even if the first-audio + # callback fires repeatedly (pins the `self._listening_started = True` latch). + import io + + from aai_cli.commands.stream import _StreamSession + from aai_cli.streaming.render import StreamRenderer + + renderer = StreamRenderer(json_mode=False, out=io.StringIO()) + calls = {"n": 0} + monkeypatch.setattr(renderer, "listening", lambda: calls.__setitem__("n", calls["n"] + 1)) + session = _StreamSession( + api_key="sk", + base_flags={}, + overrides=None, + config_file=None, + renderer=renderer, + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + ) + session._listening_once() + session._listening_once() + assert calls["n"] == 1 + + +def test_stream_session_closes_renderer_on_error(monkeypatch): + # When streaming raises mid-run, the live region must still be torn down (pins the + # `if self.follow is None: self.renderer.close()` in the finally block). + import io + + import pytest + + from aai_cli.commands.stream import _StreamSession + from aai_cli.errors import CLIError + from aai_cli.streaming.render import StreamRenderer + + renderer = StreamRenderer(json_mode=False, out=io.StringIO()) + closed = {"n": 0} + monkeypatch.setattr(renderer, "close", lambda: closed.__setitem__("n", closed["n"] + 1)) + + def boom(*_args, **_kwargs): + raise CLIError("stream blew up") + + monkeypatch.setattr("aai_cli.commands.stream.client.stream_audio", boom) + session = _StreamSession( + api_key="sk", + base_flags={}, + overrides=None, + config_file=None, + renderer=renderer, + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + ) + with pytest.raises(CLIError): + session.run([b"\x00"], 16000) + assert closed["n"] >= 1 + + def test_stream_help_lists_command(): result = runner.invoke(app, ["stream", "--help"]) assert result.exit_code == 0 @@ -236,8 +298,14 @@ def test_stream_file_json_output(monkeypatch, tmp_path): def fake( api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None, **_kwargs ): + # In non-follow mode begin/turn/termination must all be wired through to the + # renderer (pins the `follow is not None` None-vs-handler choices). + if on_begin: + on_begin(types.SimpleNamespace(id="sess_1")) if on_turn: on_turn(types.SimpleNamespace(transcript="from file", end_of_turn=True)) + if on_termination: + on_termination(types.SimpleNamespace(audio_duration_seconds=2.0)) monkeypatch.setattr("aai_cli.commands.stream.client.stream_audio", fake) p = tmp_path / "a.wav" @@ -249,7 +317,9 @@ def fake( result = runner.invoke(app, ["stream", str(p), "--json"]) assert result.exit_code == 0 lines = [_json.loads(x) for x in result.output.splitlines() if x.strip()] + assert {"type": "begin", "id": "sess_1"} in lines assert {"type": "turn", "transcript": "from file", "end_of_turn": True} in lines + assert {"type": "termination", "audio_duration_seconds": 2.0} in lines def test_stream_llm_refreshes_live_over_growing_transcript(monkeypatch): @@ -261,6 +331,7 @@ def fake(api_key, source, *, params, on_turn=None, **kwargs): on_turn(types.SimpleNamespace(transcript="hola", end_of_turn=True)) on_turn(types.SimpleNamespace(transcript="mundo", end_of_turn=True)) on_turn(types.SimpleNamespace(transcript="partial", end_of_turn=False)) # ignored + on_turn(types.SimpleNamespace(transcript="no-eot")) # missing flag -> not final def fake_run_chain(api_key, prompts, *, transcript_text, model, max_tokens): seen["texts"].append(transcript_text) diff --git a/tests/test_streaming_render.py b/tests/test_streaming_render.py index 97934edf..7bfc20b5 100644 --- a/tests/test_streaming_render.py +++ b/tests/test_streaming_render.py @@ -179,3 +179,63 @@ def test_listening_is_silent_in_json_mode(): r = StreamRenderer(json_mode=True, out=out) r.listening() assert out.getvalue() == "" # the "Listening…" line is human-only + + +class _FakeLive: + def __init__(self, *args, **kwargs): + self.init_kwargs = kwargs + self.refreshes = [] + self.started = False + self.stopped = False + + def start(self): + self.started = True + + def update(self, renderable, refresh): + self.refreshes.append(refresh) + + def stop(self): + self.stopped = True + + +def test_human_live_region_construction_and_refresh(monkeypatch): + # The live region must be built non-transient, never auto-refresh (we drive it), + # and never redirect the process streams the JSON/threaded paths also write to. + # Pin those kwargs and the forced per-update refresh with a fake Live. + import aai_cli.render as render_mod + + fake = _FakeLive() + + def factory(*args, **kwargs): + fake.init_kwargs = kwargs + return fake + + monkeypatch.setattr(render_mod, "Live", factory) + r, _buf = _human() + r.turn(_turn("partial", False)) # builds the Live and updates it + r.turn(_turn("done", True)) # final update, then commit (stop) + + assert fake.started is True + assert fake.stopped is True + assert fake.init_kwargs["auto_refresh"] is False + assert fake.init_kwargs["transient"] is False + assert fake.init_kwargs["redirect_stdout"] is False + assert fake.init_kwargs["redirect_stderr"] is False + assert fake.refreshes == [True, True] + + +def test_status_notice_is_flushed(): + # Status notices go to stderr with flush=True so they aren't buffered behind a + # long-running stream; a fake stream records that flush actually happens. + class _RecordingErr(io.StringIO): + flushed = 0 + + def flush(self): + self.flushed += 1 + super().flush() + + err = _RecordingErr() + r = StreamRenderer(json_mode=False, text_mode=True, out=io.StringIO(), err=err) + r.listening() + assert "Listening" in err.getvalue() + assert err.flushed >= 1 diff --git a/tests/test_streaming_sources.py b/tests/test_streaming_sources.py index ee13cdb0..5619d39d 100644 --- a/tests/test_streaming_sources.py +++ b/tests/test_streaming_sources.py @@ -19,6 +19,31 @@ def _write_wav(path, *, seconds=0.5, rate=16000): w.writeframes(b"\x00\x01" * frames) # 2 bytes/frame, mono 16-bit +def _make_wav(path, *, channels, width, rate): + with wave.open(str(path), "wb") as w: + w.setnchannels(channels) + w.setsampwidth(width) + w.setframerate(rate) + w.writeframes(b"\x00" * (width * channels * 10)) + + +def test_is_streamable_wav_requires_mono_16bit_16k(tmp_path): + good = tmp_path / "good.wav" + _make_wav(good, channels=1, width=2, rate=16000) + assert sources._is_streamable_wav(good) is True + # Each criterion alone must disqualify the file (pins the full `and` chain — an + # `or` would accept any of these because the other two clauses still match). + stereo = tmp_path / "stereo.wav" + _make_wav(stereo, channels=2, width=2, rate=16000) + wrong_rate = tmp_path / "rate.wav" + _make_wav(wrong_rate, channels=1, width=2, rate=8000) + wrong_width = tmp_path / "width.wav" + _make_wav(wrong_width, channels=1, width=1, rate=16000) + assert sources._is_streamable_wav(stereo) is False + assert sources._is_streamable_wav(wrong_rate) is False + assert sources._is_streamable_wav(wrong_width) is False + + def test_filesource_streams_wav_chunks(tmp_path): p = tmp_path / "clip.wav" _write_wav(p, seconds=0.55) # 0.55s @16k mono 16-bit = 17600 bytes @@ -43,6 +68,7 @@ def test_filesource_non_wav_without_ffmpeg_raises(tmp_path, monkeypatch): with pytest.raises(CLIError) as exc: FileSource(str(p)) assert exc.value.error_type == "ffmpeg_missing" + assert exc.value.exit_code == 2 def test_filesource_uses_ffmpeg_for_non_wav(tmp_path, monkeypatch): @@ -158,6 +184,61 @@ def wait(self): list(sources.FileSource(str(p), sleep=lambda _s: None)) +def test_filesource_ffmpeg_not_terminated_on_natural_eof(tmp_path, monkeypatch): + # On a clean EOF ffmpeg is allowed to exit on its own; terminating it would + # surface as a spurious exit -15. Pins the `completed = True` flag. + p = tmp_path / "clip.mp3" + p.write_bytes(b"x") + monkeypatch.setattr(sources.shutil, "which", lambda _name: "/usr/bin/ffmpeg") + holder = {} + + class FakeProc: + def __init__(self): + self.stdout = io.BytesIO(b"\x00" * 3200) # one full chunk, then EOF + self.stderr = io.BytesIO(b"") + self.returncode = 0 + self.terminated = False + holder["proc"] = self + + def terminate(self): + self.terminated = True + + def wait(self): + pass + + monkeypatch.setattr(sources.subprocess, "Popen", lambda *a, **k: FakeProc()) + chunks = list(FileSource(str(p), sleep=lambda _s: None)) + assert chunks == [b"\x00" * 3200] + assert holder["proc"].terminated is False + + +def test_filesource_ffmpeg_failure_empty_stderr_reports_exit_code(tmp_path, monkeypatch): + # When ffmpeg fails but writes nothing to stderr, the error message falls back to + # the exit code. Pins the `detail or f'exit {returncode}'` (an `and` would blank it). + from aai_cli.errors import APIError + + p = tmp_path / "bad.mp3" + p.write_bytes(b"x") + monkeypatch.setattr(sources.shutil, "which", lambda _name: "/usr/bin/ffmpeg") + + class FailProc: + def __init__(self): + self.stdout = io.BytesIO(b"") + self.stderr = io.BytesIO(b"") # no diagnostic text + self.returncode = 3 + + def terminate(self): + pass + + def wait(self): + pass + + monkeypatch.setattr(sources.subprocess, "Popen", lambda *a, **k: FailProc()) + with pytest.raises(APIError) as exc: + list(FileSource(str(p), sleep=lambda _s: None)) + assert "exit 3" in exc.value.message + + def test_filesource_empty_wav_raises(tmp_path): p = tmp_path / "empty.wav" with wave.open(str(p), "wb") as w: @@ -168,6 +249,7 @@ def test_filesource_empty_wav_raises(tmp_path): with pytest.raises(CLIError) as exc: list(FileSource(str(p), sleep=lambda _s: None)) assert exc.value.error_type == "empty_audio" + assert exc.value.exit_code == 2 def test_filesource_url_skips_local_check_and_streams_via_ffmpeg(monkeypatch): diff --git a/tests/test_timeparse.py b/tests/test_timeparse.py index 2f6fd426..9f846f0e 100644 --- a/tests/test_timeparse.py +++ b/tests/test_timeparse.py @@ -28,3 +28,8 @@ def test_parse_iso_utc_rejects_non_dates(): assert timeparse.parse_iso_utc(None) is None assert timeparse.parse_iso_utc("") is None assert timeparse.parse_iso_utc("not-a-date") is None + # A truthy non-string must also be rejected (not just falsy None/""). This pins + # the `not isinstance(...) or not value` guard: an `and` there would fall through + # to str-only operations on the int and raise instead of returning None. + assert timeparse.parse_iso_utc(20260601) is None + assert timeparse.parse_iso_utc(["2026-06-01"]) is None diff --git a/tests/test_transcribe_render.py b/tests/test_transcribe_render.py index 4af697af..d40ae194 100644 --- a/tests/test_transcribe_render.py +++ b/tests/test_transcribe_render.py @@ -21,6 +21,15 @@ def _render_styled(transcript) -> str: return cap.get() +def test_fmt_ms_formats_minutes_and_seconds(): + # Pins the mm:ss math: values straddling a 60s boundary distinguish // from % + # and the literal 60 (e.g. 120s is 02:00, not 01:xx). + assert tr._fmt_ms(0) == "00:00" + assert tr._fmt_ms(133000) == "02:13" + assert tr._fmt_ms(120000) == "02:00" + assert tr._fmt_ms(605000) == "10:05" + + def test_per_speaker_lines_are_styled(): transcript = SimpleNamespace( text="flat", @@ -84,17 +93,20 @@ def test_renders_sentiment_aggregate(): SimpleNamespace(text="c", sentiment=SimpleNamespace(value="NEGATIVE")), ], ) - out = _render(transcript) - assert "Sentiment:" in out - assert "positive" in out.lower() + out = _render(transcript).lower() + assert "sentiment:" in out + # Exact aggregated percentages pin the `* 100 // total` math and the `or 1` + # guard: 2 of 3 positive -> 66%, 1 of 3 negative -> 33%. + assert "66% positive" in out + assert "33% negative" in out def test_renders_entities_topics_content_safety_highlights(): transcript = SimpleNamespace( text="t", entities=[SimpleNamespace(entity_type=SimpleNamespace(value="person_name"), text="Ada")], - iab_categories=SimpleNamespace(summary={"Technology": 0.91}), - content_safety=SimpleNamespace(summary={"profanity": 0.4}), + iab_categories=SimpleNamespace(summary={"Technology": 0.91, "Health": 0.12}), + content_safety=SimpleNamespace(summary={"profanity": 0.40, "alcohol": 0.10}), auto_highlights=SimpleNamespace( results=[SimpleNamespace(text="key phrase", count=3, rank=0.9)] ), @@ -104,3 +116,9 @@ def test_renders_entities_topics_content_safety_highlights(): assert "Topics:" in out and "Technology" in out assert "Content Safety:" in out and "profanity" in out assert "Highlights:" in out and "key phrase" in out + # Relevance/confidence are rendered to 2 decimals... + assert "Technology (0.91)" in out + assert "profanity (0.40)" in out + # ...and both lists are sorted most-relevant-first (pins reverse=True). + assert out.index("Technology") < out.index("Health") + assert out.index("profanity") < out.index("alcohol") diff --git a/tests/test_youtube.py b/tests/test_youtube.py index 21006790..3353a534 100644 --- a/tests/test_youtube.py +++ b/tests/test_youtube.py @@ -25,10 +25,11 @@ def _fake_ytdlp(monkeypatch, ydl_cls): def test_download_audio_returns_prepared_path(tmp_path, monkeypatch): created = tmp_path / "vid123.m4a" + captured = {} class FakeYDL: def __init__(self, opts): - self.opts = opts + captured["opts"] = opts def __enter__(self): return self @@ -37,6 +38,7 @@ def __exit__(self, *exc): return False def extract_info(self, url, download): + captured["download"] = download created.write_bytes(b"audio") return {"id": "vid123", "ext": "m4a"} @@ -47,6 +49,11 @@ def prepare_filename(self, info): out = youtube.download_audio("https://youtu.be/vid123", tmp_path) assert out == created assert out.is_file() + # yt-dlp is driven quietly (no console noise) and actually downloads the media. + assert captured["opts"]["quiet"] is True + assert captured["opts"]["no_warnings"] is True + assert captured["opts"]["noprogress"] is True + assert captured["download"] is True def test_download_audio_falls_back_to_landed_file(tmp_path, monkeypatch): @@ -95,6 +102,7 @@ def prepare_filename(self, info): with pytest.raises(CLIError) as exc: youtube.download_audio("https://youtu.be/x", tmp_path) assert exc.value.error_type == "youtube_error" + assert exc.value.exit_code == 1 assert "no audio file" in exc.value.message