diff --git a/.changes/next-release/bugfix-AWSCRTHTTPClient-ea797a6.json b/.changes/next-release/bugfix-AWSCRTHTTPClient-ea797a6.json new file mode 100644 index 00000000000..8c96e780170 --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPClient-ea797a6.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT HTTP Client", + "contributor": "", + "description": "Fix connection pool exhaustion in the CRT HTTP client where connections were not released after a request abort or timeout." +} diff --git a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml index 2a6e22ef28b..4e81373b0be 100644 --- a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml +++ b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle-suppressions.xml @@ -68,4 +68,8 @@ + + + diff --git a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml index 30f72ec4bad..09c2082a8d4 100644 --- a/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml +++ b/build-tools/src/main/resources/software/amazon/awssdk/checkstyle.xml @@ -369,6 +369,15 @@ + + + + + + + + + diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java index 7629683899a..3c1f59fb47b 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtAsyncRequestExecutor.java @@ -23,7 +23,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpRequestBase; import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; @@ -67,12 +66,21 @@ private void doExecute(CrtAsyncRequestContext executionContext, HttpRequestBase crtRequest = toAsyncCrtRequest(executionContext); - HttpStreamBaseResponseHandler crtResponseHandler = + CrtResponseAdapter crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(requestFuture, asyncRequest.responseHandler()); CompletableFuture streamFuture = executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); + streamFuture.thenAccept(crtResponseHandler::onAcquireStream); + + // Evict the connection from the pool on failure so it is not reused. + requestFuture.whenComplete((r, t) -> { + if (t != null) { + crtResponseHandler.closeConnection(); + } + }); + long finalAcquireStartTime = acquireStartTime; streamFuture.whenComplete((stream, throwable) -> { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java index 7165696435e..81c506d576e 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/CrtRequestExecutor.java @@ -22,7 +22,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpStreamBase; -import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; @@ -57,13 +56,23 @@ private void doExecute(CrtRequestContext executionContext, CompletableFuture streamFuture = executionContext.streamManager().acquireStream(crtRequest, crtResponseHandler); + streamFuture.thenAccept(crtResponseHandler::onAcquireStream); + + // Evict the connection from the pool on failure so it is not reused. + requestFuture.whenComplete((r, t) -> { + if (t != null) { + crtResponseHandler.closeConnection(); + } + }); + long finalAcquireStartTime = acquireStartTime; streamFuture.whenComplete((streamBase, throwable) -> { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index 1beaa872b5f..4f30b938897 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -65,7 +65,7 @@ public CrtResponseAdapter(CompletableFuture completionFuture, this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder); } - public static HttpStreamBaseResponseHandler toCrtResponseHandler( + public static CrtResponseAdapter toCrtResponseHandler( CompletableFuture requestFuture, SdkAsyncHttpResponseHandler responseHandler) { return new CrtResponseAdapter(requestFuture, responseHandler); @@ -107,6 +107,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { @Override public void onResponseComplete(HttpStreamBase stream, int errorCode) { + responseHandlerHelper.onResponseComplete(); if (errorCode == CRT.AWS_CRT_SUCCESS) { onSuccessfulResponseComplete(); } else { @@ -145,4 +146,12 @@ private void callResponseHandlerOnError(Throwable error) { log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e); } } + + public void onAcquireStream(HttpStreamBase stream) { + responseHandlerHelper.onAcquireStream(stream); + } + + public void closeConnection() { + responseHandlerHelper.closeConnection(); + } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java index bb04d71fdb2..b3baa446790 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -109,6 +109,7 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { @Override public void onResponseComplete(HttpStreamBase stream, int errorCode) { + responseHandlerHelper.onResponseComplete(); if (errorCode == CRT.AWS_CRT_SUCCESS) { onSuccessfulResponseComplete(); } else { @@ -132,4 +133,12 @@ private void onSuccessfulResponseComplete() { simplePublisher.complete(); responseHandlerHelper.releaseConnection(); } + + public void onAcquireStream(HttpStreamBase stream) { + responseHandlerHelper.onAcquireStream(stream); + } + + public void closeConnection() { + responseHandlerHelper.closeConnection(); + } } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java index b90b4321047..3c43a73ecee 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/ResponseHandlerHelper.java @@ -32,18 +32,27 @@ public class ResponseHandlerHelper { private final SdkHttpResponse.Builder responseBuilder; private HttpStreamBase stream; private boolean streamClosed; + private boolean streamCompleted; private final Object streamLock = new Object(); public ResponseHandlerHelper(SdkHttpResponse.Builder responseBuilder) { this.responseBuilder = responseBuilder; } - public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { + /** + * Set the stream reference as soon as it is acquired from the pool, so that closeConnection can + * cancel it even if onResponseHeaders has not yet fired (e.g. the server is unresponsive). + */ + public void onAcquireStream(HttpStreamBase stream) { synchronized (streamLock) { if (this.stream == null) { this.stream = stream; } } + } + + public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) { + onAcquireStream(stream); if (headerType == HttpHeaderBlock.MAIN.getValue()) { for (HttpHeader h : nextHeaders) { responseBuilder.appendHeader(h.getName(), h.getValue()); @@ -73,16 +82,32 @@ public void releaseConnection() { } } + /** + * Called when CRT fires onResponseComplete. After this, {@link #closeConnection()} skips + * {@code cancel()} because per {@link software.amazon.awssdk.crt.http.HttpStreamBase#cancel()} + * javadoc: "if the stream is already completing for other reasons, this call will have no effect." + */ + public void onResponseComplete() { + synchronized (streamLock) { + streamCompleted = true; + } + } + /** * Cancel and close the stream, forcing the underlying connection to shut down rather than be returned to the * connection pool. This should be called on error paths or when the stream is aborted before the response is * fully consumed. {@code cancel()} must be invoked before {@code close()} per the CRT contract. + *

