From 1e37713acf9f53bb3dd834b2336ea65ad1c007c6 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 5 Jun 2026 14:59:46 -0400 Subject: [PATCH] add watch for snapshots source --- .../sdk/io/iceberg/IcebergScanConfig.java | 5 + .../apache/beam/sdk/io/iceberg/ReadUtils.java | 12 +- .../beam/sdk/io/iceberg/TableCache.java | 8 +- .../io/iceberg/cdc/WatchForSnapshotsSdf.java | 289 ++++++++++++++++++ .../sdk/io/iceberg/TestDataWarehouse.java | 2 +- .../iceberg/cdc/WatchForSnapshotsSdfTest.java | 265 ++++++++++++++++ 6 files changed, 573 insertions(+), 8 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdf.java create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdfTest.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 45ecc7cf71c3..2c74a666e600 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -226,6 +226,9 @@ public Expression getFilter() { @Pure public abstract @Nullable List getDropFields(); + @Pure + public abstract @Nullable Duration getMaxSnapshotDiscoveryDelay(); + @Pure public static Builder builder() { return new AutoValue_IcebergScanConfig.Builder() @@ -311,6 +314,8 @@ public Builder setTableIdentifier(String... names) { public abstract Builder setDropFields(@Nullable List fields); + public abstract Builder setMaxSnapshotDiscoveryDelay(@Nullable Duration delay); + public abstract IcebergScanConfig build(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java index e7f50882f433..6859804596b9 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java @@ -158,7 +158,7 @@ static ParquetReader createReader(InputFile inputFile, Schema schema) { } } - static @Nullable Long getFromSnapshotExclusive(Table table, IcebergScanConfig scanConfig) { + public static @Nullable Long getFromSnapshotInclusive(Table table, IcebergScanConfig scanConfig) { @Nullable StartingStrategy startingStrategy = scanConfig.getStartingStrategy(); boolean isStreaming = MoreObjects.firstNonNull(scanConfig.getStreaming(), false); if (startingStrategy == null) { @@ -179,17 +179,23 @@ static ParquetReader createReader(InputFile inputFile, Schema schema) { fromSnapshot = currentSnapshot.snapshotId(); } } + + return fromSnapshot; + } + + public static @Nullable Long getFromSnapshotExclusive(Table table, IcebergScanConfig scanConfig) { + @Nullable Long fromSnapshot = getFromSnapshotInclusive(table, scanConfig); // incremental append scan can only be configured with an *exclusive* starting snapshot, // so we need to provide this snapshot's parent id. if (fromSnapshot != null) { fromSnapshot = table.snapshot(fromSnapshot).parentId(); } - // 4. if snapshot is still null, the scan will default to the oldest snapshot, i.e. EARLIEST + // if snapshot is still null, the scan will default to the oldest snapshot, i.e. EARLIEST return fromSnapshot; } - static @Nullable Long getToSnapshot(Table table, IcebergScanConfig scanConfig) { + public static @Nullable Long getToSnapshot(Table table, IcebergScanConfig scanConfig) { // 1. fetch from to_snapshot @Nullable Long toSnapshot = scanConfig.getToSnapshot(); // 2. fetch from to_timestamp diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java index cb00d90f7fb3..d9d8802e2b49 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java @@ -33,7 +33,7 @@ import org.apache.iceberg.catalog.TableIdentifier; /** Utility to fetch and cache Iceberg {@link Table}s. */ -class TableCache { +public class TableCache { private static final Map CATALOG_CACHE = new ConcurrentHashMap<>(); private static final LoadingCache INTERNAL_CACHE = CacheBuilder.newBuilder() @@ -55,7 +55,7 @@ public ListenableFuture reload(String unusedIdentifier, Table table) { } });; - static Table get(String identifier) { + public static Table get(String identifier) { try { return INTERNAL_CACHE.get(identifier); } catch (ExecutionException e) { @@ -65,12 +65,12 @@ static Table get(String identifier) { } /** Forces a table refresh and returns. */ - static Table getRefreshed(String identifier) { + public static Table getRefreshed(String identifier) { INTERNAL_CACHE.refresh(identifier); return get(identifier); } - static void setup(IcebergScanConfig scanConfig) { + public static void setup(IcebergScanConfig scanConfig) { String tableIdentifier = scanConfig.getTableIdentifier(); IcebergCatalogConfig catalogConfig = scanConfig.getCatalogConfig(); if (CATALOG_CACHE.containsKey(tableIdentifier)) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdf.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdf.java new file mode 100644 index 000000000000..0d232c64053f --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdf.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.iceberg.IcebergScanConfig; +import org.apache.beam.sdk.io.iceberg.ReadUtils; +import org.apache.beam.sdk.io.iceberg.SnapshotInfo; +import org.apache.beam.sdk.io.iceberg.TableCache; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.util.SnapshotUtil; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SplittableDoFn that watches an Iceberg table for new {@link Snapshot}s and emits them one at a + * time, advancing the watermark per snapshot. Each snapshot is later processed by {@link + * ChangelogScanner}. + * + *

The restriction tracks Snapshots via their sequence numbers, which are monotonic, + * unlike snapshot IDs. The initial range starts at the sequence number of the user-configured + * starting snapshot (or one if none configured) and runs to {@link Long#MAX_VALUE}. Each call to + * {@code @ProcessElement} claims the sequence numbers of newly discovered snapshots in + * chronological order. + * + *

Uses a {@link Manual} watermark estimator. After emitting a snapshot, the watermark is set to + * that snapshot's commit time. On empty polls, the watermark is bumped to {@code now() - + * MAX_SNAPSHOT_DISCOVERY_DELAY} to prevent downstream windows from stalling indefinitely during + * quiet periods. + */ +@DoFn.UnboundedPerElement +class WatchForSnapshotsSdf extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(WatchForSnapshotsSdf.class); + private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(60); + + private static final Counter snapshotsEmitted = + Metrics.counter(WatchForSnapshotsSdf.class, "snapshotsEmitted"); + private static final Gauge latestEmittedSnapshotId = + Metrics.gauge(WatchForSnapshotsSdf.class, "latestEmittedSnapshotId"); + // TODO(ahmedabu98): consider exposing this as a config option + private static final Duration MAX_SNAPSHOT_DISCOVERY_DELAY = Duration.standardMinutes(5); + private static final Long POLL_FOREVER = Long.MAX_VALUE; + + private final IcebergScanConfig scanConfig; + private final Duration pollInterval; + + WatchForSnapshotsSdf(IcebergScanConfig scanConfig) { + this.scanConfig = scanConfig; + this.pollInterval = + MoreObjects.firstNonNull(scanConfig.getPollInterval(), DEFAULT_POLL_INTERVAL); + } + + @GetInitialRestriction + public OffsetRange initialRestriction() { + TableCache.setup(scanConfig); + Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier()); + + long toSnapshotExclusiveSeq = POLL_FOREVER; + @Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig); + if (toSnapshotId != null) { + toSnapshotExclusiveSeq = + Preconditions.checkStateNotNull( + table.snapshot(toSnapshotId), + "Configured end snapshot %s does not exist", + toSnapshotId) + .sequenceNumber() + + 1; + } + + @Nullable Long fromSnapshotInclusiveId = ReadUtils.getFromSnapshotInclusive(table, scanConfig); + long fromSnapshotInclusiveSeq; + if (fromSnapshotInclusiveId == null) { + fromSnapshotInclusiveSeq = 1L; // sequence numbers start at 1 + } else { + Snapshot fromSnapshotInclusive = + Preconditions.checkArgumentNotNull( + table.snapshot(fromSnapshotInclusiveId), + "The specified starting snapshot %s does not exist", + fromSnapshotInclusiveId); + fromSnapshotInclusiveSeq = fromSnapshotInclusive.sequenceNumber(); + + boolean sameLineage = + toSnapshotId == null + ? SnapshotUtil.isAncestorOf(table, fromSnapshotInclusiveId) + : SnapshotUtil.isAncestorOf(table, toSnapshotId, fromSnapshotInclusiveId); + checkArgument( + sameLineage, + "Configured starting snapshot %s is not an ancestor of %s", + fromSnapshotInclusiveId, + toSnapshotId == null ? "the current table" : "end snapshot " + toSnapshotId); + } + + return new OffsetRange( + fromSnapshotInclusiveSeq, Math.max(fromSnapshotInclusiveSeq, toSnapshotExclusiveSeq)); + } + + @NewTracker + public RestrictionTracker newTracker(@Restriction OffsetRange restriction) { + if (restriction.getTo() == POLL_FOREVER) { + return new GrowableOffsetRangeTracker( + restriction.getFrom(), this::estimateCurrentRangeEndExclusive); + } + + return new OffsetRangeTracker(restriction); + } + + private long estimateCurrentRangeEndExclusive() { + TableCache.setup(scanConfig); + Table table = TableCache.get(scanConfig.getTableIdentifier()); + + @Nullable Long toSnapshotId = ReadUtils.getToSnapshot(table, scanConfig); + if (toSnapshotId != null) { + @Nullable Snapshot toSnapshot = table.snapshot(toSnapshotId); + return toSnapshot == null ? Long.MIN_VALUE : toSnapshot.sequenceNumber() + 1; + } + + @Nullable Snapshot current = table.currentSnapshot(); + return current == null ? Long.MIN_VALUE : current.sequenceNumber() + 1; + } + + @GetRestrictionCoder + public Coder restrictionCoder() { + return new OffsetRange.Coder(); + } + + @GetInitialWatermarkEstimatorState + public Instant initialWatermarkState() { + return BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + @NewWatermarkEstimator + public ManualWatermarkEstimator newWatermarkEstimator( + @WatermarkEstimatorState Instant state) { + return new Manual(state); + } + + @ProcessElement + public ProcessContinuation process( + RestrictionTracker tracker, + ManualWatermarkEstimator watermark, + OutputReceiver out) { + TableCache.setup(scanConfig); + Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier()); + + @Nullable Long userToSnapshotId = ReadUtils.getToSnapshot(table, scanConfig); + boolean bounded = userToSnapshotId != null; + + @Nullable Snapshot current = table.currentSnapshot(); + if (current == null) { + // no snapshots yet. + LOG.info("Skipping scan: table is empty with no snapshots yet"); + return pauseOrStop(watermark, bounded); + } + + // Resolve the upper bound: user-specified bounded mode, or "current" for unbounded. + long toSnapshotId; + long toSnapshotSeq; + if (userToSnapshotId != null) { + toSnapshotId = userToSnapshotId; + toSnapshotSeq = + Preconditions.checkStateNotNull( + table.snapshot(userToSnapshotId), + "Configured toSnapshotId %s does not exist", + userToSnapshotId) + .sequenceNumber(); + } else { + toSnapshotId = current.snapshotId(); + toSnapshotSeq = current.sequenceNumber(); + } + + long nextSeqInclusive = tracker.currentRestriction().getFrom(); + if (toSnapshotSeq < nextSeqInclusive) { + // Nothing new since last poll. + LOG.info("Skipping scan: nothing new since last poll."); + return pauseOrStop(watermark, bounded); + } + + // Collect snapshots in [nextSeqInclusive, toSnapshotSeq] chronologically + String tableId = scanConfig.getTableIdentifier(); + List fresh = snapshotsAfter(table, tableId, nextSeqInclusive, toSnapshotId); + LOG.info("Collected snapshots: {}", fresh); + + for (SnapshotInfo snap : fresh) { + if (!tracker.tryClaim(snap.getSequenceNumber())) { + return ProcessContinuation.stop(); + } + Instant ts = Instant.ofEpochMilli(snap.getTimestampMillis()); + out.outputWithTimestamp(snap.getSnapshotId(), ts); + + if (watermark.currentWatermark().isBefore(ts)) { + watermark.setWatermark(ts); + } + snapshotsEmitted.inc(); + latestEmittedSnapshotId.set(snap.getSnapshotId()); + LOG.info( + "Emitted snapshot {} (sequence id: {}, commit ts: {})", + snap.getSnapshotId(), + snap.getSequenceNumber(), + ts); + } + + return pauseOrStop(watermark, bounded); + } + + /** + * On an empty poll, bump the watermark to {@code now() - MAX_SNAPSHOT_DISCOVERY_DELAY} so + * downstream windows can still fire. Returns {@code stop()} when end snapshot has been reached, + * otherwise {@code resume()} after the poll interval. + */ + private ProcessContinuation pauseOrStop( + ManualWatermarkEstimator watermark, boolean bounded) { + Duration delay = + MoreObjects.firstNonNull( + scanConfig.getMaxSnapshotDiscoveryDelay(), MAX_SNAPSHOT_DISCOVERY_DELAY); + Instant idleWatermark = Instant.now().minus(delay); + if (watermark.currentWatermark().isBefore(idleWatermark)) { + LOG.info( + "Sitting idle for {} seconds. Bumping watermark to {}", + TimeUnit.MILLISECONDS.toSeconds( + Instant.now().getMillis() - watermark.currentWatermark().getMillis()), + idleWatermark); + watermark.setWatermark(idleWatermark); + } + return bounded + ? ProcessContinuation.stop() + : ProcessContinuation.resume().withResumeDelay(pollInterval); + } + + /** + * Returns snapshots with sequence number in {@code [nextSeqInclusive, toSnapshotSeq]}, keyed off + * the lineage ending at {@code toSnapshotId}. + */ + @SuppressWarnings("return") // ancestorsOf accepts null returns as a "stop" signal + static List snapshotsAfter( + Table table, String tableIdentifier, long nextSeqInclusive, long toSnapshotId) { + + List snapshots = new ArrayList<>(); + // ancestorsOf returns an iterable of snapshots looking backwards. + // we'll need to reverse it to process snapshots chronologically. + for (Snapshot snapshot : + SnapshotUtil.ancestorsOf( + toSnapshotId, snapshotId -> snapshotId != null ? table.snapshot(snapshotId) : null)) { + if (snapshot.sequenceNumber() < nextSeqInclusive) { + break; + } + snapshots.add(SnapshotInfo.fromSnapshot(snapshot, tableIdentifier)); + } + Collections.reverse(snapshots); + return snapshots; + } +} diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java index dcb2d804d2e6..2e711219349c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java @@ -64,7 +64,7 @@ public class TestDataWarehouse extends ExternalResource { protected final Configuration hadoopConf; - protected String location; + public String location; protected Catalog catalog; protected boolean someTableHasBeenCreated = false; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdfTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdfTest.java new file mode 100644 index 000000000000..c62a9d6fb4ed --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/cdc/WatchForSnapshotsSdfTest.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy; +import org.apache.beam.sdk.io.iceberg.IcebergScanConfig; +import org.apache.beam.sdk.io.iceberg.IcebergUtils; +import org.apache.beam.sdk.io.iceberg.TestDataWarehouse; +import org.apache.beam.sdk.io.iceberg.TestFixtures; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.values.OutputBuilder; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link WatchForSnapshotsSdf}. */ +@RunWith(JUnit4.class) +public class WatchForSnapshotsSdfTest { + private static final Schema CDC_SCHEMA = + new Schema( + ImmutableList.of( + required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get())), + ImmutableSet.of(1)); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public TestDataWarehouse warehouse = new TestDataWarehouse(TEMPORARY_FOLDER, "default"); + @Rule public TestName testName = new TestName(); + + @Test + public void earliestStreamingRestrictionEmitsSnapshotsInSequenceOrder() throws Exception { + TableIdentifier tableId = tableId(); + Table table = warehouse.createTable(tableId, CDC_SCHEMA, null, tableProperties()); + commitAppend(table, "s1.parquet", records("one", 1L)); + commitAppend(table, "s2.parquet", records("two", 2L)); + commitAppend(table, "s3.parquet", records("three", 3L)); + List snapshots = Lists.newArrayList(table.snapshots()); + + WatchForSnapshotsSdf sdf = + new WatchForSnapshotsSdf( + scanConfigBuilder(table, tableId) + .setStreaming(true) + .setStartingStrategy(StartingStrategy.EARLIEST) + .setPollInterval(Duration.millis(1L)) + .build()); + + OffsetRange restriction = sdf.initialRestriction(); + assertEquals(1L, restriction.getFrom()); + assertEquals(Long.MAX_VALUE, restriction.getTo()); + + CapturingOutputReceiver out = new CapturingOutputReceiver(); + ManualWatermarkEstimator watermark = + sdf.newWatermarkEstimator(sdf.initialWatermarkState()); + DoFn.ProcessContinuation continuation = + sdf.process(sdf.newTracker(restriction), watermark, out); + + Long[] expectedSnapshotIds = snapshots.stream().map(Snapshot::snapshotId).toArray(Long[]::new); + List actualSnapshotIds = + out.values.stream().map(TimestampedValue::getValue).collect(Collectors.toList()); + assertTrue(continuation.shouldResume()); + assertEquals(Duration.millis(1L), continuation.resumeDelay()); + assertThat(actualSnapshotIds, contains(expectedSnapshotIds)); + assertEquals( + Instant.ofEpochMilli(snapshots.get(snapshots.size() - 1).timestampMillis()), + watermark.currentWatermark()); + } + + @Test + public void boundedSnapshotRangeUsesInclusiveLowerAndExclusiveUpperSequence() throws Exception { + TableIdentifier tableId = tableId(); + Table table = warehouse.createTable(tableId, CDC_SCHEMA, null, tableProperties()); + commitAppend(table, "s1.parquet", records("one", 1L)); + commitAppend(table, "s2.parquet", records("two", 2L)); + commitAppend(table, "s3.parquet", records("three", 3L)); + commitAppend(table, "s4.parquet", records("four", 4L)); + List snapshots = Lists.newArrayList(table.snapshots()); + Snapshot second = snapshots.get(1); + Snapshot third = snapshots.get(2); + + WatchForSnapshotsSdf sdf = + new WatchForSnapshotsSdf( + scanConfigBuilder(table, tableId) + .setStreaming(true) + .setFromSnapshotInclusive(second.snapshotId()) + .setToSnapshot(third.snapshotId()) // this is inclusive + .setPollInterval(Duration.standardSeconds(30L)) + .build()); + + OffsetRange restriction = sdf.initialRestriction(); + assertEquals(second.sequenceNumber(), restriction.getFrom()); + assertEquals(third.sequenceNumber() + 1, restriction.getTo()); + + CapturingOutputReceiver out = new CapturingOutputReceiver(); + DoFn.ProcessContinuation continuation = + sdf.process( + sdf.newTracker(restriction), + sdf.newWatermarkEstimator(sdf.initialWatermarkState()), + out); + + List outputSnapshotIds = + out.values.stream().map(TimestampedValue::getValue).collect(Collectors.toList()); + + assertFalse(continuation.shouldResume()); + assertThat(outputSnapshotIds, contains(second.snapshotId(), third.snapshotId())); + assertEquals(2, out.values.size()); + assertEquals(Instant.ofEpochMilli(second.timestampMillis()), out.values.get(0).getTimestamp()); + assertEquals(Instant.ofEpochMilli(third.timestampMillis()), out.values.get(1).getTimestamp()); + } + + @Test + public void streamingDefaultStartsAtLatestSnapshotAndEarliestStartsAtFirst() throws Exception { + TableIdentifier tableId = tableId(); + Table table = warehouse.createTable(tableId, CDC_SCHEMA, null, tableProperties()); + commitAppend(table, "s1.parquet", records("one", 1L)); + commitAppend(table, "s2.parquet", records("two", 2L)); + Snapshot latest = table.currentSnapshot(); + + WatchForSnapshotsSdf defaultSdf = + new WatchForSnapshotsSdf( + scanConfigBuilder(table, tableId) + .setStreaming(true) + .setPollInterval(Duration.standardSeconds(1L)) + .build()); + WatchForSnapshotsSdf earliestSdf = + new WatchForSnapshotsSdf( + scanConfigBuilder(table, tableId) + .setStreaming(true) + .setStartingStrategy(StartingStrategy.EARLIEST) + .setPollInterval(Duration.standardSeconds(1L)) + .build()); + + assertEquals(latest.sequenceNumber(), defaultSdf.initialRestriction().getFrom()); + assertEquals(1L, earliestSdf.initialRestriction().getFrom()); + } + + @Test + public void emptyTableReturnsResumeAndAdvancesIdleWatermark() { + TableIdentifier tableId = tableId(); + Table table = warehouse.createTable(tableId, CDC_SCHEMA, null, tableProperties()); + WatchForSnapshotsSdf sdf = + new WatchForSnapshotsSdf( + scanConfigBuilder(table, tableId) + .setStreaming(true) + .setMaxSnapshotDiscoveryDelay(Duration.ZERO) + .setPollInterval(Duration.millis(25L)) + .build()); + ManualWatermarkEstimator watermark = + sdf.newWatermarkEstimator(sdf.initialWatermarkState()); + Instant beforeProcess = Instant.now(); + CapturingOutputReceiver out = new CapturingOutputReceiver(); + + DoFn.ProcessContinuation continuation = + sdf.process(sdf.newTracker(sdf.initialRestriction()), watermark, out); + + assertTrue(continuation.shouldResume()); + assertEquals(Duration.millis(25L), continuation.resumeDelay()); + assertThat(out.values, empty()); + assertThat(watermark.currentWatermark(), greaterThan(beforeProcess.minus(Duration.millis(1L)))); + assertThat(watermark.currentWatermark(), lessThanOrEqualTo(Instant.now())); + } + + private TableIdentifier tableId() { + return TableIdentifier.of("default", testName.getMethodName()); + } + + private IcebergScanConfig.Builder scanConfigBuilder(Table table, TableIdentifier tableId) { + return IcebergScanConfig.builder() + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties( + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)) + .build()) + .setTableIdentifier(tableId) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema())) + .setUseCdc(true); + } + + private static Map tableProperties() { + return ImmutableMap.of(TableProperties.FORMAT_VERSION, "2"); + } + + private void commitAppend(Table table, String fileName, List records) throws IOException { + DataFile file = + warehouse.writeRecords(testName.getMethodName() + "-" + fileName, table.schema(), records); + table.newFastAppend().appendFile(file).commit(); + table.refresh(); + } + + private static List records(String data, long... ids) { + ImmutableList.Builder records = ImmutableList.builder(); + for (long id : ids) { + records.add(TestFixtures.createRecord(CDC_SCHEMA, ImmutableMap.of("id", id, "data", data))); + } + return records.build(); + } + + private static final class CapturingOutputReceiver implements DoFn.OutputReceiver { + private final ImmutableList.Builder> builder = ImmutableList.builder(); + private List> values = ImmutableList.of(); + + @Override + public OutputBuilder builder(Long value) { + throw new UnsupportedOperationException("Use outputWithTimestamp in this test receiver."); + } + + @Override + public void outputWithTimestamp(Long value, Instant timestamp) { + builder.add(TimestampedValue.of(value, timestamp)); + values = builder.build(); + } + } +}