Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public abstract class CrossDcConf {
public static final String COLLAPSE_UPDATES = "collapseUpdates";
public static final String MAX_COLLAPSE_RECORDS = "maxCollapseRecords";

public static final String SOLR_HTTP2_ENABLED = "solrHttp2Enabled";

/**
* Option to expand Delete-By-Query requests on the producer side.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;

import java.util.*;

Expand Down Expand Up @@ -109,6 +111,10 @@ public class KafkaCrossDcConf extends CrossDcConf {

public static final String FETCH_MAX_BYTES = "fetchMaxBytes";

public static final String USERNAME = "userName";

public static final String PASSWORD = "password";

// The maximum delay between invocations of poll() when using consumer group management. This places
// an upper bound on the amount of time that the consumer can be idle before fetching more records.
// If poll() is not called before expiration of this timeout, then the consumer is considered failed
Expand Down Expand Up @@ -158,6 +164,7 @@ public class KafkaCrossDcConf extends CrossDcConf {

// Consumer only zkConnectString
new ConfigProperty(ZK_CONNECT_STRING, null),
new ConfigProperty(SOLR_HTTP2_ENABLED, "false"),
new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
Expand Down Expand Up @@ -198,6 +205,9 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
new ConfigProperty(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG),

new ConfigProperty(SaslConfigs.SASL_MECHANISM),
new ConfigProperty(USERNAME),
new ConfigProperty(PASSWORD),
new ConfigProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG),


Expand Down Expand Up @@ -232,11 +242,24 @@ public KafkaCrossDcConf(Map<String, Object> properties) {

/**
 * Copies all Kafka security-related configuration values present in {@code conf} into the
 * supplied client {@link Properties}, and, when the SASL mechanism is {@code PLAIN},
 * assembles the {@code sasl.jaas.config} entry from the {@code userName}/{@code password}
 * properties.
 *
 * @param conf               source of the security configuration values
 * @param kafkaConsumerProps destination Kafka client properties (mutated in place)
 */
public static void addSecurityProps(KafkaCrossDcConf conf, Properties kafkaConsumerProps) {
  for (ConfigProperty property : SECURITY_CONFIG_PROPERTIES) {
    // userName/password are not Kafka client properties themselves; they are only used
    // below to assemble the JAAS config, so never pass them through verbatim.
    if (KafkaCrossDcConf.USERNAME.equals(property.getKey())
        || KafkaCrossDcConf.PASSWORD.equals(property.getKey())) {
      continue;
    }
    String val = conf.get(property.getKey());
    if (val != null) {
      kafkaConsumerProps.put(property.getKey(), val);
    }
  }

  if ("PLAIN".equals(conf.get(SaslConfigs.SASL_MECHANISM))) {
    var kafkaUsername = conf.get(KafkaCrossDcConf.USERNAME);
    var kafkaPassword = conf.get(KafkaCrossDcConf.PASSWORD);
    // NOTE(review): credentials are interpolated verbatim — a double quote or backslash in
    // the password would corrupt the JAAS string, and a missing property yields the literal
    // text username="null". Verify that upstream validation guarantees both are set, or add
    // escaping/null checks here.
    kafkaConsumerProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "%s required username=\"%s\" password=\"%s\";",
        PlainLoginModule.class.getName(),
        kafkaUsername,
        kafkaPassword));
  }
}

public String get(String property) {
Expand All @@ -258,7 +281,7 @@ public Boolean getBool(String property) {
}
return prop.getValueAsBoolean(properties);
}

public Map<String,Object> getAdditionalProperties() {
Map<String, Object> additional = new HashMap<>(properties);
for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {

private final KafkaCrossDcConf conf;
private final Producer<String, MirroredSolrRequest> producer;
private final KafkaConsumer<String,MirroredSolrRequest> consumer;
private KafkaConsumer<String,MirroredSolrRequest> consumer;
private final String mainTopic;
private final String dlqTopic;

public KafkaMirroringSink(final KafkaCrossDcConf conf) {
// Create Kafka Mirroring Sink
this.conf = conf;
this.producer = initProducer();
this.consumer = initConsumer();
//this.consumer = initConsumer();
this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0];
this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME);

checkTopicsAvailability();
//checkTopicsAvailability();
}

@Override
Expand Down Expand Up @@ -130,8 +130,8 @@ private Producer<String, MirroredSolrRequest> initProducer() {
// Initialize and return Kafka producer
Properties kafkaProducerProps = new Properties();

log.info("Starting CrossDC Producer {}", conf);

log.debug("Starting CrossDC Producer {}", conf);
log.info("Starting CrossDC Producer.");
kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));

kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
Expand Down Expand Up @@ -160,7 +160,7 @@ private Producer<String, MirroredSolrRequest> initProducer() {
}

ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
Thread.currentThread().setContextClassLoader(KafkaProducer.class.getClassLoader());
Producer<String, MirroredSolrRequest> producer;
try {
producer = new KafkaProducer<>(kafkaProducerProps);
Expand All @@ -185,6 +185,9 @@ private KafkaConsumer<String, MirroredSolrRequest> initConsumer() {
kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
kafkaConsumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
kafkaConsumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));

KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProperties);

kafkaConsumerProperties.putAll(conf.getAdditionalProperties());

