Skip to content

Commit a1c7e0f

Browse files
committed
fix: propagate stdio process exit during initialization
1 parent df75857 commit a1c7e0f

8 files changed

Lines changed: 281 additions & 4 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.concurrent.atomic.AtomicReference;
1212
import java.util.function.Function;
1313

14+
import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException;
1415
import io.modelcontextprotocol.spec.McpClientSession;
1516
import io.modelcontextprotocol.spec.McpError;
1617
import io.modelcontextprotocol.spec.McpSchema;
@@ -225,6 +226,16 @@ private void close() {
225226
this.mcpSession().close();
226227
}
227228

229+
private void close(Throwable cause) {
230+
McpClientSession mcpClientSession = this.mcpSession();
231+
if (mcpClientSession != null) {
232+
mcpClientSession.close(cause);
233+
}
234+
else {
235+
this.error(cause);
236+
}
237+
}
238+
228239
private Mono<Void> closeGracefully() {
229240
return this.mcpSession().closeGracefully();
230241
}
@@ -259,6 +270,13 @@ public void handleException(Throwable t) {
259270
// the implicit initialization step.
260271
this.withInitialization("re-initializing", result -> Mono.empty()).subscribe();
261272
}
273+
else if (t instanceof McpStdioServerProcessExitException) {
274+
DefaultInitialization previous = this.initializationRef.get();
275+
if (previous != null && previous.initializeResult() == null
276+
&& this.initializationRef.compareAndSet(previous, null)) {
277+
previous.close(t);
278+
}
279+
}
262280
}
263281

264282
/**
@@ -355,4 +373,4 @@ public Mono<?> closeGracefully() {
355373
});
356374
}
357375

358-
}
376+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2026-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import io.modelcontextprotocol.spec.McpTransportException;
8+
import io.modelcontextprotocol.util.Assert;
9+
10+
/**
11+
* Thrown when an MCP stdio server process exits unexpectedly.
12+
*
13+
* @author DragonFSKY
14+
*/
15+
public class McpStdioServerProcessExitException extends McpTransportException {
16+
17+
private static final long serialVersionUID = 1L;
18+
19+
private final int exitCode;
20+
21+
private final String command;
22+
23+
public McpStdioServerProcessExitException(int exitCode, String command) {
24+
super(message(exitCode, command));
25+
this.exitCode = exitCode;
26+
this.command = command;
27+
}
28+
29+
public int getExitCode() {
30+
return this.exitCode;
31+
}
32+
33+
public String getCommand() {
34+
return this.command;
35+
}
36+
37+
private static String message(int exitCode, String command) {
38+
Assert.hasText(command, "The command can not be empty");
39+
return "MCP server process exited unexpectedly with code " + exitCode + " for command: " + command;
40+
}
41+
42+
}

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.List;
1515
import java.util.Set;
1616
import java.util.concurrent.Executors;
17+
import java.util.concurrent.atomic.AtomicReference;
1718
import java.util.function.Consumer;
1819
import java.util.function.Function;
1920
import java.util.stream.IntStream;
@@ -60,6 +61,10 @@ public class StdioClientTransport implements McpClientTransport {
6061
/** The server process being communicated with */
6162
private Process process;
6263

64+
private final AtomicReference<McpStdioServerProcessExitException> unexpectedExitException = new AtomicReference<>();
65+
66+
private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();
67+
6368
private McpJsonMapper jsonMapper;
6469

6570
/** Scheduler for handling inbound messages from the server process */
@@ -78,6 +83,8 @@ public class StdioClientTransport implements McpClientTransport {
7883

7984
private volatile boolean isClosing = false;
8085

86+
private volatile boolean closeRequested = false;
87+
8188
// visible for tests
8289
private Consumer<String> stdErrorHandler = error -> logger.info("STDERR Message received: {}", error);
8390

@@ -146,6 +153,7 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
146153
startInboundProcessing();
147154
startOutboundProcessing();
148155
startErrorProcessing();
156+
startExitMonitoring();
149157
logger.info("MCP server started");
150158
}).subscribeOn(Schedulers.boundedElastic());
151159
}
@@ -172,6 +180,11 @@ public void setStdErrorHandler(Consumer<String> errorHandler) {
172180
this.stdErrorHandler = errorHandler;
173181
}
174182

