From d8cb537cdb7333a2789d549408db0281e62e582d Mon Sep 17 00:00:00 2001 From: Sasa Junuzovic <44276455+microsasa@users.noreply.github.com> Date: Thu, 23 Apr 2026 14:13:25 -0700 Subject: [PATCH 1/2] fix: join fallback reader thread in _interactive_loop finally block (#1062) The _start_input_reader_thread daemon thread was not cleaned up when _interactive_loop returned. The thread would block on input() after the loop exited, violating the concurrency guideline that I/O resources must not outlive the function call. Changes: - _start_input_reader_thread now accepts a threading.Event stop sentinel and returns (queue, thread) so the caller can join it. - _reader checks stop.is_set() before each iteration and after input(). - _interactive_loop's finally block sets the stop event and joins the thread with a 1 s timeout. - Two new regression tests verify no zombie threads survive a single call and two consecutive calls don't share a fallback thread. Closes #1062 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/copilot_usage/cli.py | 36 +++++++++---- tests/copilot_usage/test_cli.py | 89 +++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 9 deletions(-) diff --git a/src/copilot_usage/cli.py b/src/copilot_usage/cli.py index 858eb06..589418d 100644 --- a/src/copilot_usage/cli.py +++ b/src/copilot_usage/cli.py @@ -181,34 +181,43 @@ def _show_session_by_index( _FALLBACK_EOF: Final[str] = "\x00__EOF__" -def _start_input_reader_thread() -> queue.SimpleQueue[str]: - """Start a daemon thread reading user input via ``input()`` into a queue. +def _start_input_reader_thread( + stop: threading.Event, +) -> tuple[queue.SimpleQueue[str], threading.Thread]: + """Start a daemon thread reading user input into a queue. Used by ``_interactive_loop`` as a fallback when ``_read_line_nonblocking`` raises ``ValueError``/``OSError`` (e.g. stdin is not selectable on Windows, or a detached stdin buffer in tests). Puts :data:`_FALLBACK_EOF` on the queue when stdin is - exhausted or an unrecoverable error occurs (see issue #1012). + exhausted, the *stop* event is set, or an unrecoverable error + occurs (see issues #1012, #1062). + + Returns the queue **and** the thread so the caller can join the + thread in its ``finally`` block. """ q: queue.SimpleQueue[str] = queue.SimpleQueue() def _reader() -> None: - while True: + while not stop.is_set(): try: - q.put(input().strip()) + line = input().strip() except (EOFError, KeyboardInterrupt): q.put(_FALLBACK_EOF) - break + return except Exception as exc: logger.warning( "Unexpected stdin error in fallback reader thread: {}", exc ) q.put(_FALLBACK_EOF) - break + return + if stop.is_set(): + return + q.put(line) thread = threading.Thread(target=_reader, daemon=True, name="input-fallback") thread.start() - return q + return q, thread def _read_line_nonblocking(timeout: float = 0.5) -> str | None: @@ -250,6 +259,8 @@ def _interactive_loop(path: Path | None) -> None: # in tests, or an unexpected runtime error). Initialised lazily on the # first error so auto-refresh via change_event keeps working. fallback_queue: queue.SimpleQueue[str] | None = None + fallback_stop: threading.Event | None = None + fallback_thread: threading.Thread | None = None sessions = get_all_sessions(path) session_index = _build_session_index(sessions) @@ -322,7 +333,10 @@ def _interactive_loop(path: Path | None) -> None: except (ValueError, OSError): # stdin not selectable — start a threaded input() reader # so change_event auto-refresh keeps working. - fallback_queue = _start_input_reader_thread() + fallback_stop = threading.Event() + fallback_queue, fallback_thread = _start_input_reader_thread( + fallback_stop, + ) line = None if line is None: @@ -383,6 +397,10 @@ def _interactive_loop(path: Path | None) -> None: pass # User pressed Ctrl-C; observer cleanup runs in finally finally: _stop_observer(observer) + if fallback_stop is not None: + fallback_stop.set() + if fallback_thread is not None: + fallback_thread.join(timeout=1.0) @click.group(invoke_without_command=True) diff --git a/tests/copilot_usage/test_cli.py b/tests/copilot_usage/test_cli.py index 81a3abc..1249ead 100644 --- a/tests/copilot_usage/test_cli.py +++ b/tests/copilot_usage/test_cli.py @@ -2361,6 +2361,95 @@ def _driver() -> None: ) +# --------------------------------------------------------------------------- +# Issue #1062 — fallback daemon thread must be joined on exit +# --------------------------------------------------------------------------- + + +def test_fallback_thread_joined_after_loop_exits( + tmp_path: Path, monkeypatch: Any +) -> None: + """No 'input-fallback' daemon threads survive after _interactive_loop returns. + + Regression for issue #1062: the fallback reader thread was not joined in + the finally block, leaving zombie threads that could consume stdin bytes + intended for a subsequent call. + """ + _write_session(tmp_path, "fb_join0-0000-0000-0000-000000000000", name="Join1") + + import copilot_usage.cli as cli_mod + + def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001 + raise ValueError("underlying buffer has been detached") + + monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error) + + def _fake_input(*_args: str, **_kwargs: str) -> str: + return "q" + + monkeypatch.setattr("builtins.input", _fake_input) + + runner = CliRunner() + result = runner.invoke(main, ["--path", str(tmp_path)]) + assert result.exit_code == 0 + + alive = [ + t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() + ] + assert alive == [], ( + f"input-fallback thread(s) still alive after _interactive_loop returned: {alive}" + ) + + +def test_fallback_thread_no_zombie_across_two_calls( + tmp_path: Path, monkeypatch: Any +) -> None: + """Two consecutive _interactive_loop calls must not share a fallback thread. + + Call 1 exits on 'q'; call 2 must complete without a zombie thread from + call 1 stealing its stdin bytes. + + Regression for issue #1062. + """ + _write_session(tmp_path, "fb_zomb0-0000-0000-0000-000000000000", name="Zombie") + + import copilot_usage.cli as cli_mod + + def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001 + raise ValueError("underlying buffer has been detached") + + monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error) + + def _fake_input(*_args: str, **_kwargs: str) -> str: + return "q" + + monkeypatch.setattr("builtins.input", _fake_input) + + runner = CliRunner() + + # --- Call 1 --- + result1 = runner.invoke(main, ["--path", str(tmp_path)]) + assert result1.exit_code == 0 + + alive_after_first = [ + t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() + ] + assert alive_after_first == [], ( + "input-fallback thread(s) still alive after first call" + ) + + # --- Call 2 --- + result2 = runner.invoke(main, ["--path", str(tmp_path)]) + assert result2.exit_code == 0 + + alive_after_second = [ + t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() + ] + assert alive_after_second == [], ( + "input-fallback thread(s) still alive after second call" + ) + + # --------------------------------------------------------------------------- # Issue #329 — observer=None when session_path doesn't exist # --------------------------------------------------------------------------- From f61f3cb5b79bda08901b2bc6c9a9cecafc26f0e5 Mon Sep 17 00:00:00 2001 From: Sasa Junuzovic <44276455+microsasa@users.noreply.github.com> Date: Thu, 23 Apr 2026 14:35:53 -0700 Subject: [PATCH 2/2] fix: address review comments - Restructure fallback reader thread with request/ack pattern so it never re-enters input() after producing a line (addresses blocking shutdown race). - Enqueue _FALLBACK_EOF on all stop paths so the docstring contract is accurate. - Add logger.warning in teardown if thread does not exit within timeout. - Strengthen regression tests with blocking fake input() to surface re-entry regressions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/copilot_usage/cli.py | 52 +++++++++++--- tests/copilot_usage/test_cli.py | 116 +++++++++++++++++++++++--------- 2 files changed, 125 insertions(+), 43 deletions(-) diff --git a/src/copilot_usage/cli.py b/src/copilot_usage/cli.py index 589418d..42d2619 100644 --- a/src/copilot_usage/cli.py +++ b/src/copilot_usage/cli.py @@ -183,23 +183,37 @@ def _show_session_by_index( def _start_input_reader_thread( stop: threading.Event, + need_input: threading.Event, ) -> tuple[queue.SimpleQueue[str], threading.Thread]: - """Start a daemon thread reading user input into a queue. + """Start a daemon thread that reads user input on request. - Used by ``_interactive_loop`` as a fallback when - ``_read_line_nonblocking`` raises ``ValueError``/``OSError`` (e.g. - stdin is not selectable on Windows, or a detached stdin buffer in - tests). Puts :data:`_FALLBACK_EOF` on the queue when stdin is - exhausted, the *stop* event is set, or an unrecoverable error - occurs (see issues #1012, #1062). + Uses a request/ack pattern: the thread waits for *need_input* to be + set before each ``input()`` call, ensuring it is never blocked inside + ``input()`` when the caller signals *stop*. The caller sets + *need_input* once for each line it wants; the thread clears the event + after waking. - Returns the queue **and** the thread so the caller can join the - thread in its ``finally`` block. + Puts :data:`_FALLBACK_EOF` on the queue when stdin is exhausted, + *stop* is signalled, or an unrecoverable error occurs (see issues + #1012, #1062). + + Returns the queue and the thread so the caller can join the thread + in its ``finally`` block. """ q: queue.SimpleQueue[str] = queue.SimpleQueue() def _reader() -> None: - while not stop.is_set(): + while True: + # Wait for the main loop to request a line, checking stop + # periodically so teardown is prompt. + while not need_input.is_set(): + if stop.wait(timeout=0.1): + q.put(_FALLBACK_EOF) + return + need_input.clear() + if stop.is_set(): + q.put(_FALLBACK_EOF) + return try: line = input().strip() except (EOFError, KeyboardInterrupt): @@ -212,6 +226,7 @@ def _reader() -> None: q.put(_FALLBACK_EOF) return if stop.is_set(): + q.put(_FALLBACK_EOF) return q.put(line) @@ -261,6 +276,8 @@ def _interactive_loop(path: Path | None) -> None: fallback_queue: queue.SimpleQueue[str] | None = None fallback_stop: threading.Event | None = None fallback_thread: threading.Thread | None = None + fallback_need_input: threading.Event | None = None + request_next: bool = False sessions = get_all_sessions(path) session_index = _build_session_index(sessions) @@ -318,6 +335,9 @@ def _interactive_loop(path: Path | None) -> None: # Non-blocking stdin read if fallback_queue is not None: + if request_next and fallback_need_input is not None: + fallback_need_input.set() + request_next = False try: line = fallback_queue.get(timeout=0.5) except queue.Empty: @@ -325,6 +345,7 @@ def _interactive_loop(path: Path | None) -> None: else: if line == _FALLBACK_EOF: break + request_next = True else: try: line = _read_line_nonblocking(timeout=0.5) @@ -334,8 +355,10 @@ def _interactive_loop(path: Path | None) -> None: # stdin not selectable — start a threaded input() reader # so change_event auto-refresh keeps working. fallback_stop = threading.Event() + fallback_need_input = threading.Event() + fallback_need_input.set() # Request the first line. fallback_queue, fallback_thread = _start_input_reader_thread( - fallback_stop, + fallback_stop, fallback_need_input, ) line = None @@ -399,8 +422,15 @@ def _interactive_loop(path: Path | None) -> None: _stop_observer(observer) if fallback_stop is not None: fallback_stop.set() + if fallback_need_input is not None: + fallback_need_input.set() if fallback_thread is not None: fallback_thread.join(timeout=1.0) + if fallback_thread.is_alive(): + logger.warning( + "input-fallback thread did not exit within 1 s; " + "it may be blocked in input()" + ) @click.group(invoke_without_command=True) diff --git a/tests/copilot_usage/test_cli.py b/tests/copilot_usage/test_cli.py index 1249ead..4603a6d 100644 --- a/tests/copilot_usage/test_cli.py +++ b/tests/copilot_usage/test_cli.py @@ -2371,9 +2371,10 @@ def test_fallback_thread_joined_after_loop_exits( ) -> None: """No 'input-fallback' daemon threads survive after _interactive_loop returns. - Regression for issue #1062: the fallback reader thread was not joined in - the finally block, leaving zombie threads that could consume stdin bytes - intended for a subsequent call. + The fake ``input()`` blocks on a second call to surface regressions: if + the reader re-enters ``input()`` after producing the quit command the + test hangs. With the request/ack pattern introduced in issue #1062 the + second call is never reached. """ _write_session(tmp_path, "fb_join0-0000-0000-0000-000000000000", name="Join1") @@ -2384,21 +2385,47 @@ def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001 monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error) + second_input_started = threading.Event() + release_blocked_input = threading.Event() + input_calls = 0 + def _fake_input(*_args: str, **_kwargs: str) -> str: + nonlocal input_calls + input_calls += 1 + if input_calls == 1: + return "q" + + # Reaching here means the reader re-entered input() — block so + # the test times out rather than silently passing. + second_input_started.set() + assert release_blocked_input.wait(timeout=1.0), ( + "test did not release blocked fallback input() call" + ) return "q" monkeypatch.setattr("builtins.input", _fake_input) runner = CliRunner() - result = runner.invoke(main, ["--path", str(tmp_path)]) - assert result.exit_code == 0 + try: + result = runner.invoke(main, ["--path", str(tmp_path)]) + assert result.exit_code == 0 - alive = [ - t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() - ] - assert alive == [], ( - f"input-fallback thread(s) still alive after _interactive_loop returned: {alive}" - ) + alive = [ + t + for t in threading.enumerate() + if t.name == "input-fallback" and t.is_alive() + ] + assert alive == [], ( + f"input-fallback thread(s) still alive after _interactive_loop " + f"returned: {alive}" + ) + # The reader should not re-enter input() after producing the quit + # command — the request/ack pattern prevents re-entry. + assert not second_input_started.is_set(), ( + "reader thread re-entered input() after producing quit command" + ) + finally: + release_blocked_input.set() def test_fallback_thread_no_zombie_across_two_calls( @@ -2406,8 +2433,9 @@ def test_fallback_thread_no_zombie_across_two_calls( ) -> None: """Two consecutive _interactive_loop calls must not share a fallback thread. - Call 1 exits on 'q'; call 2 must complete without a zombie thread from - call 1 stealing its stdin bytes. + Each ``_interactive_loop`` invocation creates its own fallback reader, + and each reader's ``input()`` returns ``"q"`` once. A subsequent + ``input()`` call per reader blocks to surface regressions. Regression for issue #1062. """ @@ -2420,34 +2448,58 @@ def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001 monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error) + blocked = threading.Event() + release = threading.Event() + input_calls = 0 + def _fake_input(*_args: str, **_kwargs: str) -> str: + nonlocal input_calls + input_calls += 1 + # Each _interactive_loop invocation triggers one input() call + # (calls 1 and 2). Any further call means a re-entry bug. + if input_calls <= 2: + return "q" + blocked.set() + assert release.wait(timeout=1.0), ( + "test did not release blocked fallback input() call" + ) return "q" monkeypatch.setattr("builtins.input", _fake_input) runner = CliRunner() - # --- Call 1 --- - result1 = runner.invoke(main, ["--path", str(tmp_path)]) - assert result1.exit_code == 0 - - alive_after_first = [ - t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() - ] - assert alive_after_first == [], ( - "input-fallback thread(s) still alive after first call" - ) + try: + # --- Call 1 --- + result1 = runner.invoke(main, ["--path", str(tmp_path)]) + assert result1.exit_code == 0 + + alive_after_first = [ + t + for t in threading.enumerate() + if t.name == "input-fallback" and t.is_alive() + ] + assert alive_after_first == [], ( + "input-fallback thread(s) still alive after first call" + ) - # --- Call 2 --- - result2 = runner.invoke(main, ["--path", str(tmp_path)]) - assert result2.exit_code == 0 + # --- Call 2 --- + result2 = runner.invoke(main, ["--path", str(tmp_path)]) + assert result2.exit_code == 0 - alive_after_second = [ - t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() - ] - assert alive_after_second == [], ( - "input-fallback thread(s) still alive after second call" - ) + alive_after_second = [ + t + for t in threading.enumerate() + if t.name == "input-fallback" and t.is_alive() + ] + assert alive_after_second == [], ( + "input-fallback thread(s) still alive after second call" + ) + assert not blocked.is_set(), ( + "reader thread re-entered input() unexpectedly" + ) + finally: + release.set() # ---------------------------------------------------------------------------