Retention: pre-flight empty-DELETE check to skip no-op spark.sql(DELETE)#585
Draft
kamanavishnu wants to merge 1 commit into
Draft
Retention: pre-flight empty-DELETE check to skip no-op spark.sql(DELETE)#585kamanavishnu wants to merge 1 commit into
kamanavishnu wants to merge 1 commit into
Conversation
Operations.runRetention unconditionally issues spark.sql("DELETE FROM ... WHERE ...").
For tables whose retention column doesn't align with the partition spec, Iceberg's
COW DELETE scans every file to verify zero rows match — even when the result is a
no-op. The same (input_gb, total_tasks) fingerprints recur day after day across a
long tail of tables.
Before issuing the DELETE, plan files matching the retention filter via
table.newScan().filter(filter).planFiles(). If the iterator is empty, log and
return — both the backup-manifest path and the SQL DELETE are skipped.
Tests:
- testRetentionCreatesSnapshotsOnNoOpDelete renamed to testRetentionSkipsNoOpDelete;
flipped the snapshot-count assertion (no-op no longer produces a snapshot).
- New testRetentionNoOpDeleteSkipsBackupManifests: with backup enabled but all
rows current-day, asserts the .backup directory is never created and
BACKUP_DIR_KEY stays unset.
BDP-102216
837ea6d to
6fa37ec
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Operations.runRetentionunconditionally issuesspark.sql("DELETE FROM ... WHERE ..."). For tables whose retention column doesn't align with the partition spec, Iceberg's COWDELETEscans every file to verify zero rows match — even when the result is a no-op. In one observed long-tail case the job read 65.7 TB to delete 0 rows. The same(input_gb, total_tasks)fingerprints recur day after day across a long tail of tables, paying the validator + container-allocation + scan tax for nothing.This PR adds a pre-flight scan: before issuing the SQL
DELETE, calltable.newScan().filter(filter).planFiles()with the sameExpressionthe existing backup-manifest path already builds. If the iterator is empty, log and return — both the backup-manifest writing and the SQLDELETEare skipped.The pre-flight reads only Iceberg metadata (manifest list + manifests, ~MB scale) and relies on per-file column min/max statistics to prune. It never opens a data file. It catches the "recurring no-op" long tail where a previously-cleaned table has
min(retention_col) >= cutoffon every file. It does NOT catch pathological "wide min/max but no matching rows" cases — those continue to fall through to the existing DELETE.Tracks BDP-102216.
History context
In commit
5719ff0(April 2024, "Made Retention Operation Lightweight by removing/fixing non-metadata ops"), an earlierspark.sql(SELECT ... LIMIT 1)pre-flight was removed. The companion test was renamed fromtestRetentionDoesNotCreateSnapshotsOnNoOpDelete→testRetentionCreatesSnapshotsOnNoOpDeleteto capture the new "always emit a snapshot, even on no-op" behavior.This PR is not a re-introduction of that rejected design. The 2024-removed pre-flight was a Spark SQL SELECT that materialized rows — expensive. The pre-flight here is metadata-only (
planFiles()using Iceberg column stats) and is orders of magnitude cheaper than both the 2024-removed SELECT and the current unconditional COW DELETE.The behavioral side effect — no new snapshot per no-op retention run — was audited against
openhouse/andli-openhouse/(see "Audit" section below).Changes
Details
In
apps/spark/.../Operations.java:runRetention: prepend a pre-flight block that loads the table, builds the retention filter viaSparkJobUtil.createDeleteFilter(...), and peekstable.newScan().filter(filter).planFiles(). If empty, log"Retention pre-flight: no files match filter for table {}, skipping DELETE"and return. Otherwise fall through to the existing backup-manifest +spark.sql(DELETE)flow unchanged.No public API change.
Audit: who depends on retention producing a snapshot?
I checked both
openhouse/andli-openhouse/for consumers that rely on retention emitting a (possibly empty) snapshot per run. Concrete findings:li-openhouse/.../JasperIO.java): iterates new snapshots and, forDELETEsnapshots, loopssnapshot.removedDataFiles(fileIO). On a no-op DELETE today,removedDataFilesis empty → zero DCEs are emitted. Skipping the snapshot entirely produces the same net effect: zero DCEs.OpenHouseTableStatsKafkaPublisher(li-openhouse): emits periodic table-state stats (numSnapshots,currentSnapshotTimestamp). After this PR, tables that always no-op stop accumulating empty-delete snapshots. The stats reflect actual table state — they were artificially incrementing daily before. No consumer I found relies on the artificial increment.OpenHouseTableAuditEvent(li-openhouse commit audit Avro): emitted per commit. After this PR, no audit event for no-op retention. The previous event captured "retention ran, did nothing." No consumer I found requires that record.TableStatsCollectorUtil(openhouse): usescurrentSnapshot()for table state, not "retention specifically produced a snapshot."expireSnapshots(openhouse): time-/version-based, not delete-snapshot-based.<table>.snapshotsbyoperation = 'delete'to gate downstream behavior (the only such site is the existing OperationsTest).CommitMetricsReportersubscriber, or client API asserts "retention always emits a snapshot" as a guarantee.Reviewer ask: if you know of a monitoring or replication consumer outside these two repos that relies on retention producing a snapshot per run, please flag it. I can preserve the snapshot contract via a metadata-only Iceberg API instead of fully skipping if needed.
Testing Done
In
apps/spark/.../OperationsTest.java:testRetentionCreatesSnapshotsOnNoOpDelete→testRetentionSkipsNoOpDelete. Flipped the snapshot-count assertion fromsnapshots.size() + 1tosnapshots.size(), since the pre-flight now short-circuits a no-opDELETEand no new snapshot is appended. This is the inverse of the assertion the 2024 commit added — see "History context" above for the audit that justifies the flip.testRetentionNoOpDeleteSkipsBackupManifests: creates a partitioned table with only current-day rows, runs retention withbackupEnabled=true, and asserts (a) the.backupdirectory is never created, (b)BACKUP_DIR_KEYtable property staysnull, (c) no new snapshot is appended.Test suite run:
All six retention tests PASSED. The two existing backup-manifest tests still pass because they insert old data (2-3 days ago) —
planFiles()returns non-empty, the pre-flight passes through, and the backup path executes as before.Additional Information
Expected savings (per the linked ticket): ~10K–18K GB-hr/day on the LinkedIn ltx1-holdem fabric (~76% reduction of retention cost). The optimization is generic — any deployment with a long tail of no-op retention runs benefits.