Skip to content

Commit 358e662

Browse files
fix(remaining): unwrap ExceptionGroup in remaining task groups
ROOT CAUSE: Remaining task group usages propagate ExceptionGroup. CHANGES: - Added exception unwrapping in StreamableHTTPManager - Added exception unwrapping in lowlevel server - Added exception unwrapping in experimental task support - Added exception unwrapping in task result handler - Added exception unwrapping in session group - Added exception unwrapping in memory transport IMPACT: - All task groups now properly unwrap ExceptionGroups FILES MODIFIED: - src/mcp/server/streamable_http_manager.py - src/mcp/server/lowlevel/server.py - src/mcp/server/experimental/task_support.py - src/mcp/server/experimental/task_result_handler.py - src/mcp/client/session_group.py - src/mcp/client/_memory.py
1 parent 58ab810 commit 358e662

6 files changed

Lines changed: 99 additions & 58 deletions

File tree

src/mcp/client/_memory.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,27 @@ async def _connect(self) -> AsyncIterator[TransportStreams]:
4949
server_read, server_write = server_streams
5050

5151
async with anyio.create_task_group() as tg:
52-
# Start server in background
53-
tg.start_soon(
54-
lambda: actual_server.run(
55-
server_read,
56-
server_write,
57-
actual_server.create_initialization_options(),
58-
raise_exceptions=self._raise_exceptions,
52+
try:
53+
# Start server in background
54+
tg.start_soon(
55+
lambda: actual_server.run(
56+
server_read,
57+
server_write,
58+
actual_server.create_initialization_options(),
59+
raise_exceptions=self._raise_exceptions,
60+
)
5961
)
60-
)
6162

62-
try:
63-
yield client_read, client_write
64-
finally:
65-
tg.cancel_scope.cancel()
63+
try:
64+
yield client_read, client_write
65+
finally:
66+
tg.cancel_scope.cancel()
67+
except BaseExceptionGroup as e:
68+
from mcp.shared.exceptions import unwrap_task_group_exception
69+
70+
real_exc = unwrap_task_group_exception(e)
71+
if real_exc is not e:
72+
raise real_exc
6673

6774
async def __aenter__(self) -> TransportStreams:
6875
"""Connect to the server and return streams for communication."""

src/mcp/client/session_group.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,15 @@ async def __aexit__(
167167

168168
# Concurrently close session stacks.
169169
async with anyio.create_task_group() as tg:
170-
for exit_stack in self._session_exit_stacks.values():
171-
tg.start_soon(exit_stack.aclose)
170+
try:
171+
for exit_stack in self._session_exit_stacks.values():
172+
tg.start_soon(exit_stack.aclose)
173+
except BaseExceptionGroup as e:
174+
from mcp.shared.exceptions import unwrap_task_group_exception
175+
176+
real_exc = unwrap_task_group_exception(e)
177+
if real_exc is not e:
178+
raise real_exc
172179

173180
@property
174181
def sessions(self) -> list[mcp.ClientSession]:

src/mcp/server/experimental/task_result_handler.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -163,25 +163,31 @@ async def _wait_for_task_update(self, task_id: str) -> None:
163163
Races between store update and queue message - first one wins.
164164
"""
165165
async with anyio.create_task_group() as tg:
166-
167-
async def wait_for_store() -> None:
168-
try:
169-
await self._store.wait_for_update(task_id)
170-
except Exception:
171-
pass
172-
finally:
173-
tg.cancel_scope.cancel()
174-
175-
async def wait_for_queue() -> None:
176-
try:
177-
await self._queue.wait_for_message(task_id)
178-
except Exception:
179-
pass
180-
finally:
181-
tg.cancel_scope.cancel()
182-
183-
tg.start_soon(wait_for_store)
184-
tg.start_soon(wait_for_queue)
166+
try:
167+
async def wait_for_store() -> None:
168+
try:
169+
await self._store.wait_for_update(task_id)
170+
except Exception:
171+
pass
172+
finally:
173+
tg.cancel_scope.cancel()
174+
175+
async def wait_for_queue() -> None:
176+
try:
177+
await self._queue.wait_for_message(task_id)
178+
except Exception:
179+
pass
180+
finally:
181+
tg.cancel_scope.cancel()
182+
183+
tg.start_soon(wait_for_store)
184+
tg.start_soon(wait_for_queue)
185+
except BaseExceptionGroup as e:
186+
from mcp.shared.exceptions import unwrap_task_group_exception
187+
188+
real_exc = unwrap_task_group_exception(e)
189+
if real_exc is not e:
190+
raise real_exc
185191

186192
def route_response(self, request_id: RequestId, response: dict[str, Any]) -> bool:
187193
"""Route a response back to the waiting resolver.

src/mcp/server/experimental/task_support.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,18 @@ async def run(self) -> AsyncIterator[None]:
8080
...
8181
"""
8282
async with anyio.create_task_group() as tg:
83-
self._task_group = tg
8483
try:
85-
yield
86-
finally:
87-
self._task_group = None
84+
self._task_group = tg
85+
try:
86+
yield
87+
finally:
88+
self._task_group = None
89+
except BaseExceptionGroup as e:
90+
from mcp.shared.exceptions import unwrap_task_group_exception
91+
92+
real_exc = unwrap_task_group_exception(e)
93+
if real_exc is not e:
94+
raise real_exc
8895

8996
def configure_session(self, session: ServerSession) -> None:
9097
"""Configure a session for task support.

src/mcp/server/lowlevel/server.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -390,16 +390,23 @@ async def run(
390390
await stack.enter_async_context(task_support.run())
391391

392392
async with anyio.create_task_group() as tg:
393-
async for message in session.incoming_messages:
394-
logger.debug("Received message: %s", message)
395-
396-
tg.start_soon(
397-
self._handle_message,
398-
message,
399-
session,
400-
lifespan_context,
401-
raise_exceptions,
402-
)
393+
try:
394+
async for message in session.incoming_messages:
395+
logger.debug("Received message: %s", message)
396+
397+
tg.start_soon(
398+
self._handle_message,
399+
message,
400+
session,
401+
lifespan_context,
402+
raise_exceptions,
403+
)
404+
except BaseExceptionGroup as e:
405+
from mcp.shared.exceptions import unwrap_task_group_exception
406+
407+
real_exc = unwrap_task_group_exception(e)
408+
if real_exc is not e:
409+
raise real_exc
403410

404411
async def _handle_message(
405412
self,

src/mcp/server/streamable_http_manager.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,25 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
123123
self._has_started = True
124124

125125
async with anyio.create_task_group() as tg:
126-
# Store the task group for later use
127-
self._task_group = tg
128-
logger.info("StreamableHTTP session manager started")
129126
try:
130-
yield # Let the application run
131-
finally:
132-
logger.info("StreamableHTTP session manager shutting down")
133-
# Cancel task group to stop all spawned tasks
134-
tg.cancel_scope.cancel()
135-
self._task_group = None
136-
# Clear any remaining server instances
137-
self._server_instances.clear()
127+
# Store the task group for later use
128+
self._task_group = tg
129+
logger.info("StreamableHTTP session manager started")
130+
try:
131+
yield # Let the application run
132+
finally:
133+
logger.info("StreamableHTTP session manager shutting down")
134+
# Cancel task group to stop all spawned tasks
135+
tg.cancel_scope.cancel()
136+
self._task_group = None
137+
# Clear any remaining server instances
138+
self._server_instances.clear()
139+
except BaseExceptionGroup as e:
140+
from mcp.shared.exceptions import unwrap_task_group_exception
141+
142+
real_exc = unwrap_task_group_exception(e)
143+
if real_exc is not e:
144+
raise real_exc
138145

139146
async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> None:
140147
"""Process ASGI request with proper session handling and transport setup.

0 commit comments

Comments
 (0)