Commit 42cf8aa
fix(client/stdio): let callers clean up multiple transports
independently on asyncio (#577)

`stdio_client` wrapped its reader / writer in `anyio.create_task_group()` inside the async context manager body. The task group binds its cancel scope to whichever task entered the generator, and anyio enforces a strict LIFO stack on those scopes within a single task. If a caller opened two transports from the same task and then closed them in the order they were opened (FIFO) — which is what happens in multi-client orchestrators, dependency-injection containers and pytest fixtures — teardown crashed with:

    RuntimeError: Attempted to exit a cancel scope that isn't the
    current task's current cancel scope

On the asyncio backend we can spawn the reader / writer with `asyncio.create_task` instead. The background tasks then live outside any caller-bound cancel scope, so each transport becomes self-contained and the cleanup order of two transports no longer matters.

Teardown keeps the documented shutdown sequence (close stdin, wait for the process with `PROCESS_TERMINATION_TIMEOUT`, escalate to `_terminate_process_tree`, then close the memory streams) in a shared `_cleanup_process_and_streams` helper so the two branches can't drift.

On structured-concurrency backends (trio) an anyio task group is still required — trio intentionally forbids orphan tasks, and cross-task cleanup is fundamentally incompatible with its model. Those callers still have to clean up LIFO. The backend dispatch goes through `sniffio.current_async_library()`.

Regression tests (all asyncio-only):

- `test_stdio_client_supports_fifo_cleanup_on_asyncio` — the exact scenario from #577. Verified that it FAILS on the pre-fix code with the original RuntimeError, and passes on the fixed code.
- `test_stdio_client_supports_lifo_cleanup_on_asyncio` — the historical LIFO path must still work unchanged.
- `test_stdio_client_shared_exit_stack_fifo_on_asyncio` — a single AsyncExitStack around two transports (already LIFO under the hood, but worth pinning so future refactors cannot silently break it).

All 13 stdio tests pass locally (10 pre-existing + 3 new; 1 Unix-only sigint test is skipped on Windows).

Fixes #577
1 parent: 3d7b311

2 files changed: 152 additions, 29 deletions
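The failure mode and the fix both come down to where the background tasks live. Below is a minimal, stdlib-only sketch of the pattern the asyncio branch adopts; the `transport()` helper and its names are hypothetical illustrations, not the SDK's API. Its background reader is spawned with `asyncio.create_task`, so two instances can be closed in FIFO order without any cancel-scope stacking:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


# Hypothetical stand-in for stdio_client: the background reader is a plain
# asyncio task, so no cancel scope is bound to the caller's task.
@asynccontextmanager
async def transport(name, log):
    queue = asyncio.Queue()

    async def reader():
        while True:  # pretend to pump messages forever
            await queue.get()

    task = asyncio.create_task(reader())
    try:
        yield queue
    finally:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected during teardown
        log.append(name)


async def main():
    log = []
    s1, s2 = AsyncExitStack(), AsyncExitStack()
    await s1.__aenter__()
    await s2.__aenter__()
    await s1.enter_async_context(transport("t1", log))
    await s2.enter_async_context(transport("t2", log))
    # FIFO close order: the first-opened transport closes first. With a
    # task group bound to this task, this ordering is exactly what raised
    # the RuntimeError in #577.
    await s1.aclose()
    await s2.aclose()
    return log


closed = asyncio.run(main())
print(closed)  # ['t1', 't2']
```

The point of the sketch is that each context manager owns its tasks outright, so teardown of one instance never has to unwind state belonging to another.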

src/mcp/client/stdio.py

Lines changed: 63 additions & 29 deletions
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 import os
 import sys
@@ -7,6 +8,7 @@
 
 import anyio
 import anyio.lowlevel
+import sniffio
 from anyio.abc import Process
 from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
 from anyio.streams.text import TextReceiveStream
@@ -177,38 +179,70 @@ async def stdin_writer():
         except anyio.ClosedResourceError:  # pragma: no cover
             await anyio.lowlevel.checkpoint()
 
-    async with anyio.create_task_group() as tg, process:
-        tg.start_soon(stdout_reader)
-        tg.start_soon(stdin_writer)
+    async def _cleanup_process_and_streams() -> None:
+        # MCP spec: stdio shutdown sequence
+        # 1. Close input stream to server
+        # 2. Wait for server to exit, or send SIGTERM if it doesn't exit in time
+        # 3. Send SIGKILL if still not exited
+        if process.stdin:  # pragma: no branch
+            try:
+                await process.stdin.aclose()
+            except Exception:  # pragma: no cover
+                # stdin might already be closed, which is fine
+                pass
+
         try:
-            yield read_stream, write_stream
-        finally:
-            # MCP spec: stdio shutdown sequence
-            # 1. Close input stream to server
-            # 2. Wait for server to exit, or send SIGTERM if it doesn't exit in time
-            # 3. Send SIGKILL if still not exited
-            if process.stdin:  # pragma: no branch
-                try:
-                    await process.stdin.aclose()
-                except Exception:  # pragma: no cover
-                    # stdin might already be closed, which is fine
-                    pass
+            # Give the process time to exit gracefully after stdin closes
+            with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT):
+                await process.wait()
+        except TimeoutError:
+            # Process didn't exit from stdin closure, use platform-specific termination
+            # which handles SIGTERM -> SIGKILL escalation
+            await _terminate_process_tree(process)
+        except ProcessLookupError:  # pragma: no cover
+            # Process already exited, which is fine
+            pass
+        await read_stream.aclose()
+        await write_stream.aclose()
+        await read_stream_writer.aclose()
+        await write_stream_reader.aclose()
 
+    # On asyncio we spawn the reader / writer with asyncio.create_task rather
+    # than an anyio task group, so their cancel scopes are not bound to the
+    # caller's task. That is what lets callers clean up multiple transports
+    # in arbitrary order — see #577. On structured-concurrency backends
+    # (trio), we keep the task group: orphan tasks are disallowed there by
+    # design, and cross-task cleanup is fundamentally incompatible with
+    # that model, so callers on trio still have to clean up LIFO.
+    if sniffio.current_async_library() == "asyncio":
+        async with process:
+            stdout_task = asyncio.create_task(stdout_reader())
+            stdin_task = asyncio.create_task(stdin_writer())
             try:
-                # Give the process time to exit gracefully after stdin closes
-                with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT):
-                    await process.wait()
-            except TimeoutError:
-                # Process didn't exit from stdin closure, use platform-specific termination
-                # which handles SIGTERM -> SIGKILL escalation
-                await _terminate_process_tree(process)
-            except ProcessLookupError:  # pragma: no cover
-                # Process already exited, which is fine
-                pass
-            await read_stream.aclose()
-            await write_stream.aclose()
-            await read_stream_writer.aclose()
-            await write_stream_reader.aclose()
+                yield read_stream, write_stream
+            finally:
+                try:
+                    await _cleanup_process_and_streams()
+                finally:
+                    for task in (stdout_task, stdin_task):
+                        if not task.done():
+                            task.cancel()
+                    for task in (stdout_task, stdin_task):
+                        try:
+                            await task
+                        except BaseException:  # noqa: BLE001, S110
+                            # Reader/writer cancellation or late I/O errors
+                            # surfaced after the streams were closed: those
+                            # are expected during teardown.
+                            pass
+    else:
+        async with anyio.create_task_group() as tg, process:
+            tg.start_soon(stdout_reader)
+            tg.start_soon(stdin_writer)
+            try:
+                yield read_stream, write_stream
+            finally:
+                await _cleanup_process_and_streams()
 
 
 def _get_executable_command(command: str) -> str:
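For reference, the shutdown sequence that `_cleanup_process_and_streams` centralizes can be sketched with the stdlib alone. This is a simplified sketch, not the SDK's code: `asyncio.wait_for` stands in for `anyio.fail_after`, a plain terminate-then-kill escalation stands in for `_terminate_process_tree`, and the 2-second timeout is an assumed value rather than the SDK's `PROCESS_TERMINATION_TIMEOUT` constant:

```python
import asyncio
import sys

TERMINATION_TIMEOUT = 2.0  # assumed value, not the SDK's constant


async def shutdown(proc: asyncio.subprocess.Process) -> int:
    # 1. Close stdin so a well-behaved server exits on its own.
    if proc.stdin is not None:
        proc.stdin.close()
    try:
        # 2. Give it time to exit gracefully.
        await asyncio.wait_for(proc.wait(), TERMINATION_TIMEOUT)
    except asyncio.TimeoutError:
        # 3. Escalate: SIGTERM first, SIGKILL if it still won't exit.
        proc.terminate()
        try:
            await asyncio.wait_for(proc.wait(), TERMINATION_TIMEOUT)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
    return proc.returncode


async def main() -> int:
    # A stub server that exits as soon as stdin closes, like the test stub.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import sys\nfor _ in sys.stdin: pass",
        stdin=asyncio.subprocess.PIPE,
    )
    return await shutdown(proc)


exit_code = asyncio.run(main())
print("server exit code:", exit_code)
```

For a cooperative child the graceful path is all that runs; the escalation branches only fire when the process ignores stdin closure.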

tests/client/test_stdio.py

Lines changed: 89 additions & 0 deletions
@@ -558,3 +558,92 @@ def sigterm_handler(signum, frame):
         f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-ignoring process. "
         f"Expected between 2-4 seconds (2s stdin timeout + termination time)."
     )
+
+
+# A stub MCP-ish server: exits cleanly as soon as stdin closes. We only need
+# the stdio_client to be able to stand the transport up; we do not exercise
+# any MCP protocol traffic in the FIFO-cleanup tests below.
+_QUIET_STDIN_STUB = textwrap.dedent(
+    """
+    import sys
+    for _ in sys.stdin:
+        pass
+    """
+)
+
+
+@pytest.mark.anyio
+@pytest.mark.parametrize("anyio_backend", ["asyncio"])
+async def test_stdio_client_supports_fifo_cleanup_on_asyncio(anyio_backend):
+    """
+    Regression for https://github.com/modelcontextprotocol/python-sdk/issues/577.
+
+    Prior to the fix, closing two ``stdio_client`` transports in the order
+    they were opened (FIFO) crashed with::
+
+        RuntimeError: Attempted to exit a cancel scope that isn't the
+        current task's current cancel scope
+
+    because each stdio_client bound a task-group cancel scope to the
+    caller's task, and anyio enforces a strict LIFO stack on those
+    scopes. On asyncio the fix uses ``asyncio.create_task`` for the
+    reader / writer so the transports are independent and the cleanup
+    order no longer matters.
+
+    (Trio intentionally forbids orphan tasks — there is no equivalent
+    fix on that backend, so this test is asyncio-only.)
+    """
+    params = StdioServerParameters(command=sys.executable, args=["-c", _QUIET_STDIN_STUB])
+
+    s1 = AsyncExitStack()
+    s2 = AsyncExitStack()
+
+    await s1.__aenter__()
+    await s2.__aenter__()
+
+    await s1.enter_async_context(stdio_client(params))
+    await s2.enter_async_context(stdio_client(params))
+
+    # Close in FIFO order — the opposite of what anyio's structured
+    # concurrency would normally require.
+    await s1.aclose()
+    await s2.aclose()
+
+
+@pytest.mark.anyio
+@pytest.mark.parametrize("anyio_backend", ["asyncio"])
+async def test_stdio_client_supports_lifo_cleanup_on_asyncio(anyio_backend):
+    """
+    Sanity check for the fix above: the historical LIFO cleanup path
+    must still work unchanged on asyncio.
+    """
+    params = StdioServerParameters(command=sys.executable, args=["-c", _QUIET_STDIN_STUB])
+
+    s1 = AsyncExitStack()
+    s2 = AsyncExitStack()
+    await s1.__aenter__()
+    await s2.__aenter__()
+
+    await s1.enter_async_context(stdio_client(params))
+    await s2.enter_async_context(stdio_client(params))
+
+    # LIFO cleanup — the last-opened transport closes first.
+    await s2.aclose()
+    await s1.aclose()
+
+
+@pytest.mark.anyio
+@pytest.mark.parametrize("anyio_backend", ["asyncio"])
+async def test_stdio_client_shared_exit_stack_fifo_on_asyncio(anyio_backend):
+    """
+    The same story, but through a single ``AsyncExitStack`` that the
+    caller then closes once. ExitStack runs callbacks in LIFO order on
+    its own, so this case already worked — the test pins that behavior
+    so a future refactor of the asyncio branch cannot silently break
+    it.
+    """
+    params = StdioServerParameters(command=sys.executable, args=["-c", _QUIET_STDIN_STUB])
+
+    async with AsyncExitStack() as stack:
+        await stack.enter_async_context(stdio_client(params))
+        await stack.enter_async_context(stdio_client(params))
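The shared-exit-stack test leans on a `contextlib` guarantee: a single `AsyncExitStack` unwinds the contexts it holds in LIFO order when closed once. That guarantee is easy to pin down in isolation; the `ctx` helper below is illustrative only, not part of the test suite:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


@asynccontextmanager
async def ctx(name, log):
    log.append(f"open {name}")
    try:
        yield
    finally:
        log.append(f"close {name}")


async def main():
    log = []
    async with AsyncExitStack() as stack:
        # Entered a -> b; the stack exits them b -> a (LIFO).
        await stack.enter_async_context(ctx("a", log))
        await stack.enter_async_context(ctx("b", log))
    return log


order = asyncio.run(main())
print(order)  # ['open a', 'open b', 'close b', 'close a']
```

Because the stack itself closes LIFO, this case never tripped the cancel-scope ordering bug, which is exactly why the test exists: to keep that property from regressing unnoticed.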
