Skip to content

Commit c8e8091

Browse files
committed
fix: avoid dropped errors when transport is closed or uninitialized
Signed-off-by: Dariusz Jędrzejczyk <2554306+chemicL@users.noreply.github.com>
1 parent c8ab341 commit c8e8091

7 files changed

Lines changed: 89 additions & 58 deletions

File tree

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,7 @@
3030
import io.modelcontextprotocol.json.McpJsonDefaults;
3131
import io.modelcontextprotocol.json.McpJsonMapper;
3232
import io.modelcontextprotocol.json.TypeRef;
33-
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
34-
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
35-
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
36-
import io.modelcontextprotocol.spec.HttpHeaders;
37-
import io.modelcontextprotocol.spec.McpClientTransport;
38-
import io.modelcontextprotocol.spec.McpSchema;
39-
import io.modelcontextprotocol.spec.McpTransportException;
40-
import io.modelcontextprotocol.spec.McpTransportSession;
41-
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
42-
import io.modelcontextprotocol.spec.McpTransportStream;
43-
import io.modelcontextprotocol.spec.ProtocolVersions;
33+
import io.modelcontextprotocol.spec.*;
4434
import io.modelcontextprotocol.util.Assert;
4535
import io.modelcontextprotocol.util.Utils;
4636
import org.reactivestreams.Publisher;
@@ -189,14 +179,6 @@ private McpTransportSession<Disposable> createTransportSession() {
189179
return new DefaultMcpTransportSession(onClose);
190180
}
191181

192-
private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
193-
var existingSessionId = Optional.ofNullable(existingSession)
194-
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
195-
.flatMap(McpTransportSession::sessionId)
196-
.orElse(null);
197-
return new ClosedMcpTransportSession<>(existingSessionId);
198-
}
199-
200182
private Publisher<Void> createDelete(String sessionId) {
201183

202184
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
@@ -240,7 +222,8 @@ private void handleException(Throwable t) {
240222
public Mono<Void> closeGracefully() {
241223
return Mono.defer(() -> {
242224
logger.debug("Graceful close triggered");
243-
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
225+
McpTransportSession<Disposable> currentSession = this.activeSession
226+
.getAndSet(ClosedMcpTransportSession.INSTANCE);
244227
if (currentSession != null) {
245228
return Mono.from(currentSession.closeGracefully());
246229
}
@@ -250,6 +233,19 @@ public Mono<Void> closeGracefully() {
250233

251234
private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
252235
return Mono.deferContextual(ctx -> {
236+
var rh = this.handler.get();
237+
if (rh == null) {
238+
logger.warn("Transport has no request handler registered. Remember to call connect!");
239+
}
240+
241+
final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
242+
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));
243+
244+
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
245+
246+
if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
247+
throw new McpTransportSessionClosedException();
248+
}
253249

254250
if (stream != null) {
255251
logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
@@ -259,7 +255,7 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
259255
}
260256

261257
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
262-
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
258+
263259
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
264260

265261
Disposable connection = Mono.deferContextual(connectionCtx -> {
@@ -389,18 +385,18 @@ else if (statusCode == BAD_REQUEST) {
389385
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
390386
})
391387
.retryWhen(authorizationErrorRetrySpec())
392-
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
388+
.flatMap(jsonrpcMessage -> requestHandler.apply(Mono.just(jsonrpcMessage)))
393389
.onErrorMap(CompletionException.class, t -> t.getCause())
394-
.onErrorComplete(t -> {
395-
this.handleException(t);
396-
return true;
397-
})
398390
.doFinally(s -> {
399391
Disposable ref = disposableRef.getAndSet(null);
400392
if (ref != null) {
401393
transportSession.removeConnection(ref);
402394
}
403395
}))
396+
.onErrorComplete(t -> {
397+
this.handleException(t);
398+
return true;
399+
})
404400
.contextWrite(ctx)
405401
.subscribe();
406402

@@ -467,10 +463,23 @@ public String toString(McpSchema.JSONRPCMessage message) {
467463

468464
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
469465
return Mono.create(deliveredSink -> {
466+
var rh = this.handler.get();
467+
if (rh == null) {
468+
logger.warn("Transport has no request handler registered. Remember to call connect!");
469+
}
470+
471+
final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
472+
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));
473+
474+
var transportSession = this.activeSession.get();
475+
476+
if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
477+
throw new McpTransportSessionClosedException();
478+
}
479+
470480
logger.debug("Sending message {}", sentMessage);
471481

472482
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
473-
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
474483

475484
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
476485
String jsonBody = this.toString(sentMessage);
@@ -643,22 +652,26 @@ else if (statusCode == BAD_REQUEST) {
643652
new RuntimeException("Failed to send message: " + responseEvent));
644653
})
645654
.retryWhen(authorizationErrorRetrySpec())
646-
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
655+
.flatMap(jsonRpcMessage -> requestHandler.apply(Mono.just(jsonRpcMessage)))
647656
.onErrorMap(CompletionException.class, t -> t.getCause())
648-
.onErrorComplete(t -> {
649-
// handle the error first
650-
this.handleException(t);
651-
// inform the caller of sendMessage
652-
deliveredSink.error(t);
653-
return true;
654-
})
655657
.doFinally(s -> {
656658
logger.debug("SendMessage finally: {}", s);
657659
Disposable ref = disposableRef.getAndSet(null);
658660
if (ref != null) {
659661
transportSession.removeConnection(ref);
660662
}
661-
})).contextWrite(deliveredSink.contextView()).subscribe();
663+
})).onErrorComplete(t -> {
664+
// handle the error first
665+
try {
666+
this.handleException(t);
667+
}
668+
catch (Exception e) {
669+
logger.error("Error handling exception {}", t.getMessage(), e);
670+
}
671+
// inform the caller of sendMessage
672+
deliveredSink.error(t);
673+
return true;
674+
}).contextWrite(deliveredSink.contextView()).subscribe();
662675

