From 02f96bcf257fab9e2793494ac4e08c5ff9ad17b1 Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Tue, 25 Nov 2025 13:02:19 +0900 Subject: [PATCH 1/9] Make lingerMillis and capacity dynamic for BatchingProcessor --- .../processor/BatchingProcessorTest.java | 2 +- .../processors/BatchingProcessor.java | 21 +++++++++++-------- .../processors/BatchingProcessorTest.java | 2 +- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java index 216ea48b..a349645c 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java @@ -45,7 +45,7 @@ public void testBatchingProcessor() throws Exception { ProcessorTestSuite .builder(rule) .configureProcessorsBuilder(builder -> builder.thenProcess( - new BatchingProcessor(1000, 100) { + new BatchingProcessor(() -> 1000L, () -> 100) { @Override protected void processBatchingTasks(List> batchingTasks) { // adding some random delay to simulate realistic usage diff --git a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java index ac236fbd..9ca346ca 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.DeferredCompletion; @@ -42,8 +43,8 @@ public abstract class BatchingProcessor implements DecatonProcessor { private final ScheduledExecutorService executor; private List> currentBatch = new ArrayList<>(); - private final long lingerMillis; - private final int capacity; + private final Supplier lingerMillisSupplier; + private final Supplier capacitySupplier; private final ReentrantLock rollingLock; @Value @@ -56,14 +57,14 @@ public static class BatchingTask { /** * Instantiate {@link BatchingProcessor}. - * @param lingerMillis time limit for this processor. On every lingerMillis milliseconds, + * @param lingerMillisSupplier time limit for this processor. On every lingerMillis milliseconds, * tasks in past lingerMillis milliseconds are pushed to {@link BatchingTask#processBatchingTasks(List)}. - * @param capacity size limit for this processor. Every time tasks’size reaches capacity, + * @param lingerMillisSupplier size limit for this processor. Every time tasks’size reaches capacity, * tasks in past before reaching capacity are pushed to {@link BatchingTask#processBatchingTasks(List)}. */ - protected BatchingProcessor(long lingerMillis, int capacity) { - this.lingerMillis = lingerMillis; - this.capacity = capacity; + protected BatchingProcessor(Supplier lingerMillisSupplier, Supplier capacitySupplier) { + this.lingerMillisSupplier = lingerMillisSupplier; + this.capacitySupplier = capacitySupplier; ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor( 1, @@ -108,14 +109,16 @@ private void periodicallyFlushTask() { } private void scheduleFlush() { - executor.schedule(this::periodicallyFlushTask, lingerMillis, TimeUnit.MILLISECONDS); + long lingerMs = this.lingerMillisSupplier.get(); + executor.schedule(this::periodicallyFlushTask, lingerMs, TimeUnit.MILLISECONDS); } @Override public void process(ProcessingContext context, T task) throws InterruptedException { rollingLock.lock(); try { - if (currentBatch.size() >= this.capacity) { + int cap = this.capacitySupplier.get(); + if (currentBatch.size() >= cap) { final List> batch = currentBatch; executor.submit(() -> processBatchingTasks(batch)); currentBatch = new ArrayList<>(); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java index c675edf5..4391f2c8 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java @@ -60,7 +60,7 @@ private HelloTask buildHelloTask(String name, int age) { } private BatchingProcessor buildProcessor(CountDownLatch processLatch, long lingerMs, int capacity) { - return new BatchingProcessor(lingerMs, capacity) { + return new BatchingProcessor(() -> lingerMs, () -> capacity) { @Override protected void processBatchingTasks(List> batchingTasks) { List helloTasks = From c6df93e24a564cf69460ea911343ee5791efea5e Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Wed, 10 Dec 2025 17:31:22 +0900 Subject: [PATCH 2/9] Add it --- .../processor/BatchingProcessorTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java index a349645c..2e20eb85 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java @@ -16,8 +16,11 @@ package com.linecorp.decaton.processor; +import static org.junit.jupiter.api.Assertions.assertTrue; + import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -65,4 +68,46 @@ protected void processBatchingTasks(List> batchingTasks) .build() .run(); } + + @Test + @Timeout(30) + public void testDynamicConfiguration() throws Exception { + // Test with dynamic linger milliseconds but static capacity + Random rand = randomExtension.random(); + + // Track the value of lingerMs for testing purposes + final long[] lingerMsValues = {1000L, 2000L, 3000L}; + AtomicInteger lingerMsCallTimes = new AtomicInteger(0); + final int[] capacityValues = {100, 200, 300}; + AtomicInteger capacityCallTimes = new AtomicInteger(0); + + ProcessorTestSuite + .builder(rule) + .configureProcessorsBuilder(builder -> builder.thenProcess( + new BatchingProcessor( + () -> lingerMsValues[lingerMsCallTimes.getAndIncrement() % lingerMsValues.length], + () -> capacityValues[capacityCallTimes.getAndIncrement() % capacityValues.length] + ) { + @Override + protected void processBatchingTasks(List> batchingTasks) { + // adding some random delay to simulate realistic usage + try { + Thread.sleep(rand.nextInt(10)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); + } + } + )) + .propertySupplier(StaticPropertySupplier.of( + Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 16) + )) + .build() + .run(); + + assertTrue(lingerMsCallTimes.get() > 1); + assertTrue(capacityCallTimes.get() > 1); + } } From 433a97dfa44e2d2728bb2255913ffa5302032696 Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Wed, 10 Dec 2025 17:44:37 +0900 Subject: [PATCH 3/9] Fix doc --- docs/task-batching.adoc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index aa509978..d38b0f44 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -21,10 +21,10 @@ To use `Task Batching`, you need to create a class that inherits `BatchingProces |=== |parameter |Description -|`lingerMillis` +|`lingerMillisSupplier` |Time limit for this processor. On every lingerMillis milliseconds, tasks in past lingerMillis milliseconds are pushed to `BatchingProcessor#processBatchingTasks(List)`. -|`capacity` +|`capacitySupplier` |Capacity size limit for this processor. Every time tasks’size reaches capacity, tasks in past before reaching capacity are pushed to `BatchingProcessor#processBatchingTasks(List)`. |=== @@ -68,7 +68,7 @@ Create a class that inherits `BatchingProcessor`, as shown in the following exam ---- public class InsertHelloTaskBatchingProcessor extends BatchingProcessor { public InsertHelloTaskBatchingProcessor(long lingerMillis, int capacity) { - super(lingerMillis, capacity); // <1> + super(() -> lingerMillis, () -> capacity); // <1> } @Override @@ -109,6 +109,6 @@ public class TaskBatchingMain { } } ---- -<1> Pass `lingerMillis` and `capacity` to the constructor. +<1> Pass `lingerMillisSupplier` and `capacitySupplier` to the constructor. <2> Implement `processBatchingTasks(List)`. <3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`. From e7f597cdb579601486b1d18ee9df9693651c4b4a Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Wed, 10 Dec 2025 17:47:56 +0900 Subject: [PATCH 4/9] remove redundant code --- .../decaton/processor/processors/BatchingProcessor.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java index 9ca346ca..87b06426 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java @@ -59,7 +59,7 @@ public static class BatchingTask { * Instantiate {@link BatchingProcessor}. * @param lingerMillisSupplier time limit for this processor. On every lingerMillis milliseconds, * tasks in past lingerMillis milliseconds are pushed to {@link BatchingTask#processBatchingTasks(List)}. - * @param lingerMillisSupplier size limit for this processor. Every time tasks’size reaches capacity, + * @param capacitySupplier size limit for this processor. Every time tasks’size reaches capacity, * tasks in past before reaching capacity are pushed to {@link BatchingTask#processBatchingTasks(List)}. */ protected BatchingProcessor(Supplier lingerMillisSupplier, Supplier capacitySupplier) { @@ -109,16 +109,14 @@ private void periodicallyFlushTask() { } private void scheduleFlush() { - long lingerMs = this.lingerMillisSupplier.get(); - executor.schedule(this::periodicallyFlushTask, lingerMs, TimeUnit.MILLISECONDS); + executor.schedule(this::periodicallyFlushTask, this.lingerMillisSupplier.get(), TimeUnit.MILLISECONDS); } @Override public void process(ProcessingContext context, T task) throws InterruptedException { rollingLock.lock(); try { - int cap = this.capacitySupplier.get(); - if (currentBatch.size() >= cap) { + if (currentBatch.size() >= this.capacitySupplier.get()) { final List> batch = currentBatch; executor.submit(() -> processBatchingTasks(batch)); currentBatch = new ArrayList<>(); From e82696b6b571d138935318d4fc15b723e4e74ee5 Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Wed, 10 Dec 2025 18:25:56 +0900 Subject: [PATCH 5/9] remove redundant comment --- .../com/linecorp/decaton/processor/BatchingProcessorTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java index 2e20eb85..b7b2d703 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java @@ -72,10 +72,8 @@ protected void processBatchingTasks(List> batchingTasks) @Test @Timeout(30) public void testDynamicConfiguration() throws Exception { - // Test with dynamic linger milliseconds but static capacity Random rand = randomExtension.random(); - // Track the value of lingerMs for testing purposes final long[] lingerMsValues = {1000L, 2000L, 3000L}; AtomicInteger lingerMsCallTimes = new AtomicInteger(0); final int[] capacityValues = {100, 200, 300}; @@ -90,7 +88,6 @@ public void testDynamicConfiguration() throws Exception { ) { @Override protected void processBatchingTasks(List> batchingTasks) { - // adding some random delay to simulate realistic usage try { Thread.sleep(rand.nextInt(10)); } catch (InterruptedException e) { From 048cbcb897bbfa9822b0519c497f7abd35faed49 Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Sat, 27 Dec 2025 21:34:25 +0900 Subject: [PATCH 6/9] Add overload constructor --- .../com/linecorp/decaton/processor/BatchingProcessorTest.java | 2 +- .../decaton/processor/processors/BatchingProcessor.java | 4 ++++ .../decaton/processor/processors/BatchingProcessorTest.java | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java index b7b2d703..24aa91cf 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java @@ -48,7 +48,7 @@ public void testBatchingProcessor() throws Exception { ProcessorTestSuite .builder(rule) .configureProcessorsBuilder(builder -> builder.thenProcess( - new BatchingProcessor(() -> 1000L, () -> 100) { + new BatchingProcessor(1000L, 100) { @Override protected void processBatchingTasks(List> batchingTasks) { // adding some random delay to simulate realistic usage diff --git a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java index 87b06426..1403c078 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java @@ -55,6 +55,10 @@ public static class BatchingTask { T task; } + protected BatchingProcessor(long lingerMillis, int capacity) { + this(() -> lingerMillis, () -> capacity); + } + /** * Instantiate {@link BatchingProcessor}. * @param lingerMillisSupplier time limit for this processor. On every lingerMillis milliseconds, diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java index 4391f2c8..c675edf5 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java @@ -60,7 +60,7 @@ private HelloTask buildHelloTask(String name, int age) { } private BatchingProcessor buildProcessor(CountDownLatch processLatch, long lingerMs, int capacity) { - return new BatchingProcessor(() -> lingerMs, () -> capacity) { + return new BatchingProcessor(lingerMs, capacity) { @Override protected void processBatchingTasks(List> batchingTasks) { List helloTasks = From 63a296e8fc374f8e8654444fc28c6148bf913d94 Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Mon, 29 Dec 2025 15:48:06 +0900 Subject: [PATCH 7/9] Add validations and use last-known-good value as fallback in processing time --- .../processors/BatchingProcessor.java | 87 +++++++++++++++++-- .../processors/BatchingProcessorTest.java | 58 ++++++++++++- 2 files changed, 134 insertions(+), 11 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java index 1403c078..cbd448fd 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java @@ -18,12 +18,18 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.DeferredCompletion; import com.linecorp.decaton.processor.ProcessingContext; @@ -40,6 +46,7 @@ * @param type of task to batch */ public abstract class BatchingProcessor implements DecatonProcessor { + private static final Logger logger = LoggerFactory.getLogger(BatchingProcessor.class); private final ScheduledExecutorService executor; private List> currentBatch = new ArrayList<>(); @@ -55,20 +62,46 @@ public static class BatchingTask { T task; } + /** + * Instantiate {@link BatchingProcessor}. + * @param lingerMillis time limit for this processor. On every lingerMillis milliseconds, + * tasks in past lingerMillis milliseconds are pushed to {@link BatchingTask#processBatchingTasks(List)}. + * @param capacity size limit for this processor. Every time tasks’size reaches capacity, + * tasks in past before reaching capacity are pushed to {@link BatchingTask#processBatchingTasks(List)}. + */ protected BatchingProcessor(long lingerMillis, int capacity) { this(() -> lingerMillis, () -> capacity); } /** - * Instantiate {@link BatchingProcessor}. - * @param lingerMillisSupplier time limit for this processor. On every lingerMillis milliseconds, - * tasks in past lingerMillis milliseconds are pushed to {@link BatchingTask#processBatchingTasks(List)}. - * @param capacitySupplier size limit for this processor. Every time tasks’size reaches capacity, - * tasks in past before reaching capacity are pushed to {@link BatchingTask#processBatchingTasks(List)}. + * Instantiate {@link BatchingProcessor} with dynamic lingerMillis and capacity config. + *

+ * The suppliers are verified capable of returning positive numbers during instantiation. + * If not, this constructor will fail to instantiate the BatchingProcessor instance. + *
+ * If the suppliers return non-positive numbers in processing time, lask-known-good value is used as fallback. + *

*/ protected BatchingProcessor(Supplier lingerMillisSupplier, Supplier capacitySupplier) { - this.lingerMillisSupplier = lingerMillisSupplier; - this.capacitySupplier = capacitySupplier; + Objects.requireNonNull(lingerMillisSupplier, "lingerMillisSupplier must not be null."); + Objects.requireNonNull(capacitySupplier, "capacitySupplier must not be null."); + + this.lingerMillisSupplier = validatedAndLKGWrappedSupplier( + "lingerMillisSupplier", + lingerMillisSupplier, + v -> v > 0, + new AtomicReference<>()); + + this.capacitySupplier = validatedAndLKGWrappedSupplier( + "capacitySupplier", + capacitySupplier, + v -> v > 0, + new AtomicReference<>() + ); + + // initialize last-known-good values or fail fast at constructor time + this.lingerMillisSupplier.get(); + this.capacitySupplier.get(); ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor( 1, @@ -138,6 +171,46 @@ public void close() throws Exception { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } + private Supplier validatedAndLKGWrappedSupplier( + String name, + Supplier delegate, + Predicate validator, + AtomicReference lkg) { + return () -> { + final V value; + try { + value = delegate.get(); + } catch (Exception e) { + if (lkg != null && lkg.get() != null) { + V lkgValue = lkg.get(); + logger.warn("{} threw exception from get(), using last-known-good value: {}.", name, + lkgValue, e); + return lkgValue; + } + logger.error("{} threw exception from get(). No last-known-good value is available.", name); + throw new IllegalArgumentException(name + " threw exception from get().", e); + } + + if (!validator.test(value)) { + if (lkg != null && lkg.get() != null) { + V lkgValue = lkg.get(); + logger.warn("{} returned invalid value: {}, using last-known-good value: {}.", + name, + value, + lkgValue); + return lkgValue; + } + logger.error("{} returned invalid value: {}. No last-known-good value is available.", + name, + value); + throw new IllegalArgumentException(name + " returned invalid value: " + value); + } + + lkg.set(value); + return value; + }; + } + /** * After complete processing batch of tasks, * *MUST* call {@link BatchingTask#completion}'s {@link DeferredCompletion#complete()} or diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java index c675edf5..1839237c 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java @@ -17,6 +17,7 @@ package com.linecorp.decaton.processor.processors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; @@ -26,6 +27,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; @@ -51,7 +54,6 @@ public class BatchingProcessorTest { @BeforeEach public void before() { - doReturn(completion).when(context).deferCompletion(); processedTasks.clear(); } @@ -59,7 +61,9 @@ private HelloTask buildHelloTask(String name, int age) { return HelloTask.newBuilder().setName(name).setAge(age).build(); } - private BatchingProcessor buildProcessor(CountDownLatch processLatch, long lingerMs, int capacity) { + private BatchingProcessor buildProcessor(CountDownLatch processLatch, + Supplier lingerMs, + Supplier capacity) { return new BatchingProcessor(lingerMs, capacity) { @Override protected void processBatchingTasks(List> batchingTasks) { @@ -75,9 +79,11 @@ protected void processBatchingTasks(List> batchingTasks) @Test @Timeout(5) public void testLingerLimit() throws InterruptedException { + doReturn(completion).when(context).deferCompletion(); + long lingerMs = 1000; CountDownLatch processLatch = new CountDownLatch(1); - BatchingProcessor processor = buildProcessor(processLatch, lingerMs, Integer.MAX_VALUE); + BatchingProcessor processor = buildProcessor(processLatch, () -> lingerMs, () -> Integer.MAX_VALUE); HelloTask task1 = buildHelloTask("one", 1); processor.process(context, task1); @@ -91,8 +97,10 @@ public void testLingerLimit() throws InterruptedException { @Test public void testCapacityLimit() throws InterruptedException { + doReturn(completion).when(context).deferCompletion(); + CountDownLatch processLatch = new CountDownLatch(1); - BatchingProcessor processor = buildProcessor(processLatch, Long.MAX_VALUE, 2); + BatchingProcessor processor = buildProcessor(processLatch, () -> Long.MAX_VALUE, () -> 2); HelloTask task1 = buildHelloTask("one", 1); HelloTask task2 = buildHelloTask("two", 2); @@ -109,4 +117,46 @@ public void testCapacityLimit() throws InterruptedException { verify(context, times(3)).deferCompletion(); verify(completion, times(processedTasks.size())).complete(); } + + @Test + public void failFastDuringInstantiation() { + assertThrows(NullPointerException.class, + () -> buildProcessor(new CountDownLatch(1), null, null)); + assertThrows(IllegalArgumentException.class, + () -> buildProcessor(new CountDownLatch(1), () -> 0L, () -> 1)); + assertThrows(IllegalArgumentException.class, + () -> buildProcessor(new CountDownLatch(1), () -> 100L, () -> 0)); + assertThrows(IllegalArgumentException.class, + () -> buildProcessor(new CountDownLatch(1), () -> 100L / 0, () -> -1)); + } + + @Test + public void fallbackToLKG() throws InterruptedException { + doReturn(completion).when(context).deferCompletion(); + + // only 2 is a legal value, and used as lask-known-good fallback. + final int[] capacityValues = {2, 0, -1}; + AtomicInteger count = new AtomicInteger(0); + CountDownLatch processLatch = new CountDownLatch(1); + BatchingProcessor processor = + buildProcessor(processLatch, + () -> Long.MAX_VALUE, + () -> capacityValues[count.getAndIncrement() + % capacityValues.length]); + + HelloTask task1 = buildHelloTask("one", 1); + HelloTask task2 = buildHelloTask("two", 2); + HelloTask task3 = buildHelloTask("three", 3); + + processor.process(context, task1); + processor.process(context, task2); + processor.process(context, task3); + + processLatch.await(); + + assertEquals(new ArrayList<>(Arrays.asList(task1, task2)), processedTasks); + verify(context, times(3)).deferCompletion(); + verify(completion, times(processedTasks.size())).complete(); + assertEquals(4, count.get()); + } } From ad8aca08592e079fba3b492e7c8b8cacd4ae78c6 Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Mon, 29 Dec 2025 15:51:04 +0900 Subject: [PATCH 8/9] Revert doc changes --- docs/task-batching.adoc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index d38b0f44..918002d6 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -21,10 +21,10 @@ To use `Task Batching`, you need to create a class that inherits `BatchingProces |=== |parameter |Description -|`lingerMillisSupplier` +|`lingerMillis` |Time limit for this processor. On every lingerMillis milliseconds, tasks in past lingerMillis milliseconds are pushed to `BatchingProcessor#processBatchingTasks(List)`. -|`capacitySupplier` +|`capacity` |Capacity size limit for this processor. Every time tasks’size reaches capacity, tasks in past before reaching capacity are pushed to `BatchingProcessor#processBatchingTasks(List)`. |=== @@ -68,7 +68,7 @@ Create a class that inherits `BatchingProcessor`, as shown in the following exam ---- public class InsertHelloTaskBatchingProcessor extends BatchingProcessor { public InsertHelloTaskBatchingProcessor(long lingerMillis, int capacity) { - super(() -> lingerMillis, () -> capacity); // <1> + super(lingerMillis, capacity); // <1> } @Override @@ -109,6 +109,6 @@ public class TaskBatchingMain { } } ---- -<1> Pass `lingerMillisSupplier` and `capacitySupplier` to the constructor. +<1> Pass `lingerMillis` and `capacity` or their Supplier to the constructor. <2> Implement `processBatchingTasks(List)`. <3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`. From ddec2d1f385667140f9baea8287b96d557b9ae16 Mon Sep 17 00:00:00 2001 From: "kun98.liu" Date: Thu, 1 Jan 2026 14:49:12 +0900 Subject: [PATCH 9/9] apply comment --- .../processor/processors/BatchingProcessor.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java index cbd448fd..b7615cac 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java @@ -89,15 +89,12 @@ protected BatchingProcessor(Supplier lingerMillisSupplier, Supplier v > 0, - new AtomicReference<>()); + v -> v > 0); this.capacitySupplier = validatedAndLKGWrappedSupplier( "capacitySupplier", capacitySupplier, - v -> v > 0, - new AtomicReference<>() - ); + v -> v > 0); // initialize last-known-good values or fail fast at constructor time this.lingerMillisSupplier.get(); @@ -171,17 +168,17 @@ public void close() throws Exception { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } - private Supplier validatedAndLKGWrappedSupplier( + private static Supplier validatedAndLKGWrappedSupplier( String name, Supplier delegate, - Predicate validator, - AtomicReference lkg) { + Predicate validator) { + final AtomicReference lkg = new AtomicReference<>(); return () -> { final V value; try { value = delegate.get(); } catch (Exception e) { - if (lkg != null && lkg.get() != null) { + if (lkg.get() != null) { V lkgValue = lkg.get(); logger.warn("{} threw exception from get(), using last-known-good value: {}.", name, lkgValue, e); @@ -192,7 +189,7 @@ private Supplier validatedAndLKGWrappedSupplier( } if (!validator.test(value)) { - if (lkg != null && lkg.get() != null) { + if (lkg.get() != null) { V lkgValue = lkg.get(); logger.warn("{} returned invalid value: {}, using last-known-good value: {}.", name,