diff --git a/pom.xml b/pom.xml
index 9a2db7ab..6434cf1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,32 @@ under the License.
java
${build.directory}/profile-results
+
+        <checkstyle.version>10.18.2</checkstyle.version>
+        <checkstyle.skip>false</checkstyle.skip>
+        <spotless.version>2.43.0</spotless.version>
+        <maven-checkstyle-plugin.version>3.4.3</maven-checkstyle-plugin.version>
+        <spotless.delimiter>package</spotless.delimiter>
+        <spotless.license.header>/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */</spotless.license.header>
+
@@ -630,7 +656,82 @@ under the License.
-
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>3.3.1</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>com.puppycrawl.tools</groupId>
+                        <artifactId>checkstyle</artifactId>
+                        <version>${checkstyle.version}</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <excludes>org/apache/flink/benchmark/thrift/**</excludes>
+                    <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                    <configLocation>/tools/maven/checkstyle.xml</configLocation>
+                    <logViolationsToConsole>true</logViolationsToConsole>
+                    <failOnViolation>true</failOnViolation>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <java>
+                        <googleJavaFormat>
+                            <version>1.24.0</version>
+                            <style>AOSP</style>
+                        </googleJavaFormat>
+                        <importOrder>
+                            <order>org.apache.flink,org.apache.flink.shaded,,javax,java,scala,\#</order>
+                        </importOrder>
+                        <removeUnusedImports/>
+                        <licenseHeader>
+                            <content>${spotless.license.header}</content>
+                            <delimiter>${spotless.delimiter}</delimiter>
+                        </licenseHeader>
+                        <excludes>
+                            <exclude>src/main/java/org/apache/flink/benchmark/thrift/**</exclude>
+                        </excludes>
+                    </java>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>spotless-check</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
kr.motd.maven
@@ -754,6 +855,14 @@ under the License.
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-checkstyle-plugin</artifactId>
+                </plugin>
+                <plugin>
+                    <groupId>com.diffplug.spotless</groupId>
+                    <artifactId>spotless-maven-plugin</artifactId>
+                </plugin>
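Taken together, the build changes above add two validate-phase gates: a Checkstyle run against `/tools/maven/checkstyle.xml` (with the thrift-generated sources excluded) and a Spotless check that enforces google-java-format. The AOSP style applied by google-java-format is what drives most of the mechanical churn in the Java hunks below: 4-space indentation, a 100-column limit, and wrapped expressions continued on 8-space-indented lines with the operator leading each continuation. A minimal sketch of that output shape, assuming the AOSP mode used by Flink's main repository (the class below is illustrative, not code from this patch):

```java
// Illustrative only: the shape `mvn spotless:apply` gives long expressions
// under google-java-format's AOSP style.
public class AospStyleSketch {
    private static final int LIMIT = 100;

    static String classify(boolean compressionEnabled, int value) {
        // The ternary is wrapped with `?` and `:` leading the continuation
        // lines, mirroring the BlockingPartitionBenchmark hunk below.
        return compressionEnabled && value > LIMIT
                ? "compressed payload above the limit"
                : "plain payload at or below the limit";
    }

    public static void main(String[] args) {
        System.out.println(classify(true, 128));
    }
}
```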
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
index 0159de1d..db3962d8 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionBenchmark.java
@@ -85,14 +85,13 @@ public void setUp() throws Exception {
env.setBufferTimeout(-1);
}
- protected Configuration createConfiguration(
- boolean compressionEnabled) {
+ protected Configuration createConfiguration(boolean compressionEnabled) {
Configuration configuration = super.createConfiguration();
configuration.set(
NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC,
- compressionEnabled ?
- NettyShuffleEnvironmentOptions.CompressionCodec.LZ4
+ compressionEnabled
+ ? NettyShuffleEnvironmentOptions.CompressionCodec.LZ4
: NettyShuffleEnvironmentOptions.CompressionCodec.NONE);
configuration.set(
CoreOptions.TMP_DIRS,
diff --git a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
index da411bb1..9df44e3a 100644
--- a/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/BlockingPartitionRemoteChannelBenchmark.java
@@ -20,7 +20,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.util.FileUtils;
diff --git a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java
index 0295cdf3..d7e5a292 100644
--- a/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/CheckpointEnvironmentContext.java
@@ -98,8 +98,7 @@ public enum CheckpointMode {
TaskManagerOptions.MEMORY_SEGMENT_SIZE,
CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE);
config.set(
- CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
- Duration.ofMillis(0));
+ CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofMillis(0));
config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
return config;
}),
@@ -110,8 +109,7 @@ public enum CheckpointMode {
TaskManagerOptions.MEMORY_SEGMENT_SIZE,
CheckpointEnvironmentContext.START_MEMORY_SEGMENT_SIZE);
config.set(
- CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
- Duration.ofMillis(1));
+ CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ofMillis(1));
config.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
return config;
}),
diff --git a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
index be136912..c6d0ea11 100644
--- a/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/ContinuousFileReaderOperatorBenchmark.java
@@ -27,9 +27,9 @@
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import joptsimple.internal.Strings;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.runner.Runner;
@@ -43,19 +43,19 @@
@OperationsPerInvocation(value = ContinuousFileReaderOperatorBenchmark.RECORDS_PER_INVOCATION)
public class ContinuousFileReaderOperatorBenchmark extends BenchmarkBase {
- private static final int SPLITS_PER_INVOCATION = 100;
- private static final int LINES_PER_SPLIT = 175_000;
- public static final int RECORDS_PER_INVOCATION = SPLITS_PER_INVOCATION * LINES_PER_SPLIT;
+ private static final int splitsPerInvocation = 100;
+ private static final int linesPerSplit = 175_000;
+ public static final int RECORDS_PER_INVOCATION = splitsPerInvocation * linesPerSplit;
- private static final TimestampedFileInputSplit SPLIT =
+ private static final TimestampedFileInputSplit split =
new TimestampedFileInputSplit(0, 0, new Path("."), 0, 0, new String[] {});
- private static final String LINE = Strings.repeat('0', 10);
+ private static final String line = Strings.repeat('0', 10);
// Source should wait until all elements reach sink. Otherwise, END_OF_INPUT is sent once all
// splits are emitted.
// Thus, all subsequent reads in ContinuousFileReaderOperator would be made in CLOSING state in
// a simple while-true loop (MailboxExecutor.isIdle is always true).
- private static OneShotLatch TARGET_COUNT_REACHED_LATCH = new OneShotLatch();
+ private static OneShotLatch targetCountReachedLatch = new OneShotLatch();
public static void main(String[] args) throws RunnerException {
Options options =
@@ -73,7 +73,7 @@ public static void main(String[] args) throws RunnerException {
@Benchmark
public void readFileSplit(FlinkEnvironmentContext context) throws Exception {
- TARGET_COUNT_REACHED_LATCH.reset();
+ targetCountReachedLatch.reset();
StreamExecutionEnvironment env = context.env;
env.enableCheckpointing(100)
.setParallelism(1)
@@ -93,15 +93,15 @@ private static class MockSourceFunction implements SourceFunction<TimestampedFileInputSplit>
public void run(SourceContext<TimestampedFileInputSplit> ctx) {
- while (isRunning && count < SPLITS_PER_INVOCATION) {
+ while (isRunning && count < splitsPerInvocation) {
count++;
synchronized (ctx.getCheckpointLock()) {
- ctx.collect(SPLIT);
+ ctx.collect(split);
}
}
while (isRunning) {
try {
- TARGET_COUNT_REACHED_LATCH.await(100, TimeUnit.MILLISECONDS);
+ targetCountReachedLatch.await(100, TimeUnit.MILLISECONDS);
return;
} catch (InterruptedException e) {
if (!isRunning) {
@@ -124,13 +124,13 @@ private static class MockInputFormat extends FileInputFormat<String> {
@Override
public boolean reachedEnd() {
- return count >= ContinuousFileReaderOperatorBenchmark.LINES_PER_SPLIT;
+ return count >= ContinuousFileReaderOperatorBenchmark.linesPerSplit;
}
@Override
public String nextRecord(String s) {
count++;
- return LINE;
+ return line;
}
@Override
@@ -151,7 +151,7 @@ private static class LimitedSink implements SinkFunction<String> {
@Override
public void invoke(String value, Context context) {
if (++count == RECORDS_PER_INVOCATION) {
- TARGET_COUNT_REACHED_LATCH.trigger();
+ targetCountReachedLatch.trigger();
}
}
}
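The renames in this file track the naming checks being switched on: `RECORDS_PER_INVOCATION` keeps UPPER_SNAKE_CASE because it is a public constant, while the private static fields move to lowerCamelCase (required for the mutable `targetCountReachedLatch` under Checkstyle's stock StaticVariableName rule). A small sketch of that split, assuming the stock Checkstyle conventions (a hypothetical class, not part of the benchmarks):

```java
// Hypothetical sketch of the naming split applied in the hunk above.
public class NamingSplitSketch {
    // Public constant: stays UPPER_SNAKE_CASE, like RECORDS_PER_INVOCATION.
    public static final int RECORDS_PER_INVOCATION = 100 * 175_000;

    // Mutable static state: lowerCamelCase, mirroring the rename
    // TARGET_COUNT_REACHED_LATCH -> targetCountReachedLatch.
    private static int recordsSeen;

    public static void main(String[] args) {
        recordsSeen++;
        System.out.println(recordsSeen + " of " + RECORDS_PER_INVOCATION);
    }
}
```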
diff --git a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
index b51de81e..c3fa19cc 100644
--- a/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
+++ b/src/main/java/org/apache/flink/benchmark/FlinkEnvironmentContext.java
@@ -103,11 +103,12 @@ protected Configuration createConfiguration() {
final Configuration configuration = new Configuration();
configuration.set(RestOptions.BIND_PORT, "0");
// no equivalent config available.
- //configuration.setInteger(
+ // configuration.setInteger(
// NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, NUM_NETWORK_BUFFERS);
configuration.set(DeploymentOptions.TARGET, MiniClusterPipelineExecutorServiceLoader.NAME);
configuration.set(DeploymentOptions.ATTACHED, true);
- // It doesn't make sense to wait for the final checkpoint in benchmarks since it only prolongs
+ // It doesn't make sense to wait for the final checkpoint in benchmarks since it only
+ // prolongs
// the test but doesn't give any advantages.
configuration.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
// TODO: remove this line after FLINK-28243 will be done
diff --git a/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java b/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java
index e49c61e7..c91f693e 100644
--- a/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/HighAvailabilityServiceBenchmark.java
@@ -18,7 +18,6 @@
package org.apache.flink.benchmark;
-import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -27,6 +26,8 @@
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.FileUtils;
+
+import org.apache.curator.test.TestingServer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
@@ -54,88 +55,94 @@
*/
@OutputTimeUnit(SECONDS)
public class HighAvailabilityServiceBenchmark extends BenchmarkBase {
- public static void main(String[] args) throws RunnerException {
- Options options =
- new OptionsBuilder()
- .verbosity(VerboseMode.NORMAL)
- .include(".*" + HighAvailabilityServiceBenchmark.class.getCanonicalName() + ".*")
- .build();
-
- new Runner(options).run();
- }
-
- @Benchmark
- public void submitJobThroughput(HighAvailabilityContext context) throws Exception {
- context.miniCluster.executeJobBlocking(buildNoOpJob());
- }
-
- private static JobGraph buildNoOpJob() {
- JobGraph jobGraph = new JobGraph(JobID.generate(), UUID.randomUUID().toString());
- jobGraph.addVertex(createNoOpVertex());
- return jobGraph;
- }
-
- private static JobVertex createNoOpVertex() {
- JobVertex vertex = new JobVertex("v");
- vertex.setInvokableClass(NoOpInvokable.class);
- vertex.setParallelism(1);
- vertex.setMaxParallelism(1);
- return vertex;
- }
-
- @State(Thread)
- public static class HighAvailabilityContext extends FlinkEnvironmentContext {
- private TestingServer testingServer;
- public final File haDir;
-
- @Param({"ZOOKEEPER", "NONE"})
- public HighAvailabilityMode highAvailabilityMode;
-
- public HighAvailabilityContext() {
- try {
- haDir = Files.createTempDirectory("bench-ha-").toFile();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void setUp() throws Exception {
- if (isZookeeperHighAvailability()) {
- testingServer = new TestingServer();
- testingServer.start();
- }
-
- // The method `super.setUp()` will call `createConfiguration()` to get Configuration and
- // create a `MiniCluster`. We need to start TestingServer before `createConfiguration()`,
- // then we can add zookeeper quorum in the configuration. So we can only start
- // `TestingServer` before `super.setUp()`.
- super.setUp();
- }
-
- private boolean isZookeeperHighAvailability() {
- return highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER;
- }
-
- @Override
- protected Configuration createConfiguration() {
- Configuration configuration = super.createConfiguration();
- configuration.set(HighAvailabilityOptions.HA_MODE, highAvailabilityMode.name());
- configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
- if (isZookeeperHighAvailability()) {
- configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- }
- return configuration;
- }
-
- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- if (isZookeeperHighAvailability()) {
- testingServer.stop();
- testingServer.close();
- }
- FileUtils.deleteDirectory(haDir);
- }
- }
+ public static void main(String[] args) throws RunnerException {
+ Options options =
+ new OptionsBuilder()
+ .verbosity(VerboseMode.NORMAL)
+ .include(
+ ".*"
+ + HighAvailabilityServiceBenchmark.class.getCanonicalName()
+ + ".*")
+ .build();
+
+ new Runner(options).run();
+ }
+
+ @Benchmark
+ public void submitJobThroughput(HighAvailabilityContext context) throws Exception {
+ context.miniCluster.executeJobBlocking(buildNoOpJob());
+ }
+
+ private static JobGraph buildNoOpJob() {
+ JobGraph jobGraph = new JobGraph(JobID.generate(), UUID.randomUUID().toString());
+ jobGraph.addVertex(createNoOpVertex());
+ return jobGraph;
+ }
+
+ private static JobVertex createNoOpVertex() {
+ JobVertex vertex = new JobVertex("v");
+ vertex.setInvokableClass(NoOpInvokable.class);
+ vertex.setParallelism(1);
+ vertex.setMaxParallelism(1);
+ return vertex;
+ }
+
+ @State(Thread)
+ public static class HighAvailabilityContext extends FlinkEnvironmentContext {
+ private TestingServer testingServer;
+ public final File haDir;
+
+ @Param({"ZOOKEEPER", "NONE"})
+ public HighAvailabilityMode highAvailabilityMode;
+
+ public HighAvailabilityContext() {
+ try {
+ haDir = Files.createTempDirectory("bench-ha-").toFile();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ if (isZookeeperHighAvailability()) {
+ testingServer = new TestingServer();
+ testingServer.start();
+ }
+
+ // The method `super.setUp()` will call `createConfiguration()` to get Configuration and
+ // create a `MiniCluster`. We need to start TestingServer before
+ // `createConfiguration()`,
+ // then we can add zookeeper quorum in the configuration. So we can only start
+ // `TestingServer` before `super.setUp()`.
+ super.setUp();
+ }
+
+ private boolean isZookeeperHighAvailability() {
+ return highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER;
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ configuration.set(HighAvailabilityOptions.HA_MODE, highAvailabilityMode.name());
+ configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+ if (isZookeeperHighAvailability()) {
+ configuration.set(
+ HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
+ testingServer.getConnectString());
+ }
+ return configuration;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (isZookeeperHighAvailability()) {
+ testingServer.stop();
+ testingServer.close();
+ }
+ FileUtils.deleteDirectory(haDir);
+ }
+ }
}
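The one subtle point in this file is the ordering constraint restated in the setUp() comment: super.setUp() resolves the configuration and starts the MiniCluster, so the ZooKeeper TestingServer must already be running when createConfiguration() asks it for a connect string. A condensed, self-contained sketch of that control flow (plain-Java stand-ins for the Flink classes; only the quorum key is the real Flink option name):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for FlinkEnvironmentContext/HighAvailabilityContext:
// the subclass must start its ZooKeeper server before calling super.setUp(),
// because the base class reads createConfiguration() during setUp().
public class SetupOrderSketch {
    static class BaseContext {
        final Map<String, String> config = new HashMap<>();

        void setUp() {
            config.putAll(createConfiguration()); // resolved by the base class
            // ... then the MiniCluster would be started with this config.
        }

        Map<String, String> createConfiguration() {
            return new HashMap<>();
        }
    }

    static class HaContext extends BaseContext {
        String zkConnectString;

        @Override
        void setUp() {
            zkConnectString = "127.0.0.1:2181"; // TestingServer started first
            super.setUp(); // only now is createConfiguration() invoked
        }

        @Override
        Map<String, String> createConfiguration() {
            Map<String, String> conf = super.createConfiguration();
            conf.put("high-availability.zookeeper.quorum", zkConnectString);
            return conf;
        }
    }

    public static void main(String[] args) {
        HaContext context = new HaContext();
        context.setUp();
        System.out.println(context.config);
    }
}
```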
diff --git a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java
index c4cb9074..149cacd8 100644
--- a/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/KeyByBenchmarks.java
@@ -23,8 +23,8 @@
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
-
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.runner.Runner;
@@ -68,8 +68,9 @@ public void arrayKeyBy(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
env.setParallelism(4);
- DataStreamSource<long[]> source = env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10));
- source.keyBy(KeySelectorUtil.getSelectorForArray(new int[]{0}, source.getType()))
+ DataStreamSource<long[]> source =
+ env.addSource(new IncreasingArraySource(ARRAY_RECORDS_PER_INVOCATION, 10));
+ source.keyBy(KeySelectorUtil.getSelectorForArray(new int[] {0}, source.getType()))
.addSink(new DiscardingSink<>());
env.execute();
diff --git a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
index 72ad6c8c..95eca61a 100644
--- a/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/MultipleInputBenchmark.java
@@ -167,7 +167,8 @@ public static void reset() {
@Override
public SourceReader<Integer, MockSourceSplit> createReader(
SourceReaderContext readerContext) {
- return new MockSourceReader(MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED, true) {
+ return new MockSourceReader(
+ MockSourceReader.WaitingForSplits.WAIT_UNTIL_ALL_SPLITS_ASSIGNED, true) {
@Override
public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) {
if (canFinish.isDone() && !canFinish.isCompletedExceptionally()) {
diff --git a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
index 83939b80..21db382a 100644
--- a/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/ProcessingTimerBenchmark.java
@@ -24,6 +24,7 @@
import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.runner.Runner;
@@ -41,7 +42,7 @@ public class ProcessingTimerBenchmark extends BenchmarkBase {
private static final int PARALLELISM = 1;
- private static OneShotLatch LATCH = new OneShotLatch();
+ private static OneShotLatch latch = new OneShotLatch();
public static void main(String[] args) throws RunnerException {
Options options =
@@ -55,7 +56,7 @@ public static void main(String[] args) throws RunnerException {
@Benchmark
public void fireProcessingTimers(FlinkEnvironmentContext context) throws Exception {
- LATCH.reset();
+ latch.reset();
StreamExecutionEnvironment env = context.env;
env.setParallelism(PARALLELISM);
@@ -84,7 +85,7 @@ public void run(SourceContext<String> sourceContext) throws Exception {
sourceContext.collect(String.valueOf(random.nextLong()));
}
- LATCH.await();
+ latch.await();
}
@Override
@@ -111,7 +112,8 @@ public void processElement(String s, Context context, Collector collecto
throws Exception {
final long currTimestamp = System.currentTimeMillis();
for (int i = 0; i < timersPerRecord; i++) {
- context.timerService().registerProcessingTimeTimer(currTimestamp - timersPerRecord + i);
+ context.timerService()
+ .registerProcessingTimeTimer(currTimestamp - timersPerRecord + i);
}
}
@@ -119,7 +121,7 @@ public void processElement(String s, Context context, Collector collecto
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
throws Exception {
if (++firedTimesCount == timersPerRecord) {
- LATCH.trigger();
+ latch.trigger();
}
}
}
diff --git a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
index e6fef619..70715e03 100644
--- a/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/RemoteBenchmarkBase.java
@@ -36,7 +36,9 @@ protected int getNumberOfSlotsPerTaskManager() {
return 1;
}
- /** @return the number of vertices the respective job graph contains. */
+ /**
+ * @return the number of vertices the respective job graph contains.
+ */
abstract int getNumberOfVertices();
}
}
diff --git a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java
index dd4a242d..61402c01 100644
--- a/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/SerializationFrameworkMiniBenchmarks.java
@@ -68,7 +68,8 @@ public void serializerPojo(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
env.setParallelism(4);
ExecutionConfig executionConfig = env.getConfig();
- SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig();
+ SerializerConfigImpl serializerConfig =
+ (SerializerConfigImpl) executionConfig.getSerializerConfig();
serializerConfig.registerPojoType(MyPojo.class);
serializerConfig.registerPojoType(MyOperation.class);
@@ -85,7 +86,8 @@ public void serializerHeavyString(FlinkEnvironmentContext context) throws Except
StreamExecutionEnvironment env = context.env;
env.setParallelism(1);
ExecutionConfig executionConfig = env.getConfig();
- SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig();
+ SerializerConfigImpl serializerConfig =
+ (SerializerConfigImpl) executionConfig.getSerializerConfig();
serializerConfig.registerPojoType(MyPojo.class);
serializerConfig.registerPojoType(MyOperation.class);
@@ -115,7 +117,8 @@ public void serializerKryo(FlinkEnvironmentContext context) throws Exception {
StreamExecutionEnvironment env = context.env;
env.setParallelism(4);
ExecutionConfig executionConfig = env.getConfig();
- SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig();
+ SerializerConfigImpl serializerConfig =
+ (SerializerConfigImpl) executionConfig.getSerializerConfig();
serializerConfig.setForceKryo(true);
serializerConfig.registerKryoType(MyPojo.class);
serializerConfig.registerKryoType(MyOperation.class);
@@ -167,9 +170,9 @@ protected void init() {
super.init();
templates =
new String[] {
- makeString(StringSerializationBenchmark.asciiChars, 1024),
- makeString(StringSerializationBenchmark.russianChars, 1024),
- makeString(StringSerializationBenchmark.chineseChars, 1024)
+ makeString(StringSerializationBenchmark.ASCII_CHARS, 1024),
+ makeString(StringSerializationBenchmark.RUSSIAN_CHARS, 1024),
+ makeString(StringSerializationBenchmark.CHINESE_CHARS, 1024)
};
}
diff --git a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
index 00d665e2..629ee993 100644
--- a/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/benchmark/StateBackendBenchmarkBase.java
@@ -21,11 +21,7 @@
import org.apache.flink.benchmark.functions.IntegerLongSource;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
-import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.util.FileUtils;
@@ -65,7 +61,6 @@ public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws I
e.printStackTrace();
}
-
Configuration configuration = Configuration.fromMap(env.getConfiguration().toMap());
String checkpointDataUri = "file://" + checkpointDir.getAbsolutePath();
switch (stateBackend) {
@@ -93,7 +88,7 @@ public void setUp(StateBackend stateBackend, long recordsPerInvocation) throws I
}
// default character
- //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
source = env.addSource(new IntegerLongSource(numberOfElements, recordsPerInvocation));
}
diff --git a/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java
index 07d60633..9bdac477 100644
--- a/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/WatermarkAggregationBenchmark.java
@@ -16,15 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.benchmark;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorAlignmentBenchmark;
import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
@@ -34,7 +31,9 @@
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;
-/** The watermark aggregation benchmark for source coordinator when enabling the watermark alignment. */
+/**
+ * The watermark aggregation benchmark for source coordinator when enabling the watermark alignment.
+ */
public class WatermarkAggregationBenchmark extends BenchmarkBase {
private static final int NUM_SUBTASKS = 5000;
@@ -47,7 +46,10 @@ public static void main(String[] args) throws RunnerException {
Options options =
new OptionsBuilder()
.verbosity(VerboseMode.NORMAL)
- .include(".*" + WatermarkAggregationBenchmark.class.getCanonicalName() + ".*")
+ .include(
+ ".*"
+ + WatermarkAggregationBenchmark.class.getCanonicalName()
+ + ".*")
.build();
new Runner(options).run();
@@ -71,5 +73,4 @@ public void aggregateWatermark() {
public void teardown() throws Exception {
benchmark.teardown();
}
-
}
diff --git a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
index f6a961b4..98a34b9f 100644
--- a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java
@@ -61,7 +61,9 @@ public class PojoSerializationBenchmark extends BenchmarkBase {
TypeInformation.of(SerializationFrameworkMiniBenchmarks.MyPojo.class)
.createSerializer(config.getSerializerConfig());
TypeSerializer<SerializationFrameworkMiniBenchmarks.MyPojo> kryoSerializer =
- new KryoSerializer<>(SerializationFrameworkMiniBenchmarks.MyPojo.class, config.getSerializerConfig());
+ new KryoSerializer<>(
+ SerializationFrameworkMiniBenchmarks.MyPojo.class,
+ config.getSerializerConfig());
TypeSerializer<org.apache.flink.benchmark.avro.MyPojo> avroSerializer =
new AvroSerializer<>(org.apache.flink.benchmark.avro.MyPojo.class);
diff --git a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
index dafb4ae1..94f71521 100644
--- a/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/full/SerializationFrameworkAllBenchmarks.java
@@ -127,7 +127,8 @@ public void serializerKryoThrift(FlinkEnvironmentContext context) throws Excepti
StreamExecutionEnvironment env = context.env;
env.setParallelism(4);
ExecutionConfig executionConfig = env.getConfig();
- SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig();
+ SerializerConfigImpl serializerConfig =
+ (SerializerConfigImpl) executionConfig.getSerializerConfig();
serializerConfig.setForceKryo(true);
serializerConfig.addDefaultKryoSerializer(
org.apache.flink.benchmark.thrift.MyPojo.class, TBaseSerializer.class);
@@ -147,7 +148,8 @@ public void serializerKryoProtobuf(FlinkEnvironmentContext context) throws Excep
StreamExecutionEnvironment env = context.env;
env.setParallelism(4);
ExecutionConfig executionConfig = env.getConfig();
- SerializerConfigImpl serializerConfig = (SerializerConfigImpl) executionConfig.getSerializerConfig();
+ SerializerConfigImpl serializerConfig =
+ (SerializerConfigImpl) executionConfig.getSerializerConfig();
serializerConfig.setForceKryo(true);
serializerConfig.registerTypeWithKryoSerializer(
org.apache.flink.benchmark.protobuf.MyPojoOuterClass.MyPojo.class,
diff --git a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
index 6cc032ef..dd7c3b21 100644
--- a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java
@@ -52,20 +52,24 @@
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class StringSerializationBenchmark extends BenchmarkBase {
- public static final char[] asciiChars =
+ public static final char[] ASCII_CHARS =
"qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890".toCharArray();
- public static final char[] russianChars =
+ public static final char[] RUSSIAN_CHARS =
"йцукенгшщзхъфывапролджэячсмитьбюЙЦУКЕНГШЩЗХЪФЫВАПРОЛДЖЭЯЧСМИТЬБЮ".toCharArray();
- public static final char[] chineseChars =
+ public static final char[] CHINESE_CHARS =
"的是不了人我在有他这为之大来以个中上们到国说和地也子要时道出而于就下得可你年生".toCharArray();
+
@Param({"ascii", "russian", "chinese"})
public String type;
+
@Param({"4", "128", "16384"})
public String lengthStr;
+
int length;
String input;
ExecutionConfig config = new ExecutionConfig();
- TypeSerializer<String> serializer = TypeInformation.of(String.class).createSerializer(config.getSerializerConfig());
+ TypeSerializer<String> serializer =
+ TypeInformation.of(String.class).createSerializer(config.getSerializerConfig());
ByteArrayInputStream serializedBuffer;
DataInputView serializedStream;
@@ -85,13 +89,13 @@ public void setup() throws IOException {
length = Integer.parseInt(lengthStr);
switch (type) {
case "ascii":
- input = generate(asciiChars, length);
+ input = generate(ASCII_CHARS, length);
break;
case "russian":
- input = generate(russianChars, length);
+ input = generate(RUSSIAN_CHARS, length);
break;
case "chinese":
- input = generate(chineseChars, length);
+ input = generate(CHINESE_CHARS, length);
break;
default:
throw new IllegalArgumentException(type + "charset is not supported");
diff --git a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
index 22ad2f96..6afba8d0 100644
--- a/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
+++ b/src/main/java/org/apache/flink/benchmark/functions/IntegerLongSource.java
@@ -24,6 +24,7 @@ public class IntegerLongSource extends RichParallelSourceFunction source(StreamExecutionEnvironment environment, long maxValue) {
return factory.apply(environment, maxValue);
}
-};
+}
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java
index 105ad5ed..2b89b551 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor.java
@@ -32,7 +32,8 @@
import org.openjdk.jmh.runner.RunnerException;
/** The benchmark of handle global failure and restarting tasks in a STREAMING/BATCH job. */
-public class HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
+public class HandleGlobalFailureAndRestartAllTasksBenchmarkExecutor
+ extends SchedulerBenchmarkExecutorBase {
@Param({"BATCH", "STREAMING", "BATCH_EVENLY", "STREAMING_EVENLY"})
private JobConfiguration jobConfiguration;
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
index 182776b4..b7187470 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
@@ -35,7 +35,12 @@
public class SchedulingDownstreamTasksInBatchJobBenchmarkExecutor
extends SchedulerBenchmarkExecutorBase {
- @Param({"BATCH", "BATCH_HYBRID_DEFAULT", "BATCH_HYBRID_PARTIAL_FINISHED", "BATCH_HYBRID_ALL_FINISHED"})
+ @Param({
+ "BATCH",
+ "BATCH_HYBRID_DEFAULT",
+ "BATCH_HYBRID_PARTIAL_FINISHED",
+ "BATCH_HYBRID_ALL_FINISHED"
+ })
private JobConfiguration jobConfiguration;
private SchedulingDownstreamTasksInBatchJobBenchmark benchmark;
diff --git a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
index 1ec6032b..f630cc87 100644
--- a/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/state/benchmark/HashMapStateBackendRescalingBenchmarkExecutor.java
@@ -19,13 +19,16 @@
package org.apache.flink.state.benchmark;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.config.ConfigUtil;
-import org.apache.flink.config.StateBenchmarkOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.state.benchmark.RescalingBenchmarkBuilder;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
-import org.openjdk.jmh.annotations.*;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.RunnerException;
import java.io.IOException;
@@ -48,7 +51,8 @@ public static void main(String[] args) throws RunnerException {
@Setup(Level.Trial)
public void setUp() throws Exception {
- // FsStateBackend is deprecated in favor of HashMapStateBackend with setting checkpointStorage.
+ // FsStateBackend is deprecated in favor of HashMapStateBackend with setting
+ // checkpointStorage.
HashMapStateBackend stateBackend = new HashMapStateBackend();
benchmark =
new RescalingBenchmarkBuilder<byte[]>()
@@ -56,10 +60,16 @@ public void setUp() throws Exception {
.setParallelismBefore(rescaleType.getParallelismBefore())
.setParallelismAfter(rescaleType.getParallelismAfter())
.setCheckpointStorageAccess(
- new FileSystemCheckpointStorage(new URI("file://" + prepareDirectory("rescaleDb").getAbsolutePath()), 0)
+ new FileSystemCheckpointStorage(
+ new URI(
+ "file://"
+ + prepareDirectory("rescaleDb")
+ .getAbsolutePath()),
+ 0)
.createCheckpointStorage(new JobID()))
.setStateBackend(stateBackend)
- .setStreamRecordGenerator(new ByteArrayRecordGenerator(numberOfKeys, keyLen))
+ .setStreamRecordGenerator(
+ new ByteArrayRecordGenerator(numberOfKeys, keyLen))
.setStateProcessFunctionSupplier(TestKeyedFunction::new)
.build();
benchmark.setUp();
diff --git a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
index c8c6f2d9..e350ace5 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
@@ -39,14 +39,14 @@
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.LIST_VALUE_COUNT;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
/** Implementation for list state benchmark testing. */
public class ListStateBenchmark extends StateBenchmarkBase {
- private final String STATE_NAME = "listState";
- private final ListStateDescriptor<Long> STATE_DESC =
- new ListStateDescriptor<>(STATE_NAME, Long.class);
+ private final String stateName = "listState";
+ private final ListStateDescriptor<Long> stateDesc =
+ new ListStateDescriptor<>(stateName, Long.class);
private ListState<Long> listState;
private List<Long> dummyLists;
@@ -63,9 +63,9 @@ public static void main(String[] args) throws RunnerException {
@Setup
public void setUp() throws Exception {
keyedStateBackend = createKeyedStateBackend();
- listState = getListState(keyedStateBackend, STATE_DESC);
- dummyLists = new ArrayList<>(listValueCount);
- for (int i = 0; i < listValueCount; ++i) {
+ listState = getListState(keyedStateBackend, stateDesc);
+ dummyLists = new ArrayList<>(LIST_VALUE_COUNT);
+ for (int i = 0; i < LIST_VALUE_COUNT; ++i) {
dummyLists.add(random.nextLong());
}
keyIndex = new AtomicInteger();
@@ -73,27 +73,27 @@ public void setUp() throws Exception {
@Setup(Level.Iteration)
public void setUpPerIteration() throws Exception {
- for (int i = 0; i < setupKeyCount; ++i) {
+ for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
keyedStateBackend.setCurrentKey((long) i);
listState.add(random.nextLong());
}
// make sure only one sst file left, so all get invocation will access this single file,
// to prevent the spike caused by different key distribution in multiple sst files,
// the more access to the older sst file, the lower throughput will be.
- compactState(keyedStateBackend, STATE_DESC);
+ compactState(keyedStateBackend, stateDesc);
}
@TearDown(Level.Iteration)
public void tearDownPerIteration() throws Exception {
applyToAllKeys(
keyedStateBackend,
- STATE_DESC,
+ stateDesc,
(k, state) -> {
keyedStateBackend.setCurrentKey(k);
state.clear();
});
// make the clearance effective, trigger compaction for RocksDB, and GC for heap.
- if (!compactState(keyedStateBackend, STATE_DESC)) {
+ if (!compactState(keyedStateBackend, stateDesc)) {
System.gc();
}
// wait a while for the clearance to take effect.
diff --git a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
index 32dce602..c840b5bd 100644
--- a/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/MapStateBenchmark.java
@@ -37,9 +37,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
/** Implementation for map state benchmark testing. */
public class MapStateBenchmark extends StateBenchmarkBase {
@@ -63,14 +63,14 @@ public void setUp() throws Exception {
getMapState(
keyedStateBackend,
new MapStateDescriptor<>("mapState", Long.class, Double.class));
- dummyMaps = new HashMap<>(mapKeyCount);
- for (int i = 0; i < mapKeyCount; ++i) {
- dummyMaps.put(mapKeys.get(i), random.nextDouble());
+ dummyMaps = new HashMap<>(MAP_KEY_COUNT);
+ for (int i = 0; i < MAP_KEY_COUNT; ++i) {
+ dummyMaps.put(MAP_KEYS.get(i), random.nextDouble());
}
- for (int i = 0; i < setupKeyCount; ++i) {
+ for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
keyedStateBackend.setCurrentKey((long) i);
- for (int j = 0; j < mapKeyCount; j++) {
- mapState.put(mapKeys.get(j), random.nextDouble());
+ for (int j = 0; j < MAP_KEY_COUNT; j++) {
+ mapState.put(MAP_KEYS.get(j), random.nextDouble());
}
}
keyIndex = new AtomicInteger();
@@ -107,7 +107,7 @@ public boolean mapIsEmpty(KeyValue keyValue) throws Exception {
}
@Benchmark
- @OperationsPerInvocation(mapKeyCount)
+ @OperationsPerInvocation(MAP_KEY_COUNT)
public void mapKeys(KeyValue keyValue, Blackhole bh) throws Exception {
keyedStateBackend.setCurrentKey(keyValue.setUpKey);
for (Long key : mapState.keys()) {
@@ -116,7 +116,7 @@ public void mapKeys(KeyValue keyValue, Blackhole bh) throws Exception {
}
@Benchmark
- @OperationsPerInvocation(mapKeyCount)
+ @OperationsPerInvocation(MAP_KEY_COUNT)
public void mapValues(KeyValue keyValue, Blackhole bh) throws Exception {
keyedStateBackend.setCurrentKey(keyValue.setUpKey);
for (Double value : mapState.values()) {
@@ -125,7 +125,7 @@ public void mapValues(KeyValue keyValue, Blackhole bh) throws Exception {
}
@Benchmark
- @OperationsPerInvocation(mapKeyCount)
+ @OperationsPerInvocation(MAP_KEY_COUNT)
public void mapEntries(KeyValue keyValue, Blackhole bh) throws Exception {
keyedStateBackend.setCurrentKey(keyValue.setUpKey);
Iterable<Map.Entry<Long, Double>> iterable = mapState.entries();
@@ -138,7 +138,7 @@ public void mapEntries(KeyValue keyValue, Blackhole bh) throws Exception {
}
@Benchmark
- @OperationsPerInvocation(mapKeyCount)
+ @OperationsPerInvocation(MAP_KEY_COUNT)
public void mapIterator(KeyValue keyValue, Blackhole bh) throws Exception {
keyedStateBackend.setCurrentKey(keyValue.setUpKey);
Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator();
diff --git a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
index f5b749d0..4b6d68c2 100644
--- a/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/RescalingBenchmarkBase.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.state.benchmark;
import org.apache.flink.api.common.functions.OpenContext;
@@ -23,13 +24,10 @@
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.benchmark.BenchmarkBase;
-import org.apache.flink.config.ConfigUtil;
-import org.apache.flink.config.StateBenchmarkOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.state.benchmark.RescalingBenchmark;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
+
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
@@ -41,8 +39,6 @@
import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
@@ -105,9 +101,7 @@ protected static class ByteArrayRecordGenerator
private final byte[] fatArray;
private int count = 0;
-
- protected ByteArrayRecordGenerator(final int numberOfKeys,
- final int keyLen) {
+ protected ByteArrayRecordGenerator(final int numberOfKeys, final int keyLen) {
this.numberOfKeys = numberOfKeys;
fatArray = new byte[keyLen];
}
diff --git a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
index b552ad76..205fc8e6 100644
--- a/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/state/benchmark/RocksdbStateBackendRescalingBenchmarkExecutor.java
@@ -15,17 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.state.benchmark;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.config.ConfigUtil;
-import org.apache.flink.config.StateBenchmarkOptions;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.state.benchmark.RescalingBenchmarkBuilder;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
-import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.RunnerException;
import java.io.IOException;
@@ -55,10 +58,14 @@ public void setUp() throws Exception {
.setParallelismAfter(rescaleType.getParallelismAfter())
.setManagedMemorySize(512 * 1024 * 1024)
.setCheckpointStorageAccess(
- new FileSystemCheckpointStorage("file://" + prepareDirectory("rescaleDb").getAbsolutePath())
+ new FileSystemCheckpointStorage(
+ "file://"
+ + prepareDirectory("rescaleDb")
+ .getAbsolutePath())
.createCheckpointStorage(new JobID()))
.setStateBackend(stateBackend)
- .setStreamRecordGenerator(new ByteArrayRecordGenerator(numberOfKeys, keyLen))
+ .setStreamRecordGenerator(
+ new ByteArrayRecordGenerator(numberOfKeys, keyLen))
.setStateProcessFunctionSupplier(TestKeyedFunction::new)
.build();
benchmark.setUp();
diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
index 8c1f970d..c61c82e4 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkBase.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.state.benchmark;
import org.apache.flink.benchmark.BenchmarkBase;
@@ -22,8 +23,8 @@
import org.apache.flink.config.StateBenchmarkOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateBackend;
-
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
@@ -41,15 +42,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.cleanUp;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapValues;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.newKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.newKeys;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.randomValueCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.randomValues;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeys;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_VALUES;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.NEW_KEYS;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.NEW_KEY_COUNT;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.RANDOM_VALUES;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.RANDOM_VALUE_COUNT;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEYS;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
/** Base implementation of the state benchmarks. */
public class StateBenchmarkBase extends BenchmarkBase {
@@ -66,8 +67,10 @@ protected KeyedStateBackend createKeyedStateBackend() throws Exception {
return createKeyedStateBackend(TtlTimeProvider.DEFAULT);
}
- protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider ttlTimeProvider) throws Exception {
- return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, createStateDataDir());
+ protected KeyedStateBackend<Long> createKeyedStateBackend(TtlTimeProvider ttlTimeProvider)
+ throws Exception {
+ return StateBackendBenchmarkUtils.createKeyedStateBackend(
+ backendType, createStateDataDir());
}
public static File createStateDataDir() throws IOException {
@@ -108,16 +111,16 @@ public static class KeyValue {
@Setup(Level.Invocation)
public void kvSetup() {
int currentIndex = getCurrentIndex();
- setUpKey = setupKeys.get(currentIndex % setupKeyCount);
- newKey = newKeys.get(currentIndex % newKeyCount);
- mapKey = mapKeys.get(currentIndex % mapKeyCount);
- mapValue = mapValues.get(currentIndex % mapKeyCount);
- value = randomValues.get(currentIndex % randomValueCount);
+ setUpKey = SETUP_KEYS.get(currentIndex % SETUP_KEY_COUNT);
+ newKey = NEW_KEYS.get(currentIndex % NEW_KEY_COUNT);
+ mapKey = MAP_KEYS.get(currentIndex % MAP_KEY_COUNT);
+ mapValue = MAP_VALUES.get(currentIndex % MAP_KEY_COUNT);
+ value = RANDOM_VALUES.get(currentIndex % RANDOM_VALUE_COUNT);
// TODO: singletonList is taking 25% of time in mapAdd benchmark... This shouldn't be
// initiated if benchmark is not using it and for the benchmarks that are using it,
// this should also be probably somehow avoided.
listValue =
- Collections.singletonList(randomValues.get(currentIndex % randomValueCount));
+ Collections.singletonList(RANDOM_VALUES.get(currentIndex % RANDOM_VALUE_COUNT));
}
@TearDown(Level.Invocation)
diff --git a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
index c0a141f3..d445a202 100644
--- a/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
+++ b/src/main/java/org/apache/flink/state/benchmark/StateBenchmarkConstants.java
@@ -28,54 +28,54 @@
*/
public class StateBenchmarkConstants {
// TODO: why all of those static fields? Those should be inside a context class
- public static final int mapKeyCount = 10;
- public static final int listValueCount = 100;
- public static final int setupKeyCount = 500_000;
- public static final String rootDirName = "benchmark";
- public static final String recoveryDirName = "localRecovery";
- public static final String dbDirName = "dbPath";
+ public static final int MAP_KEY_COUNT = 10;
+ public static final int LIST_VALUE_COUNT = 100;
+ public static final int SETUP_KEY_COUNT = 500_000;
+ public static final String ROOT_DIR_NAME = "benchmark";
+ public static final String RECOVERY_DIR_NAME = "localRecovery";
+ public static final String DB_PATH = "dbPath";
- public static final ArrayList<Long> mapKeys = new ArrayList<>(mapKeyCount);
- public static final ArrayList<Double> mapValues = new ArrayList<>(mapKeyCount);
- public static final ArrayList<Long> setupKeys = new ArrayList<>(setupKeyCount);
- public static final int newKeyCount = 500_000;
- public static final ArrayList<Long> newKeys = new ArrayList<>(newKeyCount);
- public static final int randomValueCount = 1_000_000;
- public static final ArrayList<Long> randomValues = new ArrayList<>(randomValueCount);
+ public static final ArrayList<Long> MAP_KEYS = new ArrayList<>(MAP_KEY_COUNT);
+ public static final ArrayList<Double> MAP_VALUES = new ArrayList<>(MAP_KEY_COUNT);
+ public static final ArrayList<Long> SETUP_KEYS = new ArrayList<>(SETUP_KEY_COUNT);
+ public static final int NEW_KEY_COUNT = 500_000;
+ public static final ArrayList<Long> NEW_KEYS = new ArrayList<>(NEW_KEY_COUNT);
+ public static final int RANDOM_VALUE_COUNT = 1_000_000;
+ public static final ArrayList<Long> RANDOM_VALUES = new ArrayList<>(RANDOM_VALUE_COUNT);
static {
- for (int i = 0; i < mapKeyCount; i++) {
- mapKeys.add((long) i);
+ for (int i = 0; i < MAP_KEY_COUNT; i++) {
+ MAP_KEYS.add((long) i);
}
- Collections.shuffle(mapKeys);
+ Collections.shuffle(MAP_KEYS);
}
static {
Random random = new Random();
- for (int i = 0; i < mapKeyCount; i++) {
- mapValues.add(random.nextDouble());
+ for (int i = 0; i < MAP_KEY_COUNT; i++) {
+ MAP_VALUES.add(random.nextDouble());
}
- Collections.shuffle(mapValues);
+ Collections.shuffle(MAP_VALUES);
}
static {
- for (long i = 0; i < setupKeyCount; i++) {
- setupKeys.add(i);
+ for (long i = 0; i < SETUP_KEY_COUNT; i++) {
+ SETUP_KEYS.add(i);
}
- Collections.shuffle(setupKeys);
+ Collections.shuffle(SETUP_KEYS);
}
static {
- for (long i = 0; i < newKeyCount; i++) {
- newKeys.add(i + setupKeyCount);
+ for (long i = 0; i < NEW_KEY_COUNT; i++) {
+ NEW_KEYS.add(i + SETUP_KEY_COUNT);
}
- Collections.shuffle(newKeys);
+ Collections.shuffle(NEW_KEYS);
}
static {
- for (long i = 0; i < randomValueCount; i++) {
- randomValues.add(i);
+ for (long i = 0; i < RANDOM_VALUE_COUNT; i++) {
+ RANDOM_VALUES.add(i);
}
- Collections.shuffle(randomValues);
+ Collections.shuffle(RANDOM_VALUES);
}
}
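For context on why this class pre-builds everything in static initializers: the benchmarks pick a key per invocation with a cheap wrapping index into a list that was generated and shuffled once, which keeps random-number generation and key-ordering effects out of the measured section (see kvSetup() in StateBenchmarkBase above). A shrunken, self-contained sketch of the pattern (the real SETUP_KEY_COUNT is 500_000):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Shrunken sketch of the pre-shuffled-constants pattern used by
// StateBenchmarkConstants and indexed from StateBenchmarkBase.kvSetup().
public class ShuffledKeysSketch {
    static final int SETUP_KEY_COUNT = 8; // 500_000 in the real class
    static final List<Long> SETUP_KEYS = new ArrayList<>(SETUP_KEY_COUNT);

    static {
        for (long i = 0; i < SETUP_KEY_COUNT; i++) {
            SETUP_KEYS.add(i);
        }
        // One up-front shuffle removes ordering effects without paying for
        // randomness on every benchmark invocation.
        Collections.shuffle(SETUP_KEYS);
    }

    public static void main(String[] args) {
        for (int invocation = 0; invocation < 12; invocation++) {
            long key = SETUP_KEYS.get(invocation % SETUP_KEY_COUNT);
            System.out.println("invocation " + invocation + " -> key " + key);
        }
    }
}
```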
diff --git a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
index 0be9e143..7c35d50d 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ValueStateBenchmark.java
@@ -33,7 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
/** Implementation for listValue state benchmark testing. */
public class ValueStateBenchmark extends StateBenchmarkBase {
@@ -54,7 +54,7 @@ public void setUp() throws Exception {
keyedStateBackend = createKeyedStateBackend();
valueState =
getValueState(keyedStateBackend, new ValueStateDescriptor<>("kvState", Long.class));
- for (int i = 0; i < setupKeyCount; ++i) {
+ for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
keyedStateBackend.setCurrentKey((long) i);
valueState.update(random.nextLong());
}
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
index 829b440f..33fdac7e 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.state.benchmark.StateBenchmarkBase;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Setup;
@@ -39,12 +40,12 @@
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.LIST_VALUE_COUNT;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
/** Implementation for list state benchmark testing. */
public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
- private final String STATE_NAME = "listState";
+ private final String stateName = "listState";
private ListStateDescriptor<Long> stateDesc;
private ListState<Long> listState;
private List<Long> dummyLists;
@@ -62,10 +63,10 @@ public static void main(String[] args) throws RunnerException {
@Setup
public void setUp() throws Exception {
keyedStateBackend = createKeyedStateBackend();
- stateDesc = configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+ stateDesc = configTtl(new ListStateDescriptor<>(stateName, Long.class));
listState = getListState(keyedStateBackend, stateDesc);
- dummyLists = new ArrayList<>(listValueCount);
- for (int i = 0; i < listValueCount; ++i) {
+ dummyLists = new ArrayList<>(LIST_VALUE_COUNT);
+ for (int i = 0; i < LIST_VALUE_COUNT; ++i) {
dummyLists.add(random.nextLong());
}
keyIndex = new AtomicInteger();
@@ -73,7 +74,7 @@ public void setUp() throws Exception {
@Setup(Level.Iteration)
public void setUpPerIteration() throws Exception {
- for (int i = 0; i < setupKeyCount; ++i) {
+ for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
keyedStateBackend.setCurrentKey((long) i);
setTtlWhenInitialization();
listState.add(random.nextLong());
@@ -127,7 +128,8 @@ public Iterable<Long> listGet(StateBenchmarkBase.KeyValue keyValue) throws Excep
}
@Benchmark
- public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception {
+ public void listGetAndIterate(StateBenchmarkBase.KeyValue keyValue, Blackhole bh)
+ throws Exception {
keyedStateBackend.setCurrentKey(keyValue.setUpKey);
Iterable<Long> iterable = listState.get();
for (Long value : iterable) {
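The reflowed listGetAndIterate signature keeps the JMH Blackhole parameter, which is what prevents the JIT from dead-code-eliminating the iteration being measured. A self-contained sketch of the same pattern (hypothetical class and list size, not from this patch):

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

@State(Scope.Thread)
public class IterationSketch {
    private final List<Long> values = new ArrayList<>();

    @Setup
    public void setUp() {
        Random random = new Random();
        for (int i = 0; i < 1_000; i++) { // assumed list size
            values.add(random.nextLong());
        }
    }

    @Benchmark
    public void iterate(Blackhole bh) {
        for (Long value : values) {
            bh.consume(value); // without this, the JIT could drop the whole loop
        }
    }
}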
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
index 772a103e..8e61ffee 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlMapStateBenchmark.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.state.benchmark.StateBenchmarkBase;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
@@ -38,9 +39,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getMapState;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeyCount;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.mapKeys;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEYS;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.MAP_KEY_COUNT;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
/** Implementation for map state benchmark testing. */
public class TtlMapStateBenchmark extends TtlStateBenchmarkBase {
@@ -64,15 +65,15 @@ public void setUp() throws Exception {
getMapState(
keyedStateBackend,
configTtl(new MapStateDescriptor<>("mapState", Long.class, Double.class)));
- dummyMaps = new HashMap<>(mapKeyCount);
- for (int i = 0; i < mapKeyCount; ++i) {
- dummyMaps.put(mapKeys.get(i), random.nextDouble());
+ dummyMaps = new HashMap<>(MAP_KEY_COUNT);
+ for (int i = 0; i < MAP_KEY_COUNT; ++i) {
+ dummyMaps.put(MAP_KEYS.get(i), random.nextDouble());
}
- for (int i = 0; i < setupKeyCount; ++i) {
+ for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
keyedStateBackend.setCurrentKey((long) i);
- for (int j = 0; j < mapKeyCount; j++) {
+ for (int j = 0; j < MAP_KEY_COUNT; j++) {
setTtlWhenInitialization();
- mapState.put(mapKeys.get(j), random.nextDouble());
+ mapState.put(MAP_KEYS.get(j), random.nextDouble());
}
}
keyIndex = new AtomicInteger();
@@ -108,7 +109,7 @@ public boolean mapIsEmpty(StateBenchmarkBase.KeyValue keyValue) throws Exception
}
@Benchmark
- @OperationsPerInvocation(mapKeyCount)
+ @OperationsPerInvocation(MAP_KEY_COUNT)
public void mapIterator(StateBenchmarkBase.KeyValue keyValue, Blackhole bh) throws Exception {
keyedStateBackend.setCurrentKey(keyValue.setUpKey);
Iterator<Map.Entry<Long, Double>> iterator = mapState.iterator();
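The @OperationsPerInvocation argument must be a compile-time constant, which the renamed MAP_KEY_COUNT still is; JMH divides the measured invocation time by that value so scores are reported per map entry rather than per method call. A minimal sketch of that normalization (hypothetical class and count):

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.infra.Blackhole;

public class PerEntrySketch {
    private static final int ENTRY_COUNT = 10; // assumed count; must be a constant

    // JMH reports (invocation time / ENTRY_COUNT), i.e. the cost per entry
    // visited rather than per call to this method.
    @Benchmark
    @OperationsPerInvocation(ENTRY_COUNT)
    public void visitEntries(Blackhole bh) {
        for (int i = 0; i < ENTRY_COUNT; i++) {
            bh.consume(i);
        }
    }
}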
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
index bfe00178..a5d4fa17 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java
@@ -23,6 +23,7 @@
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.benchmark.StateBenchmarkBase;
+
import org.openjdk.jmh.annotations.Param;
import java.time.Duration;
@@ -42,6 +43,7 @@ public enum ExpiredTimeOptions {
NeverExpired(0);
public long advanceTimePerIteration;
+
ExpiredTimeOptions(int expirePercentPerIteration) {
this.advanceTimePerIteration = initialTime * expirePercentPerIteration / 100;
}
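The blank line Spotless inserts here just separates the field from the enum constructor; each constant encodes, as data, how far the mocked TTL clock advances per iteration. A simplified stand-alone rendering of that idea (the base timestamp is an assumed value, not the benchmark's actual initialTime):

// Each constant carries its per-iteration clock advancement, derived
// from an expiry percentage passed to the constructor.
public enum ExpiryRateSketch {
    EXPIRE_ONE_PERCENT(1),
    NEVER_EXPIRED(0);

    private static final long INITIAL_TIME = 100_000L; // assumed base timestamp

    public final long advanceTimePerIteration;

    ExpiryRateSketch(int expirePercentPerIteration) {
        this.advanceTimePerIteration = INITIAL_TIME * expirePercentPerIteration / 100;
    }
}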
diff --git a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
index ee34cfbc..12ed5c2f 100644
--- a/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
+++ b/src/main/java/org/apache/flink/state/benchmark/ttl/TtlValueStateBenchmark.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Setup;
@@ -33,7 +34,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getValueState;
-import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+import static org.apache.flink.state.benchmark.StateBenchmarkConstants.SETUP_KEY_COUNT;
/** Implementation for value state benchmark testing. */
public class TtlValueStateBenchmark extends TtlStateBenchmarkBase {
@@ -52,8 +53,11 @@ public static void main(String[] args) throws RunnerException {
@Setup
public void setUp() throws Exception {
keyedStateBackend = createKeyedStateBackend();
- valueState = getValueState(keyedStateBackend, configTtl(new ValueStateDescriptor<>("kvState", Long.class)));
- for (int i = 0; i < setupKeyCount; ++i) {
+ valueState =
+ getValueState(
+ keyedStateBackend,
+ configTtl(new ValueStateDescriptor<>("kvState", Long.class)));
+ for (int i = 0; i < SETUP_KEY_COUNT; ++i) {
setTtlWhenInitialization();
keyedStateBackend.setCurrentKey((long) i);
valueState.update(random.nextLong());
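configTtl, defined in the shared TtlStateBenchmarkBase rather than in this hunk, decorates the descriptor with a TTL policy before the backend creates the state. The general Flink API pattern it builds on looks roughly like this; the one-minute TTL and update type are arbitrary choices for the sketch, not the benchmark's actual settings:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlDescriptorSketch {
    static ValueStateDescriptor<Long> ttlDescriptor() {
        StateTtlConfig ttlConfig =
                StateTtlConfig.newBuilder(Time.minutes(1)) // arbitrary TTL for the sketch
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("kvState", Long.class);
        descriptor.enableTimeToLive(ttlConfig); // state created from it now honors the TTL
        return descriptor;
    }
}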
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
new file mode 100644
index 00000000..ff6c1393
--- /dev/null
+++ b/tools/maven/checkstyle.xml
@@ -0,0 +1,437 @@
+<!-- 437-line Checkstyle rule configuration; XML content not preserved in this copy of the patch -->
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
new file mode 100644
index 00000000..b9a8c129
--- /dev/null
+++ b/tools/maven/suppressions.xml
@@ -0,0 +1,72 @@
+<!-- 72-line Checkstyle suppressions file; XML content not preserved in this copy of the patch -->