diff --git a/src/copilot_usage/cli.py b/src/copilot_usage/cli.py index 858eb06..42d2619 100644 --- a/src/copilot_usage/cli.py +++ b/src/copilot_usage/cli.py @@ -181,34 +181,58 @@ def _show_session_by_index( _FALLBACK_EOF: Final[str] = "\x00__EOF__" -def _start_input_reader_thread() -> queue.SimpleQueue[str]: - """Start a daemon thread reading user input via ``input()`` into a queue. - - Used by ``_interactive_loop`` as a fallback when - ``_read_line_nonblocking`` raises ``ValueError``/``OSError`` (e.g. - stdin is not selectable on Windows, or a detached stdin buffer in - tests). Puts :data:`_FALLBACK_EOF` on the queue when stdin is - exhausted or an unrecoverable error occurs (see issue #1012). +def _start_input_reader_thread( + stop: threading.Event, + need_input: threading.Event, +) -> tuple[queue.SimpleQueue[str], threading.Thread]: + """Start a daemon thread that reads user input on request. + + Uses a request/ack pattern: the thread waits for *need_input* to be + set before each ``input()`` call, ensuring it is never blocked inside + ``input()`` when the caller signals *stop*. The caller sets + *need_input* once for each line it wants; the thread clears the event + after waking. + + Puts :data:`_FALLBACK_EOF` on the queue when stdin is exhausted, + *stop* is signalled, or an unrecoverable error occurs (see issues + #1012, #1062). + + Returns the queue and the thread so the caller can join the thread + in its ``finally`` block. """ q: queue.SimpleQueue[str] = queue.SimpleQueue() def _reader() -> None: while True: + # Wait for the main loop to request a line, checking stop + # periodically so teardown is prompt. + while not need_input.is_set(): + if stop.wait(timeout=0.1): + q.put(_FALLBACK_EOF) + return + need_input.clear() + if stop.is_set(): + q.put(_FALLBACK_EOF) + return try: - q.put(input().strip()) + line = input().strip() except (EOFError, KeyboardInterrupt): q.put(_FALLBACK_EOF) - break + return except Exception as exc: logger.warning( "Unexpected stdin error in fallback reader thread: {}", exc ) q.put(_FALLBACK_EOF) - break + return + if stop.is_set(): + q.put(_FALLBACK_EOF) + return + q.put(line) thread = threading.Thread(target=_reader, daemon=True, name="input-fallback") thread.start() - return q + return q, thread def _read_line_nonblocking(timeout: float = 0.5) -> str | None: @@ -250,6 +274,10 @@ def _interactive_loop(path: Path | None) -> None: # in tests, or an unexpected runtime error). Initialised lazily on the # first error so auto-refresh via change_event keeps working. fallback_queue: queue.SimpleQueue[str] | None = None + fallback_stop: threading.Event | None = None + fallback_thread: threading.Thread | None = None + fallback_need_input: threading.Event | None = None + request_next: bool = False sessions = get_all_sessions(path) session_index = _build_session_index(sessions) @@ -307,6 +335,9 @@ def _interactive_loop(path: Path | None) -> None: # Non-blocking stdin read if fallback_queue is not None: + if request_next and fallback_need_input is not None: + fallback_need_input.set() + request_next = False try: line = fallback_queue.get(timeout=0.5) except queue.Empty: @@ -314,6 +345,7 @@ def _interactive_loop(path: Path | None) -> None: else: if line == _FALLBACK_EOF: break + request_next = True else: try: line = _read_line_nonblocking(timeout=0.5) @@ -322,7 +354,12 @@ def _interactive_loop(path: Path | None) -> None: except (ValueError, OSError): # stdin not selectable — start a threaded input() reader # so change_event auto-refresh keeps working. - fallback_queue = _start_input_reader_thread() + fallback_stop = threading.Event() + fallback_need_input = threading.Event() + fallback_need_input.set() # Request the first line. + fallback_queue, fallback_thread = _start_input_reader_thread( + fallback_stop, fallback_need_input, + ) line = None if line is None: @@ -383,6 +420,17 @@ def _interactive_loop(path: Path | None) -> None: pass # User pressed Ctrl-C; observer cleanup runs in finally finally: _stop_observer(observer) + if fallback_stop is not None: + fallback_stop.set() + if fallback_need_input is not None: + fallback_need_input.set() + if fallback_thread is not None: + fallback_thread.join(timeout=1.0) + if fallback_thread.is_alive(): + logger.warning( + "input-fallback thread did not exit within 1 s; " + "it may be blocked in input()" + ) @click.group(invoke_without_command=True) diff --git a/tests/copilot_usage/test_cli.py b/tests/copilot_usage/test_cli.py index 81a3abc..4603a6d 100644 --- a/tests/copilot_usage/test_cli.py +++ b/tests/copilot_usage/test_cli.py @@ -2361,6 +2361,147 @@ def _driver() -> None: ) +# --------------------------------------------------------------------------- +# Issue #1062 — fallback daemon thread must be joined on exit +# --------------------------------------------------------------------------- + + +def test_fallback_thread_joined_after_loop_exits( + tmp_path: Path, monkeypatch: Any +) -> None: + """No 'input-fallback' daemon threads survive after _interactive_loop returns. + + The fake ``input()`` blocks on a second call to surface regressions: if + the reader re-enters ``input()`` after producing the quit command the + test hangs. With the request/ack pattern introduced in issue #1062 the + second call is never reached. + """ + _write_session(tmp_path, "fb_join0-0000-0000-0000-000000000000", name="Join1") + + import copilot_usage.cli as cli_mod + + def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001 + raise ValueError("underlying buffer has been detached") + + monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error) + + second_input_started = threading.Event() + release_blocked_input = threading.Event() + input_calls = 0 + + def _fake_input(*_args: str, **_kwargs: str) -> str: + nonlocal input_calls + input_calls += 1 + if input_calls == 1: + return "q" + + # Reaching here means the reader re-entered input() — block so + # the test times out rather than silently passing. + second_input_started.set() + assert release_blocked_input.wait(timeout=1.0), ( + "test did not release blocked fallback input() call" + ) + return "q" + + monkeypatch.setattr("builtins.input", _fake_input) + + runner = CliRunner() + try: + result = runner.invoke(main, ["--path", str(tmp_path)]) + assert result.exit_code == 0 + + alive = [ + t + for t in threading.enumerate() + if t.name == "input-fallback" and t.is_alive() + ] + assert alive == [], ( + f"input-fallback thread(s) still alive after _interactive_loop " + f"returned: {alive}" + ) + # The reader should not re-enter input() after producing the quit + # command — the request/ack pattern prevents re-entry. + assert not second_input_started.is_set(), ( + "reader thread re-entered input() after producing quit command" + ) + finally: + release_blocked_input.set() + + +def test_fallback_thread_no_zombie_across_two_calls( + tmp_path: Path, monkeypatch: Any +) -> None: + """Two consecutive _interactive_loop calls must not share a fallback thread. + + Each ``_interactive_loop`` invocation creates its own fallback reader, + and each reader's ``input()`` returns ``"q"`` once. A subsequent + ``input()`` call per reader blocks to surface regressions. + + Regression for issue #1062. + """ + _write_session(tmp_path, "fb_zomb0-0000-0000-0000-000000000000", name="Zombie") + + import copilot_usage.cli as cli_mod + + def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001 + raise ValueError("underlying buffer has been detached") + + monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error) + + blocked = threading.Event() + release = threading.Event() + input_calls = 0 + + def _fake_input(*_args: str, **_kwargs: str) -> str: + nonlocal input_calls + input_calls += 1 + # Each _interactive_loop invocation triggers one input() call + # (calls 1 and 2). Any further call means a re-entry bug. + if input_calls <= 2: + return "q" + blocked.set() + assert release.wait(timeout=1.0), ( + "test did not release blocked fallback input() call" + ) + return "q" + + monkeypatch.setattr("builtins.input", _fake_input) + + runner = CliRunner() + + try: + # --- Call 1 --- + result1 = runner.invoke(main, ["--path", str(tmp_path)]) + assert result1.exit_code == 0 + + alive_after_first = [ + t + for t in threading.enumerate() + if t.name == "input-fallback" and t.is_alive() + ] + assert alive_after_first == [], ( + "input-fallback thread(s) still alive after first call" + ) + + # --- Call 2 --- + result2 = runner.invoke(main, ["--path", str(tmp_path)]) + assert result2.exit_code == 0 + + alive_after_second = [ + t + for t in threading.enumerate() + if t.name == "input-fallback" and t.is_alive() + ] + assert alive_after_second == [], ( + "input-fallback thread(s) still alive after second call" + ) + assert not blocked.is_set(), ( + "reader thread re-entered input() unexpectedly" + ) + finally: + release.set() + + # --------------------------------------------------------------------------- # Issue #329 — observer=None when session_path doesn't exist # ---------------------------------------------------------------------------