diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
new file mode 100644
index 000000000000..6b12e16690ba
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
@@ -0,0 +1,1014 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base implementation of {@link IncrementalChangelogScan}. Copied over from Iceberg PR #14264.
+ */
+@SuppressWarnings("nullness")
+public class BaseIncrementalChangelogScan
+ extends BaseIncrementalScan<
+ IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup>
+ implements IncrementalChangelogScan {
+ private static final DeleteFileIndex EMPTY = createEmptyInstance();
+
+ /**
+  * Creates the shared empty {@link DeleteFileIndex} by reflectively invoking its five-argument
+  * constructor with every argument set to {@code null}.
+  *
+  * @return a new index instance constructed with all-null arguments
+  * @throws RuntimeException if the constructor cannot be located, made accessible, or invoked
+  */
+ private static DeleteFileIndex createEmptyInstance() {
+   Class<?>[] parameterTypes = {
+     DeleteFileIndex.EqualityDeletes.class,
+     PartitionMap.class,
+     PartitionMap.class,
+     Map.class,
+     Map.class
+   };
+
+   try {
+     java.lang.reflect.Constructor<DeleteFileIndex> ctor =
+         DeleteFileIndex.class.getDeclaredConstructor(parameterTypes);
+     // Lifts Java access checks; presumably the constructor is not visible from this class.
+     ctor.setAccessible(true);
+     return ctor.newInstance(null, null, null, null, null);
+   } catch (Exception e) {
+     throw new RuntimeException("Failed to initialize EMPTY DeleteFileIndex", e);
+   }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseIncrementalChangelogScan.class);
+
+ /**
+ * Creates a changelog scan over {@code table}, using the schema returned by
+ * {@code table.schema()} and an empty {@link TableScanContext}.
+ *
+ * @param table the table to scan
+ */
+ public BaseIncrementalChangelogScan(Table table) {
+ this(table, table.schema(), TableScanContext.empty());
+ }
+
+ /**
+ * Creates a changelog scan with an explicit schema and scan context; all three arguments are
+ * passed straight through to the superclass constructor.
+ *
+ * @param table the table to scan
+ * @param schema the schema for this scan
+ * @param context the scan context for this scan
+ */
+ private BaseIncrementalChangelogScan(Table table, Schema schema, TableScanContext context) {
+ super(table, schema, context);
+ }
+
+ /**
+  * Creates a new {@link BaseIncrementalChangelogScan} for the given table, schema and context.
+  *
+  * @param newTable the table for the new scan
+  * @param newSchema the schema for the new scan
+  * @param newContext the scan context for the new scan
+  * @return the newly created scan
+  */
+ @Override
+ protected IncrementalChangelogScan newRefinedScan(
+     Table newTable, Schema newSchema, TableScanContext newContext) {
+   IncrementalChangelogScan refinedScan =
+       new BaseIncrementalChangelogScan(newTable, newSchema, newContext);
+   return refinedScan;
+ }
+
+ // Number of times the existing delete index has been built; exposed to tests through the
+ // package-private accessors below
+ private int existingDeleteIndexBuildCallCount = 0;
+ // Cache for the built index (null if not built yet)
+ private DeleteFileIndex cachedExistingDeleteIndex = null;
+
+ /**
+ * Plans the changelog tasks for the snapshots after {@code fromSnapshotIdExclusive} up to and
+ * including {@code toSnapshotIdInclusive}.
+ *
+ * <p>Two task streams are built and merged: tasks planned by a {@link ManifestGroup} over the data
+ * manifests written by the changelog snapshots, and tasks from {@code planDeletedRowsTasks} for
+ * existing data files affected by newly added delete files. The merged stream is ordered by change
+ * ordinal, then by commit snapshot id.
+ *
+ * @param fromSnapshotIdExclusive id of the snapshot the range starts after; may be {@code null}
+ * @param toSnapshotIdInclusive id of the last snapshot in the range
+ * @return the merged changelog tasks, or an empty iterable when the range contains no changelog
+ * snapshots
+ */
+ @Override
+ protected CloseableIterable doPlanFiles(
+ Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+ // Snapshots in the range, oldest first; REPLACE snapshots are left out by the helper
+ Deque changelogSnapshots =
+ orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+ // Nothing to plan when the range holds no changelog snapshots
+ if (changelogSnapshots.isEmpty()) {
+ return CloseableIterable.empty();
+ }
+
+ Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+ // Data manifests listed by the changelog snapshots, keeping only the manifests whose own
+ // snapshot id belongs to the range
+ Set newDataManifests =
+ FluentIterable.from(changelogSnapshots)
+ .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+ .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+ .toSet();
+
+ // Build per-snapshot delete file indexes for added deletes
+ Map addedDeletesBySnapshot = buildAddedDeleteIndexes(changelogSnapshots);
+
+ // Check if existing delete index is needed for equality deletes
+ boolean hasEqualityDeletes =
+ addedDeletesBySnapshot.values().stream()
+ .anyMatch(index -> !index.isEmpty() && index.hasEqualityDeletes());
+
+ // Build existing index early if needed for equality deletes, otherwise use lazy initialization
+ // NOTE(review): without added equality deletes this local stays EMPTY, and EMPTY (not the
+ // lazily built index) is what planDeletedRowsTasks receives below - confirm this is intended
+ DeleteFileIndex existingDeleteIndex =
+ hasEqualityDeletes ? buildExistingDeleteIndexTracked(fromSnapshotIdExclusive) : EMPTY;
+
+ // Plan over the new data manifests only (the second manifest list is left empty), restricted to
+ // entries whose snapshot id is in the range, with existing entries ignored
+ ManifestGroup manifestGroup =
+ new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .select(scanColumns())
+ .filterData(filter())
+ .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+ .ignoreExisting()
+ .columnsToKeepStats(columnsToKeepStats());
+
+ if (shouldIgnoreResiduals()) {
+ manifestGroup = manifestGroup.ignoreResiduals();
+ }
+
+ // Hand planning to the executor only when there is more than one manifest to process
+ if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+ manifestGroup = manifestGroup.planWith(planExecutor());
+ }
+
+ // Create a supplier that reuses already-built index or builds lazily when first DELETED entry
+ // is encountered
+ // NOTE(review): the null check inside the lambda repeats the one at the top of
+ // buildExistingDeleteIndexTracked; the cache fields are plain (unsynchronized) instance fields
+ Supplier existingDeleteIndexSupplier =
+ () -> {
+ if (cachedExistingDeleteIndex != null) {
+ return cachedExistingDeleteIndex;
+ }
+ return buildExistingDeleteIndexTracked(fromSnapshotIdExclusive);
+ };
+
+ // Plan data file tasks (ADDED and DELETED)
+ // NOTE(review): the declared type on the next line reads "Map>" - its type arguments look
+ // truncated and need to be restored
+ Map> cumulativeDeletesMap =
+ buildCumulativeDeletesBySnapshot(changelogSnapshots, addedDeletesBySnapshot);
+
+ CloseableIterable dataFileTasks =
+ manifestGroup.plan(
+ new CreateDataFileChangeTasks(
+ changelogSnapshots,
+ existingDeleteIndexSupplier,
+ addedDeletesBySnapshot,
+ cumulativeDeletesMap,
+ table().specs(),
+ isCaseSensitive()));
+
+ // Find EXISTING data files affected by newly added delete files and create tasks for them
+ CloseableIterable deletedRowsTasks =
+ planDeletedRowsTasks(
+ changelogSnapshots, existingDeleteIndex, addedDeletesBySnapshot, changelogSnapshotIds);
+
+ // Merge tasks from both iterables in order by changeOrdinal; ties are broken by the commit
+ // snapshot id (presumably each input is already sorted this way - verify against SortedMerge)
+ Comparator byOrdinal =
+ Comparator.comparing(ChangelogScanTask::changeOrdinal)
+ .thenComparing(ChangelogScanTask::commitSnapshotId);
+
+ return new SortedMerge<>(byOrdinal, ImmutableList.of(dataFileTasks, deletedRowsTasks));
+ }
+
+ @Override
+ public CloseableIterable> planTasks() {
+ return TableScanUtil.planTaskGroups(
+ planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+ }
+
+ /**
+  * Builds the collection of changelog snapshots, ordered from oldest to newest. The order is
+  * important because it is used to determine change ordinals.
+  *
+  * <p>Snapshots whose operation is {@link DataOperations#REPLACE} are skipped. A snapshot whose
+  * operation is unknown ({@code null}) is kept rather than failing with a
+  * {@link NullPointerException}.
+  *
+  * @param fromIdExcl id of the snapshot the range starts after; may be {@code null}
+  * @param toIdIncl id of the last snapshot in the range
+  * @return the changelog snapshots, oldest first
+  */
+ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {
+   Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+   // Each snapshot is pushed to the front, so the deque ends up in the reverse of the order in
+   // which ancestorsBetween yields them.
+   for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
+     // Constant-first comparison: operation() may be null when the operation is unknown.
+     if (!DataOperations.REPLACE.equals(snapshot.operation())) {
+       changelogSnapshots.addFirst(snapshot);
+     }
+   }
+
+   return changelogSnapshots;
+ }
+
+ /**
+  * Collects the ids of the given snapshots into a set.
+  *
+  * @param snapshots the snapshots to read ids from
+  * @return the set of snapshot ids
+  */
+ private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+   return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ }
+
+ /**
+  * Assigns each snapshot a zero-based ordinal following the iteration order of {@code snapshots}.
+  *
+  * @param snapshots the snapshots to number, in the order that defines the ordinals
+  * @return a map from snapshot id to its ordinal
+  */
+ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {
+   Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+
+   int ordinal = 0;
+   for (Snapshot snapshot : snapshots) {
+     snapshotOrdinals.put(snapshot.snapshotId(), ordinal);
+     ordinal++;
+   }
+
+   return snapshotOrdinals;
+ }
+
+ /**
+  * Builds a delete file index for existing deletes that were present before the start snapshot.
+  * These deletes should be applied to data files but should not generate DELETE changelog rows.
+  *
+  * <p>The delete manifests of the start snapshot are pruned with
+  * {@code pruneManifestsByPartition} before their delete files are loaded. This method builds a
+  * fresh index on every call and does no caching of its own.
+  *
+  * @param fromSnapshotIdExclusive id of the start snapshot; may be {@code null}
+  * @return the index of existing deletes, or the shared empty index when there is no start
+  *     snapshot id, no delete manifests, or no manifests left after pruning
+  * @throws IllegalStateException if the start snapshot cannot be found in the table
+  */
+ private DeleteFileIndex buildExistingDeleteIndex(Long fromSnapshotIdExclusive) {
+   if (fromSnapshotIdExclusive == null) {
+     return EMPTY;
+   }
+
+   Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive);
+   Preconditions.checkState(
+       fromSnapshot != null, "Cannot find starting snapshot: %s", fromSnapshotIdExclusive);
+
+   List<ManifestFile> existingDeleteManifests = fromSnapshot.deleteManifests(table().io());
+   if (existingDeleteManifests.isEmpty()) {
+     return EMPTY;
+   }
+
+   // Prune manifests based on partition filter to avoid processing irrelevant manifests
+   List<ManifestFile> prunedManifests = pruneManifestsByPartition(existingDeleteManifests);
+   if (prunedManifests.isEmpty()) {
+     return EMPTY;
+   }
+
+   // Load delete files from manifests; null is passed as the second argument here, whereas the
+   // added-deletes path passes a snapshot id
+   Iterable<DeleteFile> deleteFiles = loadDeleteFiles(prunedManifests, null);
+
+   return DeleteFileIndex.builderFor(deleteFiles)
+       .specsById(table().specs())
+       .caseSensitive(isCaseSensitive())
+       .build();
+ }
+
+ /**
+  * Returns the existing delete index, building it on first use and keeping it in
+  * {@code cachedExistingDeleteIndex} for later calls. Each actual build increments
+  * {@code existingDeleteIndexBuildCallCount}.
+  *
+  * <p>Once an index is cached it is returned regardless of the argument passed.
+  *
+  * <p>NOTE(review): the cache fields are read and written without synchronization - confirm this
+  * method is never invoked concurrently.
+  *
+  * @param fromSnapshotIdExclusive id of the start snapshot; only used when the index is built
+  * @return the cached or newly built index
+  */
+ private DeleteFileIndex buildExistingDeleteIndexTracked(Long fromSnapshotIdExclusive) {
+   if (cachedExistingDeleteIndex == null) {
+     // Count the attempt before building, so a failed build is still counted
+     existingDeleteIndexBuildCallCount++;
+     cachedExistingDeleteIndex = buildExistingDeleteIndex(fromSnapshotIdExclusive);
+   }
+
+   return cachedExistingDeleteIndex;
+ }
+
+ /**
+ * Returns the current value of the existing-delete-index build counter, which is incremented
+ * each time {@code buildExistingDeleteIndexTracked} starts a build. Visible for testing.
+ */
+ int getExistingDeleteIndexBuildCallCount() {
+ return existingDeleteIndexBuildCallCount;
+ }
+
+ // Visible for testing
+ boolean wasExistingDeleteIndexBuilt() {
+ return existingDeleteIndexBuildCallCount > 0;
+ }
+
+ /**
+  * Builds per-snapshot delete file indexes for newly added delete files in each changelog
+  * snapshot. These deletes should generate DELETE changelog rows.
+  *
+  * <p>Snapshots are processed through {@code Tasks} on the plan executor, so the result is
+  * collected in a concurrent map. A snapshot with no delete manifests of its own is mapped to
+  * the shared empty index.
+  *
+  * @param changelogSnapshots the changelog snapshots to index
+  * @return a map from snapshot id to the index of delete files added by that snapshot
+  */
+ private Map<Long, DeleteFileIndex> buildAddedDeleteIndexes(Deque<Snapshot> changelogSnapshots) {
+   Map<Long, DeleteFileIndex> addedDeletesBySnapshot = Maps.newConcurrentMap();
+
+   Tasks.foreach(changelogSnapshots)
+       .retry(3)
+       .stopOnFailure()
+       .throwFailureWhenFinished()
+       .executeWith(planExecutor())
+       .onFailure(
+           (snapshot, exc) ->
+               LOG.warn(
+                   "Failed to build delete index for snapshot {}", snapshot.snapshotId(), exc))
+       .run(
+           snapshot -> {
+             long snapshotId = snapshot.snapshotId();
+
+             // Filter to only include delete manifests added in this snapshot. A snapshot
+             // without any delete manifests simply yields an empty list here.
+             List<ManifestFile> addedDeleteManifests =
+                 snapshot.deleteManifests(table().io()).stream()
+                     .filter(manifest -> manifest.snapshotId().equals(snapshotId))
+                     .collect(Collectors.toUnmodifiableList());
+
+             if (addedDeleteManifests.isEmpty()) {
+               addedDeletesBySnapshot.put(snapshotId, EMPTY);
+               return;
+             }
+
+             // Load delete files from manifests
+             Iterable<DeleteFile> deleteFiles = loadDeleteFiles(addedDeleteManifests, snapshotId);
+
+             DeleteFileIndex index =
+                 DeleteFileIndex.builderFor(deleteFiles)
+                     .specsById(table().specs())
+                     .caseSensitive(isCaseSensitive())
+                     .build();
+             addedDeletesBySnapshot.put(snapshotId, index);
+           });
+
+   return addedDeletesBySnapshot;
+ }
+
+ /**
+ * Plans tasks for EXISTING data files that are affected by newly added delete files. These files
+ * were not added or deleted in the changelog snapshot range, but have new delete files applied to
+ * them.
+ */
+ private CloseableIterable planDeletedRowsTasks(
+ Deque changelogSnapshots,
+ DeleteFileIndex existingDeleteIndex,
+ Map addedDeletesBySnapshot,
+ Set changelogSnapshotIds) {
+
+ Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots);
+ List tasks = Lists.newArrayList();
+
+ // Build a map of file statuses and collect affected partitions for each snapshot
+ Pair