Skip to content
Merged
12 changes: 12 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ uv run diff-cover coverage.xml --compare-branch=origin/main --fail-under=100
uv run python scripts/mutation_gate.py origin/main # mutation gate
```

The gate is diff-scoped, so code predating it is never mutation-tested. To audit
existing code (or a whole module) against the same bar, `scripts/mutation_sweep.py`
reuses the gate's engine over *every* line of the files you name (or the whole
package). Refresh coverage first, and pass `--timeout` to that pytest step — the
default suite has no per-test timeout (it's opt-in; see `pyproject.toml`), so a
deadlocked test would wedge the run instead of failing fast:

```sh
uv run pytest -q -n auto --timeout=60 --cov=aai_cli --cov-branch --cov-context=test --cov-report=
uv run python scripts/mutation_sweep.py aai_cli/config.py # or omit paths for the whole package
```

### Test markers

The default suite **excludes** three slow/credentialed marker sets — `pyproject.toml`'s `addopts` carries `-m "not e2e and not install and not install_script"`, so a bare `pytest` matches what `check.sh` gates. An explicit command-line `-m` overrides it for the opt-in runs:
Expand Down
5 changes: 4 additions & 1 deletion aai_cli/commands/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ def _install_step(
"status": "failed",
"detail": (setup.stderr or setup.stdout).strip()[:300],
}
return [row], False
# The False (don't-launch) is an equivalent mutant: run_init raises Exit(1) on
# any failed step before it ever consults will_launch, so the value is unused
# on this branch.
return [row], False # pragma: no mutate
return [
{
"name": "install",
Expand Down
5 changes: 4 additions & 1 deletion aai_cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ def _validation_summary(exc: ValidationError) -> str:
in a one-line CLI error.
"""
problems: list[str] = []
for err in exc.errors(include_url=False, include_input=False):
# include_url/include_input=False keep pydantic's url/input fields out of each
# error dict, but this summary only reads loc + msg, so flipping them is an
# equivalent mutant (the rendered string is identical either way).
for err in exc.errors(include_url=False, include_input=False): # pragma: no mutate
loc = ".".join(str(part) for part in err["loc"]) or "top level"
problems.append(f"{loc}: {err['msg']}")
return "; ".join(problems)
Expand Down
7 changes: 5 additions & 2 deletions aai_cli/init/scaffold.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ def _copy_tree(node: Traversable, dest: Path) -> None:
name = _DOTFILE_RENAMES.get(child.name, child.name)
out = dest / name
if child.is_dir():
out.mkdir(parents=True, exist_ok=True)
# parents=True is an equivalent mutant here: the walk always creates a
# node's parent before descending, so `dest` (and `out.parent`) already
# exists. exist_ok is exercised by the idempotent re-scaffold test.
out.mkdir(parents=True, exist_ok=True) # pragma: no mutate
_copy_tree(child, out)
else:
out.parent.mkdir(parents=True, exist_ok=True)
out.parent.mkdir(parents=True, exist_ok=True) # pragma: no mutate
out.write_bytes(child.read_bytes())


Expand Down
4 changes: 3 additions & 1 deletion aai_cli/streaming/macos.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ def build_helper() -> Path:

cache_dir.mkdir(parents=True, exist_ok=True)
module_cache = cache_dir / "swift-module-cache"
module_cache.mkdir(parents=True, exist_ok=True)
# parents=True is an equivalent mutant here: cache_dir was just created above, so
# module_cache's parent always exists. exist_ok is covered by the rebuild test.
module_cache.mkdir(parents=True, exist_ok=True) # pragma: no mutate
source_path = cache_dir / f"{_HELPER_PREFIX}-{digest}.swift"
source_path.write_bytes(source)
tmp_helper = helper.with_suffix(".tmp")
Expand Down
4 changes: 3 additions & 1 deletion aai_cli/streaming/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ def worker(source_label: str, audio: Iterable[bytes], rate: int) -> None:
thread.start()
while any(thread.is_alive() for thread in threads):
for thread in threads:
thread.join(timeout=0.1)
# Poll interval: a responsiveness/CPU tradeoff, not behavior -- the loop
# surfaces a worker error within ~0.1s. Exact value isn't assertable.
thread.join(timeout=0.1) # pragma: no mutate
if not errors.empty():
raise errors.get()
if not errors.empty():
Expand Down
4 changes: 3 additions & 1 deletion aai_cli/transcribe_render.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ def _render_chapters(transcript: object, console: Console) -> None:
return
console.print("\n[bold]Chapters:[/bold]")
for ch in chapters:
span = f"{_fmt_ms(jsonshape.as_int(getattr(ch, 'start', 0)))}-{_fmt_ms(jsonshape.as_int(getattr(ch, 'end', 0)))}"
# The `, 0` getattr fallbacks are equivalent mutants: they apply only to a
# chapter missing start/end, and _fmt_ms(0) == _fmt_ms(1) == "00:00" regardless.
span = f"{_fmt_ms(jsonshape.as_int(getattr(ch, 'start', 0)))}-{_fmt_ms(jsonshape.as_int(getattr(ch, 'end', 0)))}" # pragma: no mutate
console.print(f" {span} {getattr(ch, 'headline', '')}")


Expand Down
99 changes: 99 additions & 0 deletions scripts/mutation_sweep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Whole-file mutation sweep — the diff-scoped gate's repo-wide companion.

``scripts/mutation_gate.py`` only mutates lines changed versus a branch, so code
that predates the gate is never held to its bar. This sweeps EVERY eligible line
of the given files (or the whole package) and reports the mutants that survive —
i.e. the suite still passes with the line deliberately broken — so you can add an
assertion that kills each one (or mark a genuinely-equivalent line
``# pragma: no mutate``). It reuses the gate's own mutation/kill engine, so a
survivor here is a survivor there.

Usage::

# 1. Refresh per-test coverage contexts the sweep reads from .coverage:
uv run pytest -q -n auto --timeout=60 --cov=aai_cli --cov-branch \
--cov-context=test --cov-report=
# 2. Sweep specific files (or omit paths to sweep the whole package):
uv run python scripts/mutation_sweep.py aai_cli/config.py
uv run python scripts/mutation_sweep.py

Pass ``--timeout`` to the pytest step above: the default suite has no per-test
timeout (it is opt-in; see pyproject), and a deadlocked test would otherwise wedge
the whole run instead of failing fast.

Exit status is 1 if any real survivor is found, else 0. Mutants with no covering
test are reported separately as ``uncovered`` and do not fail the run: coverage
attributes defaults that are evaluated at import time to no test, so that bucket
needs a manual look rather than blind action.
"""

from __future__ import annotations

import importlib.util
import sys
from pathlib import Path
from types import ModuleType

import coverage

# Directory containing this script; mutation_gate.py is loaded from here.
_HERE = Path(__file__).resolve().parent
# The aai_cli package directory, a sibling of this script's directory.
_PKG = _HERE.parent / "aai_cli"
# Template tree inside the package; _package_files() leaves it out of a
# whole-package sweep.
_TEMPLATES = _PKG / "init" / "templates"


def _load_gate() -> ModuleType:
    """Import ``mutation_gate.py`` from this script's directory and return it.

    The module is registered in ``sys.modules`` under ``mutation_gate`` before
    its body is executed.

    Raises:
        RuntimeError: if no import spec (or no loader) can be built for the file.
    """
    # ModuleType attribute access is dynamic, so reusing the gate's private helpers
    # (_collect/_covering_tests/_survives) elsewhere in this script needs no
    # type-checker escape hatch.
    gate_path = _HERE / "mutation_gate.py"
    gate_spec = importlib.util.spec_from_file_location("mutation_gate", gate_path)
    if gate_spec is None or gate_spec.loader is None:
        raise RuntimeError("could not load scripts/mutation_gate.py")
    gate = importlib.util.module_from_spec(gate_spec)
    sys.modules["mutation_gate"] = gate
    gate_spec.loader.exec_module(gate)
    return gate


def _package_files() -> list[Path]:
    """Return every ``*.py`` file under the package, sorted, minus the template tree."""
    found = [
        candidate
        for candidate in _PKG.rglob("*.py")
        if _TEMPLATES not in candidate.parents
    ]
    found.sort()
    return found


def _sweep_file(
    mg: ModuleType, path: Path, data: coverage.CoverageData
) -> tuple[int, list[str], list[str]]:
    """Mutate every line of *path* and sort the mutants into buckets.

    Args:
        mg: the loaded ``mutation_gate`` module whose helpers do the work.
        path: the source file to sweep.
        data: coverage data handed to the gate's helpers.

    Returns:
        ``(mutant_count, survivor_labels, uncovered_labels)``.
    """
    source_lines = path.read_text(encoding="utf-8").splitlines()
    # The gate is diff-scoped; handing it every 1-based line number makes it
    # treat the whole file as "changed".
    every_line = set(range(1, len(source_lines) + 1))
    tree, src, mutants = mg._collect(path, every_line)

    survivors: list[str] = []
    uncovered: list[str] = []
    for mutant in mutants:
        if not mg._covering_tests(data, path, mutant.linenos):
            # No test touches these lines, so survival cannot be judged.
            uncovered.append(mutant.label)
            continue
        if mg._survives(path, tree, src, mutant, data):
            survivors.append(mutant.label)
    return len(mutants), survivors, uncovered


def main() -> int:
    """Sweep the files named on the command line, or the whole package if none.

    For each file, writes its surviving and uncovered mutant labels to stdout,
    followed by a grand total.

    Returns:
        1 if any covered mutant survived, 0 if none did, and 2 if no coverage
        data could be read (in which case nothing is swept).
    """
    mg = _load_gate()
    args = [Path(a) for a in sys.argv[1:]] or _package_files()
    data = coverage.CoverageData()
    data.read()
    # CoverageData.read() does nothing when the data file is absent. With empty
    # data every mutant would be bucketed as "uncovered" and the sweep would exit
    # 0 without having tested anything, so refuse to run instead.
    if not data.measured_files():
        sys.stderr.write(
            "error: no coverage data found; refresh it first with "
            "pytest --cov=aai_cli --cov-branch --cov-context=test\n"
        )
        return 2
    total = 0
    all_survivors: list[str] = []
    for path in args:
        tested, survivors, uncovered = _sweep_file(mg, path, data)
        total += tested
        all_survivors += survivors
        sys.stdout.write(f"\n=== {path} : {tested} mutants ===\n")
        for label in survivors:
            sys.stdout.write(f"  SURVIVES  {label}\n")
        for label in uncovered:
            sys.stdout.write(f"  uncovered {label}\n")
        if not survivors and not uncovered:
            sys.stdout.write("  clean\n")
        # Flush per file so progress is visible during a long sweep.
        sys.stdout.flush()
    sys.stdout.write(f"\nTOTAL {total} mutant(s); {len(all_survivors)} surviving\n")
    return 1 if all_survivors else 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
2 changes: 2 additions & 0 deletions tests/setup_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ class FakeRun:

def __init__(self, returncodes=None, *, creates_skill=True, removes_skill=True):
self.calls = []
self.invocations = [] # (cmd, kwargs) per call, so tests can assert timeout etc.
self.returncodes = returncodes or {}
self.creates_skill = creates_skill
self.removes_skill = removes_skill

def __call__(self, cmd, *args, **kwargs):
self.calls.append(cmd)
self.invocations.append((list(cmd), dict(kwargs)))
rc = 0
best = -1
for prefix, code in self.returncodes.items():
Expand Down
14 changes: 14 additions & 0 deletions tests/test_agent_audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ def factory(*, rate, blocksize, callback, device):
assert fake.stopped and fake.closed


def test_duplex_floors_blocksize_at_one():
    # A pathologically small device rate (//10 == 0) must still open with at least one
    # frame per block; the max(1, ...) floor prevents a 0-frame block.
    captured = {}

    def factory(*, rate, blocksize, callback, device):
        # Record the block size the stream was opened with.
        captured["blocksize"] = blocksize
        return FakeStream()

    duplex = DuplexAudio(device_rate=5, stream_factory=factory)  # 5 // 10 == 0
    duplex.player.start()
    assert captured["blocksize"] == 1


def test_duplex_restart_after_close_reopens_stream():
calls = {"n": 0}

Expand Down
16 changes: 16 additions & 0 deletions tests/test_agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,19 @@ def test_send_audio_loop_waits_for_ready_event_before_streaming():
ws = _RecordingWS()
_send_audio_loop(ws, s, [b"\x01\x02"])
assert len(ws.sent) == 1 # frame forwarded once the gate is open


def test_send_audio_loop_waits_on_ready_event_with_bounded_timeout():
    # The wait on ready_event is bounded so a server that never sends `ready` can't
    # wedge the send loop forever; pins the 10s timeout.
    observed = {}

    class _RecordingEvent:
        # Event stand-in that records the timeout it was waited on with.
        def wait(self, timeout=None):
            observed["timeout"] = timeout
            return True

    session = _session(exit_after_reply=True, ready_event=_RecordingEvent())
    session.ready = True
    ws = _RecordingWS()
    _send_audio_loop(ws, session, [b"\x01\x02"])
    assert observed["timeout"] == 10
47 changes: 47 additions & 0 deletions tests/test_agent_session_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,53 @@ def close(self):
assert exc.value.exit_code == 1 # the real mic failure reaches the user, not a hang


def test_run_session_capture_thread_is_daemon(monkeypatch):
    # The capture thread is a daemon so a stuck mic read can't keep the process alive
    # after the session ends.
    import threading as _threading

    from aai_cli.agent import session as session_mod

    daemon_flags = []
    thread_base = session_mod.threading.Thread

    class SpyThread(thread_base):
        # Records the `daemon` kwarg of every thread the session creates.
        def __init__(self, *args, **kwargs):
            daemon_flags.append(kwargs.get("daemon"))
            super().__init__(*args, **kwargs)

    monkeypatch.setattr(session_mod.threading, "Thread", SpyThread)

    class _BoomMic:
        # Microphone whose iteration fails immediately.
        def __iter__(self):
            raise CLIError("no microphone", error_type="mic_error", exit_code=1)

    class _BlockingWS:
        # Websocket whose receive side blocks until close() is called (or 2s pass).
        def __init__(self):
            self._closed = _threading.Event()

        def send(self, _msg):
            pass

        def __iter__(self):
            self._closed.wait(timeout=2)
            return iter(())

        def close(self):
            self._closed.set()

    def connect(url, **kwargs):
        return _BlockingWS()

    with pytest.raises(CLIError):
        run_session(
            "sk_live",
            renderer=FakeRenderer(),
            player=FakePlayer(),
            mic=_BoomMic(),
            config=AgentRunConfig(voice="ivy", system_prompt="x", greeting="hi"),
            connect=connect,
        )
    assert daemon_flags == [True]  # the one capture thread, created as a daemon


def test_run_session_does_not_close_player_that_failed_to_open():
# If opening the speaker stream raises, the cleanup must NOT call close() on a
# player that never started (pins the player_started=False initializer).
Expand Down
24 changes: 24 additions & 0 deletions tests/test_auth_loopback.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,30 @@ def test_capture_times_out_without_callback():
assert result.token is None


def test_capture_server_thread_is_daemon_and_joined_with_timeout(monkeypatch):
    # The serve_forever thread must be a daemon (so it can't block process exit) and the
    # cleanup join must be bounded (5s) so a wedged server can't hang shutdown. The
    # server really serves (no callback arrives, so capture just times out fast); we
    # only spy on the thread's daemon flag and join timeout.
    spied = {}
    thread_base = loopback.threading.Thread

    class SpyThread(thread_base):
        def __init__(self, *args, **kwargs):
            spied["daemon"] = kwargs.get("daemon")
            super().__init__(*args, **kwargs)

        def join(self, timeout=None):
            spied["join_timeout"] = timeout
            return super().join(timeout)

    monkeypatch.setattr(loopback.threading, "Thread", SpyThread)
    outcome = loopback.capture_callback(timeout=0.1)  # no callback -> times out
    assert outcome.error == "timeout"
    assert spied["daemon"] is True
    assert spied["join_timeout"] == 5


def test_capture_raises_clean_error_when_port_unavailable(monkeypatch):
# Occupy a port, then point the callback server at it: binding must fail with a
# clean APIError, not a raw OSError traceback escaping run_login_flow.
Expand Down
23 changes: 20 additions & 3 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,29 @@ def test_empty_api_key_flag_rejected():

from aai_cli.errors import CLIError

with pytest.raises(CLIError):
with pytest.raises(CLIError) as exc:
config.resolve_api_key(api_key_flag="")
assert exc.value.error_type == "invalid_key"
assert exc.value.exit_code == 2 # usage error, not the generic 1


def test_invalid_profile_name_has_suggestion():
    # A profile name with illegal characters is rejected with an actionable hint.
    with pytest.raises(CLIError) as exc:
        config.set_api_key("bad name!", "sk_x")
    err = exc.value
    assert err.message.startswith("Invalid profile name")
    assert err.error_type == "invalid_profile"
    assert err.exit_code == 2  # usage error, not the generic 1
    assert err.suggestion == "Use only letters, digits, '-' or '_'."


def test_malformed_config_raises_clean_error(tmp_config):
from aai_cli.errors import CLIError

(tmp_config / "config.toml").write_text("this is not = = valid toml ===\n")
with pytest.raises(CLIError):
with pytest.raises(CLIError) as exc:
config.get_active_profile()
assert exc.value.error_type == "invalid_config"
assert exc.value.exit_code == 2 # usage error, not the generic 1


def test_unexpected_config_shape_raises_clean_error(tmp_config):
Expand All @@ -200,7 +206,7 @@ def test_unexpected_config_shape_raises_clean_error(tmp_config):
with pytest.raises(CLIError) as exc:
config.get_active_profile()
assert exc.value.error_type == "invalid_config"
assert exc.value.exit_code == 2
assert exc.value.exit_code == 2 # usage error, not the generic 1


def test_unexpected_config_shape_error_is_compact(tmp_config):
Expand Down Expand Up @@ -245,6 +251,17 @@ def test_validation_summary_labels_rootlevel_problems():
assert config._validation_summary(exc.value).startswith("top level: ")


def test_dump_creates_missing_parent_directories(monkeypatch, tmp_path):
    # The config dir's parents may not exist yet (first run on a fresh machine);
    # _dump must create the whole chain (mkdir parents=True), not just the leaf.
    config_home = tmp_path.joinpath("deeply", "nested", "config")
    monkeypatch.setattr("aai_cli.config.config_dir", lambda: config_home)
    config.set_api_key("default", "sk_abc")
    assert config_home.is_dir()
    assert config_home.joinpath("config.toml").exists()
    assert config.get_api_key("default") == "sk_abc"


def test_config_roundtrips_after_special_value(tmp_path, monkeypatch):
# profile names are validated; this checks tomli_w writes valid TOML for normal data
config.set_api_key("staging", "sk_x")
Expand Down
Loading
Loading