diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java index cbe3051a6711..746015098a40 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/TestS3MultipartUpload.java @@ -21,12 +21,14 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.util.Arrays; import java.util.Random; import java.util.UUID; import java.util.function.Supplier; import java.util.stream.IntStream; import org.apache.iceberg.aws.AwsClientFactories; import org.apache.iceberg.aws.AwsIntegTestUtil; +import org.apache.iceberg.io.IOUtil; import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -36,6 +38,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import software.amazon.awssdk.services.s3.S3Client; /** Long-running tests to ensure multipart upload logic is resilient */ @@ -141,6 +145,35 @@ public void testParallelUpload() throws IOException { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMultipartUploadWithChunkedEncoding(boolean chunkedEncodingEnabled) + throws IOException { + // Create a new S3FileIO with specified chunked encoding setting + try (S3FileIO testIo = new S3FileIO(() -> s3)) { + testIo.initialize( + ImmutableMap.of( + S3FileIOProperties.MULTIPART_SIZE, + Integer.toString(S3FileIOProperties.MULTIPART_SIZE_MIN), + S3FileIOProperties.CHECKSUM_ENABLED, + "true", + S3FileIOProperties.CHUNKED_ENCODING_ENABLED, + Boolean.toString(chunkedEncodingEnabled))); + + int parts = 10; + long partSize = S3FileIOProperties.MULTIPART_SIZE_MIN; + String suffix = chunkedEncodingEnabled ? "-chunked-enabled" : "-chunked-disabled"; + + String intObjectUri = objectUri + suffix + "-int"; + writeDistinctPartsWithInts(testIo, intObjectUri, parts, partSize); + verifyDistinctPartsWithInts(testIo, intObjectUri, parts, partSize); + + String bytesObjectUri = objectUri + suffix + "-bytes"; + writeDistinctPartsWithBytes(testIo, bytesObjectUri, parts, partSize); + verifyDistinctPartsWithBytes(testIo, bytesObjectUri, parts, partSize); + } + } + private void writeInts(String fileUri, int parts, Supplier writer) { writeInts(fileUri, parts, S3FileIOProperties.MULTIPART_SIZE_MIN, writer); } @@ -177,4 +210,61 @@ private void writeBytes(String fileUri, int parts, Supplier writer) { throw new RuntimeException(e); } } + + private void writeDistinctPartsWithInts(S3FileIO fileIO, String fileUri, int parts, long partSize) + throws IOException { + try (PositionOutputStream outputStream = fileIO.newOutputFile(fileUri).create()) { + for (int part = 0; part < parts; part++) { + int partByte = part + 1; + for (long j = 0; j < partSize; j++) { + outputStream.write(partByte); + } + } + } + + assertThat(fileIO.newInputFile(fileUri).getLength()).isEqualTo(parts * partSize); + } + + private void verifyDistinctPartsWithInts( + S3FileIO fileIO, String fileUri, int parts, long partSize) throws IOException { + try (SeekableInputStream inputStream = fileIO.newInputFile(fileUri).newStream()) { + byte[] readBuffer = new byte[(int) partSize]; + for (int part = 0; part < parts; part++) { + byte expectedByte = (byte) (part + 1); + IOUtil.readFully(inputStream, readBuffer, 0, (int) partSize); + for (int i = 0; i < (int) partSize; i++) { + assertThat(readBuffer[i]).as("part %d, offset %d", part, i).isEqualTo(expectedByte); + } + } + assertThat(inputStream.read()).as("expected end of stream").isEqualTo(-1); + } + } + + private void writeDistinctPartsWithBytes( + S3FileIO fileIO, String fileUri, int parts, long partSize) throws IOException { + try (PositionOutputStream outputStream = fileIO.newOutputFile(fileUri).create()) { + for (int part = 0; part < parts; part++) { + byte[] partBytes = new byte[(int) partSize]; + Arrays.fill(partBytes, (byte) (part + 1)); + outputStream.write(partBytes); + } + } + + assertThat(fileIO.newInputFile(fileUri).getLength()).isEqualTo(parts * partSize); + } + + private void verifyDistinctPartsWithBytes( + S3FileIO fileIO, String fileUri, int parts, long partSize) throws IOException { + try (SeekableInputStream inputStream = fileIO.newInputFile(fileUri).newStream()) { + byte[] readBuffer = new byte[(int) partSize]; + for (int part = 0; part < parts; part++) { + byte expectedByte = (byte) (part + 1); + IOUtil.readFully(inputStream, readBuffer, 0, (int) partSize); + for (int i = 0; i < (int) partSize; i++) { + assertThat(readBuffer[i]).as("part %d, offset %d", part, i).isEqualTo(expectedByte); + } + } + assertThat(inputStream.read()).as("expected end of stream").isEqualTo(-1); + } + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java index ad5181fd2798..922010d61d27 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIOProperties.java @@ -295,6 +295,18 @@ public class S3FileIOProperties implements Serializable { public static final boolean REMOTE_SIGNING_ENABLED_DEFAULT = false; + /** + * Enables or disables chunked encoding for S3 requests. + * + *

This feature is enabled by default to match the AWS SDK default behavior. + * + *

For more details see: + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/S3Configuration.html#chunkedEncodingEnabled() + */ + public static final String CHUNKED_ENCODING_ENABLED = "s3.chunked-encoding-enabled"; + + public static final boolean CHUNKED_ENCODING_ENABLED_DEFAULT = true; + /** Configure the batch size used when deleting multiple files from a given S3 bucket */ public static final String DELETE_BATCH_SIZE = "s3.delete.batch-size"; @@ -509,6 +521,7 @@ public class S3FileIOProperties implements Serializable { private String stagingDirectory; private ObjectCannedACL acl; private boolean isChecksumEnabled; + private boolean isChunkedEncodingEnabled; private final Set writeTags; private boolean isWriteTableTagEnabled; private boolean isWriteNamespaceTagEnabled; @@ -551,6 +564,7 @@ public S3FileIOProperties() { this.deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT; this.stagingDirectory = System.getProperty("java.io.tmpdir"); this.isChecksumEnabled = CHECKSUM_ENABLED_DEFAULT; + this.isChunkedEncodingEnabled = CHUNKED_ENCODING_ENABLED_DEFAULT; this.writeTags = Sets.newHashSet(); this.isWriteTableTagEnabled = WRITE_TABLE_TAG_ENABLED_DEFAULT; this.isWriteNamespaceTagEnabled = WRITE_NAMESPACE_TAG_ENABLED_DEFAULT; @@ -641,6 +655,9 @@ public S3FileIOProperties(Map properties) { "Cannot support S3 CannedACL " + aclType); this.isChecksumEnabled = PropertyUtil.propertyAsBoolean(properties, CHECKSUM_ENABLED, CHECKSUM_ENABLED_DEFAULT); + this.isChunkedEncodingEnabled = + PropertyUtil.propertyAsBoolean( + properties, CHUNKED_ENCODING_ENABLED, CHUNKED_ENCODING_ENABLED_DEFAULT); this.deleteBatchSize = PropertyUtil.propertyAsInt(properties, DELETE_BATCH_SIZE, DELETE_BATCH_SIZE_DEFAULT); Preconditions.checkArgument( @@ -808,6 +825,10 @@ public boolean isChecksumEnabled() { return this.isChecksumEnabled; } + public boolean isChunkedEncodingEnabled() { + return this.isChunkedEncodingEnabled; + } + public boolean isRemoteSigningEnabled() { return this.isRemoteSigningEnabled; } @@ -994,6 +1015,7 @@ public void applyServiceConfigurations(T builder) { .pathStyleAccessEnabled(isPathStyleAccess) .useArnRegionEnabled(isUseArnRegionEnabled) .accelerateModeEnabled(isAccelerationEnabled) + .chunkedEncodingEnabled(isChunkedEncodingEnabled) .build()); } diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java index 1666de1f1d08..953f73d45d4a 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIOProperties.java @@ -566,4 +566,25 @@ public void testApplyRetryConfiguration() { RetryPolicy retryPolicy = builder.overrideConfiguration().retryPolicy().get(); assertThat(retryPolicy.numRetries()).as("retries was not set").isEqualTo(999); } + + @Test + public void testChunkedEncodingEnabledDefaultValue() { + Map properties = Maps.newHashMap(); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + assertThat(s3FileIOProperties.isChunkedEncodingEnabled()) + .as("chunked encoding should be enabled by default") + .isTrue(); + } + + @Test + public void testChunkedEncodingDisabled() { + Map properties = Maps.newHashMap(); + properties.put(S3FileIOProperties.CHUNKED_ENCODING_ENABLED, "false"); + S3FileIOProperties s3FileIOProperties = new S3FileIOProperties(properties); + + assertThat(s3FileIOProperties.isChunkedEncodingEnabled()) + .as("chunked encoding should be disabled when explicitly set to false") + .isFalse(); + } }