663676
disposableRef.set(connection);
664677
transportSession.addConnection(connection);

mcp-core/src/main/java/io/modelcontextprotocol/spec/ClosedMcpTransportSession.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,41 @@
66
import java.util.Optional;
77

88
import org.reactivestreams.Publisher;
9+
import reactor.core.Disposable;
910
import reactor.core.publisher.Mono;
1011
import reactor.util.annotation.Nullable;
1112

1213
/**
13-
* Represents a closed MCP session, which may not be reused. All calls will throw a
14-
* {@link McpTransportSessionClosedException}.
14+
* Represents a closed MCP session, which may not be reused.
1515
*
16-
* @param <CONNECTION> the resource representing the connection that the transport
17-
* manages.
1816
* @author Daniel Garnier-Moiroux
17+
* @author Dariusz Jędrzejczyk
1918
*/
20-
public class ClosedMcpTransportSession<CONNECTION> implements McpTransportSession<CONNECTION> {
19+
public final class ClosedMcpTransportSession implements McpTransportSession<Disposable> {
2120

22-
private final String sessionId;
21+
public static final ClosedMcpTransportSession INSTANCE = new ClosedMcpTransportSession();
2322

24-
public ClosedMcpTransportSession(@Nullable String sessionId) {
25-
this.sessionId = sessionId;
23+
private ClosedMcpTransportSession() {
2624
}
2725

2826
@Override
2927
public Optional<String> sessionId() {
30-
throw new McpTransportSessionClosedException(sessionId);
28+
return Optional.empty();
3129
}
3230

3331
@Override
3432
public boolean markInitialized(String sessionId) {
35-
throw new McpTransportSessionClosedException(sessionId);
33+
throw new IllegalStateException("MCP Session is already closed");
3634
}
3735

3836
@Override
39-
public void addConnection(CONNECTION connection) {
40-
throw new McpTransportSessionClosedException(sessionId);
37+
public void addConnection(Disposable connection) {
38+
throw new IllegalStateException("MCP Session is already closed");
4139
}
4240

4341
@Override
44-
public void removeConnection(CONNECTION connection) {
45-
throw new McpTransportSessionClosedException(sessionId);
42+
public void removeConnection(Disposable connection) {
43+
throw new IllegalStateException("MCP Session is already closed");
4644
}
4745

4846
@Override

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
119119
this.requestHandlers.putAll(requestHandlers);
120120
this.notificationHandlers.putAll(notificationHandlers);
121121

122-
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe();
122+
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe(ignored -> {
123+
}, error -> logger.warn("Client failed during connect", error));
123124
}
124125

125126
private void dismissPendingResponses() {
@@ -160,7 +161,12 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
160161
var errorResponse = McpSchema.JSONRPCResponse.error(request.id(), jsonRpcError);
161162
return Mono.just(errorResponse);
162163
}).flatMap(this.transport::sendMessage).onErrorComplete(t -> {
163-
logger.warn("Issue sending response to the client, ", t);
164+
if (t instanceof McpTransportSessionClosedException) {
165+
logger.debug("Can't send response to request {} when the transport is closed", request.id());
166+
}
167+
else {
168+
logger.warn("Failed to send response to the server", t);
169+
}
164170
return true;
165171
}).subscribe();
166172
}

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
1010
import io.modelcontextprotocol.json.TypeRef;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
1113
import reactor.core.publisher.Mono;
1214

1315
/**
@@ -39,6 +41,8 @@
3941
*/
4042
public interface McpTransport {
4143

44+
Logger logger = LoggerFactory.getLogger(McpTransport.class);
45+
4246
/**
4347
* Closes the transport connection and releases any associated resources.
4448
*
@@ -48,7 +52,8 @@ public interface McpTransport {
4852
* </p>
4953
*/
5054
default void close() {
51-
this.closeGracefully().subscribe();
55+
this.closeGracefully().subscribe(ignored -> {
56+
}, error -> logger.warn("Error during asynchronous close", error));
5257
}
5358

5459
/**

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionClosedException.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@
1313
* @see ClosedMcpTransportSession
1414
* @author Daniel Garnier-Moiroux
1515
*/
16+
1617
public class McpTransportSessionClosedException extends RuntimeException {
1718

19+
public McpTransportSessionClosedException() {
20+
super("Transport has already been closed.");
21+
}
22+
23+
@Deprecated(forRemoval = true)
1824
public McpTransportSessionClosedException(@Nullable String sessionId) {
1925
super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId)
2026
: "MCP session has been closed");

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ void testRootsListChanged() {
463463
void testInitializeWithRootsListProviders() {
464464
withClient(createMcpTransport(),
465465
builder -> builder.roots(Root.builder("file:///test/path").name("test-root").build()), client -> {
466-
StepVerifier.create(client.initialize().then(client.closeGracefully())).verifyComplete();
466+
StepVerifier.create(client.initialize()).expectNextCount(1).verifyComplete();
467467
});
468468
}
469469

@@ -725,8 +725,6 @@ void testLoggingConsumer() {
725725
builder -> builder.loggingConsumer(notification -> Mono.fromRunnable(() -> logReceived.set(true))),
726726
client -> {
727727
StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete();
728-
StepVerifier.create(client.closeGracefully()).verifyComplete();
729-
730728
});
731729

732730
}

mcp-test/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
99
import io.modelcontextprotocol.common.McpTransportContext;
1010
import io.modelcontextprotocol.spec.McpSchema;
11+
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
1112
import io.modelcontextprotocol.spec.ProtocolVersions;
1213
import java.net.URI;
1314
import java.net.URISyntaxException;
1415
import java.util.Map;
1516
import java.util.function.Consumer;
17+
import java.util.function.Function;
18+
1619
import org.junit.jupiter.api.AfterAll;
1720
import org.junit.jupiter.api.BeforeAll;
1821
import org.junit.jupiter.api.Test;
@@ -139,13 +142,14 @@ void testCloseUninitialized() {
139142
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.METHOD_INITIALIZE, "test-id", initializeRequest);
140143

141144
StepVerifier.create(transport.sendMessage(testMessage))
142-
.expectErrorMessage("MCP session has been closed")
145+
.expectErrorMessage("Transport has already been closed.")
143146
.verify();
144147
}
145148

146149
@Test
147150
void testCloseInitialized() {
148151
var transport = HttpClientStreamableHttpTransport.builder(host).build();
152+
// transport.connect(Function.identity()).block();
149153

150154
var initializeRequest = McpSchema.InitializeRequest
151155
.builder(ProtocolVersions.MCP_2025_11_25, McpSchema.ClientCapabilities.builder().roots(true).build(),
@@ -157,7 +161,8 @@ void testCloseInitialized() {
157161
StepVerifier.create(transport.closeGracefully()).verifyComplete();
158162

159163
StepVerifier.create(transport.sendMessage(testMessage))
160-
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
164+
.expectErrorMatches(err -> err instanceof McpTransportSessionClosedException
165+
&& err.getMessage().contains("Transport has already been closed"))
161166
.verify();
162167
}
163168

0 commit comments

Comments
 (0)