Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 61 additions & 13 deletions src/copilot_usage/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,34 +181,58 @@ def _show_session_by_index(
_FALLBACK_EOF: Final[str] = "\x00__EOF__"


def _start_input_reader_thread() -> queue.SimpleQueue[str]:
"""Start a daemon thread reading user input via ``input()`` into a queue.

Used by ``_interactive_loop`` as a fallback when
``_read_line_nonblocking`` raises ``ValueError``/``OSError`` (e.g.
stdin is not selectable on Windows, or a detached stdin buffer in
tests). Puts :data:`_FALLBACK_EOF` on the queue when stdin is
exhausted or an unrecoverable error occurs (see issue #1012).
def _start_input_reader_thread(
stop: threading.Event,
need_input: threading.Event,
) -> tuple[queue.SimpleQueue[str], threading.Thread]:
"""Start a daemon thread that reads user input on request.

Uses a request/ack pattern: the thread waits for *need_input* to be
set before each ``input()`` call, ensuring it is never blocked inside
``input()`` when the caller signals *stop*. The caller sets
*need_input* once for each line it wants; the thread clears the event
after waking.

Puts :data:`_FALLBACK_EOF` on the queue when stdin is exhausted,
*stop* is signalled, or an unrecoverable error occurs (see issues
#1012, #1062).

Returns the queue and the thread so the caller can join the thread
in its ``finally`` block.
"""
q: queue.SimpleQueue[str] = queue.SimpleQueue()

def _reader() -> None:
while True:
# Wait for the main loop to request a line, checking stop
# periodically so teardown is prompt.
while not need_input.is_set():
if stop.wait(timeout=0.1):
q.put(_FALLBACK_EOF)
return
need_input.clear()
if stop.is_set():
q.put(_FALLBACK_EOF)
return
try:
q.put(input().strip())
line = input().strip()
except (EOFError, KeyboardInterrupt):
q.put(_FALLBACK_EOF)
break
return
except Exception as exc:
logger.warning(
"Unexpected stdin error in fallback reader thread: {}", exc
)
q.put(_FALLBACK_EOF)
break
return
if stop.is_set():
q.put(_FALLBACK_EOF)
return
q.put(line)
Comment on lines 205 to +231
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stop does not reliably stop this thread because the thread blocks inside input() and won't check the event again until input() returns. In the common case where the user enters q and then the reader loop immediately calls input() again, the finally block's set() + join(timeout=1.0) can time out and leave an input-fallback thread alive, which still violates the concurrency guideline this PR targets. Consider restructuring the fallback reader so it doesn't re-enter a blocking read after producing a line (e.g., one-shot reads or an explicit request/ack pattern), or implement a stoppable/polled stdin read for the fallback path (platform-specific if needed), and ensure teardown verifies the thread actually exited (warn/error if still alive).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — the thread re-entering input() after producing a line is indeed the root of the shutdown race.

Restructured the fallback reader to use a request/ack pattern: the thread now waits for a need_input event before each input() call, so after producing a line it parks on the event rather than re-entering input(). The caller sets need_input only when it's ready for the next line (deferred until the next loop iteration after consuming the previous line).

Teardown sets stop then need_input (to wake the thread from its wait), joins with 1 s timeout, and logs a warning if the thread is still alive (covers the edge case where stop is set while the thread is mid-input(), e.g. Ctrl-C).

Warning

⚠️ Firewall blocked 2 domains

The following domains were blocked by the firewall during workflow execution:

  • astral.sh
  • pypi.org

To allow these domains, add them to the network.allowed list in your workflow frontmatter:

network:
  allowed:
    - defaults
    - "astral.sh"
    - "pypi.org"

See Network Configuration for more information.

Generated by Review Responder · ● 17.7M


thread = threading.Thread(target=_reader, daemon=True, name="input-fallback")
thread.start()
return q
return q, thread


def _read_line_nonblocking(timeout: float = 0.5) -> str | None:
Expand Down Expand Up @@ -250,6 +274,10 @@ def _interactive_loop(path: Path | None) -> None:
# in tests, or an unexpected runtime error). Initialised lazily on the
# first error so auto-refresh via change_event keeps working.
fallback_queue: queue.SimpleQueue[str] | None = None
fallback_stop: threading.Event | None = None
fallback_thread: threading.Thread | None = None
fallback_need_input: threading.Event | None = None
request_next: bool = False

sessions = get_all_sessions(path)
session_index = _build_session_index(sessions)
Expand Down Expand Up @@ -307,13 +335,17 @@ def _interactive_loop(path: Path | None) -> None:

# Non-blocking stdin read
if fallback_queue is not None:
if request_next and fallback_need_input is not None:
fallback_need_input.set()
request_next = False
try:
line = fallback_queue.get(timeout=0.5)
except queue.Empty:
line = None
else:
if line == _FALLBACK_EOF:
break
request_next = True
else:
try:
line = _read_line_nonblocking(timeout=0.5)
Expand All @@ -322,7 +354,12 @@ def _interactive_loop(path: Path | None) -> None:
except (ValueError, OSError):
# stdin not selectable — start a threaded input() reader
# so change_event auto-refresh keeps working.
fallback_queue = _start_input_reader_thread()
fallback_stop = threading.Event()
fallback_need_input = threading.Event()
fallback_need_input.set() # Request the first line.
fallback_queue, fallback_thread = _start_input_reader_thread(
fallback_stop, fallback_need_input,
)
line = None

if line is None:
Expand Down Expand Up @@ -383,6 +420,17 @@ def _interactive_loop(path: Path | None) -> None:
pass # User pressed Ctrl-C; observer cleanup runs in finally
finally:
_stop_observer(observer)
if fallback_stop is not None:
fallback_stop.set()
if fallback_need_input is not None:
fallback_need_input.set()
if fallback_thread is not None:
fallback_thread.join(timeout=1.0)
if fallback_thread.is_alive():
logger.warning(
"input-fallback thread did not exit within 1 s; "
"it may be blocked in input()"
)


@click.group(invoke_without_command=True)
Expand Down
141 changes: 141 additions & 0 deletions tests/copilot_usage/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2361,6 +2361,147 @@ def _driver() -> None:
)


# ---------------------------------------------------------------------------
# Issue #1062 — fallback daemon thread must be joined on exit
# ---------------------------------------------------------------------------


def test_fallback_thread_joined_after_loop_exits(
tmp_path: Path, monkeypatch: Any
) -> None:
"""No 'input-fallback' daemon threads survive after _interactive_loop returns.

The fake ``input()`` blocks on a second call to surface regressions: if
the reader re-enters ``input()`` after producing the quit command the
test hangs. With the request/ack pattern introduced in issue #1062 the
second call is never reached.
"""
_write_session(tmp_path, "fb_join0-0000-0000-0000-000000000000", name="Join1")

import copilot_usage.cli as cli_mod

def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001
raise ValueError("underlying buffer has been detached")

monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error)

second_input_started = threading.Event()
release_blocked_input = threading.Event()
input_calls = 0

def _fake_input(*_args: str, **_kwargs: str) -> str:
nonlocal input_calls
input_calls += 1
if input_calls == 1:
return "q"

# Reaching here means the reader re-entered input() — block so
# the test times out rather than silently passing.
second_input_started.set()
assert release_blocked_input.wait(timeout=1.0), (
"test did not release blocked fallback input() call"
)
return "q"

monkeypatch.setattr("builtins.input", _fake_input)

runner = CliRunner()
try:
result = runner.invoke(main, ["--path", str(tmp_path)])
assert result.exit_code == 0

alive = [
t
for t in threading.enumerate()
if t.name == "input-fallback" and t.is_alive()
]
assert alive == [], (
f"input-fallback thread(s) still alive after _interactive_loop "
f"returned: {alive}"
)
# The reader should not re-enter input() after producing the quit
# command — the request/ack pattern prevents re-entry.
assert not second_input_started.is_set(), (
"reader thread re-entered input() after producing quit command"
)
finally:
release_blocked_input.set()


def test_fallback_thread_no_zombie_across_two_calls(
tmp_path: Path, monkeypatch: Any
) -> None:
"""Two consecutive _interactive_loop calls must not share a fallback thread.

Each ``_interactive_loop`` invocation creates its own fallback reader,
and each reader's ``input()`` returns ``"q"`` once. A subsequent
``input()`` call per reader blocks to surface regressions.

Regression for issue #1062.
"""
_write_session(tmp_path, "fb_zomb0-0000-0000-0000-000000000000", name="Zombie")

import copilot_usage.cli as cli_mod

def _raise_value_error(timeout: float = 0.5) -> str | None: # noqa: ARG001
raise ValueError("underlying buffer has been detached")

monkeypatch.setattr(cli_mod, "_read_line_nonblocking", _raise_value_error)

blocked = threading.Event()
release = threading.Event()
input_calls = 0

def _fake_input(*_args: str, **_kwargs: str) -> str:
nonlocal input_calls
input_calls += 1
# Each _interactive_loop invocation triggers one input() call
# (calls 1 and 2). Any further call means a re-entry bug.
if input_calls <= 2:
return "q"
blocked.set()
assert release.wait(timeout=1.0), (
"test did not release blocked fallback input() call"
)
return "q"

monkeypatch.setattr("builtins.input", _fake_input)

runner = CliRunner()

try:
# --- Call 1 ---
result1 = runner.invoke(main, ["--path", str(tmp_path)])
assert result1.exit_code == 0

alive_after_first = [
t
for t in threading.enumerate()
if t.name == "input-fallback" and t.is_alive()
]
assert alive_after_first == [], (
"input-fallback thread(s) still alive after first call"
)

# --- Call 2 ---
result2 = runner.invoke(main, ["--path", str(tmp_path)])
assert result2.exit_code == 0

alive_after_second = [
t
for t in threading.enumerate()
if t.name == "input-fallback" and t.is_alive()
]
assert alive_after_second == [], (
"input-fallback thread(s) still alive after second call"
)
assert not blocked.is_set(), (
"reader thread re-entered input() unexpectedly"
)
finally:
release.set()


# ---------------------------------------------------------------------------
# Issue #329 — observer=None when session_path doesn't exist
# ---------------------------------------------------------------------------
Expand Down
Loading