Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
5db1cc4
Refactored Offset interface and classes for printing the object in Sy…
Tiihott Aug 21, 2024
58375bd
Added missing rfc5424Frame.next()
Tiihott Aug 21, 2024
be70879
Refactoring DatabaseOutput.java and related classes, WIP. Renamed Off…
Tiihott Aug 22, 2024
27b7c70
Continuing refactoring DatabaseOutput.java and related classes, WIP. …
Tiihott Aug 23, 2024
1045456
Added missing boolean input parameter to SyncableFileOutputStream() t…
Tiihott Aug 23, 2024
d47536d
Added additional test configuration file.
Tiihott Aug 26, 2024
82f42ce
Renamed ProcessingTest.java to BatchDistributionTest.java.
Tiihott Aug 26, 2024
ff29a80
Altered logic behind the usage of MaximumFileSize config parameter. A…
Tiihott Aug 26, 2024
971e060
Refactoring PartitionFile.java further. Implemented PartitionFile int…
Tiihott Aug 27, 2024
04f2abb
Refactoring PartitionFile.java further. Implemented PartitionRecords …
Tiihott Aug 27, 2024
a2948a8
Added missing private final statement.
Tiihott Aug 27, 2024
d654064
Added SyslogAvroWriterTest.java.
Tiihott Aug 27, 2024
a94ff51
Refactored Config to be immutable. Refactored old Config setter usage…
Tiihott Aug 27, 2024
cdec71e
Moved Config.java to com.teragrep.cfe_39.configuration package for fu…
Tiihott Aug 27, 2024
ef7f93a
Added logic for handling consumer group rebalance properly. Refactore…
Tiihott Aug 27, 2024
f1e1ea1
Implemented BatchDistribution interface as a subtype of Consumer inte…
Tiihott Aug 28, 2024
a63713d
Finished implementing kafka consumer group rebalance handling. Added …
Tiihott Aug 28, 2024
d613c98
Refactored HdfsTest.java and BatchDistributionTest.java.
Tiihott Aug 28, 2024
e4e226f
Refactored Ingestion0FilesTest.java and divided the second test insid…
Tiihott Aug 28, 2024
b1613f5
Deleted ingestion0FilesLowSizeTest() that was already moved to a sepa…
Tiihott Aug 28, 2024
9d6ee90
Refactored Ingestion1Old1NewFileTest.java. Fixed bug in syslogFile in…
Tiihott Aug 29, 2024
a29bc6a
Refactored Ingestion2NewFilesTest.java.
Tiihott Aug 29, 2024
8d96270
Refactored Ingestion2OldFilesTest.java
Tiihott Aug 29, 2024
b625bbb
Refactored KafkaConsumerTest.java
Tiihott Aug 29, 2024
fecf04c
Added logging.
Tiihott Aug 29, 2024
c9dfa7f
Spotless.
Tiihott Aug 29, 2024
33c80e2
Renamed IngestionRebalanceListener.java to ConsumerRebalanceListenerI…
Tiihott Aug 30, 2024
42164a4
Removed the pointless additional limitation from the number of thread…
Tiihott Aug 30, 2024
bddd964
Fixed constructor.
Tiihott Aug 30, 2024
cb30c25
Changed Consumer<List<KafkaRecordImpl>> references to subtype BatchDi…
Tiihott Aug 30, 2024
25244a3
Implemented initial framework for configuration refactoring. WIP
Tiihott Aug 30, 2024
d47a7f1
Implemented fixes to consumer group rebalance handling to solve issue…
Tiihott Sep 2, 2024
9e4f760
Renamed KafkaRecordConverter to KafkaAsSyslogRecord.
Tiihott Sep 2, 2024
fc1495e
Renamed convert() method to more appropriate toSyslogRecord().
Tiihott Sep 2, 2024
af1c027
Renamed rebalance() method to more appropriate delete().
Tiihott Sep 2, 2024
62513fe
Continuing refactoring Configuration classes. Combined kerberos/kafka…
Tiihott Sep 4, 2024
e790cdc
Continuing refactoring Configuration classes. Implemented Kafka prope…
Tiihott Sep 5, 2024
9519482
Implemented configureLogging() method for configuring logging.
Tiihott Sep 6, 2024
93fd736
Added missing java.security.auth.login.config key to required configu…
Tiihott Sep 6, 2024
f43813b
Changed all Config class usage in the code to use the new Configurati…
Tiihott Sep 6, 2024
990c65a
Removed now obsolete Config.java and ConfigTest.java files.
Tiihott Sep 6, 2024
c1410be
Improved Configuration tests. Removed duplicate validation of maximum…
Tiihott Sep 9, 2024
86d884f
Removed now unused NullKafkaRecord.java.
Tiihott Sep 9, 2024
a928f4d
Removed unneeded sleep() calls from tests, added comments for calls t…
Tiihott Sep 9, 2024
b1597ed
Removed unnecessary SyslogRecord initializations.
Tiihott Sep 9, 2024
b0eb72b
Implemented for loops for normalRecordsTest assertions.
Tiihott Sep 9, 2024
887da33
Set all classes to be final wherever possible. Made HdfsDataIngestion…
Tiihott Sep 10, 2024
47aba95
Implemented FileSystemFactory interface and class. Moved FileSystem o…
Tiihott Sep 10, 2024
da226e2
Made BatchDistributionImpl, KafkaReader and PartitionRecordsImpl final.
Tiihott Sep 11, 2024
dc6365d
Changed java version used by maven to java-1.8.0-openjdk, which alter…
Tiihott Sep 23, 2024
62ba79e
Implemented KafkaConfiguration.java, HdfsConfiguration.java, KafkaCon…
Tiihott Sep 26, 2024
25c2def
Split properties of ingress (kafka) and egress (hdfs) libraries to di…
Tiihott Sep 26, 2024
74508d8
Refactored Configuration.java, ConfigurationImpl.java and Configurati…
Tiihott Sep 26, 2024
afd4545
Refactored Main.java and all the tests to use the refactored configur…
Tiihott Sep 26, 2024
1f63274
Removed toKafkaConsumerProperties() from ConfigurationImpl and moved …
Tiihott Sep 27, 2024
d3b1a80
Renamed WritableQueue.java to UniqueFileCreated.java.
Tiihott Sep 27, 2024
5f49d37
Implemented dropwizard metrics Timer for measuring performance.
Tiihott Sep 27, 2024
e3ce72e
Renamed Configuration.java loadPropertiesFile(String configurationFil…
Tiihott Oct 1, 2024
f0d8de0
Checkstyle plugin implementation and Checkstyle code cleanup.
Tiihott Oct 29, 2024
7a21cda
Fixed logger message.
Tiihott Oct 31, 2024
ea61a29
Added resource cleanup when exception is caught and rethrown.
Tiihott Nov 6, 2024
0a31fdd
Removed redundant avro-file initializations from PartitionFileImpl.ja…
Tiihott Nov 6, 2024
b8fa902
Added consumer timeout test. Increased timeout parameter values in co…
Tiihott Nov 6, 2024
48cc3d2
Added cnf_01 Teragrep configuration library dependency.
Tiihott Nov 13, 2024
05fcb64
Added refactored NewCommonConfiguration.java and test that use cnf_01.
Tiihott Nov 13, 2024
8ddac46
Added refactored NewKafkaConfiguration.java, NewHdfsConfiguration.jav…
Tiihott Nov 14, 2024
bf496f4
Moved pruneOffset configuration parameter from application.properties…
Tiihott Nov 15, 2024
37d2a70
Refactored code to use the refactored configuration classes and cnf_01.
Tiihott Nov 15, 2024
1146a23
Removed old configuration classes and tests.
Tiihott Nov 15, 2024
0dd9692
Renamed NewCommonConfiguration.java to CommonConfiguration.java. Rena…
Tiihott Nov 15, 2024
12bc003
Implemented PartitionFileFactory.java
Tiihott Nov 18, 2024
062ddcc
Improved PartitionFileImpl constructor by adding secondary constructo…
Tiihott Nov 19, 2024
b6fe5dc
Moved maximumFileSize from common configuration parameters to hdfs co…
Tiihott Nov 19, 2024
90af130
Fixed kafka and hdfs configuration paths.
Tiihott Dec 11, 2024
fc15b0a
Moved numOfConsumers parameter from CommonConfiguration to KafkaConfi…
Tiihott Dec 11, 2024
f3f8fde
Replaced offsetToJSON() printer of KafkaRecord interface with topicPa…
Tiihott Dec 11, 2024
96fa63e
Fixed tests to use target directory for avro file generation.
Tiihott Dec 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 224 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<name>cfe_39</name>
<properties>
<changelist>-SNAPSHOT</changelist>
<cnf_01.version>1.0.0</cnf_01.version>
<hadoop.version>3.3.6</hadoop.version>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down Expand Up @@ -78,6 +79,12 @@
<artifactId>rlo_06</artifactId>
<version>9.0.1</version>
</dependency>
<!-- Configuration -->
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>cnf_01</artifactId>
<version>${cnf_01.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down Expand Up @@ -199,9 +206,14 @@
<exclude>src/test/resources/broken.application.properties</exclude>
<exclude>src/test/resources/valid.application.properties</exclude>
<exclude>src/test/resources/failProcessing.application.properties</exclude>
<exclude>src/test/resources/largeFile.application.properties</exclude>
<exclude>rpm/resources/config.jaas</exclude>
<exclude>rpm/resources/log4j2.properties</exclude>
<exclude>rpm/resources/application.properties</exclude>
<exclude>rpm/resources/ingress.properties</exclude>
<exclude>rpm/resources/egress.properties</exclude>
<exclude>src/test/resources/valid.hdfs.properties</exclude>
<exclude>src/test/resources/valid.kafka.properties</exclude>
<exclude>rpm/resources/cfe_39.service</exclude>
<exclude>rpm/rpm.pom.xml</exclude>
<exclude>src/main/java/com/teragrep/cfe_39/avro/SyslogRecord.java</exclude>
Expand Down Expand Up @@ -380,6 +392,218 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.5.0</version>
<executions>
<!-- These are all errors that will fail the build if triggered -->
<execution>
<id>scan-errors</id>
<goals>
<goal>check</goal>
</goals>
<phase>process-classes</phase>
<configuration>
<violationSeverity>error</violationSeverity>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
<failsOnError>false</failsOnError>
<checkstyleRules>
<module name="Checker">
<module name="TreeWalker">
<module name="ClassMemberImpliedModifier"></module>
<module name="CovariantEquals"></module>
<module name="DefaultComesLast"></module>
<module name="EmptyBlock"></module>
<module name="EmptyCatchBlock"></module>
<module name="EmptyStatement"></module>
<module name="EqualsAvoidNull"></module>
<module name="EqualsHashCode"></module>
<module name="FallThrough"></module>
<module name="FinalClass"></module>
<module name="HiddenField">
<property name="ignoreConstructorParameter" value="true"></property>
<property name="ignoreSetter" value="true"></property>
</module>
<module name="IllegalCatch"></module>
<module name="IllegalImport"></module>
<module name="IllegalThrows"></module>
<module name="IllegalToken"></module>
<module name="IllegalType"></module>
<module name="InnerAssignment"></module>
<module name="InterfaceMemberImpliedModifier"></module>
<module name="MissingOverride"></module>
<module name="MissingSwitchDefault"></module>
<module name="ModifiedControlVariable">
<property name="skipEnhancedForLoopVariable" value="true"></property>
</module>
<module name="ModifierOrder"></module>
<module name="MutableException"></module>
<module name="NeedBraces"></module>
<module name="NestedForDepth">
<property name="max" value="2"></property>
</module>
<module name="NestedTryDepth"></module>
<module name="NoClone"></module>
<module name="NoFinalizer"></module>
<module name="OneTopLevelClass"></module>
<module name="PackageDeclaration"></module>
<module name="PackageName">
<property name="format" value="^com\.teragrep\.[a-z]{3}_\d{2}(?:.[a-zA-Z]\w*)*$"></property>
</module>
<module name="ReturnCount">
<property name="max" value="5"></property>
</module>
<module name="StringLiteralEquality"></module>
<module name="SuperClone"></module>
<module name="SuperFinalize"></module>
<module name="TypeName"></module>
<module name="UpperEll"></module>
<module name="VisibilityModifier">
<property name="allowPublicFinalFields" value="true"></property>
</module>
</module>
<module name="Translation"></module>
<module name="UniqueProperties"></module>
<module name="BeforeExecutionExclusionFileFilter">
<property name="fileNamePattern" value="SyslogRecord\.java$"></property>
</module>
</module>
</checkstyleRules>
</configuration>
</execution>
<!-- These are warnings but will not fail the build -->
<execution>
<id>scan-warnings</id>
<goals>
<goal>check</goal>
</goals>
<phase>process-classes</phase>
<configuration>
<violationSeverity>warning</violationSeverity>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>false</failOnViolation>
<failsOnError>false</failsOnError>
<checkstyleRules>
<module name="Checker">
<module name="TreeWalker">
<module name="ArrayTypeStyle">
<property name="severity" value="warning"></property>
</module>
<module name="AvoidNestedBlocks">
<property name="allowInSwitchCase" value="true"></property>
<property name="severity" value="warning"></property>
</module>
<module name="AvoidNoArgumentSuperConstructorCall">
<property name="severity" value="warning"></property>
</module>
<module name="AvoidStarImport">
<property name="severity" value="warning"></property>
</module>
<module name="AvoidStaticImport">
<property name="severity" value="warning"></property>
</module>
<module name="DeclarationOrder">
<property name="severity" value="warning"></property>
</module>
<module name="FinalLocalVariable">
<property name="severity" value="warning"></property>
</module>
<module name="FinalParameters">
<property name="severity" value="warning"></property>
</module>
<module name="MagicNumber">
<property name="severity" value="warning"></property>
</module>
<module name="MissingDeprecated">
<property name="severity" value="warning"></property>
</module>
<module name="MultipleVariableDeclarations">
<property name="severity" value="warning"></property>
</module>
<module name="NestedForDepth">
<property name="max" value="1"></property>
<property name="severity" value="warning"></property>
</module>
<module name="NestedIfDepth">
<property name="severity" value="warning"></property>
</module>
<module name="NoArrayTrailingComma">
<property name="severity" value="warning"></property>
</module>
<module name="NoCodeInFile">
<property name="severity" value="warning"></property>
</module>
<module name="NoEnumTrailingComma">
<property name="severity" value="warning"></property>
</module>
<module name="OneStatementPerLine">
<property name="severity" value="warning"></property>
</module>
<module name="OuterTypeFilename">
<property name="severity" value="warning"></property>
</module>
<module name="ParameterAssignment">
<property name="severity" value="warning"></property>
</module>
<module name="RedundantImport">
<property name="severity" value="warning"></property>
</module>
<module name="RequireThis">
<property name="checkFields" value="false"></property>
<property name="checkMethods" value="false"></property>
<property name="validateOnlyOverlapping" value="true"></property>
<property name="severity" value="warning"></property>
</module>
<module name="ReturnCount">
<property name="max" value="1"></property>
<property name="severity" value="warning"></property>
</module>
<module name="SimplifyBooleanExpression">
<property name="severity" value="warning"></property>
</module>
<module name="SimplifyBooleanReturn">
<property name="severity" value="warning"></property>
</module>
<module name="UnnecessarySemicolonAfterOuterTypeDeclaration">
<property name="severity" value="warning"></property>
</module>
<module name="UnnecessarySemicolonAfterTypeMemberDeclaration">
<property name="severity" value="warning"></property>
</module>
<module name="UnnecessarySemicolonInEnumeration">
<property name="severity" value="warning"></property>
</module>
<module name="UnnecessarySemicolonInTryWithResources">
<property name="severity" value="warning"></property>
</module>
<module name="UnusedLocalVariable">
<property name="severity" value="warning"></property>
</module>
<module name="VariableDeclarationUsageDistance">
<property name="severity" value="warning"></property>
</module>
<module name="VisibilityModifier">
<property name="allowPublicFinalFields" value="false"></property>
<property name="severity" value="warning"></property>
</module>
</module>
<module name="NewlineAtEndOfFile">
<property name="severity" value="warning"></property>
</module>
<module name="OrderedProperties">
<property name="severity" value="warning"></property>
</module>
<module name="BeforeExecutionExclusionFileFilter">
<property name="fileNamePattern" value="SyslogRecord\.java$"></property>
</module>
</module>
</checkstyleRules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
48 changes: 6 additions & 42 deletions rpm/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,52 +1,16 @@
# Kafka security configuration file
java.security.auth.login.config=/opt/teragrep/cfe_39/etc/config.jaas
# Logger settings
log4j2.configurationFile=/opt/teragrep/cfe_39/etc/log4j2.properties
# hdfs settings
egress.configurationFile=/opt/teragrep/cfe_39/etc/egress.properties
# kafka settings
ingress.configurationFile=/opt/teragrep/cfe_39/etc/ingress.properties
# What topics are searched from kafka, regex
queueTopicPattern=^testConsumerTopic-*$
# Number of consumers created to the consumer groups
numOfConsumers=2
# Kafka bootstrap servers
consumer.bootstrap.servers=test
# Offset, should not be touched
consumer.auto.offset.reset=earliest
# Autocommit, should not be touched
consumer.enable.auto.commit=false
# Consumer group id, this is to track the progress of reading hte topic
consumer.group.id=cfe_39
# Used security protocol and mechanism
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=PLAIN
# Maximum records per batch, note that too big number will cause massive load and can cause timeouts to trigger
consumer.max.poll.records=500
# How much data can be fetched in one go
consumer.fetch.max.bytes=1073741820
# How long for request before timing out. Note that too big max poll records size can cause this to trigger
consumer.request.timeout.ms=300000
consumer.max.poll.interval.ms=300000
# For testing only, remove for prod.
consumer.useMockKafkaConsumer=true
# Directory where AVRO files are constructed for HDFS
queueDirectory=/opt/teragrep/cfe_39/etc/AVRO/
# The maximum file size for AVRO-files that are to be stored in HDFS database.
maximumFileSize=60800000
# Boolean for deciding if records not in RFC5424 should be skipped or not.
skipNonRFC5424Records=true
# Boolean for deciding if empty RFC5424 records should be skipped or not.
skipEmptyRFC5424Records=true
# HDFS pruning offset, prunes files older than the given milliseconds.
pruneOffset=172800000
# HDFS uri
hdfsuri=hdfs://localhost:45937/
# Kerberos
java.security.krb5.kdc=test
java.security.krb5.realm=test
hadoop.security.authentication=test
hadoop.security.authorization=test
dfs.namenode.kerberos.principal.pattern=test
KerberosKeytabUser=test
KerberosKeytabPath=test
dfs.client.use.datanode.hostname=false
kerberosLoginAutorenewal=true
dfs.data.transfer.protection=test
dfs.encrypt.data.transfer.cipher.suites=test
# timeout modifier for when the consumer's cache of intermediate results are flushed to HDFS
consumerTimeout=600000
20 changes: 20 additions & 0 deletions rpm/resources/egress.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# HDFS pruning, use 157784760000 value while testing HDFS writes to ensure the test records are not pruned. 157784760000L
pruneOffset=157784760000
# HDFS uri
hdfsuri=hdfs://localhost:45937/
# HDFS path
hdfsPath=hdfs:///opt/teragrep/cfe_39/srv/
# Kerberos
java.security.krb5.kdc=test
java.security.krb5.realm=test
hadoop.security.authentication=kerberos
hadoop.security.authorization=test
dfs.namenode.kerberos.principal.pattern=test
KerberosKeytabUser=test
KerberosKeytabPath=test
dfs.client.use.datanode.hostname=false
hadoop.kerberos.keytab.login.autorenewal.enabled=true
dfs.data.transfer.protection=test
dfs.encrypt.data.transfer.cipher.suites=test
# The maximum file size for AVRO-files that are to be stored in HDFS database.
maximumFileSize=3000
24 changes: 24 additions & 0 deletions rpm/resources/ingress.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Kafka security configuration file
java.security.auth.login.config=/opt/teragrep/cfe_39/etc/config.jaas
# Kafka bootstrap servers
bootstrap.servers=test
# Offset, should not be touched
auto.offset.reset=earliest
# Autocommit, should not be touched
enable.auto.commit=false
# Consumer group id, this is to track the progress of reading hte topic
group.id=cfe_39
# Used security protocol and mechanism
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# Maximum records per batch, note that too big number will cause massive load and can cause timeouts to trigger
max.poll.records=500
# How much data can be fetched in one go
fetch.max.bytes=1073741820
# How long for request before timing out. Note that too big max poll records size can cause this to trigger
request.timeout.ms=300000
max.poll.interval.ms=300000
# For testing only
useMockKafkaConsumer=true
# Number of consumers created to the consumer groups
numOfConsumers=2
Loading