Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.cassandra.db.commitlog;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -28,11 +29,14 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.cassandra.cdc.CdcTester;
import org.apache.cassandra.cdc.CdcTests;
import org.apache.cassandra.cdc.LocalCommitLog;
import org.apache.cassandra.cdc.api.CommitLog;
import org.apache.cassandra.cdc.api.CommitLogReader;
import org.apache.cassandra.cdc.api.Marker;
import org.apache.cassandra.cdc.stats.CdcStats;
import org.apache.cassandra.spark.data.CqlTable;
Expand All @@ -54,48 +58,26 @@ public class BufferingCommitLogReaderTests
CdcTests.setup();
}

/**
 * Runs before each test: calls {@code CdcTester.tearDown()} and then starts
 * {@code CdcTester.testCommitLog}.
 */
@BeforeEach
public void setUp()
{
// NOTE(review): presumably tearDown() clears state left by the previous test; its body is not visible here
CdcTester.tearDown();
CdcTester.testCommitLog.start();
}

@Test
public void testReaderSeek()
{
TestSchema schema = TestSchema.builder(BRIDGE)
.withPartitionKey("pk", BRIDGE.bigint())
.withColumn("c1", BRIDGE.bigint())
.withColumn("c2", BRIDGE.bigint())
.withCdc(true)
.build();
CqlTable cqlTable = schema.buildTable();
new SchemaBuilder(cqlTable, Partitioner.Murmur3Partitioner, true); // init Schema instance
int numRows = 1000;

// write some rows to a CommitLog
Set<Long> keys = new HashSet<>(numRows);
for (int i = 0; i < numRows; i++)
{
TestSchema.TestRow row = schema.randomRow();
while (keys.contains(row.getLong("pk")))
{
row = schema.randomRow();
}
keys.add(row.getLong("pk"));
CDC_BRIDGE.log(TimeProvider.DEFAULT, cqlTable, CdcTester.testCommitLog, row, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
}
CdcTester.testCommitLog.sync();
CommitLog firstLog = writeAndSync(numRows, keys::add);

List<Marker> markers = Collections.synchronizedList(new ArrayList<>());
CommitLog firstLog = CdcTests.logProvider(directory)
.logs()
.min(CommitLog::compareTo)
.orElseThrow(() -> new RuntimeException("Commit log file not found"));

// read entire commit log and verify correct
Consumer<Marker> listener = markers::add;
Set<Long> allRows = readLog(null, keys, firstLog, listener);
Set<Long> allRows = readLog(null, keys, firstLog, markers::add);
assertThat(allRows).hasSize(numRows);

// re-read commit log from each watermark position
// and verify subset of partitions are read
// re-read from each watermark and verify a strictly shrinking subset is returned
int foundRows = allRows.size();
allRows.clear();
List<Marker> allMarkers = new ArrayList<>(markers);
Marker prevMarker = null;
assertThat(allMarkers).isNotEmpty();
Expand All @@ -112,18 +94,76 @@ public void testReaderSeek()
prevMarker = marker;

if (marker.equals(allMarkers.get(allMarkers.size() - 1)))
{
// last marker should return 0 updates
// and be at the end of the file
assertThat(result).isEmpty();
}
else
{
assertThat(result).isNotEmpty();
}
}
}

/**
 * Opens a reader whose start marker sits at the commit log's max offset and checks that the
 * reader reports that same offset as its position and that the result is flagged as fully read.
 */
@Test
public void testPositionAtMaxOffsetInCommitlogsWithPaddedZeros()
{
    CommitLog commitLog = writeAndSync(100, null);

    // Start the reader at maxOffset: the position is expected to begin at the start marker's offset
    // and to end at maxOffset once the reader finishes without an early termination.
    Marker startAtEnd = commitLog.markerAt(commitLog.segmentId(), (int) commitLog.maxOffset());
    try (BufferingCommitLogReader logReader = new BufferingCommitLogReader(commitLog, startAtEnd, CdcStats.STUB, null))
    {
        CommitLogReader.Result readResult = logReader.result();
        assertThat(logReader.position()).isEqualTo((int) commitLog.maxOffset());
        assertThat(readResult.isFullyRead()).isTrue();
    }
}

@Test
public void testPositionAtMaxOffsetWhenSeekingToEnd()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excepting the assertThat(result.updates()).isEmpty(); check, this test is identical to testPositionAtMaxOffsetInCommitlogsWithPaddedZeros(). Maybe we keep this one and just leave a javadoc breadcrumb to the JIRA ticket here and the 2 cases this test covers?

{
CommitLog log = writeAndSync(100, null);
// Seek directly to maxOffset to test Fix 1: position is initialized to startMarker.position
// rather than 0, ensuring isFullyRead evaluates correctly.
Marker endMarker = log.markerAt(log.segmentId(), (int) log.maxOffset());
try (BufferingCommitLogReader reader = new BufferingCommitLogReader(log, endMarker, CdcStats.STUB, null))
{
CommitLogReader.Result result = reader.result();
assertThat(result.updates()).isEmpty();
assertThat(reader.position()).isEqualTo((int) log.maxOffset());
assertThat(result.isFullyRead()).isTrue();
}
}

