diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 02ef57d344b1..87612ea3061c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -34,7 +34,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); private final SplitWatermarkExtractor timeExtractor; private String lastSplitId = null; - private long watermark; + private long watermark = Long.MIN_VALUE; WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { this.timeExtractor = timeExtractor; @@ -44,7 +44,8 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split); + long extracted = timeExtractor.extractWatermark(split); + long newWatermark = extracted > Long.MIN_VALUE ? extracted - 1 : Long.MIN_VALUE; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java new file mode 100644 index 000000000000..1cb7be03c6a7 --- /dev/null +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestWatermarkExtractorRecordEmitter { + @TempDir protected Path temporaryFolder; + + @Test + public void testWatermarkIsDecrementedByOne() throws IOException { + long extractedWatermark = 1000L; + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> extractedWatermark); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(extractedWatermark - 1); + } + + @Test + public void testWatermarkEmittedOnlyOncePerSplit() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> 1000L); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.records).hasSize(3); + } + + @Test + public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = Maps.newHashMap(); + watermarkMap.put(split1.splitId(), 2000L); + watermarkMap.put(split2.splitId(), 1000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + // Only split1's watermark is emitted; split2 has a lower value so it's skipped + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkEmittedForEachHigherSplit() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = Maps.newHashMap(); + watermarkMap.put(split1.splitId(), 1000L); + watermarkMap.put(split2.splitId(), 2000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + assertThat(output.watermarks).hasSize(2); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(999L); + assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkAtLongMinValueDoesNotOverflow() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> Long.MIN_VALUE); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(Long.MIN_VALUE); + } + + private IcebergSourceSplit createSplit(long seed) throws IOException { + List> recordBatchList = + ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + recordBatchList, temporaryFolder, FileFormat.PARQUET, TestFixtures.SCHEMA)); + } + + private static class CapturingSourceOutput implements SourceOutput { + final List watermarks = Lists.newArrayList(); + final List records = Lists.newArrayList(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void collect(T record, long timestamp) { + records.add(record); + } + + @Override + public void emitWatermark(Watermark watermark) { + watermarks.add(watermark); + } + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 02ef57d344b1..87612ea3061c 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -34,7 +34,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); private final SplitWatermarkExtractor timeExtractor; private String lastSplitId = null; - private long watermark; + private long watermark = Long.MIN_VALUE; WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { this.timeExtractor = timeExtractor; @@ -44,7 +44,8 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split); + long extracted = timeExtractor.extractWatermark(split); + long newWatermark = extracted > Long.MIN_VALUE ? extracted - 1 : Long.MIN_VALUE; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java new file mode 100644 index 000000000000..1cb7be03c6a7 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestWatermarkExtractorRecordEmitter { + @TempDir protected Path temporaryFolder; + + @Test + public void testWatermarkIsDecrementedByOne() throws IOException { + long extractedWatermark = 1000L; + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> extractedWatermark); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(extractedWatermark - 1); + } + + @Test + public void testWatermarkEmittedOnlyOncePerSplit() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> 1000L); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.records).hasSize(3); + } + + @Test + public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = Maps.newHashMap(); + watermarkMap.put(split1.splitId(), 2000L); + watermarkMap.put(split2.splitId(), 1000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + // Only split1's watermark is emitted; split2 has a lower value so it's skipped + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkEmittedForEachHigherSplit() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = Maps.newHashMap(); + watermarkMap.put(split1.splitId(), 1000L); + watermarkMap.put(split2.splitId(), 2000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + assertThat(output.watermarks).hasSize(2); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(999L); + assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkAtLongMinValueDoesNotOverflow() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> Long.MIN_VALUE); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(Long.MIN_VALUE); + } + + private IcebergSourceSplit createSplit(long seed) throws IOException { + List> recordBatchList = + ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + recordBatchList, temporaryFolder, FileFormat.PARQUET, TestFixtures.SCHEMA)); + } + + private static class CapturingSourceOutput implements SourceOutput { + final List watermarks = Lists.newArrayList(); + final List records = Lists.newArrayList(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void collect(T record, long timestamp) { + records.add(record); + } + + @Override + public void emitWatermark(Watermark watermark) { + watermarks.add(watermark); + } + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java index 02ef57d344b1..87612ea3061c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/reader/WatermarkExtractorRecordEmitter.java @@ -34,7 +34,7 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class); private final SplitWatermarkExtractor timeExtractor; private String lastSplitId = null; - private long watermark; + private long watermark = Long.MIN_VALUE; WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) { this.timeExtractor = timeExtractor; @@ -44,7 +44,8 @@ class WatermarkExtractorRecordEmitter implements SerializableRecordEmitter public void emitRecord( RecordAndPosition element, SourceOutput output, IcebergSourceSplit split) { if (!split.splitId().equals(lastSplitId)) { - long newWatermark = timeExtractor.extractWatermark(split); + long extracted = timeExtractor.extractWatermark(split); + long newWatermark = extracted > Long.MIN_VALUE ? extracted - 1 : Long.MIN_VALUE; if (newWatermark < watermark) { LOG.info( "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}", diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java new file mode 100644 index 000000000000..1cb7be03c6a7 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestWatermarkExtractorRecordEmitter.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestWatermarkExtractorRecordEmitter { + @TempDir protected Path temporaryFolder; + + @Test + public void testWatermarkIsDecrementedByOne() throws IOException { + long extractedWatermark = 1000L; + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> extractedWatermark); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(extractedWatermark - 1); + } + + @Test + public void testWatermarkEmittedOnlyOncePerSplit() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> 1000L); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + emitter.emitRecord(element, output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.records).hasSize(3); + } + + @Test + public void testWatermarkNotEmittedWhenNewSplitHasLowerValue() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = Maps.newHashMap(); + watermarkMap.put(split1.splitId(), 2000L); + watermarkMap.put(split2.splitId(), 1000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + // Only split1's watermark is emitted; split2 has a lower value so it's skipped + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkEmittedForEachHigherSplit() throws IOException { + IcebergSourceSplit split1 = createSplit(1L); + IcebergSourceSplit split2 = createSplit(2L); + + Map watermarkMap = Maps.newHashMap(); + watermarkMap.put(split1.splitId(), 1000L); + watermarkMap.put(split2.splitId(), 2000L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> watermarkMap.get(s.splitId())); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + RecordAndPosition element = new RecordAndPosition<>("record", 0, 0L); + emitter.emitRecord(element, output, split1); + emitter.emitRecord(element, output, split2); + + assertThat(output.watermarks).hasSize(2); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(999L); + assertThat(output.watermarks.get(1).getTimestamp()).isEqualTo(1999L); + } + + @Test + public void testWatermarkAtLongMinValueDoesNotOverflow() throws IOException { + IcebergSourceSplit split = createSplit(1L); + + WatermarkExtractorRecordEmitter emitter = + new WatermarkExtractorRecordEmitter<>(s -> Long.MIN_VALUE); + + CapturingSourceOutput output = new CapturingSourceOutput<>(); + emitter.emitRecord(new RecordAndPosition<>("record", 0, 0L), output, split); + + assertThat(output.watermarks).hasSize(1); + assertThat(output.watermarks.get(0).getTimestamp()).isEqualTo(Long.MIN_VALUE); + } + + private IcebergSourceSplit createSplit(long seed) throws IOException { + List> recordBatchList = + ReaderUtil.createRecordBatchList(seed, TestFixtures.SCHEMA, 1, 1); + return IcebergSourceSplit.fromCombinedScanTask( + ReaderUtil.createCombinedScanTask( + recordBatchList, temporaryFolder, FileFormat.PARQUET, TestFixtures.SCHEMA)); + } + + private static class CapturingSourceOutput implements SourceOutput { + final List watermarks = Lists.newArrayList(); + final List records = Lists.newArrayList(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void collect(T record, long timestamp) { + records.add(record); + } + + @Override + public void emitWatermark(Watermark watermark) { + watermarks.add(watermark); + } + + @Override + public void markIdle() {} + + @Override + public void markActive() {} + } +}