Skip to content
Draft
1 change: 1 addition & 0 deletions processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {

implementation "net.openhft:zero-allocation-hashing:0.16"
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.protobuf:protobuf-java-util:$protobufVersion"

testImplementation project(":protobuf")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand All @@ -42,20 +43,27 @@ public class BatchingProcessorTest {
@Timeout(30)
public void testBatchingProcessor() throws Exception {
Random rand = randomExtension.random();
ReentrantLock serializer = new ReentrantLock(true);
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess(
new BatchingProcessor<TestTask>(1000, 100) {
@Override
protected void processBatchingTasks(List<BatchingTask<TestTask>> batchingTasks) {
// adding some random delay to simulate realistic usage
// Since multiple calls to this method might be executed concurrently,
// it is implementation-side responsibility to ensure completion of each batch
// to happen in-order.
serializer.lock();
try {
// adding some random delay to simulate realistic usage
Thread.sleep(rand.nextInt(10));
batchingTasks.forEach(batchingTask -> batchingTask.completion().complete());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
serializer.unlock();
}
batchingTasks.forEach(batchingTask -> batchingTask.completion().complete());
}
}
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package com.linecorp.decaton.processor;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
Expand All @@ -40,12 +41,17 @@
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.testing.KafkaClusterExtension;
import com.linecorp.decaton.testing.RandomExtension;
import com.linecorp.decaton.testing.processor.KeyedExecutorService;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType;
import com.linecorp.decaton.testing.processor.ProcessorTestSuite;
import com.linecorp.decaton.testing.processor.ProducedRecord;
import com.linecorp.decaton.testing.processor.TestTask;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CoreFunctionalityTest {
@RegisterExtension
public static KafkaClusterExtension rule = new KafkaClusterExtension();
Expand Down Expand Up @@ -104,25 +110,26 @@ public void testProcessConcurrent_ThreadScopeProcessor() throws Exception {
@Test
@Timeout(30)
public void testAsyncTaskCompletion() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
DeferredCompletion completion = ctx.deferCompletion();
executorService.execute(() -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
completion.complete();
}
});
}))
.build()
.run();
try (KeyedExecutorService executor = new KeyedExecutorService(16)) {
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
DeferredCompletion completion = ctx.deferCompletion();
executor.execute(Arrays.hashCode(task.getKey()), () -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
completion.complete();
}
});
}))
.build()
.run();
}
}

