Skip to content

Commit 2e8a963

Browse files
g97iulio1609Copilot
andcommitted
fix: collapse BaseExceptionGroup to surface real errors from task groups
When a task in an anyio task group fails, sibling tasks are cancelled. The resulting BaseExceptionGroup contains the real error alongside Cancelled exceptions from those siblings. This makes error classification extremely difficult for callers. Add open_task_group() context manager and collapse_exception_group() utility that detect this pattern and re-raise just the original error, keeping the full group as __cause__ for debugging. Applied to all 16 create_task_group() sites across: - Client transports (sse, stdio, websocket, streamable_http) - Server transports (sse, stdio, websocket, streamable_http) - Session __aexit__ - Server lowlevel run loop - StreamableHTTP session manager - SessionGroup, InMemoryTransport - Experimental task support Fixes #2114 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 62575ed commit 2e8a963

17 files changed

Lines changed: 244 additions & 16 deletions

src/mcp/client/_memory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from mcp.client._transport import TransportStreams
1313
from mcp.server import Server
1414
from mcp.server.mcpserver import MCPServer
15+
from mcp.shared._exception_utils import open_task_group
1516
from mcp.shared.memory import create_client_server_memory_streams
1617

1718

@@ -48,7 +49,7 @@ async def _connect(self) -> AsyncIterator[TransportStreams]:
4849
client_read, client_write = client_streams
4950
server_read, server_write = server_streams
5051

