Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions src/mcp/client/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,20 @@ async def post_writer(endpoint_url: str):

async def _send_message(session_message: SessionMessage) -> None:
logger.debug(f"Sending client message: {session_message}")
response = await client.post(
endpoint_url,
json=session_message.message.model_dump(
by_alias=True,
mode="json",
exclude_unset=True,
),
)
response.raise_for_status()
try:
response = await client.post(
endpoint_url,
json=session_message.message.model_dump(
by_alias=True,
mode="json",
exclude_unset=True,
),
)
response.raise_for_status()
except httpx.HTTPError as exc:
logger.exception("Error sending client message")
await read_stream_writer.send(exc)
return
logger.debug(f"Client message sent successfully: {response.status_code}")

async for session_message in write_stream_reader:
Expand Down
24 changes: 18 additions & 6 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,22 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:

if response.status_code == 404: # pragma: no branch
if isinstance(message, JSONRPCRequest): # pragma: no branch
error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated")
error_data = ErrorData(
code=INVALID_REQUEST,
message="Session terminated",
data={"http_status": response.status_code},
)
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(session_message)
return

if response.status_code >= 400:
if isinstance(message, JSONRPCRequest):
error_data = ErrorData(code=INTERNAL_ERROR, message="Server returned an error response")
error_data = ErrorData(
code=INTERNAL_ERROR,
message=f"Server returned an error response (HTTP {response.status_code})",
data={"http_status": response.status_code},
)
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
await ctx.read_stream_writer.send(session_message)
return
Expand Down Expand Up @@ -468,10 +476,14 @@ async def _handle_message(session_message: SessionMessage) -> None:
)

async def handle_request_async():
if is_resumption:
await self._handle_resumption_request(ctx)
else:
await self._handle_post_request(ctx)
try:
if is_resumption:
await self._handle_resumption_request(ctx)
else:
await self._handle_post_request(ctx)
except httpx.HTTPError as exc:
logger.exception("Error sending client message")
await read_stream_writer.send(exc)

# If this is a request, start a new task to handle it
if isinstance(message, JSONRPCRequest):
Expand Down
170 changes: 170 additions & 0 deletions tests/shared/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from mcp.server.sse import SseServerTransport
from mcp.server.transport_security import TransportSecuritySettings
from mcp.shared.exceptions import MCPError
from mcp.shared.message import SessionMessage
from mcp.types import (
CallToolRequestParams,
CallToolResult,
Expand Down Expand Up @@ -629,3 +630,172 @@ async def test_sse_session_cleanup_on_disconnect(server: None, server_url: str)
headers={"Content-Type": "application/json"},
)
assert response.status_code == 404


class _LongLivedSSEStream(httpx.AsyncByteStream):
"""A streaming SSE body that emits one endpoint event then stays open.

This mirrors a real SSE server: the stream does not end after the endpoint
event, so the client's `sse_reader` keeps running while we exercise the
POST path. The stream unblocks when the outer task group is cancelled.
"""

def __init__(self, endpoint_path: str) -> None:
self._endpoint_path = endpoint_path

async def __aiter__(self) -> AsyncGenerator[bytes, None]:
yield f"event: endpoint\ndata: {self._endpoint_path}\n\n".encode()
await anyio.sleep_forever()


@pytest.mark.anyio
@pytest.mark.parametrize("post_status", [401, 404, 500])
async def test_sse_client_propagates_post_http_error_to_caller(post_status: int) -> None:
"""Regression for https://github.com/modelcontextprotocol/python-sdk/issues/2110.

When the SSE POST endpoint returns a non-2xx HTTP status, the caller must
receive the HTTPStatusError via the read stream instead of hanging on an
indefinite wait for a response that will never arrive.
"""

def handler(request: httpx.Request) -> httpx.Response:
if request.method == "GET":
return httpx.Response(
200,
headers={"content-type": "text/event-stream"},
stream=_LongLivedSSEStream("/messages/?session_id=test"),
)
return httpx.Response(post_status)

def mock_factory(
headers: dict[str, Any] | None = None,
timeout: httpx.Timeout | None = None,
auth: httpx.Auth | None = None,
) -> httpx.AsyncClient:
return httpx.AsyncClient(transport=httpx.MockTransport(handler))

async with sse_client("http://test/sse", httpx_client_factory=mock_factory) as (read_stream, write_stream):
request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
await write_stream.send(SessionMessage(request))