return new KafkaConsumer<>(kafkaConsumerProperties, new StringDeserializer(), new MirroredSolrRequestSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.schema.AbstractSchemaRequest;
import org.apache.solr.client.solrj.request.schema.SchemaRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.ConfigSetAdminResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
Expand All @@ -46,6 +49,7 @@ public enum Type {
UPDATE,
ADMIN,
CONFIGSET,
SCHEMA,
UNKNOWN;

public static final Type get(String s) {
Expand Down Expand Up @@ -117,6 +121,28 @@ protected ConfigSetAdminResponse createResponse(SolrClient client) {
}
}

/**
 * A schema API request ({@code /schema}) that can be mirrored across data centers.
 * Wraps the HTTP method and the request parameters so the request can be
 * (de)serialized and replayed against the target cluster.
 *
 * <p>NOTE(review): {@code params} is mutable via {@link #setParams} — presumably to let
 * the deserializer rebuild the request in place; confirm callers do not share instances
 * across threads.
 */
public static class MirroredSchemaRequest extends AbstractSchemaRequest {
  // Held locally (in addition to being passed to super) so getParams() reflects
  // any later setParams(...) call.
  private ModifiableSolrParams params;

  /**
   * @param method the HTTP method of the original schema request
   * @param params the original request parameters, replayed as-is on the mirror
   */
  public MirroredSchemaRequest(METHOD method, ModifiableSolrParams params) {
    super(method, "/schema", params);
    this.params = params;
  }

  @Override
  public SolrParams getParams() {
    return params;
  }

  /** Replaces the request parameters returned by {@link #getParams()}. */
  public void setParams(ModifiableSolrParams params) {
    this.params = params;
  }

  /** Returns an empty response holder; the client populates it after execution. */
  @Override
  protected SchemaResponse createResponse(SolrClient client) {
    return new SchemaResponse();
  }
}

public static class ExposedByteArrayContentStream extends ContentStreamBase {
private final byte[] bytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ public MirroredSolrRequest deserialize(String topic, byte[] data) {
}
request = new MirroredSolrRequest.MirroredConfigSetRequest(method, params, contentStreams);
log.debug("-- configSet method={}, req={}, streams={}", request.getMethod(), request.getParams(), csNames);
} else if (type == MirroredSolrRequest.Type.SCHEMA) {
String m = (String) requestMap.get("method");
SolrRequest.METHOD method = SolrRequest.METHOD.valueOf(m);
request = new MirroredSolrRequest.MirroredSchemaRequest(method, params);
} else {
throw new RuntimeException("Unknown request type: " + requestMap);
}
Expand Down Expand Up @@ -189,6 +193,9 @@ public byte[] serialize(String topic, MirroredSolrRequest request) {
}
map.put("contentStreams", streamsList);
}
} else if (solrRequest instanceof MirroredSolrRequest.MirroredSchemaRequest) {
MirroredSolrRequest.MirroredSchemaRequest schema = (MirroredSolrRequest.MirroredSchemaRequest) solrRequest;
map.put("method", schema.getMethod().toString());
}

codec.marshal(map, baos);
Expand Down
6 changes: 3 additions & 3 deletions crossdc-consumer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ dependencies {
implementation 'io.dropwizard.metrics:metrics-core:4.2.17'
implementation 'io.dropwizard.metrics:metrics-servlets:4.2.17'
implementation 'org.slf4j:slf4j-api:2.0.5'
implementation 'org.eclipse.jetty:jetty-http:10.0.13'
implementation 'org.eclipse.jetty:jetty-server:10.0.13'
implementation 'org.eclipse.jetty:jetty-http:10.0.19'
implementation 'org.eclipse.jetty:jetty-server:10.0.19'
implementation group: 'commons-logging', name: 'commons-logging', version: '1.3.0'
implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0' // log4j impl can use StackLocatorUtil which is in the api jar
implementation 'org.eclipse.jetty:jetty-servlet:10.0.13'
implementation 'org.eclipse.jetty:jetty-servlet:10.0.19'
implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.20.0'
implementation group: 'org.slf4j', name: 'slf4j-simple', version: '2.0.9'
testImplementation project(path: ':crossdc-commons')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,17 @@
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.crossdc.common.ConfUtil;
import org.apache.solr.crossdc.common.ConfigProperty;
import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.SensitivePropRedactionUtils;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -76,6 +71,8 @@ public void start(Map<String,Object> properties ) {
try (SolrZkClient client = new SolrZkClient.Builder().withUrl(zkConnectString).withTimeout(15000, TimeUnit.MILLISECONDS).build()) {
// update properties, potentially also from ZK
ConfUtil.fillProperties(client, properties);
} catch (Exception e) {
log.info("Failed to fetch properties from ZK.", e);
}

ConfUtil.verifyProperties(properties);
Expand Down Expand Up @@ -104,8 +101,8 @@ public void start(Map<String,Object> properties ) {

// Start consumer thread

log.info("Starting CrossDC Consumer {}", conf);

log.debug("Starting CrossDC Consumer {}", conf);
log.info("Starting CrossDC Consumer.");
ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
consumerThreadExecutor.submit(crossDcConsumer);

Expand Down
Loading