Skip to content

[Iceberg CDC] Add watch for snapshots source#38834

Open
ahmedabu98 wants to merge 1 commit into
apache:masterfrom
ahmedabu98:iceberg_watchsdf
Open

[Iceberg CDC] Add watch for snapshots source#38834
ahmedabu98 wants to merge 1 commit into
apache:masterfrom
ahmedabu98:iceberg_watchsdf

Conversation

@ahmedabu98
Copy link
Copy Markdown
Contributor

Adds a source that periodically polls for snapshots and outputs the snapshot ID. Tracks progress based on snapshot sequence number, which is monotonically increasing unlike the ID.

Part of #38831


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request implements a snapshot-watching mechanism for Iceberg CDC in Apache Beam. By periodically polling for new snapshots and tracking them using monotonic sequence numbers, the system can effectively stream changes from an Iceberg table. It includes robust handling for watermark advancement during idle periods and provides comprehensive unit testing for the new SDF logic.

Highlights

  • New WatchForSnapshotsSdf: Introduced a new SplittableDoFn that periodically polls an Iceberg table for new snapshots, tracking progress via monotonic sequence numbers.
  • Configuration Updates: Added maxSnapshotDiscoveryDelay to IcebergScanConfig to manage watermark advancement during idle periods.
  • Utility Refactoring: Updated ReadUtils and TableCache to expose necessary methods for snapshot discovery and table management.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces WatchForSnapshotsSdf, a SplittableDoFn that monitors Iceberg tables for new snapshots, alongside necessary visibility and configuration updates in IcebergScanConfig, ReadUtils, and TableCache. The review feedback identifies several critical issues: a potential NullPointerException in ReadUtils when resolving expired snapshots, stale metadata returns in TableCache due to asynchronous cache refreshing, incorrect range estimation and stale cache usage in WatchForSnapshotsSdf, and a missing validation check to enforce Iceberg table format version 2 or higher for CDC scans.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +186 to +187
public static @Nullable Long getFromSnapshotExclusive(Table table, IcebergScanConfig scanConfig) {
@Nullable Long fromSnapshot = getFromSnapshotInclusive(table, scanConfig);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If fromSnapshot is not null, but the snapshot does not exist in the table (e.g., it has expired or was configured incorrectly), table.snapshot(fromSnapshot) will return null. Calling .parentId() on a null reference will throw a NullPointerException. We should add a null check to handle this defensively.

Suggested fix:

if (fromSnapshot != null) {
  Snapshot snapshot = table.snapshot(fromSnapshot);
  fromSnapshot = snapshot != null ? snapshot.parentId() : null;
}


/** Forces a table refresh and returns. */
static Table getRefreshed(String identifier) {
public static Table getRefreshed(String identifier) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Guava's LoadingCache.refresh(K) is designed to be asynchronous when CacheLoader.reload is overridden to return a future. Calling get(identifier) immediately after refresh(identifier) will return the old cached value while the refresh happens in the background. This means getRefreshed will return stale table metadata, causing the SDF to miss newly committed snapshots during the current poll.

To ensure a synchronous refresh, we should invalidate the key before fetching it:

public static Table getRefreshed(String identifier) {
  INTERNAL_CACHE.invalidate(identifier);
  return get(identifier);
}

Comment on lines +145 to +157
private long estimateCurrentRangeEndExclusive() {
TableCache.setup(scanConfig);
Table table = TableCache.get(scanConfig.getTableIdentifier());

@Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig);
if (toSnapshotId != null) {
@Nullable Snapshot toSnapshot = table.snapshot(toSnapshotId);
return toSnapshot == null ? Long.MIN_VALUE : toSnapshot.sequenceNumber() + 1;
}

@Nullable Snapshot current = table.currentSnapshot();
return current == null ? Long.MIN_VALUE : current.sequenceNumber() + 1;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are two issues in estimateCurrentRangeEndExclusive:

  1. It uses TableCache.get which returns the cached table, which might be stale. We should use TableCache.getRefreshed to ensure we estimate the range end based on the latest metadata.
  2. If toSnapshot or current is null, returning Long.MIN_VALUE is problematic because the range start is at least 1L. Returning a value less than the start offset can cause issues in GrowableOffsetRangeTracker and runner progress/splitting calculations. Returning 0L is safer because Math.max(start, 0L) will correctly default to start, resulting in a valid empty range [start, start).
Suggested change
private long estimateCurrentRangeEndExclusive() {
TableCache.setup(scanConfig);
Table table = TableCache.get(scanConfig.getTableIdentifier());
@Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig);
if (toSnapshotId != null) {
@Nullable Snapshot toSnapshot = table.snapshot(toSnapshotId);
return toSnapshot == null ? Long.MIN_VALUE : toSnapshot.sequenceNumber() + 1;
}
@Nullable Snapshot current = table.currentSnapshot();
return current == null ? Long.MIN_VALUE : current.sequenceNumber() + 1;
}
private long estimateCurrentRangeEndExclusive() {
TableCache.setup(scanConfig);
Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier());
@Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig);
if (toSnapshotId != null) {
@Nullable Snapshot toSnapshot = table.snapshot(toSnapshotId);
return toSnapshot == null ? 0L : toSnapshot.sequenceNumber() + 1;
}
@Nullable Snapshot current = table.currentSnapshot();
return current == null ? 0L : current.sequenceNumber() + 1;
}

Comment on lines +92 to +94
public OffsetRange initialRestriction() {
TableCache.setup(scanConfig);
Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Iceberg CDC / Changelog scans require table format version 2 (or higher) because format version 1 does not support sequence numbers (they are always 0) or row-level deletes. If a user runs this on a v1 table, sequence numbers will always be 0, and the SDF will silently fail to discover any snapshots (since toSnapshotSeq < nextSeqInclusive will always be true).

We should add a fast-fail validation check in initialRestriction to ensure the table format version is at least 2.

  public OffsetRange initialRestriction() {
    TableCache.setup(scanConfig);
    Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier());

    if (table instanceof org.apache.iceberg.HasTableOperations) {
      int formatVersion =
          ((org.apache.iceberg.HasTableOperations) table).operations().current().formatVersion();
      checkArgument(
          formatVersion >= 2,
          "Iceberg CDC / Changelog scans require table format version >= 2, but found version %s",
          formatVersion);
    }

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 5, 2026

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant