diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
index 19da5283..e889853a 100644
--- a/.github/ISSUE_TEMPLATE/bug_report.md
+++ b/.github/ISSUE_TEMPLATE/bug_report.md
@@ -9,23 +9,23 @@ assignees: ''
**Describe the bug**
-
+
**Expected behavior**
-
+
**How to reproduce**
-
+
**Screenshots**
-
+
**Software version**
-
+
**Desktop (please complete the following information if relevant):**
- - OS:
- - Browser:
- - Version:
-
+- OS:
+- Browser:
+- Version:
+
**Additional context**
-
+
\ No newline at end of file
diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml
index 16962298..19d7f20b 100644
--- a/.github/ISSUE_TEMPLATE/config.yml
+++ b/.github/ISSUE_TEMPLATE/config.yml
@@ -5,7 +5,7 @@ contact_links:
about: Problems with Teragrep documentation
- name: Ask a question or get support
url: https://github.com/teragrep/cfe_39/discussions
- about: Ask a question or request support
+ about: Ask a question or request support
- name: Report vulnerability
url: https://github.com/teragrep/teragrep/security/advisories/new
- about: Privately report a security vulnerability
+ about: Privately report a security vulnerability
\ No newline at end of file
diff --git a/.github/ISSUE_TEMPLATE/feature_requests.md b/.github/ISSUE_TEMPLATE/feature_requests.md
new file mode 100644
index 00000000..501c73ec
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/feature_requests.md
@@ -0,0 +1,20 @@
+---
+name: Feature request
+about: Suggest an idea for this project
+title: ''
+labels: enhancement
+assignees: ''
+
+---
+
+**Description**
+
+
+**Use case or motivation behind the feature request**
+
+
+**Related issues**
+
+
+**Additional context**
+
\ No newline at end of file
diff --git a/.github/ISSUE_TEMPLATE/tasks-and-meta.md b/.github/ISSUE_TEMPLATE/tasks-and-meta.md
index 17546e49..8346c565 100644
--- a/.github/ISSUE_TEMPLATE/tasks-and-meta.md
+++ b/.github/ISSUE_TEMPLATE/tasks-and-meta.md
@@ -8,4 +8,4 @@ assignees: ''
---
**Description**
-
+
\ No newline at end of file
diff --git a/.github/workflows/upload_release.yaml b/.github/workflows/upload_release.yaml
new file mode 100644
index 00000000..4138e06f
--- /dev/null
+++ b/.github/workflows/upload_release.yaml
@@ -0,0 +1,43 @@
+name: Upload Release
+
+on:
+ release:
+ types: [published]
+
+jobs:
+ upload:
+ name: Upload
+ runs-on: ubuntu-latest
+ permissions:
+ contents: write
+
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
+
+ - name: Set up JDK 8
+ uses: actions/setup-java@v3
+ with:
+ java-version: '8'
+ distribution: 'temurin'
+ server-id: github
+ settings-path: ${{ github.workspace }}
+
+
+ - name: Package jar
+ run: mvn --batch-mode -Drevision=${{ github.event.release.tag_name }} -Dsha1= -Dchangelist= clean package
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Package rpm
+ run: cd rpm/ && mvn --batch-mode -Drevision=${{ github.event.release.tag_name }} -Dsha1= -Dchangelist= -f rpm.pom.xml package
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Attach rpm to release
+ uses: softprops/action-gh-release@v1
+ with:
+ files: |
+ rpm/target/rpm/com.teragrep-cfe_39/RPMS/noarch/com.teragrep-cfe_39-*.noarch.rpm
+ target/cfe_39-*-jar-with-dependencies.jar
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 2f435308..cc5ebc0c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,5 @@ buildNumber.properties
.project
# JDT-specific (Eclipse Java Development Tools)
.classpath
+
+src/main/java/com/teragrep/cfe_39/avro/SyslogRecord.java
\ No newline at end of file
diff --git a/README.adoc b/README.adoc
new file mode 100644
index 00000000..4bd6c77a
--- /dev/null
+++ b/README.adoc
@@ -0,0 +1,41 @@
+
+# CFE_39
+
+This is a HDFS Data Ingestion module for PTH_06 use.
+
+## Features
+
+Implements an almost real-time data source that allows reading the latest data from Kafka (last few records), semi-latest data from HDFS (last few days) and old data from the S3 archive.
+
+## Documentation
+
+See the official documentation on https://docs.teragrep.com[docs.teragrep.com].
+
+## How to compile and use
+
+`mvn clean package`
+
+application.properties, config.jaas and log4j2.properties files have to be created to use this module.
+By default, application.properties file must be placed in /opt/teragrep/cfe_39/etc/ directory.
+The application.properties is used to define the directory where the other files must be placed.
+
+Example configuration files are available in the cfe_39/rpm/resources/ directory.
+
+## Contributing
+
+You can involve yourself with our project by https://github.com/teragrep/cfe_39/issues/new/choose[opening an issue] or submitting a pull request.
+
+Contribution requirements:
+
+. *All changes must be accompanied by a new or changed test.* If you think testing is not required in your pull request, include a sufficient explanation as to why you think so.
+. Security checks must pass
+. Pull requests must align with the principles and http://www.extremeprogramming.org/values.html[values] of extreme programming.
+. Pull requests must follow the principles of Object Thinking and Elegant Objects (EO).
+
+Read more in our https://github.com/teragrep/teragrep/blob/main/contributing.adoc[Contributing Guideline].
+
+### Contributor License Agreement
+
+Contributors must sign the https://github.com/teragrep/teragrep/blob/main/cla.adoc[Teragrep Contributor License Agreement] before a pull request is accepted to the organization's repositories.
+
+You need to submit the CLA only once. After submitting the CLA you can contribute to all Teragrep's repositories.
\ No newline at end of file
diff --git a/eclipse-java-formatter.xml b/eclipse-java-formatter.xml
new file mode 100644
index 00000000..1e4e9905
--- /dev/null
+++ b/eclipse-java-formatter.xml
@@ -0,0 +1,450 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/license-header b/license-header
new file mode 100644
index 00000000..d14a1f51
--- /dev/null
+++ b/license-header
@@ -0,0 +1,45 @@
+/*
+ * HDFS Data Ingestion for PTH_06 use CFE-39
+ * Copyright (C) 2021-2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 00000000..bda84206
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,385 @@
+
+
+ 4.0.0
+ com.teragrep
+ cfe_39
+ ${revision}${sha1}${changelist}
+ jar
+ cfe_39
+
+ -SNAPSHOT
+ 3.3.6
+ 1.8
+ 1.8
+ 1.8
+ 4.2.8
+ UTF-8
+ 0.16.0
+ 0.0.1
+
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+ ${metrics.version}
+
+
+ io.dropwizard.metrics
+ metrics-jmx
+ ${metrics.version}
+
+
+ io.prometheus
+ simpleclient
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_dropwizard
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_servlet
+ ${prometheus-simpleclient.version}
+
+
+ io.prometheus
+ simpleclient_hotspot
+ ${prometheus-simpleclient.version}
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ 5.7.1
+ test
+
+
+ org.junit.platform
+ junit-platform-launcher
+ 1.7.1
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.7.1
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.7.1
+ test
+
+
+ com.teragrep
+ rlo_06
+ 9.0.1
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 3.4.0
+
+
+
+ org.xerial.snappy
+ snappy-java
+ 1.1.10.5
+
+
+ org.apache.avro
+ avro
+ 1.11.3
+
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.version}
+
+
+
+ org.apache.hadoop
+ hadoop-minicluster
+ ${hadoop.version}
+ test
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ ${hadoop.version}
+ test
+
+
+
+ org.mockito
+ mockito-core
+ 4.11.0
+ test
+
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+ 2.20.0
+
+
+ org.apache.logging.log4j
+ log4j-core
+ 2.20.0
+
+
+ org.slf4j
+ slf4j-api
+ 2.0.7
+
+
+
+ ${project.artifactId}-${revision}${changelist}${sha1}
+
+
+ org.apache.rat
+ apache-rat-plugin
+ 0.16.1
+ false
+
+ false
+ false
+
+
+ Teragrep
+ Affero General Public License v3
+
+
+
+
+
+
+ Suomen Kanuuna Oy
+ 2024
+
+ HDFS Data Ingestion for PTH_06 use CFE-39
+
+ Teragrep
+
+
+ true
+ false
+
+
+ .git/**
+ .gitattributes
+ .gitignore
+ .gitmodules
+
+ .github/workflows/*
+ .github/ISSUE_TEMPLATE/*
+ toolchains.xml
+ settings.xml
+
+ pom.xml
+ eclipse-java-formatter.xml
+
+ README.adoc
+
+ license-header
+ src/main/avro/KafkaRecord.avsc
+ src/main/assembly/jar-with-dependencies.xml
+ src/test/resources/broken.application.properties
+ src/test/resources/valid.application.properties
+ src/test/resources/failProcessing.application.properties
+ rpm/resources/config.jaas
+ rpm/resources/log4j2.properties
+ rpm/resources/application.properties
+ rpm/resources/cfe_39.service
+ rpm/rpm.pom.xml
+ src/main/java/com/teragrep/cfe_39/avro/SyslogRecord.java
+
+
+
+
+
+ check
+
+ test
+
+
+
+
+ org.apache.avro
+ avro-maven-plugin
+ 1.11.3
+
+
+
+ schema
+
+ generate-sources
+
+ ${project.basedir}/src/main/avro/
+ ${project.basedir}/src/main/java/
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.12.1
+
+ -Xlint:all
+ ${java.version}
+ ${java.version}
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.3.0
+
+
+
+ true
+ true
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.6.0
+
+
+ src/main/assembly/jar-with-dependencies.xml
+
+
+
+ com.teragrep.cfe_39.Main
+ true
+
+
+
+
+
+ make-assembly
+
+ single
+
+ package
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.4.1
+
+
+ enforce-maven
+
+ enforce
+
+
+
+
+ 3.2.5
+
+
+
+
+
+ enforce
+ none
+
+
+
+
+ com.diffplug.spotless
+ spotless-maven-plugin
+ 2.30.0
+
+
+
+ ${project.basedir}/eclipse-java-formatter.xml
+ 4.10.0
+
+
+
+ ${project.basedir}/license-header
+
+
+ src/main/java/com/teragrep/cfe_39/avro/SyslogRecord.java
+
+
+
+
+
+ UTF-8
+ \n
+ true
+ false
+ 2
+ recommended_2008_06
+ true
+ true
+ true
+
+
+
+
+
+ .gitattributes
+ .gitignore
+
+
+
+
+ true
+ 4
+
+
+
+
+
+
+
+ check
+
+ compile
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.12
+
+
+
+ prepare-agent
+
+
+
+ report
+
+ report
+
+ prepare-package
+
+
+
+
+
+
diff --git a/rpm/resources/application.properties b/rpm/resources/application.properties
new file mode 100644
index 00000000..fc5100cb
--- /dev/null
+++ b/rpm/resources/application.properties
@@ -0,0 +1,52 @@
+# Kafka security configuration file
+java.security.auth.login.config=/opt/teragrep/cfe_39/etc/config.jaas
+# Logger settings
+log4j2.configurationFile=/opt/teragrep/cfe_39/etc/log4j2.properties
+# What topics are searched from kafka, regex
+queueTopicPattern=^testConsumerTopic-*$
+# Number of consumers created to the consumer groups
+numOfConsumers=2
+# Kafka bootstrap servers
+consumer.bootstrap.servers=test
+# Offset, should not be touched
+consumer.auto.offset.reset=earliest
+# Autocommit, should not be touched
+consumer.enable.auto.commit=false
+# Consumer group id, this is to track the progress of reading the topic
+consumer.group.id=cfe_39
+# Used security protocol and mechanism
+consumer.security.protocol=SASL_PLAINTEXT
+consumer.sasl.mechanism=PLAIN
+# Maximum records per batch, note that too big number will cause massive load and can cause timeouts to trigger
+consumer.max.poll.records=500
+# How much data can be fetched in one go
+consumer.fetch.max.bytes=1073741820
+# How long for request before timing out. Note that too big max poll records size can cause this to trigger
+consumer.request.timeout.ms=300000
+consumer.max.poll.interval.ms=300000
+# For testing only, remove for prod.
+consumer.useMockKafkaConsumer=true
+# Directory where AVRO files are constructed for HDFS
+queueDirectory=/opt/teragrep/cfe_39/etc/AVRO/
+# The maximum file size for AVRO-files that are to be stored in HDFS database.
+maximumFileSize=60800000
+# Boolean for deciding if records not in RFC5424 should be skipped or not.
+skipNonRFC5424Records=true
+# Boolean for deciding if empty RFC5424 records should be skipped or not.
+skipEmptyRFC5424Records=true
+# HDFS pruning offset, prunes files older than the given milliseconds.
+pruneOffset=172800000
+# HDFS uri
+hdfsuri=hdfs://localhost:45937/
+# Kerberos
+java.security.krb5.kdc=test
+java.security.krb5.realm=test
+hadoop.security.authentication=test
+hadoop.security.authorization=test
+dfs.namenode.kerberos.principal.pattern=test
+KerberosKeytabUser=test
+KerberosKeytabPath=test
+dfs.client.use.datanode.hostname=false
+kerberosLoginAutorenewal=true
+dfs.data.transfer.protection=test
+dfs.encrypt.data.transfer.cipher.suites=test
\ No newline at end of file
diff --git a/rpm/resources/cfe_39.service b/rpm/resources/cfe_39.service
new file mode 100644
index 00000000..6bb58d56
--- /dev/null
+++ b/rpm/resources/cfe_39.service
@@ -0,0 +1,12 @@
+[Unit]
+Description=com.teragrep.cfe_39
+ConditionPathExists=/opt/teragrep/cfe_39/lib/cfe_39.jar
+
+[Service]
+ExecStart=/usr/lib/jvm/jre-1.8.0-openjdk/bin/java -jar /opt/teragrep/cfe_39/lib/cfe_39.jar
+User=srv-cfe_39
+WorkingDirectory=/opt/teragrep/cfe_39
+
+[Install]
+WantedBy=multi-user.target
+
diff --git a/rpm/resources/config.jaas b/rpm/resources/config.jaas
new file mode 100644
index 00000000..045b8540
--- /dev/null
+++ b/rpm/resources/config.jaas
@@ -0,0 +1,9 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin"
+ user_admin="admin"
+ user_alice="alice"
+ user_bob="bob"
+ user_charlie="charlie";
+};
\ No newline at end of file
diff --git a/rpm/resources/log4j2.properties b/rpm/resources/log4j2.properties
new file mode 100644
index 00000000..9ec3d8ec
--- /dev/null
+++ b/rpm/resources/log4j2.properties
@@ -0,0 +1,10 @@
+appender.console.type = Console
+appender.console.name = ConsoleLogger
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
+logging.level.org.apache.kafka=WARN
+logging.level.io.confluent.kafka=WARN
+rootLogger.level = INFO
+rootLogger.appenderRef.stdout.ref = ConsoleLogger
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = warn
\ No newline at end of file
diff --git a/rpm/rpm.pom.xml b/rpm/rpm.pom.xml
new file mode 100644
index 00000000..fa4fb42a
--- /dev/null
+++ b/rpm/rpm.pom.xml
@@ -0,0 +1,162 @@
+
+
+ rpm
+ 4.0.0
+ cfe_39
+ ${revision}${sha1}${changelist}
+ cfe_39
+ cfe_39
+ com.teragrep
+
+ UTF-8
+ 1.8
+ 1.8
+ 1.8
+ 0.0.1
+ -SNAPSHOT
+
+
+
+ ${project.basedir}/target
+
+
+ maven-enforcer-plugin
+ 3.4.1
+
+
+ enforce
+ none
+
+
+ enforce-maven
+
+ enforce
+
+
+
+
+ 3.2.5
+
+
+
+
+
+
+
+ org.codehaus.mojo
+ rpm-maven-plugin
+ 2.2.0
+ true
+
+
+ default-rpm
+
+ rpm
+
+ package
+
+
+
+ ${project.groupId}-${project.artifactId}
+ ${project.groupId}-${project.artifactId}
+ ${project.version}
+ ${env.BUILD_ID}
+ Proprietary
+ teragrep Log Management Suite
+ https://teragrep.com/
+ teragrep <servicedesk@teragrep.com>
+ teragrep/LogManagementSuite
+ false
+ srv-cfe_39
+ srv-cfe_39
+ 0644
+ 0755
+
+ _build_id_links none
+ __provides_exclude ^osgi\\(.*$
+ __requires_exclude ^osgi\\(.*$
+
+
+
+ /opt/teragrep/${project.artifactId}/lib
+ true
+ 755
+ 755
+ srv-cfe_39
+ srv-cfe_39
+ true
+
+
+ ${project.basedir}/../target/cfe_39-${revision}${sha1}${changelist}-jar-with-dependencies.jar
+ cfe_39.jar
+
+
+
+
+ /opt/teragrep/${project.artifactId}/etc
+ true
+ noreplace
+
+
+ ${project.basedir}/../rpm/resources/application.properties
+
+
+ ${project.basedir}/../rpm/resources/config.jaas
+
+
+ ${project.basedir}/../rpm/resources/log4j2.properties
+
+
+
+
+ /usr/lib/systemd/system
+ false
+
+
+ ${project.basedir}/resources/cfe_39.service
+
+
+
+
+
+ java-1.8.0-openjdk
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 3.1.1
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.12.1
+
+
+ default-compile
+ compile
+
+ compile
+
+
+ true
+
+
+
+
+
+
+
diff --git a/src/main/assembly/jar-with-dependencies.xml b/src/main/assembly/jar-with-dependencies.xml
new file mode 100644
index 00000000..ff9c3a29
--- /dev/null
+++ b/src/main/assembly/jar-with-dependencies.xml
@@ -0,0 +1,21 @@
+
+ jar-with-dependencies
+
+ jar
+
+ false
+
+
+ metaInf-services
+
+
+
+
+ true
+ true
+ runtime
+
+
+
\ No newline at end of file
diff --git a/src/main/avro/KafkaRecord.avsc b/src/main/avro/KafkaRecord.avsc
new file mode 100644
index 00000000..2b55cad6
--- /dev/null
+++ b/src/main/avro/KafkaRecord.avsc
@@ -0,0 +1,15 @@
+{"namespace": "com.teragrep.cfe_39.avro",
+ "type": "record",
+ "name": "SyslogRecord",
+ "fields": [
+ {"name": "timestamp", "type": "long"},
+ {"name": "directory", "type": "string"},
+ {"name": "stream", "type": "string"},
+ {"name": "host", "type": "string"},
+ {"name": "input", "type": "string"},
+ {"name": "partition", "type": "string"},
+ {"name": "offset", "type": "long"},
+ {"name": "origin", "type": "string"},
+ {"name": "payload", "type": "string"}
+ ]
+}
\ No newline at end of file
diff --git a/src/main/java/com/teragrep/cfe_39/Config.java b/src/main/java/com/teragrep/cfe_39/Config.java
new file mode 100644
index 00000000..c29e1ed9
--- /dev/null
+++ b/src/main/java/com/teragrep/cfe_39/Config.java
@@ -0,0 +1,296 @@
+/*
+ * HDFS Data Ingestion for PTH_06 use CFE-39
+ * Copyright (C) 2021-2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.cfe_39;
+
+import org.apache.logging.log4j.core.config.Configurator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Enumeration;
+import java.util.Properties;
+
+public class Config {
+
+ private final String queueTopicPattern;
+ private final Properties kafkaConsumerProperties;
+ private static final Logger LOGGER = LoggerFactory.getLogger(Config.class);
+ private final String hdfsPath;
+ private String hdfsuri;
+ private final String queueDirectory;
+ private final String kerberosHost;
+ private final String kerberosRealm;
+ private final String kerberosPrincipal;
+ private final String hadoopAuthentication;
+ private final String hadoopAuthorization;
+ private final String kerberosKeytabUser;
+ private final String kerberosKeytabPath;
+ private final String kerberosLoginAutorenewal;
+ private final String kerberosTestMode;
+ private long maximumFileSize;
+ private final int numOfConsumers;
+ private final long pruneOffset;
+ private final boolean skipNonRFC5424Records;
+ private final boolean skipEmptyRFC5424Records;
+ private final String dfsDataTransferProtection;
+ private final String dfsEncryptDataTransferCipherSuites;
+
+ /**
+  * Loads the application configuration from a properties file and validates it.
+  * <p>
+  * The file location is taken from the system property {@code cfe_39.config.location} and falls back to
+  * {@code /opt/teragrep/cfe_39/etc/application.properties}. Once every value has been read, log4j2 is
+  * reconfigured from the file named by the {@code log4j2.configurationFile} property.
+  *
+  * @throws IOException              if the properties file cannot be read, the JAAS login configuration
+  *                                  is not an existing regular file, or the log4j2 configuration path
+  *                                  cannot be resolved
+  * @throws IllegalArgumentException if a mandatory property is missing, or if {@code pruneOffset},
+  *                                  {@code maximumFileSize} or {@code numOfConsumers} is not a positive
+  *                                  number
+  */
+ public Config() throws IOException {
+     Properties properties = new Properties();
+     Path configPath = Paths
+             .get(System.getProperty("cfe_39.config.location", "/opt/teragrep/cfe_39/etc/application.properties"));
+     LOGGER.info("Loading application config <[{}]>", configPath.toAbsolutePath());
+
+     try (InputStream inputStream = Files.newInputStream(configPath)) {
+         properties.load(inputStream);
+         // NOTE(review): this prints every configured value, including all consumer.* entries;
+         // confirm no credentials are ever placed in this file before enabling debug logging.
+         LOGGER.debug("Got configuration: <{}>", properties);
+     }
+
+     // HDFS
+     this.hdfsPath = properties.getProperty("hdfsPath", "hdfs:///opt/teragrep/cfe_39/srv/");
+     this.hdfsuri = requireProperty(properties, "hdfsuri", "hdfsuri");
+
+     // HDFS pruning
+     this.pruneOffset = Long.parseLong(properties.getProperty("pruneOffset", "172800000"));
+     if (this.pruneOffset <= 0) {
+         throw new IllegalArgumentException("pruneOffset must be set to >0, got " + pruneOffset);
+     }
+
+     // AVRO
+     this.queueDirectory = properties.getProperty("queueDirectory", System.getProperty("user.dir") + "/etc/AVRO/");
+     this.maximumFileSize = Long.parseLong(properties.getProperty("maximumFileSize", "60800000"));
+     if (this.maximumFileSize <= 0) {
+         throw new IllegalArgumentException("maximumFileSize must be set to >0, got " + maximumFileSize);
+     }
+
+     // kerberos: every key below is mandatory, the third argument is the name used in the error message
+     this.kerberosHost = requireProperty(properties, "java.security.krb5.kdc", "kerberosHost");
+     this.kerberosRealm = requireProperty(properties, "java.security.krb5.realm", "kerberosRealm");
+     this.hadoopAuthentication = requireProperty(
+             properties, "hadoop.security.authentication", "hadoopAuthentication"
+     );
+     this.hadoopAuthorization = requireProperty(properties, "hadoop.security.authorization", "hadoopAuthorization");
+     this.kerberosPrincipal = requireProperty(
+             properties, "dfs.namenode.kerberos.principal.pattern", "kerberosPrincipal"
+     );
+     this.kerberosKeytabUser = requireProperty(properties, "KerberosKeytabUser", "kerberosKeytabUser");
+     this.kerberosKeytabPath = requireProperty(properties, "KerberosKeytabPath", "kerberosKeytabPath");
+     this.kerberosLoginAutorenewal = requireProperty(
+             properties, "kerberosLoginAutorenewal", "kerberosLoginAutorenewal"
+     );
+     this.kerberosTestMode = properties.getProperty("dfs.client.use.datanode.hostname", "false");
+
+     this.dfsDataTransferProtection = requireProperty(
+             properties, "dfs.data.transfer.protection", "dfsDataTransferProtection"
+     );
+     this.dfsEncryptDataTransferCipherSuites = requireProperty(
+             properties, "dfs.encrypt.data.transfer.cipher.suites", "dfsEncryptDataTransferCipherSuites"
+     );
+
+     // kafka
+     this.queueTopicPattern = properties.getProperty("queueTopicPattern", "^.*$");
+     this.numOfConsumers = Integer.parseInt(properties.getProperty("numOfConsumers", "1"));
+     // Validated the same way as the other numeric settings: zero or negative consumers is a misconfiguration.
+     if (this.numOfConsumers <= 0) {
+         throw new IllegalArgumentException("numOfConsumers must be set to >0, got " + numOfConsumers);
+     }
+
+     // skip non RFC5424 records
+     this.skipNonRFC5424Records = Boolean.parseBoolean(properties.getProperty("skipNonRFC5424Records", "false"));
+
+     // skip empty RFC5424 records
+     this.skipEmptyRFC5424Records = Boolean
+             .parseBoolean(properties.getProperty("skipEmptyRFC5424Records", "false"));
+
+     this.kafkaConsumerProperties = loadSubProperties(properties, "consumer.");
+
+     // getProperty is given a default, so loginConfig can never be null; only the file's existence is checked.
+     String loginConfig = properties
+             .getProperty("java.security.auth.login.config", System.getProperty("user.dir") + "/rpm/resources/config.jaas");
+     if (!(new File(loginConfig)).isFile()) {
+         throw new IOException("File '" + loginConfig + "' set by java.security.auth.login.config does not exist");
+     }
+
+     // Just for loggers to work
+     Path log4j2Config = Paths
+             .get(properties.getProperty("log4j2.configurationFile", System.getProperty("user.dir") + "/rpm/resources/log4j2.properties"));
+     LOGGER.info("Loading log4j2 config from <[{}]>", log4j2Config.toRealPath());
+     Configurator.reconfigure(log4j2Config.toUri());
+ }
+
+ /**
+  * Returns the value of a mandatory property.
+  *
+  * @param properties  properties to read from
+  * @param key         property key to look up
+  * @param displayName name used in the error message when the key is absent
+  * @return the configured value, never {@code null}
+  * @throws IllegalArgumentException if the property is not set
+  */
+ private static String requireProperty(Properties properties, String key, String displayName) {
+     String value = properties.getProperty(key);
+     if (value == null) {
+         throw new IllegalArgumentException(displayName + " not set");
+     }
+     return value;
+ }
+
+ private Properties loadSubProperties(Properties properties, String prefix) {
+ Properties subProperties = new Properties();
+
+ Enumeration