Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions cpo-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,9 @@
<version>${logbackVersion}</version>
<scope>test</scope>
</dependency>
<!-- map xmlbeans log4j dependency to slf4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.xmlbeans</groupId>
<artifactId>xmlbeans</artifactId>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.*;
import java.util.Collection;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.synchronoss.cpo.AbstractDataSourceInfo;
Expand All @@ -37,7 +38,7 @@
*/
public class ClusterDataSourceInfo extends AbstractDataSourceInfo<ClusterDataSource> {
private static final Logger logger = LoggerFactory.getLogger(ClusterDataSourceInfo.class);
private String[] contactPoints;
private List<String> contactPoints;
private String keySpace;
private String clusterName;
private Integer maxSchemaAgreementWaitSeconds;
Expand Down Expand Up @@ -71,7 +72,11 @@ public class ClusterDataSourceInfo extends AbstractDataSourceInfo<ClusterDataSou
* @param contactPoints The contact points
*/
public ClusterDataSourceInfo(
String clusterName, String keySpace, String[] contactPoints, int fetchSize, int batchSize) {
String clusterName,
String keySpace,
List<String> contactPoints,
int fetchSize,
int batchSize) {
super(buildDataSourceName(clusterName, keySpace, contactPoints), fetchSize, batchSize);
this.keySpace = keySpace;
this.clusterName = clusterName;
Expand Down Expand Up @@ -576,7 +581,7 @@ protected ClusterDataSource createDataSource() throws CpoException {
}

private static String buildDataSourceName(
String clusterName, String keySpace, String[] contactPoints) {
String clusterName, String keySpace, List<String> contactPoints) {
StringBuilder sb = new StringBuilder();
sb.append(clusterName);
sb.append(keySpace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@
import org.synchronoss.cpo.cassandra.CassandraCpoAdapter;
import org.synchronoss.cpo.cassandra.CassandraCpoAdapterFactory;
import org.synchronoss.cpo.cassandra.ClusterDataSourceInfo;
import org.synchronoss.cpo.cassandra.cpoCassandraConfig.*;
import org.synchronoss.cpo.cassandra.meta.CassandraCpoMetaDescriptor;
import org.synchronoss.cpo.config.CpoConfigProcessor;
import org.synchronoss.cpo.core.cpoCoreConfig.CtDataSourceConfig;
import org.synchronoss.cpo.cpoconfig.*;
import org.synchronoss.cpo.meta.CpoMetaDescriptor;

/**
Expand Down Expand Up @@ -66,7 +65,7 @@ public CpoAdapterFactory processCpoConfig(CtDataSourceConfig cpoConfig) throws C
CpoMetaDescriptor.getInstance(cassandraConfig.getMetaDescriptorName());

// build the cluster information
if (cassandraConfig.isSetReadWriteConfig()) {
if (cassandraConfig.getReadWriteConfig() != null) {
ClusterDataSourceInfo clusterInfo =
buildDataSourceInfo(
cassandraConfig.getName(),
Expand Down Expand Up @@ -118,113 +117,120 @@ private ClusterDataSourceInfo buildDataSourceInfo(
new ClusterDataSourceInfo(
dataConfigName,
readWriteConfig.getKeySpace(),
readWriteConfig.getContactPointArray(),
readWriteConfig.getContactPoint(),
fetchSize,
batchSize);

// add clusterName
if (readWriteConfig.isSetClusterName())
clusterInfo.setClusterName(readWriteConfig.getClusterName());
clusterInfo.setClusterName(readWriteConfig.getClusterName());

// add maxSchemaAgreementWaitSeconds
if (readWriteConfig.isSetMaxSchemaAgreementWaitSeconds())
clusterInfo.setMaxSchemaAgreementWaitSeconds(
readWriteConfig.getMaxSchemaAgreementWaitSeconds());
clusterInfo.setMaxSchemaAgreementWaitSeconds(
readWriteConfig.getMaxSchemaAgreementWaitSeconds());

// add port
if (readWriteConfig.isSetPort()) clusterInfo.setPort(readWriteConfig.getPort());
if (readWriteConfig.getPort() != null) clusterInfo.setPort(readWriteConfig.getPort());

// add loadBalancing
if (readWriteConfig.isSetLoadBalancingPolicy()) {
if (readWriteConfig.getLoadBalancingPolicy() != null
&& !readWriteConfig.getLoadBalancingPolicy().isBlank()) {
clusterInfo.setLoadBalancingPolicy(
new ConfigInstantiator<LoadBalancingPolicy>()
.instantiate(readWriteConfig.getLoadBalancingPolicy()));
}

// add reconnectionPolicy
if (readWriteConfig.isSetReconnectionPolicy())
if (readWriteConfig.getReconnectionPolicy() != null
&& !readWriteConfig.getReconnectionPolicy().isBlank())
clusterInfo.setReconnectionPolicy(
new ConfigInstantiator<ReconnectionPolicy>()
.instantiate(readWriteConfig.getReconnectionPolicy()));

// add retryPolicy
if (readWriteConfig.isSetRetryPolicy())
if (readWriteConfig.getRetryPolicy() != null && !readWriteConfig.getRetryPolicy().isBlank())
clusterInfo.setRetryPolicy(
new ConfigInstantiator<RetryPolicy>().instantiate(readWriteConfig.getRetryPolicy()));

// add credentials
if (readWriteConfig.isSetCredentials()) {
if (readWriteConfig.getCredentials() != null) {
clusterInfo.setHasCredentials(true);
clusterInfo.setUserName(readWriteConfig.getCredentials().getUser());
clusterInfo.setPassword(readWriteConfig.getCredentials().getUser());
}

// add addressTranslater
if (readWriteConfig.isSetAddressTranslater())
if (readWriteConfig.getAddressTranslater() != null
&& !readWriteConfig.getAddressTranslater().isBlank())
clusterInfo.setAddressTranslater(
new ConfigInstantiator<AddressTranslator>()
.instantiate(readWriteConfig.getAddressTranslater()));

// add AuthProvider
if (readWriteConfig.isSetAuthProvider())
if (readWriteConfig.getAuthProvider() != null && !readWriteConfig.getAuthProvider().isBlank())
clusterInfo.setAuthProvider(
new ConfigInstantiator<AuthProvider>().instantiate(readWriteConfig.getAuthProvider()));

// add Compression
if (readWriteConfig.isSetCompression())
if (readWriteConfig.getCompression() != null)
clusterInfo.setCompressionType(
ProtocolOptions.Compression.valueOf(readWriteConfig.getCompression().toString()));

// add NettyOptions
if (readWriteConfig.isSetNettyOptions())
if (readWriteConfig.getNettyOptions() != null && !readWriteConfig.getNettyOptions().isBlank())
clusterInfo.setNettyOptions(
new ConfigInstantiator<NettyOptions>().instantiate(readWriteConfig.getNettyOptions()));

// add Metrics
if (readWriteConfig.isSetMetrics()) clusterInfo.setUseMetrics(readWriteConfig.getMetrics());
if (readWriteConfig.isMetrics() != null) clusterInfo.setUseMetrics(readWriteConfig.isMetrics());

// add SSL
if (readWriteConfig.isSetSslOptions() && !readWriteConfig.isNilSslOptions()) {
if (readWriteConfig.getSslOptions() != null
&& readWriteConfig.getSslOptions().getValue() != null
&& !readWriteConfig.getSslOptions().getValue().isBlank()) {
clusterInfo.setSslOptions(
new ConfigInstantiator<SSLOptions>().instantiate(readWriteConfig.getSslOptions()));
new ConfigInstantiator<SSLOptions>()
.instantiate(readWriteConfig.getSslOptions().getValue()));
}

// add Listeners
if (readWriteConfig.isSetInitialListeners()) {
if (readWriteConfig.getInitialListeners() != null
&& !readWriteConfig.getInitialListeners().isBlank()) {
clusterInfo.setListeners(
new ConfigInstantiator<Collection<Host.StateListener>>()
.instantiate(readWriteConfig.getInitialListeners()));
}

// add JMX Reporting
if (readWriteConfig.isSetJmxReporting())
clusterInfo.setUseJmxReporting(readWriteConfig.getJmxReporting());
if (readWriteConfig.isJmxReporting() != null)
clusterInfo.setUseJmxReporting(readWriteConfig.isJmxReporting());

// add protocolVersion
if (readWriteConfig.isSetProtocolVersion())
if (readWriteConfig.getProtocolVersion() != null)
clusterInfo.setProtocolVersion(
ProtocolVersion.valueOf(readWriteConfig.getProtocolVersion().toString()));

// add pooling options
if (readWriteConfig.isSetPoolingOptions())
if (readWriteConfig.getPoolingOptions() != null)
clusterInfo.setPoolingOptions(buildPoolingOptions(readWriteConfig.getPoolingOptions()));

// add socket options
if (readWriteConfig.isSetSocketOptions())
if (readWriteConfig.getSocketOptions() != null)
clusterInfo.setSocketOptions(buildSocketOptions(readWriteConfig.getSocketOptions()));

// add query Options
if (readWriteConfig.isSetQueryOptions())
if (readWriteConfig.getQueryOptions() != null)
clusterInfo.setQueryOptions(buildQueryOptions(readWriteConfig.getQueryOptions()));

// add speculativeExecutionPolicy
if (readWriteConfig.isSetSpeculativeExecutionPolicy())
if (readWriteConfig.getSpeculativeExecutionPolicy() != null
&& !readWriteConfig.getSpeculativeExecutionPolicy().isBlank())
clusterInfo.setSpeculativeExecutionPolicy(
new ConfigInstantiator<SpeculativeExecutionPolicy>()
.instantiate(readWriteConfig.getSpeculativeExecutionPolicy()));

// add TimestampGenerator
if (readWriteConfig.isSetTimestampGenerator())
if (readWriteConfig.getTimestampGenerator() != null
&& !readWriteConfig.getTimestampGenerator().isBlank())
clusterInfo.setTimestampGenerator(
new ConfigInstantiator<TimestampGenerator>()
.instantiate(readWriteConfig.getTimestampGenerator()));
Expand All @@ -236,45 +242,45 @@ private ClusterDataSourceInfo buildDataSourceInfo(
private PoolingOptions buildPoolingOptions(CtPoolingOptions ctPoolingOptions) {
PoolingOptions poolingOptions = new PoolingOptions();

if (ctPoolingOptions.isSetConnectionsPerHost()) {
if (ctPoolingOptions.getConnectionsPerHost() != null) {
CtConnectionsPerHost cph = ctPoolingOptions.getConnectionsPerHost();
poolingOptions.setConnectionsPerHost(
HostDistance.valueOf(cph.getDistance().toString()), cph.getCore(), cph.getMax());
}

if (ctPoolingOptions.isSetCoreConnectionsPerHost()) {
if (ctPoolingOptions.getCoreConnectionsPerHost() != null) {
CtHostDistanceAndThreshold hdt = ctPoolingOptions.getCoreConnectionsPerHost();
poolingOptions.setCoreConnectionsPerHost(
HostDistance.valueOf(hdt.getDistance().toString()), hdt.getThreshold());
}

if (ctPoolingOptions.isSetHeartbeatIntervalSeconds()) {
if (ctPoolingOptions.getHeartbeatIntervalSeconds() != null) {
poolingOptions.setHeartbeatIntervalSeconds(ctPoolingOptions.getHeartbeatIntervalSeconds());
}

if (ctPoolingOptions.isSetIdleTimeoutSeconds()) {
if (ctPoolingOptions.getIdleTimeoutSeconds() != null) {
poolingOptions.setIdleTimeoutSeconds(ctPoolingOptions.getIdleTimeoutSeconds());
}

if (ctPoolingOptions.isSetMaxConnectionsPerHost()) {
if (ctPoolingOptions.getMaxConnectionsPerHost() != null) {
CtHostDistanceAndThreshold hdt = ctPoolingOptions.getMaxConnectionsPerHost();
poolingOptions.setMaxConnectionsPerHost(
HostDistance.valueOf(hdt.getDistance().toString()), hdt.getThreshold());
}

if (ctPoolingOptions.isSetMaxRequestsPerConnection()) {
if (ctPoolingOptions.getMaxRequestsPerConnection() != null) {
CtHostDistanceAndThreshold hdt = ctPoolingOptions.getMaxRequestsPerConnection();
poolingOptions.setMaxRequestsPerConnection(
HostDistance.valueOf(hdt.getDistance().toString()), hdt.getThreshold());
}

if (ctPoolingOptions.isSetNewConnectionThreshold()) {
if (ctPoolingOptions.getNewConnectionThreshold() != null) {
CtHostDistanceAndThreshold hdt = ctPoolingOptions.getNewConnectionThreshold();
poolingOptions.setNewConnectionThreshold(
HostDistance.valueOf(hdt.getDistance().toString()), hdt.getThreshold());
}

if (ctPoolingOptions.isSetPoolTimeoutMillis()) {
if (ctPoolingOptions.getPoolTimeoutMillis() != null) {
poolingOptions.setPoolTimeoutMillis(ctPoolingOptions.getPoolTimeoutMillis());
}

Expand All @@ -284,16 +290,17 @@ private PoolingOptions buildPoolingOptions(CtPoolingOptions ctPoolingOptions) {
private QueryOptions buildQueryOptions(CtQueryOptions ctQueryOptions) {
QueryOptions queryOptions = new QueryOptions();

if (ctQueryOptions.isSetConsistencyLevel())
if (ctQueryOptions.getConsistencyLevel() != null)
queryOptions.setConsistencyLevel(
ConsistencyLevel.valueOf(ctQueryOptions.getConsistencyLevel().toString()));

if (ctQueryOptions.isSetDefaultIdempotence())
queryOptions.setDefaultIdempotence(ctQueryOptions.getDefaultIdempotence());
if (ctQueryOptions.isDefaultIdempotence() != null)
queryOptions.setDefaultIdempotence(ctQueryOptions.isDefaultIdempotence());

if (ctQueryOptions.isSetFetchSize()) queryOptions.setFetchSize(ctQueryOptions.getFetchSize());
if (ctQueryOptions.getFetchSize() != null)
queryOptions.setFetchSize(ctQueryOptions.getFetchSize());

if (ctQueryOptions.isSetSerialConsistencyLevel())
if (ctQueryOptions.getSerialConsistencyLevel() != null)
queryOptions.setSerialConsistencyLevel(
ConsistencyLevel.valueOf(ctQueryOptions.getSerialConsistencyLevel().toString()));

Expand All @@ -303,28 +310,29 @@ private QueryOptions buildQueryOptions(CtQueryOptions ctQueryOptions) {
private SocketOptions buildSocketOptions(CtSocketOptions ctSocketOptions) {
SocketOptions socketOptions = new SocketOptions();

if (ctSocketOptions.isSetConnectionTimeoutMillis())
if (ctSocketOptions.getConnectionTimeoutMillis() != null)
socketOptions.setConnectTimeoutMillis(ctSocketOptions.getConnectionTimeoutMillis());

if (ctSocketOptions.isSetKeepAlive())
socketOptions.setKeepAlive(ctSocketOptions.getKeepAlive());
if (ctSocketOptions.isKeepAlive() != null)
socketOptions.setKeepAlive(ctSocketOptions.isKeepAlive());

if (ctSocketOptions.isSetReadTimeoutMillis())
if (ctSocketOptions.getReadTimeoutMillis() != null)
socketOptions.setReadTimeoutMillis(ctSocketOptions.getReadTimeoutMillis());

if (ctSocketOptions.isSetReceiveBufferSize())
if (ctSocketOptions.getReceiveBufferSize() != null)
socketOptions.setReceiveBufferSize(ctSocketOptions.getReceiveBufferSize());

if (ctSocketOptions.isSetReuseAddress())
socketOptions.setReuseAddress(ctSocketOptions.getReuseAddress());
if (ctSocketOptions.isReuseAddress() != null)
socketOptions.setReuseAddress(ctSocketOptions.isReuseAddress());

if (ctSocketOptions.isSetSendBufferSize())
if (ctSocketOptions.getSendBufferSize() != null)
socketOptions.setSendBufferSize(ctSocketOptions.getSendBufferSize());

if (ctSocketOptions.isSetSoLinger()) socketOptions.setSoLinger(ctSocketOptions.getSoLinger());
if (ctSocketOptions.getSoLinger() != null)
socketOptions.setSoLinger(ctSocketOptions.getSoLinger());

if (ctSocketOptions.isSetTcpNoDelay())
socketOptions.setTcpNoDelay(ctSocketOptions.getTcpNoDelay());
if (ctSocketOptions.isTcpNoDelay() != null)
socketOptions.setTcpNoDelay(ctSocketOptions.isTcpNoDelay());

return socketOptions;
}
Expand Down
Loading
Loading