From 9342d263a0f99fdc69c9f69033dae7515ab781d9 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Fri, 27 Feb 2026 16:32:21 +0100 Subject: [PATCH 1/2] CASSANALYTICS-60: CDC support for Cassandra 5.0 commit logs --- .../cassandra/cdc/kafka/KafkaPublisher.java | 12 +++- cassandra-analytics-cdc/build.gradle | 10 ++-- .../cassandra/bridge/CdcBridgeFactory.java | 31 +++++++++- .../org/apache/cassandra/cdc/CdcTester.java | 35 ++++++----- .../org/apache/cassandra/cdc/CdcTests.java | 36 ++++++++---- .../cdc/CollectionDeletionTests.java | 5 +- .../cdc/MicroBatchIteratorTests.java | 19 +++--- .../org/apache/cassandra/cdc/TestUtils.java | 47 +++++++++++++++ .../cassandra/cdc/TestVersionSupplier.java | 48 +++++++++++++++ .../apache/cassandra/cdc/TypeCacheTests.java | 3 +- .../BufferingCommitLogReaderTests.java | 51 ++++++++-------- .../apache/cassandra/cdc/api/CommitLog.java | 18 ++++-- .../cdc/msg/RangeTombstoneBuilder.java | 34 +++-------- .../apache/cassandra/bridge/CdcBridge.java | 5 ++ .../bridge/CdcBridgeImplementation.java | 2 + .../java/org/apache/cassandra/db/DbUtils.java | 53 +++++++++++++++++ .../apache/cassandra/spark/data/CqlType.java | 13 +++++ .../AbstractCdcBridgeImplementation.java | 43 +++++++++++--- .../bridge/CdcBridgeImplementation.java | 4 +- .../msg/AbstractRangeTombstoneBuilder.java | 58 +++++++++++++++++++ .../msg/FourZeroRangeTombstoneBuilder.java | 2 +- .../java/org/apache/cassandra/db/DbUtils.java | 55 ++++++++++++++++++ .../commitlog/BufferingCommitLogReader.java | 2 +- .../cassandra/spark/data/AbstractCqlType.java | 4 +- .../apache/cassandra/spark/data/CqlType.java | 17 ++++++ 25 files changed, 491 insertions(+), 116 deletions(-) create mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestUtils.java create mode 100644 cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestVersionSupplier.java create mode 100644 cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java create mode 100644 
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/AbstractRangeTombstoneBuilder.java create mode 100644 cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java diff --git a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java index 41dd5350c..0eb550ca7 100644 --- a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java +++ b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java @@ -46,6 +46,7 @@ public class KafkaPublisher implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class); + protected CassandraVersion version; protected TopicSupplier topicSupplier; protected int maxRecordSizeBytes; protected final RecordProducer recordProducer; @@ -62,7 +63,8 @@ public class KafkaPublisher implements AutoCloseable ThreadLocal.withInitial(HashMap::new); protected final KafkaStats kafkaStats; - public KafkaPublisher(TopicSupplier topicSupplier, + public KafkaPublisher(String version, + TopicSupplier topicSupplier, KafkaProducer producer, Serializer serializer, int maxRecordSizeBytes, @@ -71,6 +73,7 @@ public KafkaPublisher(TopicSupplier topicSupplier, CdcLogMode logMode) { this( + version, topicSupplier, producer, serializer, @@ -84,7 +87,8 @@ public KafkaPublisher(TopicSupplier topicSupplier, ); } - public KafkaPublisher(TopicSupplier topicSupplier, + public KafkaPublisher(String version, + TopicSupplier topicSupplier, KafkaProducer producer, Serializer serializer, int maxRecordSizeBytes, @@ -95,6 +99,8 @@ public KafkaPublisher(TopicSupplier topicSupplier, RecordProducer recordProducer, EventHasher eventHasher) { + this.version = CassandraVersion.fromVersion(version).orElseThrow( + () -> new IllegalArgumentException("Unsupported Cassandra version: " + version)); 
this.topicSupplier = topicSupplier; this.maxRecordSizeBytes = maxRecordSizeBytes; this.failOnRecordTooLargeError = failOnRecordTooLargeError; @@ -116,7 +122,7 @@ public CqlField.CqlType getType(KeyspaceTypeKey key) public CassandraVersion version() { - return CassandraVersion.FOURZERO; + return version; } public Logger logger() diff --git a/cassandra-analytics-cdc/build.gradle b/cassandra-analytics-cdc/build.gradle index cd4a2dc29..660150b59 100644 --- a/cassandra-analytics-cdc/build.gradle +++ b/cassandra-analytics-cdc/build.gradle @@ -97,10 +97,10 @@ dependencies { testImplementation project(":cassandra-four-zero-types") testImplementation project(":cassandra-four-zero-bridge") testImplementation project(path: ':cassandra-four-zero', configuration: 'shadow') - // unit tests are performed only with Cassandra 4 - // testImplementation project(":cassandra-five-zero-bridge") - // testImplementation project(":cassandra-five-zero-types") - // testImplementation project(path: ':cassandra-five-zero', configuration: 'shadow') + + testImplementation project(":cassandra-five-zero-bridge") + testImplementation project(":cassandra-five-zero-types") + testImplementation project(path: ':cassandra-five-zero', configuration: 'shadow') testImplementation(group: 'org.quicktheories', name: 'quicktheories', version: "${project.rootProject.quickTheoriesVersion}") testImplementation(group: 'com.google.guava', name: 'guava', version: '31.1-jre') @@ -139,6 +139,8 @@ jar { } test { + systemProperty "cassandra.sidecar.versions_to_test", (System.getenv("CASSANDRA_VERSION") ?: "4.0.0") + minHeapSize = '1024m' maxHeapSize = '3072m' maxParallelForks = Math.max(Runtime.runtime.availableProcessors() * 2, 8) diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java index 0a1e3449a..301940008 100644 --- 
a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; +import org.apache.cassandra.spark.utils.Throwing; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -40,14 +41,17 @@ public static class VersionSpecificBridge public final CdcBridge cdcBridge; @Nullable final CqlToAvroSchemaConverter avroSchemaConverter; + final ClassLoader classLoader; public VersionSpecificBridge(CassandraBridge cassandraBridge, CdcBridge cdcBridge, - @Nullable CqlToAvroSchemaConverter avroSchemaConverter) + @Nullable CqlToAvroSchemaConverter avroSchemaConverter, + ClassLoader classLoader) { this.cassandraBridge = cassandraBridge; this.cdcBridge = cdcBridge; this.avroSchemaConverter = avroSchemaConverter; + this.classLoader = classLoader; } } @@ -150,7 +154,7 @@ private static VersionSpecificBridge create(@NotNull String label) { } - return new VersionSpecificBridge(bridgeInstance, cdcBridgeInstance, cqlToAvroSchemaConverter); + return new VersionSpecificBridge(bridgeInstance, cdcBridgeInstance, cqlToAvroSchemaConverter, loader); } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException exception) @@ -158,4 +162,27 @@ private static VersionSpecificBridge create(@NotNull String label) throw new RuntimeException("Failed to create Cassandra bridge for label " + label, exception); } } + + public static T executeActionOnBridgeClassLoader(@NotNull CassandraVersion version, Throwing.Function action) + { + ClassLoader bridgeLoader = getVersionSpecificBridge(version).classLoader; + Thread currentThread = Thread.currentThread(); + ClassLoader originalClassLoader = currentThread.getContextClassLoader(); + try + { + 
currentThread.setContextClassLoader(bridgeLoader); + try + { + return action.apply(bridgeLoader); + } + catch (Exception e) + { + throw new RuntimeException("Failed to execute function on bridge classloader", e); + } + } + finally + { + currentThread.setContextClassLoader(originalClassLoader); + } + } } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTester.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTester.java index 4087f5318..be3fd0804 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTester.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTester.java @@ -19,6 +19,7 @@ package org.apache.cassandra.cdc; +import java.lang.reflect.Method; import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -37,17 +38,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.bridge.BridgeInitializationParameters; import org.apache.cassandra.bridge.CassandraBridge; -import org.apache.cassandra.bridge.CassandraVersion; -import org.apache.cassandra.bridge.CdcBridgeImplementation; +import org.apache.cassandra.bridge.CdcBridge; import org.apache.cassandra.cdc.api.CassandraSource; import org.apache.cassandra.cdc.api.CdcOptions; +import org.apache.cassandra.cdc.api.CommitLogInstance; import org.apache.cassandra.cdc.msg.CdcEvent; import org.apache.cassandra.cdc.state.CdcState; import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.utils.IOUtils; -import org.apache.cassandra.spark.utils.ThrowableUtils; import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.spark.utils.test.TestSchema; import org.jetbrains.annotations.Nullable; @@ -61,17 +62,27 @@ public class CdcTester private static final Logger LOGGER = LoggerFactory.getLogger(CdcTester.class); public static final int DEFAULT_NUM_ROWS = 
1000; - public static FourZeroCommitLog testCommitLog; + public static CommitLogInstance testCommitLog; - public static void setup(Path testDirectory) + public static void setup(CdcBridge cdcBridge, Path testDirectory) { - setup(testDirectory, 32, false); + setup(cdcBridge, testDirectory, 32, false); } - public static void setup(Path testDirectory, int commitLogSegmentSize, boolean enableCompression) + public static void setup(CdcBridge cdcBridge, Path testDirectory, int commitLogSegmentSize, boolean enableCompression) { - CdcBridgeImplementation.setup(testDirectory, commitLogSegmentSize, enableCompression); - testCommitLog = new FourZeroCommitLog(testDirectory); + try + { + // TODO: Refactor static initialization to instance method. + // use reflection to execute static initialization + Method setupMethod = cdcBridge.getClass().getMethod("setup", Path.class, int.class, boolean.class, BridgeInitializationParameters.class); + setupMethod.invoke(null, testDirectory, commitLogSegmentSize, enableCompression, BridgeInitializationParameters.fromEnvironment()); + } + catch (Exception e) + { + throw new IllegalStateException("Failed to setup CdcBridge", e); + } + testCommitLog = cdcBridge.createCommitLogInstance(testDirectory); } public static void tearDown() @@ -260,7 +271,6 @@ public void sync() void run() { Map rows = new LinkedHashMap<>(numRows); - CassandraVersion version = CassandraVersion.FOURZERO; List cdcEvents = new ArrayList<>(); try @@ -268,7 +278,7 @@ void run() LOGGER.info("Running CDC test testId={} schema='{}' thread={}", testId, cqlTable.fields(), Thread.currentThread().getName()); Set udtStmts = schema.udts.stream().map(e -> e.createStatement(bridge.cassandraTypes(), schema.keyspace)).collect(Collectors.toSet()); bridge.buildSchema(schema.createStatement, schema.keyspace, schema.rf, partitioner, udtStmts, null, 0, true); - schema.setCassandraVersion(version); + schema.setCassandraVersion(TestVersionSupplier.testVersion()); // write some mutations to CDC 
CommitLog for (CdcWriter writer : writers) @@ -312,8 +322,7 @@ void run() } catch (Throwable t) { - LOGGER.error("Unexpected error in CdcTester", ThrowableUtils.rootCause(t)); - t.printStackTrace(); + LOGGER.error("Unexpected error in CdcTester", t); fail("Unexpected error in CdcTester"); } finally diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java index 3e2377846..b7533acdd 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java @@ -53,9 +53,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.bridge.CassandraBridge; -import org.apache.cassandra.bridge.CassandraBridgeImplementation; +import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.bridge.CdcBridge; -import org.apache.cassandra.bridge.CdcBridgeImplementation; +import org.apache.cassandra.bridge.CdcBridgeFactory; import org.apache.cassandra.bridge.TokenRange; import org.apache.cassandra.cdc.api.CdcOptions; import org.apache.cassandra.cdc.api.CommitLog; @@ -69,13 +69,12 @@ import org.apache.cassandra.cdc.msg.jdk.JdkMessageConverter; import org.apache.cassandra.cdc.state.CdcState; import org.apache.cassandra.db.marshal.ByteBufferAccessor; -import org.apache.cassandra.schema.Schema; import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.partitioner.CassandraInstance; import org.apache.cassandra.spark.data.partitioner.Partitioner; -import org.apache.cassandra.spark.reader.SchemaBuilder; import org.apache.cassandra.spark.utils.AsyncExecutor; import org.apache.cassandra.spark.utils.ByteBufferUtils; import org.apache.cassandra.spark.utils.IOUtils; 
@@ -107,6 +106,11 @@ public int minimumReplicas(String keyspace) { return 1; } + + public CassandraVersion version() + { + return TestVersionSupplier.testVersion(); + } }; public static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4, new ThreadFactoryBuilder() @@ -114,10 +118,9 @@ public int minimumReplicas(String keyspace) .setDaemon(true) .build()); public static final AsyncExecutor ASYNC_EXECUTOR = AsyncExecutor.wrap(EXECUTOR); - // TODO: Execute CDC tests also with Cassandra 5 bridge. - public static final CassandraBridge BRIDGE = new CassandraBridgeImplementation(); + public static final CassandraBridge BRIDGE = CdcBridgeFactory.get(TestVersionSupplier.testVersion()); public static final JdkMessageConverter MESSAGE_CONVERTER = new JdkMessageConverter(BRIDGE.cassandraTypes()); - public static final CdcBridge CDC_BRIDGE = new CdcBridgeImplementation(); + public static final CdcBridge CDC_BRIDGE = CdcBridgeFactory.getCdcBridge(TestVersionSupplier.testVersion()); private static final int TTL = 42; @@ -143,7 +146,7 @@ public static synchronized void setup() { throw new RuntimeException(e); } - CdcTester.setup(directory); + CdcTester.setup(CDC_BRIDGE, directory); isSetup = true; } @@ -215,7 +218,6 @@ public void testMockedCdc() null, 0, true); - UUID tableId = Schema.instance.getTableMetadata(table.keyspace(), table.table()).id.asUUID(); SchemaSupplier schemaSupplier = () -> CompletableFuture.completedFuture(ImmutableSet.of(table)); AtomicReference state = new AtomicReference<>(); StatePersister statePersister = new StatePersister() @@ -254,7 +256,7 @@ public List loadState(String jobId, int partitionId, @Nullable TokenRa try (Cdc cdc = Cdc.builder("101", 0, eventConsumer, schemaSupplier) .withExecutor(CdcTests.ASYNC_EXECUTOR) .withStatePersister(statePersister) - .withTableIdLookup((ks, tb) -> tableId) + .withTableIdLookup(CDC_BRIDGE.internalTableIdLookup()) .withCommitLogProvider(CdcTests.logProvider(CdcTests.directory)) 
.withCdcOptions(CdcTests.TEST_OPTIONS) .build()) @@ -542,8 +544,18 @@ public void testMultiTable() TestSchema schema3 = tableBuilder3.build(); CqlTable cqlTable2 = schema2.buildTable(); CqlTable cqlTable3 = schema3.buildTable(); - new SchemaBuilder(cqlTable2, Partitioner.Murmur3Partitioner, schema2.withCdc); - new SchemaBuilder(cqlTable3, Partitioner.Murmur3Partitioner, schema3.withCdc); + BRIDGE.buildSchema(cqlTable2.createStatement(), + cqlTable2.keyspace(), + ReplicationFactor.simpleStrategy(1), + Partitioner.Murmur3Partitioner, + Collections.emptySet(), + null, 0, schema2.withCdc); + BRIDGE.buildSchema(cqlTable3.createStatement(), + cqlTable3.keyspace(), + ReplicationFactor.simpleStrategy(1), + Partitioner.Murmur3Partitioner, + Collections.emptySet(), + null, 0, schema3.withCdc); int numRows = DEFAULT_NUM_ROWS; AtomicReference schema1Holder = new AtomicReference<>(); diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java index dc5d76f26..30d923a65 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java @@ -32,10 +32,8 @@ import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; -import org.apache.cassandra.bridge.CollectionElement; import org.apache.cassandra.cdc.msg.CdcEvent; import org.apache.cassandra.cdc.msg.Value; -import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.utils.test.TestSchema; @@ -115,7 +113,8 @@ private void testElementDeletionInCollection(int numOfPKs, testRow = CdcTester.newUniqueRow(tester.schema, rows); for (String name : collectionColumnNames) { - testRow = testRow.copy(name, CollectionElement.deleted(CellPath.create(key))); + Object value = 
TestUtils.collectionDeleteMutation(TestVersionSupplier.testVersion(), key); + testRow = testRow.copy(name, value); } elementDeletionIndices.put(i, key.array()); } diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java index cd890df29..76fb5ae5f 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/MicroBatchIteratorTests.java @@ -20,6 +20,7 @@ package org.apache.cassandra.cdc; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,8 +34,6 @@ import com.google.common.collect.ImmutableSet; import org.junit.jupiter.api.Test; -import org.apache.cassandra.bridge.CassandraVersion; -import org.apache.cassandra.bridge.CollectionElement; import org.apache.cassandra.cdc.api.CassandraSource; import org.apache.cassandra.cdc.api.RangeTombstoneData; import org.apache.cassandra.cdc.msg.CdcEvent; @@ -43,11 +42,10 @@ import org.apache.cassandra.cdc.msg.jdk.Column; import org.apache.cassandra.cdc.msg.jdk.RangeTombstoneMsg; import org.apache.cassandra.cdc.state.CdcState; -import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.partitioner.Partitioner; -import org.apache.cassandra.spark.reader.SchemaBuilder; import org.apache.cassandra.spark.utils.RandomUtils; import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.spark.utils.test.TestSchema; @@ -81,7 +79,7 @@ public void testSetDeletion() TestSchema.TestRow testRow = CdcTester.newUniqueRow(schema, rows); String deletedValue = (String) BRIDGE.text().randomValue(4); ByteBuffer key = 
BRIDGE.text().serialize(deletedValue); - testRow = testRow.copy("b", CollectionElement.deleted(CellPath.create(key))); + testRow = testRow.copy("b", TestUtils.collectionDeleteMutation(TestVersionSupplier.testVersion(), key)); deletedValues.put(testRow.get(0).toString(), deletedValue); return testRow; }, @@ -106,7 +104,7 @@ public void testMapDeletion() TestSchema.TestRow testRow = CdcTester.newUniqueRow(schema, rows); String deletedValue = (String) BRIDGE.text().randomValue(4); ByteBuffer key = BRIDGE.text().serialize(deletedValue); - testRow = testRow.copy("b", CollectionElement.deleted(CellPath.create(key))); + testRow = testRow.copy("b", TestUtils.collectionDeleteMutation(TestVersionSupplier.testVersion(), key)); deletedValues.put(testRow.get(0).toString(), deletedValue); return testRow; }, @@ -368,8 +366,13 @@ private static void runTest(TestSchema.Builder schemaBuilder, .withCdc(true) .build(); CqlTable cqlTable = schema.buildTable(); - new SchemaBuilder(cqlTable, Partitioner.Murmur3Partitioner, schema.withCdc); - schema.setCassandraVersion(CassandraVersion.FOURZERO); + BRIDGE.buildSchema(cqlTable.createStatement(), + cqlTable.keyspace(), + ReplicationFactor.simpleStrategy(1), + Partitioner.Murmur3Partitioner, + Collections.emptySet(), + null, 0, schema.withCdc); + schema.setCassandraVersion(TestVersionSupplier.testVersion()); try { diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestUtils.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestUtils.java new file mode 100644 index 000000000..55ebc1379 --- /dev/null +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.cdc; + +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.bridge.CdcBridgeFactory; + +public class TestUtils +{ + private TestUtils() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + public static Object collectionDeleteMutation(CassandraVersion version, ByteBuffer key) + { + return CdcBridgeFactory.executeActionOnBridgeClassLoader(version, (classLoader) -> { + Class cellPathClass = Class.forName("org.apache.cassandra.db.rows.CellPath", true, classLoader); + Method cellPathFactory = cellPathClass.getMethod("create", ByteBuffer.class); + Object cellPath = cellPathFactory.invoke(null, key); + + Class collectionElementClass = Class.forName("org.apache.cassandra.bridge.CollectionElement", true, classLoader); + Method collectionElementFactory = collectionElementClass.getMethod("deleted", cellPathClass); + return collectionElementFactory.invoke(null, cellPath); + }); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestVersionSupplier.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestVersionSupplier.java new file mode 100644 index 000000000..78f6f5aed --- /dev/null +++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TestVersionSupplier.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cdc; + +import java.util.Arrays; +import java.util.stream.Stream; + +import org.apache.cassandra.bridge.CassandraVersion; + +public final class TestVersionSupplier +{ + private TestVersionSupplier() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + public static Stream testVersions() + { + String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0.17,5.0.5"); + return Arrays.stream(versions.split(",")) + .map(String::trim) + .map(v -> CassandraVersion.fromVersion(v).orElseThrow(() -> new IllegalArgumentException("Unsupported version: " + v))); + } + + @Deprecated + public static CassandraVersion testVersion() + { + // TODO: Refactor to parametrized tests and verify all Cassandra versions in a single test run. + // Usage of this method shall be replaced with testVersions(). 
+ return testVersions().findFirst().orElseThrow(); + } +} diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TypeCacheTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TypeCacheTests.java index 8f78b4354..c9d748610 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TypeCacheTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/TypeCacheTests.java @@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test; -import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.spark.data.CassandraTypes; import org.apache.cassandra.spark.data.CqlField; @@ -33,7 +32,7 @@ public class TypeCacheTests @Test public void testTypeCache() { - TypeCache typeCache = TypeCache.get(CassandraVersion.FOURZERO); + TypeCache typeCache = TypeCache.get(TestVersionSupplier.testVersion()); assertThat(typeCache.cqlTypeCache).isNull(); CqlField.CqlType ksBigInt = typeCache.getType("ks", "bigint"); diff --git a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java index fa4dfd0b3..b1aa56d2b 100644 --- a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java +++ b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReaderTests.java @@ -30,14 +30,16 @@ import org.junit.jupiter.api.Test; +import org.apache.cassandra.bridge.CdcBridge; import org.apache.cassandra.cdc.CdcTester; import org.apache.cassandra.cdc.CdcTests; import org.apache.cassandra.cdc.api.CommitLog; +import org.apache.cassandra.cdc.api.CommitLogMarkers; import org.apache.cassandra.cdc.api.Marker; import org.apache.cassandra.cdc.stats.CdcStats; import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; import 
org.apache.cassandra.spark.data.partitioner.Partitioner; -import org.apache.cassandra.spark.reader.SchemaBuilder; import org.apache.cassandra.spark.utils.TimeProvider; import org.apache.cassandra.spark.utils.test.TestSchema; import org.jetbrains.annotations.Nullable; @@ -64,7 +66,12 @@ public void testReaderSeek() .withCdc(true) .build(); CqlTable cqlTable = schema.buildTable(); - new SchemaBuilder(cqlTable, Partitioner.Murmur3Partitioner, true); // init Schema instance + BRIDGE.buildSchema(cqlTable.createStatement(), + cqlTable.keyspace(), + ReplicationFactor.simpleStrategy(1), + Partitioner.Murmur3Partitioner, + Collections.emptySet(), + null, 0, true); int numRows = 1000; // write some rows to a CommitLog @@ -89,7 +96,7 @@ public void testReaderSeek() // read entire commit log and verify correct Consumer listener = markers::add; - Set allRows = readLog(null, keys, firstLog, listener); + Set allRows = readLog(CDC_BRIDGE, null, keys, firstLog, listener); assertThat(allRows).hasSize(numRows); // re-read commit log from each watermark position @@ -101,7 +108,7 @@ public void testReaderSeek() assertThat(allMarkers).isNotEmpty(); for (Marker marker : allMarkers) { - Set result = readLog(marker, keys, firstLog, null); + Set result = readLog(CDC_BRIDGE, marker, keys, firstLog, null); assertThat(result.size()).isLessThan(foundRows); foundRows = result.size(); if (prevMarker != null) @@ -124,32 +131,30 @@ public void testReaderSeek() } } - private Set readLog(@Nullable Marker highWaterMark, + private Set readLog(CdcBridge cdcBridge, + @Nullable Marker highWaterMark, Set keys, CommitLog logFile, @Nullable Consumer listener) { Set keysRead = new HashSet<>(); - try (BufferingCommitLogReader reader = new BufferingCommitLogReader(logFile, - highWaterMark, - CdcStats.STUB, - listener)) + BufferingCommitLogReader.Result result = cdcBridge.readLog(logFile, + null, + CommitLogMarkers.of(highWaterMark), + 0, + CdcStats.STUB, + null, + listener, + null, + false); + for 
(PartitionUpdateWrapper update : result.updates()) { - BufferingCommitLogReader.Result result = reader.result(); - for (PartitionUpdateWrapper update : result.updates()) - { - long key = Objects.requireNonNull(update.partitionKey()).getLong(); - assertThat(keysRead).doesNotContain(key); - keysRead.add(key); - assertThat(keys).contains(key); - } - - return keysRead; - } - catch (Exception e) - { - throw new RuntimeException(e); + long key = Objects.requireNonNull(update.partitionKey()).getLong(); + assertThat(keysRead).doesNotContain(key); + keysRead.add(key); + assertThat(keys).contains(key); } + return keysRead; } } diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/api/CommitLog.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/api/CommitLog.java index b3c532ad1..66adeb44c 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/api/CommitLog.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/api/CommitLog.java @@ -55,14 +55,24 @@ static Optional> extractVersionAndSegmentId(@NotNull String { int version = matcher.group(2) == null ? 6 : Integer.parseInt(matcher.group(2)); // versions are present in C* code-base in CommitLogDescriptor - // TODO: version 8 is the commit log in Cassandra 5. Uncomment the following when cdc support for Cassandra 5 commit log is implemented. -// if (version != 6 && version != 7 && version != 8) - if (version != 6 && version != 7) + if (version != 6 && version != 7 && version != 8) { throw new IllegalStateException("Unknown commitlog version " + version); } // logic taken from org.apache.cassandra.db.commitlog.CommitLogDescriptor.getMessagingVersion() - return Optional.of(Pair.of(version == 6 ? 
10 : 12, Long.parseLong(matcher.group(3)))); + int messagingVersion; + switch (version) + { + case 6: + messagingVersion = 10; + break; + case 7: + messagingVersion = 12; + break; + default: + messagingVersion = 13; + } + return Optional.of(Pair.of(messagingVersion, Long.parseLong(matcher.group(3)))); } catch (NumberFormatException e) { diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java index 9eec12193..0717166b0 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java @@ -28,37 +28,17 @@ * * @param */ -public abstract class RangeTombstoneBuilder +public interface RangeTombstoneBuilder { - RangeTombstone rangeTombstone; - boolean expectOpen = true; - - public RangeTombstone buildTombstone(List start, boolean isStartInclusive, List end, boolean isEndInclusive) - { - return new RangeTombstone(start, isStartInclusive, end, isEndInclusive); - } - - public boolean canBuild() - { - return rangeTombstone != null; - } - - public RangeTombstone build() - { - RangeTombstone res = rangeTombstone; - rangeTombstone = null; - return res; - } - - public abstract void add(T marker); + RangeTombstone buildTombstone(List start, boolean isStartInclusive, List end, boolean isEndInclusive); + boolean canBuild(); + RangeTombstone build(); + void add(T marker); /** * @return true when there is range tombstone marker not consumed. 
*/ - public abstract boolean hasIncompleteRange(); + boolean hasIncompleteRange(); - public Value buildValue(String keyspace, String name, String type, ByteBuffer buf) - { - return new Value(keyspace, name, type, buf); - } + Value buildValue(String keyspace, String name, String type, ByteBuffer buf); } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridge.java index afa25bca0..722eca7d6 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridge.java @@ -19,6 +19,7 @@ package org.apache.cassandra.bridge; +import java.nio.file.Path; import java.util.Collection; import java.util.Random; import java.util.Set; @@ -59,6 +60,10 @@ public void log(CqlTable cqlTable, CommitLogInstance log, Row row, long timestam log(TimeProvider.DEFAULT, cqlTable, log, row, timestamp); } + public abstract CommitLogInstance createCommitLogInstance(Path path); + + public abstract TableIdLookup internalTableIdLookup(); + public abstract void updateCdcSchema(@NotNull Set cdcTables, @NotNull Partitioner partitioner, @NotNull TableIdLookup tableIdLookup); diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java index 5605394cb..6062f8cc2 100644 --- a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java @@ -67,6 +67,8 @@ protected static synchronized void setCDC(Path path, int commitLogSegmentSize, b DatabaseDescriptor.setCommitLogSyncGroupWindow(30); DatabaseDescriptor.setCommitLogSegmentSize(commitLogSegmentSize); DatabaseDescriptor.getRawConfig().commitlog_total_space = new 
DataStorageSpec.IntMebibytesBound(1024); + DatabaseDescriptor.setCommitLogWriteDiskAccessMode(Config.DiskAccessMode.direct); + DatabaseDescriptor.setCDCTotalSpaceInMiB(1024); DatabaseDescriptor.setCommitLogSegmentMgrProvider((commitLog -> new CommitLogSegmentManagerCDC(commitLog, commitLogPath.toString()))); setup = true; } diff --git a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java new file mode 100644 index 000000000..963f3454b --- /dev/null +++ b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.schema.TableMetadata; + +public class DbUtils +{ + private DbUtils() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime) + { + return DeletionTime.build(markedForDeleteAt, localDeletionTime); + } + + public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds) + { + return LivenessInfo.create(timestamp, nowInSeconds); + } + + public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec) + { + return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, nowInSec); + } + + public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata metadata, DecoratedKey key, long nowInSec) + { + return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(nowInSec); + } +} diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java index 269754b11..8dffb2960 100644 --- a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java @@ -19,6 +19,7 @@ package org.apache.cassandra.spark.data; +import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; @@ -26,6 +27,8 @@ import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.schema.ColumnMetadata; public abstract class CqlType extends AbstractCqlType @@ -47,4 
+50,14 @@ public void addComplexTombstone(org.apache.cassandra.db.rows.Row.Builder rowBuil Preconditions.checkArgument(cd.isComplex(), "The method only works with complex columns"); rowBuilder.addComplexDeletion(cd, DeletionTime.build(deletionTime, (int) TimeUnit.MICROSECONDS.toSeconds(deletionTime))); } + + public static BufferCell tombstone(ColumnMetadata column, long timestamp, long nowInSec, CellPath path) + { + return BufferCell.tombstone(column, timestamp, nowInSec, path); + } + + public static BufferCell expiring(ColumnMetadata column, long timestamp, int ttl, long nowInSec, ByteBuffer value, CellPath path) + { + return BufferCell.expiring(column, timestamp, ttl, nowInSec, value, path); + } } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java index e25f2f951..a602f4517 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/AbstractCdcBridgeImplementation.java @@ -20,11 +20,14 @@ package org.apache.cassandra.bridge; import java.nio.ByteBuffer; +import java.nio.file.Path; import java.util.Collection; import java.util.List; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -33,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; +import org.apache.cassandra.cdc.FourZeroCommitLog; import org.apache.cassandra.cdc.FourZeroMutation; import org.apache.cassandra.cdc.api.CassandraSource; import org.apache.cassandra.cdc.api.CommitLog; @@ -49,9 +53,8 @@ import 
org.apache.cassandra.cdc.stats.ICdcStats; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DbUtils; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.commitlog.BufferingCommitLogReader; import org.apache.cassandra.db.commitlog.FourZeroPartitionUpdateWrapper; @@ -79,6 +82,28 @@ public void log(CqlTable cqlTable, CommitLogInstance log, Row row, long timestam log(TimeProvider.DEFAULT, cqlTable, log, row, timestamp); } + public CommitLogInstance createCommitLogInstance(Path path) + { + return new FourZeroCommitLog(path); + } + + public TableIdLookup internalTableIdLookup() + { + return new TableIdLookup() + { + @Nullable + public UUID lookup(String keyspace, String table) throws NoSuchElementException + { + TableMetadata tm = Schema.instance.getTableMetadata(keyspace, table); + if (tm == null) + { + throw new NoSuchElementException(); + } + return tm.id.asUUID(); + } + }; + } + public void updateCdcSchema(@NotNull Set cdcTables, @NotNull Partitioner partitioner, @NotNull TableIdLookup tableIdLookup) { CassandraSchema.updateCdcSchema(cdcTables, partitioner, tableIdLookup); @@ -100,7 +125,7 @@ public CommitLogReader.Result readLog(@NotNull CommitLog log, partitionId, stats, executor, - null, + listener, // only for testing startTimestampMicros, readCommitLogHeader)) { @@ -139,7 +164,7 @@ public static Mutation makeMutation(TimeProvider timeProvider, CqlTable cqlTable final org.apache.cassandra.db.rows.Row.Builder rowBuilder = BTreeRow.sortedBuilder(); if (row.isInsert()) { - rowBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp, timeProvider.nowInSeconds())); + rowBuilder.addPrimaryKeyLivenessInfo(DbUtils.livenessInfo(timestamp, timeProvider.nowInSeconds())); } org.apache.cassandra.db.rows.Row staticRow = 
Rows.EMPTY_STATIC_ROW; @@ -154,7 +179,7 @@ public static Mutation makeMutation(TimeProvider timeProvider, CqlTable cqlTable // create a mutation and return early if (isPartitionDeletion(cqlTable, row)) { - PartitionUpdate delete = PartitionUpdate.fullPartitionDelete(table, partitionKey, timestamp, timeProvider.nowInSeconds()); + PartitionUpdate delete = DbUtils.fullPartitionDeletion(table, partitionKey, timestamp, timeProvider.nowInSeconds()); return new Mutation(delete); } @@ -189,7 +214,8 @@ else if (clusteringKeys.stream().allMatch(f -> row.get(f.position()) == null)) if (row.isDeleted()) { - rowBuilder.addRowDeletion(org.apache.cassandra.db.rows.Row.Deletion.regular(new DeletionTime(timestamp, timeProvider.nowInSeconds()))); + rowBuilder.addRowDeletion(org.apache.cassandra.db.rows.Row.Deletion.regular( + DbUtils.deletionTime(timestamp, timeProvider.nowInSeconds()))); } else { @@ -260,9 +286,8 @@ protected static Mutation makeRangeTombstone(CqlTable cqlTable, Row row) { final List clusteringKeys = cqlTable.clusteringKeys(); - PartitionUpdate.SimpleBuilder pub = PartitionUpdate.simpleBuilder(table, decoratedPartitionKey) - .timestamp(timestamp) - .nowInSec(timeProvider.nowInSeconds()); + PartitionUpdate.SimpleBuilder pub = DbUtils.partitionUpdateBuilderWithNow(table, decoratedPartitionKey, timeProvider.nowInSeconds()) + .timestamp(timestamp); for (RangeTombstoneData rt : row.rangeTombstones()) { // range tombstone builder is built when partition update builder builds diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java index a382319a6..1e21845ba 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CdcBridgeImplementation.java @@ -33,9 +33,9 @@ public class 
CdcBridgeImplementation extends AbstractCdcBridgeImplementation { public static volatile boolean setup = false; - public static void setup(Path path, int commitLogSegmentSize, boolean enableCompression) + public static void setup(Path path, int commitLogSegmentSize, boolean enableCompression, BridgeInitializationParameters bridgeParams) { - CassandraTypesImplementation.setup(BridgeInitializationParameters.fromEnvironment()); + CassandraTypesImplementation.setup(bridgeParams); setCDC(path, commitLogSegmentSize, enableCompression); } diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/AbstractRangeTombstoneBuilder.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/AbstractRangeTombstoneBuilder.java new file mode 100644 index 000000000..a77859e1f --- /dev/null +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/AbstractRangeTombstoneBuilder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.cdc.msg; + +import java.nio.ByteBuffer; +import java.util.List; + +public abstract class AbstractRangeTombstoneBuilder implements RangeTombstoneBuilder +{ + RangeTombstone rangeTombstone; + boolean expectOpen = true; + + public RangeTombstone buildTombstone(List start, boolean isStartInclusive, List end, boolean isEndInclusive) + { + return new RangeTombstone(start, isStartInclusive, end, isEndInclusive); + } + + public boolean canBuild() + { + return rangeTombstone != null; + } + + public RangeTombstone build() + { + RangeTombstone res = rangeTombstone; + rangeTombstone = null; + return res; + } + + public abstract void add(T marker); + + /** + * @return true when there is range tombstone marker not consumed. + */ + public abstract boolean hasIncompleteRange(); + + public Value buildValue(String keyspace, String name, String type, ByteBuffer buf) + { + return new Value(keyspace, name, type, buf); + } +} diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroRangeTombstoneBuilder.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroRangeTombstoneBuilder.java index c03aac9c8..8252260ea 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroRangeTombstoneBuilder.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/cdc/msg/FourZeroRangeTombstoneBuilder.java @@ -31,7 +31,7 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; -public class FourZeroRangeTombstoneBuilder extends RangeTombstoneBuilder +public class FourZeroRangeTombstoneBuilder extends AbstractRangeTombstoneBuilder { private final TableMetadata tableMetadata; private RangeTombstoneMarker rangeTombstoneMarker; diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java new file mode 100644 
index 000000000..7965272e3 --- /dev/null +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/DbUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import com.google.common.primitives.Ints; + +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.schema.TableMetadata; + +public class DbUtils +{ + private DbUtils() + { + throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated"); + } + + public static DeletionTime deletionTime(long markedForDeleteAt, int localDeletionTime) + { + return new DeletionTime(markedForDeleteAt, localDeletionTime); + } + + public static LivenessInfo livenessInfo(long timestamp, long nowInSeconds) + { + return LivenessInfo.create(timestamp, Ints.checkedCast(nowInSeconds)); + } + + public static PartitionUpdate fullPartitionDeletion(TableMetadata metadata, ByteBuffer key, long timestamp, long nowInSec) + { + return PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, Ints.checkedCast(nowInSec)); + } + + public static PartitionUpdate.SimpleBuilder partitionUpdateBuilderWithNow(TableMetadata 
metadata, DecoratedKey key, long nowInSec) + { + return PartitionUpdate.simpleBuilder(metadata, key).nowInSec(Ints.checkedCast(nowInSec)); + } +} diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java index 8a808d98a..a9d170c84 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java @@ -541,7 +541,7 @@ private void readMutationInternal(byte[] inputBuffer, { return; } - logger.trace("Invalid mutation", ex); // we see many unknown table exception logs when we skip over mutations from other tables + logger.trace("Invalid mutation", "error", ex); // we see many unknown table exception logs when we skip over mutations from other tables stats.mutationsIgnoredUnknownTableCount(1); return; diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java index cb314fe21..e4425d753 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/AbstractCqlType.java @@ -155,7 +155,7 @@ public void addCell(org.apache.cassandra.db.rows.Row.Builder rowBuilder, { if (ttl != NO_TTL) { - rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, serialize(value), cellPath)); + rowBuilder.addCell(CqlType.expiring(cd, timestamp, ttl, now, serialize(value), cellPath)); } else { @@ -187,7 +187,7 @@ public void addTombstone(org.apache.cassandra.db.rows.Row.Builder rowBuilder, CellPath cellPath) { Preconditions.checkArgument(!(cd.type instanceof ListType), "The method does not support tombstone 
elements from a List type"); - rowBuilder.addCell(BufferCell.tombstone(cd, timestamp, (int) TimeUnit.MICROSECONDS.toSeconds(timestamp), cellPath)); + rowBuilder.addCell(CqlType.tombstone(cd, timestamp, TimeUnit.MICROSECONDS.toSeconds(timestamp), cellPath)); } /** diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java index bac7d440d..e91027a68 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/CqlType.java @@ -19,6 +19,23 @@ package org.apache.cassandra.spark.data; +import java.nio.ByteBuffer; + +import com.google.common.primitives.Ints; + +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.schema.ColumnMetadata; + public abstract class CqlType extends AbstractCqlType { + public static BufferCell tombstone(ColumnMetadata column, long timestamp, long nowInSec, CellPath path) + { + return BufferCell.tombstone(column, timestamp, Ints.checkedCast(nowInSec), path); + } + + public static BufferCell expiring(ColumnMetadata column, long timestamp, int ttl, long nowInSec, ByteBuffer value, CellPath path) + { + return BufferCell.expiring(column, timestamp, ttl, Ints.checkedCast(nowInSec), value, path); + } } From 219f7be69c12bc76e4a9b70459d2157ee3f4014d Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Wed, 4 Mar 2026 14:30:28 +0100 Subject: [PATCH 2/2] Apply review comments --- .../org/apache/cassandra/cdc/kafka/KafkaPublisher.java | 7 +++---- .../java/org/apache/cassandra/bridge/CdcBridgeFactory.java | 3 +++ .../apache/cassandra/cdc/msg/RangeTombstoneBuilder.java | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java 
b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java index 0eb550ca7..e104be0ee 100644 --- a/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java +++ b/cassandra-analytics-cdc-codec/src/main/java/org/apache/cassandra/cdc/kafka/KafkaPublisher.java @@ -63,7 +63,7 @@ public class KafkaPublisher implements AutoCloseable ThreadLocal.withInitial(HashMap::new); protected final KafkaStats kafkaStats; - public KafkaPublisher(String version, + public KafkaPublisher(CassandraVersion version, TopicSupplier topicSupplier, KafkaProducer producer, Serializer serializer, @@ -87,7 +87,7 @@ public KafkaPublisher(String version, ); } - public KafkaPublisher(String version, + public KafkaPublisher(CassandraVersion version, TopicSupplier topicSupplier, KafkaProducer producer, Serializer serializer, @@ -99,8 +99,7 @@ public KafkaPublisher(String version, RecordProducer recordProducer, EventHasher eventHasher) { - this.version = CassandraVersion.fromVersion(version).orElseThrow( - () -> new IllegalArgumentException("Unsupported Cassandra version: " + version)); + this.version = version; this.topicSupplier = topicSupplier; this.maxRecordSizeBytes = maxRecordSizeBytes; this.failOnRecordTooLargeError = failOnRecordTooLargeError; diff --git a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java index 301940008..0d19bb826 100644 --- a/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java +++ b/cassandra-analytics-cdc/src/main/java/org/apache/cassandra/bridge/CdcBridgeFactory.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter; import org.apache.cassandra.spark.utils.Throwing; import 
org.jetbrains.annotations.NotNull; @@ -163,6 +165,7 @@ private static VersionSpecificBridge create(@NotNull String label) } } + @VisibleForTesting public static T executeActionOnBridgeClassLoader(@NotNull CassandraVersion version, Throwing.Function action) { ClassLoader bridgeLoader = getVersionSpecificBridge(version).classLoader; diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java index 0717166b0..5ecda526d 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/cdc/msg/RangeTombstoneBuilder.java @@ -25,6 +25,8 @@ /** * Keep track of the last range tombstone marker to build {@link RangeTombstone} * The caller should check whether {@link #canBuild()} after adding marker, and it should build whenever possible. + * IMPLEMENTATION NOTE: Refactored from an abstract class to an interface due to a classloader clash: the superclass + * is loaded by the application classloader, but the concrete implementation (from the bridge module) uses a dedicated classloader. * * @param */