183+
@Override
184+
public void setExceptionHandler(Consumer<Throwable> handler) {
185+
this.exceptionHandler.set(handler);
186+
}
187+
175188
/**
176189
* Waits for the server process to exit.
177190
* @throws RuntimeException if the process is interrupted while waiting
@@ -239,6 +252,14 @@ private void handleIncomingErrors() {
239252

240253
@Override
241254
public Mono<Void> sendMessage(JSONRPCMessage message) {
255+
McpStdioServerProcessExitException exitException = this.unexpectedExitException.get();
256+
if (exitException != null) {
257+
return Mono.error(exitException);
258+
}
259+
if (!this.closeRequested && this.process != null && !this.process.isAlive()) {
260+
exitException = signalUnexpectedProcessExit(this.process.exitValue());
261+
return Mono.error(exitException);
262+
}
242263
if (this.outboundSink.tryEmitNext(message).isSuccess()) {
243264
// TODO: essentially we could reschedule ourselves in some time and make
244265
// another attempt with the already read data but pause reading until
@@ -252,6 +273,32 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
252273
}
253274
}
254275

276+
private void startExitMonitoring() {
277+
this.process.onExit().thenAccept(process -> {
278+
if (!closeRequested) {
279+
signalUnexpectedProcessExit(process.exitValue());
280+
}
281+
});
282+
}
283+
284+
private McpStdioServerProcessExitException signalUnexpectedProcessExit(int exitCode) {
285+
McpStdioServerProcessExitException exception = new McpStdioServerProcessExitException(exitCode,
286+
this.params.getCommand());
287+
if (this.unexpectedExitException.compareAndSet(null, exception)) {
288+
logger.warn(exception.getMessage());
289+
isClosing = true;
290+
inboundSink.tryEmitComplete();
291+
outboundSink.tryEmitComplete();
292+
errorSink.tryEmitComplete();
293+
294+
Consumer<Throwable> handler = this.exceptionHandler.get();
295+
if (handler != null) {
296+
handler.accept(exception);
297+
}
298+
}
299+
return this.unexpectedExitException.get();
300+
}
301+
255302
/**
256303
* Starts the inbound processing thread that reads JSON-RPC messages from the
257304
* process's input stream. Messages are deserialized and emitted to the inbound sink.
@@ -347,6 +394,7 @@ protected void handleOutbound(Function<Flux<JSONRPCMessage>, Flux<JSONRPCMessage
347394
@Override
348395
public Mono<Void> closeGracefully() {
349396
return Mono.fromRunnable(() -> {
397+
closeRequested = true;
350398
isClosing = true;
351399
logger.debug("Initiating graceful shutdown");
352400
}).then(Mono.<Void>defer(() -> {

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,18 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
123123
}, error -> logger.warn("Client failed during connect", error));
124124
}
125125

126-
private void dismissPendingResponses() {
126+
private void dismissPendingResponses(Throwable cause) {
127127
this.pendingResponses.forEach((id, sink) -> {
128-
logger.info("Abruptly terminating exchange for request {}", id);
129-
sink.error(new RuntimeException("MCP session with server terminated"));
128+
logger.warn("Abruptly terminating exchange for request {}: {}", id, cause.toString());
129+
sink.error(cause);
130130
});
131131
this.pendingResponses.clear();
132132
}
133133

134+
private void dismissPendingResponses() {
135+
dismissPendingResponses(new RuntimeException("MCP session with server terminated"));
136+
}
137+
134138
private void handle(McpSchema.JSONRPCMessage message) {
135139
if (message instanceof McpSchema.JSONRPCResponse response) {
136140
logger.debug("Received response: {}", response);
@@ -310,4 +314,13 @@ public void close() {
310314
dismissPendingResponses();
311315
}
312316

317+
/**
318+
* Closes the session immediately, failing pending operations with the given cause.
319+
* @param cause the transport-level cause of the closure
320+
*/
321+
public void close(Throwable cause) {
322+
Assert.notNull(cause, "The cause can not be null");
323+
dismissPendingResponses(cause);
324+
}
325+
313326
}

