From 8a17457ca56c534d697ec46f1768c286200f4199 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 04:38:35 +0000 Subject: [PATCH 1/2] test: cover config write atomicity and streaming turn-lock concurrency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add tests/test_concurrency.py for the two genuinely contended runtime paths that lacked direct coverage: - core.config: a 24-writer + reader stress test pins that _dump's temp-file + atomic os.replace never lets a concurrent reader observe a truncated file, and a deterministic two-"process" interleave documents the flip side it does not solve — lost updates, since there is no cross-process write lock. - streaming.StreamSession.on_turn: a deterministic test proves the render + save + meta critical section runs under _callback_lock (a second reader thread cannot acquire the lock mid-save), plus a two-source stress test that every finalized turn lands in the saved transcript exactly once and intact. Test-only change; no source modified. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01FueRkrQHWSfpPf1KNaZskX --- tests/test_concurrency.py | 178 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 tests/test_concurrency.py diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py new file mode 100644 index 0000000..4a56c8d --- /dev/null +++ b/tests/test_concurrency.py @@ -0,0 +1,178 @@ +"""Concurrency tests for the two code paths that are genuinely contended at runtime. + +Two real concurrency surfaces exist in the CLI, and neither was directly exercised: + +1. ``core.config`` persists ``config.toml`` with a temp-file + atomic ``os.replace`` + (`config._dump`), the guard that lets concurrent CLI invocations and readers never + observe a truncated file. 
These tests pin that atomicity guarantee under real thread + contention, and document the flip side it deliberately does *not* solve (lost updates, + since there is no cross-process lock). +2. ``streaming.StreamSession.on_turn`` runs on the SDK reader thread, and the + ``--system-audio`` path drives two of those threads at once (`session._drive`). The + turn write is serialized by ``_callback_lock`` so two sources can't interleave a + partial line into the saved transcript. These tests pin that mutual exclusion. +""" + +from __future__ import annotations + +import threading +import types +from concurrent.futures import ThreadPoolExecutor + +from aai_cli.core import config + +# --- config.toml: atomic writes vs. lost updates ----------------------------------- + + +def test_config_concurrent_writers_always_leave_a_valid_file(tmp_config): + # Many threads rewriting config.toml at once, plus a reader hammering it throughout: + # the temp-file + atomic os.replace in _dump means no writer and no reader ever sees + # a truncated/half-written file (which would surface as an invalid_config CLIError), + # the surviving value is exactly one writer's, and no .config-*.toml.tmp is left behind. + workers = 24 + barrier = threading.Barrier(workers + 1) # writers + the reader, released together + stop = threading.Event() + + def writer(i: int) -> None: + barrier.wait() + config.set_profile_env("default", f"sandbox{i:03d}") + + def reader() -> None: + barrier.wait() + while not stop.is_set(): + config.get_profile_env("default") # must never raise on a partial file + + # future.result() re-raises any worker error in the main thread, so a truncated-file + # read (an invalid_config CLIError) fails the test cleanly instead of being swallowed. 
+ with ThreadPoolExecutor(max_workers=workers + 1) as pool: + read_future = pool.submit(reader) + write_futures = [pool.submit(writer, i) for i in range(workers)] + for f in write_futures: + f.result() + stop.set() + read_future.result() + + assert config.get_profile_env("default") in {f"sandbox{i:03d}" for i in range(workers)} + assert sorted(p.name for p in tmp_config.iterdir()) == ["config.toml"] # no temp leftover + + +def test_concurrent_read_modify_write_drops_an_interleaved_update(tmp_config): + # Documents a known limitation, not a bug fixed here: config.py has no cross-process + # write lock (a filelock would add a dependency; fcntl is POSIX-only), so two + # `assembly` processes that each load -> mutate -> dump concurrently lose one update — + # last writer wins. Atomic os.replace prevents *corruption*, never *lost updates*. + # If config.py ever grows a write lock, this assertion is the one that should flip. + config.set_profile_env("seed", "sandbox000") + proc_a = config._load() # both "processes" observe the same starting config + proc_b = config._load() + + proc_a.profiles["alpha"] = config.Profile(env="sandbox111") + config._dump(proc_a) # process A commits its new profile + + proc_b.profiles["beta"] = config.Profile(env="sandbox222") + config._dump(proc_b) # process B, unaware of A's write, clobbers it + + names = set(config.list_profiles()) + assert "beta" in names # the last writer's update survives + assert "alpha" not in names # A's interleaved update was silently lost + assert "seed" in names # present in both snapshots, so it persists either way + + +# --- streaming: on_turn serialization under _callback_lock ------------------------- + + +def _turn(text: str): + # No source/speaker label, so _finalized_turn_line writes the bare text as its line. 
+ return types.SimpleNamespace(transcript=text, end_of_turn=True, speaker_label=None) + + +def _stream_session(out_path): + import io + + from aai_cli.streaming.render import StreamRenderer + from aai_cli.streaming.session import StreamSession + + return StreamSession( + api_key="sk", + base_flags={}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_transcript=out_path, + ) + + +def test_on_turn_holds_callback_lock_across_the_save(tmp_path): + # on_turn must run its render + save + meta critical section under _callback_lock, so a + # second SDK reader thread (--system-audio runs two) can't interleave into it. Proven + # deterministically: pin a worker mid-save (inside write_turn) and assert no other + # thread can take the lock while it's there. Drop the `with self._callback_lock` and + # the lock is free during the save -> acquire(blocking=False) would succeed and fail this. + from aai_cli.streaming.transcript import TranscriptWriter + + out = tmp_path / "transcript.txt" + session = _stream_session(out) + entered = threading.Event() + release = threading.Event() + + class _BlockingWriter(TranscriptWriter): + # A real writer whose write_turn blocks, so the test can hold on_turn's critical + # section open and probe the lock. It opens the real handle (closed below). 
+ def __init__(self, path) -> None: + super().__init__(path) + self.lines: list[str] = [] + + def write_turn(self, line: str) -> None: + self.lines.append(line) + entered.set() # we're now inside _save_line, holding _callback_lock + assert release.wait(timeout=5) # hold the lock open until the test releases us + + writer = _BlockingWriter(out) + session._transcript_writer = writer + worker = threading.Thread(target=lambda: session.on_turn(_turn("first"))) + worker.start() + + assert entered.wait(timeout=5) # worker reached the critical section + # While the worker is inside it, no other thread can acquire _callback_lock. + assert session._callback_lock.acquire(blocking=False) is False + + release.set() + worker.join(timeout=5) + assert not worker.is_alive() + assert writer.lines == ["first"] + writer.close() + + +def test_concurrent_turns_record_every_line_without_interleaving(tmp_path): + # Two source threads firing finalized turns at once (the --system-audio shape) must + # each land in the saved transcript exactly once and intact: _callback_lock serializes + # the writes so no line is lost or interleaved with another's characters. 
+ from aai_cli.streaming.transcript import TranscriptWriter + + out = tmp_path / "transcript.txt" + session = _stream_session(out) + session._transcript_writer = TranscriptWriter(out) + + threads_count, per_thread = 2, 50 + barrier = threading.Barrier(threads_count) # start the bursts simultaneously + + def worker(tid: int) -> None: + barrier.wait() + for n in range(per_thread): + session.on_turn(_turn(f"t{tid}-{n}")) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(threads_count)] + for t in threads: + t.start() + for t in threads: + t.join() + session._transcript_writer.close() + + lines = out.read_text(encoding="utf-8").splitlines() + expected = {f"t{i}-{n}" for i in range(threads_count) for n in range(per_thread)} + assert len(lines) == len(expected) # every turn written exactly once (none lost) + assert set(lines) == expected # none interleaved/corrupted into an unexpected line From 3008168b4424cf4430a846df83a471c975650d52 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Jun 2026 05:04:12 +0000 Subject: [PATCH 2/2] feat(config): serialize config.toml writes with a cross-process lock config.toml's mutating helpers do a read-modify-write (_load -> mutate -> _dump). The atomic os.replace in _dump already keeps a *reader* from seeing a torn file, but two `assembly` processes writing concurrently would still lose an update (both read the same config; the second dump clobbers the first). Wrap every read-modify-write in a cross-process filelock so concurrent writers no longer clobber each other; readers stay lock-free. filelock graduates from a dev-only transitive dependency to a declared runtime dependency. - add core/locking.py: generic cached cross-process FileLock helper (one instance per path, so nested acquisitions stay reentrant). 
- add core/config_lock.py: config.toml's lock_path/write_lock/locked/update helpers (kept out of config.py to stay under the file-length gate; _load and _dump are injected so the module avoids config's private API). - route set_*/clear_session/get_device_id/persist_login through the lock. Update tests/test_concurrency.py: the lost-update test becomes a no-loss test (16 concurrent writers each adding a distinct profile all survive), plus unit tests pinning the lock path, per-dir rebuild, and that the lock is held across the dump. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01FueRkrQHWSfpPf1KNaZskX --- aai_cli/AGENTS.md | 6 +- aai_cli/core/config.py | 129 ++++++++++++++++++------------------ aai_cli/core/config_lock.py | 51 ++++++++++++++ aai_cli/core/locking.py | 43 ++++++++++++ pyproject.toml | 5 ++ tests/test_concurrency.py | 75 +++++++++++++++------ uv.lock | 2 + 7 files changed, 221 insertions(+), 90 deletions(-) create mode 100644 aai_cli/core/config_lock.py create mode 100644 aai_cli/core/locking.py diff --git a/aai_cli/AGENTS.md b/aai_cli/AGENTS.md index 50303b8..bcde3ae 100644 --- a/aai_cli/AGENTS.md +++ b/aai_cli/AGENTS.md @@ -34,8 +34,8 @@ between layers is enforced — higher may import lower, never the reverse: `config_builder`, `environments`, `env`, `errors`, `llm`, `telemetry`, `debuglog`, `remotefs`, `sync_stt`, `hotkey`, `ws`, `youtube`, `wer`, `argscan`, `jsonshape`, `timeparse`, `microphone`, `procs`, `stdio`, - `choices`. Contract 4 also forbids `rich` here, so "no Rich below the UI - layer" is structural. + `choices`, `locking`, `config_lock`. Contract 4 also forbids `rich` here, so + "no Rich below the UI layer" is structural. 
Three things sit *beside* the stack, intentionally unlisted in the layers contract: @@ -139,7 +139,7 @@ heavily-reworked commands with long bodies; small commands keep the inline ### Cross-cutting state (resolution order matters) - **`app/context.py`** — `AppState` (profile, env) is attached to the Typer context in the root `@app.callback()`. `run_command` is the standard command wrapper. -- **`core/config.py`** — profiles persisted in `config.toml` (via `platformdirs`); the **API key lives only in the OS keyring** (`KEYRING_SERVICE = "assemblyai-cli"`), never in a dotfile. Key resolution order: `--api-key` flag (validation paths only) → `ASSEMBLYAI_API_KEY` env → keyring. **Run commands deliberately expose no `--api-key` flag** so keys can't leak into `ps`/shell history. +- **`core/config.py`** — profiles persisted in `config.toml` (via `platformdirs`); the **API key lives only in the OS keyring** (`KEYRING_SERVICE = "assemblyai-cli"`), never in a dotfile. Key resolution order: `--api-key` flag (validation paths only) → `ASSEMBLYAI_API_KEY` env → keyring. **Run commands deliberately expose no `--api-key` flag** so keys can't leak into `ps`/shell history. Every `config.toml` write is a read-modify-write (`_load` → mutate → `_dump`): `_dump` is a temp-file + atomic `os.replace` (a reader never sees a torn file), and the whole RMW runs under a cross-process `filelock` (`config_lock.update`/`.locked`, built on `core/locking.py`) so two concurrent `assembly` processes can't lose each other's updates. Readers stay lock-free. The lock helpers live in `config_lock.py` (not `config.py`) only to keep the latter under the file-length gate; reuse one cached `FileLock` per path so nested writers (`persist_login`) stay reentrant. - **`core/environments.py`** — a frozen `Environment` (api_base, streaming_host, llm_gateway_base, ams_base, stytch_*). `DEFAULT_ENV` is **`production`**; use `--sandbox` (or `--env sandbox000` / `AAI_ENV`) to target the sandbox. 
The active environment is a process-global set once at startup; precedence: `--env` → `AAI_ENV` → profile's stored env → default. A credential is only valid against the environment that minted it. - **`core/client.py`** — thin wrappers over the `assemblyai` SDK (`transcribe`, `list_transcripts`, `stream_audio`, etc.). It normalizes SDK exceptions: auth failures become a single clean `auth_failure()` `CLIError`; everything else becomes `APIError`. New SDK calls should follow this try/except shape. - **`core/errors.py`** — the `CLIError` hierarchy (each with `error_type` + `exit_code`). `ui/output.py` emits errors to **stderr**; stdout stays clean for pipelines. `--json` switches to machine-readable output; it is never auto-enabled — `output.resolve_json()` deliberately keeps human text the default even when piped or agent-run. diff --git a/aai_cli/core/config.py b/aai_cli/core/config.py index cf5312a..7f7f93c 100644 --- a/aai_cli/core/config.py +++ b/aai_cli/core/config.py @@ -14,7 +14,7 @@ import tomli_w from pydantic import BaseModel, ConfigDict, Field, ValidationError -from aai_cli.core import debuglog, env +from aai_cli.core import config_lock, debuglog, env from aai_cli.core.errors import CLIError, NotAuthenticated KEYRING_SERVICE = "assemblyai-cli" @@ -190,17 +190,16 @@ def set_active_profile(name: str) -> None: with no hint why, so the typo is rejected here with the known names listed. 
""" validate_profile(name) - cfg = _load() - if name not in cfg.profiles: - known = ", ".join(sorted(cfg.profiles)) or "none yet" - raise CLIError( - f"No profile named {name!r} (known: {known}).", - error_type="invalid_profile", - exit_code=2, - suggestion=f"Create it first: assembly --profile {name} login", - ) - cfg.active_profile = name - _dump(cfg) + with config_lock.update(_load, _dump) as cfg: + if name not in cfg.profiles: + known = ", ".join(sorted(cfg.profiles)) or "none yet" + raise CLIError( + f"No profile named {name!r} (known: {known}).", + error_type="invalid_profile", + exit_code=2, + suggestion=f"Create it first: assembly --profile {name} login", + ) + cfg.active_profile = name def _keyring_set(username: str, secret: str) -> None: @@ -240,11 +239,10 @@ def _keyring_restore(username: str, prior: str | None) -> None: def set_api_key(profile: str, api_key: str) -> None: validate_profile(profile) _keyring_set(profile, api_key) - cfg = _load() - cfg.profiles.setdefault(profile, Profile()) - if cfg.active_profile is None: - cfg.active_profile = profile - _dump(cfg) + with config_lock.update(_load, _dump) as cfg: + cfg.profiles.setdefault(profile, Profile()) + if cfg.active_profile is None: + cfg.active_profile = profile def _keyring_get(username: str) -> str | None: @@ -288,9 +286,8 @@ def get_profile_env(profile: str) -> str | None: def set_profile_env(profile: str, env: str) -> None: """Bind a backend environment to a profile so its key and hosts stay matched.""" validate_profile(profile) - cfg = _load() - cfg.profiles.setdefault(profile, Profile()).env = env - _dump(cfg) + with config_lock.update(_load, _dump) as cfg: + cfg.profiles.setdefault(profile, Profile()).env = env def get_profile_email(profile: str) -> str | None: @@ -302,9 +299,8 @@ def get_profile_email(profile: str) -> str | None: def set_profile_email(profile: str, email: str) -> None: """Persist the login email for a profile (gates internal-environment access).""" 
validate_profile(profile) - cfg = _load() - cfg.profiles.setdefault(profile, Profile()).email = email - _dump(cfg) + with config_lock.update(_load, _dump) as cfg: + cfg.profiles.setdefault(profile, Profile()).email = email def clear_api_key(profile: str) -> None: @@ -332,9 +328,8 @@ def set_session(profile: str, *, session_jwt: str, session_token: str, account_i _session_username(profile), StoredSession(jwt=session_jwt, token=session_token).model_dump_json(), ) - cfg = _load() - cfg.profiles.setdefault(profile, Profile()).account_id = account_id - _dump(cfg) + with config_lock.update(_load, _dump) as cfg: + cfg.profiles.setdefault(profile, Profile()).account_id = account_id def get_session(profile: str) -> dict[str, str] | None: @@ -358,11 +353,12 @@ def get_account_id(profile: str) -> int | None: def clear_session(profile: str) -> None: with contextlib.suppress(keyring.errors.KeyringError): keyring.delete_password(KEYRING_SERVICE, _session_username(profile)) - cfg = _load() - prof = cfg.profiles.get(profile) - if prof and prof.account_id is not None: - prof.account_id = None - _dump(cfg) + with config_lock.locked(): + cfg = _load() + prof = cfg.profiles.get(profile) + if prof and prof.account_id is not None: + prof.account_id = None + _dump(cfg) def persist_login( @@ -385,28 +381,32 @@ def persist_login( restored best-effort. """ validate_profile(profile) - prior_api_key = _keyring_get(profile) - prior_session = _keyring_get(_session_username(profile)) - prior_cfg = _load() - done = False - try: - set_api_key(profile, api_key) - set_profile_env(profile, env) - set_session( - profile, - session_jwt=session_jwt, - session_token=session_token, - account_id=account_id, - ) - # Within the same atomic rollback so the sandbox gate can't read stale identity. 
- if email is not None: - set_profile_email(profile, email) - done = True - finally: - if not done: - _keyring_restore(profile, prior_api_key) - _keyring_restore(_session_username(profile), prior_session) - _dump(prior_cfg) + # Hold the write lock across the whole snapshot -> writes -> rollback so a concurrent + # writer can't slip a change between the snapshot and a rollback dump. The set_* + # helpers re-take the same (reentrant) lock, so the nesting is safe. + with config_lock.locked(): + prior_api_key = _keyring_get(profile) + prior_session = _keyring_get(_session_username(profile)) + prior_cfg = _load() + done = False + try: + set_api_key(profile, api_key) + set_profile_env(profile, env) + set_session( + profile, + session_jwt=session_jwt, + session_token=session_token, + account_id=account_id, + ) + # Within the same atomic rollback so the sandbox gate can't read stale identity. + if email is not None: + set_profile_email(profile, email) + done = True + finally: + if not done: + _keyring_restore(profile, prior_api_key) + _keyring_restore(_session_username(profile), prior_session) + _dump(prior_cfg) def has_device_id() -> bool: @@ -420,11 +420,12 @@ def get_device_id() -> str: """A stable anonymous install id for telemetry: a random UUID minted locally on first use and persisted in config.toml. 
Carries nothing derivable from the machine or account.""" - cfg = _load() - if cfg.device_id is None: - cfg.device_id = str(uuid.uuid4()) - _dump(cfg) - return cfg.device_id + with config_lock.locked(): + cfg = _load() + if cfg.device_id is None: + cfg.device_id = str(uuid.uuid4()) + _dump(cfg) + return cfg.device_id def get_telemetry_enabled() -> bool | None: @@ -434,9 +435,8 @@ def get_telemetry_enabled() -> bool | None: def set_telemetry_enabled(*, enabled: bool) -> None: - cfg = _load() - cfg.telemetry_enabled = enabled - _dump(cfg) + with config_lock.update(_load, _dump) as cfg: + cfg.telemetry_enabled = enabled def get_update_cache() -> tuple[float | None, str | None]: @@ -448,10 +448,9 @@ def get_update_cache() -> tuple[float | None, str | None]: def set_update_cache(*, last_check: float, latest_version: str | None) -> None: """Persist the update-notifier cache. ``latest_version`` is None when the last fetch failed — the timestamp is still recorded so we don't re-spawn every run.""" - cfg = _load() - cfg.update_last_check = last_check - cfg.update_latest_version = latest_version - _dump(cfg) + with config_lock.update(_load, _dump) as cfg: + cfg.update_last_check = last_check + cfg.update_latest_version = latest_version def resolve_api_key(*, profile: str | None = None, api_key_flag: str | None = None) -> str: diff --git a/aai_cli/core/config_lock.py b/aai_cli/core/config_lock.py new file mode 100644 index 0000000..70d1912 --- /dev/null +++ b/aai_cli/core/config_lock.py @@ -0,0 +1,51 @@ +"""The cross-process write lock for config.toml's read-modify-write. + +config.toml's mutating helpers do ``_load -> mutate -> _dump``. The atomic ``os.replace`` +in ``_dump`` keeps a *reader* from ever seeing a torn file, but two writers racing would +still lose an update — both read the same config, and the second dump clobbers the first's +change. 
These helpers serialize the whole read-modify-write across processes via a sibling +lock file; readers stay lock-free (an older-but-valid parse is fine). + +Kept out of ``config.py`` only to keep that module under the file-length gate; it looks up +``config.config_dir`` lazily, at call time, and ``config.py`` injects ``_load``/``_dump``. +""" + +from __future__ import annotations + +import contextlib +from collections.abc import Callable, Generator +from pathlib import Path + +import filelock + +from aai_cli.core import config, locking + + +def lock_path() -> Path: + return config.config_dir() / "config.toml.lock" + + +def write_lock() -> filelock.FileLock: + """The shared cross-process write lock guarding config.toml.""" + return locking.file_lock(lock_path()) + + +@contextlib.contextmanager +def locked() -> Generator[None]: + """Hold the config write lock for the duration of the block.""" + with locking.locked(lock_path()): + yield + + +@contextlib.contextmanager +def update( + load: Callable[[], config.Config], dump: Callable[[config.Config], None] +) -> Generator[config.Config]: + """Run a load -> mutate -> dump under the write lock so a concurrent writer can't lose + the update. Yields the loaded config; dumps it on clean exit (an exception in the block + propagates and skips the dump). ``load``/``dump`` are injected by config.py so this + module stays clear of its private helpers.""" + with locked(): + cfg = load() + yield cfg + dump(cfg) diff --git a/aai_cli/core/locking.py b/aai_cli/core/locking.py new file mode 100644 index 0000000..dbfb33b --- /dev/null +++ b/aai_cli/core/locking.py @@ -0,0 +1,43 @@ +"""Cross-process advisory file locking, backed by ``filelock``. + +Used to serialize a read-modify-write of a shared on-disk file (``config.toml``) across +concurrent ``assembly`` processes, so two of them can't lose each other's updates +(last-writer-wins).
The atomic-rename write keeps a *reader* from ever seeing a torn +file; this lock is what keeps two *writers* from clobbering each other. +""" + +from __future__ import annotations + +import contextlib +from collections.abc import Generator +from pathlib import Path + +import filelock + +# One cached lock instance per lock-file path. filelock already serializes threads within +# this process (and other processes via the lock file). Reusing a single instance per path +# makes nested acquisitions reentrant, which a re-entrant caller (e.g. a snapshot+rollback +# that calls smaller writers) needs — distinct instances on one path would deadlock. +_lock_cache: dict[str, filelock.FileLock] = {} + + +def file_lock(path: Path) -> filelock.FileLock: + """The cached cross-process lock for ``path`` (created on first use).""" + key = str(path) + lock = _lock_cache.get(key) + if lock is None: + lock = filelock.FileLock(path) + _lock_cache[key] = lock + return lock + + +@contextlib.contextmanager +def locked(path: Path) -> Generator[None]: + """Hold the cross-process lock at ``path`` for the duration of the block. + + Creates the lock file's parent directory first — filelock won't, and the very first + write on a fresh machine targets a dir that may not exist yet. + """ + path.parent.mkdir(parents=True, exist_ok=True) + with file_lock(path): + yield diff --git a/pyproject.toml b/pyproject.toml index 7840422..8e04d4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,11 @@ dependencies = [ # lazily). Strips boilerplate down to the readable body; ships prebuilt wheels # (lxml included), so it adds no source-compile step to Homebrew bottling. "trafilatura>=2.1.0", + # Cross-process advisory lock around config.toml's read-modify-write (config.py), + # so two concurrent `assembly` processes can't lose each other's profile/telemetry + # updates (last-writer-wins).
Pure-Python, no compiled deps; already arrived + # transitively via the dev toolchain, so the lock pins the same release. + "filelock>=3.16.0", ] [project.urls] diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index 4a56c8d..ad5647b 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -3,10 +3,11 @@ Two real concurrency surfaces exist in the CLI, and neither was directly exercised: 1. ``core.config`` persists ``config.toml`` with a temp-file + atomic ``os.replace`` - (`config._dump`), the guard that lets concurrent CLI invocations and readers never - observe a truncated file. These tests pin that atomicity guarantee under real thread - contention, and document the flip side it deliberately does *not* solve (lost updates, - since there is no cross-process lock). + (`config._dump`) so a reader never observes a truncated file, and serializes its + read-modify-write under a cross-process ``filelock`` (`config._write_lock`) so two + concurrent ``assembly`` processes can't lose each other's updates. These tests pin + both: the at-rest atomicity under thread contention, and that distinct concurrent + updates all survive (no last-writer-wins clobber). 2. ``streaming.StreamSession.on_turn`` runs on the SDK reader thread, and the ``--system-audio`` path drives two of those threads at once (`session._drive`). The turn write is serialized by ``_callback_lock`` so two sources can't interleave a @@ -19,7 +20,7 @@ import types from concurrent.futures import ThreadPoolExecutor -from aai_cli.core import config +from aai_cli.core import config, config_lock # --- config.toml: atomic writes vs. 
lost updates ----------------------------------- @@ -56,26 +57,56 @@ def reader() -> None: assert sorted(p.name for p in tmp_config.iterdir()) == ["config.toml"] # no temp leftover -def test_concurrent_read_modify_write_drops_an_interleaved_update(tmp_config): - # Documents a known limitation, not a bug fixed here: config.py has no cross-process - # write lock (a filelock would add a dependency; fcntl is POSIX-only), so two - # `assembly` processes that each load -> mutate -> dump concurrently lose one update — - # last writer wins. Atomic os.replace prevents *corruption*, never *lost updates*. - # If config.py ever grows a write lock, this assertion is the one that should flip. - config.set_profile_env("seed", "sandbox000") - proc_a = config._load() # both "processes" observe the same starting config - proc_b = config._load() +def test_concurrent_writers_do_not_lose_distinct_updates(tmp_config): + # The cross-process write lock makes the read-modify-write atomic, so many threads + # each adding a DISTINCT profile through the public API all survive. Without the lock + # the interleaved RMW would drop some (two writers _load the same config; the second + # _dump clobbers the first's new profile) — this is the lost-update race the lock closes. + workers = 16 + barrier = threading.Barrier(workers) # release all writers at once for max contention - proc_a.profiles["alpha"] = config.Profile(env="sandbox111") - config._dump(proc_a) # process A commits its new profile + def add(i: int) -> None: + barrier.wait() + config.set_profile_env(f"p{i:02d}", f"sandbox{i:03d}") + + with ThreadPoolExecutor(max_workers=workers) as pool: + for f in [pool.submit(add, i) for i in range(workers)]: + f.result() + + # Every concurrent update is present with its own value — none clobbered. 
+ assert config.list_profiles() == {f"p{i:02d}": f"sandbox{i:03d}" for i in range(workers)} + + +def test_write_lock_targets_the_config_dir_lock_file(tmp_config): + # The lock guards config.toml via a sibling lock file in the same dir. + assert config_lock.write_lock().lock_file == str(tmp_config / "config.toml.lock") + + +def test_write_lock_rebuilds_when_config_dir_changes(monkeypatch, tmp_path): + # The instance is cached per path, but a changed config dir (the suite repoints it per + # test) must yield a fresh lock pointed at the new dir — not the stale one. + first = config_lock.write_lock() + moved = tmp_path / "moved" + moved.mkdir() + monkeypatch.setattr(config, "config_dir", lambda: moved) + second = config_lock.write_lock() + assert second is not first + assert second.lock_file == str(moved / "config.toml.lock") + + +def test_update_holds_the_write_lock_during_the_dump(tmp_config, monkeypatch): + # The mutate -> dump runs inside the lock: while _dump executes, the lock is held. + # (Drop the `with locked()` and is_locked would be False here.) 
+ seen: dict[str, bool] = {} + real_dump = config._dump - proc_b.profiles["beta"] = config.Profile(env="sandbox222") - config._dump(proc_b) # process B, unaware of A's write, clobbers it + def spy_dump(cfg): + seen["locked"] = config_lock.write_lock().is_locked + return real_dump(cfg) - names = set(config.list_profiles()) - assert "beta" in names # the last writer's update survives - assert "alpha" not in names # A's interleaved update was silently lost - assert "seed" in names # present in both snapshots, so it persists either way + monkeypatch.setattr(config, "_dump", spy_dump) + config.set_profile_env("default", "sandbox000") + assert seen["locked"] is True # --- streaming: on_turn serialization under _callback_lock ------------------------- diff --git a/uv.lock b/uv.lock index e73c0e5..0a04be2 100644 --- a/uv.lock +++ b/uv.lock @@ -23,6 +23,7 @@ dependencies = [ { name = "assemblyai" }, { name = "audioop-lts", marker = "python_full_version >= '3.13'" }, { name = "feedparser" }, + { name = "filelock" }, { name = "fsspec" }, { name = "httpx2" }, { name = "jiwer" }, @@ -77,6 +78,7 @@ requires-dist = [ { name = "assemblyai", specifier = ">=0.64.4" }, { name = "audioop-lts", marker = "python_full_version >= '3.13'", specifier = ">=0.2" }, { name = "feedparser", specifier = ">=6.0.11" }, + { name = "filelock", specifier = ">=3.16.0" }, { name = "fsspec", specifier = ">=2026.4.0" }, { name = "httpx2", specifier = ">=2.0.0" }, { name = "jiwer", specifier = ">=4.0" },