From 46cb957ac204a735858494b995abd148452c0f77 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 4 Jun 2026 18:29:34 -0400 Subject: [PATCH 1/3] import incremental changelog scan --- .../iceberg/BaseIncrementalChangelogScan.java | 1014 +++++++++++++++++ .../java/org/apache/iceberg/package-info.java | 20 + 2 files changed, 1034 insertions(+) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/iceberg/package-info.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java new file mode 100644 index 000000000000..61f06b3f7dce --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java @@ -0,0 +1,1014 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Comparator; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.iceberg.ManifestGroup.CreateTasksFunction; +import org.apache.iceberg.ManifestGroup.TaskContext; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PartitionMap; +import org.apache.iceberg.util.PartitionSet; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.SortedMerge; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Copied over from Iceberg PR #14264. + */ +@SuppressWarnings("nullness") +public class BaseIncrementalChangelogScan + extends BaseIncrementalScan< + IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup> + implements IncrementalChangelogScan { + private static final DeleteFileIndex EMPTY = createEmptyInstance(); + + private static DeleteFileIndex createEmptyInstance() { + try { + var constructor = + DeleteFileIndex.class.getDeclaredConstructor( + DeleteFileIndex.EqualityDeletes.class, + PartitionMap.class, + PartitionMap.class, + Map.class, + Map.class); + constructor.setAccessible(true); + return constructor.newInstance(null, null, null, null, null); + } catch (Exception e) { + throw new RuntimeException("Failed to initialize EMPTY DeleteFileIndex", e); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(BaseIncrementalChangelogScan.class); + + public BaseIncrementalChangelogScan(Table table) { + this(table, table.schema(), TableScanContext.empty()); + } + + private BaseIncrementalChangelogScan(Table table, Schema schema, TableScanContext context) { + super(table, schema, context); + } + + @Override + protected IncrementalChangelogScan newRefinedScan( + Table newTable, Schema newSchema, TableScanContext newContext) { + return new BaseIncrementalChangelogScan(newTable, newSchema, newContext); + } + + // Private fields to track build call count and cache (accessed via package-private methods for + // testing) + private int existingDeleteIndexBuildCallCount = 0; + // Cache for the built index (null if not built yet) + private DeleteFileIndex cachedExistingDeleteIndex = null; + + @Override + protected CloseableIterable doPlanFiles( + Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) { + + Deque changelogSnapshots = + orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive); + + if (changelogSnapshots.isEmpty()) { + return CloseableIterable.empty(); + } + + Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); + + Set newDataManifests = + FluentIterable.from(changelogSnapshots) + .transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) + .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) + .toSet(); + + // Build per-snapshot delete file indexes for added deletes + Map addedDeletesBySnapshot = buildAddedDeleteIndexes(changelogSnapshots); + + // Check if existing delete index is needed for equality deletes + boolean hasEqualityDeletes = + addedDeletesBySnapshot.values().stream() + .anyMatch(index -> !index.isEmpty() && index.hasEqualityDeletes()); + + // Build existing index early if needed for equality deletes, otherwise use lazy initialization + DeleteFileIndex existingDeleteIndex = + hasEqualityDeletes ? buildExistingDeleteIndexTracked(fromSnapshotIdExclusive) : EMPTY; + + ManifestGroup manifestGroup = + new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) + .ignoreExisting() + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { + manifestGroup = manifestGroup.ignoreResiduals(); + } + + if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) { + manifestGroup = manifestGroup.planWith(planExecutor()); + } + + // Create a supplier that reuses already-built index or builds lazily when first DELETED entry + // is encountered + Supplier existingDeleteIndexSupplier = + () -> { + if (cachedExistingDeleteIndex != null) { + return cachedExistingDeleteIndex; + } + return buildExistingDeleteIndexTracked(fromSnapshotIdExclusive); + }; + + // Plan data file tasks (ADDED and DELETED) + Map> cumulativeDeletesMap = + buildCumulativeDeletesBySnapshot(changelogSnapshots, addedDeletesBySnapshot); + + CloseableIterable dataFileTasks = + manifestGroup.plan( + new CreateDataFileChangeTasks( + changelogSnapshots, + existingDeleteIndexSupplier, + addedDeletesBySnapshot, + cumulativeDeletesMap, + table().specs(), + isCaseSensitive())); + + // Find EXISTING data files affected by newly added delete files and create tasks for them + CloseableIterable deletedRowsTasks = + planDeletedRowsTasks( + changelogSnapshots, existingDeleteIndex, addedDeletesBySnapshot, changelogSnapshotIds); + + // Merge tasks from both iterables in order by changeOrdinal + Comparator byOrdinal = + Comparator.comparing(ChangelogScanTask::changeOrdinal) + .thenComparing(ChangelogScanTask::commitSnapshotId); + + return new SortedMerge<>(byOrdinal, ImmutableList.of(dataFileTasks, deletedRowsTasks)); + } + + @Override + public CloseableIterable> planTasks() { + return TableScanUtil.planTaskGroups( + planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost()); + } + + // builds a collection of changelog snapshots (oldest to newest) + // the order of the snapshots is important as it is used to determine change ordinals + private Deque orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) { + Deque changelogSnapshots = new ArrayDeque<>(); + + for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) { + if (!snapshot.operation().equals(DataOperations.REPLACE)) { + changelogSnapshots.addFirst(snapshot); + } + } + + return changelogSnapshots; + } + + private Set toSnapshotIds(Collection snapshots) { + return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet()); + } + + private static Map computeSnapshotOrdinals(Deque snapshots) { + Map snapshotOrdinals = Maps.newHashMap(); + + int ordinal = 0; + + for (Snapshot snapshot : snapshots) { + snapshotOrdinals.put(snapshot.snapshotId(), ordinal++); + } + + return snapshotOrdinals; + } + + /** + * Builds a delete file index for existing deletes that were present before the start snapshot. + * These deletes should be applied to data files but should not generate DELETE changelog rows. + * Uses manifest pruning and caching to optimize performance. + */ + private DeleteFileIndex buildExistingDeleteIndex(Long fromSnapshotIdExclusive) { + if (fromSnapshotIdExclusive == null) { + return EMPTY; + } + Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive); + Preconditions.checkState( + fromSnapshot != null, "Cannot find starting snapshot: %s", fromSnapshotIdExclusive); + + List existingDeleteManifests = fromSnapshot.deleteManifests(table().io()); + if (existingDeleteManifests.isEmpty()) { + return EMPTY; + } + + // Prune manifests based on partition filter to avoid processing irrelevant manifests + List prunedManifests = pruneManifestsByPartition(existingDeleteManifests); + if (prunedManifests.isEmpty()) { + return EMPTY; + } + + // Load delete files from manifests + Iterable deleteFiles = loadDeleteFiles(prunedManifests, null); + + return DeleteFileIndex.builderFor(deleteFiles) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .build(); + } + + /** + * Wrapper method that tracks build calls and caches the result for reuse. This ensures we only + * build the index once even if called from multiple places. + */ + private DeleteFileIndex buildExistingDeleteIndexTracked(Long fromSnapshotIdExclusive) { + if (cachedExistingDeleteIndex != null) { + return cachedExistingDeleteIndex; + } + existingDeleteIndexBuildCallCount++; + cachedExistingDeleteIndex = buildExistingDeleteIndex(fromSnapshotIdExclusive); + return cachedExistingDeleteIndex; + } + + // Visible for testing + int getExistingDeleteIndexBuildCallCount() { + return existingDeleteIndexBuildCallCount; + } + + // Visible for testing + boolean wasExistingDeleteIndexBuilt() { + return existingDeleteIndexBuildCallCount > 0; + } + + /** + * Builds per-snapshot delete file indexes for newly added delete files in each changelog + * snapshot. These deletes should generate DELETE changelog rows. Uses caching to avoid re-parsing + * manifests. + */ + private Map buildAddedDeleteIndexes(Deque changelogSnapshots) { + Map addedDeletesBySnapshot = Maps.newConcurrentMap(); + Tasks.foreach(changelogSnapshots) + .retry(3) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .onFailure( + (snapshot, exc) -> + LOG.warn( + "Failed to build delete index for snapshot {}", snapshot.snapshotId(), exc)) + .run( + snapshot -> { + List snapshotDeleteManifests = snapshot.deleteManifests(table().io()); + if (snapshotDeleteManifests.isEmpty()) { + addedDeletesBySnapshot.put(snapshot.snapshotId(), EMPTY); + return; + } + + // Filter to only include delete files added in this snapshot + List addedDeleteManifests = + snapshotDeleteManifests.stream() + .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) + .collect(Collectors.toUnmodifiableList()); + + if (addedDeleteManifests.isEmpty()) { + addedDeletesBySnapshot.put(snapshot.snapshotId(), EMPTY); + } else { + // Load delete files from manifests + Iterable deleteFiles = + loadDeleteFiles(addedDeleteManifests, snapshot.snapshotId()); + + DeleteFileIndex index = + DeleteFileIndex.builderFor(deleteFiles) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .build(); + addedDeletesBySnapshot.put(snapshot.snapshotId(), index); + } + }); + return addedDeletesBySnapshot; + } + + /** + * Plans tasks for EXISTING data files that are affected by newly added delete files. These files + * were not added or deleted in the changelog snapshot range, but have new delete files applied to + * them. + */ + private CloseableIterable planDeletedRowsTasks( + Deque changelogSnapshots, + DeleteFileIndex existingDeleteIndex, + Map addedDeletesBySnapshot, + Set changelogSnapshotIds) { + + Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); + List tasks = Lists.newArrayList(); + + // Build a map of file statuses and collect affected partitions for each snapshot + Pair>, PartitionSet> fileStatusAndPartitions = + buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds); + Map> fileStatusBySnapshot = + fileStatusAndPartitions.first(); + PartitionSet affectedPartitions = fileStatusAndPartitions.second(); + + // Accumulate actual DeleteFile entries chronologically + List accumulatedDeletes = Lists.newArrayList(); + + // Start with deletes from before the changelog range + if (!existingDeleteIndex.isEmpty()) { + for (DeleteFile df : existingDeleteIndex.referencedDeleteFiles()) { + accumulatedDeletes.add(df); + } + } + + for (Snapshot snapshot : changelogSnapshots) { + DeleteFileIndex addedDeleteIndex = addedDeletesBySnapshot.get(snapshot.snapshotId()); + if (addedDeleteIndex.isEmpty()) { + continue; + } + + // Collect partitions of newly added delete files for pruning (important for the current + // snapshot) + for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) { + affectedPartitions.add(df.specId(), df.partition()); + } + + DeleteFileIndex cumulativeDeleteIndex = + buildDeleteIndex(accumulatedDeletes, affectedPartitions); + + // Process data files for this snapshot + // Use a local set per snapshot to track processed files + Set alreadyProcessedPaths = Sets.newHashSet(); + processSnapshotForDeletedRowsTasks( + snapshot, + addedDeleteIndex, + cumulativeDeleteIndex, + fileStatusBySnapshot.get(snapshot.snapshotId()), + alreadyProcessedPaths, + snapshotOrdinals, + affectedPartitions, + tasks); + + // Accumulate this snapshot's added deletes for subsequent snapshots + for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) { + accumulatedDeletes.add(df); + } + } + + return CloseableIterable.withNoopClose(tasks); + } + + /** + * Builds a map of file statuses for each snapshot, tracking which files were added or deleted in + * each snapshot. + */ + private Pair>, PartitionSet> + buildFileStatusBySnapshot( + Deque changelogSnapshots, Set changelogSnapshotIds) { + + Map> fileStatusBySnapshot = Maps.newConcurrentMap(); + java.util.Queue localPartitionsQueue = + new java.util.concurrent.ConcurrentLinkedQueue<>(); + + Tasks.foreach(changelogSnapshots) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .run( + snapshot -> { + Map fileStatuses = Maps.newHashMap(); + PartitionSet localAffected = PartitionSet.create(table().specs()); + + List changedDataManifests = + FluentIterable.from(snapshot.dataManifests(table().io())) + .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) + .toList(); + + if (!changedDataManifests.isEmpty()) { + ManifestGroup changedGroup = + new ManifestGroup(table().io(), changedDataManifests, ImmutableList.of()) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .ignoreExisting() + .columnsToKeepStats(columnsToKeepStats()); + + try (CloseableIterable> entries = changedGroup.entries()) { + for (ManifestEntry entry : entries) { + if (changelogSnapshotIds.contains(entry.snapshotId())) { + fileStatuses.put(entry.file().location(), entry.status()); + localAffected.add(entry.file().specId(), entry.file().partition()); + } + } + } catch (Exception e) { + throw new RuntimeException( + "Failed to collect file statuses for snapshot " + snapshot.snapshotId(), e); + } + } + + fileStatusBySnapshot.put(snapshot.snapshotId(), fileStatuses); + localPartitionsQueue.add(localAffected); + }); + + PartitionSet globalAffected = PartitionSet.create(table().specs()); + for (PartitionSet local : localPartitionsQueue) { + globalAffected.addAll(local); + } + + return Pair.of(fileStatusBySnapshot, globalAffected); + } + + private List pruneManifestsByAffectedPartitions( + List manifests, PartitionSet affectedPartitions) { + if (affectedPartitions.isEmpty()) { + return manifests; + } + + Expression affectedExpr = buildAffectedPartitionExpression(affectedPartitions); + if (affectedExpr == Expressions.alwaysFalse()) { + return manifests; + } + + List pruned = Lists.newArrayList(); + for (ManifestFile manifest : manifests) { + PartitionSpec spec = table().specs().get(manifest.partitionSpecId()); + if (spec == null || spec.isUnpartitioned()) { + pruned.add(manifest); + } else if (manifestOverlapsFilter(manifest, spec, affectedExpr)) { + pruned.add(manifest); + } + } + return pruned; + } + + private Expression buildAffectedPartitionExpression(PartitionSet affectedPartitions) { + Expression combined = null; + + for (Pair pair : affectedPartitions) { + int specId = pair.first(); + StructLike partition = pair.second(); + PartitionSpec spec = table().specs().get(specId); + if (spec == null) { + continue; + } else if (spec.isUnpartitioned()) { + return Expressions.alwaysTrue(); // FALLBACK: Global delete exists, include ALL manifests! + } + + Expression specExpr = null; + for (int i = 0; i < spec.fields().size(); i++) { + org.apache.iceberg.PartitionField field = spec.fields().get(i); + Object value = partition.get(i, Object.class); + if (value != null) { + String columnName = table().schema().findColumnName(field.sourceId()); + if (columnName != null) { + Expression equalExpr = Expressions.equal(columnName, value); + specExpr = (specExpr == null) ? equalExpr : Expressions.and(specExpr, equalExpr); + } + } + } + + if (specExpr != null) { + combined = (combined == null) ? specExpr : Expressions.or(combined, specExpr); + } + } + + return combined != null ? combined : Expressions.alwaysFalse(); + } + + /** + * Builds a map of snapshot ID -> all delete files that were added in the scan range up to that + * snapshot, PRUNING files that were removed in the middle. + */ + private Map> buildCumulativeDeletesBySnapshot( + Deque snapshots, Map addedDeletesBySnapshot) { + Map> result = Maps.newHashMap(); + List accumulatedDeletes = Lists.newArrayList(); + + for (Snapshot snapshot : snapshots) { + // Save state first, so that this snapshot's tasks can use any deletes active up to this point + result.put(snapshot.snapshotId(), Lists.newArrayList(accumulatedDeletes)); + + // Check for removed deletes and prune from accumulatedDeletes for FUTURE snapshots + List changedDeletes = + FluentIterable.from(snapshot.deleteManifests(table().io())) + .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) + .toList(); + + if (!changedDeletes.isEmpty()) { + Iterable removedDeletes = + loadRemovedDeleteFiles(changedDeletes, snapshot.snapshotId()); + Set removedPaths = Sets.newHashSet(); + for (DeleteFile rdf : removedDeletes) { + removedPaths.add(rdf.location()); + } + accumulatedDeletes.removeIf(df -> removedPaths.contains(df.location())); + } + + // Add new deletes for FUTURE snapshots + DeleteFileIndex addedDeleteIndex = addedDeletesBySnapshot.get(snapshot.snapshotId()); + if (addedDeleteIndex != null && !addedDeleteIndex.isEmpty()) { + for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) { + accumulatedDeletes.add(df); + } + } + } + + return result; + } + + /** + * Builds a delete index from the accumulated list of delete files, pruning by affected + * partitions. + */ + private DeleteFileIndex buildDeleteIndex( + List accumulatedDeletes, PartitionSet affectedPartitions) { + if (accumulatedDeletes.isEmpty()) { + return EMPTY; + } + + List filteredDeletes = accumulatedDeletes; + if (!affectedPartitions.isEmpty()) { + filteredDeletes = Lists.newArrayList(); + for (DeleteFile df : accumulatedDeletes) { + PartitionSpec spec = table().specs().get(df.specId()); + if (spec == null || spec.isUnpartitioned()) { + filteredDeletes.add(df); // Always include unpartitioned deletes + } else if (affectedPartitions.contains(df.specId(), df.partition())) { + filteredDeletes.add(df); + } + } + } + + return DeleteFileIndex.builderFor(filteredDeletes) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .build(); + } + + /** + * Processes data files for a snapshot to create DeletedRowsScanTask for existing files affected + * by new delete files. + */ + private void processSnapshotForDeletedRowsTasks( + Snapshot snapshot, + DeleteFileIndex addedDeleteIndex, + DeleteFileIndex cumulativeDeleteIndex, + Map currentSnapshotFiles, + Set alreadyProcessedPaths, + Map snapshotOrdinals, + PartitionSet affectedPartitions, + List tasks) { + + // Get all data files that exist in this snapshot, pruned by affected partitions + List allDataManifests = snapshot.dataManifests(table().io()); + List prunedManifests = + pruneManifestsByAffectedPartitions(allDataManifests, affectedPartitions); + + ManifestGroup allDataGroup = + new ManifestGroup(table().io(), prunedManifests, ImmutableList.of()) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .ignoreDeleted() + .columnsToKeepStats(columnsToKeepStats()); + + if (shouldIgnoreResiduals()) { + allDataGroup = allDataGroup.ignoreResiduals(); + } + + String schemaString = SchemaParser.toJson(schema()); + + // Cache per specId - same for all files with same specId + Map specStringCache = Maps.newHashMap(); + Map residualCache = Maps.newHashMap(); + Expression residualFilter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter(); + + try (CloseableIterable> entries = allDataGroup.entries()) { + for (ManifestEntry entry : entries) { + DataFile dataFile = entry.file(); + String filePath = dataFile.location(); + + // Skip if this file was ADDED or DELETED in this snapshot + // (those are handled by CreateDataFileChangeTasks) + if (currentSnapshotFiles.containsKey(filePath)) { + continue; + } + + // Skip if we already created a task for this file in this snapshot + // Note: alreadyProcessedPaths is local to this snapshot's processing + if (alreadyProcessedPaths.contains(filePath)) { + continue; + } + + // Check if this data file is affected by newly added delete files + DeleteFile[] addedDeletes = addedDeleteIndex.forEntry(entry); + if (addedDeletes.length == 0) { + continue; + } + + // This data file was EXISTING but has new delete files applied + // Get existing deletes from before this snapshot (cumulative) + DeleteFile[] existingDeletes = + cumulativeDeleteIndex.isEmpty() + ? new DeleteFile[0] + : cumulativeDeleteIndex.forEntry(entry); + + // Create a DeletedRowsScanTask + int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId()); + + // Use cached values (calculate once per specId) + int specId = dataFile.specId(); + String specString = + specStringCache.computeIfAbsent( + specId, id -> PartitionSpecParser.toJson(table().specs().get(id))); + ResidualEvaluator residuals = + residualCache.computeIfAbsent( + specId, + id -> { + PartitionSpec spec = table().specs().get(id); + return ResidualEvaluator.of(spec, residualFilter, isCaseSensitive()); + }); + + tasks.add( + new BaseDeletedRowsScanTask( + changeOrdinal, + snapshot.snapshotId(), + dataFile.copy(shouldKeepStats()), + addedDeletes, + existingDeletes, + schemaString, + specString, + residuals)); + + // Mark this file as processed for this snapshot + alreadyProcessedPaths.add(filePath); + } + } catch (Exception e) { + throw new RuntimeException("Failed to plan deleted rows tasks", e); + } + } + + private boolean shouldKeepStats() { + Set columns = columnsToKeepStats(); + return columns != null && !columns.isEmpty(); + } + + /** + * Loads delete files from manifests by parsing each manifest. + * + * @param manifests the delete manifests to load + * @return list of delete files + */ + private Iterable loadDeleteFiles( + List manifests, Long targetSnapshotId) { + Queue allDeleteFiles = new ConcurrentLinkedQueue<>(); + + Tasks.foreach(manifests) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .run( + manifest -> { + List deleteFiles = + loadDeleteFilesFromManifest(manifest, targetSnapshotId); + allDeleteFiles.addAll(deleteFiles); + }); + + return allDeleteFiles; + } + + private Iterable loadRemovedDeleteFiles( + List manifests, Long targetSnapshotId) { + Queue allDeleteFiles = new ConcurrentLinkedQueue<>(); + + Tasks.foreach(manifests) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .run( + manifest -> { + List deleteFiles = + loadRemovedDeleteFilesFromManifest(manifest, targetSnapshotId); + allDeleteFiles.addAll(deleteFiles); + }); + + return allDeleteFiles; + } + + private List loadRemovedDeleteFilesFromManifest( + ManifestFile manifest, Long targetSnapshotId) { + List deleteFiles = Lists.newArrayList(); + + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(manifest, table().io(), table().specs())) { + for (ManifestEntry entry : reader.entries()) { + if (entry.status() == ManifestEntry.Status.DELETED + && entry.snapshotId().equals(targetSnapshotId)) { + DeleteFile file = entry.file(); + + if (!partitionMatchesFilter(file)) { + continue; + } + + Set columns = + file.content() == FileContent.POSITION_DELETES + ? Set.of(MetadataColumns.DELETE_FILE_PATH.fieldId()) + : Set.copyOf(file.equalityFieldIds()); + deleteFiles.add(ContentFileUtil.copy(file, true, columns)); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to read delete manifest: " + manifest.path(), e); + } + + return deleteFiles; + } + + /** + * Prunes delete manifests based on partition filter to avoid processing irrelevant manifests. + * This significantly improves performance when only a subset of partitions are relevant to the + * scan. + * + * @param manifests all delete manifests to consider + * @return list of manifests that might contain relevant delete files + */ + private List pruneManifestsByPartition(List manifests) { + Expression currentFilter = filter(); + + // If there's no filter, return all manifests + if (currentFilter == null || currentFilter.equals(Expressions.alwaysTrue())) { + return manifests; + } + + List prunedManifests = Lists.newArrayList(); + + for (ManifestFile manifest : manifests) { + PartitionSpec spec = table().specs().get(manifest.partitionSpecId()); + if (spec == null || spec.isUnpartitioned()) { + // Include unpartitioned manifests + prunedManifests.add(manifest); + } else if (manifestOverlapsFilter(manifest, spec, currentFilter)) { + // Check if manifest partition range overlaps with filter + prunedManifests.add(manifest); + } + } + + return prunedManifests; + } + + /** + * Checks if a manifest's partition range overlaps with the given filter. + * + * @param manifest the manifest to check + * @param spec the partition spec for the manifest + * @param filter the scan filter + * @return true if the manifest might contain matching partitions, false otherwise + */ + private boolean manifestOverlapsFilter( + ManifestFile manifest, PartitionSpec spec, Expression filter) { + try { + // Use inclusive projection to transform row filter to partition filter + Expression partitionFilter = Projections.inclusive(spec, isCaseSensitive()).project(filter); + + // Create evaluator for the partition filter + ManifestEvaluator evaluator = + ManifestEvaluator.forPartitionFilter(partitionFilter, spec, isCaseSensitive()); + + // Check if manifest could contain matching partitions + return evaluator.eval(manifest); + } catch (Exception e) { + // If evaluation fails, be conservative and include the manifest + return true; + } + } + + /** + * Checks if a delete file's partition overlaps with the current scan filter. This enables + * partition pruning to reduce memory footprint and planning overhead by skipping delete files + * that cannot possibly match any rows in the scan. + * + * @param file the delete file to check + * @return true if the delete file's partition might contain matching rows, false otherwise + */ + private boolean partitionMatchesFilter(DeleteFile file) { + // If there's no filter, all partitions match + Expression currentFilter = filter(); + if (currentFilter == null || currentFilter.equals(Expressions.alwaysTrue())) { + return true; + } + + // Get the partition spec for this delete file + PartitionSpec spec = table().specs().get(file.specId()); + if (spec == null || spec.isUnpartitioned()) { + // If spec not found or table is unpartitioned, be conservative and include the file + return true; + } + + try { + // Project the row filter to partition space using inclusive projection + // This transforms expressions on source columns to expressions on partition columns + Expression partitionFilter = + Projections.inclusive(spec, isCaseSensitive()).project(currentFilter); + + // Evaluate the projected filter against the delete file's partition + Evaluator evaluator = new Evaluator(spec.partitionType(), partitionFilter, isCaseSensitive()); + return evaluator.eval(file.partition()); + } catch (Exception e) { + // If evaluation fails, be conservative and include the file + return true; + } + } + + /** + * Loads delete files from a single manifest, parsing the manifest entries. + * + * @param manifest the delete manifest to load + * @return list of delete files from this manifest + */ + private List loadDeleteFilesFromManifest( + ManifestFile manifest, Long targetSnapshotId) { + List deleteFiles = Lists.newArrayList(); + + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(manifest, table().io(), table().specs())) { + for (ManifestEntry entry : reader.entries()) { + if (entry.status() != ManifestEntry.Status.DELETED + && (targetSnapshotId == null || entry.snapshotId().equals(targetSnapshotId))) { + // Only include live delete files, copy with minimal stats to save memory + DeleteFile file = entry.file(); + + // Apply partition pruning - skip delete files that cannot match the scan filter + if (!partitionMatchesFilter(file)) { + continue; + } + + Set columns = + file.content() == FileContent.POSITION_DELETES + ? Set.of(MetadataColumns.DELETE_FILE_PATH.fieldId()) + : Set.copyOf(file.equalityFieldIds()); + deleteFiles.add(ContentFileUtil.copy(file, true, columns)); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to read delete manifest: " + manifest.path(), e); + } + + return deleteFiles; + } + + private static class CreateDataFileChangeTasks implements CreateTasksFunction { + private static final DeleteFile[] NO_DELETES = new DeleteFile[0]; + + private final Map snapshotOrdinals; + private final Supplier existingDeleteIndexSupplier; + private final Map addedDeletesBySnapshot; + private final Map> cumulativeDeletesMap; + private final Map specsById; + private final boolean caseSensitive; + + CreateDataFileChangeTasks( + Deque snapshots, + Supplier existingDeleteIndexSupplier, + Map addedDeletesBySnapshot, + Map> cumulativeDeletesMap, + Map specsById, + boolean caseSensitive) { + this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); + this.existingDeleteIndexSupplier = existingDeleteIndexSupplier; + this.addedDeletesBySnapshot = addedDeletesBySnapshot; + this.cumulativeDeletesMap = cumulativeDeletesMap; + this.specsById = specsById; + this.caseSensitive = caseSensitive; + } + + @Override + public CloseableIterable apply( + CloseableIterable> entries, TaskContext context) { + + return CloseableIterable.transform( + entries, + entry -> { + long commitSnapshotId = entry.snapshotId(); + int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); + DataFile dataFile = entry.file().copy(context.shouldKeepStats()); + + switch (entry.status()) { + case ADDED: + // For ADDED data files, attach delete files added in this snapshot + DeleteFile[] addedFileDeletes = getDeletesForAddedFile(entry, commitSnapshotId); + return new BaseAddedRowsScanTask( + changeOrdinal, + commitSnapshotId, + dataFile, + addedFileDeletes, + context.schemaAsString(), + context.specAsString(), + context.residuals()); + + case DELETED: + // For DELETED data files, attach ALL deletes that were present up to deletion + // This includes existing deletes AND deletes added in the scan range + DeleteFile[] deletedFileDeletes = getDeletesForDeletedFile(entry, commitSnapshotId); + return new BaseDeletedDataFileScanTask( + changeOrdinal, + commitSnapshotId, + dataFile, + deletedFileDeletes, + context.schemaAsString(), + context.specAsString(), + context.residuals()); + + default: + throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); + } + }); + } + + /** + * Gets delete files that apply to an ADDED data file. Only includes deletes added in the same + * snapshot as the file. + */ + private DeleteFile[] getDeletesForAddedFile( + ManifestEntry entry, long commitSnapshotId) { + DeleteFileIndex addedDeleteIndex = addedDeletesBySnapshot.get(commitSnapshotId); + return addedDeleteIndex == null || addedDeleteIndex.isEmpty() + ? NO_DELETES + : addedDeleteIndex.forEntry(entry); + } + + /** + * Gets all delete files that were applied to a DELETED data file up to the point it was + * deleted. This includes existing deletes and all deletes added in the scan range up to (but + * not including) the deletion snapshot. + */ + private DeleteFile[] getDeletesForDeletedFile( + ManifestEntry entry, long deletionSnapshotId) { + + List allDeletes = Lists.newArrayList(); + + // Build existing delete index lazily when first DELETED entry is encountered + DeleteFileIndex existingDeleteIndex = existingDeleteIndexSupplier.get(); + DeleteFile[] existingDeletes = + existingDeleteIndex.isEmpty() ? NO_DELETES : existingDeleteIndex.forEntry(entry); + for (DeleteFile df : existingDeletes) { + allDeletes.add(df); + } + + // Add all deletes from snapshots in the scan range BEFORE the deletion + List cumulativeDeletes = cumulativeDeletesMap.get(deletionSnapshotId); + if (cumulativeDeletes != null && !cumulativeDeletes.isEmpty()) { + DeleteFileIndex tempIndex = + DeleteFileIndex.builderFor(cumulativeDeletes) + .specsById(specsById) + .caseSensitive(caseSensitive) + .build(); + DeleteFile[] applicable = tempIndex.forEntry(entry); + for (DeleteFile deleteFile : applicable) { + allDeletes.add(deleteFile); + } + } + + return allDeletes.isEmpty() ? NO_DELETES : allDeletes.toArray(new DeleteFile[0]); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/package-info.java b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/package-info.java new file mode 100644 index 000000000000..a32b307b45d3 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Classes copied from unreleased Iceberg core. */ +package org.apache.iceberg; From 59f0f09455ac6c5f31697ca88bdabd7c8991414c Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Jun 2026 08:34:31 -0400 Subject: [PATCH 2/3] spotless --- .../iceberg/BaseIncrementalChangelogScan.java | 548 +++++++++--------- 1 file changed, 274 insertions(+), 274 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java index 61f06b3f7dce..6b12e16690ba 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java @@ -59,20 +59,20 @@ */ @SuppressWarnings("nullness") public class BaseIncrementalChangelogScan - extends BaseIncrementalScan< - IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup> - implements IncrementalChangelogScan { + extends BaseIncrementalScan< + IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup> + implements IncrementalChangelogScan { private static final DeleteFileIndex EMPTY = createEmptyInstance(); private static DeleteFileIndex createEmptyInstance() { try { var constructor = - DeleteFileIndex.class.getDeclaredConstructor( - DeleteFileIndex.EqualityDeletes.class, - PartitionMap.class, - PartitionMap.class, - Map.class, - Map.class); + DeleteFileIndex.class.getDeclaredConstructor( + DeleteFileIndex.EqualityDeletes.class, + PartitionMap.class, + PartitionMap.class, + Map.class, + Map.class); constructor.setAccessible(true); return constructor.newInstance(null, null, null, null, null); } catch (Exception e) { @@ -92,7 +92,7 @@ private BaseIncrementalChangelogScan(Table table, Schema schema, TableScanContex @Override protected IncrementalChangelogScan newRefinedScan( - Table newTable, Schema newSchema, TableScanContext newContext) { + Table newTable, Schema newSchema, TableScanContext newContext) { return new BaseIncrementalChangelogScan(newTable, newSchema, newContext); } @@ -104,10 +104,10 @@ protected IncrementalChangelogScan newRefinedScan( @Override protected CloseableIterable doPlanFiles( - Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) { + Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) { Deque changelogSnapshots = - orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive); + orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive); if (changelogSnapshots.isEmpty()) { return CloseableIterable.empty(); @@ -116,32 +116,32 @@ protected CloseableIterable doPlanFiles( Set changelogSnapshotIds = toSnapshotIds(changelogSnapshots); Set newDataManifests = - FluentIterable.from(changelogSnapshots) - .transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) - .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) - .toSet(); + FluentIterable.from(changelogSnapshots) + .transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) + .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) + .toSet(); // Build per-snapshot delete file indexes for added deletes Map addedDeletesBySnapshot = buildAddedDeleteIndexes(changelogSnapshots); // Check if existing delete index is needed for equality deletes boolean hasEqualityDeletes = - addedDeletesBySnapshot.values().stream() - .anyMatch(index -> !index.isEmpty() && index.hasEqualityDeletes()); + addedDeletesBySnapshot.values().stream() + .anyMatch(index -> !index.isEmpty() && index.hasEqualityDeletes()); // Build existing index early if needed for equality deletes, otherwise use lazy initialization DeleteFileIndex existingDeleteIndex = - hasEqualityDeletes ? buildExistingDeleteIndexTracked(fromSnapshotIdExclusive) : EMPTY; + hasEqualityDeletes ? buildExistingDeleteIndexTracked(fromSnapshotIdExclusive) : EMPTY; ManifestGroup manifestGroup = - new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) - .specsById(table().specs()) - .caseSensitive(isCaseSensitive()) - .select(scanColumns()) - .filterData(filter()) - .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) - .ignoreExisting() - .columnsToKeepStats(columnsToKeepStats()); + new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId())) + .ignoreExisting() + .columnsToKeepStats(columnsToKeepStats()); if (shouldIgnoreResiduals()) { manifestGroup = manifestGroup.ignoreResiduals(); @@ -154,36 +154,36 @@ protected CloseableIterable doPlanFiles( // Create a supplier that reuses already-built index or builds lazily when first DELETED entry // is encountered Supplier existingDeleteIndexSupplier = - () -> { - if (cachedExistingDeleteIndex != null) { - return cachedExistingDeleteIndex; - } - return buildExistingDeleteIndexTracked(fromSnapshotIdExclusive); - }; + () -> { + if (cachedExistingDeleteIndex != null) { + return cachedExistingDeleteIndex; + } + return buildExistingDeleteIndexTracked(fromSnapshotIdExclusive); + }; // Plan data file tasks (ADDED and DELETED) Map> cumulativeDeletesMap = - buildCumulativeDeletesBySnapshot(changelogSnapshots, addedDeletesBySnapshot); + buildCumulativeDeletesBySnapshot(changelogSnapshots, addedDeletesBySnapshot); CloseableIterable dataFileTasks = - manifestGroup.plan( - new CreateDataFileChangeTasks( - changelogSnapshots, - existingDeleteIndexSupplier, - addedDeletesBySnapshot, - cumulativeDeletesMap, - table().specs(), - isCaseSensitive())); + manifestGroup.plan( + new CreateDataFileChangeTasks( + changelogSnapshots, + existingDeleteIndexSupplier, + addedDeletesBySnapshot, + cumulativeDeletesMap, + table().specs(), + isCaseSensitive())); // Find EXISTING data files affected by newly added delete files and create tasks for them CloseableIterable deletedRowsTasks = - planDeletedRowsTasks( - changelogSnapshots, existingDeleteIndex, addedDeletesBySnapshot, changelogSnapshotIds); + planDeletedRowsTasks( + changelogSnapshots, existingDeleteIndex, addedDeletesBySnapshot, changelogSnapshotIds); // Merge tasks from both iterables in order by changeOrdinal Comparator byOrdinal = - Comparator.comparing(ChangelogScanTask::changeOrdinal) - .thenComparing(ChangelogScanTask::commitSnapshotId); + Comparator.comparing(ChangelogScanTask::changeOrdinal) + .thenComparing(ChangelogScanTask::commitSnapshotId); return new SortedMerge<>(byOrdinal, ImmutableList.of(dataFileTasks, deletedRowsTasks)); } @@ -191,7 +191,7 @@ protected CloseableIterable doPlanFiles( @Override public CloseableIterable> planTasks() { return TableScanUtil.planTaskGroups( - planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost()); + planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost()); } // builds a collection of changelog snapshots (oldest to newest) @@ -235,7 +235,7 @@ private DeleteFileIndex buildExistingDeleteIndex(Long fromSnapshotIdExclusive) { } Snapshot fromSnapshot = table().snapshot(fromSnapshotIdExclusive); Preconditions.checkState( - fromSnapshot != null, "Cannot find starting snapshot: %s", fromSnapshotIdExclusive); + fromSnapshot != null, "Cannot find starting snapshot: %s", fromSnapshotIdExclusive); List existingDeleteManifests = fromSnapshot.deleteManifests(table().io()); if (existingDeleteManifests.isEmpty()) { @@ -252,9 +252,9 @@ private DeleteFileIndex buildExistingDeleteIndex(Long fromSnapshotIdExclusive) { Iterable deleteFiles = loadDeleteFiles(prunedManifests, null); return DeleteFileIndex.builderFor(deleteFiles) - .specsById(table().specs()) - .caseSensitive(isCaseSensitive()) - .build(); + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .build(); } /** @@ -288,43 +288,43 @@ boolean wasExistingDeleteIndexBuilt() { private Map buildAddedDeleteIndexes(Deque changelogSnapshots) { Map addedDeletesBySnapshot = Maps.newConcurrentMap(); Tasks.foreach(changelogSnapshots) - .retry(3) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(planExecutor()) - .onFailure( - (snapshot, exc) -> - LOG.warn( - "Failed to build delete index for snapshot {}", snapshot.snapshotId(), exc)) - .run( - snapshot -> { - List snapshotDeleteManifests = snapshot.deleteManifests(table().io()); - if (snapshotDeleteManifests.isEmpty()) { - addedDeletesBySnapshot.put(snapshot.snapshotId(), EMPTY); - return; - } + .retry(3) + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .onFailure( + (snapshot, exc) -> + LOG.warn( + "Failed to build delete index for snapshot {}", snapshot.snapshotId(), exc)) + .run( + snapshot -> { + List snapshotDeleteManifests = snapshot.deleteManifests(table().io()); + if (snapshotDeleteManifests.isEmpty()) { + addedDeletesBySnapshot.put(snapshot.snapshotId(), EMPTY); + return; + } - // Filter to only include delete files added in this snapshot - List addedDeleteManifests = - snapshotDeleteManifests.stream() - .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) - .collect(Collectors.toUnmodifiableList()); - - if (addedDeleteManifests.isEmpty()) { - addedDeletesBySnapshot.put(snapshot.snapshotId(), EMPTY); - } else { - // Load delete files from manifests - Iterable deleteFiles = - loadDeleteFiles(addedDeleteManifests, snapshot.snapshotId()); - - DeleteFileIndex index = - DeleteFileIndex.builderFor(deleteFiles) - .specsById(table().specs()) - .caseSensitive(isCaseSensitive()) - .build(); - addedDeletesBySnapshot.put(snapshot.snapshotId(), index); - } - }); + // Filter to only include delete files added in this snapshot + List addedDeleteManifests = + snapshotDeleteManifests.stream() + .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) + .collect(Collectors.toUnmodifiableList()); + + if (addedDeleteManifests.isEmpty()) { + addedDeletesBySnapshot.put(snapshot.snapshotId(), EMPTY); + } else { + // Load delete files from manifests + Iterable deleteFiles = + loadDeleteFiles(addedDeleteManifests, snapshot.snapshotId()); + + DeleteFileIndex index = + DeleteFileIndex.builderFor(deleteFiles) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .build(); + addedDeletesBySnapshot.put(snapshot.snapshotId(), index); + } + }); return addedDeletesBySnapshot; } @@ -334,19 +334,19 @@ private Map buildAddedDeleteIndexes(Deque chang * them. */ private CloseableIterable planDeletedRowsTasks( - Deque changelogSnapshots, - DeleteFileIndex existingDeleteIndex, - Map addedDeletesBySnapshot, - Set changelogSnapshotIds) { + Deque changelogSnapshots, + DeleteFileIndex existingDeleteIndex, + Map addedDeletesBySnapshot, + Set changelogSnapshotIds) { Map snapshotOrdinals = computeSnapshotOrdinals(changelogSnapshots); List tasks = Lists.newArrayList(); // Build a map of file statuses and collect affected partitions for each snapshot Pair>, PartitionSet> fileStatusAndPartitions = - buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds); + buildFileStatusBySnapshot(changelogSnapshots, changelogSnapshotIds); Map> fileStatusBySnapshot = - fileStatusAndPartitions.first(); + fileStatusAndPartitions.first(); PartitionSet affectedPartitions = fileStatusAndPartitions.second(); // Accumulate actual DeleteFile entries chronologically @@ -372,20 +372,20 @@ private CloseableIterable planDeletedRowsTasks( } DeleteFileIndex cumulativeDeleteIndex = - buildDeleteIndex(accumulatedDeletes, affectedPartitions); + buildDeleteIndex(accumulatedDeletes, affectedPartitions); // Process data files for this snapshot // Use a local set per snapshot to track processed files Set alreadyProcessedPaths = Sets.newHashSet(); processSnapshotForDeletedRowsTasks( - snapshot, - addedDeleteIndex, - cumulativeDeleteIndex, - fileStatusBySnapshot.get(snapshot.snapshotId()), - alreadyProcessedPaths, - snapshotOrdinals, - affectedPartitions, - tasks); + snapshot, + addedDeleteIndex, + cumulativeDeleteIndex, + fileStatusBySnapshot.get(snapshot.snapshotId()), + alreadyProcessedPaths, + snapshotOrdinals, + affectedPartitions, + tasks); // Accumulate this snapshot's added deletes for subsequent snapshots for (DeleteFile df : addedDeleteIndex.referencedDeleteFiles()) { @@ -401,53 +401,53 @@ private CloseableIterable planDeletedRowsTasks( * each snapshot. */ private Pair>, PartitionSet> - buildFileStatusBySnapshot( - Deque changelogSnapshots, Set changelogSnapshotIds) { + buildFileStatusBySnapshot( + Deque changelogSnapshots, Set changelogSnapshotIds) { Map> fileStatusBySnapshot = Maps.newConcurrentMap(); java.util.Queue localPartitionsQueue = - new java.util.concurrent.ConcurrentLinkedQueue<>(); + new java.util.concurrent.ConcurrentLinkedQueue<>(); Tasks.foreach(changelogSnapshots) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(planExecutor()) - .run( - snapshot -> { - Map fileStatuses = Maps.newHashMap(); - PartitionSet localAffected = PartitionSet.create(table().specs()); - - List changedDataManifests = - FluentIterable.from(snapshot.dataManifests(table().io())) - .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) - .toList(); - - if (!changedDataManifests.isEmpty()) { - ManifestGroup changedGroup = - new ManifestGroup(table().io(), changedDataManifests, ImmutableList.of()) - .specsById(table().specs()) - .caseSensitive(isCaseSensitive()) - .select(scanColumns()) - .filterData(filter()) - .ignoreExisting() - .columnsToKeepStats(columnsToKeepStats()); - - try (CloseableIterable> entries = changedGroup.entries()) { - for (ManifestEntry entry : entries) { - if (changelogSnapshotIds.contains(entry.snapshotId())) { - fileStatuses.put(entry.file().location(), entry.status()); - localAffected.add(entry.file().specId(), entry.file().partition()); + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .run( + snapshot -> { + Map fileStatuses = Maps.newHashMap(); + PartitionSet localAffected = PartitionSet.create(table().specs()); + + List changedDataManifests = + FluentIterable.from(snapshot.dataManifests(table().io())) + .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) + .toList(); + + if (!changedDataManifests.isEmpty()) { + ManifestGroup changedGroup = + new ManifestGroup(table().io(), changedDataManifests, ImmutableList.of()) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .ignoreExisting() + .columnsToKeepStats(columnsToKeepStats()); + + try (CloseableIterable> entries = changedGroup.entries()) { + for (ManifestEntry entry : entries) { + if (changelogSnapshotIds.contains(entry.snapshotId())) { + fileStatuses.put(entry.file().location(), entry.status()); + localAffected.add(entry.file().specId(), entry.file().partition()); + } + } + } catch (Exception e) { + throw new RuntimeException( + "Failed to collect file statuses for snapshot " + snapshot.snapshotId(), e); } } - } catch (Exception e) { - throw new RuntimeException( - "Failed to collect file statuses for snapshot " + snapshot.snapshotId(), e); - } - } - fileStatusBySnapshot.put(snapshot.snapshotId(), fileStatuses); - localPartitionsQueue.add(localAffected); - }); + fileStatusBySnapshot.put(snapshot.snapshotId(), fileStatuses); + localPartitionsQueue.add(localAffected); + }); PartitionSet globalAffected = PartitionSet.create(table().specs()); for (PartitionSet local : localPartitionsQueue) { @@ -458,7 +458,7 @@ private CloseableIterable planDeletedRowsTasks( } private List pruneManifestsByAffectedPartitions( - List manifests, PartitionSet affectedPartitions) { + List manifests, PartitionSet affectedPartitions) { if (affectedPartitions.isEmpty()) { return manifests; } @@ -519,7 +519,7 @@ private Expression buildAffectedPartitionExpression(PartitionSet affectedPartiti * snapshot, PRUNING files that were removed in the middle. */ private Map> buildCumulativeDeletesBySnapshot( - Deque snapshots, Map addedDeletesBySnapshot) { + Deque snapshots, Map addedDeletesBySnapshot) { Map> result = Maps.newHashMap(); List accumulatedDeletes = Lists.newArrayList(); @@ -529,13 +529,13 @@ private Map> buildCumulativeDeletesBySnapshot( // Check for removed deletes and prune from accumulatedDeletes for FUTURE snapshots List changedDeletes = - FluentIterable.from(snapshot.deleteManifests(table().io())) - .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) - .toList(); + FluentIterable.from(snapshot.deleteManifests(table().io())) + .filter(manifest -> manifest.snapshotId().equals(snapshot.snapshotId())) + .toList(); if (!changedDeletes.isEmpty()) { Iterable removedDeletes = - loadRemovedDeleteFiles(changedDeletes, snapshot.snapshotId()); + loadRemovedDeleteFiles(changedDeletes, snapshot.snapshotId()); Set removedPaths = Sets.newHashSet(); for (DeleteFile rdf : removedDeletes) { removedPaths.add(rdf.location()); @@ -560,7 +560,7 @@ private Map> buildCumulativeDeletesBySnapshot( * partitions. */ private DeleteFileIndex buildDeleteIndex( - List accumulatedDeletes, PartitionSet affectedPartitions) { + List accumulatedDeletes, PartitionSet affectedPartitions) { if (accumulatedDeletes.isEmpty()) { return EMPTY; } @@ -579,9 +579,9 @@ private DeleteFileIndex buildDeleteIndex( } return DeleteFileIndex.builderFor(filteredDeletes) - .specsById(table().specs()) - .caseSensitive(isCaseSensitive()) - .build(); + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .build(); } /** @@ -589,28 +589,28 @@ private DeleteFileIndex buildDeleteIndex( * by new delete files. */ private void processSnapshotForDeletedRowsTasks( - Snapshot snapshot, - DeleteFileIndex addedDeleteIndex, - DeleteFileIndex cumulativeDeleteIndex, - Map currentSnapshotFiles, - Set alreadyProcessedPaths, - Map snapshotOrdinals, - PartitionSet affectedPartitions, - List tasks) { + Snapshot snapshot, + DeleteFileIndex addedDeleteIndex, + DeleteFileIndex cumulativeDeleteIndex, + Map currentSnapshotFiles, + Set alreadyProcessedPaths, + Map snapshotOrdinals, + PartitionSet affectedPartitions, + List tasks) { // Get all data files that exist in this snapshot, pruned by affected partitions List allDataManifests = snapshot.dataManifests(table().io()); List prunedManifests = - pruneManifestsByAffectedPartitions(allDataManifests, affectedPartitions); + pruneManifestsByAffectedPartitions(allDataManifests, affectedPartitions); ManifestGroup allDataGroup = - new ManifestGroup(table().io(), prunedManifests, ImmutableList.of()) - .specsById(table().specs()) - .caseSensitive(isCaseSensitive()) - .select(scanColumns()) - .filterData(filter()) - .ignoreDeleted() - .columnsToKeepStats(columnsToKeepStats()); + new ManifestGroup(table().io(), prunedManifests, ImmutableList.of()) + .specsById(table().specs()) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterData(filter()) + .ignoreDeleted() + .columnsToKeepStats(columnsToKeepStats()); if (shouldIgnoreResiduals()) { allDataGroup = allDataGroup.ignoreResiduals(); @@ -649,9 +649,9 @@ private void processSnapshotForDeletedRowsTasks( // This data file was EXISTING but has new delete files applied // Get existing deletes from before this snapshot (cumulative) DeleteFile[] existingDeletes = - cumulativeDeleteIndex.isEmpty() - ? new DeleteFile[0] - : cumulativeDeleteIndex.forEntry(entry); + cumulativeDeleteIndex.isEmpty() + ? new DeleteFile[0] + : cumulativeDeleteIndex.forEntry(entry); // Create a DeletedRowsScanTask int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId()); @@ -659,26 +659,26 @@ private void processSnapshotForDeletedRowsTasks( // Use cached values (calculate once per specId) int specId = dataFile.specId(); String specString = - specStringCache.computeIfAbsent( - specId, id -> PartitionSpecParser.toJson(table().specs().get(id))); + specStringCache.computeIfAbsent( + specId, id -> PartitionSpecParser.toJson(table().specs().get(id))); ResidualEvaluator residuals = - residualCache.computeIfAbsent( - specId, - id -> { - PartitionSpec spec = table().specs().get(id); - return ResidualEvaluator.of(spec, residualFilter, isCaseSensitive()); - }); + residualCache.computeIfAbsent( + specId, + id -> { + PartitionSpec spec = table().specs().get(id); + return ResidualEvaluator.of(spec, residualFilter, isCaseSensitive()); + }); tasks.add( - new BaseDeletedRowsScanTask( - changeOrdinal, - snapshot.snapshotId(), - dataFile.copy(shouldKeepStats()), - addedDeletes, - existingDeletes, - schemaString, - specString, - residuals)); + new BaseDeletedRowsScanTask( + changeOrdinal, + snapshot.snapshotId(), + dataFile.copy(shouldKeepStats()), + addedDeletes, + existingDeletes, + schemaString, + specString, + residuals)); // Mark this file as processed for this snapshot alreadyProcessedPaths.add(filePath); @@ -700,50 +700,50 @@ private boolean shouldKeepStats() { * @return list of delete files */ private Iterable loadDeleteFiles( - List manifests, Long targetSnapshotId) { + List manifests, Long targetSnapshotId) { Queue allDeleteFiles = new ConcurrentLinkedQueue<>(); Tasks.foreach(manifests) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(planExecutor()) - .run( - manifest -> { - List deleteFiles = - loadDeleteFilesFromManifest(manifest, targetSnapshotId); - allDeleteFiles.addAll(deleteFiles); - }); + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .run( + manifest -> { + List deleteFiles = + loadDeleteFilesFromManifest(manifest, targetSnapshotId); + allDeleteFiles.addAll(deleteFiles); + }); return allDeleteFiles; } private Iterable loadRemovedDeleteFiles( - List manifests, Long targetSnapshotId) { + List manifests, Long targetSnapshotId) { Queue allDeleteFiles = new ConcurrentLinkedQueue<>(); Tasks.foreach(manifests) - .stopOnFailure() - .throwFailureWhenFinished() - .executeWith(planExecutor()) - .run( - manifest -> { - List deleteFiles = - loadRemovedDeleteFilesFromManifest(manifest, targetSnapshotId); - allDeleteFiles.addAll(deleteFiles); - }); + .stopOnFailure() + .throwFailureWhenFinished() + .executeWith(planExecutor()) + .run( + manifest -> { + List deleteFiles = + loadRemovedDeleteFilesFromManifest(manifest, targetSnapshotId); + allDeleteFiles.addAll(deleteFiles); + }); return allDeleteFiles; } private List loadRemovedDeleteFilesFromManifest( - ManifestFile manifest, Long targetSnapshotId) { + ManifestFile manifest, Long targetSnapshotId) { List deleteFiles = Lists.newArrayList(); try (ManifestReader reader = - ManifestFiles.readDeleteManifest(manifest, table().io(), table().specs())) { + ManifestFiles.readDeleteManifest(manifest, table().io(), table().specs())) { for (ManifestEntry entry : reader.entries()) { if (entry.status() == ManifestEntry.Status.DELETED - && entry.snapshotId().equals(targetSnapshotId)) { + && entry.snapshotId().equals(targetSnapshotId)) { DeleteFile file = entry.file(); if (!partitionMatchesFilter(file)) { @@ -751,9 +751,9 @@ private List loadRemovedDeleteFilesFromManifest( } Set columns = - file.content() == FileContent.POSITION_DELETES - ? Set.of(MetadataColumns.DELETE_FILE_PATH.fieldId()) - : Set.copyOf(file.equalityFieldIds()); + file.content() == FileContent.POSITION_DELETES + ? Set.of(MetadataColumns.DELETE_FILE_PATH.fieldId()) + : Set.copyOf(file.equalityFieldIds()); deleteFiles.add(ContentFileUtil.copy(file, true, columns)); } } @@ -805,14 +805,14 @@ private List pruneManifestsByPartition(List manifest * @return true if the manifest might contain matching partitions, false otherwise */ private boolean manifestOverlapsFilter( - ManifestFile manifest, PartitionSpec spec, Expression filter) { + ManifestFile manifest, PartitionSpec spec, Expression filter) { try { // Use inclusive projection to transform row filter to partition filter Expression partitionFilter = Projections.inclusive(spec, isCaseSensitive()).project(filter); // Create evaluator for the partition filter ManifestEvaluator evaluator = - ManifestEvaluator.forPartitionFilter(partitionFilter, spec, isCaseSensitive()); + ManifestEvaluator.forPartitionFilter(partitionFilter, spec, isCaseSensitive()); // Check if manifest could contain matching partitions return evaluator.eval(manifest); @@ -848,7 +848,7 @@ private boolean partitionMatchesFilter(DeleteFile file) { // Project the row filter to partition space using inclusive projection // This transforms expressions on source columns to expressions on partition columns Expression partitionFilter = - Projections.inclusive(spec, isCaseSensitive()).project(currentFilter); + Projections.inclusive(spec, isCaseSensitive()).project(currentFilter); // Evaluate the projected filter against the delete file's partition Evaluator evaluator = new Evaluator(spec.partitionType(), partitionFilter, isCaseSensitive()); @@ -866,14 +866,14 @@ private boolean partitionMatchesFilter(DeleteFile file) { * @return list of delete files from this manifest */ private List loadDeleteFilesFromManifest( - ManifestFile manifest, Long targetSnapshotId) { + ManifestFile manifest, Long targetSnapshotId) { List deleteFiles = Lists.newArrayList(); try (ManifestReader reader = - ManifestFiles.readDeleteManifest(manifest, table().io(), table().specs())) { + ManifestFiles.readDeleteManifest(manifest, table().io(), table().specs())) { for (ManifestEntry entry : reader.entries()) { if (entry.status() != ManifestEntry.Status.DELETED - && (targetSnapshotId == null || entry.snapshotId().equals(targetSnapshotId))) { + && (targetSnapshotId == null || entry.snapshotId().equals(targetSnapshotId))) { // Only include live delete files, copy with minimal stats to save memory DeleteFile file = entry.file(); @@ -883,9 +883,9 @@ private List loadDeleteFilesFromManifest( } Set columns = - file.content() == FileContent.POSITION_DELETES - ? Set.of(MetadataColumns.DELETE_FILE_PATH.fieldId()) - : Set.copyOf(file.equalityFieldIds()); + file.content() == FileContent.POSITION_DELETES + ? Set.of(MetadataColumns.DELETE_FILE_PATH.fieldId()) + : Set.copyOf(file.equalityFieldIds()); deleteFiles.add(ContentFileUtil.copy(file, true, columns)); } } @@ -907,12 +907,12 @@ private static class CreateDataFileChangeTasks implements CreateTasksFunction snapshots, - Supplier existingDeleteIndexSupplier, - Map addedDeletesBySnapshot, - Map> cumulativeDeletesMap, - Map specsById, - boolean caseSensitive) { + Deque snapshots, + Supplier existingDeleteIndexSupplier, + Map addedDeletesBySnapshot, + Map> cumulativeDeletesMap, + Map specsById, + boolean caseSensitive) { this.snapshotOrdinals = computeSnapshotOrdinals(snapshots); this.existingDeleteIndexSupplier = existingDeleteIndexSupplier; this.addedDeletesBySnapshot = addedDeletesBySnapshot; @@ -923,45 +923,45 @@ private static class CreateDataFileChangeTasks implements CreateTasksFunction apply( - CloseableIterable> entries, TaskContext context) { + CloseableIterable> entries, TaskContext context) { return CloseableIterable.transform( - entries, - entry -> { - long commitSnapshotId = entry.snapshotId(); - int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); - DataFile dataFile = entry.file().copy(context.shouldKeepStats()); - - switch (entry.status()) { - case ADDED: - // For ADDED data files, attach delete files added in this snapshot - DeleteFile[] addedFileDeletes = getDeletesForAddedFile(entry, commitSnapshotId); - return new BaseAddedRowsScanTask( - changeOrdinal, - commitSnapshotId, - dataFile, - addedFileDeletes, - context.schemaAsString(), - context.specAsString(), - context.residuals()); - - case DELETED: - // For DELETED data files, attach ALL deletes that were present up to deletion - // This includes existing deletes AND deletes added in the scan range - DeleteFile[] deletedFileDeletes = getDeletesForDeletedFile(entry, commitSnapshotId); - return new BaseDeletedDataFileScanTask( - changeOrdinal, - commitSnapshotId, - dataFile, - deletedFileDeletes, - context.schemaAsString(), - context.specAsString(), - context.residuals()); - - default: - throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); - } - }); + entries, + entry -> { + long commitSnapshotId = entry.snapshotId(); + int changeOrdinal = snapshotOrdinals.get(commitSnapshotId); + DataFile dataFile = entry.file().copy(context.shouldKeepStats()); + + switch (entry.status()) { + case ADDED: + // For ADDED data files, attach delete files added in this snapshot + DeleteFile[] addedFileDeletes = getDeletesForAddedFile(entry, commitSnapshotId); + return new BaseAddedRowsScanTask( + changeOrdinal, + commitSnapshotId, + dataFile, + addedFileDeletes, + context.schemaAsString(), + context.specAsString(), + context.residuals()); + + case DELETED: + // For DELETED data files, attach ALL deletes that were present up to deletion + // This includes existing deletes AND deletes added in the scan range + DeleteFile[] deletedFileDeletes = getDeletesForDeletedFile(entry, commitSnapshotId); + return new BaseDeletedDataFileScanTask( + changeOrdinal, + commitSnapshotId, + dataFile, + deletedFileDeletes, + context.schemaAsString(), + context.specAsString(), + context.residuals()); + + default: + throw new IllegalArgumentException("Unexpected entry status: " + entry.status()); + } + }); } /** @@ -969,11 +969,11 @@ public CloseableIterable apply( * snapshot as the file. */ private DeleteFile[] getDeletesForAddedFile( - ManifestEntry entry, long commitSnapshotId) { + ManifestEntry entry, long commitSnapshotId) { DeleteFileIndex addedDeleteIndex = addedDeletesBySnapshot.get(commitSnapshotId); return addedDeleteIndex == null || addedDeleteIndex.isEmpty() - ? NO_DELETES - : addedDeleteIndex.forEntry(entry); + ? NO_DELETES + : addedDeleteIndex.forEntry(entry); } /** @@ -982,14 +982,14 @@ private DeleteFile[] getDeletesForAddedFile( * not including) the deletion snapshot. */ private DeleteFile[] getDeletesForDeletedFile( - ManifestEntry entry, long deletionSnapshotId) { + ManifestEntry entry, long deletionSnapshotId) { List allDeletes = Lists.newArrayList(); // Build existing delete index lazily when first DELETED entry is encountered DeleteFileIndex existingDeleteIndex = existingDeleteIndexSupplier.get(); DeleteFile[] existingDeletes = - existingDeleteIndex.isEmpty() ? NO_DELETES : existingDeleteIndex.forEntry(entry); + existingDeleteIndex.isEmpty() ? NO_DELETES : existingDeleteIndex.forEntry(entry); for (DeleteFile df : existingDeletes) { allDeletes.add(df); } @@ -998,10 +998,10 @@ private DeleteFile[] getDeletesForDeletedFile( List cumulativeDeletes = cumulativeDeletesMap.get(deletionSnapshotId); if (cumulativeDeletes != null && !cumulativeDeletes.isEmpty()) { DeleteFileIndex tempIndex = - DeleteFileIndex.builderFor(cumulativeDeletes) - .specsById(specsById) - .caseSensitive(caseSensitive) - .build(); + DeleteFileIndex.builderFor(cumulativeDeletes) + .specsById(specsById) + .caseSensitive(caseSensitive) + .build(); DeleteFile[] applicable = tempIndex.forEntry(entry); for (DeleteFile deleteFile : applicable) { allDeletes.add(deleteFile); From abbaabe2febb404521464016650af791aecb4ffc Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Jun 2026 12:14:25 -0400 Subject: [PATCH 3/3] fix test --- .../beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index 74408d67ed86..fd24580e67c1 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -543,7 +543,7 @@ public void testStreamingReadWithColumnPruning_drop() throws Exception { List expectedRows = populateTable(table); - List fieldsToDrop = Arrays.asList("row", "str", "modulo_5", "nullable_long"); + List fieldsToDrop = Arrays.asList("row", "modulo_5", "nullable_long"); RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).drop(fieldsToDrop); Map config = new HashMap<>(managedIcebergConfig(tableId()));