mcp-core/src/test/java/io/modelcontextprotocol/client/LifecycleInitializerTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111
import java.util.function.Function;
1212

1313
import io.modelcontextprotocol.client.LifecycleInitializer.Initialization;
14+
import io.modelcontextprotocol.client.transport.McpStdioServerProcessExitException;
1415
import io.modelcontextprotocol.spec.McpClientSession;
1516
import io.modelcontextprotocol.spec.McpSchema;
17+
import io.modelcontextprotocol.spec.McpTransportException;
1618
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
1719
import org.junit.jupiter.api.BeforeEach;
1820
import org.junit.jupiter.api.Test;
@@ -302,6 +304,56 @@ void shouldHandleOtherExceptions() {
302304
verify(mockSessionSupplier, times(1)).apply(any(ContextView.class));
303305
}
304306

307+
@Test
308+
void shouldCloseInProgressInitializationOnStdioProcessExit() {
309+
var cause = new McpStdioServerProcessExitException(127, "java");
310+
when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never())
311+
.thenReturn(Mono.just(MOCK_INIT_RESULT));
312+
313+
var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))
314+
.subscribe();
315+
316+
initializer.handleException(cause);
317+
subscription.dispose();
318+
319+
verify(mockClientSession).close(cause);
320+
321+
StepVerifier.create(initializer.withInitialization("retry", init -> Mono.just(init.initializeResult())))
322+
.expectNext(MOCK_INIT_RESULT)
323+
.verifyComplete();
324+
325+
verify(mockSessionSupplier, times(2)).apply(any(ContextView.class));
326+
}
327+
328+
@Test
329+
void shouldIgnoreGenericTransportExceptionDuringInitialization() {
330+
var cause = new McpTransportException("Transport closed");
331+
when(mockClientSession.sendRequest(eq(McpSchema.METHOD_INITIALIZE), any(), any())).thenReturn(Mono.never());
332+
333+
var subscription = initializer.withInitialization("test", init -> Mono.just(init.initializeResult()))
334+
.subscribe();
335+
336+
initializer.handleException(cause);
337+
subscription.dispose();
338+
339+
verify(mockClientSession, never()).close(cause);
340+
}
341+
342+
@Test
343+
void shouldKeepInitializedAfterTransportException() {
344+
StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult())))
345+
.expectNext(MOCK_INIT_RESULT)
346+
.verifyComplete();
347+
348+
var cause = new McpTransportException("Transport closed");
349+
350+
initializer.handleException(cause);
351+
352+
assertThat(initializer.isInitialized()).isTrue();
353+
verify(mockClientSession, never()).close(cause);
354+
verify(mockSessionSupplier, times(1)).apply(any(ContextView.class));
355+
}
356+
305357
@Test
306358
void shouldCloseGracefully() {
307359
StepVerifier.create(initializer.withInitialization("test", init -> Mono.just(init.initializeResult())))

mcp-core/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,21 @@ void testRequestTimeout() {
107107
session.close();
108108
}
109109

110+
@Test
111+
void testPendingRequestFailsWithCloseCause() {
112+
var transport = new MockMcpClientTransport();
113+
var session = new McpClientSession(TIMEOUT, transport, Map.of(),
114+
Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> logger.info("Status update: {}", params))),
115+
Function.identity());
116+
var cause = new McpTransportException("Transport closed");
117+
118+
Mono<String> responseMono = session.sendRequest(TEST_METHOD, "test", responseType);
119+
120+
StepVerifier.create(responseMono).then(() -> session.close(cause)).expectErrorSatisfies(error -> {
121+
assertThat(error).isSameAs(cause);
122+
}).verify();
123+
}
124+
110125
@Test
111126
void testSendNotification() {
112127
var transport = new MockMcpClientTransport();
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright 2024-2026 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client;
6+
7+
final class FailingStdioServer {
8+
9+
private FailingStdioServer() {
10+
}
11+
12+
public static void main(String[] args) {
13+
System.err.println("Exiting before MCP initialization with code 127");
14+
System.exit(127);
15+
}
16+
17+
}

0 commit comments

Comments
 (0)