Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,23 @@ public void runRetention(
boolean backupEnabled,
String backupDir,
ZonedDateTime now) {
// Pre-flight: plan files matching the retention filter. If none match, the DELETE is a
// no-op — skip spark.sql(DELETE) to avoid the full-table COW scan that Iceberg performs
// when the retention column doesn't align with the partition spec.
Table table = getTable(fqtn);
Expression filter =
SparkJobUtil.createDeleteFilter(columnName, columnPattern, granularity, count, now);
boolean hasMatchingFiles;
try (CloseableIterable<FileScanTask> filesIterable =
table.newScan().filter(filter).planFiles()) {
hasMatchingFiles = filesIterable.iterator().hasNext();
} catch (IOException e) {
throw new RuntimeException("Failed to plan files for retention pre-flight", e);
}
if (!hasMatchingFiles) {
log.info("Retention pre-flight: no files match filter for table {}, skipping DELETE", fqtn);
return;
}
if (backupEnabled) {
// Cache of manifests: partitionPath -> list of data file path
Map<String, List<String>> manifestCache =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private void runRetentionJobWithStringPartitionColumns(
}

@Test
public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
public void testRetentionSkipsNoOpDelete() throws Exception {
final String tableName = "db_test.test_retention_sql";
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
prepareTable(ops, tableName);
Expand All @@ -174,7 +174,9 @@ public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
ops.runRetention(tableName, "ts", "", "day", 2, false, "", ZonedDateTime.now());
verifyRowCount(ops, tableName, 4);
List<Long> snapshotsAfter = getSnapshotIds(ops, tableName);
Assertions.assertEquals(snapshots.size() + 1, snapshotsAfter.size());
// Pre-flight planFiles returns empty (all rows are from today, cutoff is 2 days ago),
// so spark.sql(DELETE) is skipped and no new snapshot is produced.
Assertions.assertEquals(snapshots.size(), snapshotsAfter.size());
}
}

Expand Down Expand Up @@ -309,6 +311,34 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
}
}

@Test
public void testRetentionNoOpDeleteSkipsBackupManifests() throws Exception {
try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
String tableName = "db.test_noop_skip_backup";
String columnName = "ts";
String granularity = "DAY";
int count = 1;
// Only insert current-day rows; cutoff (1 day ago) leaves nothing to delete.
ops.spark()
.sql(
String.format(
"create table %s (data string, ts timestamp) partitioned by (Days(ts))",
tableName));
ops.spark().sql(String.format("insert into %s values ('a', current_timestamp())", tableName));
ZonedDateTime now = ZonedDateTime.now();
List<Long> snapshotsBefore = getSnapshotIds(ops, tableName);
ops.runRetention(tableName, columnName, "", granularity, count, true, ".backup", now);
// No backup manifests should be written and BACKUP_DIR_KEY should not be set.
Table table = ops.getTable(tableName);
Path backupRoot = new Path(table.location(), ".backup");
Assertions.assertFalse(ops.fs().exists(backupRoot));
Assertions.assertNull(table.properties().get(AppConstants.BACKUP_DIR_KEY));
// No DELETE snapshot should be appended.
List<Long> snapshotsAfter = getSnapshotIds(ops, tableName);
Assertions.assertEquals(snapshotsBefore.size(), snapshotsAfter.size());
}
}

@Test
public void testOrphanFilesDeletionJavaAPI() throws Exception {
final String tableName = "db.test_ofd_java";
Expand Down