diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
index 53b1297a5..87bea48d5 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java
@@ -312,6 +312,23 @@ public void runRetention(
       boolean backupEnabled,
       String backupDir,
       ZonedDateTime now) {
+    // Pre-flight: plan files matching the retention filter. If none match, the DELETE is a
+    // no-op — skip spark.sql(DELETE) to avoid the full-table COW scan that Iceberg performs
+    // when the retention column doesn't align with the partition spec.
+    Table table = getTable(fqtn);
+    Expression filter =
+        SparkJobUtil.createDeleteFilter(columnName, columnPattern, granularity, count, now);
+    boolean hasMatchingFiles;
+    try (CloseableIterable<FileScanTask> filesIterable =
+        table.newScan().filter(filter).planFiles()) {
+      hasMatchingFiles = filesIterable.iterator().hasNext();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to plan files for retention pre-flight", e);
+    }
+    if (!hasMatchingFiles) {
+      log.info("Retention pre-flight: no files match filter for table {}, skipping DELETE", fqtn);
+      return;
+    }
     if (backupEnabled) {
       // Cache of manifests: partitionPath -> list of data file path
       Map<String, List<String>> manifestCache =
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
index 3646323b8..30560dd73 100644
--- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
@@ -163,7 +163,7 @@ private void runRetentionJobWithStringPartitionColumns(
   }
 
   @Test
-  public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
+  public void testRetentionSkipsNoOpDelete() throws Exception {
     final String tableName = "db_test.test_retention_sql";
     try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
       prepareTable(ops, tableName);
@@ -174,7 +174,9 @@ public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception {
       ops.runRetention(tableName, "ts", "", "day", 2, false, "", ZonedDateTime.now());
       verifyRowCount(ops, tableName, 4);
       List<Long> snapshotsAfter = getSnapshotIds(ops, tableName);
-      Assertions.assertEquals(snapshots.size() + 1, snapshotsAfter.size());
+      // Pre-flight planFiles returns empty (all rows are from today, cutoff is 2 days ago),
+      // so spark.sql(DELETE) is skipped and no new snapshot is produced.
+      Assertions.assertEquals(snapshots.size(), snapshotsAfter.size());
     }
   }
 
@@ -309,6 +311,34 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce
     }
   }
 
+  @Test
+  public void testRetentionNoOpDeleteSkipsBackupManifests() throws Exception {
+    try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) {
+      String tableName = "db.test_noop_skip_backup";
+      String columnName = "ts";
+      String granularity = "DAY";
+      int count = 1;
+      // Only insert current-day rows; cutoff (1 day ago) leaves nothing to delete.
+      ops.spark()
+          .sql(
+              String.format(
+                  "create table %s (data string, ts timestamp) partitioned by (Days(ts))",
+                  tableName));
+      ops.spark().sql(String.format("insert into %s values ('a', current_timestamp())", tableName));
+      ZonedDateTime now = ZonedDateTime.now();
+      List<Long> snapshotsBefore = getSnapshotIds(ops, tableName);
+      ops.runRetention(tableName, columnName, "", granularity, count, true, ".backup", now);
+      // No backup manifests should be written and BACKUP_DIR_KEY should not be set.
+      Table table = ops.getTable(tableName);
+      Path backupRoot = new Path(table.location(), ".backup");
+      Assertions.assertFalse(ops.fs().exists(backupRoot));
+      Assertions.assertNull(table.properties().get(AppConstants.BACKUP_DIR_KEY));
+      // No DELETE snapshot should be appended.
+      List<Long> snapshotsAfter = getSnapshotIds(ops, tableName);
+      Assertions.assertEquals(snapshotsBefore.size(), snapshotsAfter.size());
+    }
+  }
+
   @Test
   public void testOrphanFilesDeletionJavaAPI() throws Exception {
     final String tableName = "db.test_ofd_java";