Skip to content

Commit 4c5cd39

Browse files
committed
feat: ServerRunner.run() and otel_middleware
run() composes dispatch_middleware over _on_request and forwards task_status to dispatcher.run() so callers can 'await tg.start(runner.run)'. otel_middleware is a DispatchMiddleware that wraps each request in a span, mirroring the existing Server._handle_request span shape: name 'MCP handle <method> [<target>]', mcp.method.name attribute, W3C trace context extracted from params._meta (SEP-414), and ERROR status if the handler raises. connection_lifespan plumbing (the enter-late dance) is deferred to a separate commit since Server.connection_lifespan is None today.
1 parent 7dc1527 commit 4c5cd39

File tree

2 files changed

+122
-2
lines changed

2 files changed

+122
-2
lines changed

src/mcp/server/runner.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
from functools import partial, reduce
2323
from typing import Any, Generic, Protocol, cast
2424

25+
import anyio.abc
26+
from opentelemetry.trace import SpanKind, StatusCode
2527
from pydantic import BaseModel
2628
from typing_extensions import TypeVar
2729

2830
from mcp.server.connection import Connection
2931
from mcp.server.context import CallNext, Context, ContextMiddleware
3032
from mcp.server.lowlevel.server import NotificationOptions
33+
from mcp.shared._otel import extract_trace_context, otel_span
3134
from mcp.shared.dispatcher import DispatchContext, Dispatcher, DispatchMiddleware, OnRequest
3235
from mcp.shared.exceptions import MCPError
3336
from mcp.shared.transport_context import TransportContext
@@ -52,7 +55,7 @@
5255
UnsubscribeRequestParams,
5356
)
5457

55-
__all__ = ["CallNext", "ContextMiddleware", "ServerRegistry", "ServerRunner"]
58+
__all__ = ["CallNext", "ContextMiddleware", "ServerRegistry", "ServerRunner", "otel_middleware"]
5659

5760
logger = logging.getLogger(__name__)
5861

@@ -117,6 +120,44 @@ def get_capabilities(
117120
) -> ServerCapabilities: ...
118121

119122

123+
def otel_middleware(next_on_request: OnRequest) -> OnRequest:
124+
"""Dispatch-tier middleware that wraps each request in an OpenTelemetry span.
125+
126+
Mirrors the span shape of the existing `Server._handle_request`: span name
127+
``"MCP handle <method> [<target>]"``, ``mcp.method.name`` attribute, W3C
128+
trace context extracted from ``params._meta`` (SEP-414), and an ERROR
129+
status if the handler raises.
130+
"""
131+
132+
async def wrapped(
133+
dctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
134+
) -> dict[str, Any]:
135+
target: str | None
136+
match params:
137+
case {"name": str() as target}:
138+
pass
139+
case _:
140+
target = None
141+
parent: Any | None
142+
match params:
143+
case {"_meta": {**meta}}:
144+
parent = extract_trace_context(meta)
145+
case _:
146+
parent = None
147+
span_name = f"MCP handle {method}{f' {target}' if target else ''}"
148+
with otel_span(span_name, kind=SpanKind.SERVER, attributes={"mcp.method.name": method}, context=parent) as span:
149+
try:
150+
return await next_on_request(dctx, method, params)
151+
except MCPError as e:
152+
span.set_status(StatusCode.ERROR, e.error.message)
153+
raise
154+
except Exception as e:
155+
span.set_status(StatusCode.ERROR, str(e))
156+
raise
157+
158+
return wrapped
159+
160+
120161
def _dump_result(result: Any) -> dict[str, Any]:
121162
if result is None:
122163
return {}
@@ -145,6 +186,16 @@ def __post_init__(self) -> None:
145186
self._initialized = self.stateless
146187
self.connection = Connection(self.dispatcher, has_standalone_channel=self.has_standalone_channel)
147188