+ * If CRT has already completed the stream via {@link #onResponseComplete()}, {@code cancel()} is skipped + * to avoid a native use-after-free, but {@code close()} is still called to release the Java-side handle. */ public void closeConnection() { synchronized (streamLock) { if (!streamClosed && stream != null) { streamClosed = true; - stream.cancel(); + if (!streamCompleted) { + stream.cancel(); + } stream.close(); } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientAbortBehaviorTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientAbortBehaviorTest.java new file mode 100644 index 00000000000..3409048587e --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/AwsCrtHttpClientAbortBehaviorTest.java @@ -0,0 +1,183 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.any; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.http.HttpTestUtils.createProvider; +import static software.amazon.awssdk.http.crt.CrtHttpClientTestUtils.createRequest; + +import com.github.tomakehurst.wiremock.junit5.WireMockExtension; +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.http.ExecutableHttpRequest; +import software.amazon.awssdk.http.HttpExecuteRequest; +import software.amazon.awssdk.http.HttpMetric; +import software.amazon.awssdk.http.RecordingResponseHandler; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.async.AsyncExecuteRequest; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.metrics.MetricCollector; + +/** + * Verifies connection pool behavior when requests are aborted in between. + */ +public class AwsCrtHttpClientAbortBehaviorTest { + + @RegisterExtension + static WireMockExtension mockServer = WireMockExtension.newInstance() + .options(wireMockConfig().dynamicPort()) + .build(); + + private static ScheduledExecutorService scheduler; + + @BeforeAll + static void setup() { + scheduler = Executors.newScheduledThreadPool(1); + } + + @AfterAll + static void tearDown() { + scheduler.shutdown(); + } + + /** + * Verifies that aborting in-flight requests evicts connections from the pool — + * the next request succeeds and LEASED_CONCURRENCY is 1. + */ + @Test + void syncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception { + URI uri = URI.create("http://localhost:" + wm.getHttpPort()); + + try (SdkHttpClient client = AwsCrtHttpClient.builder().maxConcurrency(3).build()) { + stubUnresponsiveServer(); + executeAndAbort(client, uri, 3); + + // allow cancel() callbacks to complete before asserting pool state + Thread.sleep(200); + + stubResponsiveServer(); + int successCount = 0; + MetricCollector collector = MetricCollector.create("test"); + for (int i = 0; i < 3; i++) { + try { + client.prepareRequest(syncRequest(uri, i == 0 ? collector : null)).call(); + successCount++; + } catch (Exception e) { + // connection not evicted + } + } + + assertThat(successCount).as("%d/%d requests succeeded after aborts", successCount, 3).isEqualTo(3); + assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY)) + .as("LEASED_CONCURRENCY must be 1 after aborts, not %d", 4) + .containsExactly(1); + } + } + + /** + * Verifies that when an async request future completes exceptionally, the connection is + * evicted from the pool and LEASED_CONCURRENCY is 1 for the next request. + */ + @Test + void asyncClient_whenRequestAborted_connectionIsEvictedFromPool(WireMockRuntimeInfo wm) throws Exception { + URI uri = URI.create("http://localhost:" + wm.getHttpPort()); + + try (SdkAsyncHttpClient client = AwsCrtAsyncHttpClient.builder().maxConcurrency(3).build()) { + stubUnresponsiveServer(); + for (int i = 0; i < 3; i++) { + RecordingResponseHandler recorder = new RecordingResponseHandler(); + CompletableFuture future = client.execute(AsyncExecuteRequest.builder() + .request(createRequest(uri)) + .requestContentPublisher(createProvider("")) + .responseHandler(recorder) + .build()); + // abort() equivalent for async: complete the future exceptionally after stream is acquired + scheduler.schedule(() -> future.completeExceptionally(new RuntimeException("timeout")), + 100, TimeUnit.MILLISECONDS); + try { + future.get(2, TimeUnit.SECONDS); + } catch (Exception e) { + // expected + } + // wait for the response handler to finish so cancel() has completed before next iteration + try { + recorder.completeFuture().get(2, TimeUnit.SECONDS); + } catch (Exception e) { + // expected — handler receives the error + } + } + + stubResponsiveServer(); + MetricCollector collector = MetricCollector.create("test"); + RecordingResponseHandler recorder = new RecordingResponseHandler(); + client.execute(AsyncExecuteRequest.builder() + .request(createRequest(uri)) + .requestContentPublisher(createProvider("")) + .responseHandler(recorder) + .metricCollector(collector) + .build()); + recorder.completeFuture().get(5, TimeUnit.SECONDS); + + assertThat(collector.collect().metricValues(HttpMetric.LEASED_CONCURRENCY)) + .as("LEASED_CONCURRENCY must be 1 after exceptionally-completed futures, not %d", 4) + .containsExactly(1); + } + } + + private void executeAndAbort(SdkHttpClient client, URI uri, int count) { + for (int i = 0; i < count; i++) { + ExecutableHttpRequest req = client.prepareRequest(syncRequest(uri, null)); + // abort() must be called from another thread while call() is blocking + scheduler.schedule(req::abort, 100, TimeUnit.MILLISECONDS); + try { + req.call(); + } catch (Exception e) { + // expected — aborted + } + } + } + + private HttpExecuteRequest syncRequest(URI uri, MetricCollector collector) { + HttpExecuteRequest.Builder builder = HttpExecuteRequest.builder() + .request(createRequest(uri)) + .contentStreamProvider(() -> new ByteArrayInputStream(new byte[0])); + if (collector != null) { + builder.metricCollector(collector); + } + return builder.build(); + } + + private void stubUnresponsiveServer() { + mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withFixedDelay(5000).withBody("slow"))); + } + + private void stubResponsiveServer() { + mockServer.stubFor(any(urlPathEqualTo("/")).willReturn(aResponse().withStatus(200).withBody("OK"))); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java index 9318f6af228..0026a16c713 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java @@ -91,7 +91,7 @@ void nonServerError_shouldCloseStream(int statusCode) { } @Test - void failedToGetResponse_shouldCancelAndCloseStream() { + void failedToGetResponse_shouldCloseStreamWithoutCancel() { HttpHeader[] httpHeaders = getHttpHeaders(); responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), httpHeaders); @@ -99,7 +99,10 @@ void failedToGetResponse_shouldCancelAndCloseStream() { responseHandler.onResponseComplete(httpStream, 1); assertThatThrownBy(() -> requestFuture.join()).hasRootCauseInstanceOf(HttpException.class); InOrder inOrder = Mockito.inOrder(httpStream); - inOrder.verify(httpStream).cancel(); + // cancel() is skipped when CRT has already completed the stream (streamCompleted=true). + // Per HttpStreamBase.cancel() javadoc: "if the stream is already completing, this call will have no effect." + // For HTTP/2, calling cancel() on an already-completed stream triggers premature native resource cleanup. + verify(httpStream, never()).cancel(); inOrder.verify(httpStream).close(); }