51-
async with anyio.create_task_group() as tg:
52+
async with open_task_group() as tg:
5253
# Start server in background
5354
tg.start_soon(
5455
lambda: actual_server.run(

src/mcp/client/session_group.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from mcp.client.sse import sse_client
2525
from mcp.client.stdio import StdioServerParameters
2626
from mcp.client.streamable_http import streamable_http_client
27+
from mcp.shared._exception_utils import open_task_group
2728
from mcp.shared._httpx_utils import create_mcp_http_client
2829
from mcp.shared.exceptions import MCPError
2930
from mcp.shared.session import ProgressFnT
@@ -166,7 +167,7 @@ async def __aexit__(
166167
await self._exit_stack.aclose()
167168

168169
# Concurrently close session stacks.
169-
async with anyio.create_task_group() as tg:
170+
async with open_task_group() as tg:
170171
for exit_stack in self._session_exit_stacks.values():
171172
tg.start_soon(exit_stack.aclose)
172173

src/mcp/client/sse.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from httpx_sse._exceptions import SSEError
1313

1414
from mcp import types
15+
from mcp.shared._exception_utils import open_task_group
1516
from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client
1617
from mcp.shared.message import SessionMessage
1718

@@ -60,7 +61,7 @@ async def sse_client(
6061
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
6162
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
6263

63-
async with anyio.create_task_group() as tg:
64+
async with open_task_group() as tg:
6465
try:
6566
logger.debug(f"Connecting to SSE endpoint: {remove_request_params(url)}")
6667
async with httpx_client_factory(

src/mcp/client/stdio.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
get_windows_executable_command,
2121
terminate_windows_process_tree,
2222
)
23+
from mcp.shared._exception_utils import open_task_group
2324
from mcp.shared.message import SessionMessage
2425

2526
logger = logging.getLogger(__name__)
@@ -177,7 +178,7 @@ async def stdin_writer():
177178
except anyio.ClosedResourceError: # pragma: no cover
178179
await anyio.lowlevel.checkpoint()
179180

180-
async with anyio.create_task_group() as tg, process:
181+
async with open_task_group() as tg, process:
181182
tg.start_soon(stdout_reader)
182183
tg.start_soon(stdin_writer)
183184
try:

src/mcp/client/streamable_http.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from pydantic import ValidationError
1717

1818
from mcp.client._transport import TransportStreams
19+
from mcp.shared._exception_utils import open_task_group
1920
from mcp.shared._httpx_utils import create_mcp_http_client
2021
from mcp.shared.message import ClientMessageMetadata, SessionMessage
2122
from mcp.types import (
@@ -546,7 +547,7 @@ async def streamable_http_client(
546547

547548
transport = StreamableHTTPTransport(url)
548549

549-
async with anyio.create_task_group() as tg:
550+
async with open_task_group() as tg:
550551
try:
551552
logger.debug(f"Connecting to StreamableHTTP endpoint: {url}")
552553

src/mcp/client/websocket.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from websockets.typing import Subprotocol
1010

1111
from mcp import types
12+
from mcp.shared._exception_utils import open_task_group
1213
from mcp.shared.message import SessionMessage
1314

1415

@@ -68,7 +69,7 @@ async def ws_writer():
6869
msg_dict = session_message.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
6970
await ws.send(json.dumps(msg_dict))
7071

71-
async with anyio.create_task_group() as tg:
72+
async with open_task_group() as tg:
7273
# Start reader and writer tasks
7374
tg.start_soon(ws_reader)
7475
tg.start_soon(ws_writer)

src/mcp/server/experimental/task_result_handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import anyio
1616

1717
from mcp.server.session import ServerSession
18+
from mcp.shared._exception_utils import open_task_group
1819
from mcp.shared.exceptions import MCPError
1920
from mcp.shared.experimental.tasks.helpers import RELATED_TASK_METADATA_KEY, is_terminal
2021
from mcp.shared.experimental.tasks.message_queue import TaskMessageQueue
@@ -162,7 +163,7 @@ async def _wait_for_task_update(self, task_id: str) -> None:
162163
163164
Races between store update and queue message - first one wins.
164165
"""
165-
async with anyio.create_task_group() as tg:
166+
async with open_task_group() as tg:
166167

167168
async def wait_for_store() -> None:
168169
try:

src/mcp/server/experimental/task_support.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from mcp.server.experimental.task_result_handler import TaskResultHandler
1515
from mcp.server.session import ServerSession
16+
from mcp.shared._exception_utils import open_task_group
1617
from mcp.shared.experimental.tasks.in_memory_task_store import InMemoryTaskStore
1718
from mcp.shared.experimental.tasks.message_queue import InMemoryTaskMessageQueue, TaskMessageQueue
1819
from mcp.shared.experimental.tasks.store import TaskStore
@@ -79,7 +80,7 @@ async def run(self) -> AsyncIterator[None]:
7980
# Task group is now available
8081
...
8182
"""
82-
async with anyio.create_task_group() as tg:
83+
async with open_task_group() as tg:
8384
self._task_group = tg
8485
try:
8586
yield

src/mcp/server/lowlevel/server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ async def main():
6666
from mcp.server.streamable_http import EventStore
6767
from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager
6868
from mcp.server.transport_security import TransportSecuritySettings
69+
from mcp.shared._exception_utils import open_task_group
6970
from mcp.shared.exceptions import MCPError
7071
from mcp.shared.message import ServerMessageMetadata, SessionMessage
7172
from mcp.shared.session import RequestResponder
@@ -389,7 +390,7 @@ async def run(
389390
task_support.configure_session(session)
390391
await stack.enter_async_context(task_support.run())
391392

392-
async with anyio.create_task_group() as tg:
393+
async with open_task_group() as tg:
393394
async for message in session.incoming_messages:
394395
logger.debug("Received message: %s", message)
395396

src/mcp/server/sse.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ async def handle_sse(request):
5555
TransportSecurityMiddleware,
5656
TransportSecuritySettings,
5757
)
58+
from mcp.shared._exception_utils import open_task_group
5859
from mcp.shared.message import ServerMessageMetadata, SessionMessage
5960

6061
logger = logging.getLogger(__name__)
@@ -174,7 +175,7 @@ async def sse_writer():
174175
}
175176
)
176177

177-
async with anyio.create_task_group() as tg:
178+
async with open_task_group() as tg:
178179

179180
async def response_wrapper(scope: Scope, receive: Receive, send: Send):
180181
"""The EventSourceResponse returning signals a client close / disconnect.

0 commit comments

Comments
 (0)