Skip to content

Commit d1e5d18

Browse files
committed
fix: harden streamable HTTP session routing
1 parent 723ea54 commit d1e5d18

7 files changed

Lines changed: 254 additions & 31 deletions

File tree

acp-core/src/main/java/com/agentclientprotocol/sdk/client/transport/StreamableHttpAcpClientTransport.java

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.function.Function;
2828

2929
import com.agentclientprotocol.sdk.error.AcpConnectionException;
30+
import com.agentclientprotocol.sdk.error.AcpErrorCodes;
3031
import com.agentclientprotocol.sdk.json.AcpJsonMapper;
3132
import com.agentclientprotocol.sdk.json.TypeRef;
3233
import com.agentclientprotocol.sdk.spec.AcpClientTransport;
@@ -167,7 +168,7 @@ private record HttpClientBundle(HttpClient httpClient, ExecutorService ownedExec
167168

168169
private final Map<String, SseStream> sessionStreams = new ConcurrentHashMap<>();
169170

170-
// Session id -> shared open operation so callers reuse one GET while the stream lives.
171+
// Session id -> shared open operation so callers reuse one GET while opening.
171172
private final Map<String, Mono<Void>> sessionStreamOpenOperations = new ConcurrentHashMap<>();
172173

173174
private volatile SseStream connectionStream;
@@ -416,7 +417,7 @@ private Mono<Void> createSessionStreamOpenMono(String sessionId) {
416417
return openSseStream(RouteScope.session(sessionId))
417418
.doOnSuccess(stream -> sessionStreams.putIfAbsent(sessionId, stream))
418419
.then()
419-
.doOnError(error -> sessionStreamOpenOperations.remove(sessionId))
420+
.doFinally(signal -> sessionStreamOpenOperations.remove(sessionId))
420421
.cache();
421422
}
422423

@@ -555,20 +556,16 @@ private Mono<Void> processInbound(RouteScope actualScope, JSONRPCMessage message
555556
if (message instanceof AcpSchema.JSONRPCResponse response) {
556557
OutboundRequestRoute expectedRoute = outboundRequestRoutes.get(response.id());
557558
if (expectedRoute != null && !Objects.equals(expectedRoute.responseScope(), actualScope)) {
558-
return Mono.error(new AcpConnectionException("Response id " + response.id() + " arrived on "
559-
+ actualScope + " but expected " + expectedRoute.responseScope()));
559+
outboundRequestRoutes.remove(response.id());
560+
return emitInbound(errorResponse(response.id(), "Response id " + response.id() + " arrived on "
561+
+ actualScope + " but expected " + expectedRoute.responseScope(), null));
560562
}
561563
if (expectedRoute != null && expectedRoute.kind() == RequestKind.SESSION_NEW) {
562-
AcpSchema.NewSessionResponse sessionResponse = jsonMapper.convertValue(response.result(),
563-
new TypeRef<AcpSchema.NewSessionResponse>() {
564-
});
565-
String sessionId = sessionResponse.sessionId();
566-
if (sessionId == null || sessionId.isBlank()) {
567-
return Mono.error(new AcpConnectionException("session/new response missing sessionId"));
564+
if (response.error() != null) {
565+
outboundRequestRoutes.remove(response.id());
566+
return emitInbound(response);
568567
}
569-
return openSessionStream(sessionId)
570-
.then(Mono.fromRunnable(() -> outboundRequestRoutes.remove(response.id())))
571-
.then(emitInbound(message));
568+
return processNewSessionResponse(response);
572569
}
573570
if (expectedRoute != null) {
574571
outboundRequestRoutes.remove(response.id());
@@ -586,6 +583,38 @@ private Mono<Void> processInbound(RouteScope actualScope, JSONRPCMessage message
586583
return emitInbound(message);
587584
}
588585

586+
private Mono<Void> processNewSessionResponse(AcpSchema.JSONRPCResponse response) {
587+
String sessionId;
588+
try {
589+
AcpSchema.NewSessionResponse sessionResponse = jsonMapper.convertValue(response.result(),
590+
new TypeRef<AcpSchema.NewSessionResponse>() {
591+
});
592+
sessionId = sessionResponse.sessionId();
593+
}
594+
catch (Exception e) {
595+
outboundRequestRoutes.remove(response.id());
596+
return emitInbound(errorResponse(response.id(), "Failed to read session/new response", e));
597+
}
598+
if (sessionId == null || sessionId.isBlank()) {
599+
outboundRequestRoutes.remove(response.id());
600+
return emitInbound(errorResponse(response.id(), "session/new response missing sessionId", null));
601+
}
602+
return openSessionStream(sessionId)
603+
.then(Mono.fromRunnable(() -> outboundRequestRoutes.remove(response.id())))
604+
.then(emitInbound(response))
605+
.onErrorResume(error -> {
606+
outboundRequestRoutes.remove(response.id());
607+
return emitInbound(errorResponse(response.id(),
608+
"Failed to open session SSE stream for session " + sessionId, error));
609+
});
610+
}
611+
612+
private AcpSchema.JSONRPCResponse errorResponse(Object id, String message, Throwable error) {
613+
Object data = error == null ? null : error.getMessage();
614+
return new AcpSchema.JSONRPCResponse(AcpSchema.JSONRPC_VERSION, id, null,
615+
new AcpSchema.JSONRPCError(AcpErrorCodes.INTERNAL_ERROR, message, data));
616+
}
617+
589618
private Mono<Void> emitInbound(JSONRPCMessage message) {
590619
return Mono.fromRunnable(() -> {
591620
synchronized (inboundEmitMonitor) {
@@ -624,6 +653,11 @@ public Mono<Void> closeGracefully() {
624653
});
625654
}
626655

656+
@Override
657+
public void close() {
658+
closeGracefully().block(Duration.ofSeconds(5));
659+
}
660+
627661
private void clearState() {
628662
connectionStream = null;
629663
sessionStreams.clear();
@@ -722,7 +756,7 @@ private void readLoop() {
722756
}
723757
catch (Exception e) {
724758
if (!closed.get() && !closing.get()) {
725-
exceptionHandler.accept(e);
759+
logger.warn("SSE reader stopped for {}", scope, e);
726760
}
727761
}
728762
}
@@ -733,11 +767,16 @@ private void dispatchEvent(StringBuilder dataBuffer) {
733767
}
734768
try {
735769
JSONRPCMessage message = AcpSchema.deserializeJsonRpcMessage(jsonMapper, dataBuffer.toString());
736-
processInbound(scope, message).block(Duration.ofSeconds(30));
770+
processInbound(scope, message).subscribe(v -> {
771+
}, error -> {
772+
if (!closed.get() && !closing.get()) {
773+
logger.warn("Failed to process SSE event from {}", scope, error);
774+
}
775+
});
737776
}
738777
catch (Exception e) {
739778
if (!closed.get() && !closing.get()) {
740-
exceptionHandler.accept(e);
779+
logger.warn("Failed to deserialize SSE event from {}", scope, e);
741780
}
742781
}
743782
}

acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ public AcpClientSession(Duration requestTimeout, AcpClientTransport transport,
148148
return t;
149149
}), "acp-timeout-" + sessionPrefix);
150150

151-
this.transport.setExceptionHandler(this::handleTransportException);
152-
153151
/*
154152
* Client transports currently retain a compatibility path that may forward any
155153
* message emitted by this handler back onto the wire. The session handles outbound
@@ -162,14 +160,6 @@ public AcpClientSession(Duration requestTimeout, AcpClientTransport transport,
162160
this.transport.connect(mono -> mono.doOnNext(this::handle).then(Mono.empty())).transform(connectHook).subscribe();
163161
}
164162

165-
private void handleTransportException(Throwable error) {
166-
this.pendingResponses.forEach((id, sink) -> {
167-
logger.warn("Terminating exchange for request {} after transport error", id, error);
168-
sink.error(error);
169-
});
170-
this.pendingResponses.clear();
171-
}
172-
173163
private void dismissPendingResponses() {
174164
this.pendingResponses.forEach((id, sink) -> {
175165
logger.warn("Abruptly terminating exchange for request {}", id);

acp-core/src/test/java/com/agentclientprotocol/sdk/client/transport/StreamableHttpAcpClientTransportIntegrationTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ void wrongStreamResponseFailsPendingExchange() throws Exception {
177177
assertThatThrownBy(() -> client
178178
.prompt(AcpTestFixtures.createPromptRequest(session.sessionId(), "wrong stream"))
179179
.block(TIMEOUT))
180-
.isInstanceOf(AcpConnectionException.class)
181180
.hasMessageContaining("arrived on RouteScope");
182181

183182
client.closeGracefully().block(TIMEOUT);

acp-core/src/test/java/com/agentclientprotocol/sdk/client/transport/StreamableHttpAcpClientTransportTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,20 @@
66

77
import java.io.ByteArrayInputStream;
88
import java.io.InputStream;
9+
import java.io.PipedInputStream;
10+
import java.io.PipedOutputStream;
911
import java.net.URI;
1012
import java.net.http.HttpClient;
1113
import java.net.http.HttpHeaders;
1214
import java.net.http.HttpRequest;
1315
import java.net.http.HttpResponse;
16+
import java.nio.charset.StandardCharsets;
1417
import java.util.List;
1518
import java.util.Map;
19+
import java.util.concurrent.BlockingQueue;
1620
import java.util.concurrent.CompletableFuture;
1721
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.LinkedBlockingQueue;
1823
import java.util.concurrent.TimeUnit;
1924
import java.util.concurrent.atomic.AtomicInteger;
2025
import java.util.stream.Collectors;
@@ -161,6 +166,146 @@ void concurrentSessionLoadsReuseInFlightSessionStreamOpen() throws Exception {
161166
loads.get(1, TimeUnit.SECONDS);
162167
}
163168

169+
@Test
170+
void sessionNewResponseDoesNotBlockConnectionReaderWhileSessionStreamOpens() throws Exception {
171+
HttpClient httpClient = mock(HttpClient.class);
172+
PipedInputStream connectionStreamBody = new PipedInputStream();
173+
PipedOutputStream connectionStreamWriter = new PipedOutputStream(connectionStreamBody);
174+
CompletableFuture<HttpResponse<InputStream>> sessionStreamResponse = new CompletableFuture<>();
175+
BlockingQueue<AcpSchema.JSONRPCMessage> inboundMessages = new LinkedBlockingQueue<>();
176+
177+
when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
178+
HttpRequest request = invocation.getArgument(0);
179+
if ("POST".equals(request.method())
180+
&& request.headers().firstValue("Acp-Connection-Id").isEmpty()) {
181+
String initializeResponse = jsonMapper.writeValueAsString(AcpTestFixtures
182+
.createJsonRpcResponse("init-1", AcpTestFixtures.createInitializeResponse()));
183+
return CompletableFuture.completedFuture(response(200,
184+
Map.of("Content-Type", "application/json", "Acp-Connection-Id", "conn-1"),
185+
initializeResponse));
186+
}
187+
if ("GET".equals(request.method())
188+
&& request.headers().firstValue("Acp-Session-Id").isEmpty()) {
189+
return CompletableFuture.completedFuture(
190+
response(200, Map.of("Content-Type", "text/event-stream"), connectionStreamBody));
191+
}
192+
if ("GET".equals(request.method())) {
193+
return sessionStreamResponse;
194+
}
195+
return CompletableFuture.completedFuture(response(202, Map.of(), null));
196+
});
197+
198+
StreamableHttpAcpClientTransport transport = new StreamableHttpAcpClientTransport(
199+
URI.create("https://localhost:8443/acp"), jsonMapper, httpClient);
200+
try {
201+
transport.connect(message -> message.doOnNext(inboundMessages::add).then(Mono.empty())).block();
202+
transport.sendMessage(AcpTestFixtures.createJsonRpcRequest(AcpSchema.METHOD_INITIALIZE, "init-1",
203+
AcpTestFixtures.createInitializeRequest()))
204+
.block();
205+
awaitResponse(inboundMessages, "init-1");
206+
207+
transport.sendMessage(AcpTestFixtures.createJsonRpcRequest(AcpSchema.METHOD_SESSION_NEW, "new-1",
208+
AcpTestFixtures.createNewSessionRequest("/workspace")))
209+
.block();
210+
transport.sendMessage(new AcpSchema.JSONRPCRequest(AcpSchema.JSONRPC_VERSION, "ping-1",
211+
"extension/ping", Map.of()))
212+
.block();
213+
214+
writeSse(connectionStreamWriter,
215+
AcpTestFixtures.createJsonRpcResponse("new-1",
216+
new AcpSchema.NewSessionResponse("sess-1", null, null)));
217+
writeSse(connectionStreamWriter, AcpTestFixtures.createJsonRpcResponse("ping-1", Map.of()));
218+
219+
assertThat(awaitResponse(inboundMessages, "ping-1")).isNotNull();
220+
assertThat(inboundMessages.stream()
221+
.filter(AcpSchema.JSONRPCResponse.class::isInstance)
222+
.map(AcpSchema.JSONRPCResponse.class::cast)
223+
.noneMatch(response -> "new-1".equals(response.id()))).isTrue();
224+
225+
sessionStreamResponse.complete(response(200, Map.of("Content-Type", "text/event-stream"), emptyBody()));
226+
assertThat(awaitResponse(inboundMessages, "new-1")).isNotNull();
227+
}
228+
finally {
229+
connectionStreamWriter.close();
230+
transport.close();
231+
}
232+
}
233+
234+
@Test
235+
void sessionNewErrorResponseIsDeliveredWithoutOpeningSessionStream() throws Exception {
236+
HttpClient httpClient = mock(HttpClient.class);
237+
PipedInputStream connectionStreamBody = new PipedInputStream();
238+
PipedOutputStream connectionStreamWriter = new PipedOutputStream(connectionStreamBody);
239+
AtomicInteger sessionGetCount = new AtomicInteger();
240+
BlockingQueue<AcpSchema.JSONRPCMessage> inboundMessages = new LinkedBlockingQueue<>();
241+
242+
when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> {
243+
HttpRequest request = invocation.getArgument(0);
244+
if ("POST".equals(request.method())
245+
&& request.headers().firstValue("Acp-Connection-Id").isEmpty()) {
246+
String initializeResponse = jsonMapper.writeValueAsString(AcpTestFixtures
247+
.createJsonRpcResponse("init-1", AcpTestFixtures.createInitializeResponse()));
248+
return CompletableFuture.completedFuture(response(200,
249+
Map.of("Content-Type", "application/json", "Acp-Connection-Id", "conn-1"),
250+
initializeResponse));
251+
}
252+
if ("GET".equals(request.method())
253+
&& request.headers().firstValue("Acp-Session-Id").isEmpty()) {
254+
return CompletableFuture.completedFuture(
255+
response(200, Map.of("Content-Type", "text/event-stream"), connectionStreamBody));
256+
}
257+
if ("GET".equals(request.method())) {
258+
sessionGetCount.incrementAndGet();
259+
return CompletableFuture.completedFuture(
260+
response(200, Map.of("Content-Type", "text/event-stream"), emptyBody()));
261+
}
262+
return CompletableFuture.completedFuture(response(202, Map.of(), null));
263+
});
264+
265+
StreamableHttpAcpClientTransport transport = new StreamableHttpAcpClientTransport(
266+
URI.create("https://localhost:8443/acp"), jsonMapper, httpClient);
267+
try {
268+
transport.connect(message -> message.doOnNext(inboundMessages::add).then(Mono.empty())).block();
269+
transport.sendMessage(AcpTestFixtures.createJsonRpcRequest(AcpSchema.METHOD_INITIALIZE, "init-1",
270+
AcpTestFixtures.createInitializeRequest()))
271+
.block();
272+
awaitResponse(inboundMessages, "init-1");
273+
274+
transport.sendMessage(AcpTestFixtures.createJsonRpcRequest(AcpSchema.METHOD_SESSION_NEW, "new-1",
275+
AcpTestFixtures.createNewSessionRequest("/workspace")))
276+
.block();
277+
writeSse(connectionStreamWriter,
278+
new AcpSchema.JSONRPCResponse(AcpSchema.JSONRPC_VERSION, "new-1", null,
279+
new AcpSchema.JSONRPCError(-32000, "agent rejected session", null)));
280+
281+
AcpSchema.JSONRPCResponse response = awaitResponse(inboundMessages, "new-1");
282+
assertThat(response.error()).isNotNull();
283+
assertThat(response.error().message()).isEqualTo("agent rejected session");
284+
assertThat(sessionGetCount).hasValue(0);
285+
}
286+
finally {
287+
connectionStreamWriter.close();
288+
transport.close();
289+
}
290+
}
291+
292+
private void writeSse(PipedOutputStream writer, AcpSchema.JSONRPCMessage message) throws Exception {
293+
writer.write(("data: " + jsonMapper.writeValueAsString(message) + "\n\n").getBytes(StandardCharsets.UTF_8));
294+
writer.flush();
295+
}
296+
297+
private AcpSchema.JSONRPCResponse awaitResponse(BlockingQueue<AcpSchema.JSONRPCMessage> messages, Object id)
298+
throws Exception {
299+
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
300+
while (System.nanoTime() < deadline) {
301+
AcpSchema.JSONRPCMessage message = messages.poll(50, TimeUnit.MILLISECONDS);
302+
if (message instanceof AcpSchema.JSONRPCResponse response && id.equals(response.id())) {
303+
return response;
304+
}
305+
}
306+
throw new AssertionError("Timed out waiting for response " + id);
307+
}
308+
164309
private InputStream emptyBody() {
165310
return new ByteArrayInputStream(new byte[0]);
166311
}

acp-streamable-http-jetty/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>com.agentclientprotocol</groupId>
99
<artifactId>acp-java-sdk</artifactId>
10-
<version>0.12.0-SNAPSHOT</version>
10+
<version>0.13.0-SNAPSHOT</version>
1111
</parent>
1212

1313
<artifactId>acp-streamable-http-jetty</artifactId>

acp-streamable-http-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransport.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -581,9 +581,11 @@ void openStream(HttpServletRequest request, HttpServletResponse response, String
581581
}
582582

583583
void close() {
584+
connections.remove(id, this);
584585
connectionStream.close();
585586
sessionStreams.values().forEach(OutboundStream::close);
586-
connection.closeGracefully().subscribe();
587+
connection.closeGracefully().subscribe(v -> {
588+
}, error -> logger.warn("Error closing Streamable HTTP ACP connection {}", id, error));
587589
}
588590

589591
private void routeAgentMessage(JSONRPCMessage message) {
@@ -605,6 +607,7 @@ private void routeAgentMessage(JSONRPCMessage message) {
605607
}
606608
catch (Exception e) {
607609
connection.signalException(e);
610+
close();
608611
}
609612
}
610613

@@ -839,7 +842,8 @@ synchronized void push(String payload) {
839842
}
840843
if (replayOpen) {
841844
if (replay.size() == MAX_REPLAY_EVENTS) {
842-
replay.removeFirst();
845+
throw new AcpConnectionException(
846+
"Outbound SSE replay buffer exceeded " + MAX_REPLAY_EVENTS + " events");
843847
}
844848
replay.addLast(payload);
845849
return;
@@ -996,7 +1000,8 @@ void close(int statusCode, String reason) {
9961000
if (currentSession != null && currentSession.isOpen()) {
9971001
currentSession.close(statusCode, reason, Callback.NOOP);
9981002
}
999-
remoteConnection.closeGracefully().subscribe();
1003+
remoteConnection.closeGracefully().subscribe(v -> {
1004+
}, error -> logger.warn("Error closing Streamable ACP WebSocket connection {}", id, error));
10001005
}
10011006

10021007
private final class SerializedWebSocketSender {

0 commit comments

Comments
 (0)