Skip to content

Commit 77b7e4b

Browse files
committed
fix(stdio): allow configurable memory stream buffer size in stdio_server
Add read_stream_buffer_size and write_stream_buffer_size parameters to stdio_server() to allow decoupling the stdin reader from the message processor. With the default buffer_size=0, the reader blocks on send() until the processor consumes the message, causing the server to become unresponsive during slow operations. A non-zero buffer allows the reader to queue messages ahead, preventing ping timeouts and request starvation. Closes #1333
1 parent 7c02248 commit 77b7e4b

2 files changed

Lines changed: 83 additions & 3 deletions

File tree

src/mcp/server/stdio.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ async def run_server():
3030

3131

3232
@asynccontextmanager
33-
async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None):
33+
async def stdio_server(
34+
stdin: anyio.AsyncFile[str] | None = None,
35+
stdout: anyio.AsyncFile[str] | None = None,
36+
read_stream_buffer_size: int = 0,
37+
write_stream_buffer_size: int = 0,
38+
):
3439
"""Server transport for stdio: this communicates with an MCP client by reading
3540
from the current process' stdin and writing to stdout.
3641
"""
@@ -49,8 +54,8 @@ async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.
4954
write_stream: MemoryObjectSendStream[SessionMessage]
5055
write_stream_reader: MemoryObjectReceiveStream[SessionMessage]
5156

52-
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
53-
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
57+
read_stream_writer, read_stream = anyio.create_memory_object_stream(read_stream_buffer_size)
58+
write_stream, write_stream_reader = anyio.create_memory_object_stream(write_stream_buffer_size)
5459

5560
async def stdin_reader():
5661
try:

tests/server/test_stdio.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,78 @@ async def test_stdio_server():
5959
assert len(received_responses) == 2
6060
assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping")
6161
assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={})
62+
63+
64+
@pytest.mark.anyio
65+
async def test_stdio_server_with_buffer_size():
66+
"""Test that stdio_server works with configurable buffer sizes."""
67+
stdin = io.StringIO()
68+
stdout = io.StringIO()
69+
70+
messages = [
71+
JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"),
72+
JSONRPCRequest(jsonrpc="2.0", id=2, method="ping"),
73+
JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"),
74+
]
75+
76+
for message in messages:
77+
stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n")
78+
stdin.seek(0)
79+
80+
async with stdio_server(
81+
stdin=anyio.AsyncFile(stdin),
82+
stdout=anyio.AsyncFile(stdout),
83+
read_stream_buffer_size=5,
84+
write_stream_buffer_size=5,
85+
) as (read_stream, write_stream):
86+
received_messages: list[JSONRPCMessage] = []
87+
async with read_stream:
88+
async for message in read_stream:
89+
if isinstance(message, Exception):
90+
raise message
91+
received_messages.append(message.message)
92+
if len(received_messages) == 3:
93+
break
94+
95+
assert len(received_messages) == 3
96+
for i, msg in enumerate(received_messages, 1):
97+
assert msg == JSONRPCRequest(jsonrpc="2.0", id=i, method="ping")
98+
99+
100+
@pytest.mark.anyio
101+
async def test_stdio_server_buffered_does_not_block_reader():
102+
"""Test that a non-zero buffer allows stdin_reader to continue reading
103+
even when the consumer is slow to process messages.
104+
105+
With buffer_size=0, the reader blocks on send() until the consumer calls
106+
receive(). With buffer_size>0, the reader can queue messages ahead.
107+
"""
108+
stdin = io.StringIO()
109+
stdout = io.StringIO()
110+
111+
num_messages = 5
112+
for i in range(1, num_messages + 1):
113+
msg = JSONRPCRequest(jsonrpc="2.0", id=i, method="ping")
114+
stdin.write(msg.model_dump_json(by_alias=True, exclude_none=True) + "\n")
115+
stdin.seek(0)
116+
117+
async with stdio_server(
118+
stdin=anyio.AsyncFile(stdin),
119+
stdout=anyio.AsyncFile(stdout),
120+
read_stream_buffer_size=num_messages,
121+
) as (read_stream, write_stream):
122+
# Give the reader time to buffer all messages
123+
await anyio.sleep(0.1)
124+
125+
received: list[JSONRPCMessage] = []
126+
async with read_stream:
127+
async for message in read_stream:
128+
if isinstance(message, Exception):
129+
raise message
130+
received.append(message.message)
131+
# Simulate slow processing
132+
await anyio.sleep(0.01)
133+
if len(received) == num_messages:
134+
break
135+
136+
assert len(received) == num_messages

0 commit comments

Comments
 (0)