diff --git a/.changeset/fix-sse-reader-lock-leak.md b/.changeset/fix-sse-reader-lock-leak.md new file mode 100644 index 000000000..cf9936205 --- /dev/null +++ b/.changeset/fix-sse-reader-lock-leak.md @@ -0,0 +1,5 @@ +--- +'@modelcontextprotocol/sdk': patch +--- + +Fix SSE client memory leak: release the reader lock on disconnect so the underlying reader-tied buffer (~50MB per long-lived client) can be garbage-collected. diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 736587973..5278be858 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -328,40 +328,47 @@ export class StreamableHTTPClientTransport implements Transport { ) .getReader(); - while (true) { - const { value: event, done } = await reader.read(); - if (done) { - break; - } + try { + while (true) { + const { value: event, done } = await reader.read(); + if (done) { + break; + } - // Update last event ID if provided - if (event.id) { - lastEventId = event.id; - // Mark that we've received a priming event - stream is now resumable - hasPrimingEvent = true; - onresumptiontoken?.(event.id); - } + // Update last event ID if provided + if (event.id) { + lastEventId = event.id; + // Mark that we've received a priming event - stream is now resumable + hasPrimingEvent = true; + onresumptiontoken?.(event.id); + } - // Skip events with no data (priming events, keep-alives) - if (!event.data) { - continue; - } + // Skip events with no data (priming events, keep-alives) + if (!event.data) { + continue; + } - if (!event.event || event.event === 'message') { - try { - const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); - if (isJSONRPCResultResponse(message)) { - // Mark that we received a response - no need to reconnect for this request - receivedResponse = true; - if (replayMessageId !== undefined) { - message.id = replayMessageId; + if (!event.event || event.event === 'message') { + try { + const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); + if (isJSONRPCResultResponse(message)) { + // Mark that we received a response - no need to reconnect for this request + receivedResponse = true; + if (replayMessageId !== undefined) { + message.id = replayMessageId; + } } + this.onmessage?.(message); + } catch (error) { + this.onerror?.(error as Error); } - this.onmessage?.(message); - } catch (error) { - this.onerror?.(error as Error); } } + } finally { + // Release the reader's lock so the underlying stream and its + // buffered chunks can be GC'd. Without this, each disconnect + // leaks the decoder + parser pipeline (~50MB per reconnect). + reader.releaseLock(); } // Handle graceful server-side disconnect diff --git a/test/client/streamableHttp.test.ts b/test/client/streamableHttp.test.ts index 52c8f1074..b98fddad8 100644 --- a/test/client/streamableHttp.test.ts +++ b/test/client/streamableHttp.test.ts @@ -1041,6 +1041,67 @@ describe('StreamableHTTPClientTransport', () => { // Resumption token callback may be invoked, but the primary assertion // here is that no JSON parse errors occurred for the priming event. }); + + it('should release the SSE reader lock when the stream closes (regression for #1959)', async () => { + // Regression test for issue #1959: _handleSseStream acquired a reader via + // getReader() but never called releaseLock() on either the success or error + // path. That kept the decoder + parser pipeline alive across reconnects, + // leaking ~50MB per reconnect cycle. + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxRetries: 0, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + // Spy on every reader's releaseLock so we can verify the SSE pipeline is + // tidied up on both graceful close and error paths. + const releaseSpy = vi.spyOn(ReadableStreamDefaultReader.prototype, 'releaseLock'); + + // Graceful-close stream: a single notification then close. + const gracefulStream = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('id: evt-1\ndata: {"jsonrpc":"2.0","method":"notifications/x","params":{}}\n\n') + ); + controller.close(); + } + }); + + // Error stream: pipeline aborts mid-read. + const erroringStream = new ReadableStream({ + start(controller) { + controller.error(new Error('boom')); + } + }); + + const fetchMock = global.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: gracefulStream + }); + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: erroringStream + }); + + await transport.start(); + await transport['_startOrAuthSse']({}); + await transport['_startOrAuthSse']({}); + await vi.advanceTimersByTimeAsync(50); + + // releaseLock should have fired for both the graceful-close path and the + // error path. Other readers (e.g. those internal to pipeThrough) may also + // call releaseLock; we just need both of ours to be released. + expect(releaseSpy.mock.calls.length).toBeGreaterThanOrEqual(2); + releaseSpy.mockRestore(); + }); }); it('invalidates all credentials on InvalidClientError during auth', async () => {