/**
 * Writes {@code numRows} random rows of a CDC-enabled test table to {@code CdcTester.testCommitLog},
 * syncs the commit log, and returns a handle on the first commit log file.
 *
 * <p>Every written row has a distinct {@code pk}: a random row whose key was already written is
 * discarded and re-drawn, so callers can assert on the exact number of keys read back.
 *
 * @param numRows      number of rows to write
 * @param keyCollector optional callback invoked with the {@code pk} of each written row;
 *                     may be {@code null} when the caller does not need the keys
 * @return a {@link LocalCommitLog} over the first log's path whose {@code completed()} always returns {@code true}
 */
private CommitLog writeAndSync(int numRows, @Nullable Consumer<Long> keyCollector)
{
    TestSchema schema = TestSchema.builder(BRIDGE)
                                  .withPartitionKey("pk", BRIDGE.bigint())
                                  .withColumn("c1", BRIDGE.bigint())
                                  .withCdc(true)
                                  .build();
    CqlTable cqlTable = schema.buildTable();
    new SchemaBuilder(cqlTable, Partitioner.Murmur3Partitioner, true); // init Schema instance

    // Track the keys already written so a repeated random pk is never logged twice
    Set<Long> writtenKeys = new HashSet<>();
    for (int i = 0; i < numRows; i++)
    {
        TestSchema.TestRow row = schema.randomRow();
        while (!writtenKeys.add(row.getLong("pk")))
        {
            row = schema.randomRow();
        }
        if (keyCollector != null)
        {
            keyCollector.accept(row.getLong("pk"));
        }
        CDC_BRIDGE.log(TimeProvider.DEFAULT, cqlTable, CdcTester.testCommitLog,
                       row, TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()));
    }
    CdcTester.testCommitLog.sync();

    // Wrap the first log so that it always reports itself as completed
    return new LocalCommitLog(Paths.get(getFirstLog().path()))
    {
        @Override
        public boolean completed()
        {
            return true;
        }
    };
}

/**
 * Returns the smallest commit log, as ordered by {@code CommitLog::compareTo}, among those
 * listed by {@code CdcTests.logProvider(directory).logs()}.
 *
 * @return the minimum commit log
 * @throws RuntimeException if the provider lists no commit logs
 */
private CommitLog getFirstLog()
{
return CdcTests.logProvider(directory)
.logs()
.min(CommitLog::compareTo)
.orElseThrow(() -> new RuntimeException("Commit log file not found"));
}

private Set<Long> readLog(@Nullable Marker highWaterMark,
Set<Long> keys,
CommitLog logFile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ private void readCommitLogSegment() throws IOException
{
stats.commitLogBytesSkippedOnRead(startMarker.position() - reader.getFilePointer());
segmentReader.seek(startMarker.position());
// When starting from an offset, position must be initialized to startMarker.position()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just leaving a note so we're aligned (no change requested). The relationship between reader, startMarker, and this.position is something we should consider structurally relating in the future. If there's an expectation that these numeric sentinels are linked, wrapping them in code that enforces those invariants would help prevent gaps and allow us to refactor more safely.

Again - nothing to do here. ;) Just my first thoughts on looking at this code in detail. I think this is worth considering for a broader refactor; this BufferingCommitLogReader seems like a leaky abstraction on top of the regular commit log reader that's forcing us to do a lot of heavy lifting and exposing us to a lot of potential pain if we change it too much. 😬

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we can note this down and do the refactoring in another patch later.

// rather than 0; an incorrect value causes isFullyRead to fail.
this.position = startMarker.position();
}

for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
Expand All @@ -309,6 +312,13 @@ private void readCommitLogSegment() throws IOException
break;
}
}

// If the segment reader's iterator finishes reading a commit log that ends in zero padding, set the position
// to maxOffset to mark the commit log as fully read

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment confused me a bit and sent me down a rabbit hole. wdyt about this instead? Is it accurate?

// If the loop finished naturally (iterator exhausted) without hitting an error or limit,
// ensure the position reflects the end of the file. If we aborted early due to an error
// or mutation limit, 'this.position' remains at the last valid read offset.

if (statusTracker.shouldContinue())
{
this.position = (int) log.maxOffset();
}
}
// Unfortunately CommitLogSegmentReader.SegmentIterator (for-loop) cannot throw a checked exception,
// so we check to see if a RuntimeException is wrapping an IOException.
Expand Down Expand Up @@ -427,6 +437,7 @@ private void readSection(FileDataInput reader,
if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
{
logger.trace("Encountered end of segment marker at", "position", reader.getFilePointer());
this.position = (int) log.maxOffset();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a unit test that exercises this "hit LEGACY_END_OF_SEGMENT_MARKER" case?

statusTracker.requestTermination();
return;
}
Expand Down