requestFuture,
SdkAsyncHttpResponseHandler responseHandler) {
return new CrtResponseAdapter(requestFuture, responseHandler);
@@ -107,6 +107,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
+ responseHandlerHelper.onResponseComplete();
if (errorCode == CRT.AWS_CRT_SUCCESS) {
onSuccessfulResponseComplete();
} else {
@@ -145,4 +146,12 @@ private void callResponseHandlerOnError(Throwable error) {
log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e);
}
}
+
+ public void onAcquireStream(HttpStreamBase stream) {
+ responseHandlerHelper.onAcquireStream(stream);
+ }
+
+ public void closeConnection() {
+ responseHandlerHelper.closeConnection();
+ }
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java
index bb04d71fdb2..b3baa446790 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java
@@ -109,6 +109,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
+ responseHandlerHelper.onResponseComplete();
if (errorCode == CRT.AWS_CRT_SUCCESS) {
onSuccessfulResponseComplete();
} else {
@@ -132,4 +133,12 @@ private void onSuccessfulResponseComplete() {
simplePublisher.complete();
responseHandlerHelper.releaseConnection();
}
+
+ public void onAcquireStream(HttpStreamBase stream) {
+ responseHandlerHelper.onAcquireStream(stream);
+ }
+
+ public void closeConnection() {
+ responseHandlerHelper.closeConnection();
+ }
}
diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java
index b90b4321047..3c43a73ecee 100644
--- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java
+++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java
@@ -32,18 +32,27 @@ public class ResponseHandlerHelper {
private final SdkHttpResponse.Builder responseBuilder;
private HttpStreamBase stream;
private boolean streamClosed;
+ private boolean streamCompleted;
private final Object streamLock = new Object();
public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) {
this.responseBuilder = responseBuilder;
}
- public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
+ /**
+ * Set the stream reference as soon as it is acquired from the pool, so that closeConnection can
+ * cancel it even if onResponseHeaders has not yet fired (e.g. the server is unresponsive).
+ */
+ public void onAcquireStream(HttpStreamBase stream) {
synchronized (streamLock) {
if (this.stream == null) {
this.stream = stream;
}
}
+ }
+
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
+ onAcquireStream(stream);
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
@@ -73,16 +82,32 @@ public void releaseConnection() {
}
}
+ /**
+ * Called when CRT fires onResponseComplete. After this, {@link #closeConnection()} skips
+ * {@code cancel()} because per {@link software.amazon.awssdk.crt.http.HttpStreamBase#cancel()}
+ * javadoc: "if the stream is already completing for other reasons, this call will have no effect."
+ */
+ public void onResponseComplete() {
+ synchronized (streamLock) {
+ streamCompleted = true;
+ }
+ }
+
/**
* Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the
* connection pool. This should be called on error paths or when the stream is aborted before the response is
* fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract.
+ *
+ * If CRT has already completed the stream via {@link #onResponseComplete()}, {@code cancel()} is skipped
+ * to avoid a native use-after-free, but {@code close()} is still called to release the Java-side handle.
*/
public void closeConnection() {
synchronized (streamLock) {
if (!streamClosed && stream != null) {
streamClosed = true;
- stream.cancel();
+ if (!streamCompleted) {
+ stream.cancel();
+ }
stream.close();
}
}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientAbortBehaviorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientAbortBehaviorTest.java
new file mode 100644
index 00000000000..3409048587e
--- /dev/null
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientAbortBehaviorTest.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.http.crt;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.any;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.assertj.core.api.Assertions.assertThat;
+import static software.amazon.awssdk.http.HttpTestUtils.createProvider;
+import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest;
+
+import com.github.tomakehurst.wiremock.junit5.WireMockExtension;
+import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
+import java.io.ByteArrayInputStream;
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import software.amazon.awssdk.http.ExecutableHttpRequest;
+import software.amazon.awssdk.http.HttpExecuteRequest;
+import software.amazon.awssdk.http.HttpMetric;
+import software.amazon.awssdk.http.RecordingResponseHandler;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.async.AsyncExecuteRequest;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.metrics.MetricCollector;
+
+/**
+ * Verifies connection pool behavior when requests are aborted in between.
+ */
+public class AwsCrtHttpClientAbortBehaviorTest {
+
+ @RegisterExtension
+ static WireMockExtension mockServer = WireMockExtension.newInstance()
+ .options(wireMockConfig().dynamicPort())
+ .build();
+
+ private static ScheduledExecutorService scheduler;
+
+ @BeforeAll
+ static void setup() {
+ scheduler = Executors.newScheduledThreadPool(1);
+ }
+
+ @AfterAll
+ static void tearDown() {
+ scheduler.shutdown();
+ }
+
+ /**
+ * Verifies that aborting in-flight requests evicts connections from the pool —
+ * the next request succeeds and LEASED_CONCURRENCY is 1.
+ */
+ @Test
+ void syncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
+ URI uri = URI.create("http://localhost:" + wm.getHttpPort());
+
+ try (SdkHttpClient client = AwsCrtHttpClient.builder().maxConcurrency(3).build()) {
+ stubUnresponsiveServer();
+ executeAndAbort(client, uri, 3);
+
+ // allow cancel() callbacks to complete before asserting pool state
+ Thread.sleep(200);
+
+ stubResponsiveServer();
+ int successCount = 0;
+ MetricCollector collector = MetricCollector.create("test");
+ for (int i = 0; i < 3; i++) {
+ try {
+ client.prepareRequest(syncRequest(uri, i == 0 ? collector : null)).call();
+ successCount++;
+ } catch (Exception e) {
+ // connection not evicted
+ }
+ }
+
+ assertThat(successCount).as("%d/%d requests succeeded after aborts", successCount, 3).isEqualTo(3);
+ assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
+ .as("LEASED_CONCURRENCY must be 1 after aborts, not %d", 4)
+ .containsExactly(1);
+ }
+ }
+
+ /**
+ * Verifies that when an async request future completes exceptionally, the connection is
+ * evicted from the pool and LEASED_CONCURRENCY is 1 for the next request.
+ */
+ @Test
+ void asyncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception {
+ URI uri = URI.create("http://localhost:" + wm.getHttpPort());
+
+ try (SdkAsyncHttpClient client = AwsCrtAsyncHttpClient.builder().maxConcurrency(3).build()) {
+ stubUnresponsiveServer();
+ for (int i = 0; i < 3; i++) {
+ RecordingResponseHandler recorder = new RecordingResponseHandler();
+ CompletableFuture future = client.execute(AsyncExecuteRequest.builder()
+ .request(createRequest(uri))
+ .requestContentPublisher(createProvider(""))
+ .responseHandler(recorder)
+ .build());
+ // abort() equivalent for async: complete the future exceptionally after stream is acquired
+ scheduler.schedule(() -> future.completeExceptionally(new RuntimeException("timeout")),
+ 100, TimeUnit.MILLISECONDS);
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ // expected
+ }
+ // wait for the response handler to finish so cancel() has completed before next iteration
+ try {
+ recorder.completeFuture().get(2, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ // expected — handler receives the error
+ }
+ }
+
+ stubResponsiveServer();
+ MetricCollector collector = MetricCollector.create("test");
+ RecordingResponseHandler recorder = new RecordingResponseHandler();
+ client.execute(AsyncExecuteRequest.builder()
+ .request(createRequest(uri))
+ .requestContentPublisher(createProvider(""))
+ .responseHandler(recorder)
+ .metricCollector(collector)
+ .build());
+ recorder.completeFuture().get(5, TimeUnit.SECONDS);
+
+ assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY))
+ .as("LEASED_CONCURRENCY must be 1 after exceptionally-completed futures, not %d", 4)
+ .containsExactly(1);
+ }
+ }
+
+ private void executeAndAbort(SdkHttpClient client, URI uri, int count) {
+ for (int i = 0; i < count; i++) {
+ ExecutableHttpRequest req = client.prepareRequest(syncRequest(uri, null));
+ // abort() must be called from another thread while call() is blocking
+ scheduler.schedule(req::abort, 100, TimeUnit.MILLISECONDS);
+ try {
+ req.call();
+ } catch (Exception e) {
+ // expected — aborted
+ }
+ }
+ }
+
+ private HttpExecuteRequest syncRequest(URI uri, MetricCollector collector) {
+ HttpExecuteRequest.Builder builder = HttpExecuteRequest.builder()
+ .request(createRequest(uri))
+ .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0]));
+ if (collector != null) {
+ builder.metricCollector(collector);
+ }
+ return builder.build();
+ }
+
+ private void stubUnresponsiveServer() {
+ mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(5000).withBody("slow")));
+ }
+
+ private void stubResponsiveServer() {
+ mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withStatus(200).withBody("OK")));
+ }
+}
diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java
index 9318f6af228..0026a16c713 100644
--- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java
+++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java
@@ -91,7 +91,7 @@ void nonServerError_shouldCloseStream(int statusCode) {
}
@Test
- void failedToGetResponse_shouldCancelAndCloseStream() {
+ void failedToGetResponse_shouldCloseStreamWithoutCancel() {
HttpHeader[] httpHeaders = getHttpHeaders();
responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(),
httpHeaders);
@@ -99,7 +99,10 @@ void failedToGetResponse_shouldCancelAndCloseStream() {
responseHandler.onResponseComplete(httpStream, 1);
assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class);
InOrder inOrder = Mockito.inOrder(httpStream);
- inOrder.verify(httpStream).cancel();
+ // cancel() is skipped when CRT has already completed the stream (streamCompleted=true).
+ // Per HttpStreamBase.cancel() javadoc: "if the stream is already completing, this call will have no effect."
+ // For HTTP/2, calling cancel() on an already-completed stream triggers premature native resource cleanup.
+ verify(httpStream, never()).cancel();
inOrder.verify(httpStream).close();
}