From 6fa37eccce5157f5d3de2bd9183af679b056d472 Mon Sep 17 00:00:00 2001 From: Vishnu Kamana Date: Thu, 14 May 2026 13:30:34 -0700 Subject: [PATCH] Retention: pre-flight empty-DELETE check to skip no-op spark.sql(DELETE) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Operations.runRetention unconditionally issues spark.sql("DELETE FROM ... WHERE ..."). For tables whose retention column doesn't align with the partition spec, Iceberg's COW DELETE scans every file to verify zero rows match — even when the result is a no-op. The same (input_gb, total_tasks) fingerprints recur day after day across a long tail of tables. Before issuing the DELETE, plan files matching the retention filter via table.newScan().filter(filter).planFiles(). If the iterator is empty, log and return — both the backup-manifest path and the SQL DELETE are skipped. Tests: - testRetentionCreatesSnapshotsOnNoOpDelete renamed to testRetentionSkipsNoOpDelete; flipped the snapshot-count assertion (no-op no longer produces a snapshot). - New testRetentionNoOpDeleteSkipsBackupManifests: with backup enabled but all rows current-day, asserts the .backup directory is never created and BACKUP_DIR_KEY stays unset. BDP-102216 --- .../openhouse/jobs/spark/Operations.java | 17 ++++++++++ .../openhouse/jobs/spark/OperationsTest.java | 34 +++++++++++++++++-- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java index 53b1297a5..87bea48d5 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/Operations.java @@ -312,6 +312,23 @@ public void runRetention( boolean backupEnabled, String backupDir, ZonedDateTime now) { + // Pre-flight: plan files matching the retention filter. 
If none match, the DELETE is a + // no-op — skip spark.sql(DELETE) to avoid the full-table COW scan that Iceberg performs + // when the retention column doesn't align with the partition spec. + Table table = getTable(fqtn); + Expression filter = + SparkJobUtil.createDeleteFilter(columnName, columnPattern, granularity, count, now); + boolean hasMatchingFiles; + try (CloseableIterable<FileScanTask> filesIterable = + table.newScan().filter(filter).planFiles()) { + hasMatchingFiles = filesIterable.iterator().hasNext(); + } catch (IOException e) { + throw new RuntimeException("Failed to plan files for retention pre-flight", e); + } + if (!hasMatchingFiles) { + log.info("Retention pre-flight: no files match filter for table {}, skipping DELETE", fqtn); + return; + } if (backupEnabled) { // Cache of manifests: partitionPath -> list of data file path Map<String, List<String>> manifestCache = diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java index 3646323b8..30560dd73 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java @@ -163,7 +163,7 @@ private void runRetentionJobWithStringPartitionColumns( } @Test - public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception { + public void testRetentionSkipsNoOpDelete() throws Exception { final String tableName = "db_test.test_retention_sql"; try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { prepareTable(ops, tableName); @@ -174,7 +174,9 @@ public void testRetentionCreatesSnapshotsOnNoOpDelete() throws Exception { ops.runRetention(tableName, "ts", "", "day", 2, false, "", ZonedDateTime.now()); verifyRowCount(ops, tableName, 4); List<Long> snapshotsAfter = getSnapshotIds(ops, tableName); - Assertions.assertEquals(snapshots.size() + 1, snapshotsAfter.size()); + // Pre-flight planFiles returns 
empty (all rows are from today, cutoff is 2 days ago), + // so spark.sql(DELETE) is skipped and no new snapshot is produced. + Assertions.assertEquals(snapshots.size(), snapshotsAfter.size()); } } @@ -309,6 +311,34 @@ public void testRetentionDataManifestWithTimestampPartitionedTable() throws Exce } } + @Test + public void testRetentionNoOpDeleteSkipsBackupManifests() throws Exception { + try (Operations ops = Operations.withCatalog(getSparkSession(), otelEmitter)) { + String tableName = "db.test_noop_skip_backup"; + String columnName = "ts"; + String granularity = "DAY"; + int count = 1; + // Only insert current-day rows; cutoff (1 day ago) leaves nothing to delete. + ops.spark() + .sql( + String.format( + "create table %s (data string, ts timestamp) partitioned by (Days(ts))", + tableName)); + ops.spark().sql(String.format("insert into %s values ('a', current_timestamp())", tableName)); + ZonedDateTime now = ZonedDateTime.now(); + List<Long> snapshotsBefore = getSnapshotIds(ops, tableName); + ops.runRetention(tableName, columnName, "", granularity, count, true, ".backup", now); + // No backup manifests should be written and BACKUP_DIR_KEY should not be set. + Table table = ops.getTable(tableName); + Path backupRoot = new Path(table.location(), ".backup"); + Assertions.assertFalse(ops.fs().exists(backupRoot)); + Assertions.assertNull(table.properties().get(AppConstants.BACKUP_DIR_KEY)); + // No DELETE snapshot should be appended. + List<Long> snapshotsAfter = getSnapshotIds(ops, tableName); + Assertions.assertEquals(snapshotsBefore.size(), snapshotsAfter.size()); + } + } + @Test public void testOrphanFilesDeletionJavaAPI() throws Exception { final String tableName = "db.test_ofd_java";