[Iceberg CDC] Add Snapshot Changelog Scanner#38836
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request adds a new Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
|
Need to merge #38821 first |
There was a problem hiding this comment.
Code Review
This pull request introduces Iceberg CDC support by adding ChangelogDescriptor, ChangelogScanner, and corresponding unit tests. The ChangelogScanner optimizes data processing by routing changelog tasks into unidirectional, small bidirectional, or large bidirectional paths. The review feedback highlights a critical issue where instance variables in the DoFn could leak state or cause a NullPointerException, suggests an early return optimization when primary key fields are empty, and recommends removing unused helper methods in both the scanner and its test suite.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| public void process(@Element Long snapshotId, MultiOutputReceiver out) throws IOException { | ||
| resetLocalMetrics(); | ||
| // not using getRefreshed because upstream Watch should have already refreshed the | ||
| // table to a state where this snapshot exists | ||
| this.table = SerializableTable.copyOf(TableCache.get(scanConfig.getTableIdentifier())); | ||
| this.snapshot = table.snapshot(snapshotId); | ||
|
|
||
| // refresh on miss | ||
| if (this.snapshot == null) { | ||
| this.table = | ||
| SerializableTable.copyOf(TableCache.getRefreshed(scanConfig.getTableIdentifier())); | ||
| this.snapshot = | ||
| checkStateNotNull( | ||
| table.snapshot(snapshotId), "Could not retrieve table snapshot: %s", snapshotId); | ||
| } |
There was a problem hiding this comment.
The table and snapshot fields are instance variables of the DoFn. Since Beam runners can reuse DoFn instances across multiple elements, these fields can leak state from previous elements if they are not reset. Furthermore, if TableCache.get() returns null (e.g., on a cache miss), calling table.snapshot(snapshotId) directly will throw a NullPointerException before the 'refresh on miss' block is ever reached.
To fix both issues, reset the instance fields at the start of process and perform a null check on the cached table before attempting to retrieve the snapshot.
| public void process(@Element Long snapshotId, MultiOutputReceiver out) throws IOException { | |
| resetLocalMetrics(); | |
| // not using getRefreshed because upstream Watch should have already refreshed the | |
| // table to a state where this snapshot exists | |
| this.table = SerializableTable.copyOf(TableCache.get(scanConfig.getTableIdentifier())); | |
| this.snapshot = table.snapshot(snapshotId); | |
| // refresh on miss | |
| if (this.snapshot == null) { | |
| this.table = | |
| SerializableTable.copyOf(TableCache.getRefreshed(scanConfig.getTableIdentifier())); | |
| this.snapshot = | |
| checkStateNotNull( | |
| table.snapshot(snapshotId), "Could not retrieve table snapshot: %s", snapshotId); | |
| } | |
| @ProcessElement | |
| public void process(@Element Long snapshotId, MultiOutputReceiver out) throws IOException { | |
| resetLocalMetrics(); | |
| this.table = null; | |
| this.snapshot = null; | |
| Table cachedTable = TableCache.get(scanConfig.getTableIdentifier()); | |
| if (cachedTable != null) { | |
| this.table = SerializableTable.copyOf(cachedTable); | |
| this.snapshot = this.table.snapshot(snapshotId); | |
| } | |
| // refresh on miss | |
| if (this.snapshot == null) { | |
| this.table = | |
| SerializableTable.copyOf(TableCache.getRefreshed(scanConfig.getTableIdentifier())); | |
| this.snapshot = | |
| checkStateNotNull( | |
| table.snapshot(snapshotId), "Could not retrieve table snapshot: %s", snapshotId); | |
| } |
| boolean metricsAvailable = true; | ||
| MetricsConfig metricsConfig = MetricsConfig.forTable(table); | ||
| Collection<String> pkFields = table.schema().identifierFieldNames(); | ||
| for (String field : pkFields) { | ||
| MetricsModes.MetricsMode mode = metricsConfig.columnMode(field); | ||
| if (!(mode instanceof MetricsModes.Full) && !(mode instanceof MetricsModes.Truncate)) { | ||
| metricsAvailable = false; | ||
| break; | ||
| } | ||
| } | ||
| if (metricsAvailable) { | ||
| scan = scan.includeColumnStats(pkFields); | ||
| } |
There was a problem hiding this comment.
If pkFields is empty, metricsAvailable remains true, which leads to calling scan.includeColumnStats(Collections.emptyList()). If there are no primary keys, we cannot perform file-level overlap analysis anyway. Returning early when pkFields is empty avoids unnecessary overhead.
Collection<String> pkFields = table.schema().identifierFieldNames();
if (pkFields.isEmpty()) {
return scan;
}
boolean metricsAvailable = true;
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
for (String field : pkFields) {
MetricsModes.MetricsMode mode = metricsConfig.columnMode(field);
if (!(mode instanceof MetricsModes.Full) && !(mode instanceof MetricsModes.Truncate)) {
metricsAvailable = false;
break;
}
}
if (metricsAvailable) {
scan = scan.includeColumnStats(pkFields);
}| static String name(String path) { | ||
| return Iterables.getLast(Splitter.on("-").split(path)); | ||
| } |
| private static StructLike partition(long id) { | ||
| PartitionKey partitionKey = new PartitionKey(IDENTITY_ID_SPEC, SINGLE_PK_SCHEMA); | ||
| GenericRecord record = GenericRecord.create(SINGLE_PK_SCHEMA); | ||
| record.setField("id", id); | ||
| record.setField("data", "partition-" + id); | ||
| partitionKey.partition(record); | ||
| return partitionKey; | ||
| } |
Adds a DoFn that scans each incoming snapshot and plans batches of changelog scan tasks. Each batch is routed to the appropriate downstream path depending on its contents and size. There's a lot of details in the java doc, but to summarize:
We perform a few optimizations to limit the amount of data we need to resolve:
Note: "update resolution" is logic that compares inserts and deletes to check if an update has occurred.
Part of #38831
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.