From 6b33cdb51132e156b6292c4f90702b1c2e665d7e Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Jun 2026 05:09:53 +0000 Subject: [PATCH] Refactor near-limit files on their natural seams Two source files sat within ~30 lines of the 500-line max-file-length gate. Split each along its existing concern boundary, preserving behavior: - core/config.py (470->297): extract the config.toml document schema (Profile/Config/StoredSession) plus the parse/cache/atomic-write store into core/config_store.py, mirroring the keyring_store factoring. config stays the auth/profile facade and reads as plain accessors over config_store.load/dump/update. - commands/clip/_exec.py (466->381): extract the ffmpeg cutting stage (silence detection, per-segment re-encode, WrittenClip) into commands/clip/_cut.py, beside the existing _select selection module; _exec is now the orchestration that ties selection and cutting together. Tests repoint to the new modules. Added coverage for two lines that the move pulled into the diff scope: the Windows PermissionError retry path and the real config_dir resolver (both previously uncovered-but-tolerated). Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_016aEjd8hrNh1NoWLuZ2fyLT --- aai_cli/AGENTS.md | 2 +- aai_cli/commands/clip/_cut.py | 104 ++++++++++++++++ aai_cli/commands/clip/_exec.py | 105 ++-------------- aai_cli/commands/config.py | 6 +- aai_cli/core/config.py | 221 ++++----------------------------- aai_cli/core/config_store.py | 190 ++++++++++++++++++++++++++++ tests/conftest.py | 2 +- tests/test_clip_exec.py | 3 +- tests/test_config.py | 70 ++++++++--- tests/test_config_command.py | 14 +-- tests/test_onboard_command.py | 4 +- tests/test_update_check.py | 36 +++--- tests/test_update_prompt.py | 4 +- 13 files changed, 420 insertions(+), 341 deletions(-) create mode 100644 aai_cli/commands/clip/_cut.py create mode 100644 aai_cli/core/config_store.py diff --git a/aai_cli/AGENTS.md b/aai_cli/AGENTS.md index fd2f352f..6cc9f17f 100644 --- a/aai_cli/AGENTS.md +++ b/aai_cli/AGENTS.md @@ -139,7 +139,7 @@ heavily-reworked commands with long bodies; small commands keep the inline ### Cross-cutting state (resolution order matters) - **`app/context.py`** — `AppState` (profile, env) is attached to the Typer context in the root `@app.callback()`. `run_command` is the standard command wrapper. -- **`core/config.py`** — profiles persisted in `config.toml` (via `platformdirs`); the **API key lives only in the OS keyring**, never in a dotfile. The keyring access itself is factored into **`core/keyring_store.py`** (the single importer of `keyring`, holding `KEYRING_SERVICE = "assemblyai-cli"` + `set_secret`/`get_secret`/`restore_secret`/`delete_secret`/`usable`), so the "secrets never touch the dotfile" split is structural; `config` reads/writes secrets through it and only `config.keyring_usable` re-surfaces the probe on the auth facade. Key resolution order: `--api-key` flag (validation paths only) → `ASSEMBLYAI_API_KEY` env → keyring. **Run commands deliberately expose no `--api-key` flag** so keys can't leak into `ps`/shell history. Every `config.toml` write is a read-modify-write (`_load` → mutate → `_dump`) via the `config._update` context manager: `_dump` is a temp-file + atomic `os.replace`, so a reader never sees a torn file. Writers and readers are otherwise unsynchronized — last write wins (there is **no** cross-process lock; an earlier `filelock`-based serialization was removed because it was a recurring Windows CI flake and the lost-update race it closed isn't worth the cost for a single-user CLI). On Windows the atomic replace has no replace-over-open guarantee, so both the lock-free read and the `os.replace` ride out the transient `PermissionError` through `config._retry_on_sharing_violation` (a no-op on POSIX). +- **`core/config.py`** — profiles persisted in `config.toml` (via `platformdirs`); the **API key lives only in the OS keyring**, never in a dotfile. The keyring access itself is factored into **`core/keyring_store.py`** (the single importer of `keyring`, holding `KEYRING_SERVICE = "assemblyai-cli"` + `set_secret`/`get_secret`/`restore_secret`/`delete_secret`/`usable`), so the "secrets never touch the dotfile" split is structural; `config` reads/writes secrets through it and only `config.keyring_usable` re-surfaces the probe on the auth facade. Key resolution order: `--api-key` flag (validation paths only) → `ASSEMBLYAI_API_KEY` env → keyring. **Run commands deliberately expose no `--api-key` flag** so keys can't leak into `ps`/shell history. The `config.toml` document schema (`Profile`/`Config`/`StoredSession`) and its parse/cache/atomic-write machinery live one layer down in **`core/config_store.py`** (the same factoring as `keyring_store`): `config` is the auth/profile facade and reads as plain accessors over `config_store.load`/`dump`/`update`, so the store rules stay structural. Every `config.toml` write is a read-modify-write (`load` → mutate → `dump`) via the `config_store.update` context manager: `dump` is a temp-file + atomic `os.replace`, so a reader never sees a torn file. Writers and readers are otherwise unsynchronized — last write wins (there is **no** cross-process lock; an earlier `filelock`-based serialization was removed because it was a recurring Windows CI flake and the lost-update race it closed isn't worth the cost for a single-user CLI). On Windows the atomic replace has no replace-over-open guarantee, so both the lock-free read and the `os.replace` ride out the transient `PermissionError` through `config_store._retry_on_sharing_violation` (a no-op on POSIX). Tests isolate the config dir by patching `config_store.config_dir` (the autouse `tmp_config` fixture). - **`core/environments.py`** — a frozen `Environment` (api_base, streaming_host, llm_gateway_base, ams_base, stytch_*). `DEFAULT_ENV` is **`production`**; use `--sandbox` (or `--env sandbox000` / `AAI_ENV`) to target the sandbox. The active environment is a process-global set once at startup; precedence: `--env` → `AAI_ENV` → profile's stored env → default. A credential is only valid against the environment that minted it. - **`core/client.py`** — thin wrappers over the `assemblyai` SDK (`transcribe`, `list_transcripts`, `stream_audio`, etc.). It normalizes SDK exceptions: auth failures become a single clean `auth_failure()` `CLIError`; everything else becomes `APIError`. New SDK calls should follow this try/except shape. - **`core/errors.py`** — the `CLIError` hierarchy (each with `error_type` + `exit_code`). `ui/output.py` emits errors to **stderr**; stdout stays clean for pipelines. `--json` switches to machine-readable output; it is never auto-enabled — `output.resolve_json()` deliberately keeps human text the default even when piped or agent-run. diff --git a/aai_cli/commands/clip/_cut.py b/aai_cli/commands/clip/_cut.py new file mode 100644 index 00000000..cf4cf4d9 --- /dev/null +++ b/aai_cli/commands/clip/_cut.py @@ -0,0 +1,104 @@ +"""ffmpeg cutting for `assembly clip`: silence detection + per-segment re-encode. + +The pure selection logic (range parsing, utterance filtering, merging) lives in +``_select``; this module is the final stage — it turns the merged ``Segment`` +windows into output files with ffmpeg: one ``silencedetect`` pass to snap cuts +into nearby pauses, then a frame-accurate re-encode per segment. The +orchestration that ties selection and cutting together stays in ``_exec``. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from rich.markup import escape + +from aai_cli.app import mediafile +from aai_cli.commands.clip import _select as clip_select +from aai_cli.commands.clip._select import Segment +from aai_cli.ui import output + +# -30dB for at least 0.2s reads as a pause in normal speech recordings. +SILENCE_FILTER = "silencedetect=noise=-30dB:d=0.2" + + +def detect_silences(ffmpeg: str, media: Path) -> list[Segment]: + """The silence intervals ffmpeg hears in ``media`` (one decode pass). + + Snapping is best-effort: a failed detection returns no silences (so the + cut proceeds at the selected times) rather than failing the command. + silencedetect logs at info level on stderr, so the usual ``-loglevel + error`` would silence the very lines this parses. + """ + result = mediafile.run_ffmpeg( + [ + ffmpeg, + "-hide_banner", + "-nostats", + "-i", + str(media), + "-af", + SILENCE_FILTER, + "-f", + "null", + "-", + ] + ) + if result.returncode != 0: + return [] + return clip_select.parse_silences(result.stderr) + + +def cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: + """Re-encode one segment of ``media`` into ``dest``. + + Re-encoding (no ``-c copy``) keeps cuts frame-accurate where stream copy + would snap to the nearest keyframe; ``-y`` makes a re-run overwrite its own + earlier output instead of stalling on ffmpeg's prompt. + """ + result = mediafile.run_ffmpeg( + [ + ffmpeg, + "-hide_banner", + "-loglevel", + "error", + "-y", + "-i", + str(media), + "-ss", + f"{segment.start:.3f}", + "-to", + f"{segment.end:.3f}", + mediafile.path_arg(dest), + ] + ) + if result.returncode != 0: + raise mediafile.ffmpeg_failure(result, "cut", dest, error_type="clip_failed") + + +def clip_dest(media: Path, out_dir: Path | None, index: int) -> Path: + directory = out_dir if out_dir is not None else media.parent + return directory / f"{media.stem}.clip{index:02d}{media.suffix}" + + +@dataclass(frozen=True) +class WrittenClip: + """One output file and the source window it was cut from.""" + + path: Path + segment: Segment + + def payload(self) -> dict[str, object]: + return { + "path": str(self.path), + "start": round(self.segment.start, 3), + "end": round(self.segment.end, 3), + "duration": round(self.segment.end - self.segment.start, 3), + } + + def human_line(self) -> str: + start = clip_select.format_clock(self.segment.start) + end = clip_select.format_clock(self.segment.end) + duration = round(self.segment.end - self.segment.start, 3) + return output.success(f"{escape(str(self.path))} {start} - {end} ({duration}s)") diff --git a/aai_cli/commands/clip/_exec.py b/aai_cli/commands/clip/_exec.py index bbb03871..940cdad5 100644 --- a/aai_cli/commands/clip/_exec.py +++ b/aai_cli/commands/clip/_exec.py @@ -5,7 +5,8 @@ options/run split, see AGENTS.md), so tests drive transcript resolution and the ffmpeg orchestration by constructing options directly. The pure selection logic (range parsing, utterance filtering, LLM reply parsing, merging) lives in -``clip_select``. +``clip_select``; the ffmpeg cutting (silence detection + per-segment re-encode) +lives in ``_cut``. This module is the orchestration that ties them together. Selection composes four sources: ``--speaker`` and ``--search`` filter the diarized utterances of a transcript (made on the fly, reused via @@ -25,10 +26,9 @@ from pathlib import Path from types import SimpleNamespace -from rich.markup import escape - from aai_cli.app import batch, mediafile from aai_cli.app.context import AppState +from aai_cli.commands.clip import _cut as clip_cut from aai_cli.commands.clip import _select as clip_select from aai_cli.commands.clip._select import Segment from aai_cli.core import jsonshape, llm, stdio, youtube @@ -230,91 +230,6 @@ def _validate_selection(opts: ClipOptions) -> None: ) -# -30dB for at least 0.2s reads as a pause in normal speech recordings. -_SILENCE_FILTER = "silencedetect=noise=-30dB:d=0.2" - - -def _detect_silences(ffmpeg: str, media: Path) -> list[Segment]: - """The silence intervals ffmpeg hears in ``media`` (one decode pass). - - Snapping is best-effort: a failed detection returns no silences (so the - cut proceeds at the selected times) rather than failing the command. - silencedetect logs at info level on stderr, so the usual ``-loglevel - error`` would silence the very lines this parses. - """ - result = mediafile.run_ffmpeg( - [ - ffmpeg, - "-hide_banner", - "-nostats", - "-i", - str(media), - "-af", - _SILENCE_FILTER, - "-f", - "null", - "-", - ] - ) - if result.returncode != 0: - return [] - return clip_select.parse_silences(result.stderr) - - -def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: - """Re-encode one segment of ``media`` into ``dest``. - - Re-encoding (no ``-c copy``) keeps cuts frame-accurate where stream copy - would snap to the nearest keyframe; ``-y`` makes a re-run overwrite its own - earlier output instead of stalling on ffmpeg's prompt. - """ - result = mediafile.run_ffmpeg( - [ - ffmpeg, - "-hide_banner", - "-loglevel", - "error", - "-y", - "-i", - str(media), - "-ss", - f"{segment.start:.3f}", - "-to", - f"{segment.end:.3f}", - mediafile.path_arg(dest), - ] - ) - if result.returncode != 0: - raise mediafile.ffmpeg_failure(result, "cut", dest, error_type="clip_failed") - - -def _clip_dest(media: Path, out_dir: Path | None, index: int) -> Path: - directory = out_dir if out_dir is not None else media.parent - return directory / f"{media.stem}.clip{index:02d}{media.suffix}" - - -@dataclass(frozen=True) -class WrittenClip: - """One output file and the source window it was cut from.""" - - path: Path - segment: Segment - - def payload(self) -> dict[str, object]: - return { - "path": str(self.path), - "start": round(self.segment.start, 3), - "end": round(self.segment.end, 3), - "duration": round(self.segment.end - self.segment.start, 3), - } - - def human_line(self) -> str: - start = clip_select.format_clock(self.segment.start) - end = clip_select.format_clock(self.segment.end) - duration = round(self.segment.end - self.segment.start, 3) - return output.success(f"{escape(str(self.path))} {start} - {end} ({duration}s)") - - def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: """Execute `assembly clip`: one source, or a stdin batch (`--from-stdin`).""" _validate_out_dir(opts.out_dir) @@ -407,7 +322,7 @@ def _clip_one( state: AppState, *, json_mode: bool, -) -> tuple[dict[str, object], list[WrittenClip]]: +) -> tuple[dict[str, object], list[clip_cut.WrittenClip]]: """Resolve ``opts.media`` to a local file and cut its clips; the payload + clips. A media-page URL is downloaded once — the audio track by default, the full @@ -443,21 +358,21 @@ def _cut( state: AppState, *, json_mode: bool, -) -> tuple[dict[str, object], list[WrittenClip]]: +) -> tuple[dict[str, object], list[clip_cut.WrittenClip]]: """Select and cut the clips for an already-local media file; the payload + clips.""" matched, transcript_id = _transcript_segments(opts, media, state, json_mode=json_mode) segments = clip_select.merge_segments([*matched, *explicit], opts.padding) if opts.snap: with output.status("Detecting silence…", json_mode=json_mode, quiet=state.quiet): - silences = _detect_silences(ffmpeg, media) + silences = clip_cut.detect_silences(ffmpeg, media) segments = clip_select.snap_to_silences(segments, silences) - written: list[WrittenClip] = [] + written: list[clip_cut.WrittenClip] = [] cutting = f"Cutting {len(segments)} clip(s)…" with output.status(cutting, json_mode=json_mode, quiet=state.quiet): for index, segment in enumerate(segments, 1): - dest = _clip_dest(media, out_dir, index) - _cut_clip(ffmpeg, media, segment, dest) - written.append(WrittenClip(path=dest, segment=segment)) + dest = clip_cut.clip_dest(media, out_dir, index) + clip_cut.cut_clip(ffmpeg, media, segment, dest) + written.append(clip_cut.WrittenClip(path=dest, segment=segment)) payload: dict[str, object] = { "source": opts.media, "transcript_id": transcript_id, diff --git a/aai_cli/commands/config.py b/aai_cli/commands/config.py index ceedde76..7cbea089 100644 --- a/aai_cli/commands/config.py +++ b/aai_cli/commands/config.py @@ -14,7 +14,7 @@ from aai_cli import command_registry, help_panels, options from aai_cli.app.context import AppState, run_command -from aai_cli.core import config, environments +from aai_cli.core import config, config_store, environments from aai_cli.core.choices import ConfigKey from aai_cli.core.errors import UsageError from aai_cli.ui import output @@ -104,7 +104,7 @@ def path( """Print where config.toml lives""" def body(_state: AppState, json_mode: bool) -> None: - file = config.config_file_path() + file = config_store.config_file_path() if json_mode: output.emit({"path": str(file)}, str, json_mode=True) else: @@ -134,7 +134,7 @@ def list_settings( def body(_state: AppState, json_mode: bool) -> None: data: dict[str, object] = { - "path": str(config.config_file_path()), + "path": str(config_store.config_file_path()), "active_profile": config.get_active_profile(), "profiles": config.list_profiles(), "telemetry_enabled": config.get_telemetry_enabled(), diff --git a/aai_cli/core/config.py b/aai_cli/core/config.py index f8933df8..19f5a401 100644 --- a/aai_cli/core/config.py +++ b/aai_cli/core/config.py @@ -1,21 +1,12 @@ from __future__ import annotations -import contextlib -import io -import os import re -import tempfile -import time -import tomllib import uuid -from collections.abc import Callable, Generator -from pathlib import Path -import platformdirs -import tomli_w -from pydantic import BaseModel, ConfigDict, Field, ValidationError +from pydantic import ValidationError -from aai_cli.core import debuglog, env, keyring_store +from aai_cli.core import config_store, debuglog, env, keyring_store +from aai_cli.core.config_store import Profile, StoredSession from aai_cli.core.errors import CLIError, NotAuthenticated ENV_API_KEY = "ASSEMBLYAI_API_KEY" @@ -24,46 +15,6 @@ _PROFILE_RE = re.compile(r"^[A-Za-z0-9_-]+$") -class Profile(BaseModel): - """A single profile's non-secret settings persisted in config.toml. - - ``extra="allow"`` so unknown keys written by a newer CLI survive a round-trip - through an older one instead of being silently dropped on the next ``_dump``. - """ - - model_config = ConfigDict(extra="allow") - - env: str | None = None - account_id: int | None = None - # Login email from AMS discovery; gates internal-environment access (see core.access). - email: str | None = None - - -class Config(BaseModel): - """The whole config.toml document. ``active_profile`` stays optional so we can - tell "never set" apart from the default and only adopt a new profile as active - when the file had none (matching the historic ``setdefault`` semantics).""" - - model_config = ConfigDict(extra="allow") - - active_profile: str | None = None - profiles: dict[str, Profile] = Field(default_factory=dict) - # Telemetry state (see telemetry.py): a random anonymous install id, and the - # persisted opt-out. None means "never chosen", which the opt-out model reads - # as enabled — distinct from an explicit False written by `assembly telemetry disable`. - device_id: str | None = None - telemetry_enabled: bool | None = None - update_last_check: float | None = None - update_latest_version: str | None = None - - -class StoredSession(BaseModel): - """The browser-login Stytch session blob persisted in the OS keyring as JSON.""" - - jwt: str - token: str = "" - - def validate_profile(name: str) -> None: """Reject profile names that aren't simple identifiers. @@ -80,137 +31,13 @@ def validate_profile(name: str) -> None: ) -def config_dir() -> Path: - return Path(platformdirs.user_config_dir("assemblyai")) - - -def config_file_path() -> Path: - """Where config.toml lives — surfaced by `assembly config path` so users can - find the file without knowing the platformdirs convention.""" - return _config_file() - - -def _config_file() -> Path: - return config_dir() / "config.toml" - - -def _validation_summary(exc: ValidationError) -> str: - """A compact, human-sized summary of a pydantic ValidationError. - - Just "field: reason" per problem — pydantic's full rendering dumps input values - and errors.pydantic.dev doc URLs, which is noise (and a potential value leak) - in a one-line CLI error. - """ - problems: list[str] = [] - # include_url/include_input=False keep pydantic's url/input fields out of each - # error dict, but this summary only reads loc + msg, so flipping them is an - # equivalent mutant (the rendered string is identical either way). - for err in exc.errors(include_url=False, include_input=False): # pragma: no mutate - loc = ".".join(str(part) for part in err["loc"]) or "top level" - problems.append(f"{loc}: {err['msg']}") - return "; ".join(problems) - - -# Parsed-config cache: path -> (mtime_ns, size, parsed). The several _load() -# calls in one CLI invocation (profile, env, key resolution) then don't each -# re-read and re-parse the same unchanged TOML; _dump() bumps the mtime, which -# invalidates it naturally. Callers mutate the returned Config (and persist_login -# snapshots one for rollback), so hand out deep copies, never the cached object. -_load_cache: dict[Path, tuple[int, int, Config]] = {} - -# Windows has no atomic replace-over-open like POSIX: while _dump swaps the temp -# file in (os.replace), a racing open on the same path transiently fails with -# PermissionError. Since readers are lock-free, both a lock-free reader's open and -# the writer's replace can lose that race, so each retries a few short backoffs to -# ride out the (sub-millisecond) rename window. POSIX replaces atomically and never -# raises here, so this only ever loops on Windows. -_SHARING_RETRIES = 5 # pragma: no mutate -- a ±1 change in the retry budget is equivalent -_SHARING_BACKOFF = 0.02 # pragma: no mutate -- a timing constant; any small value works - - -def _retry_on_sharing_violation[T](op: Callable[[], T]) -> T: - """Run a file op, retrying the transient PermissionError Windows raises when an - open and an os.replace race on the same path (see _SHARING_RETRIES).""" - for _ in range(_SHARING_RETRIES - 1): - try: - return op() - except PermissionError: - time.sleep(_SHARING_BACKOFF) - return op() # the last attempt's error (a genuine permission problem) propagates - - -def _load() -> Config: - path = _config_file() - try: - stat = path.stat() - except OSError: - return Config() - cached = _load_cache.get(path) - if cached is not None and cached[0] == stat.st_mtime_ns and cached[1] == stat.st_size: - return cached[2].model_copy(deep=True) - raw = _retry_on_sharing_violation(path.read_bytes) - try: - data = tomllib.load(io.BytesIO(raw)) - except tomllib.TOMLDecodeError as exc: - raise CLIError( - f"Config file at {path} is not valid TOML ({exc}). Fix or delete it.", - error_type="invalid_config", - exit_code=2, - ) from exc - try: - cfg = Config.model_validate(data) - except ValidationError as exc: - raise CLIError( - f"Config file at {path} has an unexpected shape " - f"({_validation_summary(exc)}). Fix or delete it.", - error_type="invalid_config", - exit_code=2, - ) from exc - _load_cache[path] = (stat.st_mtime_ns, stat.st_size, cfg) - return cfg.model_copy(deep=True) - - -def _dump(cfg: Config) -> None: - path = _config_file() - path.parent.mkdir(parents=True, exist_ok=True) - # Write to a sibling temp file and atomically rename over the target, so a crash - # (or concurrent reader) mid-write can never leave config.toml truncated into - # invalid TOML that _load would then reject. os.replace is atomic within a dir. - # exclude_none is required: TOML has no null and tomli_w rejects None values. - fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=".config-", suffix=".toml.tmp") - tmp = Path(tmp_name) - try: - with os.fdopen(fd, "wb") as fh: - tomli_w.dump(cfg.model_dump(exclude_none=True), fh) - _retry_on_sharing_violation(lambda: tmp.replace(path)) - # The mtime/size key usually invalidates on its own, but drop the entry - # explicitly so a same-size rewrite on a coarse-mtime filesystem can't - # serve the pre-write parse. - _load_cache.pop(path, None) - except BaseException: - with contextlib.suppress(OSError): - tmp.unlink() - raise - - -@contextlib.contextmanager -def _update() -> Generator[Config]: - """Run a config.toml read-modify-write: ``_load`` -> mutate the yielded config -> - ``_dump``. The dump runs on clean exit; an exception in the block propagates and - skips it. The atomic os.replace in ``_dump`` keeps a reader from ever seeing a torn - file (writers and readers are otherwise unsynchronized: last write wins).""" - cfg = _load() - yield cfg - _dump(cfg) - - def get_active_profile() -> str: - return _load().active_profile or DEFAULT_PROFILE + return config_store.load().active_profile or DEFAULT_PROFILE def list_profiles() -> dict[str, str | None]: """Profile name -> stored backend env, for every profile in config.toml.""" - return {name: prof.env for name, prof in _load().profiles.items()} + return {name: prof.env for name, prof in config_store.load().profiles.items()} def set_active_profile(name: str) -> None: @@ -221,7 +48,7 @@ def set_active_profile(name: str) -> None: with no hint why, so the typo is rejected here with the known names listed. """ validate_profile(name) - with _update() as cfg: + with config_store.update() as cfg: if name not in cfg.profiles: known = ", ".join(sorted(cfg.profiles)) or "none yet" raise CLIError( @@ -236,7 +63,7 @@ def set_active_profile(name: str) -> None: def set_api_key(profile: str, api_key: str) -> None: validate_profile(profile) keyring_store.set_secret(profile, api_key) - with _update() as cfg: + with config_store.update() as cfg: cfg.profiles.setdefault(profile, Profile()) if cfg.active_profile is None: cfg.active_profile = profile @@ -259,27 +86,27 @@ def keyring_usable() -> bool: def get_profile_env(profile: str) -> str | None: """The backend environment recorded for a profile, if any (e.g. 'sandbox000').""" - prof = _load().profiles.get(profile) + prof = config_store.load().profiles.get(profile) return prof.env if prof else None def set_profile_env(profile: str, env: str) -> None: """Bind a backend environment to a profile so its key and hosts stay matched.""" validate_profile(profile) - with _update() as cfg: + with config_store.update() as cfg: cfg.profiles.setdefault(profile, Profile()).env = env def get_profile_email(profile: str) -> str | None: """The login email recorded for a profile at browser login, if any.""" - prof = _load().profiles.get(profile) + prof = config_store.load().profiles.get(profile) return prof.email if prof else None def set_profile_email(profile: str, email: str) -> None: """Persist the login email for a profile (gates internal-environment access).""" validate_profile(profile) - with _update() as cfg: + with config_store.update() as cfg: cfg.profiles.setdefault(profile, Profile()).email = email @@ -305,7 +132,7 @@ def set_session(profile: str, *, session_jwt: str, session_token: str, account_i _session_username(profile), StoredSession(jwt=session_jwt, token=session_token).model_dump_json(), ) - with _update() as cfg: + with config_store.update() as cfg: cfg.profiles.setdefault(profile, Profile()).account_id = account_id @@ -323,17 +150,17 @@ def get_session(profile: str) -> dict[str, str] | None: def get_account_id(profile: str) -> int | None: """The AMS account id recorded at login for a profile, if any.""" - prof = _load().profiles.get(profile) + prof = config_store.load().profiles.get(profile) return prof.account_id if prof else None def clear_session(profile: str) -> None: keyring_store.delete_secret(_session_username(profile)) - cfg = _load() + cfg = config_store.load() prof = cfg.profiles.get(profile) if prof and prof.account_id is not None: prof.account_id = None - _dump(cfg) + config_store.dump(cfg) def persist_login( @@ -359,7 +186,7 @@ def persist_login( # Snapshot the prior state so a mid-sequence failure rolls back cleanly to it. prior_api_key = keyring_store.get_secret(profile) prior_session = keyring_store.get_secret(_session_username(profile)) - prior_cfg = _load() + prior_cfg = config_store.load() done = False try: set_api_key(profile, api_key) @@ -378,48 +205,48 @@ def persist_login( if not done: keyring_store.restore_secret(profile, prior_api_key) keyring_store.restore_secret(_session_username(profile), prior_session) - _dump(prior_cfg) + config_store.dump(prior_cfg) def has_device_id() -> bool: """Whether the anonymous telemetry device id has been minted yet, without minting one — lets telemetry detect the true first run for its one-time collection disclosure.""" - return _load().device_id is not None + return config_store.load().device_id is not None def get_device_id() -> str: """A stable anonymous install id for telemetry: a random UUID minted locally on first use and persisted in config.toml. Carries nothing derivable from the machine or account.""" - cfg = _load() + cfg = config_store.load() if cfg.device_id is None: cfg.device_id = str(uuid.uuid4()) - _dump(cfg) + config_store.dump(cfg) return cfg.device_id def get_telemetry_enabled() -> bool | None: """The persisted telemetry choice: True/False if the user ran `aai telemetry enable/disable`, None if they never chose.""" - return _load().telemetry_enabled + return config_store.load().telemetry_enabled def set_telemetry_enabled(*, enabled: bool) -> None: - with _update() as cfg: + with config_store.update() as cfg: cfg.telemetry_enabled = enabled def get_update_cache() -> tuple[float | None, str | None]: """The cached (last-check unix ts, latest version seen) for the update notifier.""" - cfg = _load() + cfg = config_store.load() return cfg.update_last_check, cfg.update_latest_version def set_update_cache(*, last_check: float, latest_version: str | None) -> None: """Persist the update-notifier cache. ``latest_version`` is None when the last fetch failed — the timestamp is still recorded so we don't re-spawn every run.""" - with _update() as cfg: + with config_store.update() as cfg: cfg.update_last_check = last_check cfg.update_latest_version = latest_version diff --git a/aai_cli/core/config_store.py b/aai_cli/core/config_store.py new file mode 100644 index 00000000..89776e34 --- /dev/null +++ b/aai_cli/core/config_store.py @@ -0,0 +1,190 @@ +"""The config.toml document schema and its atomic read-modify-write store. + +Factored out of ``config`` the same way ``keyring_store`` holds the keyring +access: ``config`` is the auth/profile facade, and this module is the layer +beneath it — the pydantic models that describe ``config.toml`` plus the +parse/cache/atomic-dump machinery that reads and writes the file. Keeping the +two apart means the "every write is a temp-file + atomic ``os.replace``" rule +is structural, and the facade reads as plain accessors over ``load``/``dump``. +""" + +from __future__ import annotations + +import contextlib +import io +import os +import tempfile +import time +import tomllib +from collections.abc import Callable, Generator +from pathlib import Path + +import platformdirs +import tomli_w +from pydantic import BaseModel, ConfigDict, Field, ValidationError + +from aai_cli.core.errors import CLIError + + +class Profile(BaseModel): + """A single profile's non-secret settings persisted in config.toml. + + ``extra="allow"`` so unknown keys written by a newer CLI survive a round-trip + through an older one instead of being silently dropped on the next ``dump``. + """ + + model_config = ConfigDict(extra="allow") + + env: str | None = None + account_id: int | None = None + # Login email from AMS discovery; gates internal-environment access (see core.access). + email: str | None = None + + +class Config(BaseModel): + """The whole config.toml document. ``active_profile`` stays optional so we can + tell "never set" apart from the default and only adopt a new profile as active + when the file had none (matching the historic ``setdefault`` semantics).""" + + model_config = ConfigDict(extra="allow") + + active_profile: str | None = None + profiles: dict[str, Profile] = Field(default_factory=dict) + # Telemetry state (see telemetry.py): a random anonymous install id, and the + # persisted opt-out. None means "never chosen", which the opt-out model reads + # as enabled — distinct from an explicit False written by `assembly telemetry disable`. + device_id: str | None = None + telemetry_enabled: bool | None = None + update_last_check: float | None = None + update_latest_version: str | None = None + + +class StoredSession(BaseModel): + """The browser-login Stytch session blob persisted in the OS keyring as JSON.""" + + jwt: str + token: str = "" + + +def config_dir() -> Path: + return Path(platformdirs.user_config_dir("assemblyai")) + + +def config_file_path() -> Path: + """Where config.toml lives — surfaced by `assembly config path` so users can + find the file without knowing the platformdirs convention.""" + return _config_file() + + +def _config_file() -> Path: + return config_dir() / "config.toml" + + +def _validation_summary(exc: ValidationError) -> str: + """A compact, human-sized summary of a pydantic ValidationError. + + Just "field: reason" per problem — pydantic's full rendering dumps input values + and errors.pydantic.dev doc URLs, which is noise (and a potential value leak) + in a one-line CLI error. + """ + problems: list[str] = [] + # include_url/include_input=False keep pydantic's url/input fields out of each + # error dict, but this summary only reads loc + msg, so flipping them is an + # equivalent mutant (the rendered string is identical either way). + for err in exc.errors(include_url=False, include_input=False): # pragma: no mutate + loc = ".".join(str(part) for part in err["loc"]) or "top level" + problems.append(f"{loc}: {err['msg']}") + return "; ".join(problems) + + +# Parsed-config cache: path -> (mtime_ns, size, parsed). The several load() +# calls in one CLI invocation (profile, env, key resolution) then don't each +# re-read and re-parse the same unchanged TOML; dump() bumps the mtime, which +# invalidates it naturally. Callers mutate the returned Config (and persist_login +# snapshots one for rollback), so hand out deep copies, never the cached object. +_load_cache: dict[Path, tuple[int, int, Config]] = {} + +# Windows has no atomic replace-over-open like POSIX: while dump() swaps the temp +# file in (os.replace), a racing open on the same path transiently fails with +# PermissionError. Since readers are lock-free, both a lock-free reader's open and +# the writer's replace can lose that race, so each retries a few short backoffs to +# ride out the (sub-millisecond) rename window. POSIX replaces atomically and never +# raises here, so this only ever loops on Windows. +_SHARING_RETRIES = 5 # pragma: no mutate -- a ±1 change in the retry budget is equivalent +_SHARING_BACKOFF = 0.02 # pragma: no mutate -- a timing constant; any small value works + + +def _retry_on_sharing_violation[T](op: Callable[[], T]) -> T: + """Run a file op, retrying the transient PermissionError Windows raises when an + open and an os.replace race on the same path (see _SHARING_RETRIES).""" + for _ in range(_SHARING_RETRIES - 1): + try: + return op() + except PermissionError: + time.sleep(_SHARING_BACKOFF) + return op() # the last attempt's error (a genuine permission problem) propagates + + +def load() -> Config: + path = _config_file() + try: + stat = path.stat() + except OSError: + return Config() + cached = _load_cache.get(path) + if cached is not None and cached[0] == stat.st_mtime_ns and cached[1] == stat.st_size: + return cached[2].model_copy(deep=True) + raw = _retry_on_sharing_violation(path.read_bytes) + try: + data = tomllib.load(io.BytesIO(raw)) + except tomllib.TOMLDecodeError as exc: + raise CLIError( + f"Config file at {path} is not valid TOML ({exc}). Fix or delete it.", + error_type="invalid_config", + exit_code=2, + ) from exc + try: + cfg = Config.model_validate(data) + except ValidationError as exc: + raise CLIError( + f"Config file at {path} has an unexpected shape " + f"({_validation_summary(exc)}). Fix or delete it.", + error_type="invalid_config", + exit_code=2, + ) from exc + _load_cache[path] = (stat.st_mtime_ns, stat.st_size, cfg) + return cfg.model_copy(deep=True) + + +def dump(cfg: Config) -> None: + path = _config_file() + path.parent.mkdir(parents=True, exist_ok=True) + # Write to a sibling temp file and atomically rename over the target, so a crash + # (or concurrent reader) mid-write can never leave config.toml truncated into + # invalid TOML that load() would then reject. os.replace is atomic within a dir. + # exclude_none is required: TOML has no null and tomli_w rejects None values. + fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=".config-", suffix=".toml.tmp") + tmp = Path(tmp_name) + try: + with os.fdopen(fd, "wb") as fh: + tomli_w.dump(cfg.model_dump(exclude_none=True), fh) + _retry_on_sharing_violation(lambda: tmp.replace(path)) + # The mtime/size key usually invalidates on its own, but drop the entry + # explicitly so a same-size rewrite on a coarse-mtime filesystem can't + # serve the pre-write parse. + _load_cache.pop(path, None) + except BaseException: + with contextlib.suppress(OSError): + tmp.unlink() + raise + + +@contextlib.contextmanager +def update() -> Generator[Config]: + """Run a config.toml read-modify-write: ``load`` -> mutate the yielded config -> + ``dump``. The dump runs on clean exit; an exception in the block propagates and + skips it. The atomic os.replace in ``dump`` keeps a reader from ever seeing a torn + file (writers and readers are otherwise unsynchronized: last write wins).""" + cfg = load() + yield cfg + dump(cfg) diff --git a/tests/conftest.py b/tests/conftest.py index f77c3cea..40f99de2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -244,5 +244,5 @@ def internal_profile(monkeypatch): def tmp_config(monkeypatch, tmp_path): cfg_dir = tmp_path / "config" cfg_dir.mkdir() - monkeypatch.setattr("aai_cli.core.config.config_dir", lambda: cfg_dir) + monkeypatch.setattr("aai_cli.core.config_store.config_dir", lambda: cfg_dir) return cfg_dir diff --git a/tests/test_clip_exec.py b/tests/test_clip_exec.py index 2f2f6bf7..7c7a4cfa 100644 --- a/tests/test_clip_exec.py +++ b/tests/test_clip_exec.py @@ -18,6 +18,7 @@ from aai_cli.app import mediafile from aai_cli.app.context import AppState +from aai_cli.commands.clip import _cut as clip_cut from aai_cli.commands.clip import _exec as clip_exec from aai_cli.commands.clip._select import Segment from aai_cli.core import client, config @@ -53,7 +54,7 @@ def test_options_are_immutable(): @pytest.mark.parametrize( "instance", [ - clip_exec.WrittenClip(path=Path("x.mp4"), segment=Segment(0.0, 1.0)), + clip_cut.WrittenClip(path=Path("x.mp4"), segment=Segment(0.0, 1.0)), clip_exec._PipedTranscript(id="tr_1", utterances=[]), ], ids=["written_clip", "piped_transcript"], diff --git a/tests/test_config.py b/tests/test_config.py index e6ceb88a..2a333bbd 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -2,9 +2,13 @@ import pytest -from aai_cli.core import config, keyring_store +from aai_cli.core import config, config_store, keyring_store from aai_cli.core.errors import CLIError, NotAuthenticated +# Captured at import time, before the autouse tmp_config fixture patches the module +# attribute, so a test can exercise the real resolver rather than the temp-dir stub. +_REAL_CONFIG_DIR = config_store.config_dir + def test_set_and_get_api_key_roundtrip(): config.set_api_key("default", "sk_abc") @@ -250,8 +254,8 @@ def test_validation_summary_joins_multiple_problems(): from pydantic import ValidationError with pytest.raises(ValidationError) as exc: - config.Config.model_validate({"profiles": "oops", "active_profile": 7}) - summary = config._validation_summary(exc.value) + config_store.Config.model_validate({"profiles": "oops", "active_profile": 7}) + summary = config_store._validation_summary(exc.value) assert "profiles:" in summary assert "active_profile:" in summary assert "; " in summary # both problems, compactly joined @@ -262,15 +266,15 @@ def test_validation_summary_labels_rootlevel_problems(): from pydantic import ValidationError with pytest.raises(ValidationError) as exc: - config.Config.model_validate("not a table") - assert config._validation_summary(exc.value).startswith("top level: ") + config_store.Config.model_validate("not a table") + assert config_store._validation_summary(exc.value).startswith("top level: ") def test_dump_creates_missing_parent_directories(monkeypatch, tmp_path): # The config dir's parents may not exist yet (first run on a fresh machine); # _dump must create the whole chain (mkdir parents=True), not just the leaf. nested = tmp_path / "deeply" / "nested" / "config" - monkeypatch.setattr("aai_cli.core.config.config_dir", lambda: nested) + monkeypatch.setattr("aai_cli.core.config_store.config_dir", lambda: nested) config.set_api_key("default", "sk_abc") assert nested.is_dir() assert (nested / "config.toml").exists() @@ -301,9 +305,9 @@ def test_dump_cleans_up_temp_file_when_write_fails(tmp_config, monkeypatch): def boom(_data, _fh): raise RuntimeError("disk full") - monkeypatch.setattr(config.tomli_w, "dump", boom) + monkeypatch.setattr(config_store.tomli_w, "dump", boom) with pytest.raises(RuntimeError): - config._dump(config.Config()) + config_store.dump(config_store.Config()) names = sorted(p.name for p in tmp_config.iterdir()) assert names == ["config.toml"] # no .config-*.toml.tmp left behind @@ -315,13 +319,13 @@ def test_load_caches_parsed_config_between_calls(monkeypatch): # parse must happen once, with later reads served from the mtime-keyed cache. config.set_profile_env("default", "production") parses = {"n": 0} - real_load = config.tomllib.load + real_load = config_store.tomllib.load def counting_load(fh): parses["n"] += 1 return real_load(fh) - monkeypatch.setattr(config.tomllib, "load", counting_load) + monkeypatch.setattr(config_store.tomllib, "load", counting_load) assert config.get_active_profile() == "default" assert config.get_profile_env("default") == "production" assert parses["n"] == 1 @@ -339,13 +343,51 @@ def test_load_returns_independent_copies(): # Callers mutate the returned Config (persist_login snapshots one for rollback), # so the cache must hand out copies — a caller's mutation can't poison it. config.set_api_key("default", "sk_abc") - first = config._load() # cache miss: parses and primes the cache + first = config_store.load() # cache miss: parses and primes the cache first.active_profile = "mutated" first.profiles.clear() - assert config._load().active_profile == "default" - hit = config._load() # cache hit: must also be a *deep* copy + assert config_store.load().active_profile == "default" + hit = config_store.load() # cache hit: must also be a *deep* copy hit.profiles.clear() - assert "default" in config._load().profiles + assert "default" in config_store.load().profiles + + +def test_retry_on_sharing_violation_rides_out_transient_permission_errors(monkeypatch): + # Windows raises a transient PermissionError when an open races _dump's os.replace; + # the helper retries (after a short sleep) and returns once the op finally succeeds. + monkeypatch.setattr(config_store.time, "sleep", lambda _seconds: None) + calls = {"n": 0} + + def flaky() -> str: + calls["n"] += 1 + if calls["n"] < 3: + raise PermissionError("sharing violation") + return "ok" + + assert config_store._retry_on_sharing_violation(flaky) == "ok" + assert calls["n"] == 3 # two transient failures, then success + + +def test_retry_on_sharing_violation_propagates_a_persistent_error(monkeypatch): + # A genuine permission problem (every attempt fails) must surface, not loop forever: + # the budget is exactly _SHARING_RETRIES attempts (the loop plus the final try). + monkeypatch.setattr(config_store.time, "sleep", lambda _seconds: None) + calls = {"n": 0} + + def always_fails() -> str: + calls["n"] += 1 + raise PermissionError("locked") + + with pytest.raises(PermissionError): + config_store._retry_on_sharing_violation(always_fails) + assert calls["n"] == config_store._SHARING_RETRIES + + +def test_config_dir_resolves_under_the_platformdirs_app_location(): + # The autouse tmp_config fixture patches config_dir to a temp path for isolation; + # this exercises the real resolver, which anchors config.toml under the + # "assemblyai" application directory on whichever platform we're on. + assert _REAL_CONFIG_DIR().name == "assemblyai" def test_get_api_key_treats_broken_keyring_backend_as_no_key(monkeypatch): diff --git a/tests/test_config_command.py b/tests/test_config_command.py index d9db71af..8d90a84a 100644 --- a/tests/test_config_command.py +++ b/tests/test_config_command.py @@ -5,7 +5,7 @@ import pytest from typer.testing import CliRunner -from aai_cli.core import config +from aai_cli.core import config, config_store from aai_cli.core.errors import CLIError from aai_cli.main import app @@ -51,9 +51,9 @@ def test_set_active_profile_with_no_profiles_says_none_yet(): def test_config_file_path_is_the_toml_under_config_dir(): - path = config.config_file_path() + path = config_store.config_file_path() assert path.name == "config.toml" - assert path.parent == config.config_dir() + assert path.parent == config_store.config_dir() def test_bare_config_shows_subcommand_help(): @@ -70,13 +70,13 @@ def test_bare_config_shows_subcommand_help(): def test_config_path_prints_bare_path(): result = runner.invoke(app, ["config", "path"]) assert result.exit_code == 0 - assert result.output.strip() == str(config.config_file_path()) + assert result.output.strip() == str(config_store.config_file_path()) def test_config_path_json(): result = runner.invoke(app, ["config", "path", "--json"]) assert result.exit_code == 0 - assert json.loads(result.output) == {"path": str(config.config_file_path())} + assert json.loads(result.output) == {"path": str(config_store.config_file_path())} def test_config_path_works_even_when_config_is_corrupt(tmp_config): @@ -85,7 +85,7 @@ def test_config_path_works_even_when_config_is_corrupt(tmp_config): (tmp_config / "config.toml").write_text("this is = = not toml [[[") result = runner.invoke(app, ["config", "path"]) assert result.exit_code == 0 - assert result.output.strip() == str(config.config_file_path()) + assert result.output.strip() == str(config_store.config_file_path()) def test_config_list_still_errors_when_config_is_corrupt(tmp_config): @@ -113,7 +113,7 @@ def test_config_list_json_is_the_full_settings_object(): result = runner.invoke(app, ["config", "list", "--json"]) assert result.exit_code == 0 assert json.loads(result.output) == { - "path": str(config.config_file_path()), + "path": str(config_store.config_file_path()), "active_profile": "staging", "profiles": {"staging": "sandbox000"}, "telemetry_enabled": False, diff --git a/tests/test_onboard_command.py b/tests/test_onboard_command.py index 0de118fe..a318a4e9 100644 --- a/tests/test_onboard_command.py +++ b/tests/test_onboard_command.py @@ -282,9 +282,9 @@ def test_bare_aai_with_corrupt_config_shows_help_without_crashing( # A corrupt config.toml is deferred by the root callback. Bare `assembly` must still # print help cleanly — without the deferral guard, _profile_has_key -> resolve_api_key # re-raises invalid_config (which it doesn't catch) and the callback dumps a traceback. - from aai_cli.core import config + from aai_cli.core import config_store - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) (tmp_path / "config.toml").write_text("this is not = valid = toml ][", encoding="utf-8") monkeypatch.delenv("ASSEMBLYAI_API_KEY", raising=False) monkeypatch.delenv("AAI_ENV", raising=False) diff --git a/tests/test_update_check.py b/tests/test_update_check.py index e8ba2d20..eb10a451 100644 --- a/tests/test_update_check.py +++ b/tests/test_update_check.py @@ -12,13 +12,13 @@ from rich.console import Console from aai_cli import __version__ -from aai_cli.core import config +from aai_cli.core import config, config_store from aai_cli.ui import output, theme, update_check def test_update_cache_roundtrips(tmp_path, monkeypatch): # Isolate config.toml to a temp dir so the real one is never touched. - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) assert config.get_update_cache() == (None, None) @@ -27,7 +27,7 @@ def test_update_cache_roundtrips(tmp_path, monkeypatch): def test_update_cache_records_check_even_when_version_unknown(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) # A failed fetch still records the timestamp (so we don't re-spawn every run) # but leaves the version unknown. config.set_update_cache(last_check=1718000001.0, latest_version=None) @@ -88,7 +88,7 @@ def _fake_response(payload: dict[str, object]) -> types.SimpleNamespace: def test_fetch_and_cache_writes_latest(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) # Untyped capture dict (mirrors the pattern in tests/test_telemetry.py). captured = {} @@ -118,7 +118,7 @@ def test_check_interval_is_24_hours(): def test_fetch_and_cache_empty_tag_leaves_version_unknown(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) # An empty tag_name is present but unusable: it must NOT be cached as a version. monkeypatch.setattr(httpx2, "get", lambda url, **kwargs: _fake_response({"tag_name": ""})) @@ -129,7 +129,7 @@ def test_fetch_and_cache_empty_tag_leaves_version_unknown(tmp_path, monkeypatch) def test_fetch_and_cache_swallows_errors_but_records_check(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) def boom(url, **kwargs): raise httpx2.HTTPError("network down") @@ -153,7 +153,7 @@ def _tty_console() -> tuple[Console, io.StringIO]: def test_maybe_notify_shows_box_for_newer_cached_version(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) # Cache says a newer version exists, checked just now (so no spawn). config.set_update_cache(last_check=time.time(), latest_version="9.9.9") monkeypatch.setattr(sys, "executable", "/opt/homebrew/Cellar/assembly/9/libexec/bin/python") @@ -172,7 +172,7 @@ def test_maybe_notify_shows_box_for_newer_cached_version(tmp_path, monkeypatch): def test_maybe_notify_silent_under_json(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version="9.9.9") con, buf = _tty_console() monkeypatch.setattr(output, "error_console", con) @@ -183,7 +183,7 @@ def test_maybe_notify_silent_under_json(tmp_path, monkeypatch): def test_maybe_notify_silent_when_not_a_tty(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version="9.9.9") buf = io.StringIO() con = theme.make_console(file=buf, force_terminal=False, _environ={}) # not a tty @@ -195,7 +195,7 @@ def test_maybe_notify_silent_when_not_a_tty(tmp_path, monkeypatch): def test_maybe_notify_silent_in_ci_and_when_disabled(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version="9.9.9") con, buf = _tty_console() monkeypatch.setattr(output, "error_console", con) @@ -211,7 +211,7 @@ def test_maybe_notify_silent_in_ci_and_when_disabled(tmp_path, monkeypatch): def test_maybe_notify_no_box_when_cache_not_newer(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version=__version__) # equal con, buf = _tty_console() monkeypatch.setattr(output, "error_console", con) @@ -223,7 +223,7 @@ def test_maybe_notify_no_box_when_cache_not_newer(tmp_path, monkeypatch): def test_maybe_notify_spawns_refresh_only_when_stale(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) con, _buf = _tty_console() monkeypatch.setattr(output, "error_console", con) monkeypatch.delenv("CI", raising=False) @@ -278,7 +278,7 @@ def test_notice_appears_after_a_real_command(tmp_path, monkeypatch): from aai_cli.main import app - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version="9.9.9") monkeypatch.setattr(sys, "executable", "/opt/homebrew/Cellar/assembly/9/libexec/bin/python") monkeypatch.delenv("CI", raising=False) @@ -300,7 +300,7 @@ def test_no_notice_under_json(tmp_path, monkeypatch): from aai_cli.main import app - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version="9.9.9") con, buf = _tty_console() monkeypatch.setattr(output, "error_console", con) @@ -311,7 +311,7 @@ def test_no_notice_under_json(tmp_path, monkeypatch): def test_maybe_notify_generic_hint_when_install_unknown(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version="9.9.9") monkeypatch.setattr(sys, "executable", "/usr/bin/python3") # unknown -> no command con, buf = _tty_console() @@ -328,7 +328,7 @@ def test_maybe_notify_generic_hint_when_install_unknown(tmp_path, monkeypatch): def test_fetch_and_cache_no_tag_leaves_version_unknown(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) monkeypatch.setattr(httpx2, "get", lambda url, **kwargs: _fake_response({})) # no tag_name update_check.fetch_and_cache() @@ -339,7 +339,7 @@ def test_fetch_and_cache_no_tag_leaves_version_unknown(tmp_path, monkeypatch): def test_fetch_and_cache_swallows_cache_write_error(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) monkeypatch.setattr(httpx2, "get", lambda url, **kwargs: _fake_response({"tag_name": "v0.4.0"})) def boom(**kwargs): @@ -351,7 +351,7 @@ def boom(**kwargs): def test_maybe_notify_swallows_config_errors(tmp_path, monkeypatch): - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) con, _buf = _tty_console() monkeypatch.setattr(output, "error_console", con) monkeypatch.delenv("CI", raising=False) diff --git a/tests/test_update_prompt.py b/tests/test_update_prompt.py index b4987bda..b7e7a995 100644 --- a/tests/test_update_prompt.py +++ b/tests/test_update_prompt.py @@ -8,7 +8,7 @@ from rich.console import Console -from aai_cli.core import config, procs, stdio +from aai_cli.core import config, config_store, procs, stdio from aai_cli.ui import output, theme, update_check @@ -61,7 +61,7 @@ def fake_run(argv, *, check): def _enable_prompt(tmp_path, monkeypatch) -> io.StringIO: """Cache a newer version, a tty stderr console, and an interactive stdin so the update notice renders and the upgrade prompt is reachable.""" - monkeypatch.setattr(config, "config_dir", lambda: tmp_path) + monkeypatch.setattr(config_store, "config_dir", lambda: tmp_path) config.set_update_cache(last_check=time.time(), latest_version="9.9.9") con, buf = _tty_console() monkeypatch.setattr(output, "error_console", con)