Skip to content

[Iceberg CDC] Add utils base, delete reader, and serializable files#38821

Open
ahmedabu98 wants to merge 7 commits into
apache:masterfrom
ahmedabu98:iceberg_cdc_read_utils
Open

[Iceberg CDC] Add utils base, delete reader, and serializable files#38821
ahmedabu98 wants to merge 7 commits into
apache:masterfrom
ahmedabu98:iceberg_cdc_read_utils

Conversation

@ahmedabu98
Copy link
Copy Markdown
Contributor

@ahmedabu98 ahmedabu98 commented Jun 4, 2026

Adds core Iceberg CDC read building blocks, without wiring in the full streaming source yet.

Includes serializable representations for Iceberg delete files and changelog tasks, CDC-specific read helpers, and a delete reader that can emit rows affected by position and equality deletes.

Also refactors shared read utilities so the existing batch read path and the new CDC helpers can use the same Parquet reader construction logic.

Existing Iceberg read/schema transform tests were adjusted to use CDC-compatible schemas where the incremental CDC mode now requires identifier fields.

Part of #38831


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request adds the foundational building blocks for Iceberg Change Data Capture (CDC) reads in Apache Beam. It provides the necessary infrastructure to handle delete files and changelog tasks, ensuring that incremental data changes can be accurately read and processed. The changes include refactoring existing read utilities for better code reuse and updating validation logic to support the stricter schema requirements of CDC operations.

Highlights

  • Iceberg CDC Core Infrastructure: Introduced serializable representations for Iceberg delete files and changelog tasks, and added CDC-specific read helpers to support incremental reading.
  • Delete Reader Implementation: Implemented a new delete reader capable of emitting rows affected by position and equality deletes, enabling proper CDC record processing.
  • Read Utilities Refactoring: Refactored shared read utilities to unify Parquet reader construction logic between existing batch paths and new CDC helpers.
  • Schema and Validation Updates: Adjusted Iceberg read/schema transform tests to enforce CDC-compatible schemas, including mandatory identifier fields for incremental CDC mode.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for Iceberg Change Data Capture (CDC) reads in Apache Beam, adding several new classes under a new cdc package (such as CdcReadUtils, DeleteReader, and SerializableChangelogTask) and introducing SerializableDeleteFile to handle delete file serialization. It also updates IcebergScanConfig to validate CDC configurations and refactors ReadUtils to share reader construction logic. The review identified two critical issues: closing the shared FileIO instance in ReadUtils via try-with-resources, which would break subsequent table operations, and extending DelegatingSeekableInputStream in CdcReadUtils, which would lead to a runtime ClassCastException. Additionally, the reviewer suggested optimizing a stream operation in SerializableDeleteFile to reduce GC pressure and correcting minor typos in IcebergScanConfig and CdcOutputUtils.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java Outdated
Comment on lines +683 to +700
private static final class IcebergParquetSeekableStream extends DelegatingSeekableInputStream {
private final SeekableInputStream delegate;

IcebergParquetSeekableStream(SeekableInputStream delegate) {
super(delegate);
this.delegate = delegate;
}

@Override
public long getPos() throws java.io.IOException {
return delegate.getPos();
}

@Override
public void seek(long newPos) throws java.io.IOException {
delegate.seek(newPos);
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Extending DelegatingSeekableInputStream is unsafe here because its methods (such as readFully) cast the delegate stream to org.apache.parquet.io.SeekableInputStream. Since org.apache.iceberg.io.SeekableInputStream does not implement the Parquet interface, this will throw a ClassCastException at runtime when the Parquet reader attempts to read the file footer. To fix this, IcebergParquetSeekableStream should extend org.apache.parquet.io.SeekableInputStream directly and delegate the standard InputStream methods to the underlying stream.

  private static final class IcebergParquetSeekableStream extends org.apache.parquet.io.SeekableInputStream {
    private final SeekableInputStream delegate;

    IcebergParquetSeekableStream(SeekableInputStream delegate) {
      this.delegate = delegate;
    }

    @Override
    public long getPos() throws java.io.IOException {
      return delegate.getPos();
    }

    @Override
    public void seek(long newPos) throws java.io.IOException {
      delegate.seek(newPos);
    }

    @Override
    public int read() throws java.io.IOException {
      return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws java.io.IOException {
      return delegate.read(b, off, len);
    }

    @Override
    public void close() throws java.io.IOException {
      delegate.close();
    }
  }

Comment on lines +239 to +242
int[] equalityFieldIds =
Objects.requireNonNullElse(getEqualityFieldIds(), new ArrayList<Integer>()).stream()
.mapToInt(Integer::intValue)
.toArray();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using streams and creating a new ArrayList on every invocation is inefficient and adds unnecessary GC pressure in a potentially hot path. We can optimize this by checking for null and performing a simple loop to populate the primitive array, which avoids boxing/unboxing and stream overhead.

        List<Integer> fieldIds = getEqualityFieldIds();
        int[] equalityFieldIds = new int[fieldIds != null ? fieldIds.size() : 0];
        if (fieldIds != null) {
          for (int i = 0; i < fieldIds.size(); i++) {
            equalityFieldIds[i] = fieldIds.get(i);
          }
        }

Comment on lines +390 to +392
checkState(
!primaryKeyIds.isEmpty(),
"Cannot read CDC records as the table schema does not specified any primary key fields.");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a grammatical typo in the error message: 'does not specified' should be 'does not specify'.

Suggested change
checkState(
!primaryKeyIds.isEmpty(),
"Cannot read CDC records as the table schema does not specified any primary key fields.");
checkState(
!primaryKeyIds.isEmpty(),
"Cannot read CDC records as the table schema does not specify any primary key fields.");

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 5, 2026

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 5, 2026

Assigning reviewers:

R: @Abacn for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant