[Iceberg CDC] Add utils base, delete reader, and serializable files#38821
[Iceberg CDC] Add utils base, delete reader, and serializable files#38821ahmedabu98 wants to merge 7 commits into
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request adds the foundational building blocks for Iceberg Change Data Capture (CDC) reads in Apache Beam. It provides the necessary infrastructure to handle delete files and changelog tasks, ensuring that incremental data changes can be accurately read and processed. The changes include refactoring existing read utilities for better code reuse and updating validation logic to support the stricter schema requirements of CDC operations. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces support for Iceberg Change Data Capture (CDC) reads in Apache Beam, adding several new classes under a new cdc package (such as CdcReadUtils, DeleteReader, and SerializableChangelogTask) and introducing SerializableDeleteFile to handle delete file serialization. It also updates IcebergScanConfig to validate CDC configurations and refactors ReadUtils to share reader construction logic. The review identified two critical issues: closing the shared FileIO instance in ReadUtils via try-with-resources, which would break subsequent table operations, and extending DelegatingSeekableInputStream in CdcReadUtils, which would lead to a runtime ClassCastException. Additionally, the reviewer suggested optimizing a stream operation in SerializableDeleteFile to reduce GC pressure and correcting minor typos in IcebergScanConfig and CdcOutputUtils.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| private static final class IcebergParquetSeekableStream extends DelegatingSeekableInputStream { | ||
| private final SeekableInputStream delegate; | ||
|
|
||
| IcebergParquetSeekableStream(SeekableInputStream delegate) { | ||
| super(delegate); | ||
| this.delegate = delegate; | ||
| } | ||
|
|
||
| @Override | ||
| public long getPos() throws java.io.IOException { | ||
| return delegate.getPos(); | ||
| } | ||
|
|
||
| @Override | ||
| public void seek(long newPos) throws java.io.IOException { | ||
| delegate.seek(newPos); | ||
| } | ||
| } |
There was a problem hiding this comment.
Extending DelegatingSeekableInputStream is unsafe here because its methods (such as readFully) cast the delegate stream to org.apache.parquet.io.SeekableInputStream. Since org.apache.iceberg.io.SeekableInputStream does not implement the Parquet interface, this will throw a ClassCastException at runtime when the Parquet reader attempts to read the file footer. To fix this, IcebergParquetSeekableStream should extend org.apache.parquet.io.SeekableInputStream directly and delegate the standard InputStream methods to the underlying stream.
private static final class IcebergParquetSeekableStream extends org.apache.parquet.io.SeekableInputStream {
private final SeekableInputStream delegate;
IcebergParquetSeekableStream(SeekableInputStream delegate) {
this.delegate = delegate;
}
@Override
public long getPos() throws java.io.IOException {
return delegate.getPos();
}
@Override
public void seek(long newPos) throws java.io.IOException {
delegate.seek(newPos);
}
@Override
public int read() throws java.io.IOException {
return delegate.read();
}
@Override
public int read(byte[] b, int off, int len) throws java.io.IOException {
return delegate.read(b, off, len);
}
@Override
public void close() throws java.io.IOException {
delegate.close();
}
}| int[] equalityFieldIds = | ||
| Objects.requireNonNullElse(getEqualityFieldIds(), new ArrayList<Integer>()).stream() | ||
| .mapToInt(Integer::intValue) | ||
| .toArray(); |
There was a problem hiding this comment.
Using streams and creating a new ArrayList on every invocation is inefficient and adds unnecessary GC pressure in a potentially hot path. We can optimize this by checking for null and performing a simple loop to populate the primitive array, which avoids boxing/unboxing and stream overhead.
List<Integer> fieldIds = getEqualityFieldIds();
int[] equalityFieldIds = new int[fieldIds != null ? fieldIds.size() : 0];
if (fieldIds != null) {
for (int i = 0; i < fieldIds.size(); i++) {
equalityFieldIds[i] = fieldIds.get(i);
}
}| checkState( | ||
| !primaryKeyIds.isEmpty(), | ||
| "Cannot read CDC records as the table schema does not specified any primary key fields."); |
There was a problem hiding this comment.
There is a grammatical typo in the error message: 'does not specified' should be 'does not specify'.
| checkState( | |
| !primaryKeyIds.isEmpty(), | |
| "Cannot read CDC records as the table schema does not specified any primary key fields."); | |
| checkState( | |
| !primaryKeyIds.isEmpty(), | |
| "Cannot read CDC records as the table schema does not specify any primary key fields."); |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
Assigning reviewers: R: @Abacn for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Adds core Iceberg CDC read building blocks, without wiring in the full streaming source yet.
Includes serializable representations for Iceberg delete files and changelog tasks, CDC-specific read helpers, and a delete reader that can emit rows affected by position and equality deletes.
Also refactors shared read utilities so the existing batch read path and the new CDC helpers can use the same Parquet reader construction logic.
Existing Iceberg read/schema transform tests were adjusted to use CDC-compatible schemas where the incremental CDC mode now requires identifier fields.
Part of #38831
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.