From f0aae0ebdd4e590178d4c038e6dd0810ea91d86e Mon Sep 17 00:00:00 2001 From: Chase Date: Sat, 4 Apr 2026 17:25:55 +0900 Subject: [PATCH 1/3] Flink: Update WatermarkExtractorRecordEmitter to minus one from watermark with unit tests --- .../WatermarkExtractorRecordEmitter.java | 2 +- .../TestWatermarkExtractorRecordEmitter.java | 150 ++++++++++++++++++ .../WatermarkExtractorRecordEmitter.java | 2 +- .../TestWatermarkExtractorRecordEmitter.java | 150 ++++++++++++++++++ .../WatermarkExtractorRecordEmitter.java | 2 +- .../TestWatermarkExtractorRecordEmitter.java | 150 ++++++++++++++++++ 6 files changed, 453 insertions(+), 3 deletions(-) create mode 100644 flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java create mode 100644 flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 02ef57d344b1..3d4ec7da31ee 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -44,7 +44,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split); + long newWatermark = timeExtractor.extractWatermark(split) - 1; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java new file mode 100644 index 000000000000..e2ec9a10965d --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestWatermarkExtractorRecordEmitter { + @TempDir protected Path temporaryFolder; + + @Test + public void testWatermarkIsDecrementedByOne() throws IOException { + long extractedWatermark = 1000L; + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> extractedWatermark); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(extractedWatermark - 1); + } + + @Test + public void testWatermarkEmittedOnlyOncePerSplit() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> 1000L); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.records).hasSize(3); + } + + @Test + public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = new HashMap<>(); + watermarkMap.put(split1.splitId(), 2000L); + watermarkMap.put(split2.splitId(), 1000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + // Only split1's watermark is emitted; split2 has a lower value so it's skipped + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkEmittedForEachHigherSplit() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = new HashMap<>(); + watermarkMap.put(split1.splitId(), 1000L); + watermarkMap.put(split2.splitId(), 2000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + assertThat(output.watermarks).hasSize(2); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(999L); + assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); + } + + private IcebergSourceSplit createSplit(long seed) throws IOException { + List> recordBatchList = + ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + recordBatchList, temporaryFolder, FileFormat.PARQUET, TestFixtures.SCHEMA)); + } + + private static class CapturingSourceOutput implements SourceOutput { + final List watermarks = new ArrayList<>(); + final List records = new ArrayList<>(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void collect(T record, long timestamp) { + records.add(record); + } + + @Override + public void emitWatermark(Watermark watermark) { + watermarks.add(watermark); + } + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 02ef57d344b1..3d4ec7da31ee 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -44,7 +44,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split); + long newWatermark = timeExtractor.extractWatermark(split) - 1; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java new file mode 100644 index 000000000000..e2ec9a10965d --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestWatermarkExtractorRecordEmitter { + @TempDir protected Path temporaryFolder; + + @Test + public void testWatermarkIsDecrementedByOne() throws IOException { + long extractedWatermark = 1000L; + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> extractedWatermark); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(extractedWatermark - 1); + } + + @Test + public void testWatermarkEmittedOnlyOncePerSplit() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> 1000L); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.records).hasSize(3); + } + + @Test + public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = new HashMap<>(); + watermarkMap.put(split1.splitId(), 2000L); + watermarkMap.put(split2.splitId(), 1000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + // Only split1's watermark is emitted; split2 has a lower value so it's skipped + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkEmittedForEachHigherSplit() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = new HashMap<>(); + watermarkMap.put(split1.splitId(), 1000L); + watermarkMap.put(split2.splitId(), 2000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + assertThat(output.watermarks).hasSize(2); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(999L); + assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); + } + + private IcebergSourceSplit createSplit(long seed) throws IOException { + List> recordBatchList = + ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + recordBatchList, temporaryFolder, FileFormat.PARQUET, TestFixtures.SCHEMA)); + } + + private static class CapturingSourceOutput implements SourceOutput { + final List watermarks = new ArrayList<>(); + final List records = new ArrayList<>(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void collect(T record, long timestamp) { + records.add(record); + } + + @Override + public void emitWatermark(Watermark watermark) { + watermarks.add(watermark); + } + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 02ef57d344b1..3d4ec7da31ee 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -44,7 +44,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split); + long newWatermark = timeExtractor.extractWatermark(split) - 1; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java new file mode 100644 index 000000000000..e2ec9a10965d --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestWatermarkExtractorRecordEmitter { + @TempDir protected Path temporaryFolder; + + @Test + public void testWatermarkIsDecrementedByOne() throws IOException { + long extractedWatermark = 1000L; + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> extractedWatermark); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(extractedWatermark - 1); + } + + @Test + public void testWatermarkEmittedOnlyOncePerSplit() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> 1000L); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.records).hasSize(3); + } + + @Test + public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = new HashMap<>(); + watermarkMap.put(split1.splitId(), 2000L); + watermarkMap.put(split2.splitId(), 1000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + // Only split1's watermark is emitted; split2 has a lower value so it's skipped + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkEmittedForEachHigherSplit() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = new HashMap<>(); + watermarkMap.put(split1.splitId(), 1000L); + watermarkMap.put(split2.splitId(), 2000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + assertThat(output.watermarks).hasSize(2); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(999L); + assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); + } + + private IcebergSourceSplit createSplit(long seed) throws IOException { + List> recordBatchList = + ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + recordBatchList, temporaryFolder, FileFormat.PARQUET, TestFixtures.SCHEMA)); + } + + private static class CapturingSourceOutput implements SourceOutput { + final List watermarks = new ArrayList<>(); + final List records = new ArrayList<>(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void collect(T record, long timestamp) { + records.add(record); + } + + @Override + public void emitWatermark(Watermark watermark) { + watermarks.add(watermark); + } + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +} From 7fbc803e1656cb2e224eccef392862bcd6135f73 Mon Sep 17 00:00:00 2001 From: Chase Date: Mon, 6 Apr 2026 21:25:58 +0900 Subject: [PATCH 2/3] Flink: fix watermark initial value --- .../reader/WatermarkExtractorRecordEmitter.java | 5 +++-- .../TestWatermarkExtractorRecordEmitter.java | 14 ++++++++++++++ .../reader/WatermarkExtractorRecordEmitter.java | 5 +++-- .../TestWatermarkExtractorRecordEmitter.java | 14 ++++++++++++++ .../reader/WatermarkExtractorRecordEmitter.java | 5 +++-- .../TestWatermarkExtractorRecordEmitter.java | 14 ++++++++++++++ 6 files changed, 51 insertions(+), 6 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 3d4ec7da31ee..87612ea3061c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -34,7 +34,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); private final SplitWatermarkExtractor timeExtractor; private String lastSplitId = null; - private long watermark; + private long watermark = Long.MIN_VALUE; WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { this.timeExtractor = timeExtractor; @@ -44,7 +44,8 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split) - 1; + long extracted = timeExtractor.extractWatermark(split); + long newWatermark = extracted > Long.MIN_VALUE ? extracted - 1 : Long.MIN_VALUE; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java index e2ec9a10965d..7710cfcc64f4 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -114,6 +114,20 @@ public void testWatermarkEmittedForEachHigherSplit() throws IOException { assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); } + @Test + public void testWatermarkAtLongMinValueDoesNotOverflow() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> Long.MIN_VALUE); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(Long.MIN_VALUE); + } + private IcebergSourceSplit createSplit(long seed) throws IOException { List> recordBatchList = ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 3d4ec7da31ee..87612ea3061c 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -34,7 +34,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); private final SplitWatermarkExtractor timeExtractor; private String lastSplitId = null; - private long watermark; + private long watermark = Long.MIN_VALUE; WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { this.timeExtractor = timeExtractor; @@ -44,7 +44,8 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split) - 1; + long extracted = timeExtractor.extractWatermark(split); + long newWatermark = extracted > Long.MIN_VALUE ? extracted - 1 : Long.MIN_VALUE; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java index e2ec9a10965d..7710cfcc64f4 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -114,6 +114,20 @@ public void testWatermarkEmittedForEachHigherSplit() throws IOException { assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); } + @Test + public void testWatermarkAtLongMinValueDoesNotOverflow() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> Long.MIN_VALUE); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(Long.MIN_VALUE); + } + private IcebergSourceSplit createSplit(long seed) throws IOException { List> recordBatchList = ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 3d4ec7da31ee..87612ea3061c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -34,7 +34,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); private final SplitWatermarkExtractor timeExtractor; private String lastSplitId = null; - private long watermark; + private long watermark = Long.MIN_VALUE; WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { this.timeExtractor = timeExtractor; @@ -44,7 +44,8 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split) - 1; + long extracted = timeExtractor.extractWatermark(split); + long newWatermark = extracted > Long.MIN_VALUE ? extracted - 1 : Long.MIN_VALUE; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java index e2ec9a10965d..7710cfcc64f4 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -114,6 +114,20 @@ public void testWatermarkEmittedForEachHigherSplit() throws IOException { assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); } + @Test + public void testWatermarkAtLongMinValueDoesNotOverflow() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> Long.MIN_VALUE); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(Long.MIN_VALUE); + } + private IcebergSourceSplit createSplit(long seed) throws IOException { List> recordBatchList = ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); From 51601eea80aa2ace98bd26d6216218898c7b08a8 Mon Sep 17 00:00:00 2001 From: Chase Date: Mon, 6 Apr 2026 22:26:04 +0900 Subject: [PATCH 3/3] Flink: fix code style --- .../reader/TestWatermarkExtractorRecordEmitter.java | 12 ++++++------ .../reader/TestWatermarkExtractorRecordEmitter.java | 12 ++++++------ .../reader/TestWatermarkExtractorRecordEmitter.java | 12 ++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java index 7710cfcc64f4..1cb7be03c6a7 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.flink.api.common.eventtime.Watermark; @@ -32,6 +30,8 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -75,7 +75,7 @@ public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOExceptio IcebergSourceSplit split1 = createSplit(1L); IcebergSourceSplit split2 = createSplit(2L); - Map watermarkMap = new HashMap<>(); + Map watermarkMap = Maps.newHashMap(); watermarkMap.put(split1.splitId(), 2000L); watermarkMap.put(split2.splitId(), 1000L); @@ -97,7 +97,7 @@ public void testWatermarkEmittedForEachHigherSplit() throws IOException { IcebergSourceSplit split1 = createSplit(1L); IcebergSourceSplit split2 = createSplit(2L); - Map watermarkMap = new HashMap<>(); + Map watermarkMap = Maps.newHashMap(); watermarkMap.put(split1.splitId(), 1000L); watermarkMap.put(split2.splitId(), 2000L); @@ -137,8 +137,8 @@ private IcebergSourceSplit createSplit(long seed) throws IOException { } private static class CapturingSourceOutput implements SourceOutput { - final List watermarks = new ArrayList<>(); - final List records = new ArrayList<>(); + final List watermarks = Lists.newArrayList(); + final List records = Lists.newArrayList(); @Override public void collect(T record) { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java index 7710cfcc64f4..1cb7be03c6a7 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.flink.api.common.eventtime.Watermark; @@ -32,6 +30,8 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -75,7 +75,7 @@ public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOExceptio IcebergSourceSplit split1 = createSplit(1L); IcebergSourceSplit split2 = createSplit(2L); - Map watermarkMap = new HashMap<>(); + Map watermarkMap = Maps.newHashMap(); watermarkMap.put(split1.splitId(), 2000L); watermarkMap.put(split2.splitId(), 1000L); @@ -97,7 +97,7 @@ public void testWatermarkEmittedForEachHigherSplit() throws IOException { IcebergSourceSplit split1 = createSplit(1L); IcebergSourceSplit split2 = createSplit(2L); - Map watermarkMap = new HashMap<>(); + Map watermarkMap = Maps.newHashMap(); watermarkMap.put(split1.splitId(), 1000L); watermarkMap.put(split2.splitId(), 2000L); @@ -137,8 +137,8 @@ private IcebergSourceSplit createSplit(long seed) throws IOException { } private static class CapturingSourceOutput implements SourceOutput { - final List watermarks = new ArrayList<>(); - final List records = new ArrayList<>(); + final List watermarks = Lists.newArrayList(); + final List records = Lists.newArrayList(); @Override public void collect(T record) { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java index 7710cfcc64f4..1cb7be03c6a7 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.flink.api.common.eventtime.Watermark; @@ -32,6 +30,8 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -75,7 +75,7 @@ public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOExceptio IcebergSourceSplit split1 = createSplit(1L); IcebergSourceSplit split2 = createSplit(2L); - Map watermarkMap = new HashMap<>(); + Map watermarkMap = Maps.newHashMap(); watermarkMap.put(split1.splitId(), 2000L); watermarkMap.put(split2.splitId(), 1000L); @@ -97,7 +97,7 @@ public void testWatermarkEmittedForEachHigherSplit() throws IOException { IcebergSourceSplit split1 = createSplit(1L); IcebergSourceSplit split2 = createSplit(2L); - Map watermarkMap = new HashMap<>(); + Map watermarkMap = Maps.newHashMap(); watermarkMap.put(split1.splitId(), 1000L); watermarkMap.put(split2.splitId(), 2000L); @@ -137,8 +137,8 @@ private IcebergSourceSplit createSplit(long seed) throws IOException { } private static class CapturingSourceOutput implements SourceOutput { - final List watermarks = new ArrayList<>(); - final List records = new ArrayList<>(); + final List watermarks = Lists.newArrayList(); + final List records = Lists.newArrayList(); @Override public void collect(T record) {