fix: join fallback reader thread in _interactive_loop finally block (#1062)#1064
fix: join fallback reader thread in _interactive_loop finally block (#1062)#1064
_interactive_loop finally block (#1062)#1064Conversation
…1062) The _start_input_reader_thread daemon thread was not cleaned up when _interactive_loop returned. The thread would block on input() after the loop exited, violating the concurrency guideline that I/O resources must not outlive the function call. Changes: - _start_input_reader_thread now accepts a threading.Event stop sentinel and returns (queue, thread) so the caller can join it. - _reader checks stop.is_set() before each iteration and after input(). - _interactive_loop's finally block sets the stop event and joins the thread with a 1 s timeout. - Two new regression tests verify no zombie threads survive a single call and two consecutive calls don't share a fallback thread. Closes #1062 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR addresses issue #1062 by attempting to ensure the _interactive_loop fallback input() reader thread is cleaned up on exit, aligning the interactive CLI with the repo’s concurrency/I/O lifetime guidelines.
Changes:
- Update
_start_input_reader_threadto accept a stopEventand return both the queue and thread so the caller can attempt cleanup. - Track fallback stop/thread as per-call locals in
_interactive_loopand join the fallback thread in thefinallyblock. - Add regression tests asserting no
input-fallbackthreads remain after one or two invocations.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/copilot_usage/cli.py | Adds stop/join plumbing for the fallback input reader thread used by _interactive_loop. |
| tests/copilot_usage/test_cli.py | Adds tests intended to detect zombie input-fallback threads after _interactive_loop exits. |
| def _reader() -> None: | ||
| while True: | ||
| while not stop.is_set(): | ||
| try: | ||
| q.put(input().strip()) | ||
| line = input().strip() | ||
| except (EOFError, KeyboardInterrupt): | ||
| q.put(_FALLBACK_EOF) | ||
| break | ||
| return | ||
| except Exception as exc: | ||
| logger.warning( | ||
| "Unexpected stdin error in fallback reader thread: {}", exc | ||
| ) | ||
| q.put(_FALLBACK_EOF) | ||
| break | ||
| return | ||
| if stop.is_set(): | ||
| return | ||
| q.put(line) |
There was a problem hiding this comment.
stop does not reliably stop this thread because the thread blocks inside input() and won't check the event again until input() returns. In the common case where the user enters q and then the reader loop immediately calls input() again, the finally block's set() + join(timeout=1.0) can time out and leave an input-fallback thread alive, which still violates the concurrency guideline this PR targets. Consider restructuring the fallback reader so it doesn't re-enter a blocking read after producing a line (e.g., one-shot reads or an explicit request/ack pattern), or implement a stoppable/polled stdin read for the fallback path (platform-specific if needed), and ensure teardown verifies the thread actually exited (warn/error if still alive).
There was a problem hiding this comment.
Good catch — the thread re-entering input() after producing a line is indeed the root of the shutdown race.
Restructured the fallback reader to use a request/ack pattern: the thread now waits for a need_input event before each input() call, so after producing a line it parks on the event rather than re-entering input(). The caller sets need_input only when it's ready for the next line (deferred until the next loop iteration after consuming the previous line).
Teardown sets stop then need_input (to wake the thread from its wait), joins with 1 s timeout, and logs a warning if the thread is still alive (covers the edge case where stop is set while the thread is mid-input(), e.g. Ctrl-C).
Warning
⚠️ Firewall blocked 2 domains
The following domains were blocked by the firewall during workflow execution:
astral.shpypi.org
To allow these domains, add them to the network.allowed list in your workflow frontmatter:
network:
allowed:
- defaults
- "astral.sh"
- "pypi.org"See Network Configuration for more information.
Generated by Review Responder · ● 17.7M
| """Start a daemon thread reading user input into a queue. | ||
|
|
||
| Used by ``_interactive_loop`` as a fallback when | ||
| ``_read_line_nonblocking`` raises ``ValueError``/``OSError`` (e.g. | ||
| stdin is not selectable on Windows, or a detached stdin buffer in | ||
| tests). Puts :data:`_FALLBACK_EOF` on the queue when stdin is | ||
| exhausted or an unrecoverable error occurs (see issue #1012). | ||
| exhausted, the *stop* event is set, or an unrecoverable error | ||
| occurs (see issues #1012, #1062). | ||
|
|
||
| Returns the queue **and** the thread so the caller can join the | ||
| thread in its ``finally`` block. |
There was a problem hiding this comment.
The docstring says the fallback reader "Puts _FALLBACK_EOF on the queue when ... the stop event is set", but the implementation returns immediately when stop.is_set() without enqueueing _FALLBACK_EOF. Please either update the docstring to match the actual behavior, or enqueue the sentinel on stop so the contract is accurate.
There was a problem hiding this comment.
Fixed — all stop-triggered exit paths now enqueue _FALLBACK_EOF before returning, so the docstring contract ("Puts _FALLBACK_EOF on the queue when ... stop is signalled") is accurate. This includes the while not stop.is_set() loop guard at the top and the if stop.is_set() check after input() returns.
Warning
⚠️ Firewall blocked 2 domains
The following domains were blocked by the firewall during workflow execution:
astral.shpypi.org
To allow these domains, add them to the network.allowed list in your workflow frontmatter:
network:
allowed:
- defaults
- "astral.sh"
- "pypi.org"See Network Configuration for more information.
Generated by Review Responder · ● 17.7M
| def _fake_input(*_args: str, **_kwargs: str) -> str: | ||
| return "q" | ||
|
|
||
| monkeypatch.setattr("builtins.input", _fake_input) | ||
|
|
||
| runner = CliRunner() | ||
|
|
||
| # --- Call 1 --- | ||
| result1 = runner.invoke(main, ["--path", str(tmp_path)]) | ||
| assert result1.exit_code == 0 | ||
|
|
||
| alive_after_first = [ | ||
| t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() | ||
| ] | ||
| assert alive_after_first == [], ( | ||
| "input-fallback thread(s) still alive after first call" | ||
| ) | ||
|
|
||
| # --- Call 2 --- | ||
| result2 = runner.invoke(main, ["--path", str(tmp_path)]) | ||
| assert result2.exit_code == 0 | ||
|
|
||
| alive_after_second = [ | ||
| t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() | ||
| ] | ||
| assert alive_after_second == [], ( | ||
| "input-fallback thread(s) still alive after second call" | ||
| ) |
There was a problem hiding this comment.
These regression tests monkeypatch input() to return immediately (always 'q'), which doesn't exercise the shutdown race where the reader thread has already looped and is blocked in a subsequent input() call when _interactive_loop exits. As written, the tests can pass even if the thread would remain alive in real interactive use. Consider adjusting the fake input() to block on a second call (after delivering the quit command) and assert the fallback thread still terminates (or that teardown detects and fails if it cannot).
| def _fake_input(*_args: str, **_kwargs: str) -> str: | |
| return "q" | |
| monkeypatch.setattr("builtins.input", _fake_input) | |
| runner = CliRunner() | |
| # --- Call 1 --- | |
| result1 = runner.invoke(main, ["--path", str(tmp_path)]) | |
| assert result1.exit_code == 0 | |
| alive_after_first = [ | |
| t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() | |
| ] | |
| assert alive_after_first == [], ( | |
| "input-fallback thread(s) still alive after first call" | |
| ) | |
| # --- Call 2 --- | |
| result2 = runner.invoke(main, ["--path", str(tmp_path)]) | |
| assert result2.exit_code == 0 | |
| alive_after_second = [ | |
| t for t in threading.enumerate() if t.name == "input-fallback" and t.is_alive() | |
| ] | |
| assert alive_after_second == [], ( | |
| "input-fallback thread(s) still alive after second call" | |
| ) | |
| second_input_started = threading.Event() | |
| release_blocked_input = threading.Event() | |
| input_calls = 0 | |
| def _fake_input(*_args: str, **_kwargs: str) -> str: | |
| nonlocal input_calls | |
| input_calls += 1 | |
| if input_calls == 1: | |
| return "q" | |
| second_input_started.set() | |
| assert release_blocked_input.wait(timeout=1.0), ( | |
| "test did not release blocked fallback input() call" | |
| ) | |
| return "q" | |
| monkeypatch.setattr("builtins.input", _fake_input) | |
| runner = CliRunner() | |
| try: | |
| # --- Call 1 --- | |
| result1 = runner.invoke(main, ["--path", str(tmp_path)]) | |
| assert result1.exit_code == 0 | |
| assert second_input_started.wait(timeout=1.0), ( | |
| "input-fallback thread never reached a blocked subsequent input() call" | |
| ) | |
| alive_after_first = [ | |
| t | |
| for t in threading.enumerate() | |
| if t.name == "input-fallback" and t.is_alive() | |
| ] | |
| assert alive_after_first == [], ( | |
| "input-fallback thread(s) still alive after first call" | |
| ) | |
| # --- Call 2 --- | |
| result2 = runner.invoke(main, ["--path", str(tmp_path)]) | |
| assert result2.exit_code == 0 | |
| alive_after_second = [ | |
| t | |
| for t in threading.enumerate() | |
| if t.name == "input-fallback" and t.is_alive() | |
| ] | |
| assert alive_after_second == [], ( | |
| "input-fallback thread(s) still alive after second call" | |
| ) | |
| finally: | |
| release_blocked_input.set() |
There was a problem hiding this comment.
Agreed that the tests were too forgiving. Updated both tests so _fake_input blocks on a subsequent call (via threading.Event.wait) — if the reader thread re-enters input() after producing the quit command, the test hangs rather than silently passing.
With the request/ack pattern the reader never re-enters input() after producing a line, so the tests also assert not second_input_started.is_set() (and not blocked.is_set() for the zombie test) to verify the design invariant.
The suggested test code assumed the old loop-and-block design, so I adapted it to match the request/ack pattern while keeping the same spirit: a blocking trap that catches regressions.
Warning
⚠️ Firewall blocked 2 domains
The following domains were blocked by the firewall during workflow execution:
astral.shpypi.org
To allow these domains, add them to the network.allowed list in your workflow frontmatter:
network:
allowed:
- defaults
- "astral.sh"
- "pypi.org"See Network Configuration for more information.
Generated by Review Responder · ● 17.7M
- Restructure fallback reader thread with request/ack pattern so it never re-enters input() after producing a line (addresses blocking shutdown race). - Enqueue _FALLBACK_EOF on all stop paths so the docstring contract is accurate. - Add logger.warning in teardown if thread does not exit within timeout. - Strengthen regression tests with blocking fake input() to surface re-entry regressions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Commit pushed:
|
|
❌ Pipeline orchestrator: CI still failing after ci-fixer attempt. Marking as stuck for human review. |
Summary
The
_start_input_reader_threaddaemon thread in_interactive_loopwas not cleaned up when the loop returned. The thread would remain blocked oninput()after exit, violating the concurrency guideline that I/O resources must not outlive the function call that uses them.Changes
src/copilot_usage/cli.py_start_input_reader_threadnow accepts athreading.Eventstop sentinel and returns(queue, thread)so the caller can join the thread._readerfunction checksstop.is_set()before each iteration and afterinput()returns, allowing the thread to exit promptly when signalled._interactive_looptracksfallback_stopandfallback_threadas per-call locals (no module-level state).finallyblock now sets the stop event and joins the thread with a 1-second timeout:tests/copilot_usage/test_cli.pyTwo regression tests added:
test_fallback_thread_joined_after_loop_exits— verifies noinput-fallbackdaemon threads are alive after_interactive_loopreturns.test_fallback_thread_no_zombie_across_two_calls— calls_interactive_looptwice in the same process (with monkeypatched_read_line_nonblockingthat raisesValueErrorto force the fallback path) and asserts no zombie threads from the first call survive to interfere with the second.Closes #1062
Warning
The following domains were blocked by the firewall during workflow execution:
astral.shconda.anaconda.orgfiles.pythonhosted.orgindex.crates.iomicro.mamba.pmobjects.githubusercontent.compypi.orgreleaseassets.githubusercontent.comrepo.anaconda.comTo allow these domains, add them to the
network.allowedlist in your workflow frontmatter:See Network Configuration for more information.