diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index aa509978..918002d6 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -109,6 +109,6 @@ public class TaskBatchingMain { } } ---- -<1> Pass `lingerMillis` and `capacity` to the constructor. +<1> Pass `lingerMillis` and `capacity` or their Supplier to the constructor. <2> Implement `processBatchingTasks(List)`. <3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`. diff --git a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java index 216ea48b..24aa91cf 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/BatchingProcessorTest.java @@ -16,8 +16,11 @@ package com.linecorp.decaton.processor; +import static org.junit.jupiter.api.Assertions.assertTrue; + import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -45,7 +48,7 @@ public void testBatchingProcessor() throws Exception { ProcessorTestSuite .builder(rule) .configureProcessorsBuilder(builder -> builder.thenProcess( - new BatchingProcessor(1000, 100) { + new BatchingProcessor(1000L, 100) { @Override protected void processBatchingTasks(List> batchingTasks) { // adding some random delay to simulate realistic usage @@ -65,4 +68,43 @@ protected void processBatchingTasks(List> batchingTasks) .build() .run(); } + + @Test + @Timeout(30) + public void testDynamicConfiguration() throws Exception { + Random rand = randomExtension.random(); + + final long[] lingerMsValues = {1000L, 2000L, 3000L}; + AtomicInteger lingerMsCallTimes = new AtomicInteger(0); + final int[] capacityValues = {100, 200, 300}; + AtomicInteger capacityCallTimes = new AtomicInteger(0); + + ProcessorTestSuite + .builder(rule) + .configureProcessorsBuilder(builder -> builder.thenProcess( + new BatchingProcessor( + () -> lingerMsValues[lingerMsCallTimes.getAndIncrement() % lingerMsValues.length], + () -> capacityValues[capacityCallTimes.getAndIncrement() % capacityValues.length] + ) { + @Override + protected void processBatchingTasks(List> batchingTasks) { + try { + Thread.sleep(rand.nextInt(10)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); + } + } + )) + .propertySupplier(StaticPropertySupplier.of( + Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 16) + )) + .build() + .run(); + + assertTrue(lingerMsCallTimes.get() > 1); + assertTrue(capacityCallTimes.get() > 1); + } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java index ac236fbd..b7615cac 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/processors/BatchingProcessor.java @@ -18,10 +18,17 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.DeferredCompletion; @@ -39,11 +46,12 @@ * @param type of task to batch */ public abstract class BatchingProcessor implements DecatonProcessor { + private static final Logger logger = LoggerFactory.getLogger(BatchingProcessor.class); private final ScheduledExecutorService executor; private List> currentBatch = new ArrayList<>(); - private final long lingerMillis; - private final int capacity; + private final Supplier lingerMillisSupplier; + private final Supplier capacitySupplier; private final ReentrantLock rollingLock; @Value @@ -62,8 +70,35 @@ public static class BatchingTask { * tasks in past before reaching capacity are pushed to {@link BatchingTask#processBatchingTasks(List)}. */ protected BatchingProcessor(long lingerMillis, int capacity) { - this.lingerMillis = lingerMillis; - this.capacity = capacity; + this(() -> lingerMillis, () -> capacity); + } + + /** + * Instantiate {@link BatchingProcessor} with dynamic lingerMillis and capacity config. + *

+ * The suppliers are verified capable of returning positive numbers during instantiation. + * If not, this constructor will fail to instantiate the BatchingProcessor instance. + *
+ * If the suppliers return non-positive numbers in processing time, lask-known-good value is used as fallback. + *

+ */ + protected BatchingProcessor(Supplier lingerMillisSupplier, Supplier capacitySupplier) { + Objects.requireNonNull(lingerMillisSupplier, "lingerMillisSupplier must not be null."); + Objects.requireNonNull(capacitySupplier, "capacitySupplier must not be null."); + + this.lingerMillisSupplier = validatedAndLKGWrappedSupplier( + "lingerMillisSupplier", + lingerMillisSupplier, + v -> v > 0); + + this.capacitySupplier = validatedAndLKGWrappedSupplier( + "capacitySupplier", + capacitySupplier, + v -> v > 0); + + // initialize last-known-good values or fail fast at constructor time + this.lingerMillisSupplier.get(); + this.capacitySupplier.get(); ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor( 1, @@ -108,14 +143,14 @@ private void periodicallyFlushTask() { } private void scheduleFlush() { - executor.schedule(this::periodicallyFlushTask, lingerMillis, TimeUnit.MILLISECONDS); + executor.schedule(this::periodicallyFlushTask, this.lingerMillisSupplier.get(), TimeUnit.MILLISECONDS); } @Override public void process(ProcessingContext context, T task) throws InterruptedException { rollingLock.lock(); try { - if (currentBatch.size() >= this.capacity) { + if (currentBatch.size() >= this.capacitySupplier.get()) { final List> batch = currentBatch; executor.submit(() -> processBatchingTasks(batch)); currentBatch = new ArrayList<>(); @@ -133,6 +168,46 @@ public void close() throws Exception { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } + private static Supplier validatedAndLKGWrappedSupplier( + String name, + Supplier delegate, + Predicate validator) { + final AtomicReference lkg = new AtomicReference<>(); + return () -> { + final V value; + try { + value = delegate.get(); + } catch (Exception e) { + if (lkg.get() != null) { + V lkgValue = lkg.get(); + logger.warn("{} threw exception from get(), using last-known-good value: {}.", name, + lkgValue, e); + return lkgValue; + } + logger.error("{} threw exception from get(). No last-known-good value is available.", name); + throw new IllegalArgumentException(name + " threw exception from get().", e); + } + + if (!validator.test(value)) { + if (lkg.get() != null) { + V lkgValue = lkg.get(); + logger.warn("{} returned invalid value: {}, using last-known-good value: {}.", + name, + value, + lkgValue); + return lkgValue; + } + logger.error("{} returned invalid value: {}. No last-known-good value is available.", + name, + value); + throw new IllegalArgumentException(name + " returned invalid value: " + value); + } + + lkg.set(value); + return value; + }; + } + /** * After complete processing batch of tasks, * *MUST* call {@link BatchingTask#completion}'s {@link DeferredCompletion#complete()} or diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java index c675edf5..1839237c 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/BatchingProcessorTest.java @@ -17,6 +17,7 @@ package com.linecorp.decaton.processor.processors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; @@ -26,6 +27,8 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; @@ -51,7 +54,6 @@ public class BatchingProcessorTest { @BeforeEach public void before() { - doReturn(completion).when(context).deferCompletion(); processedTasks.clear(); } @@ -59,7 +61,9 @@ private HelloTask buildHelloTask(String name, int age) { return HelloTask.newBuilder().setName(name).setAge(age).build(); } - private BatchingProcessor buildProcessor(CountDownLatch processLatch, long lingerMs, int capacity) { + private BatchingProcessor buildProcessor(CountDownLatch processLatch, + Supplier lingerMs, + Supplier capacity) { return new BatchingProcessor(lingerMs, capacity) { @Override protected void processBatchingTasks(List> batchingTasks) { @@ -75,9 +79,11 @@ protected void processBatchingTasks(List> batchingTasks) @Test @Timeout(5) public void testLingerLimit() throws InterruptedException { + doReturn(completion).when(context).deferCompletion(); + long lingerMs = 1000; CountDownLatch processLatch = new CountDownLatch(1); - BatchingProcessor processor = buildProcessor(processLatch, lingerMs, Integer.MAX_VALUE); + BatchingProcessor processor = buildProcessor(processLatch, () -> lingerMs, () -> Integer.MAX_VALUE); HelloTask task1 = buildHelloTask("one", 1); processor.process(context, task1); @@ -91,8 +97,10 @@ public void testLingerLimit() throws InterruptedException { @Test public void testCapacityLimit() throws InterruptedException { + doReturn(completion).when(context).deferCompletion(); + CountDownLatch processLatch = new CountDownLatch(1); - BatchingProcessor processor = buildProcessor(processLatch, Long.MAX_VALUE, 2); + BatchingProcessor processor = buildProcessor(processLatch, () -> Long.MAX_VALUE, () -> 2); HelloTask task1 = buildHelloTask("one", 1); HelloTask task2 = buildHelloTask("two", 2); @@ -109,4 +117,46 @@ public void testCapacityLimit() throws InterruptedException { verify(context, times(3)).deferCompletion(); verify(completion, times(processedTasks.size())).complete(); } + + @Test + public void failFastDuringInstantiation() { + assertThrows(NullPointerException.class, + () -> buildProcessor(new CountDownLatch(1), null, null)); + assertThrows(IllegalArgumentException.class, + () -> buildProcessor(new CountDownLatch(1), () -> 0L, () -> 1)); + assertThrows(IllegalArgumentException.class, + () -> buildProcessor(new CountDownLatch(1), () -> 100L, () -> 0)); + assertThrows(IllegalArgumentException.class, + () -> buildProcessor(new CountDownLatch(1), () -> 100L / 0, () -> -1)); + } + + @Test + public void fallbackToLKG() throws InterruptedException { + doReturn(completion).when(context).deferCompletion(); + + // only 2 is a legal value, and used as lask-known-good fallback. + final int[] capacityValues = {2, 0, -1}; + AtomicInteger count = new AtomicInteger(0); + CountDownLatch processLatch = new CountDownLatch(1); + BatchingProcessor processor = + buildProcessor(processLatch, + () -> Long.MAX_VALUE, + () -> capacityValues[count.getAndIncrement() + % capacityValues.length]); + + HelloTask task1 = buildHelloTask("one", 1); + HelloTask task2 = buildHelloTask("two", 2); + HelloTask task3 = buildHelloTask("three", 3); + + processor.process(context, task1); + processor.process(context, task2); + processor.process(context, task3); + + processLatch.await(); + + assertEquals(new ArrayList<>(Arrays.asList(task1, task2)), processedTasks); + verify(context, times(3)).deferCompletion(); + verify(completion, times(processedTasks.size())).complete(); + assertEquals(4, count.get()); + } }