From 59b892013d8bbd35c004fa1586ddf8140406416b Mon Sep 17 00:00:00 2001 From: jkonisa Date: Thu, 5 Mar 2026 11:32:41 -0800 Subject: [PATCH] CASSANALYTICS-124: Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead --- .../BufferingCommitLogReaderTests.java | 116 ++++++++++++------ .../commitlog/BufferingCommitLogReader.java | 11 ++ 2 files changed, 89 insertions(+), 38 deletions(-) diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java index fa4dfd0b3..c25aa9f60 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.commitlog; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -28,11 +29,14 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.apache.cassandra.cdc.CdcTester; import org.apache.cassandra.cdc.CdcTests; +import org.apache.cassandra.cdc.LocalCommitLog; import org.apache.cassandra.cdc.api.CommitLog; +import org.apache.cassandra.cdc.api.CommitLogReader; import org.apache.cassandra.cdc.api.Marker; import org.apache.cassandra.cdc.stats.CdcStats; import org.apache.cassandra.spark.data.CqlTable; @@ -54,48 +58,26 @@ public class BufferingCommitLogReaderTests CdcTests.setup(); } + @BeforeEach + public void setUp() + { + CdcTester.tearDown(); + CdcTester.testCommitLog.start(); + } + @Test public void testReaderSeek() { - TestSchema schema = TestSchema.builder(BRIDGE) - .withPartitionKey("pk", BRIDGE.bigint()) - .withColumn("c1", BRIDGE.bigint()) - .withColumn("c2", BRIDGE.bigint()) - .withCdc(true) - .build(); - CqlTable cqlTable = schema.buildTable(); - new SchemaBuilder(cqlTable, Partitioner.Murmur3Partitioner, true); // init Schema instance int numRows = 1000; - - // write some rows to a CommitLog Set keys = new HashSet<>(numRows); - for (int i = 0; i < numRows; i++) - { - TestSchema.TestRow row = schema.randomRow(); - while (keys.contains(row.getLong("pk"))) - { - row = schema.randomRow(); - } - keys.add(row.getLong("pk")); - CDC_BRIDGE.log(TimeProvider.DEFAULT, cqlTable, CdcTester.testCommitLog, row, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); - } - CdcTester.testCommitLog.sync(); + CommitLog firstLog = writeAndSync(numRows, keys::add); List markers = Collections.synchronizedList(new ArrayList<>()); - CommitLog firstLog = CdcTests.logProvider(directory) - .logs() - .min(CommitLog::compareTo) - .orElseThrow(() -> new RuntimeException("Commit log file not found")); - - // read entire commit log and verify correct - Consumer listener = markers::add; - Set allRows = readLog(null, keys, firstLog, listener); + Set allRows = readLog(null, keys, firstLog, markers::add); assertThat(allRows).hasSize(numRows); - // re-read commit log from each watermark position - // and verify subset of partitions are read + // re-read from each watermark and verify a strictly shrinking subset is returned int foundRows = allRows.size(); - allRows.clear(); List allMarkers = new ArrayList<>(markers); Marker prevMarker = null; assertThat(allMarkers).isNotEmpty(); @@ -112,18 +94,76 @@ public void testReaderSeek() prevMarker = marker; if (marker.equals(allMarkers.get(allMarkers.size() - 1))) - { - // last marker should return 0 updates - // and be at the end of the file assertThat(result).isEmpty(); - } else - { assertThat(result).isNotEmpty(); - } } } + @Test + public void testPositionAtMaxOffsetInCommitlogsWithPaddedZeros() + { + CommitLog log = writeAndSync(100, null); + // Seek to maxOffset so the loop runs 0 iterations (natural exhaustion); + // Fix 1 sets position = startMarker.position, Fix 2 sets position = maxOffset on clean exit. + Marker maxOffsetMarker = log.markerAt(log.segmentId(), (int) log.maxOffset()); + try (BufferingCommitLogReader reader = new BufferingCommitLogReader(log, maxOffsetMarker, CdcStats.STUB, null)) + { + CommitLogReader.Result result = reader.result(); + assertThat(reader.position()).isEqualTo((int) log.maxOffset()); + assertThat(result.isFullyRead()).isTrue(); + } + } + + @Test + public void testPositionAtMaxOffsetWhenSeekingToEnd() + { + CommitLog log = writeAndSync(100, null); + // Seek directly to maxOffset to test Fix 1: position is initialized to startMarker.position + // rather than 0, ensuring isFullyRead evaluates correctly. + Marker endMarker = log.markerAt(log.segmentId(), (int) log.maxOffset()); + try (BufferingCommitLogReader reader = new BufferingCommitLogReader(log, endMarker, CdcStats.STUB, null)) + { + CommitLogReader.Result result = reader.result(); + assertThat(result.updates()).isEmpty(); + assertThat(reader.position()).isEqualTo((int) log.maxOffset()); + assertThat(result.isFullyRead()).isTrue(); + } + } + + private CommitLog writeAndSync(int numRows, @Nullable Consumer keyCollector) + { + TestSchema schema = TestSchema.builder(BRIDGE) + .withPartitionKey("pk", BRIDGE.bigint()) + .withColumn("c1", BRIDGE.bigint()) + .withCdc(true) + .build(); + CqlTable cqlTable = schema.buildTable(); + new SchemaBuilder(cqlTable, Partitioner.Murmur3Partitioner, true); + for (int i = 0; i < numRows; i++) + { + TestSchema.TestRow row = schema.randomRow(); + if (keyCollector != null) + keyCollector.accept(row.getLong("pk")); + CDC_BRIDGE.log(TimeProvider.DEFAULT, cqlTable, CdcTester.testCommitLog, + row, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis())); + } + CdcTester.testCommitLog.sync(); + return new LocalCommitLog(Paths.get(getFirstLog().path())) + { + @Override + public boolean completed() { return true; } + }; + } + + private CommitLog getFirstLog() + { + return CdcTests.logProvider(directory) + .logs() + .min(CommitLog::compareTo) + .orElseThrow(() -> new RuntimeException("Commit log file not found")); + } + private Set readLog(@Nullable Marker highWaterMark, Set keys, CommitLog logFile, diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java index 8a808d98a..c8752476e 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java @@ -284,6 +284,9 @@ private void readCommitLogSegment() throws IOException { stats.commitLogBytesSkippedOnRead(startMarker.position() - reader.getFilePointer()); segmentReader.seek(startMarker.position()); + // When starting from an offset, position must be initialized to startMarker.position() + // rather than 0; an incorrect value causes isFullyRead to fail. + this.position = startMarker.position(); } for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader) @@ -309,6 +312,13 @@ private void readCommitLogSegment() throws IOException break; } } + + // If the segment reader iterator completes reading commitlog with padded zeros, set the position + // as maxOffset to mark completion of reading commitlog + if (statusTracker.shouldContinue()) + { + this.position = (int) log.maxOffset(); + } } // Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception, // so we check to see if a RuntimeException is wrapping an IOException. @@ -427,6 +437,7 @@ private void readSection(FileDataInput reader, if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) { logger.trace("Encountered end of segment marker at", "position", reader.getFilePointer()); + this.position = (int) log.maxOffset(); statusTracker.requestTermination(); return; }