Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ def streamable_http_app(
stateless_http: bool = False,
event_store: EventStore | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
transport_security: TransportSecuritySettings | None = None,
host: str = "127.0.0.1",
auth: AuthSettings | None = None,
Expand All @@ -591,6 +592,7 @@ def streamable_http_app(
json_response=json_response,
stateless=stateless_http,
security_settings=transport_security,
session_idle_timeout=session_idle_timeout,
)
self._session_manager = session_manager

Expand Down
2 changes: 2 additions & 0 deletions src/mcp/server/mcpserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@ def streamable_http_app(
stateless_http: bool = False,
event_store: EventStore | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
transport_security: TransportSecuritySettings | None = None,
host: str = "127.0.0.1",
) -> Starlette:
Expand All @@ -1060,6 +1061,7 @@ def streamable_http_app(
stateless_http=stateless_http,
event_store=event_store,
retry_interval=retry_interval,
session_idle_timeout=session_idle_timeout,
transport_security=transport_security,
host=host,
auth=self.settings.auth,
Expand Down
4 changes: 4 additions & 0 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ def __init__(
self._terminated = False
# Idle timeout cancel scope; managed by the session manager.
self.idle_scope: anyio.CancelScope | None = None
# Number of requests currently in flight on this session. While > 0,
# the session manager suspends the idle deadline so that a long-running
# request cannot be reaped mid-flight.
self.idle_active_requests: int = 0

@property
def is_terminated(self) -> bool:
Expand Down
36 changes: 31 additions & 5 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import contextlib
import logging
import math
from collections.abc import AsyncIterator
from http import HTTPStatus
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -136,6 +137,32 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
# Clear any remaining server instances
self._server_instances.clear()

@contextlib.asynccontextmanager
async def _suspend_idle_timeout(self, transport: StreamableHTTPServerTransport) -> AsyncIterator[None]:
    """Hold off the idle deadline of ``transport`` for the duration of the ``async with`` body.

    A session that is busy serving a request is not idle, so its idle
    cancel scope must not fire while the request is being handled. The
    transport carries a counter of in-flight requests:

    * when the counter goes from 0 to 1 the scope's deadline is set to
      ``math.inf``;
    * when the counter drops back to 0 the deadline is re-armed to
      ``anyio.current_time() + self.session_idle_timeout``.

    The helper does nothing if the transport has no idle scope or the
    manager has no ``session_idle_timeout`` configured. That decision is
    taken once, on entry, so entry and exit always stay balanced.
    """
    if transport.idle_scope is None or self.session_idle_timeout is None:
        # Idle reaping is not in effect for this transport: just run the body.
        yield
        return

    transport.idle_active_requests += 1
    if transport.idle_active_requests == 1:
        # First request in flight: park the deadline.
        assert transport.idle_scope is not None
        transport.idle_scope.deadline = math.inf
    try:
        yield
    finally:
        transport.idle_active_requests -= 1
        if transport.idle_active_requests == 0:
            # Last request finished: the idle clock starts counting from now.
            assert transport.idle_scope is not None
            assert self.session_idle_timeout is not None
            transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout

async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> None:
"""Process ASGI request with proper session handling and transport setup.

Expand Down Expand Up @@ -196,10 +223,8 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances:
transport = self._server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
# Push back idle deadline on activity
if transport.idle_scope is not None and self.session_idle_timeout is not None:
transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout # pragma: no cover
await transport.handle_request(scope, receive, send)
async with self._suspend_idle_timeout(transport):
await transport.handle_request(scope, receive, send)
return

if request_mcp_session_id is None:
Expand Down Expand Up @@ -267,7 +292,8 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
await self._task_group.start(run_server)

# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)
async with self._suspend_idle_timeout(http_transport):
await http_transport.handle_request(scope, receive, send)
else:
# Unknown or expired session ID - return 404 per MCP spec
# TODO: Align error code once spec clarifies
Expand Down
181 changes: 181 additions & 0 deletions tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,3 +413,184 @@ def test_session_idle_timeout_rejects_non_positive():
def test_session_idle_timeout_rejects_stateless():
    """Constructing a stateless manager with an idle timeout must raise ``RuntimeError``."""
    with pytest.raises(RuntimeError, match="not supported in stateless"):
        StreamableHTTPSessionManager(
            app=Server("test"),
            session_idle_timeout=30,
            stateless=True,
        )


def test_session_idle_timeout_passthrough_lowlevel():
    """``Server.streamable_http_app()`` hands ``session_idle_timeout`` to the session manager it creates."""
    server = Server("test-passthrough")
    server.streamable_http_app(session_idle_timeout=42)

    manager = server._session_manager
    assert manager is not None
    assert manager.session_idle_timeout == 42


def test_session_idle_timeout_passthrough_mcpserver():
    """``MCPServer.streamable_http_app()`` passes ``session_idle_timeout`` down to the low-level server."""
    from mcp.server.mcpserver.server import MCPServer

    mcp_server = MCPServer("test-passthrough-mcp")
    mcp_server.streamable_http_app(session_idle_timeout=17)

    # The value must have reached the session manager owned by the wrapped low-level server.
    manager = mcp_server._lowlevel_server._session_manager
    assert manager is not None
    assert manager.session_idle_timeout == 17


@pytest.mark.anyio
async def test_suspend_idle_timeout_sets_deadline_inf_then_restores():
    """The helper parks the deadline at ``inf`` during a request, then re-arms it to ``now + timeout``.

    Besides checking that the restored deadline is finite and in the future,
    the test bounds it from above by ``now + timeout``, so a restore that
    used the wrong (larger) offset would be caught.
    """
    import math

    timeout = 30
    manager = StreamableHTTPSessionManager(app=Server("test-suspend"), session_idle_timeout=timeout)
    transport = StreamableHTTPServerTransport(mcp_session_id="s1")
    transport.idle_scope = anyio.CancelScope()
    transport.idle_scope.deadline = anyio.current_time() + timeout

    async with manager._suspend_idle_timeout(transport):
        assert transport.idle_active_requests == 1
        assert transport.idle_scope.deadline == math.inf

    assert transport.idle_active_requests == 0

    # The deadline was computed from the clock at exit time, which is not later
    # than `now` below; hence it must lie in the half-open window (now, now + timeout].
    restored = transport.idle_scope.deadline
    assert math.isfinite(restored)
    now = anyio.current_time()
    assert now < restored <= now + timeout


@pytest.mark.anyio
async def test_suspend_idle_timeout_only_restores_after_last_concurrent_request():
    """Overlapping suspensions keep the deadline parked until the last one has exited."""
    import math

    manager = StreamableHTTPSessionManager(app=Server("test-suspend-nested"), session_idle_timeout=30)
    transport = StreamableHTTPServerTransport(mcp_session_id="s1")
    idle_scope = anyio.CancelScope()
    idle_scope.deadline = anyio.current_time() + 30
    transport.idle_scope = idle_scope

    async with manager._suspend_idle_timeout(transport):
        async with manager._suspend_idle_timeout(transport):
            # Two requests in flight.
            assert transport.idle_active_requests == 2
            assert transport.idle_scope.deadline == math.inf

        # The inner request is done but the outer one is still running,
        # so the deadline must stay suspended.
        assert transport.idle_active_requests == 1
        assert transport.idle_scope.deadline == math.inf

    # Nothing in flight any more: the deadline is finite again.
    assert transport.idle_active_requests == 0
    assert transport.idle_scope.deadline != math.inf


@pytest.mark.anyio
async def test_suspend_idle_timeout_no_op_without_timeout():
    """With no ``session_idle_timeout`` configured, the helper leaves the transport untouched."""
    # No idle timeout is passed to the manager.
    manager = StreamableHTTPSessionManager(app=Server("test-suspend-noop"))
    transport = StreamableHTTPServerTransport(mcp_session_id="s1")

    async with manager._suspend_idle_timeout(transport):
        # Neither the in-flight counter nor the idle scope may have been touched.
        assert transport.idle_scope is None
        assert transport.idle_active_requests == 0

    assert transport.idle_active_requests == 0


@pytest.mark.anyio
async def test_long_running_request_outlives_idle_timeout():
    """A tool call that takes longer than ``session_idle_timeout`` must still return its result.

    Regression test for the second half of #2455: the idle scope used to fire
    while the request was being processed and cancelled the in-flight tool call.
    """
    from mcp.server import ServerRequestContext
    from mcp.types import (
        CallToolRequestParams,
        CallToolResult,
        ListToolsResult,
        PaginatedRequestParams,
        TextContent,
        Tool,
    )

    host = "testserver-long-request"
    # The tool deliberately runs several times longer than the idle timeout.
    idle_timeout = 0.1
    tool_duration = 0.4

    async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult:
        slow_tool = Tool(name="slow", input_schema={"type": "object"})
        return ListToolsResult(tools=[slow_tool])

    async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
        await anyio.sleep(tool_duration)
        reply = TextContent(type="text", text="done")
        return CallToolResult(content=[reply])

    server = Server("test-slow-handler", on_list_tools=handle_list_tools, on_call_tool=handle_call_tool)
    mcp_app = server.streamable_http_app(host=host, session_idle_timeout=idle_timeout)

    async with (
        mcp_app.router.lifespan_context(mcp_app),
        httpx.ASGITransport(mcp_app) as asgi_transport,
        httpx.AsyncClient(transport=asgi_transport) as http_client,
        Client(streamable_http_client(f"http://{host}/mcp", http_client=http_client)) as client,
    ):
        outcome = await client.call_tool("slow", {})
        assert not outcome.is_error
        assert outcome.content[0].text == "done"  # type: ignore[union-attr]


@pytest.mark.anyio
async def test_idle_session_reaped_after_request_completes():
    """Once a request has finished, the idle clock runs again and the session is reaped.

    The test creates a session with one request, waits several idle timeouts,
    then expects a request carrying that session ID to be answered with 404.
    """
    timeout = 0.05
    manager = StreamableHTTPSessionManager(app=Server("test-reap-after-request"), session_idle_timeout=timeout)

    async with manager.run():
        first_response: list[Message] = []

        async def mock_send(message: Message):
            first_response.append(message)

        async def mock_receive():  # pragma: no cover
            return {"type": "http.request", "body": b"", "more_body": False}

        # First request: no session header, so a new session is created.
        scope: dict[str, Any] = {
            "type": "http",
            "method": "POST",
            "path": "/mcp",
            "headers": [(b"content-type", b"application/json")],
        }
        await manager.handle_request(scope, mock_receive, mock_send)

        # Pull the session ID out of the response-start headers.
        session_id: str | None = next(
            (
                raw_value.decode()
                for msg in first_response
                if msg["type"] == "http.response.start"
                for raw_name, raw_value in msg.get("headers", [])
                if raw_name.decode().lower() == MCP_SESSION_ID_HEADER.lower()
            ),
            None,
        )
        assert session_id is not None

        # Once the request completed the idle deadline should have resumed; wait it out.
        await anyio.sleep(timeout * 4)

        second_response: list[Message] = []

        async def capture_send(message: Message):
            second_response.append(message)

        # Second request: same session ID, sent after the idle timeout has elapsed.
        scope_with_session: dict[str, Any] = {
            "type": "http",
            "method": "POST",
            "path": "/mcp",
            "headers": [
                (b"content-type", b"application/json"),
                (b"mcp-session-id", session_id.encode()),
            ],
        }
        await manager.handle_request(scope_with_session, mock_receive, capture_send)

        response_start = next(
            (msg for msg in second_response if msg["type"] == "http.response.start"),
            None,
        )
        assert response_start is not None
        assert response_start["status"] == 404
Loading