diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/SplittingTransformerConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/SplittingTransformerConfiguration.java index 766213195203..1277536640a9 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/SplittingTransformerConfiguration.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/SplittingTransformerConfiguration.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.core; import java.util.Objects; +import java.util.function.UnaryOperator; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.internal.async.SplittingTransformer; @@ -35,9 +36,11 @@ public final class SplittingTransformerConfiguration implements ToCopyableBuilde SplittingTransformerConfiguration> { private final Long bufferSizeInBytes; + private final UnaryOperator responseMapper; private SplittingTransformerConfiguration(DefaultBuilder builder) { this.bufferSizeInBytes = Validate.paramNotNull(builder.bufferSize, "bufferSize"); + this.responseMapper = builder.responseMapper; } /** @@ -54,6 +57,13 @@ public Long bufferSizeInBytes() { return bufferSizeInBytes; } + /** + * @return the response mapper, or null if not set + */ + public UnaryOperator responseMapper() { + return responseMapper; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -65,12 +75,15 @@ public boolean equals(Object o) { SplittingTransformerConfiguration that = (SplittingTransformerConfiguration) o; - return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes); + return Objects.equals(bufferSizeInBytes, that.bufferSizeInBytes) + && Objects.equals(responseMapper, that.responseMapper); } @Override public int hashCode() { - return bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0; + int result = bufferSizeInBytes != null ? bufferSizeInBytes.hashCode() : 0; + result = 31 * result + (responseMapper != null ? responseMapper.hashCode() : 0); + return result; } @Override @@ -94,13 +107,25 @@ public interface Builder extends CopyableBuilder responseMapper); } private static final class DefaultBuilder implements Builder { private Long bufferSize; + private UnaryOperator responseMapper; private DefaultBuilder(SplittingTransformerConfiguration configuration) { this.bufferSize = configuration.bufferSizeInBytes; + this.responseMapper = configuration.responseMapper; } private DefaultBuilder() { @@ -112,6 +137,12 @@ public Builder bufferSizeInBytes(Long bufferSize) { return this; } + @Override + public Builder responseMapper(UnaryOperator responseMapper) { + this.responseMapper = responseMapper; + return this; + } + @Override public SplittingTransformerConfiguration build() { return new SplittingTransformerConfiguration(this); diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java index 4c27501e6340..dd4a5140170f 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java @@ -138,6 +138,7 @@ default SplitResult split(SplittingTransformerConfiguration .builder() .upstreamResponseTransformer(this) .maximumBufferSizeInBytes(splitConfig.bufferSizeInBytes()) + .responseMapper(splitConfig.responseMapper()) .resultFuture(future) .build(); return AsyncResponseTransformer.SplitResult.builder() diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java index e250eab650b0..3c94080f604c 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncResponseTransformer.java @@ -72,7 +72,7 @@ public void exceptionOccurred(Throwable throwable) { public SplitResult> split(SplittingTransformerConfiguration splitConfig) { CompletableFuture> future = new CompletableFuture<>(); SdkPublisher> transformer = - new ByteArraySplittingTransformer<>(this, future); + new ByteArraySplittingTransformer<>(this, future, splitConfig.responseMapper()); return AsyncResponseTransformer.SplitResult.>builder() .publisher(transformer) .resultFuture(future) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java index 2531f7f8166b..fa5761d01bd2 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArraySplittingTransformer.java @@ -23,10 +23,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; @@ -84,12 +86,30 @@ public class ByteArraySplittingTransformer implements SdkPublisher buffers; + private final UnaryOperator responseMapper; + public ByteArraySplittingTransformer(AsyncResponseTransformer> upstreamResponseTransformer, CompletableFuture> resultFuture) { + this(upstreamResponseTransformer, resultFuture, UnaryOperator.identity()); + } + + public ByteArraySplittingTransformer(AsyncResponseTransformer> + upstreamResponseTransformer, + CompletableFuture> resultFuture, + UnaryOperator responseMapper) { this.upstreamResponseTransformer = upstreamResponseTransformer; this.resultFuture = resultFuture; this.buffers = new ConcurrentHashMap<>(); + this.responseMapper = responseMapper != null ? responseMapper : UnaryOperator.identity(); + } + + @SuppressWarnings("unchecked") + private ResponseT mapResponse(ResponseT response) { + if (!(response instanceof SdkResponse)) { + return response; + } + return (ResponseT) responseMapper.apply((SdkResponse) response); } @Override @@ -181,7 +201,7 @@ private void handleSubscriptionCancel() { CompletableFuture> upstreamPrepareFuture = upstreamResponseTransformer.prepare(); CompletableFutureUtils.forwardResultTo(upstreamPrepareFuture, resultFuture); - upstreamResponseTransformer.onResponse(responseT.get()); + upstreamResponseTransformer.onResponse(mapResponse(responseT.get())); int totalPartCount = nextPartNumber.get() - 1; if (buffers.size() != totalPartCount) { diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java index d4cf1c7a2356..0b94dd2f7c34 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java @@ -19,9 +19,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.UnaryOperator; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.SplittingTransformerConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.SdkPublisher; @@ -112,16 +114,18 @@ public class SplittingTransformer implements SdkPublisher upstreamResponseTransformer, - Long maximumBufferSizeInBytes, - CompletableFuture resultFuture) { + private final UnaryOperator responseMapper; + + private SplittingTransformer(Builder builder) { this.upstreamResponseTransformer = Validate.paramNotNull( - upstreamResponseTransformer, "upstreamResponseTransformer"); - this.resultFuture = Validate.paramNotNull( - resultFuture, "resultFuture"); - Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); + builder.upstreamResponseTransformer, "upstreamResponseTransformer"); + this.resultFuture = Validate.paramNotNull(builder.returnFuture, "resultFuture"); + Validate.notNull(builder.maximumBufferSize, "maximumBufferSizeInBytes"); this.maximumBufferInBytes = Validate.isPositive( - maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); + builder.maximumBufferSize, "maximumBufferSizeInBytes"); + this.responseMapper = builder.responseMapper != null + ? builder.responseMapper + : UnaryOperator.identity(); this.resultFuture.whenComplete((r, e) -> { if (e == null) { @@ -133,6 +137,14 @@ private SplittingTransformer(AsyncResponseTransformer upstre }); } + @SuppressWarnings("unchecked") + private ResponseT mapResponse(ResponseT response) { + if (!(response instanceof SdkResponse)) { + return response; + } + return (ResponseT) responseMapper.apply((SdkResponse) response); + } + /** * @param downstreamSubscriber the {@link Subscriber} to the individual AsyncResponseTransformer */ @@ -296,7 +308,7 @@ public CompletableFuture prepare() { public void onResponse(ResponseT response) { if (onResponseCalled.compareAndSet(false, true)) { log.trace(() -> "calling onResponse on the upstream transformer"); - upstreamResponseTransformer.onResponse(response); + upstreamResponseTransformer.onResponse(mapResponse(response)); } this.response = response; } @@ -393,6 +405,7 @@ public static final class Builder { private Long maximumBufferSize; private CompletableFuture returnFuture; private AsyncResponseTransformer upstreamResponseTransformer; + private UnaryOperator responseMapper; private Builder() { } @@ -437,10 +450,13 @@ public Builder resultFuture(CompletableFuture retur return this; } + public Builder responseMapper(UnaryOperator responseMapper) { + this.responseMapper = responseMapper; + return this; + } + public SplittingTransformer build() { - return new SplittingTransformer<>(this.upstreamResponseTransformer, - this.maximumBufferSize, - this.returnFuture); + return new SplittingTransformer<>(this); } } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index c86a3fea12fb..f92c5196a095 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -334,7 +334,7 @@ private CopyObjectRequest attachSdkAttribute(CopyObjectRequest copyObjectRequest } private GetObjectRequest attachSdkAttribute(GetObjectRequest request, - Consumer builderMutation) { + Consumer builderMutation) { AwsRequestOverrideConfiguration modifiedRequestOverrideConfig = request.overrideConfiguration() .map(o -> o.toBuilder().applyMutation(builderMutation).build()) diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java index 1ff737b52850..b5a3edbfd8f7 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/presignedurl/AsyncPresignedUrlExtensionTestSuite.java @@ -24,7 +24,9 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -41,15 +43,21 @@ import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.metrics.MetricCollection; import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.presigner.S3Presigner; import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest; import software.amazon.awssdk.services.s3.presignedurl.model.PresignedUrlDownloadRequest; @@ -70,8 +78,10 @@ public abstract class AsyncPresignedUrlExtensionTestSuite extends S3IntegrationT protected static String testGetObjectKey; protected static String testLargeObjectKey; + protected static String testMpuChecksumKey; protected static String testObjectContent; protected static byte[] testLargeObjectContent; + protected static byte[] testMpuObjectContent; protected static String expectedLargeObjectMd5; protected abstract S3AsyncClient createS3AsyncClient(); @@ -105,6 +115,7 @@ static void setUpTestSuite() throws Exception { .build(), AsyncRequestBody.fromBytes(testLargeObjectContent) ).join(); + uploadMpuObjectWithChecksum(); S3TestUtils.addCleanupTask(AsyncPresignedUrlExtensionTestSuite.class, () -> { s3.deleteObject(DeleteObjectRequest.builder() .bucket(testBucket) @@ -114,6 +125,10 @@ static void setUpTestSuite() throws Exception { .bucket(testBucket) .key(testLargeObjectKey) .build()); + s3.deleteObject(DeleteObjectRequest.builder() + .bucket(testBucket) + .key(testMpuChecksumKey) + .build()); deleteBucketAndAllContents(testBucket); }); } @@ -152,6 +167,8 @@ void getObject_withValidPresignedUrl_returnsContent(String testDescription, assertThat(response.asByteArray().length).isEqualTo(testLargeObjectContent.length); } assertThat(response.response()).isNotNull(); + assertThat(response.response().contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); } } @@ -239,6 +256,8 @@ public void close() {} assertThat(response).isNotNull(); assertThat(downloadFile).exists(); assertThat(downloadFile.toFile().length()).isEqualTo(testLargeObjectContent.length); + assertThat(response.contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(response.sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); assertThat(collectedMetrics).isNotEmpty(); } } @@ -359,7 +378,123 @@ static Stream rangeTestData() { ); } + @Test + void getObject_withRangeRequest_preservesPartialMetadata() throws Exception { + PresignedUrlDownloadRequest request = createRequestForKey(testLargeObjectKey, "bytes=0-1048575"); + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(30, TimeUnit.SECONDS); + + assertThat(response.response().contentLength()).isEqualTo(1048576L); + assertThat(response.response().contentRange()).isNotNull(); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + + @ParameterizedTest(name = "getObject_largeObject_{0}_hasCorrectFullObjectMetadata") + @MethodSource("transformerTypes") + void getObject_largeObject_hasCorrectFullObjectMetadata(String type) throws Exception { + PresignedUrlDownloadRequest request = createRequestForKey(testLargeObjectKey); + + if ("toFile".equals(type)) { + Path downloadFile = temporaryFolder.resolve("large-metadata-test-" + UUID.randomUUID() + ".bin"); + GetObjectResponse response = + presignedUrlExtension.getObject(request, downloadFile) + .get(60, TimeUnit.SECONDS); + + assertThat(response.contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(downloadFile.toFile().length()).isEqualTo(testLargeObjectContent.length); + assertThat(response.sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } else { + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(60, TimeUnit.SECONDS); + + assertThat(response.asByteArray().length).isEqualTo(testLargeObjectContent.length); + assertThat(response.response().contentLength()).isEqualTo((long) testLargeObjectContent.length); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + } + + static Stream transformerTypes() { + return Stream.of("toFile", "toBytes"); + } + + @ParameterizedTest(name = "getObject_mpuObject_{0}_hasCorrectMetadata") + @MethodSource("checksumModes") + void getObject_mpuObject_hasCorrectMetadata(String mode) throws Exception { + PresignedUrlDownloadRequest request; + if ("withChecksumMode".equals(mode)) { + PresignedGetObjectRequest presigned = presigner.presignGetObject(r -> r + .getObjectRequest(req -> req.bucket(testBucket).key(testMpuChecksumKey) + .checksumMode(ChecksumMode.ENABLED)) + .signatureDuration(Duration.ofMinutes(10))); + request = PresignedUrlDownloadRequest.builder().presignedUrl(presigned.url()).build(); + } else { + request = PresignedUrlDownloadRequest.builder() + .presignedUrl(createPresignedUrl(testMpuChecksumKey)) + .build(); + } + + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(60, TimeUnit.SECONDS); + + assertThat(response.asByteArray().length).isEqualTo(testMpuObjectContent.length); + assertThat(response.response().contentLength()).isEqualTo((long) testMpuObjectContent.length); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + + static Stream checksumModes() { + return Stream.of("withChecksumMode", "withoutChecksumMode"); + } + + @Test + void getObject_mpuObjectWithRange_preservesPartialMetadata() throws Exception { + PresignedUrlDownloadRequest request = PresignedUrlDownloadRequest.builder() + .presignedUrl(createPresignedUrl(testMpuChecksumKey)) + .range("bytes=0-1048575") + .build(); + + ResponseBytes response = + presignedUrlExtension.getObject(request, AsyncResponseTransformer.toBytes()) + .get(30, TimeUnit.SECONDS); + + assertThat(response.response().contentLength()).isEqualTo(1048576L); + assertThat(response.response().contentRange()).contains("bytes 0-1048575/"); + assertThat(response.response().sdkHttpResponse().firstMatchingHeader("x-amz-request-id")).isPresent(); + } + // Helper methods + private static void uploadMpuObjectWithChecksum() { + testMpuChecksumKey = generateRandomObjectKey() + "-mpu-checksum"; + int partSize = 5 * 1024 * 1024; + int numParts = 2; + testMpuObjectContent = new byte[partSize * numParts]; + new Random(42).nextBytes(testMpuObjectContent); + + CreateMultipartUploadResponse createResp = + s3.createMultipartUpload(b -> b.bucket(testBucket).key(testMpuChecksumKey) + .checksumAlgorithm(ChecksumAlgorithm.CRC32)); + String uploadId = createResp.uploadId(); + List parts = new ArrayList<>(); + + for (int i = 0; i < numParts; i++) { + byte[] partData = Arrays.copyOfRange(testMpuObjectContent, i * partSize, (i + 1) * partSize); + final int partNum = i + 1; + UploadPartResponse uploadResp = s3.uploadPart( + b -> b.bucket(testBucket).key(testMpuChecksumKey).uploadId(uploadId).partNumber(partNum) + .checksumAlgorithm(ChecksumAlgorithm.CRC32), + RequestBody.fromBytes(partData)); + parts.add(CompletedPart.builder() + .partNumber(partNum).eTag(uploadResp.eTag()) + .checksumCRC32(uploadResp.checksumCRC32()).build()); + } + + s3.completeMultipartUpload(b -> b.bucket(testBucket).key(testMpuChecksumKey).uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder() + .parts(parts).build())); + } + private static String generateRandomObjectKey() { return "async-presigned-url-extension-test-" + UUID.randomUUID(); } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java index c7e9885e34d3..a9145f1bc1dc 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtils.java @@ -23,7 +23,13 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.core.SplittingTransformerConfiguration; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.AsyncResponseTransformer.SplitResult; +import software.amazon.awssdk.services.s3.model.ChecksumType; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.S3Request; @SdkInternalApi @@ -126,4 +132,54 @@ public static long calculateTotalParts(long contentLength, long partSize) { } + /** + * Rewrites a first-part response to represent the full object. + * + * @param firstPartResponse the GetObjectResponse from the first part request + * @return full-object response with total content-length, full content-range, + * and checksum values nulled if checksum type is COMPOSITE + */ + public static GetObjectResponse toFullObjectResponse(GetObjectResponse firstPartResponse) { + String contentRange = firstPartResponse.contentRange(); + Optional totalOpt = parseContentRangeForTotalSize(contentRange); + if (!totalOpt.isPresent()) { + return firstPartResponse; + } + long totalLength = totalOpt.get(); + String fullRange = "bytes 0-" + (totalLength - 1) + "/" + totalLength; + + GetObjectResponse.Builder builder = firstPartResponse.toBuilder() + .contentLength(totalLength) + .contentRange(fullRange); + + if (firstPartResponse.checksumType() == ChecksumType.COMPOSITE) { + builder.sdkFields().stream() + .filter(f -> f.memberName().startsWith("Checksum") && !"ChecksumType".equals(f.memberName())) + .forEach(f -> f.set(builder, null)); + } + + return builder.build(); + } + + /** + * Splits the given transformer with a response mapper that applies {@link #toFullObjectResponse} + * to the first part's response before it reaches the customer's transformer. + */ + public static SplitResult splitWithResponseRewrite( + AsyncResponseTransformer transformer, + SplittingTransformerConfiguration splitConfig) { + SplittingTransformerConfiguration configWithMapper = + splitConfig.toBuilder() + .responseMapper(MultipartDownloadUtils::mapToFullObjectResponse) + .build(); + return transformer.split(configWithMapper); + } + + private static SdkResponse mapToFullObjectResponse(SdkResponse response) { + if (response instanceof GetObjectResponse) { + return toFullObjectResponse((GetObjectResponse) response); + } + return response; + } + } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelPresignedUrlMultipartDownloaderSubscriber.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelPresignedUrlMultipartDownloaderSubscriber.java index 88031de5c733..3497aa3745e6 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelPresignedUrlMultipartDownloaderSubscriber.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelPresignedUrlMultipartDownloaderSubscriber.java @@ -191,7 +191,7 @@ private void sendFirstRequest(AsyncResponseTransformer CompletableFuture downloadObject( doMultipartDownload(presignedRequest, asyncResponseTransformer) .whenComplete((result, error) -> { Throwable cause = error instanceof CompletionException ? error.getCause() : error; - if (cause instanceof EmptyObjectRangeNotSatisfiableException) { + // Parallel path wraps it as EmptyObjectRangeNotSatisfiableException; + // serial path (toBytes, custom transformers) surfaces raw S3Exception. + if (cause instanceof EmptyObjectRangeNotSatisfiableException + || isRangeNotSatisfiable(cause)) { log.debug(() -> "Received 416 on first request, falling back to non-range GET for empty object"); asyncPresignedUrlExtension.getObject(presignedRequest, asyncResponseTransformer) .whenComplete((r, e) -> { @@ -99,6 +102,8 @@ private CompletableFuture doMultipartDownload( if (split.parallelSplitSupported()) { return downloadPartsInParallel(presignedRequest, split); } + // Serial path: split with response mapper to convert part response to full-object response + split = MultipartDownloadUtils.splitWithResponseRewrite(asyncResponseTransformer, splittingConfig); return downloadPartsSerially(presignedRequest, split); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java index acc535b686eb..b88c6cd8a33d 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartDownloadUtilsTest.java @@ -19,7 +19,10 @@ import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.MULTIPART_DOWNLOAD_RESUME_CONTEXT; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.http.SdkHttpResponse; +import software.amazon.awssdk.services.s3.model.ChecksumType; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; class MultipartDownloadUtilsTest { @@ -99,4 +102,77 @@ void calculateTotalParts_shouldCalculateCorrectly() { assertThat(MultipartDownloadUtils.calculateTotalParts(Long.MAX_VALUE, 2)) .isEqualTo((Long.MAX_VALUE / 2) + 1); } + + @Test + void toFullObjectResponse_setsContentLengthAndContentRange() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(1024L) + .contentRange("bytes 0-1023/4096") + .eTag("\"abc123\"") + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.contentLength()).isEqualTo(4096L); + assertThat(result.contentRange()).isEqualTo("bytes 0-4095/4096"); + assertThat(result.eTag()).isEqualTo("\"abc123\""); + } + + @Test + void toFullObjectResponse_noContentRange_returnsUnchanged() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(1024L) + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.contentLength()).isEqualTo(1024L); + } + + @Test + void toFullObjectResponse_compositeChecksum_nullsChecksumValues() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(8388608L) + .contentRange("bytes 0-8388607/25165824") + .checksumType(ChecksumType.COMPOSITE) + .checksumCRC32("/g/pWA==") + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.checksumType()).isEqualTo(ChecksumType.COMPOSITE); + assertThat(result.checksumCRC32()).isNull(); + assertThat(result.checksumCRC32C()).isNull(); + assertThat(result.checksumCRC64NVME()).isNull(); + assertThat(result.checksumSHA1()).isNull(); + assertThat(result.checksumSHA256()).isNull(); + assertThat(result.contentLength()).isEqualTo(25165824L); + } + + @Test + void toFullObjectResponse_fullObjectChecksum_preservesChecksumValues() { + GetObjectResponse response = GetObjectResponse.builder() + .contentLength(8388608L) + .contentRange("bytes 0-8388607/25165824") + .checksumType(ChecksumType.FULL_OBJECT) + .checksumCRC32("abc123") + .build(); + response = setHttpResponse(response, 206, "Partial Content"); + + GetObjectResponse result = MultipartDownloadUtils.toFullObjectResponse(response); + + assertThat(result.checksumType()).isEqualTo(ChecksumType.FULL_OBJECT); + assertThat(result.checksumCRC32()).isEqualTo("abc123"); + } + + private static GetObjectResponse setHttpResponse(GetObjectResponse response, int statusCode, String statusText) { + SdkHttpResponse httpResponse = SdkHttpResponse.builder() + .statusCode(statusCode) + .statusText(statusText) + .build(); + return (GetObjectResponse) response.toBuilder().sdkHttpResponse(httpResponse).build(); + } } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java index c71534842bb9..ad05af389452 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/PresignedUrlMultipartDownloaderSubscriberWiremockTest.java @@ -31,6 +31,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -44,8 +45,11 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -71,6 +75,9 @@ public void setup(WireMockRuntimeInfo wiremock) throws MalformedURLException { .build(); s3AsyncClient = S3AsyncClient.builder() .endpointOverride(URI.create("http://localhost:" + wiremock.getHttpPort())) + .credentialsProvider(software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("accessKey", "secretKey"))) + .region(software.amazon.awssdk.regions.Region.US_EAST_1) .multipartEnabled(true) .multipartConfiguration(multipartConfig) .build(); @@ -339,6 +346,59 @@ void presignedUrlDownload_withRangeHeader_emptyObject_shouldThrow416(String tran .hasRootCauseInstanceOf(S3Exception.class); } + /** + * Verifies that custom serial transformers on the presigned URL path correctly trigger + * the empty-object 416 fallback. + */ + @Test + void presignedUrlDownload_emptyObject_customTransformer_fallbackWorks() { + // Range request → 416 (simulates empty object race) + stubFor(get(urlEqualTo(PRESIGNED_URL_PATH)) + .withHeader("Range", matching("bytes=.*")) + .willReturn(aResponse() + .withStatus(416) + .withBody("InvalidRange" + + "The requested range is not satisfiable"))); + + // Non-range fallback GET → 200 with empty body (the correct fallback for empty object) + stubFor(get(urlEqualTo(PRESIGNED_URL_PATH)) + .withHeader("Range", absent()) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Length", "0") + .withBody(""))); + + AsyncResponseTransformer customTransformer = + new AsyncResponseTransformer() { + private CompletableFuture future; + @Override + public CompletableFuture prepare() { + future = new CompletableFuture<>(); + return future; + } + @Override public void onResponse(GetObjectResponse r) { } + @Override public void onStream(SdkPublisher p) { + p.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } + @Override public void onNext(ByteBuffer b) { } + @Override public void onError(Throwable t) { future.completeExceptionally(t); } + @Override public void onComplete() { future.complete("done"); } + }); + } + @Override public void exceptionOccurred(Throwable e) { future.completeExceptionally(e); } + }; + + PresignedUrlDownloadRequest request = PresignedUrlDownloadRequest.builder() + .presignedUrl(presignedUrl) + .build(); + + // Should succeed: 416 triggers fallback → non-range GET returns 200 + String result = s3AsyncClient.presignedUrlExtension() + .getObject(request, customTransformer) + .join(); + assertThat(result).isEqualTo("done"); + } + @AfterEach void cleanup() { if (tempFile != null && Files.exists(tempFile)) { @@ -486,4 +546,62 @@ private void stubResponseWithContentLengthMismatch() { .withHeader("ETag", "\"test-etag\"") .withBody(Arrays.copyOfRange(TEST_DATA, 0, 8)))); } + + @Test + void presignedUrlDownload_customTransformer_hasFullObjectMetadata() { + // Stub two parts: 16 bytes each, total 32 + stubFor(get(urlEqualTo(PRESIGNED_URL_PATH)) + .withHeader("Range", matching("bytes=0-15")) + .willReturn(aResponse() + .withStatus(206) + .withHeader("Content-Length", "16") + .withHeader("Content-Range", "bytes 0-15/32") + .withHeader("ETag", "\"test-etag\"") + .withBody(Arrays.copyOfRange(TEST_DATA, 0, 16)))); + + stubFor(get(urlEqualTo(PRESIGNED_URL_PATH)) + .withHeader("Range", matching("bytes=16-31")) + .willReturn(aResponse() + .withStatus(206) + .withHeader("Content-Length", "16") + .withHeader("Content-Range", "bytes 16-31/32") + .withHeader("ETag", "\"test-etag\"") + .withBody(Arrays.copyOfRange(TEST_DATA, 16, 32)))); + + AsyncResponseTransformer customTransformer = + new AsyncResponseTransformer() { + private CompletableFuture future; + private GetObjectResponse response; + + @Override + public CompletableFuture prepare() { + future = new CompletableFuture<>(); + return future; + } + @Override public void onResponse(GetObjectResponse r) { this.response = r; } + @Override public void onStream(SdkPublisher p) { + p.subscribe(new Subscriber() { + @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } + @Override public void onNext(ByteBuffer b) { } + @Override public void onError(Throwable t) { future.completeExceptionally(t); } + @Override public void onComplete() { + future.complete("contentLength=" + response.contentLength() + + "|contentRange=" + response.contentRange()); + } + }); + } + @Override public void exceptionOccurred(Throwable e) { future.completeExceptionally(e); } + }; + + PresignedUrlDownloadRequest request = PresignedUrlDownloadRequest.builder() + .presignedUrl(presignedUrl) + .build(); + + String result = s3AsyncClient.presignedUrlExtension() + .getObject(request, customTransformer) + .join(); + + assertThat(result).contains("contentLength=32"); + assertThat(result).contains("contentRange=bytes 0-31/32"); + } }