diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogDescriptor.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogDescriptor.java
new file mode 100644
index 000000000000..710c21c64ac9
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogDescriptor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Descriptor for a set of {@link SerializableChangelogTask}s.
+ *
+ *
This carries commit-sourced metadata for all rows produced from the task group. These values
+ * are not read from data files; they are appended to final CDC output rows by {@link
+ * CdcOutputUtils#outputRow} when requested.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class ChangelogDescriptor {
+ public static Builder builder() {
+ return new AutoValue_ChangelogDescriptor.Builder();
+ }
+
+ @SuppressWarnings("nullness")
+ public static SchemaCoder coder(Schema overlapSchema) {
+ Schema descriptorSchema =
+ Schema.builder()
+ .addStringField("tableIdentifierString")
+ .addInt64Field("snapshotSequenceNumber")
+ .addInt64Field("commitSnapshotId")
+ .addNullableField("overlapLower", Schema.FieldType.row(overlapSchema))
+ .addNullableField("overlapUpper", Schema.FieldType.row(overlapSchema))
+ .build();
+
+ return SchemaCoder.of(
+ descriptorSchema,
+ TypeDescriptor.of(ChangelogDescriptor.class),
+ descriptor ->
+ Row.withSchema(descriptorSchema)
+ .addValues(
+ descriptor.getTableIdentifierString(),
+ descriptor.getSnapshotSequenceNumber(),
+ descriptor.getCommitSnapshotId(),
+ descriptor.getOverlapLower(),
+ descriptor.getOverlapUpper())
+ .build(),
+ row ->
+ ChangelogDescriptor.builder()
+ .setTableIdentifierString(row.getString("tableIdentifierString"))
+ .setSnapshotSequenceNumber(row.getInt64("snapshotSequenceNumber"))
+ .setCommitSnapshotId(row.getInt64("commitSnapshotId"))
+ .setOverlapLower(row.getRow("overlapLower"))
+ .setOverlapUpper(row.getRow("overlapUpper"))
+ .build());
+ }
+
+ @SchemaFieldNumber("0")
+ public abstract String getTableIdentifierString();
+
+ @SchemaFieldNumber("1")
+ public abstract long getSnapshotSequenceNumber();
+
+ @SchemaFieldNumber("2")
+ public abstract long getCommitSnapshotId();
+
+ @SchemaFieldNumber("3")
+ public abstract @Nullable Row getOverlapLower();
+
+ @SchemaFieldNumber("4")
+ public abstract @Nullable Row getOverlapUpper();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ abstract Builder setTableIdentifierString(String table);
+
+ abstract Builder setSnapshotSequenceNumber(long sequenceNumber);
+
+ abstract Builder setCommitSnapshotId(long snapshotId);
+
+ abstract Builder setOverlapLower(@Nullable Row overlapLower);
+
+ abstract Builder setOverlapUpper(@Nullable Row overlapUpper);
+
+ abstract ChangelogDescriptor build();
+ }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java
new file mode 100644
index 000000000000..6d35b4ec41be
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java
@@ -0,0 +1,1002 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static java.lang.String.format;
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.Type.ADDED_ROWS;
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getDataFile;
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getLength;
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getPartition;
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getSpec;
+import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.getType;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.io.iceberg.IcebergScanConfig;
+import org.apache.beam.sdk.io.iceberg.IcebergUtils;
+import org.apache.beam.sdk.io.iceberg.TableCache;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.iceberg.AddedRowsScanTask;
+import org.apache.iceberg.BaseIncrementalChangelogScan;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeletedDataFileScanTask;
+import org.apache.iceberg.DeletedRowsScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.MetricsModes;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn that takes incoming Iceberg snapshots and scans them for changelogs using Iceberg's {@link
+ * IncrementalChangelogScan}. Changelog tasks are organized into batches and routed to different
+ * downstream PCollections based on complexity.
+ *
+ * The Iceberg scan generates batches of changelog scan tasks, each of size {@link
+ * TableProperties#SPLIT_SIZE}. This can be configured with the table's read.split.target-size
+ * property.
+ *
+ *
This DoFn analyzes the nature of changes within the snapshot, partition, and file level, then
+ * routes the changes accordingly:
+ *
+ *
+ * - Unidirectional (Fast Path): If an isolated level contains only inserts OR only
+ * deletes, its tasks are emitted to {@link #UNIDIRECTIONAL_TASKS}. These records bypass
+ * the CoGBK shuffle and are output immediately.
+ *
- Small Bidirectional (Medium Path): If an isolated level contains a mix of inserts
+ * and deletes, and is small enough, its tasks are emitted to {@link
+ * #SMALL_BIDIRECTIONAL_TASKS}. These records are resolved in memory to identify potential
+ * updates. Task groups are considered small enough if the estimated overlap region is within
+ * {@link TableProperties#SPLIT_SIZE}.
+ *
- Bidirectional (Slow Path): If an isolated level contains a mix of inserts and
+ * deletes, and is too large, its tasks are emitted to {@link #LARGE_BIDIRECTIONAL_TASKS}.
+ * These records are grouped by Primary Key and processed by {@link ResolveChanges} to
+ * identify potential updates.
+ *
+ *
+ * Optimizing by Shuffling Less Data
+ *
+ * We take a three-layered approach to identify data that can bypass the expensive downstream
+ * CoGroupByKey shuffle:
+ *
+ *
Snapshots
+ *
+ * We start by analyzing the nature of changes at the snapshot level. If a snapshot's operation is
+ * not of type {@link DataOperations#OVERWRITE}, then it's a uni-directional change.
+ *
+ * Pinned Partitions
+ *
+ * If the table's partition fields are derived entirely from Primary Key fields, we know that a
+ * record will not migrate between partitions. This narrows down the isolated level and allows us to
+ * only check for bi-directional changes within a partition. Doing this will allow partitions
+ * with uni-directional changes to bypass the expensive CoGBK shuffle. It also gives partitions with
+ * small bi-directional changes a chance to be processed in-memory instead of needing to pass
+ * through the CoGBK.
+ *
+ *
Optimization for Individual Files
+ *
+ * When we have narrowed down our group of tasks with bi-directional changes, we start analyzing the
+ * metadata of their underlying files. We compare the upper and lower bounds of Partition Keys
+ * relevant to each file, and consider any overlaps as potentially containing an update. If a given
+ * task's Primary Key bounds has no overlap with any opposing task's Primary Key bounds, then we
+ * know it's not possible to create an (insert, delete) pair with it. Such a task can safely bypass
+ * the shuffle.
+ *
+ * Note: "opposing" refers to a change that happens in the opposite direction (e.g. insert is
+ * "positive", delete is "negative")
+ *
+ *
For example, say we have a group of tasks:
+ *
+ *
+ * - Task A (adds rows): bounds [3, 8]
+ *
- Task B (adds rows): bounds [2, 4]
+ *
- Task C (deletes rows): bounds [1, 5]
+ *
- Task D (adds rows): bounds [6, 12]
+ *
+ *
+ * Tasks A and B add rows, and overlap with Task C which deletes row. We need to resolve the rows
+ * in these 3 tasks because they might all contain (insert, delete) pairs that lead to an update.
+ *
+ *
Task D however, does not overlap with any delete rows. It will never produce an (insert,
+ * delete) pair, so we can directly emit it without resolving its output rows.
+ */
+class ChangelogScanner
+ extends DoFn>> {
+ private static final Logger LOG = LoggerFactory.getLogger(ChangelogScanner.class);
+ private static final Counter totalChangelogScanTasks =
+ Metrics.counter(ChangelogScanner.class, "totalChangelogScanTasks");
+ private static final Counter numAddedRowsScanTasks =
+ Metrics.counter(ChangelogScanner.class, "numAddedRowsScanTasks");
+ private static final Counter numDeletedRowsScanTasks =
+ Metrics.counter(ChangelogScanner.class, "numDeletedRowsScanTasks");
+ private static final Counter numDeletedDataFileScanTasks =
+ Metrics.counter(ChangelogScanner.class, "numDeletedDataFileScanTasks");
+ private static final Counter numUniDirectionalTasks =
+ Metrics.counter(ChangelogScanner.class, "numUniDirectionalTasks");
+ private static final Counter numLargeBiDirectionalTasks =
+ Metrics.counter(ChangelogScanner.class, "numLargeBiDirectionalTasks");
+ private static final Counter numSmallBiDirectionalTasks =
+ Metrics.counter(ChangelogScanner.class, "numSmallBiDirectionalTasks");
+ static final TupleTag>>
+ UNIDIRECTIONAL_TASKS = new TupleTag<>();
+ static final TupleTag>>
+ SMALL_BIDIRECTIONAL_TASKS = new TupleTag<>();
+ static final TupleTag>>
+ LARGE_BIDIRECTIONAL_TASKS = new TupleTag<>();
+
+ private final IcebergScanConfig scanConfig;
+ private @MonotonicNonNull Table table;
+ private @MonotonicNonNull Snapshot snapshot;
+ private transient @MonotonicNonNull TaskBatcher uniBatcher;
+ private boolean canDoPartitionOptimization = false;
+ // for metrics
+ private int numAddedRowsTasks = 0;
+ private int numDeletedRowsTasks = 0;
+ private int numDeletedFileTasks = 0;
+ private int numUniDirTasks = 0;
+ private int numSmallBiDirTasks = 0;
+ private int numLargeBiDirTasks = 0;
+ private int numUniDirSplits = 0;
+ private int numSmallBiDirSplits = 0;
+ private int numLargeBiDirSplits = 0;
+
+ ChangelogScanner(IcebergScanConfig scanConfig) {
+ this.scanConfig = scanConfig;
+ }
+
+ static KvCoder> coder(
+ org.apache.beam.sdk.schemas.Schema rowIdBeamSchema) {
+ return KvCoder.of(
+ ChangelogDescriptor.coder(rowIdBeamSchema),
+ ListCoder.of(SerializableChangelogTask.coder()));
+ }
+
+ @Setup
+ public void setup() {
+ TableCache.setup(scanConfig);
+ }
+
+ @ProcessElement
+ public void process(@Element Long snapshotId, MultiOutputReceiver out) throws IOException {
+ resetLocalMetrics();
+ // not using getRefreshed because upstream Watch should have already refreshed the
+ // table to a state where this snapshot exists
+ this.table = SerializableTable.copyOf(TableCache.get(scanConfig.getTableIdentifier()));
+ this.snapshot = table.snapshot(snapshotId);
+
+ // refresh on miss
+ if (this.snapshot == null) {
+ this.table =
+ SerializableTable.copyOf(TableCache.getRefreshed(scanConfig.getTableIdentifier()));
+ this.snapshot =
+ checkStateNotNull(
+ table.snapshot(snapshotId), "Could not retrieve table snapshot: %s", snapshotId);
+ }
+
+ @Nullable Long fromSnapshotId = snapshot.parentId();
+ @Nullable Expression filter = scanConfig.getFilter();
+
+ // TODO(ahmedabu98): replace this with table.newIncrementalChangelogScan() when
+ // https://github.com/apache/iceberg/pull/14264/ gets merged and released.
+ IncrementalChangelogScan scan =
+ new BaseIncrementalChangelogScan(table)
+ .toSnapshot(snapshotId)
+ .project(scanConfig.getProjectedSchema());
+ if (fromSnapshotId != null) {
+ scan = scan.fromSnapshotExclusive(fromSnapshotId);
+ }
+ if (filter != null) {
+ scan = scan.filter(filter);
+ }
+
+ // configure the scan to store upper/lower bound metrics only
+ // if it's available for primary key fields
+ scan = maybeIncludeColumnStats(scan, table);
+
+ createAndOutputReadTasks(scan, out);
+ }
+
+ private IncrementalChangelogScan maybeIncludeColumnStats(
+ IncrementalChangelogScan scan, Table table) {
+ boolean metricsAvailable = true;
+ MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+ Collection pkFields = table.schema().identifierFieldNames();
+ for (String field : pkFields) {
+ MetricsModes.MetricsMode mode = metricsConfig.columnMode(field);
+ if (!(mode instanceof MetricsModes.Full) && !(mode instanceof MetricsModes.Truncate)) {
+ metricsAvailable = false;
+ break;
+ }
+ }
+ if (metricsAvailable) {
+ scan = scan.includeColumnStats(pkFields);
+ }
+ return scan;
+ }
+
+ @SuppressWarnings("Slf4jFormatShouldBeConst")
+ private void createAndOutputReadTasks(
+ IncrementalChangelogScan scan, MultiOutputReceiver multiOutputReceiver) throws IOException {
+ Snapshot snapshot = checkStateNotNull(this.snapshot);
+ Table table = checkStateNotNull(this.table);
+
+ // ******** Partition Optimization ********
+ // Determine which partition specs "pin" records to their partition
+ // (i.e. partition fields are sourced entirely from a record's PK).
+ // If records are pinned, we can optimize by only shuffling bi-directional changes
+ // *within* a partition, since no cross-partition changes will occur.
+ Set pinnedSpecs =
+ table.specs().entrySet().stream()
+ .filter(e -> doesSpecPinRecordsToPartition(e.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ boolean tableHasPinnedSpecs = !pinnedSpecs.isEmpty();
+
+ // The optimization cannot apply if any file in this snapshot uses an unpinned spec
+ boolean snapshotHasUnpinnedSpec = false;
+ Set specsInSnapshot = new HashSet<>();
+ ChangeTypesInPartition changeTypesInPartition = new ChangeTypesInPartition();
+
+ // Buffer tasks from OVERWRITE snapshots, because they are potentially bi-directional
+ OverwriteTasks overwriteTasks = new OverwriteTasks();
+
+ // batcher for uni-directional tasks, which can be directly emitted when splitSize is reached
+ uniBatcher =
+ new TaskBatcher(
+ scanConfig.getTableIdentifier(),
+ snapshot.timestampMillis(),
+ splitSize(),
+ multiOutputReceiver.get(UNIDIRECTIONAL_TASKS));
+
+ // === collect partition metadata and route/buffer tasks ===
+ LOG.info(
+ "Planning to scan snapshot {} (seq: {})", snapshot.snapshotId(), snapshot.sequenceNumber());
+ try (CloseableIterable> scanTaskGroups = scan.planTasks()) {
+ for (ScanTaskGroup scanTaskGroup : scanTaskGroups) {
+ for (ChangelogScanTask task : scanTaskGroup.tasks()) {
+ SerializableChangelogTask.Type type = getType(task);
+ StructLike partition = getPartition(task);
+ PartitionSpec spec = getSpec(task);
+ gatherTaskTypeMetrics(type);
+
+ // Collect partition metadata for pinned-spec optimization
+ if (tableHasPinnedSpecs) {
+ if (!pinnedSpecs.contains(spec.specId())) {
+ snapshotHasUnpinnedSpec = true;
+ } else {
+ changeTypesInPartition.add(spec, partition, type);
+ specsInSnapshot.add(spec.specId());
+ }
+ }
+
+ // non-overwrite tasks are always unidirectional (the scan planner
+ // skips REPLACE ops).
+ if (!DataOperations.OVERWRITE.equals(snapshot.operation())) {
+ uniBatcher.add(makeTask(task, table), snapshot.sequenceNumber(), getLength(task));
+ numUniDirTasks++;
+ continue;
+ }
+
+ // Overwrite tasks need further analysis — buffer for post-loop processing
+ overwriteTasks.add(spec, partition, task);
+ }
+ }
+ }
+ // a snapshot using multiple specs is also not safe for the partition optimization,
+ // unless we account for the spec ID in the file-to-file comparison, which complicates things
+ canDoPartitionOptimization =
+ tableHasPinnedSpecs && !snapshotHasUnpinnedSpec && specsInSnapshot.size() <= 1;
+
+ // === analyze buffered overwrite tasks using the partition metadata ===
+ processOverwriteTasks(overwriteTasks, changeTypesInPartition, multiOutputReceiver);
+ uniBatcher.flush();
+
+ int totalTasks = updateTaskCounters();
+
+ LOG.info(scanResultMessage(totalTasks));
+ }
+
+ private void processOverwriteTasks(
+ OverwriteTasks overwriteTasks,
+ ChangeTypesInPartition changeTypesInPartition,
+ MultiOutputReceiver multiOutputReceiver) {
+ if (overwriteTasks.isEmpty()) {
+ return;
+ }
+ Snapshot snapshot = checkStateNotNull(this.snapshot);
+ Table table = checkStateNotNull(this.table);
+
+ TaskBatcher uniBatcher = checkStateNotNull(this.uniBatcher);
+ TaskBatcher largeBiBatcher =
+ new TaskBatcher(
+ scanConfig.getTableIdentifier(),
+ snapshot.timestampMillis(),
+ splitSize(),
+ multiOutputReceiver.get(LARGE_BIDIRECTIONAL_TASKS));
+
+ if (!canDoPartitionOptimization) {
+ // Records are not pinned to partition (or no pinned specs at all).
+ // We need to compare underlying files across the whole snapshot.
+ List tasks = overwriteTasks.allTasks();
+
+ AnalysisResult result =
+ analyzeFiles(tasks, scanConfig.recordIdSchema(), scanConfig.recordIdComparator());
+
+ uniBatcher.add(result.unidirectional, snapshot.sequenceNumber(), table);
+ numUniDirTasks += result.unidirectional.size();
+
+ routeBidirectional(result, largeBiBatcher, multiOutputReceiver);
+ } else {
+ // Records are pinned to partition.
+ // Narrow down by comparing the files within each partition independently.
+ for (Map.Entry>> tasksPerSpec :
+ overwriteTasks.tasks.entrySet()) {
+ int specId = tasksPerSpec.getKey();
+ for (Map.Entry> tasksInPartition :
+ tasksPerSpec.getValue().entrySet()) {
+ StructLike partition = tasksInPartition.getKey();
+ @Nullable
+ Set partitionChangeTypes =
+ changeTypesInPartition.typesFor(specId, partition);
+
+ // If this partition has only uni-directional changes, output to UNIDIRECTIONAL and bypass
+ // file analysis
+ if (partitionChangeTypes != null && !containsBiDirectionalChanges(partitionChangeTypes)) {
+ uniBatcher.add(tasksInPartition.getValue(), snapshot.sequenceNumber(), table);
+ numUniDirTasks += tasksInPartition.getValue().size();
+ continue;
+ }
+
+ // Partition has bi-directional changes — analyze file-level overlaps
+ AnalysisResult result =
+ analyzeFiles(
+ tasksInPartition.getValue(),
+ scanConfig.recordIdSchema(),
+ scanConfig.recordIdComparator());
+
+ uniBatcher.add(result.unidirectional, snapshot.sequenceNumber(), table);
+ routeBidirectional(result, largeBiBatcher, multiOutputReceiver);
+
+ // metrics
+ numUniDirTasks += result.unidirectional.size();
+ numLargeBiDirTasks += result.bidirectional.size();
+ }
+ }
+ }
+ largeBiBatcher.flush();
+ numLargeBiDirSplits = largeBiBatcher.totalSplits;
+ }
+
+ /**
+ * Helper class for storing + processing {@link ChangelogScanTask}s organized by partition and
+ * spec ID.
+ */
+ static class OverwriteTasks {
+ Map>> tasks = new HashMap<>();
+
+ void add(PartitionSpec spec, StructLike partition, ChangelogScanTask task) {
+ tasks
+ .computeIfAbsent(spec.specId(), id -> StructLikeMap.create(spec.partitionType()))
+ .computeIfAbsent(partition, p -> new ArrayList<>())
+ .add(task);
+ }
+
+ boolean isEmpty() {
+ return tasks.isEmpty();
+ }
+
+ List allTasks() {
+ return tasks.values().stream()
+ .flatMap(taskMap -> taskMap.values().stream())
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ }
+ }
+
+ /**
+ * Helper class for identifying types of {@link ChangelogScanTask} per spec ID and partition. This
+ * is used to determine whether this snapshot is eligible for partition optimization.
+ */
+ static class ChangeTypesInPartition {
+ Map>> changeTypesPerPartition =
+ new HashMap<>();
+
+ void add(PartitionSpec spec, StructLike partition, SerializableChangelogTask.Type type) {
+ changeTypesPerPartition
+ .computeIfAbsent(spec.specId(), id -> StructLikeMap.create(spec.partitionType()))
+ .computeIfAbsent(partition, p -> new HashSet<>())
+ .add(type);
+ }
+
+ @Nullable
+ Set typesFor(Integer specId, StructLike partition) {
+ if (!changeTypesPerPartition.containsKey(specId)) {
+ return null;
+ }
+ return checkStateNotNull(changeTypesPerPartition.get(specId)).get(partition);
+ }
+ }
+
+ /** Checks if a set of change types include both inserts and deletes. */
+ private static boolean containsBiDirectionalChanges(
+ Set changeTypes) {
+ return changeTypes.contains(ADDED_ROWS) && changeTypes.size() > 1;
+ }
+
+ /** Helper class for analyzing overlaps between opposing tasks. */
+ static class AnalysisResult {
+ final List unidirectional;
+ final List bidirectional;
+ final @Nullable StructLike overlapLower;
+ final @Nullable StructLike overlapUpper;
+
+ AnalysisResult(
+ List unidirectional,
+ List bidirectional,
+ @Nullable StructLike overlapLower,
+ @Nullable StructLike overlapUpper) {
+ this.unidirectional = unidirectional;
+ this.bidirectional = bidirectional;
+ this.overlapLower = overlapLower;
+ this.overlapUpper = overlapUpper;
+ }
+
+ @Nullable
+ Row overlapLowerRow(org.apache.beam.sdk.schemas.Schema idSchema) {
+ return this.overlapLower == null
+ ? null
+ : IcebergUtils.icebergRecordToBeamRow(idSchema, (Record) this.overlapLower);
+ }
+
+ @Nullable
+ Row overlapUpperRow(org.apache.beam.sdk.schemas.Schema idSchema) {
+ return this.overlapUpper == null
+ ? null
+ : IcebergUtils.icebergRecordToBeamRow(idSchema, (Record) this.overlapUpper);
+ }
+ }
+
+ /**
+ * Analyzes all tasks in the given list by comparing the bounds of each task's underlying files.
+ * If a task's partition key bounds overlap with an opposing task's partition key bounds, they are
+ * both considered bi-directional changes. If a task's bounds do not overlap with any opposing
+ * task's bounds, it is considered a uni-directional change.
+ */
+ static AnalysisResult analyzeFiles(
+ List tasks, Schema recIdSchema, Comparator idComp) {
+ List insertTasks = new ArrayList<>();
+ List deleteTasks = new ArrayList<>();
+
+ try {
+ for (ChangelogScanTask task : tasks) {
+ if (task instanceof AddedRowsScanTask) {
+ insertTasks.add(TaskAndBounds.of(task, recIdSchema, idComp));
+ } else if (task instanceof DeletedDataFileScanTask || task instanceof DeletedRowsScanTask) {
+ deleteTasks.add(TaskAndBounds.of(task, recIdSchema, idComp));
+ } else {
+ throw new IllegalStateException("Unknown ChangelogScanTask type: " + task.getClass());
+ }
+ }
+ } catch (TaskAndBounds.NoBoundMetricsException e) {
+ // if metrics are not fully available, we need to play it safe and shuffle all the tasks.
+ return new AnalysisResult(Collections.emptyList(), tasks, null, null);
+ }
+
+ if (!insertTasks.isEmpty() && !deleteTasks.isEmpty()) {
+ Comparator lowerBoundComp = (t1, t2) -> idComp.compare(t1.lowerId, t2.lowerId);
+ Comparator upperBoundComp = (t1, t2) -> idComp.compare(t1.upperId, t2.upperId);
+
+ insertTasks.sort(lowerBoundComp);
+ deleteTasks.sort(lowerBoundComp);
+
+ TaskAndBounds firstInsert = insertTasks.get(0);
+ TaskAndBounds firstDelete = deleteTasks.get(0);
+ TaskAndBounds lastInsert = insertTasks.stream().max(upperBoundComp).orElseThrow();
+ TaskAndBounds lastDelete = deleteTasks.stream().max(upperBoundComp).orElseThrow();
+
+ boolean overlapExists =
+ idComp.compare(firstDelete.lowerId, lastInsert.upperId) <= 0
+ && idComp.compare(firstInsert.lowerId, lastDelete.upperId) <= 0;
+
+ if (overlapExists) {
+ // Iterate through inserts and only check relevant deletes
+ for (TaskAndBounds insert : insertTasks) {
+ // First check if the insert task overlaps with the global delete window.
+ // If not, we can just skip it.
+ if (idComp.compare(insert.upperId, firstDelete.lowerId) < 0
+ || idComp.compare(insert.lowerId, lastDelete.upperId) > 0) {
+ continue;
+ }
+
+ for (TaskAndBounds del : deleteTasks) {
+ // if the delete task's lower bound is already past the insert task's upper bound,
+ // no subsequent delete can overlap this insert (because we sorted above).
+ // We can break inner loop.
+ if (idComp.compare(del.lowerId, insert.upperId) > 0) {
+ break;
+ }
+
+ del.checkOverlapWith(insert, idComp);
+ }
+ }
+ }
+ }
+
+ // collect results and return.
+ // overlapping tasks are bidirectional.
+ // otherwise they are unidirectional.
+ List unidirectional = new ArrayList<>();
+ List bidirectional = new ArrayList<>();
+
+ for (TaskAndBounds taskAndBounds : Iterables.concat(deleteTasks, insertTasks)) {
+ if (taskAndBounds.overlaps) {
+ bidirectional.add(taskAndBounds.task);
+ } else {
+ unidirectional.add(taskAndBounds.task);
+ }
+ }
+
+ StructLike overlapLower = null;
+ StructLike overlapUpper = null;
+ if (!bidirectional.isEmpty()) {
+ StructLike globalInsertLower =
+ insertTasks.stream()
+ .filter(t -> t.overlaps)
+ .map(t -> t.lowerId)
+ .min(idComp)
+ .orElseThrow();
+ StructLike globalInsertUpper =
+ insertTasks.stream()
+ .filter(t -> t.overlaps)
+ .map(t -> t.upperId)
+ .max(idComp)
+ .orElseThrow();
+ StructLike globalDeleteLower =
+ deleteTasks.stream()
+ .filter(t -> t.overlaps)
+ .map(t -> t.lowerId)
+ .min(idComp)
+ .orElseThrow();
+ StructLike globalDeleteUpper =
+ deleteTasks.stream()
+ .filter(t -> t.overlaps)
+ .map(t -> t.upperId)
+ .max(idComp)
+ .orElseThrow();
+
+ overlapLower =
+ idComp.compare(globalInsertLower, globalDeleteLower) > 0
+ ? globalInsertLower
+ : globalDeleteLower;
+ overlapUpper =
+ idComp.compare(globalInsertUpper, globalDeleteUpper) < 0
+ ? globalInsertUpper
+ : globalDeleteUpper;
+ }
+
+ return new AnalysisResult(unidirectional, bidirectional, overlapLower, overlapUpper);
+ }
+
+ /**
+ * Routes bi-directional tasks from an {@link AnalysisResult} to either the in-memory local
+ * resolve path (when the estimated overlap region fits in one split) or the CoGroupByKey shuffle
+ * path otherwise.
+ *
+ * For LOCAL routing, all bi-directional tasks for this snapshot/partition group are emitted as
+ * a batch so that the downstream {@link LocalResolveDoFn} can resolve them together in-memory. //
+ * * The total byte size may exceed {@code splitSize}, but the in-memory // * footprint is bounded
+ * by the overlap byte estimate (the local resolver still does per-record PK // * routing to avoid
+ * buffering records outside the overlap range).
+ *
+ *
Returns the number of tasks routed to LOCAL so the caller can update counters.
+ */
+ private void routeBidirectional(
+ AnalysisResult result, TaskBatcher largeBiBatcher, MultiOutputReceiver multiOutputReceiver) {
+ Snapshot snapshot = checkStateNotNull(this.snapshot);
+
+ if (result.bidirectional.isEmpty()) {
+ return;
+ }
+
+ long totalBytes =
+ result.bidirectional.stream().mapToLong(SerializableChangelogTask::getLength).sum();
+
+ @Nullable Row overlapLowerRow = result.overlapLowerRow(scanConfig.rowIdBeamSchema());
+ @Nullable Row overlapUpperRow = result.overlapUpperRow(scanConfig.rowIdBeamSchema());
+ ChangelogDescriptor descriptor =
+ ChangelogDescriptor.builder()
+ .setTableIdentifierString(scanConfig.getTableIdentifier())
+ .setSnapshotSequenceNumber(snapshot.sequenceNumber())
+ .setCommitSnapshotId(snapshot.snapshotId())
+ .setOverlapLower(overlapLowerRow)
+ .setOverlapUpper(overlapUpperRow)
+ .build();
+
+ List serializedTasks =
+ result.bidirectional.stream()
+ .map(t -> makeTask(t, checkStateNotNull(table)))
+ .collect(Collectors.toList());
+
+ // If the batch is small enough, we can route to LOCAL (in-memory) resolver
+ if (totalBytes <= splitSize()) {
+ Instant ts = Instant.ofEpochMilli(snapshot.timestampMillis());
+ multiOutputReceiver
+ .get(SMALL_BIDIRECTIONAL_TASKS)
+ .outputWithTimestamp(KV.of(descriptor, serializedTasks), ts);
+ numSmallBiDirTasks += result.bidirectional.size();
+ numSmallBiDirSplits++;
+ return;
+ }
+
+ // If the batch is too big, we need to route to the CoGBK for distributed resolution
+ for (SerializableChangelogTask t : serializedTasks) {
+ largeBiBatcher.add(descriptor, t, t.getLength());
+ }
+ }
+
+ private static SerializableChangelogTask makeTask(ChangelogScanTask task, Table table) {
+ return SerializableChangelogTask.from(task, table.specs());
+ }
+
+ /**
+ * Wraps the {@link ChangelogScanTask}, and stores its lower and upper Primary Keys. Identifies
+ * overlaps with other tasks by comparing lower and upper keys using Iceberg libraries.
+ */
+ static class TaskAndBounds {
+ ChangelogScanTask task;
+ StructLike lowerId;
+ StructLike upperId;
+ boolean overlaps = false;
+
+ private TaskAndBounds(ChangelogScanTask task, StructLike lowerId, StructLike upperId) {
+ this.task = task;
+ this.lowerId = lowerId;
+ this.upperId = upperId;
+ }
+
+ static TaskAndBounds of(
+ ChangelogScanTask task, Schema recIdSchema, Comparator idComp)
+ throws NoBoundMetricsException {
+ @MonotonicNonNull GenericRecord lowerId = null;
+ @MonotonicNonNull GenericRecord upperId = null;
+
+ if (task instanceof AddedRowsScanTask || task instanceof DeletedDataFileScanTask) {
+ // just store the bounds of the DataFile
+ DataFile df = getDataFile(task);
+ @Nullable Map lowerBounds = df.lowerBounds();
+ @Nullable Map upperBounds = df.upperBounds();
+ if (lowerBounds == null || upperBounds == null) {
+ throw new NoBoundMetricsException(
+ format(
+ "Upper and/or lower bounds are missing for %s with DataFile: %s.",
+ task.getClass().getSimpleName(), df.location()));
+ }
+
+ lowerId = createRecId(recIdSchema, lowerBounds);
+ upperId = createRecId(recIdSchema, upperBounds);
+ } else if (task instanceof DeletedRowsScanTask) {
+ // iterate over all added DeleteFiles and keep track of only the
+ // minimum and maximum bounds over the list
+ for (DeleteFile deleteFile : ((DeletedRowsScanTask) task).addedDeletes()) {
+ @Nullable Map lowerDelBounds = deleteFile.lowerBounds();
+ @Nullable Map upperDelBounds = deleteFile.upperBounds();
+ if (lowerDelBounds == null || upperDelBounds == null) {
+ throw new NoBoundMetricsException(
+ format(
+ "Upper and/or lower bounds are missing for %s with "
+ + "DataFile '%s' and DeleteFile '%s'",
+ task.getClass().getSimpleName(),
+ getDataFile(task).location(),
+ deleteFile.location()));
+ }
+
+ GenericRecord delFileLower = createRecId(recIdSchema, lowerDelBounds);
+ GenericRecord delFileUpper = createRecId(recIdSchema, upperDelBounds);
+
+ if (lowerId == null || idComp.compare(delFileLower, lowerId) < 0) {
+ lowerId = delFileLower;
+ }
+ if (upperId == null || idComp.compare(delFileUpper, upperId) > 0) {
+ upperId = delFileUpper;
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported task type: " + task.getClass().getSimpleName());
+ }
+
+ if (lowerId == null || upperId == null) {
+ throw new NoBoundMetricsException(
+ format(
+ "Could not compute min and/or max bounds for %s with DataFile: %s",
+ task.getClass().getSimpleName(), getDataFile(task).location()));
+ }
+ return new TaskAndBounds(task, lowerId, upperId);
+ }
+
+ /**
+ * Compares itself with another task. If the bounds overlap, sets {@link #overlaps} to true for
+ * both tasks.
+ */
+ private void checkOverlapWith(TaskAndBounds other, Comparator idComp) {
+ if (overlaps && other.overlaps) {
+ return;
+ }
+
+ int left = idComp.compare(lowerId, other.upperId);
+ int right = idComp.compare(other.lowerId, upperId);
+
+ if (left <= 0 && right <= 0) {
+ overlaps = true;
+ other.overlaps = true;
+ }
+ }
+
+ private static GenericRecord createRecId(Schema recIdSchema, Map bounds)
+ throws NoBoundMetricsException {
+ GenericRecord recId = GenericRecord.create(recIdSchema);
+
+ for (Types.NestedField field : recIdSchema.columns()) {
+ int fieldId = field.fieldId();
+ Type type = field.type();
+ String name = field.name();
+ @Nullable ByteBuffer value = bounds.get(fieldId);
+ if (value == null) {
+ throw new NoBoundMetricsException("Could not fetch metric value for column: " + name);
+ }
+ Object data = checkStateNotNull(Conversions.fromByteBuffer(type, value));
+ recId.setField(name, data);
+ }
+ return recId;
+ }
+
+ static class NoBoundMetricsException extends Exception {
+ public NoBoundMetricsException(String msg) {
+ super(msg);
+ }
+ }
+ }
+
+ /** Checks if all partition fields are derived from record identifier fields. */
+ private static boolean doesSpecPinRecordsToPartition(PartitionSpec spec) {
+ Set identifierFieldsIds = spec.schema().identifierFieldIds();
+ if (spec.isUnpartitioned() || identifierFieldsIds.isEmpty()) {
+ return false;
+ }
+
+ for (PartitionField field : spec.fields()) {
+ if (!identifierFieldsIds.contains(field.sourceId())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Helper class to batch tasks going to the same tagged PCollection.
+ *
+ * Used to create batches of uni-directional tasks to send to {@link #UNIDIRECTIONAL_TASKS}
+ * tag.
+ *
+ *
Also used to create batches of large bi-directional tasks to send to {@link
+ * #LARGE_BIDIRECTIONAL_TASKS} tag.
+ *
+ *
A batch is emitted once it reaches {@link #splitSize()}.
+ *
+ *
Note: This is not used by small bi-directional tasks. Instead, they are emitted immediately
+ * to {@link #SMALL_BIDIRECTIONAL_TASKS}.
+ */
+ static class TaskBatcher {
+ Map> tasks = new HashMap<>();
+ long byteSize = 0L;
+ final long maxSplitSize;
+ final String tableIdentifier;
+ final Instant timestamp;
+ final OutputReceiver>> output;
+ int totalSplits = 0;
+
+ TaskBatcher(
+ String tableIdentifier,
+ Long timestampMillis,
+ long maxSplitSize,
+ OutputReceiver>> output) {
+ this.tableIdentifier = tableIdentifier;
+ this.timestamp = Instant.ofEpochMilli(timestampMillis);
+ this.maxSplitSize = maxSplitSize;
+ this.output = output;
+ }
+
+ boolean canTake(long sizeBytes) {
+ return byteSize + sizeBytes <= maxSplitSize;
+ }
+
+ void add(List tasks, long sequenceNumber, Table table) {
+ tasks.forEach(t -> add(makeTask(t, table), sequenceNumber, getLength(t)));
+ }
+
+ void add(SerializableChangelogTask task, long snapshotSequenceNumber, long sizeBytes) {
+ add(
+ ChangelogDescriptor.builder()
+ .setTableIdentifierString(tableIdentifier)
+ .setSnapshotSequenceNumber(snapshotSequenceNumber)
+ .setCommitSnapshotId(task.getCommitSnapshotId())
+ .build(),
+ task,
+ sizeBytes);
+ }
+
+ void add(ChangelogDescriptor descriptor, SerializableChangelogTask task, long sizeBytes) {
+ if (!canTake(sizeBytes)) {
+ flush();
+ }
+ byteSize += sizeBytes;
+ tasks.computeIfAbsent(descriptor, d -> new ArrayList<>()).add(task);
+ }
+
+ void flush() {
+ if (tasks.isEmpty()) {
+ return;
+ }
+
+ for (Map.Entry> entry :
+ tasks.entrySet()) {
+ ChangelogDescriptor descriptor = entry.getKey();
+ List taskList = entry.getValue();
+ output.outputWithTimestamp(KV.of(descriptor, taskList), timestamp);
+ }
+
+ byteSize = 0;
+ tasks = new HashMap<>();
+ totalSplits++;
+ }
+ }
+
+ /**
+ * Fetch the desired split size for downstream read DoFn. We do our best to put tasks into groups
+ * of that size. This allows the user to control load per worker by tuning `read.split.target-size`
+ */
+ long splitSize() {
+ return PropertyUtil.propertyAsLong(
+ checkStateNotNull(table).properties(),
+ TableProperties.SPLIT_SIZE,
+ TableProperties.SPLIT_SIZE_DEFAULT);
+ }
+
+ static String name(String path) {
+ return Iterables.getLast(Splitter.on("-").split(path));
+ }
+
+ private void resetLocalMetrics() {
+ numAddedRowsTasks = 0;
+ numDeletedRowsTasks = 0;
+ numDeletedFileTasks = 0;
+ numUniDirTasks = 0;
+ numLargeBiDirTasks = 0;
+ numSmallBiDirTasks = 0;
+ numUniDirSplits = 0;
+ numSmallBiDirSplits = 0;
+ numLargeBiDirSplits = 0;
+ }
+
+ private void gatherTaskTypeMetrics(SerializableChangelogTask.Type type) {
+ switch (type) {
+ case ADDED_ROWS:
+ numAddedRowsTasks++;
+ break;
+ case DELETED_ROWS:
+ numDeletedRowsTasks++;
+ break;
+ case DELETED_FILE:
+ numDeletedFileTasks++;
+ break;
+ }
+ }
+
+ private int updateTaskCounters() {
+ int totalTasks = numAddedRowsTasks + numDeletedRowsTasks + numDeletedFileTasks;
+ numUniDirSplits = checkStateNotNull(uniBatcher).totalSplits;
+ totalChangelogScanTasks.inc(totalTasks);
+ numAddedRowsScanTasks.inc(numAddedRowsTasks);
+ numDeletedRowsScanTasks.inc(numDeletedRowsTasks);
+ numDeletedDataFileScanTasks.inc(numDeletedFileTasks);
+ numUniDirectionalTasks.inc(numUniDirTasks);
+ numSmallBiDirectionalTasks.inc(numSmallBiDirTasks);
+ numLargeBiDirectionalTasks.inc(numLargeBiDirTasks - numSmallBiDirTasks);
+
+ return totalTasks;
+ }
+
+ private String scanResultMessage(int totalTasks) {
+ StringBuilder message = new StringBuilder();
+ message.append(
+ format(
+ "Snapshot %s (seq: %s) produced %s changelog tasks.",
+ checkStateNotNull(snapshot).snapshotId(),
+ checkStateNotNull(snapshot).sequenceNumber(),
+ totalTasks));
+ if (totalTasks > 0) {
+ message.append("Emitted:");
+ if (numUniDirTasks > 0) {
+ message.append(
+ format(
+ "%n\t%s splits containing %s uni-directional tasks",
+ numUniDirSplits, numUniDirTasks));
+ }
+ if (numSmallBiDirTasks > 0) {
+ message.append(
+ format(
+ "%n\t%s splits containing %s small bi-directional tasks (for local resolution)",
+ numSmallBiDirSplits, numSmallBiDirTasks));
+ }
+ if (numLargeBiDirTasks > 0) {
+ message.append(
+ format(
+ "%n\t%s splits containing %s large bi-directional tasks (to be shuffled)",
+ numLargeBiDirSplits, numLargeBiDirTasks));
+ }
+ }
+ return message.toString();
+ }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScannerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScannerTest.java
new file mode 100644
index 000000000000..8bd35f17b320
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScannerTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg.cdc;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.iceberg.SerializableDataFile;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.OutputBuilder;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.AddedRowsScanTask;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeletedDataFileScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionParser;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ChangelogScanner}. */
+@RunWith(JUnit4.class)
+public class ChangelogScannerTest {
+ private static final Schema SINGLE_PK_SCHEMA =
+ new Schema(
+ ImmutableList.of(
+ required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get())),
+ ImmutableSet.of(1));
+ private static final Schema COMPOSITE_PK_SCHEMA =
+ new Schema(
+ ImmutableList.of(
+ required(1, "account", Types.StringType.get()),
+ required(2, "sequence", Types.IntegerType.get()),
+ optional(3, "data", Types.StringType.get())),
+ ImmutableSet.of(1, 2));
+ private static final Schema SINGLE_RECORD_ID_SCHEMA = recordIdSchema(SINGLE_PK_SCHEMA);
+ private static final Schema COMPOSITE_RECORD_ID_SCHEMA = recordIdSchema(COMPOSITE_PK_SCHEMA);
+ private static final PartitionSpec UNPARTITIONED_SPEC = PartitionSpec.unpartitioned();
+ private static final PartitionSpec IDENTITY_ID_SPEC =
+ PartitionSpec.builderFor(SINGLE_PK_SCHEMA).identity("id").build();
+
+ @Test
+ public void analyzeFilesPrunesNonOverlappingOpposingTasksToUnidirectional() {
+ FakeAddedRowsTask insert = new FakeAddedRowsTask(dataFile("insert", 10L, 20L), 11L);
+ FakeDeletedDataFileTask delete = new FakeDeletedDataFileTask(dataFile("delete", 30L, 40L), 13L);
+
+ ChangelogScanner.AnalysisResult result =
+ ChangelogScanner.analyzeFiles(
+ ImmutableList.of(insert, delete),
+ SINGLE_RECORD_ID_SCHEMA,
+ comparator(SINGLE_RECORD_ID_SCHEMA));
+
+ assertThat(result.bidirectional, empty());
+ assertThat(result.unidirectional, contains(delete, insert));
+ assertNull(result.overlapLower);
+ assertNull(result.overlapUpper);
+ }
+
+ @Test
+ public void analyzeFilesFindsOverlapDespiteInputOrder() {
+ FakeDeletedDataFileTask laterDelete =
+ new FakeDeletedDataFileTask(dataFile("delete-later", 30L, 40L), 13L);
+ FakeDeletedDataFileTask overlappingDelete =
+ new FakeDeletedDataFileTask(dataFile("delete-overlap", 15L, 18L), 17L);
+ FakeAddedRowsTask insert = new FakeAddedRowsTask(dataFile("insert", 10L, 20L), 19L);
+
+ ChangelogScanner.AnalysisResult result =
+ ChangelogScanner.analyzeFiles(
+ ImmutableList.of(laterDelete, overlappingDelete, insert),
+ SINGLE_RECORD_ID_SCHEMA,
+ comparator(SINGLE_RECORD_ID_SCHEMA));
+
+ assertThat(result.unidirectional, contains(laterDelete));
+ assertThat(result.bidirectional, containsInAnyOrder(overlappingDelete, insert));
+ assertEquals(15L, record(result.overlapLower).getField("id"));
+ assertEquals(18L, record(result.overlapUpper).getField("id"));
+ }
+
+ @Test
+ public void analyzeFilesUsesLexicographicCompositePrimaryKeyRanges() {
+ FakeAddedRowsTask insert =
+ new FakeAddedRowsTask(
+ dataFile(
+ COMPOSITE_PK_SCHEMA,
+ "insert",
+ ImmutableMap.of("account", "acct-a", "sequence", 2),
+ ImmutableMap.of("account", "acct-a", "sequence", 8),
+ 11L),
+ 11L);
+ FakeDeletedDataFileTask overlappingDelete =
+ new FakeDeletedDataFileTask(
+ dataFile(
+ COMPOSITE_PK_SCHEMA,
+ "delete-overlap",
+ ImmutableMap.of("account", "acct-a", "sequence", 5),
+ ImmutableMap.of("account", "acct-b", "sequence", 1),
+ 13L),
+ 13L);
+ FakeDeletedDataFileTask farDelete =
+ new FakeDeletedDataFileTask(
+ dataFile(
+ COMPOSITE_PK_SCHEMA,
+ "delete-far",
+ ImmutableMap.of("account", "acct-c", "sequence", 1),
+ ImmutableMap.of("account", "acct-c", "sequence", 2),
+ 17L),
+ 17L);
+
+ ChangelogScanner.AnalysisResult result =
+ ChangelogScanner.analyzeFiles(
+ ImmutableList.of(insert, farDelete, overlappingDelete),
+ COMPOSITE_RECORD_ID_SCHEMA,
+ comparator(COMPOSITE_RECORD_ID_SCHEMA));
+
+ assertThat(result.unidirectional, contains(farDelete));
+ assertThat(result.bidirectional, containsInAnyOrder(insert, overlappingDelete));
+ assertEquals("acct-a", record(result.overlapLower).getField("account").toString());
+ assertEquals(5, record(result.overlapLower).getField("sequence"));
+ assertEquals("acct-a", record(result.overlapUpper).getField("account").toString());
+ assertEquals(8, record(result.overlapUpper).getField("sequence"));
+ }
+
+ @Test
+ public void analyzeFilesConservativelyRoutesAllTasksWhenMetricsAreMissing() {
+ FakeAddedRowsTask insert = new FakeAddedRowsTask(dataFile("insert", 10L, 20L), 11L);
+ FakeDeletedDataFileTask deleteWithMissingMetrics =
+ new FakeDeletedDataFileTask(dataFileWithoutBounds("delete-missing-metrics"), 13L);
+
+ ChangelogScanner.AnalysisResult result =
+ ChangelogScanner.analyzeFiles(
+ ImmutableList.of(insert, deleteWithMissingMetrics),
+ SINGLE_RECORD_ID_SCHEMA,
+ comparator(SINGLE_RECORD_ID_SCHEMA));
+
+ assertThat(result.unidirectional, empty());
+ assertThat(result.bidirectional, contains(insert, deleteWithMissingMetrics));
+ assertNull(result.overlapLower);
+ assertNull(result.overlapUpper);
+ }
+
+ @Test
+ public void taskBatcherFlushesAtSplitBoundariesWithoutEmptyBatches() {
+ CapturingOutputReceiver out = new CapturingOutputReceiver();
+ ChangelogScanner.TaskBatcher batcher =
+ new ChangelogScanner.TaskBatcher("default.table", 1234L, 100L, out);
+ SerializableChangelogTask first = serializableTask("first", 40L);
+ SerializableChangelogTask second = serializableTask("second", 60L);
+ SerializableChangelogTask third = serializableTask("third", 1L);
+
+ batcher.add(first, 1L, 40L);
+ batcher.add(second, 1L, 60L);
+ assertThat(out.values, empty());
+
+ batcher.add(third, 1L, 1L);
+ assertEquals(1, out.values.size());
+ batcher.flush();
+
+ assertEquals(2, out.values.size());
+ assertEquals(2, batcher.totalSplits);
+ assertEquals(new Instant(1234L), out.values.get(0).getTimestamp());
+ assertThat(tasksInOutput(out, 0), contains(first, second));
+ assertThat(tasksInOutput(out, 1), contains(third));
+ }
+
+ @Test
+ public void taskBatcherAllowsOversizeSingleTaskWithoutEmittingEmptyBatch() {
+ CapturingOutputReceiver out = new CapturingOutputReceiver();
+ ChangelogScanner.TaskBatcher batcher =
+ new ChangelogScanner.TaskBatcher("default.table", 1234L, 100L, out);
+ SerializableChangelogTask oversize = serializableTask("oversize", 150L);
+
+ batcher.add(oversize, 1L, 150L);
+ assertThat(out.values, empty());
+
+ batcher.flush();
+
+ assertEquals(1, out.values.size());
+ assertEquals(1, batcher.totalSplits);
+ assertThat(tasksInOutput(out, 0), contains(oversize));
+ }
+
+ private static Comparator comparator(Schema schema) {
+ return Comparators.forType(schema.asStruct());
+ }
+
+ private static Schema recordIdSchema(Schema schema) {
+ return TypeUtil.select(schema, schema.identifierFieldIds());
+ }
+
+ private static Record record(StructLike structLike) {
+ return (Record) structLike;
+ }
+
+ private static List tasksInOutput(
+ CapturingOutputReceiver out, int index) {
+ return out.values.get(index).getValue().getValue();
+ }
+
+ private static DataFile dataFile(String name, long lower, long upper) {
+ return dataFile(UNPARTITIONED_SPEC, null, name, lower, upper);
+ }
+
+ private static DataFile dataFile(
+ PartitionSpec spec, StructLike partition, String name, long lower, long upper) {
+ return dataFile(
+ SINGLE_PK_SCHEMA,
+ spec,
+ partition,
+ name,
+ ImmutableMap.of("id", lower),
+ ImmutableMap.of("id", upper),
+ 100L);
+ }
+
+ private static DataFile dataFile(
+ Schema schema, String name, Map lower, Map upper, long size) {
+ return dataFile(schema, UNPARTITIONED_SPEC, null, name, lower, upper, size);
+ }
+
+ private static DataFile dataFile(
+ Schema schema,
+ PartitionSpec spec,
+ StructLike partition,
+ String name,
+ Map lower,
+ Map upper,
+ long size) {
+ DataFiles.Builder builder =
+ DataFiles.builder(spec)
+ .withFormat(FileFormat.PARQUET)
+ .withPath("gs:://bucket/data/" + name + ".parquet")
+ .withFileSizeInBytes(size)
+ .withMetrics(
+ new Metrics(
+ 1L, null, null, null, null, bounds(schema, lower), bounds(schema, upper)));
+ if (partition != null) {
+ builder.withPartition(partition);
+ }
+ return builder.build();
+ }
+
+ private static DataFile dataFileWithoutBounds(String name) {
+ return DataFiles.builder(UNPARTITIONED_SPEC)
+ .withFormat(FileFormat.PARQUET)
+ .withPath("gs:://bucket/data/" + name + ".parquet")
+ .withFileSizeInBytes(100L)
+ .withMetrics(new Metrics(1L, null, null, null, null, null, null))
+ .build();
+ }
+
+ private static Map bounds(Schema schema, Map values) {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ for (Types.NestedField field : schema.columns()) {
+ if (values.containsKey(field.name())) {
+ builder.put(
+ field.fieldId(), Conversions.toByteBuffer(field.type(), values.get(field.name())));
+ }
+ }
+ return builder.build();
+ }
+
+ private static StructLike partition(long id) {
+ PartitionKey partitionKey = new PartitionKey(IDENTITY_ID_SPEC, SINGLE_PK_SCHEMA);
+ GenericRecord record = GenericRecord.create(SINGLE_PK_SCHEMA);
+ record.setField("id", id);
+ record.setField("data", "partition-" + id);
+ partitionKey.partition(record);
+ return partitionKey;
+ }
+
+ private static SerializableChangelogTask serializableTask(String name, long length) {
+ DataFile file =
+ DataFiles.builder(UNPARTITIONED_SPEC)
+ .withFormat(FileFormat.PARQUET)
+ .withPath("gs:://bucket/data/" + name + ".parquet")
+ .withFileSizeInBytes(length)
+ .withMetrics(new Metrics(1L, null, null, null, null, null, null))
+ .build();
+ return SerializableChangelogTask.builder()
+ .setType(SerializableChangelogTask.Type.ADDED_ROWS)
+ .setDataFile(SerializableDataFile.from(file, "", false))
+ .setSpecId(UNPARTITIONED_SPEC.specId())
+ .setOperation(ChangelogOperation.INSERT)
+ .setOrdinal(0)
+ .setCommitSnapshotId(1L)
+ .setStart(0L)
+ .setLength(length)
+ .setJsonExpression(ExpressionParser.toJson(Expressions.alwaysTrue()))
+ .build();
+ }
+
+ private abstract static class FakeContentTask
+ implements ChangelogScanTask, ContentScanTask {
+ private final DataFile file;
+ private final PartitionSpec spec;
+ private final StructLike partition;
+ private final long length;
+
+ FakeContentTask(DataFile file, long length) {
+ this(file, UNPARTITIONED_SPEC, file.partition(), length);
+ }
+
+ FakeContentTask(DataFile file, PartitionSpec spec, StructLike partition, long length) {
+ this.file = file;
+ this.spec = spec;
+ this.partition = partition;
+ this.length = length;
+ }
+
+ @Override
+ public DataFile file() {
+ return file;
+ }
+
+ @Override
+ public PartitionSpec spec() {
+ return spec;
+ }
+
+ @Override
+ public StructLike partition() {
+ return partition;
+ }
+
+ @Override
+ public long start() {
+ return 0L;
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public Expression residual() {
+ return Expressions.alwaysTrue();
+ }
+
+ @Override
+ public int changeOrdinal() {
+ return 0;
+ }
+
+ @Override
+ public long commitSnapshotId() {
+ return 1L;
+ }
+ }
+
+ private static class FakeAddedRowsTask extends FakeContentTask implements AddedRowsScanTask {
+ FakeAddedRowsTask(DataFile file, long length) {
+ super(file, length);
+ }
+
+ @Override
+ public List deletes() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ChangelogOperation operation() {
+ return ChangelogOperation.INSERT;
+ }
+ }
+
+ private static class FakeDeletedDataFileTask extends FakeContentTask
+ implements DeletedDataFileScanTask {
+ FakeDeletedDataFileTask(DataFile file, long length) {
+ super(file, length);
+ }
+
+ @Override
+ public List existingDeletes() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ChangelogOperation operation() {
+ return ChangelogOperation.DELETE;
+ }
+ }
+
+ private static final class CapturingOutputReceiver
+ implements DoFn.OutputReceiver>> {
+ private final ImmutableList.Builder<
+ TimestampedValue>>>
+ builder = ImmutableList.builder();
+ private List>>>
+ values = ImmutableList.of();
+
+ @Override
+ public OutputBuilder>> builder(
+ KV> value) {
+ throw new UnsupportedOperationException("Use outputWithTimestamp in this test receiver.");
+ }
+
+ @Override
+ public void outputWithTimestamp(
+ KV> value, Instant timestamp) {
+ builder.add(TimestampedValue.of(value, timestamp));
+ values = builder.build();
+ }
+ }
+}