Skip to content

Commit 9938ae9

Browse files
author
BashNetCorp
committed
fix(stdio): handle BrokenResourceError in stdout_reader race
`stdio_client` has a race: `read_stream_writer.aclose()` in the context's `finally` block can close the receiver while the background `stdout_reader` task is mid-`send`. anyio then raises `BrokenResourceError`, which the outer `except` does not cover (`ClosedResourceError` is the sibling class, raised on already-closed streams, not streams closed during an in-flight send). The exception propagates through the task group as `ExceptionGroup` and fails every caller that exits the context while the subprocess is still writing to stdout. Wrap both `read_stream_writer.send(...)` sites in `try`/`except (ClosedResourceError, BrokenResourceError): return` so `stdout_reader` shuts down cleanly, and widen the outer `except` to the same union for defense in depth. No API changes. Adds `test_stdio_client_exits_cleanly_while_server_still_writing`: spawns a subprocess that emits a burst of JSONRPC notifications, exits the `stdio_client` context immediately, and asserts no exception propagates. Fails before the fix (ExceptionGroup / BrokenResourceError), passes after. Github-Issue:#1960
1 parent 3d7b311 commit 9938ae9

2 files changed

Lines changed: 54 additions & 3 deletions

File tree

src/mcp/client/stdio.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,24 @@ async def stdout_reader():
153153
message = types.jsonrpc_message_adapter.validate_json(line, by_name=False)
154154
except Exception as exc: # pragma: no cover
155155
logger.exception("Failed to parse JSONRPC message from server")
156-
await read_stream_writer.send(exc)
156+
try:
157+
await read_stream_writer.send(exc)
158+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
159+
return
157160
continue
158161

159162
session_message = SessionMessage(message)
160-
await read_stream_writer.send(session_message)
161-
except anyio.ClosedResourceError: # pragma: lax no cover
163+
try:
164+
await read_stream_writer.send(session_message)
165+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
166+
# The caller exited the stdio_client context while the
167+
# subprocess was still writing; the reader stream has been
168+
# closed (ClosedResourceError) or the closure raced our
169+
# in-flight send (BrokenResourceError). Either way there
170+
# is nowhere to deliver this message, so shut down
171+
# cleanly. Fixes #1960.
172+
return
173+
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover
162174
await anyio.lowlevel.checkpoint()
163175

164176
async def stdin_writer():

tests/client/test_stdio.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,45 @@ async def test_stdio_context_manager_exiting():
3737
pass
3838

3939

40+
@pytest.mark.anyio
41+
async def test_stdio_client_exits_cleanly_while_server_still_writing():
42+
"""Regression test for #1960.
43+
44+
Exiting the ``stdio_client`` context while the subprocess is still writing to
45+
stdout used to surface ``anyio.BrokenResourceError`` through the task group
46+
(as an ``ExceptionGroup``). The ``finally`` block closes
47+
``read_stream_writer`` while the background ``stdout_reader`` task is
48+
mid-``send``.
49+
50+
The fix makes ``stdout_reader`` catch both ``ClosedResourceError`` and
51+
``BrokenResourceError`` and return cleanly, so exiting the context is a
52+
no-op no matter what the subprocess is doing.
53+
"""
54+
# A server that emits a large burst of valid JSON-RPC notifications without
55+
# ever reading stdin. When we exit the context below, the subprocess is
56+
# still in the middle of that burst, which is the exact shape of the race.
57+
noisy_script = textwrap.dedent(
58+
"""
59+
import sys
60+
for i in range(1000):
61+
sys.stdout.write(
62+
'{"jsonrpc":"2.0","method":"notifications/message",'
63+
'"params":{"level":"info","data":"line ' + str(i) + '"}}\\n'
64+
)
65+
sys.stdout.flush()
66+
"""
67+
)
68+
69+
server_params = StdioServerParameters(command=sys.executable, args=["-c", noisy_script])
70+
71+
# The ``async with`` must complete without an ``ExceptionGroup`` /
72+
# ``BrokenResourceError`` propagating. ``anyio.fail_after`` prevents a
73+
# regression from hanging CI.
74+
with anyio.fail_after(5.0):
75+
async with stdio_client(server_params) as (_, _):
76+
pass
77+
78+
4079
@pytest.mark.anyio
4180
@pytest.mark.skipif(tee is None, reason="could not find tee command")
4281
async def test_stdio_client():

0 commit comments

Comments
 (0)