Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-sse-reader-lock-leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@modelcontextprotocol/sdk': patch
---

Fix SSE client memory leak: release the reader lock on disconnect so the underlying reader-tied buffer (~50MB per long-lived client) can be garbage-collected.
61 changes: 34 additions & 27 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,40 +328,47 @@ export class StreamableHTTPClientTransport implements Transport {
)
.getReader();

while (true) {
const { value: event, done } = await reader.read();
if (done) {
break;
}
try {
while (true) {
const { value: event, done } = await reader.read();
if (done) {
break;
}

// Update last event ID if provided
if (event.id) {
lastEventId = event.id;
// Mark that we've received a priming event - stream is now resumable
hasPrimingEvent = true;
onresumptiontoken?.(event.id);
}
// Update last event ID if provided
if (event.id) {
lastEventId = event.id;
// Mark that we've received a priming event - stream is now resumable
hasPrimingEvent = true;
onresumptiontoken?.(event.id);
}

// Skip events with no data (priming events, keep-alives)
if (!event.data) {
continue;
}
// Skip events with no data (priming events, keep-alives)
if (!event.data) {
continue;
}

if (!event.event || event.event === 'message') {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
if (isJSONRPCResultResponse(message)) {
// Mark that we received a response - no need to reconnect for this request
receivedResponse = true;
if (replayMessageId !== undefined) {
message.id = replayMessageId;
if (!event.event || event.event === 'message') {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
if (isJSONRPCResultResponse(message)) {
// Mark that we received a response - no need to reconnect for this request
receivedResponse = true;
if (replayMessageId !== undefined) {
message.id = replayMessageId;
}
}
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
}
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
}
}
} finally {
// Release the reader's lock so the underlying stream and its
// buffered chunks can be GC'd. Without this, each disconnect
// leaks the decoder + parser pipeline (~50MB per reconnect).
reader.releaseLock();
}

// Handle graceful server-side disconnect
Expand Down
61 changes: 61 additions & 0 deletions test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,67 @@ describe('StreamableHTTPClientTransport', () => {
// Resumption token callback may be invoked, but the primary assertion
// here is that no JSON parse errors occurred for the priming event.
});

it('should release the SSE reader lock when the stream closes (regression for #1959)', async () => {
// Regression test for issue #1959: _handleSseStream acquired a reader via
// getReader() but never called releaseLock() on either the success or error
// path. That kept the decoder + parser pipeline alive across reconnects,
// leaking ~50MB per reconnect cycle.
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
reconnectionOptions: {
initialReconnectionDelay: 10,
maxRetries: 0,
maxReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1
}
});

// Spy on every reader's releaseLock so we can verify the SSE pipeline is
// tidied up on both graceful close and error paths.
const releaseSpy = vi.spyOn(ReadableStreamDefaultReader.prototype, 'releaseLock');

// Graceful-close stream: a single notification then close.
const gracefulStream = new ReadableStream({
start(controller) {
controller.enqueue(
new TextEncoder().encode('id: evt-1\ndata: {"jsonrpc":"2.0","method":"notifications/x","params":{}}\n\n')
);
controller.close();
}
});

// Error stream: pipeline aborts mid-read.
const erroringStream = new ReadableStream({
start(controller) {
controller.error(new Error('boom'));
}
});

const fetchMock = global.fetch as Mock;
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: gracefulStream
});
fetchMock.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream' }),
body: erroringStream
});

await transport.start();
await transport['_startOrAuthSse']({});
await transport['_startOrAuthSse']({});
await vi.advanceTimersByTimeAsync(50);

// releaseLock should have fired for both the graceful-close path and the
// error path. Other readers (e.g. those internal to pipeThrough) may also
// call releaseLock; we just need both of ours to be released.
expect(releaseSpy.mock.calls.length).toBeGreaterThanOrEqual(2);
releaseSpy.mockRestore();
});
});

it('invalidates all credentials on InvalidClientError during auth', async () => {
Expand Down
Loading