Skip to content

Commit 1d56c4a

Browse files
committed
review(#2484): restore exception propagation from background
reader/writer and close streams on crash Self-review of the first commit caught a regression: the asyncio branch spawned stdout_reader / stdin_writer via asyncio.create_task and did `try: await task except BaseException: pass` during cleanup, which silently swallowed any exception the reader or writer raised. An anyio task group would have re-raised that through the `async with` block — the asyncio path has to reproduce that contract to keep the fix observably equivalent. Two changes: 1. `add_done_callback(_on_background_task_done)` — if a background task crashes while the caller is still inside the `yield`, close the memory streams immediately so any in-flight user read wakes up with `ClosedResourceError` instead of hanging forever. Mirrors the scope-cancellation side effect of a task group. The callback logs the exception at debug so operators can trace crashes that we only surface on the way out. 2. On `__aexit__` we now collect task results and re-raise the first non-cancellation, non-closed-resource exception. CancelledError and anyio.ClosedResourceError are expected during the teardown we just forced in `_cleanup_process_and_streams`, so they are swallowed; anything else (a real crash) propagates. New regression test: `test_stdio_client_reader_crash_propagates_on_asyncio` — injects a TextReceiveStream that raises on the first `__anext__`, asserts the exception is visible at the async-with exit. Confirmed it FAILS on the first-commit version ("Failed: DID NOT RAISE"), so it is real regression coverage, not a tautology. All 14 stdio tests pass (10 pre-existing + 4 new, 1 Unix-only skip). Full `tests/client/` pass (203 passed, 1 pre-existing xfail).
1 parent 42cf8aa commit 1d56c4a

File tree

2 files changed

+85
-8
lines changed

2 files changed

+85
-8
lines changed

src/mcp/client/stdio.py

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -215,26 +215,67 @@ async def _cleanup_process_and_streams() -> None:
215215
# design, and cross-task cleanup is fundamentally incompatible with
216216
# that model, so callers on trio still have to clean up LIFO.
217217
if sniffio.current_async_library() == "asyncio":
218+
219+
def _on_background_task_done(task: asyncio.Task[None]) -> None:
220+
"""
221+
If a background reader/writer crashes while the caller is
222+
still using the transport, close the memory streams so that
223+
any in-flight user read wakes up with ``ClosedResourceError``
224+
instead of hanging forever. An anyio task group would have
225+
produced the same effect via scope cancellation — this
226+
restores that observability on the asyncio path.
227+
"""
228+
if task.cancelled():
229+
return
230+
exc = task.exception()
231+
if exc is None:
232+
return
233+
logger.debug(
234+
"stdio_client background task raised %s — closing streams to wake up caller",
235+
type(exc).__name__,
236+
exc_info=exc,
237+
)
238+
for stream in (read_stream_writer, write_stream):
239+
try:
240+
stream.close()
241+
except Exception: # pragma: no cover
242+
pass
243+
218244
async with process:
219-
stdout_task = asyncio.create_task(stdout_reader())
220-
stdin_task = asyncio.create_task(stdin_writer())
245+
stdout_task: asyncio.Task[None] = asyncio.create_task(stdout_reader())
246+
stdin_task: asyncio.Task[None] = asyncio.create_task(stdin_writer())
247+
stdout_task.add_done_callback(_on_background_task_done)
248+
stdin_task.add_done_callback(_on_background_task_done)
249+
background_tasks = (stdout_task, stdin_task)
221250
try:
222251
yield read_stream, write_stream
223252
finally:
224253
try:
225254
await _cleanup_process_and_streams()
226255
finally:
227-
for task in (stdout_task, stdin_task):
256+
for task in background_tasks:
228257
if not task.done():
229258
task.cancel()
230-
for task in (stdout_task, stdin_task):
259+
# Collect results; swallow CancelledError (expected for
260+
# teardown) and anyio.ClosedResourceError (surfaced when
261+
# we closed the streams out from under the reader/writer
262+
# during cleanup). Re-raise anything else so a crash in
263+
# the background does not go unnoticed — matching the
264+
# exception propagation we'd get from an anyio task
265+
# group on the trio path.
266+
pending_exc: BaseException | None = None
267+
for task in background_tasks:
231268
try:
232269
await task
233-
except BaseException: # noqa: BLE001, S110
234-
# Reader/writer cancellation or late I/O errors
235-
# surfaced after the streams were closed: those
236-
# are expected during teardown.
270+
except asyncio.CancelledError:
271+
pass
272+
except anyio.ClosedResourceError:
237273
pass
274+
except BaseException as exc: # noqa: BLE001
275+
if pending_exc is None:
276+
pending_exc = exc
277+
if pending_exc is not None:
278+
raise pending_exc
238279
else:
239280
async with anyio.create_task_group() as tg, process:
240281
tg.start_soon(stdout_reader)

tests/client/test_stdio.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,3 +647,39 @@ async def test_stdio_client_shared_exit_stack_fifo_on_asyncio(anyio_backend):
647647
async with AsyncExitStack() as stack:
648648
await stack.enter_async_context(stdio_client(params))
649649
await stack.enter_async_context(stdio_client(params))
650+
651+
652+
@pytest.mark.anyio
653+
@pytest.mark.parametrize("anyio_backend", ["asyncio"])
654+
async def test_stdio_client_reader_crash_propagates_on_asyncio(anyio_backend, monkeypatch):
655+
"""
656+
Guardrail for the asyncio branch of #577: moving the reader/writer
657+
out of an anyio task group must NOT silently swallow exceptions
658+
they raise. An anyio task group would have re-raised those through
659+
the async ``with`` block; the asyncio path has to reproduce that
660+
contract by collecting the task results in ``__aexit__`` and
661+
re-raising anything that isn't cancellation / closed-resource.
662+
"""
663+
from mcp.client import stdio as stdio_mod
664+
665+
class _BoomTextStream:
666+
def __init__(self, *args, **kwargs):
667+
pass
668+
669+
def __aiter__(self):
670+
return self
671+
672+
async def __anext__(self):
673+
raise RuntimeError("deliberate reader crash for #577 regression test")
674+
675+
monkeypatch.setattr(stdio_mod, "TextReceiveStream", _BoomTextStream)
676+
677+
params = StdioServerParameters(command=sys.executable, args=["-c", _QUIET_STDIN_STUB])
678+
679+
with pytest.raises(RuntimeError, match="deliberate reader crash"):
680+
async with stdio_client(params):
681+
# Give the reader a chance to raise. The crash should close
682+
# the streams out from under us, so we just wait a moment
683+
# and then exit the context — the exception is surfaced on
684+
# the way out.
685+
await anyio.sleep(0.2)

0 commit comments

Comments
 (0)