with anyio.fail_after(3):
received = await read_stream.receive()
assert isinstance(received, httpx.HTTPStatusError)
assert received.response.status_code == post_status


# ---- Integration tests against a real uvicorn MCP server (issue #2110) ----


def make_failing_post_server_app(post_status: int) -> Starlette: # pragma: no cover
"""Starlette app with a real SSE GET endpoint but a POST that always fails.

Used by the integration test for issue #2110 — the SSE GET handshake is
the real `SseServerTransport`, so the client receives a genuine endpoint
event; the POST route is replaced so every client message fails with the
given status.
"""
security_settings = TransportSecuritySettings(
allowed_hosts=["127.0.0.1:*", "localhost:*"], allowed_origins=["http://127.0.0.1:*", "http://localhost:*"]
)
sse = SseServerTransport("/messages/", security_settings=security_settings)
server = _create_server()

async def handle_sse(request: Request) -> Response:
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
await server.run(streams[0], streams[1], server.create_initialization_options())
return Response()

async def failing_post(request: Request) -> Response:
return Response(status_code=post_status, content=f"deliberate {post_status} for #2110".encode())

return Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages/", endpoint=failing_post, methods=["POST"]),
]
)


def run_failing_post_server(server_port: int, post_status: int) -> None: # pragma: no cover
app = make_failing_post_server_app(post_status)
uvicorn.Server(config=uvicorn.Config(app=app, host="127.0.0.1", port=server_port, log_level="error")).run()


@pytest.mark.anyio
@pytest.mark.parametrize("post_status", [401, 404, 500])
async def test_sse_client_real_server_surfaces_post_http_error(server_port: int, post_status: int) -> None:
"""End-to-end integration test for issue #2110.

Against a real uvicorn-hosted MCP server whose POST endpoint returns
401/404/500, the client must surface the error within a bounded timeout
rather than hanging forever. Before the fix, this test hit `fail_after(5)`
because `post_writer` swallowed the exception.
"""
proc = multiprocessing.Process(
target=run_failing_post_server,
kwargs={"server_port": server_port, "post_status": post_status},
daemon=True,
)
proc.start()
try:
wait_for_server(server_port)
server_url = f"http://127.0.0.1:{server_port}"
with anyio.fail_after(5):
async with sse_client(server_url + "/sse") as (read_stream, write_stream):
request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
await write_stream.send(SessionMessage(request))
received = await read_stream.receive()
assert isinstance(received, httpx.HTTPStatusError)
assert received.response.status_code == post_status
finally:
proc.kill()
proc.join(timeout=2)


def make_sse_only_server_app() -> Starlette: # pragma: no cover
"""Starlette app with SSE GET but NO /messages/ POST route (all POSTs 404)."""
security_settings = TransportSecuritySettings(
allowed_hosts=["127.0.0.1:*", "localhost:*"],
allowed_origins=["http://127.0.0.1:*", "http://localhost:*"],
)
sse = SseServerTransport("/messages/", security_settings=security_settings)
server = _create_server()

async def handle_sse(request: Request) -> Response:
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
await server.run(streams[0], streams[1], server.create_initialization_options())
return Response()

return Starlette(routes=[Route("/sse", endpoint=handle_sse)])


def run_sse_only_server(port: int) -> None: # pragma: no cover
uvicorn.Server(
config=uvicorn.Config(app=make_sse_only_server_app(), host="127.0.0.1", port=port, log_level="error")
).run()


