[Iceberg CDC] Add watch for snapshots source#38834
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request implements a snapshot-watching mechanism for Iceberg CDC in Apache Beam. By periodically polling for new snapshots and tracking them using monotonic sequence numbers, the system can effectively stream changes from an Iceberg table. It includes robust handling for watermark advancement during idle periods and provides comprehensive unit testing for the new SDF logic. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces WatchForSnapshotsSdf, a SplittableDoFn that monitors Iceberg tables for new snapshots, alongside necessary visibility and configuration updates in IcebergScanConfig, ReadUtils, and TableCache. The review feedback identifies several critical issues: a potential NullPointerException in ReadUtils when resolving expired snapshots, stale metadata returns in TableCache due to asynchronous cache refreshing, incorrect range estimation and stale cache usage in WatchForSnapshotsSdf, and a missing validation check to enforce Iceberg table format version 2 or higher for CDC scans.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| public static @Nullable Long getFromSnapshotExclusive(Table table, IcebergScanConfig scanConfig) { | ||
| @Nullable Long fromSnapshot = getFromSnapshotInclusive(table, scanConfig); |
There was a problem hiding this comment.
If fromSnapshot is not null, but the snapshot does not exist in the table (e.g., it has expired or was configured incorrectly), table.snapshot(fromSnapshot) will return null. Calling .parentId() on a null reference will throw a NullPointerException. We should add a null check to handle this defensively.
Suggested fix:
if (fromSnapshot != null) {
Snapshot snapshot = table.snapshot(fromSnapshot);
fromSnapshot = snapshot != null ? snapshot.parentId() : null;
}|
|
||
| /** Forces a table refresh and returns. */ | ||
| static Table getRefreshed(String identifier) { | ||
| public static Table getRefreshed(String identifier) { |
There was a problem hiding this comment.
Guava's LoadingCache.refresh(K) is designed to be asynchronous when CacheLoader.reload is overridden to return a future. Calling get(identifier) immediately after refresh(identifier) will return the old cached value while the refresh happens in the background. This means getRefreshed will return stale table metadata, causing the SDF to miss newly committed snapshots during the current poll.
To ensure a synchronous refresh, we should invalidate the key before fetching it:
public static Table getRefreshed(String identifier) {
INTERNAL_CACHE.invalidate(identifier);
return get(identifier);
}| private long estimateCurrentRangeEndExclusive() { | ||
| TableCache.setup(scanConfig); | ||
| Table table = TableCache.get(scanConfig.getTableIdentifier()); | ||
|
|
||
| @Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig); | ||
| if (toSnapshotId != null) { | ||
| @Nullable Snapshot toSnapshot = table.snapshot(toSnapshotId); | ||
| return toSnapshot == null ? Long.MIN_VALUE : toSnapshot.sequenceNumber() + 1; | ||
| } | ||
|
|
||
| @Nullable Snapshot current = table.currentSnapshot(); | ||
| return current == null ? Long.MIN_VALUE : current.sequenceNumber() + 1; | ||
| } |
There was a problem hiding this comment.
There are two issues in estimateCurrentRangeEndExclusive:
- It uses
TableCache.getwhich returns the cached table, which might be stale. We should useTableCache.getRefreshedto ensure we estimate the range end based on the latest metadata. - If
toSnapshotorcurrentis null, returningLong.MIN_VALUEis problematic because the range start is at least1L. Returning a value less than the start offset can cause issues inGrowableOffsetRangeTrackerand runner progress/splitting calculations. Returning0Lis safer becauseMath.max(start, 0L)will correctly default tostart, resulting in a valid empty range[start, start).
| private long estimateCurrentRangeEndExclusive() { | |
| TableCache.setup(scanConfig); | |
| Table table = TableCache.get(scanConfig.getTableIdentifier()); | |
| @Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig); | |
| if (toSnapshotId != null) { | |
| @Nullable Snapshot toSnapshot = table.snapshot(toSnapshotId); | |
| return toSnapshot == null ? Long.MIN_VALUE : toSnapshot.sequenceNumber() + 1; | |
| } | |
| @Nullable Snapshot current = table.currentSnapshot(); | |
| return current == null ? Long.MIN_VALUE : current.sequenceNumber() + 1; | |
| } | |
| private long estimateCurrentRangeEndExclusive() { | |
| TableCache.setup(scanConfig); | |
| Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier()); | |
| @Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig); | |
| if (toSnapshotId != null) { | |
| @Nullable Snapshot toSnapshot = table.snapshot(toSnapshotId); | |
| return toSnapshot == null ? 0L : toSnapshot.sequenceNumber() + 1; | |
| } | |
| @Nullable Snapshot current = table.currentSnapshot(); | |
| return current == null ? 0L : current.sequenceNumber() + 1; | |
| } |
| public OffsetRange initialRestriction() { | ||
| TableCache.setup(scanConfig); | ||
| Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier()); |
There was a problem hiding this comment.
Iceberg CDC / Changelog scans require table format version 2 (or higher) because format version 1 does not support sequence numbers (they are always 0) or row-level deletes. If a user runs this on a v1 table, sequence numbers will always be 0, and the SDF will silently fail to discover any snapshots (since toSnapshotSeq < nextSeqInclusive will always be true).
We should add a fast-fail validation check in initialRestriction to ensure the table format version is at least 2.
public OffsetRange initialRestriction() {
TableCache.setup(scanConfig);
Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier());
if (table instanceof org.apache.iceberg.HasTableOperations) {
int formatVersion =
((org.apache.iceberg.HasTableOperations) table).operations().current().formatVersion();
checkArgument(
formatVersion >= 2,
"Iceberg CDC / Changelog scans require table format version >= 2, but found version %s",
formatVersion);
}|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Adds a source that periodically polls for snapshots and outputs the snapshot ID. Tracks progress based on snapshot sequence number, which is monotonically increasing unlike the ID.
Part of #38831
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.