Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/task-batching.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ public class TaskBatchingMain {
}
}
----
<1> Pass `lingerMillis` and `capacity` to the constructor.
<1> Pass `lingerMillis` and `capacity`, or a `Supplier` for each of them, to the constructor.
<2> Implement `processBatchingTasks(List)`.
<3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package com.linecorp.decaton.processor;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -45,7 +48,7 @@ public void testBatchingProcessor() throws Exception {
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess(
new BatchingProcessor<TestTask>(1000, 100) {
new BatchingProcessor<TestTask>(1000L, 100) {
@Override
protected void processBatchingTasks(List<BatchingTask<TestTask>> batchingTasks) {
// adding some random delay to simulate realistic usage
Expand All @@ -65,4 +68,43 @@ protected void processBatchingTasks(List<BatchingTask<TestTask>> batchingTasks)
.build()
.run();
}

@Test
@Timeout(30)
public void testDynamicConfiguration() throws Exception {
    // Candidate values that each supplier hands out in round-robin order,
    // together with a counter recording how many times that supplier was read.
    final int[] capacityCandidates = {100, 200, 300};
    final long[] lingerCandidates = {1000L, 2000L, 3000L};
    AtomicInteger capacityReads = new AtomicInteger(0);
    AtomicInteger lingerReads = new AtomicInteger(0);
    Random random = randomExtension.random();

    ProcessorTestSuite
            .builder(rule)
            .configureProcessorsBuilder(builder -> builder.thenProcess(
                    new BatchingProcessor<TestTask>(
                            () -> lingerCandidates[lingerReads.getAndIncrement() % lingerCandidates.length],
                            () -> capacityCandidates[capacityReads.getAndIncrement() % capacityCandidates.length]
                    ) {
                        @Override
                        protected void processBatchingTasks(List<BatchingTask<TestTask>> batchingTasks) {
                            // small random delay before completing the batch
                            try {
                                Thread.sleep(random.nextInt(10));
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                            for (BatchingTask<TestTask> pending : batchingTasks) {
                                pending.completion().complete();
                            }
                        }
                    }
            ))
            .propertySupplier(StaticPropertySupplier.of(
                    Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 16)
            ))
            .build()
            .run();

    // Both suppliers must have been consulted more than once during the run.
    assertTrue(lingerReads.get() > 1);
    assertTrue(capacityReads.get() > 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.DeferredCompletion;
Expand All @@ -39,11 +46,12 @@
* @param <T> type of task to batch
*/
public abstract class BatchingProcessor<T> implements DecatonProcessor<T> {
private static final Logger logger = LoggerFactory.getLogger(BatchingProcessor.class);

private final ScheduledExecutorService executor;
private List<BatchingTask<T>> currentBatch = new ArrayList<>();
private final long lingerMillis;
private final int capacity;
private final Supplier<Long> lingerMillisSupplier;
private final Supplier<Integer> capacitySupplier;
private final ReentrantLock rollingLock;

@Value
Expand All @@ -62,8 +70,35 @@ public static class BatchingTask<T> {
* tasks in past before reaching capacity are pushed to {@link BatchingProcessor#processBatchingTasks(List)}.
*/
protected BatchingProcessor(long lingerMillis, int capacity) {
this.lingerMillis = lingerMillis;
this.capacity = capacity;
this(() -> lingerMillis, () -> capacity);
}

/**
* Instantiate {@link BatchingProcessor} with dynamic lingerMillis and capacity config.
* <p>
* The suppliers are verified capable of returning positive numbers during instantiation.
* If not, this constructor will fail to instantiate the BatchingProcessor instance.
* <br>
* If the suppliers return non-positive numbers at processing time, the last-known-good value is used as a fallback.
* </p>
*/
protected BatchingProcessor(Supplier<Long> lingerMillisSupplier, Supplier<Integer> capacitySupplier) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest keeping the current constructor to avoid a breaking change, by adding an overload like BatchingProcessor(lingerMillis, capacity) { this(() -> lingerMillis, () -> capacity) ... }

Objects.requireNonNull(lingerMillisSupplier, "lingerMillisSupplier must not be null.");
Objects.requireNonNull(capacitySupplier, "capacitySupplier must not be null.");

this.lingerMillisSupplier = validatedAndLKGWrappedSupplier(
"lingerMillisSupplier",
lingerMillisSupplier,
v -> v > 0);

this.capacitySupplier = validatedAndLKGWrappedSupplier(
"capacitySupplier",
capacitySupplier,
v -> v > 0);

// initialize last-known-good values or fail fast at constructor time
this.lingerMillisSupplier.get();
this.capacitySupplier.get();

ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(
1,
Expand Down Expand Up @@ -108,14 +143,14 @@ private void periodicallyFlushTask() {
}

private void scheduleFlush() {
executor.schedule(this::periodicallyFlushTask, lingerMillis, TimeUnit.MILLISECONDS);
executor.schedule(this::periodicallyFlushTask, this.lingerMillisSupplier.get(), TimeUnit.MILLISECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just getting the new value and using it might be unsafe when the supplier returns an invalid value, such as a negative one.

I suggest adding a validation and, when it fails, logging it and continuing to use the current value. WDYT? (Yeah, I know, even the current code doesn't have validation though...)
The same applies to capacity as well.

Copy link
Contributor Author

@kun98-liu kun98-liu Dec 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT:
I thought adding validations for extremely large numbers would be useful, but such values could actually be intended in some cases. So let me add non-negative validations and error handling for when get() throws an exception.

}

@Override
public void process(ProcessingContext<T> context, T task) throws InterruptedException {
rollingLock.lock();
try {
if (currentBatch.size() >= this.capacity) {
if (currentBatch.size() >= this.capacitySupplier.get()) {
final List<BatchingTask<T>> batch = currentBatch;
executor.submit(() -> processBatchingTasks(batch));
currentBatch = new ArrayList<>();
Expand All @@ -133,6 +168,46 @@ public void close() throws Exception {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}

/**
 * Wraps {@code delegate} in a supplier that validates every value it produces and
 * remembers the most recent valid one as the last-known-good (LKG) value.
 * <p>
 * On each {@code get()} of the returned supplier:
 * <ul>
 *   <li>if the delegate returns a non-null value accepted by {@code validator}, that value is
 *       stored as the new LKG value and returned;</li>
 *   <li>if the delegate throws, returns {@code null}, or returns a value rejected by
 *       {@code validator}, the LKG value is returned and a warning is logged;</li>
 *   <li>if no LKG value has been recorded yet in either failure case, an error is logged and
 *       {@link IllegalArgumentException} is thrown.</li>
 * </ul>
 *
 * @param name      label used in log and exception messages
 * @param delegate  supplier providing the raw values
 * @param validator predicate deciding whether a non-null value is acceptable
 * @param <V>       type of the supplied value
 * @return a supplier that never returns {@code null} nor a value rejected by {@code validator}
 * @throws IllegalArgumentException from the returned supplier's {@code get()} when the delegate
 *         fails or yields an invalid value before any valid value has been observed
 */
private static <V> Supplier<V> validatedAndLKGWrappedSupplier(
        String name,
        Supplier<V> delegate,
        Predicate<V> validator) {
    final AtomicReference<V> lkg = new AtomicReference<>();
    return () -> {
        final V value;
        try {
            value = delegate.get();
        } catch (Exception e) {
            // read the reference once so the null-check and the returned value agree
            final V lkgValue = lkg.get();
            if (lkgValue != null) {
                logger.warn("{} threw exception from get(), using last-known-good value: {}.", name,
                            lkgValue, e);
                return lkgValue;
            }
            logger.error("{} threw exception from get(). No last-known-good value is available.", name);
            throw new IllegalArgumentException(name + " threw exception from get().", e);
        }

        // A null result is treated as invalid up front: handing it to the validator could throw
        // (e.g. unboxing inside a numeric comparison) and would skip the fallback below.
        if (value == null || !validator.test(value)) {
            final V lkgValue = lkg.get();
            if (lkgValue != null) {
                logger.warn("{} returned invalid value: {}, using last-known-good value: {}.",
                            name,
                            value,
                            lkgValue);
                return lkgValue;
            }
            logger.error("{} returned invalid value: {}. No last-known-good value is available.",
                         name,
                         value);
            throw new IllegalArgumentException(name + " returned invalid value: " + value);
        }

        lkg.set(value);
        return value;
    };
}

/**
* After complete processing batch of tasks,
* *MUST* call {@link BatchingTask#completion}'s {@link DeferredCompletion#complete()} or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linecorp.decaton.processor.processors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
Expand All @@ -26,6 +27,8 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.junit.jupiter.api.BeforeEach;
Expand All @@ -51,15 +54,16 @@ public class BatchingProcessorTest {

@BeforeEach
public void before() {
doReturn(completion).when(context).deferCompletion();
processedTasks.clear();
}

/**
 * Builds a {@link HelloTask} fixture with the given name and age.
 *
 * @param name value passed to the builder's {@code setName}
 * @param age  value passed to the builder's {@code setAge}
 * @return the built task
 */
private HelloTask buildHelloTask(String name, int age) {
return HelloTask.newBuilder().setName(name).setAge(age).build();
}

private BatchingProcessor<HelloTask> buildProcessor(CountDownLatch processLatch, long lingerMs, int capacity) {
private BatchingProcessor<HelloTask> buildProcessor(CountDownLatch processLatch,
Supplier<Long> lingerMs,
Supplier<Integer> capacity) {
return new BatchingProcessor<HelloTask>(lingerMs, capacity) {
@Override
protected void processBatchingTasks(List<BatchingTask<HelloTask>> batchingTasks) {
Expand All @@ -75,9 +79,11 @@ protected void processBatchingTasks(List<BatchingTask<HelloTask>> batchingTasks)
@Test
@Timeout(5)
public void testLingerLimit() throws InterruptedException {
doReturn(completion).when(context).deferCompletion();

long lingerMs = 1000;
CountDownLatch processLatch = new CountDownLatch(1);
BatchingProcessor<HelloTask> processor = buildProcessor(processLatch, lingerMs, Integer.MAX_VALUE);
BatchingProcessor<HelloTask> processor = buildProcessor(processLatch, () -> lingerMs, () -> Integer.MAX_VALUE);

HelloTask task1 = buildHelloTask("one", 1);
processor.process(context, task1);
Expand All @@ -91,8 +97,10 @@ public void testLingerLimit() throws InterruptedException {

@Test
public void testCapacityLimit() throws InterruptedException {
doReturn(completion).when(context).deferCompletion();

CountDownLatch processLatch = new CountDownLatch(1);
BatchingProcessor<HelloTask> processor = buildProcessor(processLatch, Long.MAX_VALUE, 2);
BatchingProcessor<HelloTask> processor = buildProcessor(processLatch, () -> Long.MAX_VALUE, () -> 2);

HelloTask task1 = buildHelloTask("one", 1);
HelloTask task2 = buildHelloTask("two", 2);
Expand All @@ -109,4 +117,46 @@ public void testCapacityLimit() throws InterruptedException {
verify(context, times(3)).deferCompletion();
verify(completion, times(processedTasks.size())).complete();
}

/**
 * Verifies that constructing a processor fails immediately when a supplier is missing,
 * yields a non-positive value, or throws on its first call.
 * Each failure cause is exercised in isolation (the other argument is valid) so that a
 * failure of one check cannot mask a missing check on the other supplier.
 */
@Test
public void failFastDuringInstantiation() {
    // null suppliers
    assertThrows(NullPointerException.class,
                 () -> buildProcessor(new CountDownLatch(1), null, null));
    assertThrows(NullPointerException.class,
                 () -> buildProcessor(new CountDownLatch(1), null, () -> 1));
    assertThrows(NullPointerException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> 100L, null));

    // non-positive initial values
    assertThrows(IllegalArgumentException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> 0L, () -> 1));
    assertThrows(IllegalArgumentException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> -1L, () -> 1));
    assertThrows(IllegalArgumentException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> 100L, () -> 0));
    assertThrows(IllegalArgumentException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> 100L, () -> -1));

    // suppliers that throw on their first call
    assertThrows(IllegalArgumentException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> 100L / 0, () -> 1));
    assertThrows(IllegalArgumentException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> 100L, () -> 1 / 0));

    // original combined case: failing lingerMillis supplier together with an invalid capacity
    assertThrows(IllegalArgumentException.class,
                 () -> buildProcessor(new CountDownLatch(1), () -> 100L / 0, () -> -1));
}

/**
 * Verifies that an invalid capacity returned after construction is replaced by the
 * last-known-good value instead of failing the processor.
 */
@Test
public void fallbackToLKG() throws InterruptedException {
    doReturn(completion).when(context).deferCompletion();

    // Only 2 is a legal capacity; it becomes the last-known-good fallback for 0 and -1.
    final int[] capacityCandidates = {2, 0, -1};
    AtomicInteger capacityReads = new AtomicInteger();
    CountDownLatch batchProcessed = new CountDownLatch(1);

    BatchingProcessor<HelloTask> processor = buildProcessor(
            batchProcessed,
            () -> Long.MAX_VALUE,
            () -> capacityCandidates[capacityReads.getAndIncrement() % capacityCandidates.length]);

    HelloTask first = buildHelloTask("one", 1);
    HelloTask second = buildHelloTask("two", 2);
    HelloTask third = buildHelloTask("three", 3);

    for (HelloTask task : Arrays.asList(first, second, third)) {
        processor.process(context, task);
    }

    batchProcessed.await();

    // The third task triggers the flush of the first two and stays in the next batch.
    List<HelloTask> expectedBatch = new ArrayList<>(Arrays.asList(first, second));
    assertEquals(expectedBatch, processedTasks);
    verify(context, times(3)).deferCompletion();
    verify(completion, times(processedTasks.size())).complete();
    // one supplier read at construction plus one per process() call
    assertEquals(4, capacityReads.get());
}
}