From 6c2546f882723d76b41e31e4f014152210159c66 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 2 Apr 2026 14:24:37 -0400 Subject: [PATCH 01/14] group rows by partition before writing --- .../AssignDestinationsAndPartitions.java | 131 +++++++++ .../beam/sdk/io/iceberg/BeamRowWrapper.java | 149 +++++++++++ .../apache/beam/sdk/io/iceberg/IcebergIO.java | 47 +++- .../IcebergWriteSchemaTransformProvider.java | 9 + .../beam/sdk/io/iceberg/RecordWriter.java | 3 +- .../iceberg/WritePartitionedRowsToFiles.java | 249 ++++++++++++++++++ .../sdk/io/iceberg/WriteToPartitions.java | 80 ++++++ .../sdk/io/iceberg/IcebergIOWriteTest.java | 39 ++- ...ebergWriteSchemaTransformProviderTest.java | 38 ++- 9 files changed, 718 insertions(+), 27 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java new file mode 100644 index 000000000000..4e1a4f2883ad --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +/** + * Assigns destination metadata for each input record. + * + *

The output will have the format { {destination, partition}, data } + */ +class AssignDestinationsAndPartitions + extends PTransform, PCollection>> { + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + static final String DESTINATION = "destination"; + static final String PARTITION = "partition"; + static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA = + org.apache.beam.sdk.schemas.Schema.builder() + .addStringField(DESTINATION) + .addStringField(PARTITION) + .build(); + + public AssignDestinationsAndPartitions( + DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) { + this.dynamicDestinations = dynamicDestinations; + this.catalogConfig = catalogConfig; + } + + @Override + public PCollection> expand(PCollection input) { + return input + .apply(ParDo.of(new AssignDoFn(dynamicDestinations, catalogConfig))) + .setCoder( + KvCoder.of( + RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema()))); + } + + static class AssignDoFn extends DoFn> { + static final Map PARTITION_KEYS = new ConcurrentHashMap<>(); + static final Map WRAPPERS = new ConcurrentHashMap<>(); + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + + AssignDoFn(DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) { + this.dynamicDestinations = dynamicDestinations; + this.catalogConfig = catalogConfig; + } + + @ProcessElement + public void processElement( + @Element Row element, + BoundedWindow window, + PaneInfo paneInfo, + @Timestamp Instant timestamp, + OutputReceiver> out) { + String tableIdentifier = + dynamicDestinations.getTableStringIdentifier( + ValueInSingleWindow.of(element, timestamp, window, paneInfo)); + Row data = dynamicDestinations.getData(element); + + @Nullable PartitionKey partitionKey = PARTITION_KEYS.get(tableIdentifier); + @Nullable BeamRowWrapper wrapper = WRAPPERS.get(tableIdentifier); + if (partitionKey == null || wrapper == null) { + PartitionSpec spec = PartitionSpec.unpartitioned(); + Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema()); + @Nullable + IcebergTableCreateConfig createConfig = + dynamicDestinations.instantiateDestination(tableIdentifier).getTableCreateConfig(); + if (createConfig != null && createConfig.getPartitionFields() != null) { + spec = + PartitionUtils.toPartitionSpec(createConfig.getPartitionFields(), data.getSchema()); + } else { + try { + // see if table already exists with a spec + // TODO(ahmedabu98): improve this by periodically refreshing the table to fetch updated + // specs + spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec(); + } catch (NoSuchTableException ignored) { + // no partition to apply + } + } + partitionKey = new PartitionKey(spec, schema); + wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct()); + PARTITION_KEYS.put(tableIdentifier, partitionKey); + WRAPPERS.put(tableIdentifier, wrapper); + } + partitionKey.partition(wrapper.wrap(data)); + String partitionPath = partitionKey.toPath(); + + Row destAndPartition = + Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build(); + out.output(KV.of(destAndPartition, data)); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java new file mode 100644 index 000000000000..ad7ec4b7b04f --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.Date; +import org.apache.beam.sdk.schemas.logicaltypes.DateTime; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.Time; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class BeamRowWrapper implements StructLike { + + private final FieldType[] types; + private final @Nullable PositionalGetter[] getters; + private @Nullable Row row = null; + + public BeamRowWrapper(Schema schema, Types.StructType struct) { + int size = schema.getFieldCount(); + + types = (FieldType[]) Array.newInstance(FieldType.class, size); + getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size); + + for (int i = 0; i < size; i++) { + types[i] = schema.getField(i).getType(); + getters[i] = buildGetter(types[i], struct.fields().get(i).type()); + } + } + + public BeamRowWrapper wrap(@Nullable Row row) { + this.row = row; + return this; + } + + @Override + public int size() { + return types.length; + } + + @Override + public @Nullable T get(int pos, Class javaClass) { + if (row == null || row.getValue(pos) == null) { + return null; + } else if (getters[pos] != null) { + return javaClass.cast(getters[pos].get(checkStateNotNull(row), pos)); + } + + return javaClass.cast(checkStateNotNull(row).getValue(pos)); + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException( + "Could not set a field in the BeamRowWrapper because rowData is read-only"); + } + + private interface PositionalGetter { + T get(Row data, int pos); + } + + private static @Nullable PositionalGetter buildGetter(FieldType beamType, Type icebergType) { + switch (beamType.getTypeName()) { + case BYTE: + return Row::getByte; + case INT16: + return Row::getInt16; + case STRING: + return Row::getString; + case BYTES: + return (row, pos) -> { + byte[] bytes = checkStateNotNull(row.getBytes(pos)); + if (Type.TypeID.UUID == icebergType.typeId()) { + return UUIDUtil.convert(bytes); + } else { + return ByteBuffer.wrap(bytes); + } + }; + case DECIMAL: + return Row::getDecimal; + case DATETIME: + return (row, pos) -> + TimeUnit.MILLISECONDS.toMicros(checkStateNotNull(row.getDateTime(pos)).getMillis()); + case ROW: + Schema beamSchema = checkStateNotNull(beamType.getRowSchema()); + Types.StructType structType = (Types.StructType) icebergType; + + BeamRowWrapper nestedWrapper = new BeamRowWrapper(beamSchema, structType); + return (row, pos) -> nestedWrapper.wrap(row.getRow(pos)); + case LOGICAL_TYPE: + if (beamType.isLogicalType(MicrosInstant.IDENTIFIER)) { + return (row, pos) -> { + Instant instant = checkStateNotNull(row.getLogicalTypeValue(pos, Instant.class)); + return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + instant.getNano() / 1000; + }; + } else if (beamType.isLogicalType(DateTime.IDENTIFIER)) { + return (row, pos) -> + DateTimeUtil.microsFromTimestamp( + checkStateNotNull(row.getLogicalTypeValue(pos, LocalDateTime.class))); + } else if (beamType.isLogicalType(Date.IDENTIFIER)) { + return (row, pos) -> + DateTimeUtil.daysFromDate( + checkStateNotNull(row.getLogicalTypeValue(pos, LocalDate.class))); + } else if (beamType.isLogicalType(Time.IDENTIFIER)) { + return (row, pos) -> + DateTimeUtil.microsFromTime( + checkStateNotNull(row.getLogicalTypeValue(pos, LocalTime.class))); + } else if (beamType.isLogicalType(FixedPrecisionNumeric.IDENTIFIER)) { + return (row, pos) -> row.getLogicalTypeValue(pos, BigDecimal.class); + } else { + return null; + } + default: + return null; + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 1d71ad549094..529095b6ca71 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -381,7 +381,10 @@ public class IcebergIO { public static WriteRows writeRows(IcebergCatalogConfig catalog) { - return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build(); + return new AutoValue_IcebergIO_WriteRows.Builder() + .setCatalogConfig(catalog) + .setGroupByPartitions(false) + .build(); } @AutoValue @@ -397,6 +400,8 @@ public abstract static class WriteRows extends PTransform, Iceb abstract @Nullable Integer getDirectWriteByteLimit(); + abstract boolean getGroupByPartitions(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -411,6 +416,8 @@ abstract static class Builder { abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit); + abstract Builder setGroupByPartitions(boolean GroupByPartitions); + abstract WriteRows build(); } @@ -443,6 +450,15 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) { return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build(); } + /** + * Groups incoming rows by partition before sending to writes, ensuring that a given bundle is + * written to only one partition. For partitioned tables, this helps significantly to reduce the + * number of small files. + */ + public WriteRows groupingByPartitions() { + return toBuilder().setGroupByPartitions(true).build(); + } + @Override public IcebergWriteResult expand(PCollection input) { List allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations()); @@ -464,15 +480,26 @@ public IcebergWriteResult expand(PCollection input) { IcebergUtils.isUnbounded(input), "Must only provide direct write limit for unbounded pipelines."); } - return input - .apply("Assign Table Destinations", new AssignDestinations(destinations)) - .apply( - "Write Rows to Destinations", - new WriteToDestinations( - getCatalogConfig(), - destinations, - getTriggeringFrequency(), - getDirectWriteByteLimit())); + + if (getGroupByPartitions()) { + return input + .apply( + "AssignDestinationAndPartition", + new AssignDestinationsAndPartitions(destinations, getCatalogConfig())) + .apply( + "Write Rows to Partitions", + new WriteToPartitions(getCatalogConfig(), destinations, getTriggeringFrequency())); + } else { + return input + .apply("Assign Table Destinations", new AssignDestinations(destinations)) + .apply( + "Write Rows to Destinations", + new WriteToDestinations( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getDirectWriteByteLimit())); + } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 428ef71f23e5..d8615e128315 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -134,6 +134,8 @@ public static Builder builder() { + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.") public abstract @Nullable Map getTableProperties(); + public abstract @Nullable Boolean getGroupByPartitions(); + @AutoValue.Builder public abstract static class Builder { public abstract Builder setTable(String table); @@ -158,6 +160,8 @@ public abstract static class Builder { public abstract Builder setTableProperties(Map tableProperties); + public abstract Builder setGroupByPartitions(Boolean groupByPartitions); + public abstract Configuration build(); } @@ -238,6 +242,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit); } + @Nullable Boolean groupByPartitions = configuration.getGroupByPartitions(); + if (groupByPartitions != null && groupByPartitions) { + writeTransform = writeTransform.groupingByPartitions(); + } + // TODO: support dynamic destinations IcebergWriteResult result = rows.apply(writeTransform); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java index 82251c00e72e..fd3d5d63327c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java @@ -23,6 +23,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.catalog.Catalog; @@ -56,7 +57,7 @@ class RecordWriter { partitionKey); } - RecordWriter(Table table, FileFormat fileFormat, String filename, PartitionKey partitionKey) + RecordWriter(Table table, FileFormat fileFormat, String filename, StructLike partitionKey) throws IOException { this.table = table; this.fileFormat = fileFormat; diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java new file mode 100644 index 000000000000..51f49530493c --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.DESTINATION; +import static org.apache.beam.sdk.io.iceberg.AssignDestinationsAndPartitions.PARTITION; +import static org.apache.beam.sdk.io.iceberg.RecordWriterManager.getPartitionDataPath; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class WritePartitionedRowsToFiles + extends PTransform< + PCollection, Iterable>>, PCollection> { + private static final Logger LOG = LoggerFactory.getLogger(WritePartitionedRowsToFiles.class); + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private final String filePrefix; + + WritePartitionedRowsToFiles( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + String filePrefix) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.filePrefix = filePrefix; + } + + @Override + public PCollection expand( + PCollection, Iterable>> input) { + Schema dataSchema = + ((RowCoder) + ((IterableCoder) + ((KvCoder, Iterable>) input.getCoder()) + .getValueCoder()) + .getElemCoder()) + .getSchema(); + return input.apply( + ParDo.of(new WriteDoFn(catalogConfig, dynamicDestinations, filePrefix, dataSchema))); + } + + private static class WriteDoFn extends DoFn, Iterable>, FileWriteResult> { + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private final String filePrefix; + private final Schema dataSchema; + static final Cache LAST_REFRESHED_TABLE_CACHE = + CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + + WriteDoFn( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + String filePrefix, + Schema dataSchema) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.filePrefix = filePrefix; + this.dataSchema = dataSchema; + } + + @ProcessElement + public void processElement( + @Element KV, Iterable> element, OutputReceiver out) + throws Exception { + String tableIdentifier = checkStateNotNull(element.getKey().getKey().getString(DESTINATION)); + String partitionPath = checkStateNotNull(element.getKey().getKey().getString(PARTITION)); + + IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); + Table table = getOrCreateTable(destination, dataSchema); + + // TODO(ahmedabu98): cache this + Map partitionFieldMap = Maps.newHashMap(); + for (PartitionField partitionField : table.spec().fields()) { + partitionFieldMap.put(partitionField.name(), partitionField); + } + partitionPath = getPartitionDataPath(partitionPath, partitionFieldMap); + + StructLike partitionData = + table.spec().isPartitioned() + ? DataFiles.data(table.spec(), partitionPath) + : new PartitionKey(table.spec(), table.schema()); + + String fileName = + destination + .getFileFormat() + .addExtension(String.format("%s-%s", filePrefix, UUID.randomUUID())); + + RecordWriter writer = + new RecordWriter(table, destination.getFileFormat(), fileName, partitionData); + for (Row row : element.getValue()) { + Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); + writer.write(record); + } + writer.close(); + + SerializableDataFile sdf = SerializableDataFile.from(writer.getDataFile(), partitionPath); + out.output( + FileWriteResult.builder() + .setTableIdentifier(destination.getTableIdentifier()) + .setSerializableDataFile(sdf) + .build()); + } + + static final class LastRefreshedTable { + final Table table; + volatile Instant lastRefreshTime; + static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2); + + LastRefreshedTable(Table table, Instant lastRefreshTime) { + this.table = table; + this.lastRefreshTime = lastRefreshTime; + } + + /** + * Refreshes the table metadata if it is considered stale (older than 2 minutes). + * + *

This method first performs a non-synchronized check on the table's freshness. This + * provides a lock-free fast path that avoids synchronization overhead in the common case + * where the table does not need to be refreshed. If the table might be stale, it then enters + * a synchronized block to ensure that only one thread performs the refresh operation. + */ + void refreshIfStale() { + // Fast path: Avoid entering the synchronized block if the table is not stale. + if (lastRefreshTime.isAfter(Instant.now().minus(STALENESS_THRESHOLD))) { + return; + } + synchronized (this) { + if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) { + table.refresh(); + lastRefreshTime = Instant.now(); + } + } + } + } + + Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { + Catalog catalog = catalogConfig.catalog(); + TableIdentifier identifier = destination.getTableIdentifier(); + @Nullable + LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); + if (lastRefreshedTable != null && lastRefreshedTable.table != null) { + lastRefreshedTable.refreshIfStale(); + return lastRefreshedTable.table; + } + + Namespace namespace = identifier.namespace(); + @Nullable IcebergTableCreateConfig createConfig = destination.getTableCreateConfig(); + PartitionSpec partitionSpec = + createConfig != null ? createConfig.getPartitionSpec() : PartitionSpec.unpartitioned(); + Map tableProperties = + createConfig != null && createConfig.getTableProperties() != null + ? createConfig.getTableProperties() + : Maps.newHashMap(); + + @Nullable Table table = null; + synchronized (LAST_REFRESHED_TABLE_CACHE) { + // Create namespace if it does not exist yet + if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) { + SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog; + if (!supportsNamespaces.namespaceExists(namespace)) { + try { + supportsNamespaces.createNamespace(namespace); + LOG.info("Created new namespace '{}'.", namespace); + } catch (AlreadyExistsException ignored) { + // race condition: another worker already created this namespace + } + } + } + + // If table exists, just load it + // Note: the implementation of catalog.tableExists() will load the table to check its + // existence. We don't use it here to avoid double loadTable() calls. + try { + table = catalog.loadTable(identifier); + } catch (NoSuchTableException e) { // Otherwise, create the table + org.apache.iceberg.Schema tableSchema = + IcebergUtils.beamSchemaToIcebergSchema(dataSchema); + try { + table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties); + LOG.info( + "Created Iceberg table '{}' with schema: {}\n" + + ", partition spec: {}, table properties: {}", + identifier, + tableSchema, + partitionSpec, + tableProperties); + } catch (AlreadyExistsException ignored) { + // race condition: another worker already created this table + table = catalog.loadTable(identifier); + } + } + } + lastRefreshedTable = new LastRefreshedTable(table, Instant.now()); + LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable); + return table; + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java new file mode 100644 index 000000000000..3f7054ac9ffc --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.util.UUID; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +class WriteToPartitions extends PTransform>, IcebergWriteResult> { + private static final long DEFAULT_BYTES_PER_FILE = (1L << 29); // 512mb + private final IcebergCatalogConfig catalogConfig; + private final DynamicDestinations dynamicDestinations; + private final @Nullable Duration triggeringFrequency; + private final String filePrefix; + + WriteToPartitions( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + @Nullable Duration triggeringFrequency) { + this.dynamicDestinations = dynamicDestinations; + this.catalogConfig = catalogConfig; + this.triggeringFrequency = triggeringFrequency; + // single unique prefix per write transform + this.filePrefix = UUID.randomUUID().toString(); + } + + @Override + public IcebergWriteResult expand(PCollection> input) { + boolean unbounded = IcebergUtils.isUnbounded(input); + + GroupIntoBatches groupIntoPartitions = + GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE); + if (unbounded && triggeringFrequency != null) { + groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency); + } + + PCollection, Iterable>> groupedRows = + input + .apply(groupIntoPartitions.withShardedKey()) + .setCoder( + KvCoder.of( + org.apache.beam.sdk.util.ShardedKey.Coder.of( + RowCoder.of(AssignDestinationsAndPartitions.OUTPUT_SCHEMA)), + IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); + + PCollection writtenFiles = + groupedRows.apply( + new WritePartitionedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)); + + // Commit files to tables + PCollection> snapshots = + writtenFiles.apply(new AppendFilesToTables(catalogConfig, filePrefix)); + + return new IcebergWriteResult(input.getPipeline(), snapshots); + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index a7349bffdfa0..03f2ac156c97 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static java.util.Arrays.asList; import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; @@ -64,14 +65,22 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class IcebergIOWriteTest implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(IcebergIOWriteTest.class); + @Parameterized.Parameters + public static Iterable data() { + return asList(new Object[][] {{false}, {true}}); + } + + @Parameterized.Parameter(0) + public boolean groupByPartitions; + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @Rule @@ -79,6 +88,13 @@ public class IcebergIOWriteTest implements Serializable { @Rule public transient TestPipeline testPipeline = TestPipeline.create(); + private IcebergIO.WriteRows maybeGroupByPartitions(IcebergIO.WriteRows transform) { + if (groupByPartitions) { + return transform.groupingByPartitions(); + } + return transform; + } + @Test public void testSimpleAppend() throws Exception { TableIdentifier tableId = @@ -99,7 +115,7 @@ public void testSimpleAppend() throws Exception { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId)); + .apply("Append To Table", maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(tableId))); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -129,7 +145,7 @@ public void testCreateNamespaceAndTable() { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId)); + .apply("Append To Table", maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(tableId))); assertFalse(((SupportsNamespaces) catalog.catalog()).namespaceExists(newNamespace)); LOG.info("Executing pipeline"); @@ -200,7 +216,9 @@ public IcebergDestination instantiateDestination(String dest) { TestFixtures.FILE1SNAPSHOT2, TestFixtures.FILE1SNAPSHOT3)))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations)); + .apply( + "Append To Table", + maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(dynamicDestinations))); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -293,7 +311,9 @@ public IcebergDestination instantiateDestination(String dest) { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(elements))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations)); + .apply( + "Append To Table", + maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(dynamicDestinations))); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -386,9 +406,10 @@ public void testStreamingWrite() { .apply("Stream Records", stream) .apply( "Append To Table", - IcebergIO.writeRows(catalog) - .to(tableId) - .withTriggeringFrequency(Duration.standardSeconds(3))) + maybeGroupByPartitions( + IcebergIO.writeRows(catalog) + .to(tableId) + .withTriggeringFrequency(Duration.standardSeconds(3)))) .getSnapshots(); // verify that 2 snapshots are created (one per triggering interval) PCollection snapshots = output.apply(Count.globally()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 7028a394d2fd..23cb1d38ba3c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static java.util.Arrays.asList; import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration; import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG; import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.SNAPSHOTS_TAG; @@ -42,6 +43,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -52,6 +54,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -72,12 +75,19 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.yaml.snakeyaml.Yaml; /** Tests for {@link IcebergWriteSchemaTransformProvider}. */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class IcebergWriteSchemaTransformProviderTest { + @Parameterized.Parameters + public static Iterable data() { + return asList(new Object[][] {{false}, {true}}); + } + + @Parameterized.Parameter(0) + public boolean groupByPartitions; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @@ -115,6 +125,7 @@ public void testSimpleAppend() { .setTable(identifier) .setCatalogName("name") .setCatalogProperties(properties) + .setGroupByPartitions(groupByPartitions) .build(); PCollectionRowTuple input = @@ -151,10 +162,14 @@ public void testWriteUsingManagedTransform() { String.format( "table: %s\n" + "catalog_name: test-name\n" + + "group_by_partitions: %s\n" + "catalog_properties: \n" + " type: %s\n" + " warehouse: %s", - identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); + identifier, + groupByPartitions, + CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, + warehouse.location); Map configMap = new Yaml().load(yamlConfig); PCollection inputRows = @@ -204,6 +219,7 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo ImmutableMap.builder() .put("table", destinationTemplate) .put("catalog_name", "test-name") + .put("group_by_partitions", groupByPartitions) .put( "catalog_properties", ImmutableMap.builder() @@ -415,7 +431,9 @@ public void testWritePartitionedData() { "table", identifier, "catalog_properties", - ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)); + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), + "group_by_partitions", + groupByPartitions); List rows = new ArrayList<>(); for (int i = 0; i < 30; i++) { @@ -491,7 +509,9 @@ public void testWriteCreateTableWithPartitionSpec() { "year(y_datetime)", "month(m_date)", "day(d_date)", - "hour(h_datetimetz)")); + "hour(h_datetimetz)"), + "group_by_partitions", + groupByPartitions); List rows = new ArrayList<>(); for (int i = 0; i < 30; i++) { @@ -563,7 +583,9 @@ public void testWriteCreateTableWithTablePropertiesSpec() { "catalog_properties", ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), "table_properties", - tableProperties); + tableProperties, + "group_by_partitions", + groupByPartitions); List rows = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -622,7 +644,9 @@ public void testWriteCreateTableWithTableProperties() { "table", identifier, "catalog_properties", - ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)); + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), + "group_by_partitions", + groupByPartitions); PCollection result = testPipeline From 0795912ca38bfc1d12f8c8fcb10aa6208fc612a0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 13 Apr 2026 17:28:41 -0400 Subject: [PATCH 02/14] add documentation; fix tests; extend to YAML --- .../IO_Iceberg_Integration_Tests.json | 2 +- .../AssignDestinationsAndPartitions.java | 12 ++--- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 52 +++++++++++-------- .../IcebergWriteSchemaTransformProvider.java | 15 ++++-- .../sdk/io/iceberg/WriteToPartitions.java | 20 ++++++- .../sdk/io/iceberg/IcebergIOWriteTest.java | 38 +++++++------- ...ebergWriteSchemaTransformProviderTest.java | 31 ++++++----- .../iceberg/catalog/IcebergCatalogBaseIT.java | 27 ++++++++++ sdks/python/apache_beam/yaml/yaml_io.py | 8 ++- 9 files changed, 133 insertions(+), 72 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index b73af5e61a43..7ab7bcd9a9c6 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 2 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java index 4e1a4f2883ad..9ce1d5602a08 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -72,8 +72,8 @@ public PCollection> expand(PCollection input) { } static class AssignDoFn extends DoFn> { - static final Map PARTITION_KEYS = new ConcurrentHashMap<>(); - static final Map WRAPPERS = new ConcurrentHashMap<>(); + private final Map partitionKeys = new ConcurrentHashMap<>(); + private final Map wrappers = new ConcurrentHashMap<>(); private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -94,8 +94,8 @@ public void processElement( ValueInSingleWindow.of(element, timestamp, window, paneInfo)); Row data = dynamicDestinations.getData(element); - @Nullable PartitionKey partitionKey = PARTITION_KEYS.get(tableIdentifier); - @Nullable BeamRowWrapper wrapper = WRAPPERS.get(tableIdentifier); + @Nullable PartitionKey partitionKey = partitionKeys.get(tableIdentifier); + @Nullable BeamRowWrapper wrapper = wrappers.get(tableIdentifier); if (partitionKey == null || wrapper == null) { PartitionSpec spec = PartitionSpec.unpartitioned(); Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema()); @@ -117,8 +117,8 @@ public void processElement( } partitionKey = new PartitionKey(spec, schema); wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct()); - PARTITION_KEYS.put(tableIdentifier, partitionKey); - WRAPPERS.put(tableIdentifier, wrapper); + partitionKeys.put(tableIdentifier, partitionKey); + wrappers.put(tableIdentifier, wrapper); } partitionKey.partition(wrapper.wrap(data)); String partitionPath = partitionKey.toPath(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 529095b6ca71..b36364b35d3b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -383,7 +384,7 @@ public class IcebergIO { public static WriteRows writeRows(IcebergCatalogConfig catalog) { return new AutoValue_IcebergIO_WriteRows.Builder() .setCatalogConfig(catalog) - .setGroupByPartitions(false) + .setDistributionMode(DistributionMode.NONE) .build(); } @@ -400,7 +401,7 @@ public abstract static class WriteRows extends PTransform, Iceb abstract @Nullable Integer getDirectWriteByteLimit(); - abstract boolean getGroupByPartitions(); + abstract DistributionMode getDistributionMode(); abstract Builder toBuilder(); @@ -416,7 +417,7 @@ abstract static class Builder { abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit); - abstract Builder setGroupByPartitions(boolean GroupByPartitions); + abstract Builder setDistributionMode(DistributionMode mode); abstract WriteRows build(); } @@ -455,8 +456,8 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) { * written to only one partition. For partitioned tables, this helps significantly to reduce the * number of small files. */ - public WriteRows groupingByPartitions() { - return toBuilder().setGroupByPartitions(true).build(); + public WriteRows withDistributionMode(DistributionMode mode) { + return toBuilder().setDistributionMode(mode).build(); } @Override @@ -481,24 +482,29 @@ public IcebergWriteResult expand(PCollection input) { "Must only provide direct write limit for unbounded pipelines."); } - if (getGroupByPartitions()) { - return input - .apply( - "AssignDestinationAndPartition", - new AssignDestinationsAndPartitions(destinations, getCatalogConfig())) - .apply( - "Write Rows to Partitions", - new WriteToPartitions(getCatalogConfig(), destinations, getTriggeringFrequency())); - } else { - return input - .apply("Assign Table Destinations", new AssignDestinations(destinations)) - .apply( - "Write Rows to Destinations", - new WriteToDestinations( - getCatalogConfig(), - destinations, - getTriggeringFrequency(), - getDirectWriteByteLimit())); + switch (getDistributionMode()) { + case NONE: + return input + .apply("Assign Table Destinations", new AssignDestinations(destinations)) + .apply( + "Write Rows to Destinations", + new WriteToDestinations( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getDirectWriteByteLimit())); + case HASH: + return input + .apply( + "AssignDestinationAndPartition", + new AssignDestinationsAndPartitions(destinations, getCatalogConfig())) + .apply( + "Write Rows to Partitions", + new WriteToPartitions( + getCatalogConfig(), destinations, getTriggeringFrequency())); + default: + throw new UnsupportedOperationException( + "Unsupported distribution mode: " + getDistributionMode()); } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index d8615e128315..4ee771b5ed12 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -134,7 +135,11 @@ public static Builder builder() { + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.") public abstract @Nullable Map getTableProperties(); - public abstract @Nullable Boolean getGroupByPartitions(); + @SchemaFieldDescription( + "Defines distribution of write data. Supported distributions:" + + "\n- none: don't shuffle rows (default)" + + "\n- hash: shuffle rows by partition key before writing data") + public abstract @Nullable String getDistributionMode(); @AutoValue.Builder public abstract static class Builder { @@ -160,7 +165,7 @@ public abstract static class Builder { public abstract Builder setTableProperties(Map tableProperties); - public abstract Builder setGroupByPartitions(Boolean groupByPartitions); + public abstract Builder setDistributionMode(String mode); public abstract Configuration build(); } @@ -242,9 +247,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit); } - @Nullable Boolean groupByPartitions = configuration.getGroupByPartitions(); - if (groupByPartitions != null && groupByPartitions) { - writeTransform = writeTransform.groupingByPartitions(); + @Nullable String mode = configuration.getDistributionMode(); + if (mode != null) { + writeTransform = writeTransform.withDistributionMode(DistributionMode.fromName(mode)); } // TODO: support dynamic destinations diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java index 3f7054ac9ffc..b17100cef780 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java @@ -17,12 +17,18 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -55,7 +61,7 @@ public IcebergWriteResult expand(PCollection> input) { GroupIntoBatches groupIntoPartitions = GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE); if (unbounded && triggeringFrequency != null) { - groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency); + groupIntoPartitions = groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency); } PCollection, Iterable>> groupedRows = @@ -71,6 +77,18 @@ public IcebergWriteResult expand(PCollection> input) { groupedRows.apply( new WritePartitionedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)); + if (unbounded && triggeringFrequency != null) { + writtenFiles = + writtenFiles.apply( + "ApplyUserTrigger", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) + .discardingFiredPanes()); + } + // Commit files to tables PCollection> snapshots = writtenFiles.apply(new AppendFilesToTables(catalogConfig, filePrefix)); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 03f2ac156c97..1f43b8e45ec4 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; import java.io.Serializable; import java.util.List; @@ -45,6 +46,7 @@ import org.apache.iceberg.AppendFiles; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; @@ -75,11 +77,11 @@ public class IcebergIOWriteTest implements Serializable { @Parameterized.Parameters public static Iterable data() { - return asList(new Object[][] {{false}, {true}}); + return asList(new Object[][] {{DistributionMode.NONE}, {DistributionMode.HASH}}); } @Parameterized.Parameter(0) - public boolean groupByPartitions; + public DistributionMode distributionMode; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @@ -88,13 +90,6 @@ public static Iterable data() { @Rule public transient TestPipeline testPipeline = TestPipeline.create(); - private IcebergIO.WriteRows maybeGroupByPartitions(IcebergIO.WriteRows transform) { - if (groupByPartitions) { - return transform.groupingByPartitions(); - } - return transform; - } - @Test public void testSimpleAppend() throws Exception { TableIdentifier tableId = @@ -115,7 +110,9 @@ public void testSimpleAppend() throws Exception { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply("Append To Table", maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(tableId))); + .apply( + "Append To Table", + IcebergIO.writeRows(catalog).to(tableId).withDistributionMode(distributionMode)); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -145,7 +142,9 @@ public void testCreateNamespaceAndTable() { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply("Append To Table", maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(tableId))); + .apply( + "Append To Table", + IcebergIO.writeRows(catalog).to(tableId).withDistributionMode(distributionMode)); assertFalse(((SupportsNamespaces) catalog.catalog()).namespaceExists(newNamespace)); LOG.info("Executing pipeline"); @@ -218,7 +217,9 @@ public IcebergDestination instantiateDestination(String dest) { .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) .apply( "Append To Table", - maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(dynamicDestinations))); + IcebergIO.writeRows(catalog) + .to(dynamicDestinations) + .withDistributionMode(distributionMode)); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -245,6 +246,7 @@ public IcebergDestination instantiateDestination(String dest) { */ @Test public void testDynamicDestinationsWithSpillover() throws Exception { + assumeTrue(distributionMode.equals(DistributionMode.NONE)); final String salt = Long.toString(UUID.randomUUID().hashCode(), 16); // Create far more tables than the max writers per bundle @@ -311,9 +313,7 @@ public IcebergDestination instantiateDestination(String dest) { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(elements))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply( - "Append To Table", - maybeGroupByPartitions(IcebergIO.writeRows(catalog).to(dynamicDestinations))); + .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations)); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -406,10 +406,10 @@ public void testStreamingWrite() { .apply("Stream Records", stream) .apply( "Append To Table", - maybeGroupByPartitions( - IcebergIO.writeRows(catalog) - .to(tableId) - .withTriggeringFrequency(Duration.standardSeconds(3)))) + IcebergIO.writeRows(catalog) + .to(tableId) + .withTriggeringFrequency(Duration.standardSeconds(3)) + .withDistributionMode(distributionMode)) .getSnapshots(); // verify that 2 snapshots are created (one per triggering interval) PCollection snapshots = output.apply(Count.globally()); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 23cb1d38ba3c..80b0389d78ea 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -43,7 +43,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -54,11 +53,11 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; @@ -83,11 +82,11 @@ public class IcebergWriteSchemaTransformProviderTest { @Parameterized.Parameters public static Iterable data() { - return asList(new Object[][] {{false}, {true}}); + return asList(new Object[][] {{DistributionMode.NONE}, {DistributionMode.HASH}}); } @Parameterized.Parameter(0) - public boolean groupByPartitions; + public DistributionMode distributionMode; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @@ -125,7 +124,7 @@ public void testSimpleAppend() { .setTable(identifier) .setCatalogName("name") .setCatalogProperties(properties) - .setGroupByPartitions(groupByPartitions) + .setDistributionMode(distributionMode.name()) .build(); PCollectionRowTuple input = @@ -162,12 +161,12 @@ public void testWriteUsingManagedTransform() { String.format( "table: %s\n" + "catalog_name: test-name\n" - + "group_by_partitions: %s\n" + + "distribution_mode: %s\n" + "catalog_properties: \n" + " type: %s\n" + " warehouse: %s", identifier, - groupByPartitions, + distributionMode.name(), CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); Map configMap = new Yaml().load(yamlConfig); @@ -219,7 +218,7 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo ImmutableMap.builder() .put("table", destinationTemplate) .put("catalog_name", "test-name") - .put("group_by_partitions", groupByPartitions) + .put("distribution_mode", distributionMode.name()) .put( "catalog_properties", ImmutableMap.builder() @@ -432,8 +431,8 @@ public void testWritePartitionedData() { identifier, "catalog_properties", ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), - "group_by_partitions", - groupByPartitions); + "distribution_mode", + distributionMode.name()); List rows = new ArrayList<>(); for (int i = 0; i < 30; i++) { @@ -510,8 +509,8 @@ public void testWriteCreateTableWithPartitionSpec() { "month(m_date)", "day(d_date)", "hour(h_datetimetz)"), - "group_by_partitions", - groupByPartitions); + "distribution_mode", + distributionMode.name()); List rows = new ArrayList<>(); for (int i = 0; i < 30; i++) { @@ -584,8 +583,8 @@ public void testWriteCreateTableWithTablePropertiesSpec() { ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), "table_properties", tableProperties, - "group_by_partitions", - groupByPartitions); + "distribution_mode", + distributionMode.name()); List rows = new ArrayList<>(); for (int i = 0; i < 10; i++) { @@ -645,8 +644,8 @@ public void testWriteCreateTableWithTableProperties() { identifier, "catalog_properties", ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), - "group_by_partitions", - groupByPartitions); + "distribution_mode", + distributionMode.name()); PCollection result = testPipeline diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index e81f75c40fb1..caab3cebe6fb 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -687,6 +687,32 @@ public void testWriteToPartitionedTable() throws IOException { returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); } + @Test + public void testWriteToPartitionedTableWithHashDistribution() throws IOException { + Map config = new HashMap<>(managedIcebergConfig(tableId())); + int truncLength = "value_x".length(); + List partitionFields = + Arrays.asList("bool_field", "hour(datetime)", "truncate(str, " + truncLength + ")"); + config.put("partition_fields", partitionFields); + config.put("distribution_mode", "hash"); + PCollection input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); + input.apply(Managed.write(ICEBERG).withConfig(config)); + pipeline.run().waitUntilFinish(); + + // Read back and check records are correct + Table table = catalog.loadTable(TableIdentifier.parse(tableId())); + List returnedRecords = readRecords(table); + PartitionSpec expectedSpec = + PartitionSpec.builderFor(table.schema()) + .identity("bool_field") + .hour("datetime") + .truncate("str", truncLength) + .build(); + assertEquals(expectedSpec, table.spec()); + assertThat( + returnedRecords, containsInAnyOrder(inputRows.stream().map(RECORD_FUNC::apply).toArray())); + } + private PeriodicImpulse getStreamingSource() { return PeriodicImpulse.create() .stopAfter(Duration.millis(numRecords() - 1)) @@ -799,6 +825,7 @@ private void writeToDynamicDestinations( if (partitioning) { Preconditions.checkState(filterOp == null || !filterOp.equals("only")); writeConfig.put("partition_fields", Arrays.asList("bool_field", "modulo_5")); + writeConfig.put("distribution_mode", "hash"); } // Write with Beam diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index f8702a1da209..4541d79ed346 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -562,6 +562,7 @@ def write_to_iceberg( keep: Optional[Iterable[str]] = None, drop: Optional[Iterable[str]] = None, only: Optional[str] = None, + distribution_mode: Optional[str] = False, ): # TODO(robertwb): It'd be nice to derive this list of parameters, along with # their types and docs, programmatically from the iceberg (or managed) @@ -611,6 +612,10 @@ def write_to_iceberg( only: The name of exactly one field to keep as the top level record when writing to the destination. All other fields are dropped. This field must be of row type. Mutually exclusive with drop and keep. + distribution_mode: Defines distribution of write data. Supported + distributions: + - none: don't shuffle rows (default) + - hash: shuffle rows by partition key before writing data """ return beam.managed.Write( "iceberg", @@ -624,7 +629,8 @@ def write_to_iceberg( triggering_frequency_seconds=triggering_frequency_seconds, keep=keep, drop=drop, - only=only)) + only=only, + distribution_mode=distribution_mode)) def io_providers(): From 3c29bbd9cd2beb8c7f47acc814a43fb9be56a1b0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 14 Apr 2026 19:17:55 -0400 Subject: [PATCH 03/14] add BeamRowWrapper test class --- .../AssignDestinationsAndPartitions.java | 18 +- .../sdk/io/iceberg/BeamRowWrapperTest.java | 227 ++++++++++++++++++ 2 files changed, 240 insertions(+), 5 deletions(-) create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java index 9ce1d5602a08..db30fd66a8ef 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -17,8 +17,10 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.transforms.DoFn; @@ -35,6 +37,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -72,8 +75,8 @@ public PCollection> expand(PCollection input) { } static class AssignDoFn extends DoFn> { - private final Map partitionKeys = new ConcurrentHashMap<>(); - private final Map wrappers = new ConcurrentHashMap<>(); + private final Map partitionKeys = new HashMap<>(); + private transient @MonotonicNonNull Map wrappers; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -82,6 +85,11 @@ static class AssignDoFn extends DoFn> { this.catalogConfig = catalogConfig; } + @Setup + public void setup() { + this.wrappers = new HashMap<>(); + } + @ProcessElement public void processElement( @Element Row element, @@ -95,7 +103,7 @@ public void processElement( Row data = dynamicDestinations.getData(element); @Nullable PartitionKey partitionKey = partitionKeys.get(tableIdentifier); - @Nullable BeamRowWrapper wrapper = wrappers.get(tableIdentifier); + @Nullable BeamRowWrapper wrapper = checkStateNotNull(wrappers).get(tableIdentifier); if (partitionKey == null || wrapper == null) { PartitionSpec spec = PartitionSpec.unpartitioned(); Schema schema = IcebergUtils.beamSchemaToIcebergSchema(data.getSchema()); @@ -118,7 +126,7 @@ public void processElement( partitionKey = new PartitionKey(spec, schema); wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct()); partitionKeys.put(tableIdentifier, partitionKey); - wrappers.put(tableIdentifier, wrapper); + checkStateNotNull(wrappers).put(tableIdentifier, wrapper); } partitionKey.partition(wrapper.wrap(data)); String partitionPath = partitionKey.toPath(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java new file mode 100644 index 000000000000..dfd75047cdfa --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.Date; +import org.apache.beam.sdk.schemas.logicaltypes.DateTime; +import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; +import org.apache.beam.sdk.schemas.logicaltypes.Time; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; +import org.junit.Test; + +public class BeamRowWrapperTest { + private static final Schema NESTED_BEAM_SCHEMA = + Schema.builder().addInt32Field("nested_int").build(); + private static final Schema BEAM_SCHEMA = + Schema.builder() + .addByteField("byte_field") + .addInt16Field("int16_field") + .addStringField("string_field") + .addByteArrayField("bytes_field") + .addByteArrayField("uuid_field") + .addDecimalField("decimal_field") + .addDateTimeField("datetime_field") + .addLogicalTypeField("micros_instant_field", new MicrosInstant()) + .addLogicalTypeField("date_time_field", new DateTime()) + .addLogicalTypeField("date_field", new Date()) + .addLogicalTypeField("time_field", new Time()) + .addLogicalTypeField("fixed_numeric_field", FixedPrecisionNumeric.of(10, 2)) + .addRowField("row_field", NESTED_BEAM_SCHEMA) + .addInt32Field("pass_through_field") + .build(); + private static final Types.StructType ICEBERG_STRUCT = + Types.StructType.of( + Types.NestedField.required(1, "byte_field", Types.IntegerType.get()), + Types.NestedField.required(2, "int16_field", Types.IntegerType.get()), + Types.NestedField.required(3, "string_field", Types.StringType.get()), + Types.NestedField.required(4, "bytes_field", Types.BinaryType.get()), + Types.NestedField.required(5, "uuid_field", Types.UUIDType.get()), + Types.NestedField.required(6, "decimal_field", Types.DecimalType.of(10, 2)), + Types.NestedField.required(7, "datetime_field", Types.TimestampType.withZone()), + Types.NestedField.required(8, "micros_instant_field", Types.TimestampType.withZone()), + Types.NestedField.required(9, "date_time_field", Types.TimestampType.withoutZone()), + Types.NestedField.required(10, "date_field", Types.DateType.get()), + Types.NestedField.required(11, "time_field", Types.TimeType.get()), + Types.NestedField.required(12, "fixed_numeric_field", Types.DecimalType.of(10, 2)), + Types.NestedField.required( + 13, + "row_field", + Types.StructType.of( + Types.NestedField.required(1, "nested_int", Types.IntegerType.get()))), + Types.NestedField.required(14, "pass_through_field", Types.IntegerType.get())); + private static final UUID TEST_UUID = UUID.randomUUID(); + private static final Row NESTED_ROW = Row.withSchema(NESTED_BEAM_SCHEMA).addValue(999).build(); + private static final Row ROW = + Row.withSchema(BEAM_SCHEMA) + .addValues( + (byte) 42, + (short) 123, + "testString", + new byte[] {0x01, 0x02, 0x03}, + ByteBuffer.allocate(16) + .putLong(TEST_UUID.getMostSignificantBits()) + .putLong(TEST_UUID.getLeastSignificantBits()) + .array(), + new BigDecimal("123.45"), + org.joda.time.Instant.now(), + Instant.now(), + LocalDateTime.now(), + LocalDate.now(), + LocalTime.now(), + new BigDecimal("567.89"), + NESTED_ROW, + 888) + .build(); + private static final BeamRowWrapper WRAPPER = + new BeamRowWrapper(BEAM_SCHEMA, ICEBERG_STRUCT).wrap(ROW); + + @Test + public void testSize() { + assertEquals("Size should match the schema field count", 14, WRAPPER.size()); + } + + @Test + public void testUnsupportedSetThrowsException() { + assertThrows(UnsupportedOperationException.class, () -> WRAPPER.set(0, "test")); + } + + @Test + public void testNullRowHandling() { + BeamRowWrapper emptyWrapper = new BeamRowWrapper(BEAM_SCHEMA, ICEBERG_STRUCT); + assertNull( + "Should return null if the underlying row is null", emptyWrapper.get(0, Object.class)); + } + + @Test + public void testNullFieldHandling() { + Schema nullableSchema = Schema.builder().addNullableStringField("nullable_str").build(); + Types.StructType nullableIcebergType = + Types.StructType.of(Types.NestedField.optional(1, "nullable_str", Types.StringType.get())); + + Row nullRow = Row.withSchema(nullableSchema).addValue(null).build(); + BeamRowWrapper nullableWrapper = + new BeamRowWrapper(nullableSchema, nullableIcebergType).wrap(nullRow); + + assertNull("Should return null for a null field value", nullableWrapper.get(0, String.class)); + } + + // --- Type Conversion Tests --- + + @Test + public void testByteConversion() { + assertEquals(ROW.getByte(0), WRAPPER.get(0, Byte.class)); + } + + @Test + public void testInt16Conversion() { + assertEquals(ROW.getInt16(1), WRAPPER.get(1, Short.class)); + } + + @Test + public void testStringConversion() { + assertEquals(ROW.getString(2), WRAPPER.get(2, String.class)); + } + + @Test + public void testBytesToByteBufferConversion() { + assertEquals(ByteBuffer.wrap(ROW.getBytes(3)), WRAPPER.get(3, ByteBuffer.class)); + } + + @Test + public void testBytesToUUIDConversion() { + assertEquals(UUIDUtil.convert(ROW.getBytes(4)), WRAPPER.get(4, UUID.class)); + } + + @Test + public void testDecimalConversion() { + assertEquals(ROW.getDecimal(5), WRAPPER.get(5, BigDecimal.class)); + } + + @Test + public void testDateTimeConversion() { + long expectedJodaMicros = TimeUnit.MILLISECONDS.toMicros(ROW.getDateTime(6).getMillis()); + assertEquals(expectedJodaMicros, (long) WRAPPER.get(6, Long.class)); + } + + @Test + public void testMicrosInstantLogicalTypeConversion() { + Instant javaInstant = ROW.getLogicalTypeValue(7, Instant.class); + long expectedMicrosInstant = + TimeUnit.SECONDS.toMicros(javaInstant.getEpochSecond()) + javaInstant.getNano() / 1000; + assertEquals(expectedMicrosInstant, (long) WRAPPER.get(7, Long.class)); + } + + @Test + public void testDateTimeLogicalTypeConversion() { + long expectedDateTime = + DateTimeUtil.microsFromTimestamp(ROW.getLogicalTypeValue(8, LocalDateTime.class)); + assertEquals(expectedDateTime, (long) WRAPPER.get(8, Long.class)); + } + + @Test + public void testDateLogicalTypeConversion() { + int expectedDate = DateTimeUtil.daysFromDate(ROW.getLogicalTypeValue(9, LocalDate.class)); + assertEquals(expectedDate, (int) WRAPPER.get(9, Integer.class)); + } + + @Test + public void testTimeLogicalTypeConversion() { + long expectedTime = DateTimeUtil.microsFromTime(ROW.getLogicalTypeValue(10, LocalTime.class)); + assertEquals(expectedTime, (long) WRAPPER.get(10, Long.class)); + } + + @Test + public void testFixedPrecisionNumericLogicalTypeConversion() { + assertEquals( + ROW.getLogicalTypeValue(11, FixedPrecisionNumeric.class), + WRAPPER.get(11, BigDecimal.class)); + } + + @Test + public void testNestedRowConversion() { + StructLike nestedWrapperResult = WRAPPER.get(12, StructLike.class); + assertTrue( + "Should return a nested BeamRowWrapper", nestedWrapperResult instanceof BeamRowWrapper); + assertEquals(999, (int) nestedWrapperResult.get(0, Integer.class)); + } + + @Test + public void testPassThroughFallbackConversion() { + // Tests the 'default' case in the switch statement + assertEquals(ROW.getInt32(13), WRAPPER.get(13, Integer.class)); + } +} From 97a71f89f31b7b098b8524011d09d363b425ee89 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 14 Apr 2026 19:18:24 -0400 Subject: [PATCH 04/14] lint --- sdks/python/apache_beam/yaml/yaml_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 4541d79ed346..336e32adc253 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -562,7 +562,7 @@ def write_to_iceberg( keep: Optional[Iterable[str]] = None, drop: Optional[Iterable[str]] = None, only: Optional[str] = None, - distribution_mode: Optional[str] = False, + distribution_mode: Optional[str] = None, ): # TODO(robertwb): It'd be nice to derive this list of parameters, along with # their types and docs, programmatically from the iceberg (or managed) From 5ee527837b29940b3a1888d08032ed090560aa09 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 14 Apr 2026 19:43:48 -0400 Subject: [PATCH 05/14] spotless --- .../org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java index dfd75047cdfa..22e3f8ffce52 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java @@ -28,6 +28,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.ZoneId; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.schemas.Schema; @@ -99,7 +100,7 @@ public class BeamRowWrapperTest { new BigDecimal("123.45"), org.joda.time.Instant.now(), Instant.now(), - LocalDateTime.now(), + LocalDateTime.now(ZoneId.systemDefault()), LocalDate.now(), LocalTime.now(), new BigDecimal("567.89"), From 5c55fc7139309c80df3c262889396ab76b76a990 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 14 Apr 2026 23:40:47 -0400 Subject: [PATCH 06/14] spotless --- .../org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java index 22e3f8ffce52..2758c9d1188c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java @@ -101,8 +101,8 @@ public class BeamRowWrapperTest { org.joda.time.Instant.now(), Instant.now(), LocalDateTime.now(ZoneId.systemDefault()), - LocalDate.now(), - LocalTime.now(), + LocalDate.now(ZoneId.systemDefault()), + LocalTime.now(ZoneId.systemDefault()), new BigDecimal("567.89"), NESTED_ROW, 888) From 7696e1e12d75256d55a1acdfbdee8bb9cd6d9e8c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 15 Apr 2026 11:18:35 -0400 Subject: [PATCH 07/14] fix lint --- sdks/python/apache_beam/ml/rag/ingestion/spanner_it_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/ml/rag/ingestion/spanner_it_test.py b/sdks/python/apache_beam/ml/rag/ingestion/spanner_it_test.py index c371d6fd96b4..3a0d6453690d 100644 --- a/sdks/python/apache_beam/ml/rag/ingestion/spanner_it_test.py +++ b/sdks/python/apache_beam/ml/rag/ingestion/spanner_it_test.py @@ -36,7 +36,7 @@ try: from google.cloud import spanner except ImportError: - spanner = None + spanner = None # type: ignore[assignment] try: from testcontainers.core.container import DockerContainer From e0f8cd52de40f1bc27ea0172b393848027bd58e1 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 1 May 2026 09:19:48 -0400 Subject: [PATCH 08/14] add BeamRowWrapper javadoc; add an autosharding toggle option; address other comments --- .../IO_Iceberg_Integration_Tests.json | 2 +- .../AssignDestinationsAndPartitions.java | 11 +- .../beam/sdk/io/iceberg/BeamRowWrapper.java | 31 ++ .../apache/beam/sdk/io/iceberg/IcebergIO.java | 28 +- .../IcebergWriteSchemaTransformProvider.java | 28 +- .../iceberg/WritePartitionedRowsToFiles.java | 60 ++- .../sdk/io/iceberg/WriteToPartitions.java | 50 ++- .../sdk/io/iceberg/BeamRowWrapperTest.java | 4 +- .../sdk/io/iceberg/IcebergIOWriteTest.java | 388 +++++++++++++++++- ...ebergWriteSchemaTransformProviderTest.java | 15 +- .../iceberg/catalog/IcebergCatalogBaseIT.java | 1 + 11 files changed, 544 insertions(+), 74 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 5d04b2c0a8c7..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 5 + "modification": 1 } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java index db30fd66a8ef..475786d3a4f6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -75,7 +75,7 @@ public PCollection> expand(PCollection input) { } static class AssignDoFn extends DoFn> { - private final Map partitionKeys = new HashMap<>(); + private transient @MonotonicNonNull Map partitionKeys; private transient @MonotonicNonNull Map wrappers; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -88,6 +88,7 @@ static class AssignDoFn extends DoFn> { @Setup public void setup() { this.wrappers = new HashMap<>(); + this.partitionKeys = new HashMap<>(); } @ProcessElement @@ -102,7 +103,7 @@ public void processElement( ValueInSingleWindow.of(element, timestamp, window, paneInfo)); Row data = dynamicDestinations.getData(element); - @Nullable PartitionKey partitionKey = partitionKeys.get(tableIdentifier); + @Nullable PartitionKey partitionKey = checkStateNotNull(partitionKeys).get(tableIdentifier); @Nullable BeamRowWrapper wrapper = checkStateNotNull(wrappers).get(tableIdentifier); if (partitionKey == null || wrapper == null) { PartitionSpec spec = PartitionSpec.unpartitioned(); @@ -116,8 +117,8 @@ public void processElement( } else { try { // see if table already exists with a spec - // TODO(ahmedabu98): improve this by periodically refreshing the table to fetch updated - // specs + // TODO(https://github.com/apache/beam/issues/38337): improve this by periodically + // refreshing the table to fetch updated specs spec = catalogConfig.catalog().loadTable(TableIdentifier.parse(tableIdentifier)).spec(); } catch (NoSuchTableException ignored) { // no partition to apply @@ -125,7 +126,7 @@ public void processElement( } partitionKey = new PartitionKey(spec, schema); wrapper = new BeamRowWrapper(data.getSchema(), schema.asStruct()); - partitionKeys.put(tableIdentifier, partitionKey); + checkStateNotNull(partitionKeys).put(tableIdentifier, partitionKey); checkStateNotNull(wrappers).put(tableIdentifier, wrapper); } partitionKey.partition(wrapper.wrap(data)); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java index ad7ec4b7b04f..4ab2b5b931be 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapper.java @@ -42,12 +42,24 @@ import org.apache.iceberg.util.UUIDUtil; import org.checkerframework.checker.nullness.qual.Nullable; +/** + * A wrapper that adapts a Beam {@link Row} to Iceberg's {@link StructLike} interface. + * + *

This class allows Beam rows to be processed by Iceberg internal components (like partition + * keys or writers) without requiring a full conversion into Iceberg's internal Record format. It + * handles the mapping between Beam's {@link Schema} and Iceberg's {@link Types.StructType}, + * including complex type conversions for timestamps, logical types, and UUIDs. + * + *

Note: This implementation is read-only. Calls to {@link #set(int, Object)} will + * throw an {@link UnsupportedOperationException}. + */ public class BeamRowWrapper implements StructLike { private final FieldType[] types; private final @Nullable PositionalGetter[] getters; private @Nullable Row row = null; + /** Constructs a new wrapper and pre-computes the mapping between Beam and Iceberg fields. */ public BeamRowWrapper(Schema schema, Types.StructType struct) { int size = schema.getFieldCount(); @@ -60,6 +72,10 @@ public BeamRowWrapper(Schema schema, Types.StructType struct) { } } + /** + * Sets the current Beam {@link Row} to be wrapped. This method allows the wrapper to be reused + * across different rows to minimize object allocation. + */ public BeamRowWrapper wrap(@Nullable Row row) { this.row = row; return this; @@ -70,6 +86,10 @@ public int size() { return types.length; } + /** + * Retrieves a field value from the wrapped row, performing any necessary type conversion to match + * Iceberg's internal expectations (e.g., converting Timestamps to microseconds). + */ @Override public @Nullable T get(int pos, Class javaClass) { if (row == null || row.getValue(pos) == null) { @@ -91,6 +111,17 @@ private interface PositionalGetter { T get(Row data, int pos); } + /** + * Factory method to create a getter that handles type-specific conversion logic. + * + *

Handles special cases: + * + *

+ */ private static @Nullable PositionalGetter buildGetter(FieldType beamType, Type icebergType) { switch (beamType.getTypeName()) { case BYTE: diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index b36364b35d3b..a5a3beef8f51 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -385,6 +385,7 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) { return new AutoValue_IcebergIO_WriteRows.Builder() .setCatalogConfig(catalog) .setDistributionMode(DistributionMode.NONE) + .setAutoSharding(false) .build(); } @@ -403,6 +404,8 @@ public abstract static class WriteRows extends PTransform, Iceb abstract DistributionMode getDistributionMode(); + abstract boolean getAutoSharding(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -419,6 +422,8 @@ abstract static class Builder { abstract Builder setDistributionMode(DistributionMode mode); + abstract Builder setAutoSharding(boolean autoSharding); + abstract WriteRows build(); } @@ -452,14 +457,23 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) { } /** - * Groups incoming rows by partition before sending to writes, ensuring that a given bundle is - * written to only one partition. For partitioned tables, this helps significantly to reduce the - * number of small files. + * Defines distribution of write data. Supported distributions: + * + *
    + *
  1. {@link DistributionMode.NONE}: don't shuffle rows (default) + *
  2. {@link DistributionMode.HASH}: shuffle rows by partition key before writing data + *
+ * + * {@link DistributionMode.RANGE} is not supported yet */ public WriteRows withDistributionMode(DistributionMode mode) { return toBuilder().setDistributionMode(mode).build(); } + public WriteRows withAutosharding() { + return toBuilder().setAutoSharding(true).build(); + } + @Override public IcebergWriteResult expand(PCollection input) { List allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations()); @@ -484,6 +498,9 @@ public IcebergWriteResult expand(PCollection input) { switch (getDistributionMode()) { case NONE: + Preconditions.checkArgument( + !getAutoSharding(), + "Autosharding option is only available with " + "'hash' distribution mode."); return input .apply("Assign Table Destinations", new AssignDestinations(destinations)) .apply( @@ -501,7 +518,10 @@ public IcebergWriteResult expand(PCollection input) { .apply( "Write Rows to Partitions", new WriteToPartitions( - getCatalogConfig(), destinations, getTriggeringFrequency())); + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getAutoSharding())); default: throw new UnsupportedOperationException( "Unsupported distribution mode: " + getDistributionMode()); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 11c22a13bcb3..639f222696ff 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -135,12 +135,6 @@ public static Builder builder() { + " please visit https://iceberg.apache.org/docs/latest/configuration/#table-properties.") public abstract @Nullable Map getTableProperties(); - @SchemaFieldDescription( - "Defines distribution of write data. Supported distributions:" - + "\n- none: don't shuffle rows (default)" - + "\n- hash: shuffle rows by partition key before writing data") - public abstract @Nullable String getDistributionMode(); - @SchemaFieldDescription( "Fields used to set the table's sort order, applied when the table is created. " + "Each entry has the form ` [asc|desc] [nulls first|nulls last]`, where `` " @@ -151,6 +145,19 @@ public static Builder builder() { + "For more information on sort orders, please visit https://iceberg.apache.org/spec/#sort-orders.") public abstract @Nullable List getSortFields(); + @SchemaFieldDescription( + "Defines distribution of write data. Supported distributions:" + + "\n- none: don't shuffle rows (default)" + + "\n- hash: shuffle rows by partition key before writing data") + public abstract @Nullable String getDistributionMode(); + + @SchemaFieldDescription( + "Enables dynamic sharding to automatically adjust the number of parallel writers " + + "based on data volume. It handles data skew " + + "by further sub-dividing partitions into multiple shards to prevent bottlenecks " + + "during high-throughput writes. Only available with 'hash' distribution mode.") + public abstract @Nullable Boolean getAutosharding(); + @AutoValue.Builder public abstract static class Builder { public abstract Builder setTable(String table); @@ -175,9 +182,11 @@ public abstract static class Builder { public abstract Builder setTableProperties(Map tableProperties); + public abstract Builder setSortFields(List sortFields); + public abstract Builder setDistributionMode(String mode); - public abstract Builder setSortFields(List sortFields); + public abstract Builder setAutosharding(boolean autosharding); public abstract Configuration build(); } @@ -265,6 +274,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { writeTransform = writeTransform.withDistributionMode(DistributionMode.fromName(mode)); } + @Nullable Boolean autoSharding = configuration.getAutosharding(); + if (autoSharding != null && autoSharding) { + writeTransform = writeTransform.withAutosharding(); + } + // TODO: support dynamic destinations IcebergWriteResult result = rows.apply(writeTransform); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 51f49530493c..9c71e5cdafe8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -54,13 +53,13 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; class WritePartitionedRowsToFiles - extends PTransform< - PCollection, Iterable>>, PCollection> { + extends PTransform>>, PCollection> { private static final Logger LOG = LoggerFactory.getLogger(WritePartitionedRowsToFiles.class); private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -76,20 +75,18 @@ class WritePartitionedRowsToFiles } @Override - public PCollection expand( - PCollection, Iterable>> input) { + public PCollection expand(PCollection>> input) { Schema dataSchema = ((RowCoder) ((IterableCoder) - ((KvCoder, Iterable>) input.getCoder()) - .getValueCoder()) + ((KvCoder>) input.getCoder()).getValueCoder()) .getElemCoder()) .getSchema(); return input.apply( ParDo.of(new WriteDoFn(catalogConfig, dynamicDestinations, filePrefix, dataSchema))); } - private static class WriteDoFn extends DoFn, Iterable>, FileWriteResult> { + private static class WriteDoFn extends DoFn>, FileWriteResult> { private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -97,6 +94,7 @@ private static class WriteDoFn extends DoFn, Iterable>, private final Schema dataSchema; static final Cache LAST_REFRESHED_TABLE_CACHE = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + private @MonotonicNonNull Map partitionFieldMap; WriteDoFn( IcebergCatalogConfig catalogConfig, @@ -109,20 +107,34 @@ private static class WriteDoFn extends DoFn, Iterable>, this.dataSchema = dataSchema; } + private long id = UUID.randomUUID().getLeastSignificantBits(); + + @Setup + public void setup() { + id = UUID.randomUUID().getLeastSignificantBits(); + } + + @StartBundle + public void startBundle() { + System.out.printf("[%s] new bundle\n", id); + } + @ProcessElement public void processElement( - @Element KV, Iterable> element, OutputReceiver out) + @Element KV> element, OutputReceiver out) throws Exception { - String tableIdentifier = checkStateNotNull(element.getKey().getKey().getString(DESTINATION)); - String partitionPath = checkStateNotNull(element.getKey().getKey().getString(PARTITION)); + System.out.println(String.format("[%s] partition key: %s\n", id, element.getKey())); + String tableIdentifier = checkStateNotNull(element.getKey().getString(DESTINATION)); + String partitionPath = checkStateNotNull(element.getKey().getString(PARTITION)); IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); Table table = getOrCreateTable(destination, dataSchema); - // TODO(ahmedabu98): cache this - Map partitionFieldMap = Maps.newHashMap(); - for (PartitionField partitionField : table.spec().fields()) { - partitionFieldMap.put(partitionField.name(), partitionField); + if (partitionFieldMap == null) { + partitionFieldMap = Maps.newHashMap(); + for (PartitionField partitionField : table.spec().fields()) { + partitionFieldMap.put(partitionField.name(), partitionField); + } } partitionPath = getPartitionDataPath(partitionPath, partitionFieldMap); @@ -138,11 +150,14 @@ public void processElement( RecordWriter writer = new RecordWriter(table, destination.getFileFormat(), fileName, partitionData); - for (Row row : element.getValue()) { - Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); - writer.write(record); + try { + for (Row row : element.getValue()) { + Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); + writer.write(record); + } + } finally { + writer.close(); } - writer.close(); SerializableDataFile sdf = SerializableDataFile.from(writer.getDataFile(), partitionPath); out.output( @@ -205,6 +220,12 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { @Nullable Table table = null; synchronized (LAST_REFRESHED_TABLE_CACHE) { + lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); + if (lastRefreshedTable != null && lastRefreshedTable.table != null) { + lastRefreshedTable.refreshIfStale(); + return lastRefreshedTable.table; + } + // Create namespace if it does not exist yet if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) { SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog; @@ -214,6 +235,7 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { LOG.info("Created new namespace '{}'.", namespace); } catch (AlreadyExistsException ignored) { // race condition: another worker already created this namespace + LOG.info("Namespace `{}` already exists.", namespace); } } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java index b17100cef780..310fa1bede41 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java @@ -18,18 +18,21 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.values.TypeDescriptors.iterables; +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import static org.apache.beam.sdk.values.TypeDescriptors.rows; import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -42,42 +45,59 @@ class WriteToPartitions extends PTransform>, IcebergWri private final DynamicDestinations dynamicDestinations; private final @Nullable Duration triggeringFrequency; private final String filePrefix; + private final boolean autoSharding; WriteToPartitions( IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations, - @Nullable Duration triggeringFrequency) { + @Nullable Duration triggeringFrequency, + boolean autoSharding) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; this.triggeringFrequency = triggeringFrequency; // single unique prefix per write transform this.filePrefix = UUID.randomUUID().toString(); + this.autoSharding = autoSharding; } - @Override - public IcebergWriteResult expand(PCollection> input) { - boolean unbounded = IcebergUtils.isUnbounded(input); + private PCollection>> groupByPartition(PCollection> input) { + RowCoder destinationCoder = RowCoder.of(AssignDestinationsAndPartitions.OUTPUT_SCHEMA); + RowCoder dataCoder = RowCoder.of(dynamicDestinations.getDataSchema()); GroupIntoBatches groupIntoPartitions = GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE); - if (unbounded && triggeringFrequency != null) { + if (IcebergUtils.isUnbounded(input) && triggeringFrequency != null) { groupIntoPartitions = groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency); } - PCollection, Iterable>> groupedRows = - input - .apply(groupIntoPartitions.withShardedKey()) - .setCoder( - KvCoder.of( - org.apache.beam.sdk.util.ShardedKey.Coder.of( - RowCoder.of(AssignDestinationsAndPartitions.OUTPUT_SCHEMA)), - IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); + if (autoSharding) { + return input + .apply(groupIntoPartitions.withShardedKey()) + .setCoder( + KvCoder.of( + org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder), + IterableCoder.of(dataCoder))) + .apply( + "DropShardId", + MapElements.into(kvs(rows(), iterables(rows()))) + .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue()))) + .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder))); + } else { + return input + .apply(groupIntoPartitions) + .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder))); + } + } + + @Override + public IcebergWriteResult expand(PCollection> input) { + PCollection>> groupedRows = groupByPartition(input); PCollection writtenFiles = groupedRows.apply( new WritePartitionedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)); - if (unbounded && triggeringFrequency != null) { + if (IcebergUtils.isUnbounded(input) && triggeringFrequency != null) { writtenFiles = writtenFiles.apply( "ApplyUserTrigger", diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java index 2758c9d1188c..bd8cead72987 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BeamRowWrapperTest.java @@ -207,9 +207,7 @@ public void testTimeLogicalTypeConversion() { @Test public void testFixedPrecisionNumericLogicalTypeConversion() { - assertEquals( - ROW.getLogicalTypeValue(11, FixedPrecisionNumeric.class), - WRAPPER.get(11, BigDecimal.class)); + assertEquals(ROW.getLogicalTypeValue(11, BigDecimal.class), WRAPPER.get(11, BigDecimal.class)); } @Test diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 1f43b8e45ec4..5ccb9b2427ea 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -19,24 +19,52 @@ import static java.util.Arrays.asList; import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.sdk.values.TypeDescriptors.integers; +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -48,6 +76,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -75,13 +104,17 @@ public class IcebergIOWriteTest implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(IcebergIOWriteTest.class); + private static final String NONE = "none"; + private static final String HASH = "hash"; + private static final String HASH_WITH_AUTOSHARDING = "hashWithAutoSharding"; + @Parameterized.Parameters public static Iterable data() { - return asList(new Object[][] {{DistributionMode.NONE}, {DistributionMode.HASH}}); + return asList(new Object[][] {{NONE}, {HASH}, {HASH_WITH_AUTOSHARDING}}); } @Parameterized.Parameter(0) - public DistributionMode distributionMode; + public String distributionMode; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @@ -90,6 +123,28 @@ public static Iterable data() { @Rule public transient TestPipeline testPipeline = TestPipeline.create(); + private IcebergIO.WriteRows writeTransform( + IcebergCatalogConfig catalog, TableIdentifier tableId) { + IcebergIO.WriteRows write = IcebergIO.writeRows(catalog).to(tableId); + return applyDistribution(write); + } + + private IcebergIO.WriteRows writeTransform( + IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) { + IcebergIO.WriteRows write = IcebergIO.writeRows(catalog).to(dynamicDestinations); + return applyDistribution(write); + } + + private IcebergIO.WriteRows applyDistribution(IcebergIO.WriteRows write) { + if (distributionMode.contains(HASH)) { + write = write.withDistributionMode(DistributionMode.HASH); + } + if (distributionMode.equals(HASH_WITH_AUTOSHARDING)) { + write = write.withAutosharding(); + } + return write; + } + @Test public void testSimpleAppend() throws Exception { TableIdentifier tableId = @@ -110,9 +165,7 @@ public void testSimpleAppend() throws Exception { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply( - "Append To Table", - IcebergIO.writeRows(catalog).to(tableId).withDistributionMode(distributionMode)); + .apply("Append To Table", writeTransform(catalog, tableId)); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -142,9 +195,7 @@ public void testCreateNamespaceAndTable() { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply( - "Append To Table", - IcebergIO.writeRows(catalog).to(tableId).withDistributionMode(distributionMode)); + .apply("Append To Table", writeTransform(catalog, tableId)); assertFalse(((SupportsNamespaces) catalog.catalog()).namespaceExists(newNamespace)); LOG.info("Executing pipeline"); @@ -215,11 +266,7 @@ public IcebergDestination instantiateDestination(String dest) { TestFixtures.FILE1SNAPSHOT2, TestFixtures.FILE1SNAPSHOT3)))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply( - "Append To Table", - IcebergIO.writeRows(catalog) - .to(dynamicDestinations) - .withDistributionMode(distributionMode)); + .apply("Append To Table", writeTransform(catalog, dynamicDestinations)); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -246,7 +293,6 @@ public IcebergDestination instantiateDestination(String dest) { */ @Test public void testDynamicDestinationsWithSpillover() throws Exception { - assumeTrue(distributionMode.equals(DistributionMode.NONE)); final String salt = Long.toString(UUID.randomUUID().hashCode(), 16); // Create far more tables than the max writers per bundle @@ -313,7 +359,7 @@ public IcebergDestination instantiateDestination(String dest) { testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(elements))) .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) - .apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations)); + .apply("Append To Table", writeTransform(catalog, dynamicDestinations)); LOG.info("Executing pipeline"); testPipeline.run().waitUntilFinish(); @@ -406,10 +452,8 @@ public void testStreamingWrite() { .apply("Stream Records", stream) .apply( "Append To Table", - IcebergIO.writeRows(catalog) - .to(tableId) - .withTriggeringFrequency(Duration.standardSeconds(3)) - .withDistributionMode(distributionMode)) + writeTransform(catalog, tableId) + .withTriggeringFrequency(Duration.standardSeconds(3))) .getSnapshots(); // verify that 2 snapshots are created (one per triggering interval) PCollection snapshots = output.apply(Count.globally()); @@ -421,4 +465,310 @@ public void testStreamingWrite() { List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); } + + @Test + public void testHashDistribution() { + assumeTrue(distributionMode.equals(HASH_WITH_AUTOSHARDING)); + Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build(); + + TableIdentifier tableId = + TableIdentifier.of("default", "hash_" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + // create table with two partitions + catalog + .catalog() + .createTable( + tableId, + IcebergUtils.beamSchemaToIcebergSchema(schema), + PartitionSpec.builderFor(IcebergUtils.beamSchemaToIcebergSchema(schema)) + .bucket("id", 2) + .build()); + + // Prepare 100 rows and split them up into separate keys. + // The "none" distribution will process each key in a separate writer DoFn, + // essentially creating one file per parallel thread. This means one file per + // record since each record is in its own key. + // The "hash" distribution will group records by partition key first, resulting + // in a much smaller number of files created. + PCollection rows = + testPipeline + .apply(GenerateSequence.from(0).to(100)) + .apply( + "Make rows", + MapElements.into(TypeDescriptors.rows()) + .via(i -> Row.withSchema(schema).addValues(i, "name_" + i).build())) + .setRowSchema(schema) + .apply(WithKeys.of(1L)) + .setCoder(KvCoder.of(VarLongCoder.of(), SchemaCoder.of(schema))) + .apply(Redistribute.byKey()) + .apply(Values.create()); + + Function, KV>> getAddedFilesFunc = + (distribution) -> + MapElements.into(kvs(strings(), integers())) + .via( + snapshot -> + KV.of( + distribution, + Integer.parseInt( + checkStateNotNull(snapshot.getValue().getSummary()) + .get("added-data-files")))); + + // 1. Write files without any additional config + PCollection> noneDistributionAddedFiles = + rows.apply( + "none distribution write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withDistributionMode(DistributionMode.NONE)) + .getSnapshots() + .apply("Get none files", getAddedFilesFunc.apply(NONE)); + // 2. Write files with hash distribution + PCollection> hashDistributionAddedFiles = + rows.apply( + "hash distribution write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withDistributionMode(DistributionMode.HASH)) + .getSnapshots() + .apply("Get hash files", getAddedFilesFunc.apply(HASH)); + // 3. Write files with hash distribution AND auto-sharding + PCollection> hashAutoShardingDistributionAddedFiles = + rows.apply( + "hash distribution + autosharding write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withDistributionMode(DistributionMode.HASH) + .withAutosharding()) + .getSnapshots() + .apply("Get hash autosharded files", getAddedFilesFunc.apply(HASH_WITH_AUTOSHARDING)); + + PCollectionList.of( + Arrays.asList( + hashDistributionAddedFiles, + noneDistributionAddedFiles, + hashAutoShardingDistributionAddedFiles)) + .apply(Flatten.pCollections()) + .apply("add dummy key", WithKeys.of(1)) + .apply("group together", GroupByKey.create()) + .apply("unwrap values", Values.create()) + .apply( + "validate num files", + ParDo.of( + new DoFn>, Void>() { + @ProcessElement + public void process(@Element Iterable> sums) { + List> sumList = Lists.newArrayList(sums.iterator()); + assertEquals(3, sumList.size()); + + int numFilesAddedNoneDist = + Iterables.getOnlyElement( + sumList.stream() + .filter(kv -> kv.getKey().equals(NONE)) + .map(KV::getValue) + .collect(Collectors.toList())); + + int numFilesAddedHashDist = + Iterables.getOnlyElement( + sumList.stream() + .filter(kv -> kv.getKey().equals(HASH)) + .map(KV::getValue) + .collect(Collectors.toList())); + + int numFilesAddedHashAutoShardingDist = + Iterables.getOnlyElement( + sumList.stream() + .filter(kv -> kv.getKey().equals(HASH_WITH_AUTOSHARDING)) + .map(KV::getValue) + .collect(Collectors.toList())); + + System.out.println("none: " + numFilesAddedNoneDist); + System.out.println("hash: " + numFilesAddedHashDist); + System.out.println( + "hash with autosharding: " + numFilesAddedHashAutoShardingDist); + // plain hash distribution should have exactly the same number of partitions + assertEquals(2, numFilesAddedHashAutoShardingDist); + // hash with autosharding may create sub-shards and lead to more than just 2 + // files. + // should still be less than 'none' distribution though + assertTrue(numFilesAddedHashDist < numFilesAddedNoneDist); + } + })); + + testPipeline.run().waitUntilFinish(); + } + + @Test + public void testHashDistributionStreaming() { + assumeTrue(distributionMode.equals(HASH_WITH_AUTOSHARDING)); + Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build(); + + TableIdentifier tableId = + TableIdentifier.of( + "default", "hash_streaming" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + // create table with two partitions + catalog + .catalog() + .createTable( + tableId, + IcebergUtils.beamSchemaToIcebergSchema(schema), + PartitionSpec.builderFor(IcebergUtils.beamSchemaToIcebergSchema(schema)) + .bucket("id", 2) + .build()); + + // Prepare 100 rows and split them up into separate keys. + // The "none" distribution will process each key in a separate writer DoFn, + // essentially creating one file per parallel thread. This means one file per + // record since each record is in its own key. + // The "hash" distribution will group records by partition key first, resulting + // in a much smaller number of files created. + PCollection rows = + testPipeline + .apply( + TestStream.create(VarLongCoder.of()) + .addElements(0L, LongStream.range(1, 10).boxed().toArray(Long[]::new)) + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements(10L, LongStream.range(11, 20).boxed().toArray(Long[]::new)) + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements(20L, LongStream.range(21, 30).boxed().toArray(Long[]::new)) + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements(30L, LongStream.range(31, 40).boxed().toArray(Long[]::new)) + .advanceProcessingTime(Duration.standardSeconds(10)) + .addElements(40L, LongStream.range(41, 50).boxed().toArray(Long[]::new)) + .advanceProcessingTime(Duration.standardSeconds(10)) + .advanceWatermarkToInfinity()) + .apply( + "Make rows", + MapElements.into(TypeDescriptors.rows()) + .via(i -> Row.withSchema(schema).addValues(i, "name_" + i).build())) + .setRowSchema(schema) + .apply(WithKeys.of(r -> r.getInt64("id"))) + .setCoder(KvCoder.of(VarLongCoder.of(), SchemaCoder.of(schema))) + .apply(Redistribute.byKey()) + .apply(Values.create()); + + Function, KV>> getAddedFilesFunc = + (distribution) -> + MapElements.into(kvs(strings(), integers())) + .via( + snapshot -> + KV.of( + distribution, + Integer.parseInt( + checkStateNotNull(snapshot.getValue().getSummary()) + .get("added-data-files")))); + + // 1. Write files without any additional config + PCollection> noneDistributionAddedFiles = + rows.apply( + "none distribution write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withTriggeringFrequency(Duration.standardSeconds(5)) + .withDistributionMode(DistributionMode.NONE)) + .getSnapshots() + .apply("Get none files", getAddedFilesFunc.apply(NONE)); + // 2. Write files with hash distribution + PCollection> hashDistributionAddedFiles = + rows.apply( + "hash distribution write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withTriggeringFrequency(Duration.standardSeconds(5)) + .withDistributionMode(DistributionMode.HASH)) + .getSnapshots() + .apply("Get hash files", getAddedFilesFunc.apply(HASH)); + // 3. Write files with hash distribution AND auto-sharding + PCollection> hashAutoShardingDistributionAddedFiles = + rows.apply( + "hash distribution + autosharding write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withTriggeringFrequency(Duration.standardSeconds(5)) + .withDistributionMode(DistributionMode.HASH) + .withAutosharding()) + .getSnapshots() + .apply("Get hash autosharded files", getAddedFilesFunc.apply(HASH_WITH_AUTOSHARDING)); + + PCollectionList.of( + Arrays.asList( + hashDistributionAddedFiles, + noneDistributionAddedFiles, + hashAutoShardingDistributionAddedFiles)) + .apply(Flatten.pCollections()) + .apply("add dummy key", WithKeys.of(1)) + .apply("group together", GroupByKey.create()) + .apply( + "validate num files", + ParDo.of( + new DoFn>>, Void>() { + private Counter numWaves = Metrics.counter(IcebergIOWriteTest.class, "numWaves"); + + @ProcessElement + public void process(@Element KV>> sums) { + List> sumList = + Lists.newArrayList(sums.getValue().iterator()); + // each wave should have one snapshot per write branch + assertEquals(3, sumList.size()); + + // get the number of files written by each branch + int numFilesAddedHashDist = + Iterables.getOnlyElement( + sumList.stream() + .filter(kv -> kv.getKey().equals(HASH)) + .map(KV::getValue) + .collect(Collectors.toList())); + + // plain hash distribution should have exactly the same number of partitions + assertEquals(2, numFilesAddedHashDist); + // hash with autosharding may create sub-shards and lead to more than just 2 + // files. + // In a production runner like Dataflow, hash + autosharding would still + // make less files than 'none' distribution. + // We're testing with DirectRunner though, which doesn't have a smart + // autosharding implementation, so it may sometimes make more files + // than even 'none' distribution. + // assertTrue(numFilesAddedHashAutoShardingDist < numFilesAddedNoneDist); + numWaves.inc(); + } + })); + + PipelineResult result = testPipeline.run(); + result.waitUntilFinish(); + + // verify total number of snapshot commit waves + long numWaves = + result + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(IcebergIOWriteTest.class, "numWaves")) + .build()) + .getCounters() + .iterator() + .next() + .getCommitted(); + assertEquals(5L, numWaves); + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 80b0389d78ea..c5fc5a6b6fe7 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -25,6 +25,7 @@ import static org.apache.iceberg.util.DateTimeUtil.timestampFromMicros; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; import java.time.LocalDate; import java.time.LocalDateTime; @@ -387,8 +388,18 @@ public Void apply(Iterable input) { } } + @Test + public void testWritePartitionedDataWithAutosharding() { + assumeTrue(distributionMode.equals(DistributionMode.HASH)); + writePartitionedData(true); + } + @Test public void testWritePartitionedData() { + writePartitionedData(false); + } + + public void writePartitionedData(boolean autosharding) { Schema schema = Schema.builder() .addStringField("str") @@ -432,7 +443,9 @@ public void testWritePartitionedData() { "catalog_properties", ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location), "distribution_mode", - distributionMode.name()); + distributionMode.name(), + "autosharding", + autosharding); List rows = new ArrayList<>(); for (int i = 0; i < 30; i++) { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index a559523a2926..74408d67ed86 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -851,6 +851,7 @@ private void writeToDynamicDestinations( Preconditions.checkState(filterOp == null || !filterOp.equals("only")); writeConfig.put("partition_fields", Arrays.asList("bool_field", "modulo_5")); writeConfig.put("distribution_mode", "hash"); + writeConfig.put("autosharding", true); } // Write with Beam From 945783dd49d411a95857884b3e35cc62665c3567 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 1 May 2026 13:41:58 -0400 Subject: [PATCH 09/14] cleanup --- .../IcebergWriteSchemaTransformProvider.java | 2 +- .../io/iceberg/WritePartitionedRowsToFiles.java | 15 +-------------- .../beam/sdk/io/iceberg/IcebergIOWriteTest.java | 7 +++++-- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 639f222696ff..8db4fb77a8e8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -186,7 +186,7 @@ public abstract static class Builder { public abstract Builder setDistributionMode(String mode); - public abstract Builder setAutosharding(boolean autosharding); + public abstract Builder setAutosharding(Boolean autosharding); public abstract Configuration build(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 9c71e5cdafe8..2508f16083c2 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -107,23 +107,10 @@ private static class WriteDoFn extends DoFn>, FileWriteRes this.dataSchema = dataSchema; } - private long id = UUID.randomUUID().getLeastSignificantBits(); - - @Setup - public void setup() { - id = UUID.randomUUID().getLeastSignificantBits(); - } - - @StartBundle - public void startBundle() { - System.out.printf("[%s] new bundle\n", id); - } - @ProcessElement public void processElement( @Element KV> element, OutputReceiver out) throws Exception { - System.out.println(String.format("[%s] partition key: %s\n", id, element.getKey())); String tableIdentifier = checkStateNotNull(element.getKey().getString(DESTINATION)); String partitionPath = checkStateNotNull(element.getKey().getString(PARTITION)); @@ -200,7 +187,6 @@ void refreshIfStale() { } Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { - Catalog catalog = catalogConfig.catalog(); TableIdentifier identifier = destination.getTableIdentifier(); @Nullable LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); @@ -226,6 +212,7 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { return lastRefreshedTable.table; } + Catalog catalog = catalogConfig.catalog(); // Create namespace if it does not exist yet if (!namespace.isEmpty() && catalog instanceof SupportsNamespaces) { SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 5ccb9b2427ea..52d92911f4e4 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -597,7 +597,7 @@ public void process(@Element Iterable> sums) { System.out.println( "hash with autosharding: " + numFilesAddedHashAutoShardingDist); // plain hash distribution should have exactly the same number of partitions - assertEquals(2, numFilesAddedHashAutoShardingDist); + assertEquals(2, numFilesAddedHashDist); // hash with autosharding may create sub-shards and lead to more than just 2 // files. // should still be less than 'none' distribution though @@ -657,6 +657,7 @@ public void testHashDistributionStreaming() { .advanceProcessingTime(Duration.standardSeconds(10)) .addElements(40L, LongStream.range(41, 50).boxed().toArray(Long[]::new)) .advanceProcessingTime(Duration.standardSeconds(10)) + .advanceProcessingTime(Duration.standardSeconds(10)) .advanceWatermarkToInfinity()) .apply( "Make rows", @@ -723,13 +724,15 @@ public void testHashDistributionStreaming() { "validate num files", ParDo.of( new DoFn>>, Void>() { - private Counter numWaves = Metrics.counter(IcebergIOWriteTest.class, "numWaves"); + private final Counter numWaves = + Metrics.counter(IcebergIOWriteTest.class, "numWaves"); @ProcessElement public void process(@Element KV>> sums) { List> sumList = Lists.newArrayList(sums.getValue().iterator()); // each wave should have one snapshot per write branch + System.out.println("list: " + sumList); assertEquals(3, sumList.size()); // get the number of files written by each branch From 9ada7da59c58fea1153b30bb4d61ebedc8a425e0 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 1 May 2026 13:52:47 -0400 Subject: [PATCH 10/14] cleanup --- .../iceberg/WritePartitionedRowsToFiles.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 2508f16083c2..d4c6b2938fb3 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -53,7 +53,6 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,7 +93,6 @@ private static class WriteDoFn extends DoFn>, FileWriteRes private final Schema dataSchema; static final Cache LAST_REFRESHED_TABLE_CACHE = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); - private @MonotonicNonNull Map partitionFieldMap; WriteDoFn( IcebergCatalogConfig catalogConfig, @@ -115,15 +113,9 @@ public void processElement( String partitionPath = checkStateNotNull(element.getKey().getString(PARTITION)); IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); - Table table = getOrCreateTable(destination, dataSchema); - - if (partitionFieldMap == null) { - partitionFieldMap = Maps.newHashMap(); - for (PartitionField partitionField : table.spec().fields()) { - partitionFieldMap.put(partitionField.name(), partitionField); - } - } - partitionPath = getPartitionDataPath(partitionPath, partitionFieldMap); + LastRefreshedTable lastRefreshedTable = getOrCreateTable(destination, dataSchema); + Table table = lastRefreshedTable.table; + partitionPath = getPartitionDataPath(partitionPath, lastRefreshedTable.partitionFieldMap); StructLike partitionData = table.spec().isPartitioned() @@ -158,10 +150,16 @@ static final class LastRefreshedTable { final Table table; volatile Instant lastRefreshTime; static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2); + private int specId; + Map partitionFieldMap = Maps.newHashMap(); LastRefreshedTable(Table table, Instant lastRefreshTime) { this.table = table; + this.specId = table.spec().specId(); this.lastRefreshTime = lastRefreshTime; + for (PartitionField partitionField : table.spec().fields()) { + partitionFieldMap.put(partitionField.name(), partitionField); + } } /** @@ -181,18 +179,25 @@ void refreshIfStale() { if (lastRefreshTime.isBefore(Instant.now().minus(STALENESS_THRESHOLD))) { table.refresh(); lastRefreshTime = Instant.now(); + if (table.spec().specId() != this.specId) { + partitionFieldMap = Maps.newHashMap(); + for (PartitionField partitionField : table.spec().fields()) { + partitionFieldMap.put(partitionField.name(), partitionField); + } + this.specId = table.spec().specId(); + } } } } } - Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { + LastRefreshedTable getOrCreateTable(IcebergDestination destination, Schema dataSchema) { TableIdentifier identifier = destination.getTableIdentifier(); @Nullable LastRefreshedTable lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); - if (lastRefreshedTable != null && lastRefreshedTable.table != null) { + if (lastRefreshedTable != null) { lastRefreshedTable.refreshIfStale(); - return lastRefreshedTable.table; + return lastRefreshedTable; } Namespace namespace = identifier.namespace(); @@ -207,9 +212,9 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { @Nullable Table table = null; synchronized (LAST_REFRESHED_TABLE_CACHE) { lastRefreshedTable = LAST_REFRESHED_TABLE_CACHE.getIfPresent(identifier); - if (lastRefreshedTable != null && lastRefreshedTable.table != null) { + if (lastRefreshedTable != null) { lastRefreshedTable.refreshIfStale(); - return lastRefreshedTable.table; + return lastRefreshedTable; } Catalog catalog = catalogConfig.catalog(); @@ -252,7 +257,7 @@ Table getOrCreateTable(IcebergDestination destination, Schema dataSchema) { } lastRefreshedTable = new LastRefreshedTable(table, Instant.now()); LAST_REFRESHED_TABLE_CACHE.put(identifier, lastRefreshedTable); - return table; + return lastRefreshedTable; } } } From 1beac3b04f305409bc93b10abd3f972d0e65457f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 4 May 2026 11:23:55 -0400 Subject: [PATCH 11/14] volatile map --- .../apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index d4c6b2938fb3..54ad120f1aca 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -151,7 +151,7 @@ static final class LastRefreshedTable { volatile Instant lastRefreshTime; static final Duration STALENESS_THRESHOLD = Duration.ofMinutes(2); private int specId; - Map partitionFieldMap = Maps.newHashMap(); + volatile Map partitionFieldMap = Maps.newHashMap(); LastRefreshedTable(Table table, Instant lastRefreshTime) { this.table = table; From 6e3508166e8aafaf4bcb69ea8c82483433d9ba6d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 4 May 2026 12:34:32 -0400 Subject: [PATCH 12/14] add to changes.md --- CHANGES.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 8574448d0898..0651b736e46e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)). +* IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/pull/38061)) ## New Features / Improvements @@ -2434,4 +2435,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). \ No newline at end of file +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). From b62e3a4f0abb058d9f46c7ef1066cdbc42f0b9c8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 4 May 2026 15:36:47 -0400 Subject: [PATCH 13/14] format --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 0651b736e46e..dc1994fdd1ed 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,7 +66,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)). -* IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/pull/38061)) +* IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/pull/38061)). ## New Features / Improvements From 65297d3e5edcdd791753ad5625ac08109a2f7dc6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 4 May 2026 15:43:38 -0400 Subject: [PATCH 14/14] format --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index dc1994fdd1ed..f9b9f1d28483 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,7 +66,7 @@ * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * IcebergIO: support declaring a table's sort order on dynamic table creation via the new `sort_fields` config ([#38269](https://github.com/apache/beam/issues/38269)). -* IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/pull/38061)). +* IcebergIO: support writing with hash distribution mode, and with autosharding ([#38061](https://github.com/apache/beam/issues/38061))). ## New Features / Improvements