From 75a28432cdf674c614aba2feb5942671732421ac Mon Sep 17 00:00:00 2001
From: Tiihott <48@teragrep.com>
Date: Tue, 5 Dec 2023 12:12:46 +0200
Subject: [PATCH 001/146] Initial setup for Kafka consumer groups.
---
pom.xml | 67 +++++++
src/main/java/com/teragrep/cfe_39/Config.java | 161 +++++++++++++++++
.../consumers/kafka/DatabaseOutput.java | 117 +++++++++++++
.../consumers/kafka/KafkaController.java | 159 +++++++++++++++++
.../consumers/kafka/KafkaReaderTemp.java | 63 +++++++
.../kafka/MockKafkaConsumerFactoryTemp.java | 163 ++++++++++++++++++
.../consumers/kafka/ReadCoordinatorTemp.java | 71 ++++++++
.../cfe_39/metrics/DurationStatistics.java | 81 +++++++++
.../cfe_39/metrics/RuntimeStatistics.java | 48 ++++++
.../cfe_39/metrics/mxj/AtomicLongMXJItem.java | 43 +++++
.../cfe_39/metrics/mxj/MXJBeanDynamizer.java | 29 ++++
.../cfe_39/metrics/mxj/MXJEndpoint.java | 40 +++++
.../teragrep/cfe_39/metrics/mxj/MXJItem.java | 14 ++
.../cfe_39/metrics/topic/TopicCounter.java | 69 ++++++++
.../cfe_39/metrics/topic/TopicStatistics.java | 59 +++++++
15 files changed, 1184 insertions(+)
create mode 100644 pom.xml
create mode 100644 src/main/java/com/teragrep/cfe_39/Config.java
create mode 100644 src/main/java/com/teragrep/cfe_39/consumers/kafka/DatabaseOutput.java
create mode 100644 src/main/java/com/teragrep/cfe_39/consumers/kafka/KafkaController.java
create mode 100644 src/main/java/com/teragrep/cfe_39/consumers/kafka/KafkaReaderTemp.java
create mode 100644 src/main/java/com/teragrep/cfe_39/consumers/kafka/MockKafkaConsumerFactoryTemp.java
create mode 100644 src/main/java/com/teragrep/cfe_39/consumers/kafka/ReadCoordinatorTemp.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/DurationStatistics.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/RuntimeStatistics.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/mxj/AtomicLongMXJItem.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/mxj/MXJBeanDynamizer.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/mxj/MXJEndpoint.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/mxj/MXJItem.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/topic/TopicCounter.java
create mode 100644 src/main/java/com/teragrep/cfe_39/metrics/topic/TopicStatistics.java
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 00000000..6f9c7eec
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,67 @@
+
+
+ 4.0.0
+ com.teragrep
+ cfe_39
+ jar
+ ${revision}${sha1}${changelist}
+ cfe_39
+
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+ 0.0.1
+ -SNAPSHOT
+
+
+
+
+
+
+
+
+ com.teragrep
+ rlo_06
+ 9.0.1
+
+
+ com.teragrep
+ rlo_09
+ 2.0.4
+
+
+ org.apache.avro
+ avro
+ 1.11.3
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+ 2.20.0
+
+
+ org.apache.logging.log4j
+ log4j-core
+ 2.19.0
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+ 2.20.0
+
+
+ org.slf4j
+ slf4j-api
+ 2.0.7
+
+
+
+
+ scrapyard.xnet.fi-teragrep-releases
+ https://scrapyard.xnet.fi/repository/teragrep-releases/
+
+
+
\ No newline at end of file
diff --git a/src/main/java/com/teragrep/cfe_39/Config.java b/src/main/java/com/teragrep/cfe_39/Config.java
new file mode 100644
index 00000000..17816d11
--- /dev/null
+++ b/src/main/java/com/teragrep/cfe_39/Config.java
@@ -0,0 +1,161 @@
+package com.teragrep.cfe_39;
+
+import org.apache.logging.log4j.core.config.ConfigurationSource;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.Properties;
+
+public class Config {
+
+ // db
+ private final String dbConnectionUrl;
+ private final String dbUsername;
+ private final String dbPassword;
+
+ // kafka
+ private final String queueTopicPattern;
+
+ private final Properties kafkaConsumerProperties;
+
+ private final boolean replicationEnabled;
+ private final int streamSize;
+ private final String streamUnits;
+ private static final Logger LOGGER = LoggerFactory.getLogger(Config.class);
+ private final int dropPartitionsOlderThanHours;
+ private final int createPartitionsInAdvanceHours;
+ private final boolean overrideTableLocation;
+ private final String tableLocation;
+
+ Config() throws IOException {
+ Properties properties = new Properties();
+ Path configPath = Paths.get(System.getProperty("cfe_30.config.location", System.getProperty("user.dir") + "/etc/application.properties"));
+ LOGGER.info("Loading application config '" + configPath.toAbsolutePath() + "'");
+ properties.load(Files.newInputStream(configPath));
+ LOGGER.debug("Got configuration: " + properties);
+
+ // db
+ this.dbConnectionUrl = properties.getProperty("db.connectionUrl");
+ if (this.dbConnectionUrl == null) {
+ throw new IllegalArgumentException("db.connectionUrl not set");
+ }
+ this.dbUsername = properties.getProperty("db.username");
+ if (this.dbUsername == null) {
+ throw new IllegalArgumentException("db.username not set");
+ }
+ this.dbPassword = properties.getProperty("db.password");
+ if (this.dbPassword == null) {
+ throw new IllegalArgumentException("db.password not set");
+ }
+
+ String replicationEnabledString = properties.getProperty("db.replicationEnabled", "false");
+ this.replicationEnabled = Boolean.parseBoolean(replicationEnabledString);
+
+ String streamBytesString = properties.getProperty("db.streamSize", "512");
+ this.streamSize = Integer.parseInt(streamBytesString);
+ this.streamUnits = properties.getProperty("db.streamUnits", "rows");
+ this.dropPartitionsOlderThanHours = Integer.parseInt(properties.getProperty("db.dropPartitionsOlderThanHours", "4"));
+ if(dropPartitionsOlderThanHours <= 0) {
+ throw new IllegalArgumentException("db.dropPartitionsOlderThanHours must be set to >0, got " + dropPartitionsOlderThanHours);
+ }
+ this.createPartitionsInAdvanceHours = Integer.parseInt(properties.getProperty("db.createPartitionsInAdvanceHours", "8"));
+ if(createPartitionsInAdvanceHours <= 0) {
+ throw new IllegalArgumentException("createPartitionsInAdvanceHours must be set to >0, got " + createPartitionsInAdvanceHours);
+ }
+
+ String overrideTableLocationString = properties.getProperty("db.overrideTableLocation", "false");
+ this.overrideTableLocation = Boolean.parseBoolean(overrideTableLocationString);
+ this.tableLocation = properties.getProperty("db.tableLocation");
+ if(overrideTableLocation && tableLocation.isEmpty()) {
+ throw new IllegalArgumentException("db.tableLocation resulted in empty string when db.overrideTableLocation was true");
+ }
+
+ // kafka
+ this.queueTopicPattern = properties.getProperty("queueTopicPattern", "^.*$");
+
+ this.kafkaConsumerProperties = loadSubProperties(properties, "consumer.");
+ String loginConfig = properties.getProperty("java.security.auth.login.config");
+ if(loginConfig == null) {
+ throw new IOException("Property java.security.auth.login.config does not exist");
+ }
+ if(!(new File(loginConfig)).isFile()) {
+ throw new IOException("File '" + loginConfig + "' set by java.security.auth.login.config does not exist");
+ }
+ System.setProperty("java.security.auth.login.config", loginConfig);
+
+ // Just for loggers to work
+ Path log4j2Config = Paths.get(properties.getProperty("log4j2.configurationFile", System.getProperty("user.dir") + "/etc/log4j2.properties"));
+ LOGGER.info("Loading log4j2 config from '" + log4j2Config.toRealPath() + "'");
+ Configurator.reconfigure(log4j2Config.toUri());
+ }
+
+ private Properties loadSubProperties(Properties properties, String prefix) {
+ Properties subProperties = new Properties();
+
+ Enumeration