Skip to content

Commit 3350ca9

Browse files
author
Andrey Barchenkov
committed
fix: prevent infinite reconnection loop when SSE stream drops without response
When the server accepts SSE connections but closes the stream without delivering a complete JSON-RPC response, the client retried forever because _handle_reconnection reset the attempt counter to 0 on each reconnection. Now the attempt counter only resets when real data (not just priming events) was received, indicating the server made progress. When the server only sends empty priming events and drops, the counter increments and the client gives up after MAX_RECONNECTION_ATTEMPTS. Also report a JSONRPCError back to the caller when max attempts are exceeded, so call_tool returns an error instead of hanging forever. Fixes #2393
1 parent 3d7b311 commit 3350ca9

2 files changed

Lines changed: 94 additions & 4 deletions

File tree

src/mcp/client/streamable_http.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,17 @@ async def _handle_reconnection(
380380
) -> None:
381381
"""Reconnect with Last-Event-ID to resume stream after server disconnect."""
382382
# Bail if max retries exceeded
383-
if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover
384-
logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
383+
if attempt >= MAX_RECONNECTION_ATTEMPTS:
384+
logger.warning(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded")
385+
if isinstance(ctx.session_message.message, JSONRPCRequest):
386+
error_data = ErrorData(
387+
code=INTERNAL_ERROR,
388+
message=f"SSE stream disconnected after {MAX_RECONNECTION_ATTEMPTS} reconnection attempts",
389+
)
390+
error_msg = SessionMessage(
391+
JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.id, error=error_data)
392+
)
393+
await ctx.read_stream_writer.send(error_msg)
385394
return
386395

387396
# Always wait - use server value or default
@@ -404,12 +413,15 @@ async def _handle_reconnection(
404413
# Track for potential further reconnection
405414
reconnect_last_event_id: str = last_event_id
406415
reconnect_retry_ms = retry_interval_ms
416+
received_data = False
407417

408418
async for sse in event_source.aiter_sse():
409419
if sse.id: # pragma: no branch
410420
reconnect_last_event_id = sse.id
411421
if sse.retry is not None:
412422
reconnect_retry_ms = sse.retry
423+
if sse.data:
424+
received_data = True
413425

414426
is_complete = await self._handle_sse_event(
415427
sse,
@@ -421,9 +433,13 @@ async def _handle_reconnection(
421433
await event_source.response.aclose()
422434
return
423435

424-
# Stream ended again without response - reconnect again (reset attempt counter)
436+
# Stream ended without response — reconnect.
437+
# Reset attempt counter only when real data was received
438+
# (server made progress). Otherwise increment to prevent
439+
# infinite loops when server only sends priming events.
440+
next_attempt = 0 if received_data else attempt + 1
425441
logger.info("SSE stream disconnected, reconnecting...")
426-
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
442+
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, next_attempt)
427443
except Exception as e: # pragma: no cover
428444
logger.debug(f"Reconnection failed: {e}")
429445
# Try to reconnect again if we still have an event ID

tests/shared/test_streamable_http.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
CallToolRequestParams,
5858
CallToolResult,
5959
InitializeResult,
60+
JSONRPCError,
6061
JSONRPCRequest,
6162
ListToolsResult,
6263
PaginatedRequestParams,
@@ -2318,3 +2319,76 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(
23182319

23192320
assert "content-type" in headers_data
23202321
assert headers_data["content-type"] == "application/json"
2322+
2323+
2324+
@pytest.mark.anyio
2325+
async def test_handle_reconnection_stops_after_max_attempts() -> None:
2326+
"""_handle_reconnection must not reset attempt counter on stream drop.
2327+
2328+
Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/2393.
2329+
When the server accepts the SSE connection but closes the stream without
2330+
sending a complete JSON-RPC response, the client must give up after
2331+
MAX_RECONNECTION_ATTEMPTS total attempts and report an error — not retry
2332+
forever.
2333+
"""
2334+
from unittest.mock import AsyncMock, MagicMock
2335+
2336+
from mcp.client.streamable_http import MAX_RECONNECTION_ATTEMPTS, RequestContext
2337+
2338+
transport = StreamableHTTPTransport("http://test/mcp")
2339+
connect_count = 0
2340+
2341+
@asynccontextmanager
2342+
async def fake_aconnect_sse(*_args: object, **_kwargs: object):
2343+
nonlocal connect_count
2344+
connect_count += 1
2345+
2346+
response = MagicMock()
2347+
response.raise_for_status = MagicMock()
2348+
response.aclose = AsyncMock()
2349+
2350+
event_source = MagicMock()
2351+
event_source.response = response
2352+
2353+
async def aiter_sse():
2354+
yield ServerSentEvent(event="message", data="", id=f"evt-{connect_count}", retry=None)
2355+
2356+
event_source.aiter_sse = aiter_sse
2357+
yield event_source
2358+
2359+
write_stream, read_stream = create_context_streams[SessionMessage | Exception](1)
2360+
2361+
request = JSONRPCRequest(
2362+
jsonrpc="2.0",
2363+
id="req-1",
2364+
method="tools/call",
2365+
params={"name": "test_tool", "arguments": {}},
2366+
)
2367+
ctx = RequestContext(
2368+
client=MagicMock(),
2369+
session_id="test-session",
2370+
session_message=SessionMessage(request),
2371+
metadata=None,
2372+
read_stream_writer=write_stream,
2373+
)
2374+
2375+
import mcp.client.streamable_http as _mod
2376+
2377+
original = _mod.aconnect_sse
2378+
_mod.aconnect_sse = fake_aconnect_sse # type: ignore[assignment]
2379+
try:
2380+
await transport._handle_reconnection(ctx, "evt-0", 0)
2381+
finally:
2382+
_mod.aconnect_sse = original
2383+
2384+
assert connect_count == MAX_RECONNECTION_ATTEMPTS
2385+
2386+
with anyio.fail_after(1):
2387+
msg = await read_stream.receive()
2388+
assert isinstance(msg, SessionMessage)
2389+
assert isinstance(msg.message, JSONRPCError)
2390+
assert "reconnection attempts" in msg.message.error.message.lower()
2391+
assert msg.message.id == "req-1"
2392+
2393+
await write_stream.aclose()
2394+
await read_stream.aclose()

0 commit comments

Comments
 (0)