189+
async def run(self, *, task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED) -> None:
190+
"""Drive the dispatcher until the underlying channel closes.
191+
192+
Composes `dispatch_middleware` over `_on_request` and hands the result
193+
to `dispatcher.run()`. ``task_status.started()`` is forwarded so callers
194+
can ``await tg.start(runner.run)`` and resume once the dispatcher is
195+
ready to accept requests.
196+
"""
197+
await self.dispatcher.run(self._compose_on_request(), self._on_notify, task_status=task_status)
198+
148199
def _compose_on_request(self) -> OnRequest:
149200
"""Wrap `_on_request` in `dispatch_middleware`, outermost-first.
150201

tests/server/test_runner.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from mcp.server.connection import Connection
1515
from mcp.server.context import Context
1616
from mcp.server.lowlevel.server import Server
17-
from mcp.server.runner import ServerRunner
17+
from mcp.server.runner import ServerRunner, otel_middleware
1818
from mcp.shared.direct_dispatcher import create_direct_dispatcher_pair
1919
from mcp.shared.exceptions import MCPError
2020
from mcp.shared.transport_context import TransportContext
@@ -272,6 +272,75 @@ async def mw(ctx: Any, method: str, params: Any, call_next: Any) -> Any:
272272
tg.cancel_scope.cancel()
273273

274274

275+
@pytest.mark.anyio
276+
async def test_runner_run_drives_dispatcher_end_to_end(server: SrvT):
277+
client, server_d = create_direct_dispatcher_pair()
278+
runner = ServerRunner(server=server, dispatcher=server_d, lifespan_state=None, has_standalone_channel=True)
279+
c_req, c_notify = echo_handlers(Recorder())
280+
async with anyio.create_task_group() as tg:
281+
await tg.start(client.run, c_req, c_notify)
282+
await tg.start(runner.run)
283+
with anyio.fail_after(5):
284+
init = await client.send_raw_request("initialize", _initialize_params())
285+
tools = await client.send_raw_request("tools/list", None)
286+
assert init["serverInfo"]["name"] == "test-server"
287+
assert tools["tools"][0]["name"] == "t"
288+
tg.cancel_scope.cancel()
289+
290+
291+
@pytest.mark.anyio
292+
async def test_runner_run_applies_dispatch_middleware(server: SrvT):
293+
seen: list[str] = []
294+
295+
def trace_mw(next_on_request: Any) -> Any:
296+
async def wrapped(dctx: Any, method: str, params: Any) -> Any:
297+
seen.append(method)
298+
return await next_on_request(dctx, method, params)
299+
300+
return wrapped
301+
302+
client, server_d = create_direct_dispatcher_pair()
303+
runner = ServerRunner(
304+
server=server,
305+
dispatcher=server_d,
306+
lifespan_state=None,
307+
has_standalone_channel=True,
308+
dispatch_middleware=[trace_mw],
309+
)
310+
c_req, c_notify = echo_handlers(Recorder())
311+
async with anyio.create_task_group() as tg:
312+
await tg.start(client.run, c_req, c_notify)
313+
await tg.start(runner.run)
314+
with anyio.fail_after(5):
315+
await client.send_raw_request("initialize", _initialize_params())
316+
await client.send_raw_request("ping", None)
317+
assert seen == ["initialize", "ping"]
318+
tg.cancel_scope.cancel()
319+
320+
321+
@pytest.mark.anyio
322+
async def test_otel_middleware_passes_through_result_and_survives_handler_error(server: SrvT):
323+
client, server_d = create_direct_dispatcher_pair()
324+
runner = ServerRunner(
325+
server=server,
326+
dispatcher=server_d,
327+
lifespan_state=None,
328+
has_standalone_channel=True,
329+
dispatch_middleware=[otel_middleware],
330+
)
331+
c_req, c_notify = echo_handlers(Recorder())
332+
async with anyio.create_task_group() as tg:
333+
await tg.start(client.run, c_req, c_notify)
334+
await tg.start(runner.run)
335+
with anyio.fail_after(5):
336+
await client.send_raw_request("initialize", _initialize_params())
337+
tools = await client.send_raw_request("tools/list", None)
338+
assert tools["tools"][0]["name"] == "t"
339+
with pytest.raises(MCPError):
340+
await client.send_raw_request("nonexistent/method", None)
341+
tg.cancel_scope.cancel()
342+
343+
275344
@pytest.mark.anyio
276345
async def test_runner_stateless_skips_init_gate(server: SrvT):
277346
client, server_d = create_direct_dispatcher_pair()

0 commit comments

Comments
 (0)