/*
Expand All @@ -136,25 +143,26 @@ public void testAsyncTaskCompletion() throws Exception {
@Test
@Timeout(30)
public void testGetCompletionInstanceLater() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
ctx.deferCompletion();
executorService.execute(() -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
ctx.deferCompletion().complete();
}
});
}))
.build()
.run();
try (KeyedExecutorService executor = new KeyedExecutorService(16)) {
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
ctx.deferCompletion();
executor.execute(Arrays.hashCode(task.getKey()), () -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
ctx.deferCompletion().complete();
}
});
}))
.build()
.run();
}
}

@Test
Expand All @@ -165,10 +173,12 @@ public void testSingleThreadProcessing() throws Exception {
ProcessingGuarantee noDuplicates = new ProcessingGuarantee() {
private final ConcurrentMap<HashableByteArray, List<TestTask>> produced = new ConcurrentHashMap<>();
private final ConcurrentMap<HashableByteArray, List<TestTask>> processed = new ConcurrentHashMap<>();
private final ConcurrentMap<TestTask, Long> taskToOffset = new ConcurrentHashMap<>();

@Override
public void onProduce(ProducedRecord record) {
produced.computeIfAbsent(new HashableByteArray(record.key()), key -> new ArrayList<>()).add(record.task());
taskToOffset.put(record.task(), record.offset());
}

@Override
Expand All @@ -180,7 +190,12 @@ public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
public void doAssert() {
// use assertTrue instead of assertEquals not to cause error message explosion
//noinspection SimplifiableJUnitAssertion
assertTrue(produced.equals(processed));
for (Entry<HashableByteArray, List<TestTask>> e : produced.entrySet()) {
List<Long> producedTasks = e.getValue().stream().map(taskToOffset::get).collect(Collectors.toList());
List<Long> processedTasks = processed.get(e.getKey()).stream().map(taskToOffset::get).collect(Collectors.toList());
assertEquals(producedTasks, processedTasks);
}
// assertTrue(produced.equals(processed));
}
};

Expand All @@ -192,7 +207,8 @@ public void doAssert() {
(ctx, task) -> Thread.sleep(rand.nextInt(10))))
.propertySupplier(StaticPropertySupplier.of(
Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 1),
Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 100)
Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 100),
Property.ofStatic(ProcessorProperties.CONFIG_SHUTDOWN_TIMEOUT_MS, 1000L)
))
.customSemantics(noDuplicates)
.build()
Expand All @@ -214,6 +230,7 @@ public void testAsyncCompletionWithLeakAndTimeout() throws Exception {
ctx.deferCompletion();
}
}))
.excludeSemantics(GuaranteeType.PROCESS_ORDERING)
.build()
.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

package com.linecorp.decaton.processor;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand All @@ -33,6 +32,8 @@
import com.linecorp.decaton.processor.runtime.SubPartitionRuntime;
import com.linecorp.decaton.testing.KafkaClusterExtension;
import com.linecorp.decaton.testing.RandomExtension;
import com.linecorp.decaton.testing.processor.KeyedExecutorService;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType;
import com.linecorp.decaton.testing.processor.ProcessorTestSuite;

@EnabledForJreRange(min = JRE.JAVA_21)
Expand Down Expand Up @@ -88,26 +89,27 @@ public void testProcessConcurrent_ThreadScopeProcessor() throws Exception {
@Test
@Timeout(30)
public void testAsyncTaskCompletion() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.subPartitionRuntime(SubPartitionRuntime.VIRTUAL_THREAD)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
DeferredCompletion completion = ctx.deferCompletion();
executorService.execute(() -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
completion.complete();
}
});
}))
.build()
.run();
try (KeyedExecutorService executor = new KeyedExecutorService(16)) {
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.subPartitionRuntime(SubPartitionRuntime.VIRTUAL_THREAD)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
DeferredCompletion completion = ctx.deferCompletion();
executor.execute(Arrays.hashCode(task.getKey()), () -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
completion.complete();
}
});
}))
.build()
.run();
}
}

/*
Expand All @@ -121,26 +123,27 @@ public void testAsyncTaskCompletion() throws Exception {
@Test
@Timeout(30)
public void testGetCompletionInstanceLater() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(16);
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.subPartitionRuntime(SubPartitionRuntime.VIRTUAL_THREAD)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
ctx.deferCompletion();
executorService.execute(() -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
ctx.deferCompletion().complete();
}
});
}))
.build()
.run();
try (KeyedExecutorService executor = new KeyedExecutorService(16)) {
Random rand = randomExtension.random();
ProcessorTestSuite
.builder(rule)
.subPartitionRuntime(SubPartitionRuntime.VIRTUAL_THREAD)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
ctx.deferCompletion();
executor.execute(Arrays.hashCode(task.getKey()), () -> {
try {
Thread.sleep(rand.nextInt(10));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
ctx.deferCompletion().complete();
}
});
}))
.build()
.run();
}
}

@Test
Expand All @@ -159,6 +162,7 @@ public void testAsyncCompletionWithLeakAndTimeout() throws Exception {
ctx.deferCompletion();
}
}))
.excludeSemantics(GuaranteeType.PROCESS_ORDERING)
.build()
.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,12 @@ public void testDeferredCompletionLeak() throws Exception {
() -> Metrics.registry()
.find("decaton.offset.last.committed")
.tags("topic", topicName, "partition", "0")
.gauge().value() == 1.0);
.gauge().value() == 2.0);
TestUtils.awaitCondition("latest consumed offset should becomes 9",
() -> Metrics.registry()
.find("decaton.offset.latest.consumed")
.tags("topic", topicName, "partition", "0")
.gauge().value() == 9.0);
}}
}
}
}
3 changes: 2 additions & 1 deletion processor/src/it/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
<pattern>%d{HH:mm:ss.SSS} [%thread] tp=%X{dt_topic}-%X{dt_partition} off=%X{dt_offset} %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
<logger name="com.linecorp.decaton" level="DEBUG" />
<logger name="com.linecorp.decaton.benchmark" level="INFO" />
<logger name="com.linecorp.decaton.processor.runtime.internal.OutOfOrderCommitControl" level="TRACE" />

<logger name="org.apache.kafka" level="ERROR" />
<logger name="kafka" level="ERROR" />
Expand Down
Loading