diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 193204a15..e1d20387d 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -121,15 +121,20 @@ async def post_writer(endpoint_url: str): async def _send_message(session_message: SessionMessage) -> None: logger.debug(f"Sending client message: {session_message}") - response = await client.post( - endpoint_url, - json=session_message.message.model_dump( - by_alias=True, - mode="json", - exclude_unset=True, - ), - ) - response.raise_for_status() + try: + response = await client.post( + endpoint_url, + json=session_message.message.model_dump( + by_alias=True, + mode="json", + exclude_unset=True, + ), + ) + response.raise_for_status() + except httpx.HTTPError as exc: + logger.exception("Error sending client message") + await read_stream_writer.send(exc) + return logger.debug(f"Client message sent successfully: {response.status_code}") async for session_message in write_stream_reader: diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9a119c633..88b2d6c40 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -269,14 +269,22 @@ async def _handle_post_request(self, ctx: RequestContext) -> None: if response.status_code == 404: # pragma: no branch if isinstance(message, JSONRPCRequest): # pragma: no branch - error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated") + error_data = ErrorData( + code=INVALID_REQUEST, + message="Session terminated", + data={"http_status": response.status_code}, + ) session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) await ctx.read_stream_writer.send(session_message) return if response.status_code >= 400: if isinstance(message, JSONRPCRequest): - error_data = ErrorData(code=INTERNAL_ERROR, message="Server returned an error response") + error_data = ErrorData( + code=INTERNAL_ERROR, + message=f"Server returned an error response (HTTP {response.status_code})", + data={"http_status": response.status_code}, + ) session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) await ctx.read_stream_writer.send(session_message) return @@ -468,10 +476,14 @@ async def _handle_message(session_message: SessionMessage) -> None: ) async def handle_request_async(): - if is_resumption: - await self._handle_resumption_request(ctx) - else: - await self._handle_post_request(ctx) + try: + if is_resumption: + await self._handle_resumption_request(ctx) + else: + await self._handle_post_request(ctx) + except httpx.HTTPError as exc: + logger.exception("Error sending client message") + await read_stream_writer.send(exc) # If this is a request, start a new task to handle it if isinstance(message, JSONRPCRequest): diff --git a/tests/shared/test_sse.py b/tests/shared/test_sse.py index 5629a5707..4571027cb 100644 --- a/tests/shared/test_sse.py +++ b/tests/shared/test_sse.py @@ -25,6 +25,7 @@ from mcp.server.sse import SseServerTransport from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError +from mcp.shared.message import SessionMessage from mcp.types import ( CallToolRequestParams, CallToolResult, @@ -629,3 +630,172 @@ async def test_sse_session_cleanup_on_disconnect(server: None, server_url: str) headers={"Content-Type": "application/json"}, ) assert response.status_code == 404 + + +class _LongLivedSSEStream(httpx.AsyncByteStream): + """A streaming SSE body that emits one endpoint event then stays open. + + This mirrors a real SSE server: the stream does not end after the endpoint + event, so the client's `sse_reader` keeps running while we exercise the + POST path. The stream unblocks when the outer task group is cancelled. + """ + + def __init__(self, endpoint_path: str) -> None: + self._endpoint_path = endpoint_path + + async def __aiter__(self) -> AsyncGenerator[bytes, None]: + yield f"event: endpoint\ndata: {self._endpoint_path}\n\n".encode() + await anyio.sleep_forever() + + +@pytest.mark.anyio +@pytest.mark.parametrize("post_status", [401, 404, 500]) +async def test_sse_client_propagates_post_http_error_to_caller(post_status: int) -> None: + """Regression for https://github.com/modelcontextprotocol/python-sdk/issues/2110. + + When the SSE POST endpoint returns a non-2xx HTTP status, the caller must + receive the HTTPStatusError via the read stream instead of hanging on an + indefinite wait for a response that will never arrive. + """ + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "GET": + return httpx.Response( + 200, + headers={"content-type": "text/event-stream"}, + stream=_LongLivedSSEStream("/messages/?session_id=test"), + ) + return httpx.Response(post_status) + + def mock_factory( + headers: dict[str, Any] | None = None, + timeout: httpx.Timeout | None = None, + auth: httpx.Auth | None = None, + ) -> httpx.AsyncClient: + return httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + async with sse_client("http://test/sse", httpx_client_factory=mock_factory) as (read_stream, write_stream): + request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={}) + await write_stream.send(SessionMessage(request)) + + with anyio.fail_after(3): + received = await read_stream.receive() + assert isinstance(received, httpx.HTTPStatusError) + assert received.response.status_code == post_status + + +# ---- Integration tests against a real uvicorn MCP server (issue #2110) ---- + + +def make_failing_post_server_app(post_status: int) -> Starlette: # pragma: no cover + """Starlette app with a real SSE GET endpoint but a POST that always fails. + + Used by the integration test for issue #2110 — the SSE GET handshake is + the real `SseServerTransport`, so the client receives a genuine endpoint + event; the POST route is replaced so every client message fails with the + given status. + """ + security_settings = TransportSecuritySettings( + allowed_hosts=["127.0.0.1:*", "localhost:*"], allowed_origins=["http://127.0.0.1:*", "http://localhost:*"] + ) + sse = SseServerTransport("/messages/", security_settings=security_settings) + server = _create_server() + + async def handle_sse(request: Request) -> Response: + async with sse.connect_sse(request.scope, request.receive, request._send) as streams: + await server.run(streams[0], streams[1], server.create_initialization_options()) + return Response() + + async def failing_post(request: Request) -> Response: + return Response(status_code=post_status, content=f"deliberate {post_status} for #2110".encode()) + + return Starlette( + routes=[ + Route("/sse", endpoint=handle_sse), + Route("/messages/", endpoint=failing_post, methods=["POST"]), + ] + ) + + +def run_failing_post_server(server_port: int, post_status: int) -> None: # pragma: no cover + app = make_failing_post_server_app(post_status) + uvicorn.Server(config=uvicorn.Config(app=app, host="127.0.0.1", port=server_port, log_level="error")).run() + + +@pytest.mark.anyio +@pytest.mark.parametrize("post_status", [401, 404, 500]) +async def test_sse_client_real_server_surfaces_post_http_error(server_port: int, post_status: int) -> None: + """End-to-end integration test for issue #2110. + + Against a real uvicorn-hosted MCP server whose POST endpoint returns + 401/404/500, the client must surface the error within a bounded timeout + rather than hanging forever. Before the fix, this test hit `fail_after(5)` + because `post_writer` swallowed the exception. + """ + proc = multiprocessing.Process( + target=run_failing_post_server, + kwargs={"server_port": server_port, "post_status": post_status}, + daemon=True, + ) + proc.start() + try: + wait_for_server(server_port) + server_url = f"http://127.0.0.1:{server_port}" + with anyio.fail_after(5): + async with sse_client(server_url + "/sse") as (read_stream, write_stream): + request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={}) + await write_stream.send(SessionMessage(request)) + received = await read_stream.receive() + assert isinstance(received, httpx.HTTPStatusError) + assert received.response.status_code == post_status + finally: + proc.kill() + proc.join(timeout=2) + + +def make_sse_only_server_app() -> Starlette: # pragma: no cover + """Starlette app with SSE GET but NO /messages/ POST route (all POSTs 404).""" + security_settings = TransportSecuritySettings( + allowed_hosts=["127.0.0.1:*", "localhost:*"], + allowed_origins=["http://127.0.0.1:*", "http://localhost:*"], + ) + sse = SseServerTransport("/messages/", security_settings=security_settings) + server = _create_server() + + async def handle_sse(request: Request) -> Response: + async with sse.connect_sse(request.scope, request.receive, request._send) as streams: + await server.run(streams[0], streams[1], server.create_initialization_options()) + return Response() + + return Starlette(routes=[Route("/sse", endpoint=handle_sse)]) + + +def run_sse_only_server(port: int) -> None: # pragma: no cover + uvicorn.Server( + config=uvicorn.Config(app=make_sse_only_server_app(), host="127.0.0.1", port=port, log_level="error") + ).run() + + +@pytest.mark.anyio +async def test_sse_client_real_server_handles_no_route_match(server_port: int) -> None: + """End-to-end test: if the server has no POST route at all, httpx surfaces + the 404 to the caller via the read stream rather than hanging. + + This catches the 'server is up but refusing POSTs' failure mode, which is + distinct from the deliberate-status test above. + """ + proc = multiprocessing.Process(target=run_sse_only_server, kwargs={"port": server_port}, daemon=True) + proc.start() + try: + wait_for_server(server_port) + server_url = f"http://127.0.0.1:{server_port}" + with anyio.fail_after(5): + async with sse_client(server_url + "/sse") as (read_stream, write_stream): + request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={}) + await write_stream.send(SessionMessage(request)) + received = await read_stream.receive() + assert isinstance(received, httpx.HTTPStatusError) + assert received.response.status_code == 404 + finally: + proc.kill() + proc.join(timeout=2) diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..cb77ec1eb 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -2318,3 +2318,128 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_streamable_http_client_propagates_post_network_error_to_caller() -> None: + """Regression for https://github.com/modelcontextprotocol/python-sdk/issues/2110. + + When a network-level error occurs during POST (e.g. ConnectError), the + caller must receive the exception via the read stream instead of hanging + indefinitely while the error is silently logged. + """ + + def handler(request: httpx.Request) -> httpx.Response: + raise httpx.ConnectError("simulated network failure") + + mock_client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) + + async with mock_client: + async with streamable_http_client("http://test/mcp", http_client=mock_client) as ( + read_stream, + write_stream, + ): + request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={}) + await write_stream.send(SessionMessage(request)) + + with anyio.fail_after(3): + received = await read_stream.receive() + assert isinstance(received, httpx.ConnectError) + + +# ---- Integration tests against a real uvicorn server (issue #2110) ---- + + +def _fixed_status_mcp_app(post_status: int) -> Starlette: # pragma: no cover + """Starlette app whose POST /mcp always returns the given status.""" + from starlette.responses import Response as StarletteResponse + from starlette.routing import Route + + async def failing_post(request: Request) -> StarletteResponse: + return StarletteResponse(status_code=post_status, content=f"deliberate {post_status} for #2110".encode()) + + return Starlette(routes=[Route("/mcp", endpoint=failing_post, methods=["POST", "GET", "DELETE"])]) + + +def _run_fixed_status_mcp_server(port: int, post_status: int) -> None: # pragma: no cover + uvicorn.Server( + config=uvicorn.Config(app=_fixed_status_mcp_app(post_status), host="127.0.0.1", port=port, log_level="error") + ).run() + + +@pytest.fixture +def failing_mcp_server_port() -> int: + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +@pytest.mark.anyio +@pytest.mark.parametrize( + "post_status,expected_rpc_code,expected_message_fragment", + [ + (500, types.INTERNAL_ERROR, "500"), + (401, types.INTERNAL_ERROR, "401"), + (404, types.INVALID_REQUEST, "Session terminated"), + ], +) +async def test_streamable_http_client_real_server_preserves_http_status( + failing_mcp_server_port: int, + post_status: int, + expected_rpc_code: int, + expected_message_fragment: str, +) -> None: + """End-to-end integration test for issue #2110. + + Against a real uvicorn server whose POST /mcp returns 401/404/500, the + client must deliver a `JSONRPCError` to the read stream that carries the + HTTP status code in `error.data`, within a bounded timeout. Before the + fix, the status was lost (mapped to JSON-RPC error codes with no HTTP + context), and the 404 'session terminated' branch had no reference at all + to the underlying HTTP status. + """ + proc = multiprocessing.Process( + target=_run_fixed_status_mcp_server, + kwargs={"port": failing_mcp_server_port, "post_status": post_status}, + daemon=True, + ) + proc.start() + try: + wait_for_server(failing_mcp_server_port) + url = f"http://127.0.0.1:{failing_mcp_server_port}/mcp" + with anyio.fail_after(5): + async with streamable_http_client(url) as (read_stream, write_stream): + request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={}) + await write_stream.send(SessionMessage(request)) + received = await read_stream.receive() + + assert isinstance(received, SessionMessage) + assert isinstance(received.message, types.JSONRPCError) + assert received.message.error.code == expected_rpc_code + assert received.message.error.data == {"http_status": post_status} + assert expected_message_fragment in received.message.error.message + finally: + proc.kill() + proc.join(timeout=2) + + +@pytest.mark.anyio +async def test_streamable_http_client_real_server_surfaces_network_error( + failing_mcp_server_port: int, +) -> None: + """End-to-end integration test for issue #2110. + + When the server is entirely unreachable (nothing listening), the client + must surface the network error on the read stream within a bounded + timeout, not hang waiting for a response. Before the fix, `post_writer` + caught the exception and only logged it, leaving the caller blocked. + """ + # failing_mcp_server_port is reserved but nothing is bound — this gives us + # a port that is guaranteed to refuse connections. + url = f"http://127.0.0.1:{failing_mcp_server_port}/mcp" + with anyio.fail_after(5): + async with streamable_http_client(url) as (read_stream, write_stream): + request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={}) + await write_stream.send(SessionMessage(request)) + received = await read_stream.receive() + assert isinstance(received, httpx.ConnectError)