@pytest.mark.anyio
async def test_sse_client_real_server_handles_no_route_match(server_port: int) -> None:
"""End-to-end test: if the server has no POST route at all, httpx surfaces
the 404 to the caller via the read stream rather than hanging.

This catches the 'server is up but refusing POSTs' failure mode, which is
distinct from the deliberate-status test above.
"""
proc = multiprocessing.Process(target=run_sse_only_server, kwargs={"port": server_port}, daemon=True)
proc.start()
try:
wait_for_server(server_port)
server_url = f"http://127.0.0.1:{server_port}"
with anyio.fail_after(5):
async with sse_client(server_url + "/sse") as (read_stream, write_stream):
request = types.JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
await write_stream.send(SessionMessage(request))
received = await read_stream.receive()
assert isinstance(received, httpx.HTTPStatusError)
assert received.response.status_code == 404
finally:
proc.kill()
proc.join(timeout=2)
125 changes: 125 additions & 0 deletions tests/shared/test_streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2318,3 +2318,128 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers(

assert "content-type" in headers_data
assert headers_data["content-type"] == "application/json"


@pytest.mark.anyio
async def test_streamable_http_client_propagates_post_network_error_to_caller() -> None:
"""Regression for https://github.com/modelcontextprotocol/python-sdk/issues/2110.

When a network-level error occurs during POST (e.g. ConnectError), the
caller must receive the exception via the read stream instead of hanging
indefinitely while the error is silently logged.
"""

def handler(request: httpx.Request) -> httpx.Response:
raise httpx.ConnectError("simulated network failure")

mock_client = httpx.AsyncClient(transport=httpx.MockTransport(handler))

async with mock_client:
async with streamable_http_client("http://test/mcp", http_client=mock_client) as (
read_stream,
write_stream,
):
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
await write_stream.send(SessionMessage(request))

with anyio.fail_after(3):
received = await read_stream.receive()
assert isinstance(received, httpx.ConnectError)


# ---- Integration tests against a real uvicorn server (issue #2110) ----


def _fixed_status_mcp_app(post_status: int) -> Starlette: # pragma: no cover
"""Starlette app whose POST /mcp always returns the given status."""
from starlette.responses import Response as StarletteResponse
from starlette.routing import Route

async def failing_post(request: Request) -> StarletteResponse:
return StarletteResponse(status_code=post_status, content=f"deliberate {post_status} for #2110".encode())

return Starlette(routes=[Route("/mcp", endpoint=failing_post, methods=["POST", "GET", "DELETE"])])


def _run_fixed_status_mcp_server(port: int, post_status: int) -> None: # pragma: no cover
uvicorn.Server(
config=uvicorn.Config(app=_fixed_status_mcp_app(post_status), host="127.0.0.1", port=port, log_level="error")
).run()


@pytest.fixture
def failing_mcp_server_port() -> int:
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]


@pytest.mark.anyio
@pytest.mark.parametrize(
"post_status,expected_rpc_code,expected_message_fragment",
[
(500, types.INTERNAL_ERROR, "500"),
(401, types.INTERNAL_ERROR, "401"),
(404, types.INVALID_REQUEST, "Session terminated"),
],
)
async def test_streamable_http_client_real_server_preserves_http_status(
failing_mcp_server_port: int,
post_status: int,
expected_rpc_code: int,
expected_message_fragment: str,
) -> None:
"""End-to-end integration test for issue #2110.

Against a real uvicorn server whose POST /mcp returns 401/404/500, the
client must deliver a `JSONRPCError` to the read stream that carries the
HTTP status code in `error.data`, within a bounded timeout. Before the
fix, the status was lost (mapped to JSON-RPC error codes with no HTTP
context), and the 404 'session terminated' branch had no reference at all
to the underlying HTTP status.
"""
proc = multiprocessing.Process(
target=_run_fixed_status_mcp_server,
kwargs={"port": failing_mcp_server_port, "post_status": post_status},
daemon=True,
)
proc.start()
try:
wait_for_server(failing_mcp_server_port)
url = f"http://127.0.0.1:{failing_mcp_server_port}/mcp"
with anyio.fail_after(5):
async with streamable_http_client(url) as (read_stream, write_stream):
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
await write_stream.send(SessionMessage(request))
received = await read_stream.receive()

assert isinstance(received, SessionMessage)
assert isinstance(received.message, types.JSONRPCError)
assert received.message.error.code == expected_rpc_code
assert received.message.error.data == {"http_status": post_status}
assert expected_message_fragment in received.message.error.message
finally:
proc.kill()
proc.join(timeout=2)


@pytest.mark.anyio
async def test_streamable_http_client_real_server_surfaces_network_error(
failing_mcp_server_port: int,
) -> None:
"""End-to-end integration test for issue #2110.

When the server is entirely unreachable (nothing listening), the client
must surface the network error on the read stream within a bounded
timeout, not hang waiting for a response. Before the fix, `post_writer`
caught the exception and only logged it, leaving the caller blocked.
"""
# failing_mcp_server_port is reserved but nothing is bound — this gives us
# a port that is guaranteed to refuse connections.
url = f"http://127.0.0.1:{failing_mcp_server_port}/mcp"
with anyio.fail_after(5):
async with streamable_http_client(url) as (read_stream, write_stream):
request = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping", params={})
await write_stream.send(SessionMessage(request))
received = await read_stream.receive()
assert isinstance(received, httpx.ConnectError)
Loading