diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java index ca23847..336a3d1 100644 --- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java +++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java @@ -27,6 +27,8 @@ public abstract class CrossDcConf { public static final String COLLAPSE_UPDATES = "collapseUpdates"; public static final String MAX_COLLAPSE_RECORDS = "maxCollapseRecords"; + public static final String SOLR_HTTP2_ENABLED = "solrHttp2Enabled"; + /** * Option to expand Delete-By-Query requests on the producer side. */ diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java index 9e6d45d..e1c5c24 100644 --- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java +++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java @@ -18,8 +18,10 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.security.plain.PlainLoginModule; import java.util.*; @@ -109,6 +111,10 @@ public class KafkaCrossDcConf extends CrossDcConf { public static final String FETCH_MAX_BYTES = "fetchMaxBytes"; + public static final String USERNAME = "userName"; + + public static final String PASSWORD = "password"; + // The maximum delay between invocations of poll() when using consumer group management. This places // an upper bound on the amount of time that the consumer can be idle before fetching more records. 
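The `userName`/`password` keys added above are deliberately skipped by the generic pass-through in `addSecurityProps` (further down in this file) and folded into a SASL/PLAIN JAAS line instead. A minimal sketch of the client properties that construction produces, assuming SASL over SSL; the security protocol and credentials here are placeholders, not values from this patch:

```java
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.plain.PlainLoginModule;

import java.util.Properties;

class SaslPlainPropsSketch {
  // Assembles the same JAAS line that addSecurityProps builds for SASL/PLAIN.
  static Properties saslPlainProps(String user, String password) {
    Properties props = new Properties();
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); // assumed protocol
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
        "%s required username=\"%s\" password=\"%s\";",
        PlainLoginModule.class.getName(), user, password));
    return props;
  }
}
```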
// If poll() is not called before expiration of this timeout, then the consumer is considered failed @@ -158,6 +164,7 @@ public class KafkaCrossDcConf extends CrossDcConf { // Consumer only zkConnectString new ConfigProperty(ZK_CONNECT_STRING, null), + new ConfigProperty(SOLR_HTTP2_ENABLED, "false"), new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES), new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES), new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS), @@ -198,6 +205,9 @@ public class KafkaCrossDcConf extends CrossDcConf { new ConfigProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG), new ConfigProperty(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG), + new ConfigProperty(SaslConfigs.SASL_MECHANISM), + new ConfigProperty(USERNAME), + new ConfigProperty(PASSWORD), new ConfigProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG), @@ -232,11 +242,24 @@ public KafkaCrossDcConf(Map properties) { public static void addSecurityProps(KafkaCrossDcConf conf, Properties kafkaConsumerProps) { for (ConfigProperty property : SECURITY_CONFIG_PROPERTIES) { + if (KafkaCrossDcConf.USERNAME.equals(property.getKey()) || KafkaCrossDcConf.PASSWORD.equals(property.getKey())) + continue; String val = conf.get(property.getKey()); if (val != null) { kafkaConsumerProps.put(property.getKey(), val); } } + + if ("PLAIN".equals(conf.get(SaslConfigs.SASL_MECHANISM))) { + var kafkaUsername = conf.get(KafkaCrossDcConf.USERNAME); + var kafkaPassword = conf.get(KafkaCrossDcConf.PASSWORD); + kafkaConsumerProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format( + "%s required username=\"%s\" " + "password=\"%s\";", + PlainLoginModule.class.getName(), + kafkaUsername, + kafkaPassword + )); + } } public String get(String property) { @@ -258,7 +281,7 @@ public Boolean getBool(String property) { } return prop.getValueAsBoolean(properties); } - + public Map getAdditionalProperties() { Map additional = new HashMap<>(properties); for (ConfigProperty configProperty : CONFIG_PROPERTIES) { diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java index 952e930..a90f3f1 100644 --- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java +++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java @@ -44,7 +44,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable { private final KafkaCrossDcConf conf; private final Producer producer; - private final KafkaConsumer consumer; + private KafkaConsumer consumer; private final String mainTopic; private final String dlqTopic; @@ -52,11 +52,11 @@ public KafkaMirroringSink(final KafkaCrossDcConf conf) { // Create Kafka Mirroring Sink this.conf = conf; this.producer = initProducer(); - this.consumer = initConsumer(); + //this.consumer = initConsumer(); this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0]; this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME); - checkTopicsAvailability(); + //checkTopicsAvailability(); } @Override @@ -130,8 +130,8 @@ private Producer initProducer() { // Initialize and return Kafka producer Properties kafkaProducerProps = new Properties(); - log.info("Starting CrossDC Producer {}", conf); - + log.debug("Starting CrossDC Producer {}", conf); + log.info("Starting CrossDC Producer."); kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all"); @@ -160,7 +160,7 @@ private Producer initProducer() { } ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(null); + Thread.currentThread().setContextClassLoader(KafkaProducer.class.getClassLoader()); Producer producer; try { producer = new KafkaProducer<>(kafkaProducerProps); @@ -185,6 +185,9 @@ private KafkaConsumer initConsumer() { kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); kafkaConsumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); kafkaConsumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); + + KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProperties); + kafkaConsumerProperties.putAll(conf.getAdditionalProperties()); return new KafkaConsumer<>(kafkaConsumerProperties, new StringDeserializer(), new MirroredSolrRequestSerializer()); diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java index 8fe1eb3..4758c37 100644 --- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java +++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java @@ -21,8 +21,11 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.ConfigSetAdminRequest; import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.request.schema.AbstractSchemaRequest; +import org.apache.solr.client.solrj.request.schema.SchemaRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.ConfigSetAdminResponse; +import org.apache.solr.client.solrj.response.schema.SchemaResponse; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; @@ -46,6 +49,7 @@ public enum Type { UPDATE, ADMIN, CONFIGSET, + SCHEMA, UNKNOWN; public static final Type get(String s) { @@ -117,6 +121,28 @@ protected ConfigSetAdminResponse createResponse(SolrClient client) { } } + public static class MirroredSchemaRequest extends AbstractSchemaRequest { + private ModifiableSolrParams params; + public MirroredSchemaRequest(METHOD method, ModifiableSolrParams params) { + super(method, "/schema", params); + this.params = params; + } + + @Override + public SolrParams getParams() { + return params; + } + + public void setParams(ModifiableSolrParams params) { + this.params = params; + } + + @Override + protected SchemaResponse createResponse(SolrClient client) { + return new SchemaResponse(); + } + } + public static class ExposedByteArrayContentStream extends ContentStreamBase { private final byte[] bytes; diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java index ceca073..44e5302 100644 --- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java +++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java @@ -137,6 +137,10 @@ public MirroredSolrRequest deserialize(String topic, byte[] data) { } request = new MirroredSolrRequest.MirroredConfigSetRequest(method, params, contentStreams); log.debug("-- configSet method={}, req={}, streams={}", request.getMethod(), request.getParams(), csNames); + } else if (type == MirroredSolrRequest.Type.SCHEMA) { + String m = (String) requestMap.get("method"); + SolrRequest.METHOD method = SolrRequest.METHOD.valueOf(m); + request = new MirroredSolrRequest.MirroredSchemaRequest(method, params); } else { throw new RuntimeException("Unknown request type: " + requestMap); } @@ -189,6 +193,9 @@ public byte[] serialize(String topic, MirroredSolrRequest request) { } map.put("contentStreams", streamsList); } + } else if (solrRequest instanceof MirroredSolrRequest.MirroredSchemaRequest) { + MirroredSolrRequest.MirroredSchemaRequest schema = (MirroredSolrRequest.MirroredSchemaRequest) solrRequest; + map.put("method", schema.getMethod().toString()); } codec.marshal(map, baos); diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle index 561a1c7..4fdb42e 100644 --- a/crossdc-consumer/build.gradle +++ b/crossdc-consumer/build.gradle @@ -48,11 +48,11 @@ dependencies { implementation 'io.dropwizard.metrics:metrics-core:4.2.17' implementation 'io.dropwizard.metrics:metrics-servlets:4.2.17' implementation 'org.slf4j:slf4j-api:2.0.5' - implementation 'org.eclipse.jetty:jetty-http:10.0.13' - implementation 'org.eclipse.jetty:jetty-server:10.0.13' + implementation 'org.eclipse.jetty:jetty-http:10.0.19' + implementation 'org.eclipse.jetty:jetty-server:10.0.19' implementation group: 'commons-logging', name: 'commons-logging', version: '1.3.0' implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0' // log4j impl can use StackLocatorUtil which is in the api jar - implementation 'org.eclipse.jetty:jetty-servlet:10.0.13' + implementation 'org.eclipse.jetty:jetty-servlet:10.0.19' implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.20.0' implementation group: 'org.slf4j', name: 'slf4j-simple', version: '2.0.9' testImplementation project(path: ':crossdc-commons') diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java index 429423e..1b2a675 100644 --- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java +++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java @@ -22,22 +22,17 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.crossdc.common.ConfUtil; -import org.apache.solr.crossdc.common.ConfigProperty; -import org.apache.solr.crossdc.common.CrossDcConf; import org.apache.solr.crossdc.common.KafkaCrossDcConf; import org.apache.solr.crossdc.common.SensitivePropRedactionUtils; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletMapping; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; @@ -76,6 +71,8 @@ public void start(Map properties ) { try (SolrZkClient client = new SolrZkClient.Builder().withUrl(zkConnectString).withTimeout(15000, TimeUnit.MILLISECONDS).build()) { // update properties, potentially also from ZK ConfUtil.fillProperties(client, properties); + } catch (Exception e) { + log.info("Failed to fetch properties from ZK.", e); } ConfUtil.verifyProperties(properties); @@ -104,8 +101,8 @@ public void start(Map properties ) { // Start consumer thread - log.info("Starting CrossDC Consumer {}", conf); - + log.debug("Starting CrossDC Consumer {}", conf); + log.info("Starting CrossDC Consumer."); ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor(); consumerThreadExecutor.submit(crossDcConsumer); diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java index d5e8bf8..e7154f2 100644 --- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java +++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java @@ -2,13 +2,42 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpRequestInterceptor; +import org.apache.http.auth.AuthScheme; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.AuthState; +import org.apache.http.auth.Credentials; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.auth.BasicScheme; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpCoreContext; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.embedded.SSLConfig; +import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient; +import org.apache.solr.client.solrj.impl.CloudLegacySolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.Http2SolrClient; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; @@ -16,10 +45,26 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.crossdc.common.*; import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor; +import 
org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.CertPathTrustManagerParameters; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.security.cert.CRL; +import java.security.cert.CertificateException; +import java.security.cert.PKIXBuilderParameters; +import java.security.cert.X509Certificate; import java.time.Duration; import java.util.*; import java.util.concurrent.*; @@ -35,434 +80,654 @@ * top the consumer thread. */ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY); + private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY); - private final KafkaConsumer kafkaConsumer; - private final CountDownLatch startLatch; - KafkaMirroringSink kafkaMirroringSink; + private final KafkaConsumer kafkaConsumer; + private final CountDownLatch startLatch; + KafkaMirroringSink kafkaMirroringSink; - private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000; - private final String[] topicNames; - private final int maxAttempts; - private final CrossDcConf.CollapseUpdates collapseUpdates; - private final int maxCollapseRecords; - private final SolrMessageProcessor messageProcessor; + private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000; + private final String[] topicNames; + private final int maxAttempts; + private final CrossDcConf.CollapseUpdates collapseUpdates; + private final int maxCollapseRecords; + private final SolrMessageProcessor messageProcessor; - protected final CloudSolrClient solrClient; + protected final CloudSolrClient solrClient; - private final ThreadPoolExecutor executor; + private final ThreadPoolExecutor executor; - private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> { - Thread t = new Thread(r); - t.setName("offset-check-thread"); - return t; - }); - private PartitionManager partitionManager; - - private BlockingQueue queue = new BlockingQueue<>(10); + private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r); + t.setName("offset-check-thread"); + return t; + }); + private PartitionManager partitionManager; + private BlockingQueue queue = new BlockingQueue<>(10); - /** - * @param conf The Kafka consumer configuration - * @param startLatch - */ - public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) { + /** + * @param conf The Kafka consumer configuration + * @param startLatch + */ + public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) { - this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(","); - this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS); - this.collapseUpdates = CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL); - this.maxCollapseRecords = 
conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS); - this.startLatch = startLatch; - final Properties kafkaConsumerProps = new Properties(); + this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(","); + this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS); + this.collapseUpdates = CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL); + this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS); + this.startLatch = startLatch; + final Properties kafkaConsumerProps = new Properties(); - kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); + kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS)); - kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID)); + kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID)); - kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS)); + kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS)); - kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS)); + kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS)); - kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS)); - kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS)); + kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS)); + kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES)); + kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS)); - kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); + kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES)); + kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES)); - kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); + kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); - KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps); + KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps); - kafkaConsumerProps.putAll(conf.getAdditionalProperties()); - int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS); + kafkaConsumerProps.putAll(conf.getAdditionalProperties()); + int threads = 
conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS); - executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() { + executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("KafkaCrossDcConsumerWorker"); return t; } - }); - executor.prestartAllCoreThreads(); + }); + executor.prestartAllCoreThreads(); - solrClient = createSolrClient(conf); + solrClient = createSolrClient(conf); - messageProcessor = createSolrMessageProcessor(); + messageProcessor = createSolrMessageProcessor(); + log.debug("Creating Kafka consumer with configuration {}", kafkaConsumerProps); + log.info("Creating Kafka consumer."); + kafkaConsumer = createKafkaConsumer(kafkaConsumerProps); + log.info("Created Kafka consumer."); + partitionManager = new PartitionManager(kafkaConsumer); + // Create producer for resubmitting failed requests + log.info("Creating Kafka resubmit producer"); + this.kafkaMirroringSink = createKafkaMirroringSink(conf); + log.info("Created Kafka resubmit producer"); - log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps); - kafkaConsumer = createKafkaConsumer(kafkaConsumerProps); - partitionManager = new PartitionManager(kafkaConsumer); - // Create producer for resubmitting failed requests - log.info("Creating Kafka resubmit producer"); - this.kafkaMirroringSink = createKafkaMirroringSink(conf); - log.info("Created Kafka resubmit producer"); + } - } + protected SolrMessageProcessor createSolrMessageProcessor() { + return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L); + } - protected SolrMessageProcessor createSolrMessageProcessor() { - return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L); - } + public KafkaConsumer createKafkaConsumer(Properties properties) { + return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer()); + } - public KafkaConsumer createKafkaConsumer(Properties properties) { - return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer()); - } + /** + * This is where the magic happens. + * 1. Polls and gets the packets from the queue + * 2. Extract the MirroredSolrRequest objects + * 3. Send the request to the MirroredSolrRequestHandler that has the processing, retry, error handling logic. + */ + @Override + public void run() { + log.info("About to start Kafka consumer thread, topics={}", Arrays.asList(topicNames)); - /** - * This is where the magic happens. - * 1. Polls and gets the packets from the queue - * 2. Extract the MirroredSolrRequest objects - * 3. Send the request to the MirroredSolrRequestHandler that has the processing, retry, error handling logic. - */ - @Override public void run() { - log.info("About to start Kafka consumer thread, topics={}", Arrays.asList(topicNames)); + try { - try { + kafkaConsumer.subscribe(Arrays.asList((topicNames))); - kafkaConsumer.subscribe(Arrays.asList((topicNames))); + log.info("Consumer started"); + startLatch.countDown(); - log.info("Consumer started"); - startLatch.countDown(); + while (pollAndProcessRequests()) { + //no-op within this loop: everything is done in pollAndProcessRequests method defined above. + } - while (pollAndProcessRequests()) { - //no-op within this loop: everything is done in pollAndProcessRequests method defined above. - } + log.info("Closed kafka consumer. 
Exiting now."); + try { + kafkaConsumer.close(); + } catch (Exception e) { + log.warn("Failed to close kafka consumer", e); + } - log.info("Closed kafka consumer. Exiting now."); - try { - kafkaConsumer.close(); - } catch (Exception e) { - log.warn("Failed to close kafka consumer", e); - } + try { + kafkaMirroringSink.close(); + } catch (Exception e) { + log.warn("Failed to close kafka mirroring sink", e); + } + } finally { + IOUtils.closeQuietly(solrClient); + } - try { - kafkaMirroringSink.close(); - } catch (Exception e) { - log.warn("Failed to close kafka mirroring sink", e); - } - } finally { - IOUtils.closeQuietly(solrClient); } - } - - /** - * Polls and processes the requests from Kafka. This method returns false when the consumer needs to be - * shutdown i.e. when there's a wakeup exception. - */ - boolean pollAndProcessRequests() { - log.trace("Entered pollAndProcessRequests loop"); - try { - try { - partitionManager.checkOffsetUpdates(); - } catch (Throwable e) { - log.error("Error while checking offset updates, shutting down", e); - return false; - } - - ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS)); - - if (log.isTraceEnabled()) { - log.trace("poll return {} records", records.count()); - } - - UpdateRequest updateReqBatch = null; - int currentCollapsed = 0; - - ConsumerRecord lastRecord = null; - - for (TopicPartition partition : records.partitions()) { - List> partitionRecords = records.records(partition); - - PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition); - PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition); - workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords); - partitionWork.partitionQueue.add(workUnit); + /** + * Polls and processes the requests from Kafka. This method returns false when the consumer needs to be + * shutdown i.e. when there's a wakeup exception. 
+ */ + boolean pollAndProcessRequests() { + log.trace("Entered pollAndProcessRequests loop"); try { - ModifiableSolrParams lastUpdateParams = null; - NamedList lastUpdateParamsAsNamedList = null; - for (ConsumerRecord requestRecord : partitionRecords) { - if (log.isTraceEnabled()) { - log.trace("Fetched record from topic={} partition={} key={} value={}", requestRecord.topic(), requestRecord.partition(), requestRecord.key(), - requestRecord.value()); + try { + partitionManager.checkOffsetUpdates(); + } catch (Throwable e) { + log.error("Error while checking offset updates, shutting down", e); + return false; } - lastRecord = requestRecord; - MirroredSolrRequest req = requestRecord.value(); - SolrRequest solrReq = req.getSolrRequest(); - MirroredSolrRequest.Type type = req.getType(); - metrics.counter(MetricRegistry.name(type.name(), "input")).inc(); - ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams()); + ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS)); + if (log.isTraceEnabled()) { - log.trace("-- picked type={}, params={}", req.getType(), params); + log.trace("poll return {} records", records.count()); } - // determine if it's an UPDATE with deletes, or if the existing batch has deletes - boolean hasDeletes = false; - if (type == MirroredSolrRequest.Type.UPDATE) { - UpdateRequest ureq = (UpdateRequest) solrReq; - hasDeletes = hasDeletes(ureq) || hasDeletes(updateReqBatch); + UpdateRequest updateReqBatch = null; + int currentCollapsed = 0; + + ConsumerRecord lastRecord = null; + + for (TopicPartition partition : records.partitions()) { + List> partitionRecords = records.records(partition); + + PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition); + PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition); + workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords); + partitionWork.partitionQueue.add(workUnit); + try { + ModifiableSolrParams lastUpdateParams = null; + NamedList lastUpdateParamsAsNamedList = null; + for (ConsumerRecord requestRecord : partitionRecords) { + if (log.isTraceEnabled()) { + log.trace("Fetched record from topic={} partition={} key={} value={}", requestRecord.topic(), requestRecord.partition(), requestRecord.key(), + requestRecord.value()); + } + + lastRecord = requestRecord; + MirroredSolrRequest req = requestRecord.value(); + SolrRequest solrReq = req.getSolrRequest(); + MirroredSolrRequest.Type type = req.getType(); + metrics.counter(MetricRegistry.name(type.name(), "input")).inc(); + ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams()); + if (log.isTraceEnabled()) { + log.trace("-- picked type={}, params={}", req.getType(), params); + } + + // determine if it's an UPDATE with deletes, or if the existing batch has deletes + boolean hasDeletes = false; + if (type == MirroredSolrRequest.Type.UPDATE) { + UpdateRequest ureq = (UpdateRequest) solrReq; + hasDeletes = hasDeletes(ureq) || hasDeletes(updateReqBatch); + } + + // it's an update but with different params + if (type == MirroredSolrRequest.Type.UPDATE && + ( + // different params + (lastUpdateParams != null && !lastUpdateParams.toNamedList().equals(params.toNamedList())) || + // no collapsing + (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) || + // partial collapsing but has deletes + (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes) || + // too many collapsed - emit + currentCollapsed >= maxCollapseRecords + 
) + ) { + if (log.isTraceEnabled()) { + log.trace("Starting new UpdateRequest, params={}", params); + } + // send previous batch, if any + if (updateReqBatch != null) { + sendBatch(updateReqBatch, type, lastRecord, workUnit); + } + updateReqBatch = null; + lastUpdateParamsAsNamedList = null; + currentCollapsed = 0; + workUnit = new PartitionManager.WorkUnit(partition); + workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords); + partitionWork.partitionQueue.add(workUnit); + } + + lastUpdateParams = params; + if (type == MirroredSolrRequest.Type.UPDATE) { + if (updateReqBatch == null) { + // just initialize + updateReqBatch = new UpdateRequest(); + } else { + if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) { + throw new RuntimeException("Can't collapse requests."); + } + if (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes) { + throw new RuntimeException("Can't collapse requests with deletions."); + } + metrics.counter(MetricRegistry.name(type.name(), "collapsed")).inc(); + currentCollapsed++; + } + UpdateRequest update = (UpdateRequest) solrReq; + MirroredSolrRequest.setParams(updateReqBatch, params); + if (lastUpdateParamsAsNamedList == null) { + lastUpdateParamsAsNamedList = lastUpdateParams.toNamedList(); + } + // merge + List docs = update.getDocuments(); + if (docs != null) { + updateReqBatch.add(docs); + metrics.counter(MetricRegistry.name(type.name(), "add")).inc(docs.size()); + } + List deletes = update.getDeleteById(); + if (deletes != null) { + updateReqBatch.deleteById(deletes); + metrics.counter(MetricRegistry.name(type.name(), "dbi")).inc(deletes.size()); + } + List deleteByQuery = update.getDeleteQuery(); + if (deleteByQuery != null) { + for (String delByQuery : deleteByQuery) { + updateReqBatch.deleteByQuery(delByQuery); + } + metrics.counter(MetricRegistry.name(type.name(), "dbq")).inc(deleteByQuery.size()); + } + } else { + // non-update requests should be sent immediately + sendBatch(req.getSolrRequest(), type, lastRecord, workUnit); + } + } + + if (updateReqBatch != null) { + sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit); + } + try { + partitionManager.checkForOffsetUpdates(partition); + } catch (Throwable e) { + log.error("Error while checking offset updates, shutting down", e); + return false; + } + + // handleItem sets the thread interrupt, let's exit if there has been an interrupt set + if (Thread.currentThread().isInterrupted()) { + log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer."); + return false; + } + } catch (WakeupException e) { + log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer."); + return false; + } catch (Exception e) { + // If there is any exception returned by handleItem, don't set the offset. 
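The flush conditions assembled above apply only to `Type.UPDATE` records: the current batch is emitted before processing the next record whenever the update params change, collapsing is disabled, deletes show up under `PARTIAL` collapsing, or the batch reaches `maxCollapseRecords`. Restated as a standalone predicate (`paramsChanged` stands in for the `NamedList` comparison; the `ALL` value is assumed, since only `PARTIAL` and `NONE` appear in this hunk):

```java
class CollapseFlushSketch {
  enum CollapseUpdates { ALL, PARTIAL, NONE } // mirrors CrossDcConf.CollapseUpdates

  // True when the accumulated UpdateRequest batch must be sent before the
  // current record can be merged into a (new) batch.
  static boolean shouldFlush(CollapseUpdates mode, boolean paramsChanged,
                             boolean hasDeletes, int collapsed, int maxCollapseRecords) {
    return paramsChanged
        || mode == CollapseUpdates.NONE
        || (mode == CollapseUpdates.PARTIAL && hasDeletes)
        || collapsed >= maxCollapseRecords;
  }
}
```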
+ + if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional + log.error("Non retryable error", e); + return false; + } + log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e); + return false; + } } - // it's an update but with different params - if (type == MirroredSolrRequest.Type.UPDATE && - ( - // different params - (lastUpdateParams != null && !lastUpdateParams.toNamedList().equals(params.toNamedList())) || - // no collapsing - (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) || - // partial collapsing but has deletes - (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes) || - // too many collapsed - emit - currentCollapsed >= maxCollapseRecords - ) - ) { - if (log.isTraceEnabled()) { - log.trace("Starting new UpdateRequest, params={}", params); - } - // send previous batch, if any - if (updateReqBatch != null) { - sendBatch(updateReqBatch, type, lastRecord, workUnit); - } - updateReqBatch = null; - lastUpdateParamsAsNamedList = null; - currentCollapsed = 0; - workUnit = new PartitionManager.WorkUnit(partition); - workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords); - partitionWork.partitionQueue.add(workUnit); + try { + partitionManager.checkOffsetUpdates(); + } catch (Throwable e) { + log.error("Error while checking offset updates, shutting down", e); + return false; } - lastUpdateParams = params; - if (type == MirroredSolrRequest.Type.UPDATE) { - if (updateReqBatch == null) { - // just initialize - updateReqBatch = new UpdateRequest(); - } else { - if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) { - throw new RuntimeException("Can't collapse requests."); - } - if (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes) { - throw new RuntimeException("Can't collapse requests with deletions."); - } - metrics.counter(MetricRegistry.name(type.name(), "collapsed")).inc(); - currentCollapsed++; - } - UpdateRequest update = (UpdateRequest) solrReq; - MirroredSolrRequest.setParams(updateReqBatch, params); - if (lastUpdateParamsAsNamedList == null) { - lastUpdateParamsAsNamedList = lastUpdateParams.toNamedList(); - } - // merge - List docs = update.getDocuments(); - if (docs != null) { - updateReqBatch.add(docs); - metrics.counter(MetricRegistry.name(type.name(), "add")).inc(docs.size()); - } - List deletes = update.getDeleteById(); - if (deletes != null) { - updateReqBatch.deleteById(deletes); - metrics.counter(MetricRegistry.name(type.name(), "dbi")).inc(deletes.size()); - } - List deleteByQuery = update.getDeleteQuery(); - if (deleteByQuery != null) { - for (String delByQuery : deleteByQuery) { - updateReqBatch.deleteByQuery(delByQuery); - } - metrics.counter(MetricRegistry.name(type.name(), "dbq")).inc(deleteByQuery.size()); - } - } else { - // non-update requests should be sent immediately - sendBatch(req.getSolrRequest(), type, lastRecord, workUnit); - } - } - - if (updateReqBatch != null) { - sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit); - } - try { - partitionManager.checkForOffsetUpdates(partition); - } catch (Throwable e) { - log.error("Error while checking offset updates, shutting down", e); - return false; - } - - // handleItem sets the thread interrupt, let's exit if there has been an interrupt set - if (Thread.currentThread().isInterrupted()) { - log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer."); - return false; - } } catch (WakeupException e) { - log.info("Caught wakeup 
exception, shutting down KafkaSolrRequestConsumer."); - return false; + log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer"); + return false; } catch (Exception e) { - // If there is any exception returned by handleItem, don't set the offset. - if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional - log.error("Non retryable error", e); + if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional + log.error("Non retryable error", e); + return false; + } + + log.error("Exception occurred in Kafka consumer thread, but we will continue.", e); + } + return true; + } + + private boolean hasDeletes(UpdateRequest ureq) { + if (ureq == null) { return false; - } - log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e); - return false; } - } - - try { - partitionManager.checkOffsetUpdates(); - } catch (Throwable e) { - log.error("Error while checking offset updates, shutting down", e); - return false; - } - - } catch (WakeupException e) { - log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer"); - return false; - } catch (Exception e) { - - if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional - log.error("Non retryable error", e); - return false; - } - - log.error("Exception occurred in Kafka consumer thread, but we will continue.", e); + return (ureq.getDeleteByIdMap() != null && !ureq.getDeleteByIdMap().isEmpty()) || + (ureq.getDeleteQuery() != null && !ureq.getDeleteQuery().isEmpty()); } - return true; - } - private boolean hasDeletes(UpdateRequest ureq) { - if (ureq == null) { - return false; + public void sendBatch(SolrRequest solrReqBatch, MirroredSolrRequest.Type type, ConsumerRecord lastRecord, PartitionManager.WorkUnit workUnit) { + SolrRequest finalSolrReqBatch = solrReqBatch; + // Kafka client is not thread-safe !!! + Future future = executor.submit(() -> { + try { + final MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(type, lastRecord.value().getAttempt(), finalSolrReqBatch); + final IQueueHandler.Result result = messageProcessor.handleItem(mirroredSolrRequest); + + processResult(type, result); + } catch (MirroringException e) { + // We don't really know what to do here + log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e); + throw new RuntimeException(e); + } + + }); + workUnit.workItems.add(future); } - return (ureq.getDeleteByIdMap() != null && !ureq.getDeleteByIdMap().isEmpty()) || - (ureq.getDeleteQuery() != null && !ureq.getDeleteQuery().isEmpty()); - } - - public void sendBatch(SolrRequest solrReqBatch, MirroredSolrRequest.Type type, ConsumerRecord lastRecord, PartitionManager.WorkUnit workUnit) { - SolrRequest finalSolrReqBatch = solrReqBatch; - // Kafka client is not thread-safe !!! - Future future = executor.submit(() -> { - try { - final MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(type, lastRecord.value().getAttempt(), finalSolrReqBatch); - final IQueueHandler.Result result = messageProcessor.handleItem(mirroredSolrRequest); - - processResult(type, result); - } catch (MirroringException e) { - // We don't really know what to do here - log.error("Mirroring exception occurred while resubmitting to Kafka. 
We are going to stop the consumer thread now.", e); - throw new RuntimeException(e); - } - }); - workUnit.workItems.add(future); - } + protected void processResult(MirroredSolrRequest.Type type, IQueueHandler.Result result) throws MirroringException { + MirroredSolrRequest item = result.getItem(); + switch (result.status()) { + case FAILED_RESUBMIT: + if (log.isTraceEnabled()) { + log.trace("result=failed-resubmit"); + } + final int attempt = item.getAttempt(); + if (attempt > this.maxAttempts) { + log.info("Sending message to dead letter queue because of max attempts limit with current value = {}", attempt); + kafkaMirroringSink.submitToDlq(item); + metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc(); + } else { + kafkaMirroringSink.submit(item); + metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc(); + } + break; + case HANDLED: + // no-op + if (log.isTraceEnabled()) { + log.trace("result=handled"); + } + metrics.counter(MetricRegistry.name(type.name(), "handled")).inc(); + break; + case NOT_HANDLED_SHUTDOWN: + if (log.isTraceEnabled()) { + log.trace("result=nothandled_shutdown"); + } + metrics.counter(MetricRegistry.name(type.name(), "nothandled_shutdown")).inc(); + case FAILED_RETRY: + log.error("Unexpected response while processing request. We never expect {}.", result.status().toString()); + metrics.counter(MetricRegistry.name(type.name(), "failed-retry")).inc(); + break; + default: + if (log.isTraceEnabled()) { + log.trace("result=no matching case"); + } + // no-op + } + } - protected void processResult(MirroredSolrRequest.Type type, IQueueHandler.Result result) throws MirroringException { - MirroredSolrRequest item = result.getItem(); - switch (result.status()) { - case FAILED_RESUBMIT: - if (log.isTraceEnabled()) { - log.trace("result=failed-resubmit"); + /** + * Shutdown the Kafka consumer by calling wakeup. 
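One detail in `processResult` above: `NOT_HANDLED_SHUTDOWN` has no `break`, so it falls through into `FAILED_RETRY`'s error log and counter (behavior pre-existing in the old code and preserved by the re-indent, but worth a deliberate `break` or a comment if the fall-through is intended). The resubmit-versus-DLQ routing itself reduces to the attempt counter; a condensed sketch, where `Sink` is a simplified stand-in for `KafkaMirroringSink`:

```java
class DlqRoutingSketch {
  interface Sink { // simplified stand-in for KafkaMirroringSink
    void submit(Object item) throws Exception;
    void submitToDlq(Object item) throws Exception;
  }

  // Resubmit to the main topic until the attempt counter passes maxAttempts,
  // then divert to the dead letter queue.
  static void resubmitOrDlq(Sink sink, Object item, int attempt, int maxAttempts)
      throws Exception {
    if (attempt > maxAttempts) {
      sink.submitToDlq(item); // metered as "failed-dlq"
    } else {
      sink.submit(item);      // metered as "failed-resubmit"
    }
  }
}
```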
+ */ + public final void shutdown() { + kafkaConsumer.wakeup(); + log.info("Shutdown called on KafkaCrossDcConsumer"); + try { + if (!executor.isShutdown()) { + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + } + if (!offsetCheckExecutor.isShutdown()) { + offsetCheckExecutor.shutdown(); + offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS); + } + solrClient.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for executor to shutdown"); + } catch (Exception e) { + log.warn("Exception closing Solr client on shutdown", e); + } finally { + Util.logMetrics(metrics); } - final int attempt = item.getAttempt(); - if (attempt > this.maxAttempts) { - log.info("Sending message to dead letter queue because of max attempts limit with current value = {}", attempt); - kafkaMirroringSink.submitToDlq(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc(); + } + + protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) { + long timeoutInMillis = 25 * 1000; + int maxConnectionsPerHost = 5; + + SSLConfig sslconfig = getSSLConfig(conf); + var solrHttp2 = conf.getBool(KafkaCrossDcConf.SOLR_HTTP2_ENABLED); + + if (solrHttp2) { + final Http2SolrClient client = new Http2SolrClient.Builder() + .withBasicAuthCredentials("solr", "SolrRocks") + .withMaxConnectionsPerHost(maxConnectionsPerHost) + .withIdleTimeout(timeoutInMillis, TimeUnit.MILLISECONDS) + .withConnectionTimeout(timeoutInMillis, TimeUnit.MILLISECONDS) + .withSSLConfig(sslconfig) + .build(); + return new CloudHttp2SolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)), Optional.empty()) + .withHttpClient(client) + .build(); } else { - kafkaMirroringSink.submit(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc(); + int timeout = 25 * 1000; + int maxConnections = 10; + int maxConnectionsPerRoute = 5; + final RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(timeout) + .setConnectionRequestTimeout(timeout) + .setSocketTimeout(timeout) + .build(); + + CredentialsProvider provider = new BasicCredentialsProvider(); + // TODO: Store creds in a vault. 
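The HTTP/2 path above, and the legacy path that follows, both compile in the `solr`/`SolrRocks` basic-auth pair. Pending the vault integration the TODO mentions, the credentials could at least come from configuration or the environment; the `solr.username`/`solr.password` keys and `SOLR_USERNAME`/`SOLR_PASSWORD` variables below are hypothetical, invented for illustration:

```java
import java.util.Properties;

class SolrCredentialsSketch {
  // Hypothetical lookup: prefer explicit config keys, fall back to environment
  // variables, and only then to the current hardcoded defaults.
  static String[] basicAuth(Properties conf) {
    String user = conf.getProperty("solr.username",
        System.getenv().getOrDefault("SOLR_USERNAME", "solr"));
    String pass = conf.getProperty("solr.password",
        System.getenv().getOrDefault("SOLR_PASSWORD", ""));
    return new String[] {user, pass};
  }
}
```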
+ UsernamePasswordCredentials credentials = new UsernamePasswordCredentials("solr", "SolrRocks"); + provider.setCredentials(AuthScope.ANY, credentials); + + SSLContext sslcontext = getSSLContext(conf); + SSLConnectionSocketFactory factory = new SSLConnectionSocketFactory(sslcontext, NoopHostnameVerifier.INSTANCE); + + Registry socketFactoryRegistry = RegistryBuilder.create() + .register("https", factory) + .register("http", new PlainConnectionSocketFactory()) + .build(); + final PoolingHttpClientConnectionManager connectionManager = + new PoolingHttpClientConnectionManager(socketFactoryRegistry); + connectionManager.setMaxTotal(maxConnections); + connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute); + + final HttpClient client = HttpClients.custom() + .setDefaultCredentialsProvider(provider) + .setDefaultRequestConfig(requestConfig) + .setSSLSocketFactory(factory) + .setConnectionManager(connectionManager) + .addInterceptorFirst(new PreemptiveAuthInterceptor(provider)) + .build(); + + return new CloudLegacySolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)), Optional.empty()) + .withHttpClient(client) + .build(); } - break; - case HANDLED: - // no-op - if (log.isTraceEnabled()) { - log.trace("result=handled"); + } + + static class PreemptiveAuthInterceptor implements HttpRequestInterceptor { + CredentialsProvider credsProvider; + + PreemptiveAuthInterceptor(CredentialsProvider credentialsProvider) { + this.credsProvider = credentialsProvider; } - metrics.counter(MetricRegistry.name(type.name(), "handled")).inc(); - break; - case NOT_HANDLED_SHUTDOWN: - if (log.isTraceEnabled()) { - log.trace("result=nothandled_shutdown"); + + public void process(final HttpRequest request, final HttpContext context) throws HttpException { + AuthState authState = (AuthState) context.getAttribute(HttpClientContext.TARGET_AUTH_STATE); + if (authState.getAuthScheme() == null) { + AuthScheme authScheme = new BasicScheme(); + HttpHost targetHost = (HttpHost) context.getAttribute(HttpCoreContext.HTTP_TARGET_HOST); + Credentials creds = credsProvider.getCredentials(new AuthScope(targetHost.getHostName(), targetHost.getPort())); + if (creds == null) { + throw new HttpException("No credentials for preemptive authentication"); + } + authState.update(authScheme, creds); + } + } - metrics.counter(MetricRegistry.name(type.name(), "nothandled_shutdown")).inc(); - case FAILED_RETRY: - log.error("Unexpected response while processing request. 
We never expect {}.", result.status().toString()); - metrics.counter(MetricRegistry.name(type.name(), "failed-retry")).inc(); - break; - default: - if (log.isTraceEnabled()) { - log.trace("result=no matching case"); + } + + private static SSLContext getSSLContext(KafkaCrossDcConf conf) { + final Path trustStorePath = Paths.get(conf.get("ssl.truststore.location")); + final Path keyStorePath = Paths.get(conf.get("ssl.keystore.location")); + + try (var inputStream1 = Files.newInputStream(trustStorePath); + var inputStream2 = Files.newInputStream(keyStorePath)) { + + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + KeyStore trustStore = KeyStore.getInstance("JKS"); + trustStore.load(inputStream1, conf.get("ssl.truststore.password").toCharArray()); + keyStore.load(inputStream2, conf.get("ssl.keystore.password").toCharArray()); + + KeyManagerFactory keyManagerFactory = + KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, conf.get("ssl.keystore.password").toCharArray()); + + TrustManagerFactory trustManagerFactory = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init(trustStore); + + var trustManagers = Arrays.asList(trustManagerFactory.getTrustManagers()).stream().map(tm -> { + if (tm instanceof X509TrustManager) { + try { + return new SolrSSLConfig.SolrX509TrustManager((X509TrustManager) tm); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return tm; + }).toArray(TrustManager[]::new); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init( + keyManagerFactory.getKeyManagers(), + //trustManagerFactory.getTrustManagers(), + trustManagers, + new SecureRandom()); + + return sslContext; + } catch (Exception e) { + throw new RuntimeException(e); } - // no-op } - } - - - - /** - * Shutdown the Kafka consumer by calling wakeup. 
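`getSSLContext` above hand-builds a JSSE context from the Kafka-style `ssl.truststore.*`/`ssl.keystore.*` keys (assuming a JKS truststore and a PKCS12 keystore) so the same properties can drive the legacy Solr HTTP client. Elsewhere those keys remain ordinary Kafka client configs; for reference, how they would be passed to a Kafka producer or consumer directly, with placeholder paths and passwords:

```java
import org.apache.kafka.common.config.SslConfigs;

import java.util.Properties;

class KafkaSslPropsSketch {
  // The ssl.* keys read by getSSLContext are standard Kafka client configs;
  // handed to a producer/consumer they enable TLS without manual KeyStore work.
  static Properties kafkaSslProps() {
    Properties props = new Properties();
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.p12");
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");
    return props;
  }
}
```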
- */ - public final void shutdown() { - kafkaConsumer.wakeup(); - log.info("Shutdown called on KafkaCrossDcConsumer"); - try { - if (!executor.isShutdown()) { - executor.shutdown(); - executor.awaitTermination(30, TimeUnit.SECONDS); - } - if (!offsetCheckExecutor.isShutdown()) { - offsetCheckExecutor.shutdown(); - offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS); - } - solrClient.close(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("Interrupted while waiting for executor to shutdown"); - } catch (Exception e) { - log.warn("Exception closing Solr client on shutdown", e); - } finally { - Util.logMetrics(metrics); + + private static SSLConfig getSSLConfig(KafkaCrossDcConf conf) { + return new SolrSSLConfig(true, + true, + conf.get("ssl.keystore.location"), + conf.get("ssl.keystore.password"), + conf.get("ssl.truststore.location"), + conf.get("ssl.truststore.password")); } - } - protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) { - return new CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)), Optional.empty()).build(); - } + public static class SolrSSLConfig extends SSLConfig { + public SolrSSLConfig(boolean useSsl, boolean clientAuth, String keyStore, String keyStorePassword, String trustStore, String trustStorePassword) { + super(useSsl, clientAuth, keyStore, keyStorePassword, trustStore, trustStorePassword); + } + + public static class SolrSSLContextFactory extends SslContextFactory { + @Override + protected TrustManager[] getTrustManagers(KeyStore trustStore, Collection crls) throws Exception { + TrustManager[] managers = null; + if (trustStore != null) { + // Revocation checking is only supported for PKIX algorithm + if (isValidatePeerCerts() && "PKIX".equalsIgnoreCase(getTrustManagerFactoryAlgorithm())) { + PKIXBuilderParameters pbParams = newPKIXBuilderParameters(trustStore, crls); + + TrustManagerFactory trustManagerFactory = getTrustManagerFactoryInstance(); + trustManagerFactory.init(new CertPathTrustManagerParameters(pbParams)); + + managers = trustManagerFactory.getTrustManagers(); + } else { + TrustManagerFactory trustManagerFactory = getTrustManagerFactoryInstance(); + trustManagerFactory.init(trustStore); + + managers = trustManagerFactory.getTrustManagers(); + } + } + + managers = Arrays.asList(managers).stream().map(tm -> { + if (tm instanceof X509TrustManager) { + try { + return new SolrX509TrustManager((X509TrustManager) tm); + } catch (Exception e) { + log.error("Error while returning TrustManagers.", e); + throw new RuntimeException(e); + } + } + return tm; + }).toArray(TrustManager[]::new); + + return managers; + } + } - protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { - return new KafkaMirroringSink(conf); - } + public static class SolrX509TrustManager implements X509TrustManager { + private X509TrustManager trustManager; + public SolrX509TrustManager(X509TrustManager trustManager) { + this.trustManager = trustManager; + } + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + trustManager.checkClientTrusted(chain, authType); + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + trustManager.checkServerTrusted(chain, authType); + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + } + + @Override + public SslContextFactory.Client createClientContextFactory() { 
+ if (!isSSLMode()) { + return null; + } + + SslContextFactory.Client factory = new SolrSSLContextFactory.Client(); + if (getKeyStore() != null) { + factory.setKeyStorePath(getKeyStore()); + } + if (getKeyStorePassword() != null) { + factory.setKeyStorePassword(getKeyStorePassword()); + } + + if (isClientAuthMode()) { + if (getTrustStore() != null) + factory.setTrustStorePath(getTrustStore()); + if (getTrustStorePassword() != null) + factory.setTrustStorePassword(getTrustStorePassword()); + } + + log.info("Disabling HostName Verification."); + factory.setHostnameVerifier(NoopHostnameVerifier.INSTANCE); + return factory; + } + } + + protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) { + return new KafkaMirroringSink(conf); + } } diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java index cf0186d..ae65da3 100644 --- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java +++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java @@ -189,6 +189,7 @@ private Result processMirroredSolrRequest(SolrRequest reque if (log.isDebugEnabled()) { log.debug("Sending request to Solr at address={} with params {}", connectString, request.getParams()); } + log.info("Sending request to Solr at address={} with params {}", connectString, request.getParams()); Result result; SolrResponseBase response = null; Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), "outputTime")).time(); diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle index 60327f9..f935de5 100644 --- a/crossdc-producer/build.gradle +++ b/crossdc-producer/build.gradle @@ -34,11 +34,11 @@ sourceSets { } dependencies { - implementation project(':crossdc-consumer') implementation project(path: ':crossdc-commons', configuration: 'shadow') provided group: 'org.apache.solr', name: 'solr-core', version: "${solrVersion}" + testImplementation project(':crossdc-consumer') testImplementation 'org.hamcrest:hamcrest:2.2' testImplementation 'junit:junit:4.13.2' testImplementation('org.mockito:mockito-inline:5.1.1') diff --git a/crossdc-producer/src/main/java/org/apache/solr/handler/MirroringSchemaHandler.java b/crossdc-producer/src/main/java/org/apache/solr/handler/MirroringSchemaHandler.java new file mode 100644 index 0000000..69da590 --- /dev/null +++ b/crossdc-producer/src/main/java/org/apache/solr/handler/MirroringSchemaHandler.java @@ -0,0 +1,200 @@ +package org.apache.solr.handler; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.CollectionProperties; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CloseHook; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.crossdc.common.ConfUtil; +import org.apache.solr.crossdc.common.ConfigProperty; +import org.apache.solr.crossdc.common.CrossDcConstants; +import org.apache.solr.crossdc.common.KafkaCrossDcConf; +import org.apache.solr.crossdc.common.KafkaMirroringSink; +import org.apache.solr.crossdc.common.MirroredSolrRequest; 
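Hostname verification is disabled on both TLS paths: `NoopHostnameVerifier` is passed to `SSLConnectionSocketFactory` in the legacy client and set again in `createClientContextFactory` above. That trades man-in-the-middle protection for certificate flexibility; a hypothetical config-gated alternative that keeps strict verification as the default:

```java
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;

import javax.net.ssl.HostnameVerifier;

class HostnameVerifierSketch {
  // Hypothetical toggle: the patch always installs NoopHostnameVerifier;
  // gating it on a flag would keep strict verification unless opted out.
  static HostnameVerifier pick(boolean disableVerification) {
    return disableVerification
        ? NoopHostnameVerifier.INSTANCE
        : new DefaultHostnameVerifier();
  }
}
```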
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+public class MirroringSchemaHandler extends SchemaHandler {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private NamedList<?> args;
+  private KafkaCrossDcConf conf;
+  private boolean enabled = true;
+  private KafkaMirroringSink sink;
+
+  private final Map<String, Object> properties = new HashMap<>();
+
+  @Override
+  public void init(final NamedList<?> args) {
+    super.init(args);
+
+    this.args = args;
+  }
+
+  @Override
+  public void inform(SolrCore core) {
+    super.inform(core); // let SchemaHandler do its own core wiring (e.g. immutable-configset check)
+    if (core.getCoreContainer().isZooKeeperAware()) {
+      lookupPropertyOverridesInZk(core);
+    } else {
+      applyArgsOverrides();
+      if (enabled) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, getClass().getSimpleName()
+            + " is only supported in SolrCloud mode; please disable or remove it from solrconfig.xml");
+      }
+      log.warn("Core '{}' configures a disabled {}, but {} is only supported in SolrCloud deployments; schema requests will not be mirrored.",
+          core.getName(), getClass().getSimpleName(), getClass().getSimpleName());
+    }
+
+    if (!enabled) {
+      return;
+    }
+
+    ConfUtil.verifyProperties(properties);
+
+    conf = new KafkaCrossDcConf(properties);
+    sink = new KafkaMirroringSink(conf);
+
+    Closer closer = new Closer(sink);
+    core.addCloseHook(new MyCloseHook(closer));
+  }
+
+  private void applyArgsOverrides() {
+    Boolean enabled = args.getBooleanArg("enabled");
+    if (enabled != null && !enabled) {
+      this.enabled = false;
+    }
+    for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+      String val = args._getStr(configKey.getKey(), null);
+      if (val != null && !val.isBlank()) {
+        properties.put(configKey.getKey(), val);
+      }
+    }
+  }
+
+  private static class MyCloseHook implements CloseHook {
+    private final Closer closer;
+
+    public MyCloseHook(Closer closer) {
+      this.closer = closer;
+    }
+
+    @Override
+    public void preClose(SolrCore core) {}
+
+    @Override
+    public void postClose(SolrCore core) {
+      closer.close();
+    }
+  }
+
+  private static class Closer {
+    private final KafkaMirroringSink sink;
+
+    public Closer(KafkaMirroringSink sink) {
+      this.sink = sink;
+    }
+
+    public final void close() {
+      try {
+        this.sink.close();
+      } catch (IOException e) {
+        log.error("Exception closing sink", e);
+      }
+    }
+  }
+
+  private void lookupPropertyOverridesInZk(SolrCore core) {
+    log.info("Producer startup config properties before adding additional properties from ZooKeeper={}", properties);
+
+    try {
+      SolrZkClient solrZkClient = core.getCoreContainer().getZkController().getZkClient();
+      ConfUtil.fillProperties(solrZkClient, properties);
+      applyArgsOverrides();
+      CollectionProperties cp = new CollectionProperties(solrZkClient);
+      Map<String, String> collectionProperties = cp.getCollectionProperties(core.getCoreDescriptor().getCollectionName());
+      for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+        String val = collectionProperties.get("crossdc." + configKey.getKey());
+        if (val != null && !val.isBlank()) {
+          properties.put(configKey.getKey(), val);
+        }
+      }
+      String enabledVal = collectionProperties.get("crossdc.enabled");
+      if (enabledVal != null) {
+        this.enabled = Boolean.parseBoolean(enabledVal);
+      }
+    } catch (Exception e) {
+      log.error("Exception looking for CrossDC configuration in ZooKeeper", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in ZooKeeper", e);
+    }
+  }
+
+  @Override
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    boolean doMirroring = req.getParams().getBool(CrossDcConstants.SHOULD_MIRROR, true);
+    if (!doMirroring) {
+      log.info(" -- doMirroring=false, skipping...");
+      super.handleRequestBody(req, rsp);
+      return;
+    }
+
+    // throw any errors before mirroring
+    baseHandleRequestBody(req, rsp);
+    SolrRequest.METHOD method = SolrRequest.METHOD.valueOf(req.getHttpMethod().toUpperCase(Locale.ROOT));
+    if (SolrRequest.METHOD.POST.equals(method)) {
+      if (rsp.getException() != null) {
+        return;
+      }
+      if (sink == null) {
+        return;
+      }
+      CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
+      String collection = coreDesc.getCollectionName();
+      if (collection == null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not determine collection name for "
+            + MirroringSchemaHandler.class.getSimpleName() + ". Solr may not be running in cloud mode.");
+      }
+
+      // mirror; copy the params so the original request's params are not mutated
+      ModifiableSolrParams mirroredParams = new ModifiableSolrParams(req.getParams());
+      mirroredParams.set("collection", collection);
+      // remove internal version parameter
+      mirroredParams.remove(CommonParams.VERSION_FIELD);
+      // make sure to turn this off to prevent looping
+      mirroredParams.set(CrossDcConstants.SHOULD_MIRROR, Boolean.FALSE.toString());
+      log.info(" -- mirroring mirroredParams={}, original responseHeader={}, responseValues={}", mirroredParams, rsp.getResponseHeader(), rsp.getValues());
+
+      MirroredSolrRequest.MirroredSchemaRequest schemaRequest = new MirroredSolrRequest.MirroredSchemaRequest(method, mirroredParams);
+      sink.submit(new MirroredSolrRequest(MirroredSolrRequest.Type.SCHEMA, schemaRequest));
+    }
+  }
+
+  @VisibleForTesting
+  public void baseHandleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    super.handleRequestBody(req, rsp);
+  }
+}
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index 9141a76..a33f227 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -49,6 +49,7 @@ public void mirror(UpdateRequest request) throws MirroringException {
     }
     // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
     final MirroredSolrRequest mirroredRequest = new MirroredSolrRequest(MirroredSolrRequest.Type.UPDATE, 1, request, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
+    log.debug("Submitting update to mirroring sink: mirroredRequest={}", mirroredRequest);
     try {
       sink.submit(mirroredRequest);
     } catch (MirroringException exception) {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index eb22b87..9a329da 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -117,6 +117,7 @@ UpdateRequest createMirrorRequest() {
       final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
       doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
       final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
+      log.trace("Adding doc={}", doc);
       if (log.isDebugEnabled()) {
         log.debug("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
       }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 96e8b85..44fa4a2 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -251,6 +251,8 @@ public UpdateRequestProcessor getInstance(final SolrQueryRequest req, final Solr
       log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
     }
 
+    log.debug("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
+
     return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, mirrorCommits, expandDbq, maxMirroringBatchSizeBytes, mirroredParams, DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null, producerMetrics);
 }
diff --git a/gradle.properties b/gradle.properties
index c18c681..99e3919 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,2 +1,2 @@
-solrVersion=9.2.0
+solrVersion=9.5.0
 kafkaVersion=3.5.1
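
Note on the new per-collection overrides: MirroringSchemaHandler.lookupPropertyOverridesInZk() reads "crossdc.enabled" plus any "crossdc." + ConfigProperty key from the collection's properties in ZooKeeper, so mirroring can be tuned per collection without editing solrconfig.xml (where the handler would presumably be registered in place of the stock SchemaHandler). A minimal sketch of setting those properties follows; the class name, ZooKeeper address, collection name, and broker list are hypothetical placeholders, and the SolrZkClient.Builder usage assumes Solr 9.3+.

// Hypothetical sketch: writing the "crossdc." collection properties that
// MirroringSchemaHandler reads when a core loads. Addresses and names are placeholders.
import java.util.concurrent.TimeUnit;
import org.apache.solr.common.cloud.CollectionProperties;
import org.apache.solr.common.cloud.SolrZkClient;

public class CrossDcCollectionPropsExample {
  public static void main(String[] args) throws Exception {
    try (SolrZkClient zkClient = new SolrZkClient.Builder()
        .withUrl("localhost:2181")          // placeholder ZK address
        .withTimeout(30, TimeUnit.SECONDS)
        .build()) {
      CollectionProperties props = new CollectionProperties(zkClient);
      // Disable mirroring for a single collection.
      props.setCollectionProperty("collection1", "crossdc.enabled", "false");
      // Override a KafkaCrossDcConf key (here assumed to be "bootstrapServers")
      // by prefixing it with "crossdc.".
      props.setCollectionProperty("collection1", "crossdc.bootstrapServers", "kafka-dc2:9092");
    }
  }
}

Since the handler only reads these properties in inform(), changes take effect the next time the collection's cores are loaded (for example after a collection RELOAD).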