Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 5 additions & 38 deletions tests/test_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
1. ``core.config`` persists ``config.toml`` with a temp-file + atomic ``os.replace``
(`config._dump`) so a reader never observes a truncated file. Writers and readers are
otherwise unsynchronized (last write wins), and on Windows the replace window is ridden
out by a small retry (`config._retry_on_sharing_violation`). These tests pin the at-rest
atomicity under thread contention and that retry helper.
out by a small retry (`config._retry_on_sharing_violation`). These tests pin that retry
helper. (A multi-thread RMW stress test once lived here too, but it manufactured
Windows-only os.replace sharing-violation contention no single-user CLI produces and was
chronically flaky/hanging on CI; the retry helper's unit tests below cover the real
behavior, and `os.replace` provides the at-rest atomicity structurally.)
2. ``streaming.StreamSession.on_turn`` runs on the SDK reader thread, and the
``--system-audio`` path drives two of those threads at once (`session._drive`). The
turn write is serialized by ``_callback_lock`` so two sources can't interleave a
Expand All @@ -17,7 +20,6 @@

import threading
import types
from concurrent.futures import ThreadPoolExecutor

import pytest

Expand Down Expand Up @@ -76,41 +78,6 @@ def op():
assert len(calls) == config._SHARING_RETRIES


# --- config.toml: atomic writes vs. lost updates -----------------------------------


def test_config_concurrent_writers_always_leave_a_valid_file(tmp_config):
# Many threads rewriting config.toml at once, plus a reader hammering it throughout:
# the temp-file + atomic os.replace in _dump means no writer and no reader ever sees
# a truncated/half-written file (which would surface as an invalid_config CLIError),
# the surviving value is exactly one writer's, and no .config-*.toml.tmp is left behind.
workers = 24
barrier = threading.Barrier(workers + 1) # writers + the reader, released together
stop = threading.Event()

def writer(i: int) -> None:
barrier.wait()
config.set_profile_env("default", f"sandbox{i:03d}")

def reader() -> None:
barrier.wait()
while not stop.is_set():
config.get_profile_env("default") # must never raise on a partial file

# future.result() re-raises any worker error in the main thread, so a truncated-file
# read (an invalid_config CLIError) fails the test cleanly instead of being swallowed.
with ThreadPoolExecutor(max_workers=workers + 1) as pool:
read_future = pool.submit(reader)
write_futures = [pool.submit(writer, i) for i in range(workers)]
for f in write_futures:
f.result()
stop.set()
read_future.result()

assert config.get_profile_env("default") in {f"sandbox{i:03d}" for i in range(workers)}
assert sorted(p.name for p in tmp_config.iterdir()) == ["config.toml"] # no temp leftover


# --- streaming: on_turn serialization under _callback_lock -------------------------


Expand Down
Loading