Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class KafkaPublisher implements AutoCloseable
{
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class);

protected CassandraVersion version;
protected TopicSupplier topicSupplier;
protected int maxRecordSizeBytes;
protected final RecordProducer recordProducer;
Expand All @@ -62,7 +63,8 @@ public class KafkaPublisher implements AutoCloseable
ThreadLocal.withInitial(HashMap::new);
protected final KafkaStats kafkaStats;

public KafkaPublisher(TopicSupplier topicSupplier,
public KafkaPublisher(CassandraVersion version,
TopicSupplier topicSupplier,
KafkaProducer<String, byte[]> producer,
Serializer<CdcEvent> serializer,
int maxRecordSizeBytes,
Expand All @@ -71,6 +73,7 @@ public KafkaPublisher(TopicSupplier topicSupplier,
CdcLogMode logMode)
{
this(
version,
topicSupplier,
producer,
serializer,
Expand All @@ -84,7 +87,8 @@ public KafkaPublisher(TopicSupplier topicSupplier,
);
}

public KafkaPublisher(TopicSupplier topicSupplier,
public KafkaPublisher(CassandraVersion version,
TopicSupplier topicSupplier,
KafkaProducer<String, byte[]> producer,
Serializer<CdcEvent> serializer,
int maxRecordSizeBytes,
Expand All @@ -95,6 +99,7 @@ public KafkaPublisher(TopicSupplier topicSupplier,
RecordProducer recordProducer,
EventHasher eventHasher)
{
this.version = version;
this.topicSupplier = topicSupplier;
this.maxRecordSizeBytes = maxRecordSizeBytes;
this.failOnRecordTooLargeError = failOnRecordTooLargeError;
Expand All @@ -116,7 +121,7 @@ public CqlField.CqlType getType(KeyspaceTypeKey key)

public CassandraVersion version()
{
return CassandraVersion.FOURZERO;
return version;
}

public Logger logger()
Expand Down
10 changes: 6 additions & 4 deletions cassandra-analytics-cdc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ dependencies {
testImplementation project(":cassandra-four-zero-types")
testImplementation project(":cassandra-four-zero-bridge")
testImplementation project(path: ':cassandra-four-zero', configuration: 'shadow')
// unit tests are performed only with Cassandra 4
// testImplementation project(":cassandra-five-zero-bridge")
// testImplementation project(":cassandra-five-zero-types")
// testImplementation project(path: ':cassandra-five-zero', configuration: 'shadow')

testImplementation project(":cassandra-five-zero-bridge")
testImplementation project(":cassandra-five-zero-types")
testImplementation project(path: ':cassandra-five-zero', configuration: 'shadow')

testImplementation(group: 'org.quicktheories', name: 'quicktheories', version: "${project.rootProject.quickTheoriesVersion}")
testImplementation(group: 'com.google.guava', name: 'guava', version: '31.1-jre')
Expand Down Expand Up @@ -139,6 +139,8 @@ jar {
}

test {
systemProperty "cassandra.sidecar.versions_to_test", (System.getenv("CASSANDRA_VERSION") ?: "4.0.0")

minHeapSize = '1024m'
maxHeapSize = '3072m'
maxParallelForks = Math.max(Runtime.runtime.availableProcessors() * 2, 8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
import org.apache.cassandra.spark.utils.Throwing;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -40,14 +43,17 @@ public static class VersionSpecificBridge
public final CdcBridge cdcBridge;
@Nullable
final CqlToAvroSchemaConverter avroSchemaConverter;
final ClassLoader classLoader;

public VersionSpecificBridge(CassandraBridge cassandraBridge,
CdcBridge cdcBridge,
@Nullable CqlToAvroSchemaConverter avroSchemaConverter)
@Nullable CqlToAvroSchemaConverter avroSchemaConverter,
ClassLoader classLoader)
{
this.cassandraBridge = cassandraBridge;
this.cdcBridge = cdcBridge;
this.avroSchemaConverter = avroSchemaConverter;
this.classLoader = classLoader;
}
}

Expand Down Expand Up @@ -150,12 +156,36 @@ private static VersionSpecificBridge create(@NotNull String label)
{
}

return new VersionSpecificBridge(bridgeInstance, cdcBridgeInstance, cqlToAvroSchemaConverter);
return new VersionSpecificBridge(bridgeInstance, cdcBridgeInstance, cqlToAvroSchemaConverter, loader);
}
catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
| IllegalAccessException | InvocationTargetException exception)
{
throw new RuntimeException("Failed to create Cassandra bridge for label " + label, exception);
}
}

@VisibleForTesting
public static <T> T executeActionOnBridgeClassLoader(@NotNull CassandraVersion version, Throwing.Function<ClassLoader, T> action)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add @VisibleForTesting

{
ClassLoader bridgeLoader = getVersionSpecificBridge(version).classLoader;
Thread currentThread = Thread.currentThread();
ClassLoader originalClassLoader = currentThread.getContextClassLoader();
try
{
currentThread.setContextClassLoader(bridgeLoader);
try
{
return action.apply(bridgeLoader);
}
catch (Exception e)
{
throw new RuntimeException("Failed to execute function on bridge classloader", e);
}
}
finally
{
currentThread.setContextClassLoader(originalClassLoader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.cassandra.cdc;

import java.lang.reflect.Method;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -37,17 +38,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.bridge.BridgeInitializationParameters;
import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.bridge.CdcBridgeImplementation;
import org.apache.cassandra.bridge.CdcBridge;
import org.apache.cassandra.cdc.api.CassandraSource;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.CommitLogInstance;
import org.apache.cassandra.cdc.msg.CdcEvent;
import org.apache.cassandra.cdc.state.CdcState;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.utils.IOUtils;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.spark.utils.TimeProvider;
import org.apache.cassandra.spark.utils.test.TestSchema;
import org.jetbrains.annotations.Nullable;
Expand All @@ -61,17 +62,27 @@ public class CdcTester
private static final Logger LOGGER = LoggerFactory.getLogger(CdcTester.class);
public static final int DEFAULT_NUM_ROWS = 1000;

public static FourZeroCommitLog testCommitLog;
public static CommitLogInstance testCommitLog;

public static void setup(Path testDirectory)
public static void setup(CdcBridge cdcBridge, Path testDirectory)
{
setup(testDirectory, 32, false);
setup(cdcBridge, testDirectory, 32, false);
}

public static void setup(Path testDirectory, int commitLogSegmentSize, boolean enableCompression)
public static void setup(CdcBridge cdcBridge, Path testDirectory, int commitLogSegmentSize, boolean enableCompression)
{
CdcBridgeImplementation.setup(testDirectory, commitLogSegmentSize, enableCompression);
testCommitLog = new FourZeroCommitLog(testDirectory);
try
{
// TODO: Refactor static initialization to instance method.
// use reflection to execute static initialization
Method setupMethod = cdcBridge.getClass().getMethod("setup", Path.class, int.class, boolean.class, BridgeInitializationParameters.class);
setupMethod.invoke(null, testDirectory, commitLogSegmentSize, enableCompression, BridgeInitializationParameters.fromEnvironment());
}
catch (Exception e)
{
throw new IllegalStateException("Failed to setup CdcBridge", e);
}
testCommitLog = cdcBridge.createCommitLogInstance(testDirectory);
}

public static void tearDown()
Expand Down Expand Up @@ -260,15 +271,14 @@ public void sync()
void run()
{
Map<String, TestSchema.TestRow> rows = new LinkedHashMap<>(numRows);
CassandraVersion version = CassandraVersion.FOURZERO;

List<CdcEvent> cdcEvents = new ArrayList<>();
try
{
LOGGER.info("Running CDC test testId={} schema='{}' thread={}", testId, cqlTable.fields(), Thread.currentThread().getName());
Set<String> udtStmts = schema.udts.stream().map(e -> e.createStatement(bridge.cassandraTypes(), schema.keyspace)).collect(Collectors.toSet());
bridge.buildSchema(schema.createStatement, schema.keyspace, schema.rf, partitioner, udtStmts, null, 0, true);
schema.setCassandraVersion(version);
schema.setCassandraVersion(TestVersionSupplier.testVersion());

// write some mutations to CDC CommitLog
for (CdcWriter writer : writers)
Expand Down Expand Up @@ -312,8 +322,7 @@ void run()
}
catch (Throwable t)
{
LOGGER.error("Unexpected error in CdcTester", ThrowableUtils.rootCause(t));
t.printStackTrace();
LOGGER.error("Unexpected error in CdcTester", t);
fail("Unexpected error in CdcTester");
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.bridge.CassandraBridge;
import org.apache.cassandra.bridge.CassandraBridgeImplementation;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.bridge.CdcBridge;
import org.apache.cassandra.bridge.CdcBridgeImplementation;
import org.apache.cassandra.bridge.CdcBridgeFactory;
import org.apache.cassandra.bridge.TokenRange;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.CommitLog;
Expand All @@ -69,13 +69,12 @@
import org.apache.cassandra.cdc.msg.jdk.JdkMessageConverter;
import org.apache.cassandra.cdc.state.CdcState;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.reader.SchemaBuilder;
import org.apache.cassandra.spark.utils.AsyncExecutor;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.IOUtils;
Expand Down Expand Up @@ -107,17 +106,21 @@ public int minimumReplicas(String keyspace)
{
return 1;
}

public CassandraVersion version()
{
return TestVersionSupplier.testVersion();
}
};
public static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4,
new ThreadFactoryBuilder()
.setNameFormat("cdc-io-%d")
.setDaemon(true)
.build());
public static final AsyncExecutor ASYNC_EXECUTOR = AsyncExecutor.wrap(EXECUTOR);
// TODO: Execute CDC tests also with Cassandra 5 bridge.
public static final CassandraBridge BRIDGE = new CassandraBridgeImplementation();
public static final CassandraBridge BRIDGE = CdcBridgeFactory.get(TestVersionSupplier.testVersion());
public static final JdkMessageConverter MESSAGE_CONVERTER = new JdkMessageConverter(BRIDGE.cassandraTypes());
public static final CdcBridge CDC_BRIDGE = new CdcBridgeImplementation();
public static final CdcBridge CDC_BRIDGE = CdcBridgeFactory.getCdcBridge(TestVersionSupplier.testVersion());

private static final int TTL = 42;

Expand All @@ -143,7 +146,7 @@ public static synchronized void setup()
{
throw new RuntimeException(e);
}
CdcTester.setup(directory);
CdcTester.setup(CDC_BRIDGE, directory);
isSetup = true;
}

Expand Down Expand Up @@ -215,7 +218,6 @@ public void testMockedCdc()
null,
0,
true);
UUID tableId = Schema.instance.getTableMetadata(table.keyspace(), table.table()).id.asUUID();
SchemaSupplier schemaSupplier = () -> CompletableFuture.completedFuture(ImmutableSet.of(table));
AtomicReference<byte[]> state = new AtomicReference<>();
StatePersister statePersister = new StatePersister()
Expand Down Expand Up @@ -254,7 +256,7 @@ public List<CdcState> loadState(String jobId, int partitionId, @Nullable TokenRa
try (Cdc cdc = Cdc.builder("101", 0, eventConsumer, schemaSupplier)
.withExecutor(CdcTests.ASYNC_EXECUTOR)
.withStatePersister(statePersister)
.withTableIdLookup((ks, tb) -> tableId)
.withTableIdLookup(CDC_BRIDGE.internalTableIdLookup())
.withCommitLogProvider(CdcTests.logProvider(CdcTests.directory))
.withCdcOptions(CdcTests.TEST_OPTIONS)
.build())
Expand Down Expand Up @@ -542,8 +544,18 @@ public void testMultiTable()
TestSchema schema3 = tableBuilder3.build();
CqlTable cqlTable2 = schema2.buildTable();
CqlTable cqlTable3 = schema3.buildTable();
new SchemaBuilder(cqlTable2, Partitioner.Murmur3Partitioner, schema2.withCdc);
new SchemaBuilder(cqlTable3, Partitioner.Murmur3Partitioner, schema3.withCdc);
BRIDGE.buildSchema(cqlTable2.createStatement(),
cqlTable2.keyspace(),
ReplicationFactor.simpleStrategy(1),
Partitioner.Murmur3Partitioner,
Collections.emptySet(),
null, 0, schema2.withCdc);
BRIDGE.buildSchema(cqlTable3.createStatement(),
cqlTable3.keyspace(),
ReplicationFactor.simpleStrategy(1),
Partitioner.Murmur3Partitioner,
Collections.emptySet(),
null, 0, schema3.withCdc);
int numRows = DEFAULT_NUM_ROWS;

AtomicReference<TestSchema> schema1Holder = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;

import org.apache.cassandra.bridge.CollectionElement;
import org.apache.cassandra.cdc.msg.CdcEvent;
import org.apache.cassandra.cdc.msg.Value;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.utils.test.TestSchema;

Expand Down Expand Up @@ -115,7 +113,8 @@ private void testElementDeletionInCollection(int numOfPKs,
testRow = CdcTester.newUniqueRow(tester.schema, rows);
for (String name : collectionColumnNames)
{
testRow = testRow.copy(name, CollectionElement.deleted(CellPath.create(key)));
Object value = TestUtils.collectionDeleteMutation(TestVersionSupplier.testVersion(), key);
testRow = testRow.copy(name, value);
}
elementDeletionIndices.put(i, key.array());
}
Expand Down
Loading