-
Notifications
You must be signed in to change notification settings - Fork 60
Make lingerMillis and capacity dynamic for BatchingProcessor #268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
02f96bc
c6df93e
433a97d
e7f597c
e82696b
048cbcb
63a296e
ad8aca0
ddec2d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,10 +18,17 @@ | |
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ScheduledThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.function.Predicate; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import com.linecorp.decaton.processor.DecatonProcessor; | ||
| import com.linecorp.decaton.processor.DeferredCompletion; | ||
|
|
@@ -39,11 +46,12 @@ | |
| * @param <T> type of task to batch | ||
| */ | ||
| public abstract class BatchingProcessor<T> implements DecatonProcessor<T> { | ||
| private static final Logger logger = LoggerFactory.getLogger(BatchingProcessor.class); | ||
|
|
||
| private final ScheduledExecutorService executor; | ||
| private List<BatchingTask<T>> currentBatch = new ArrayList<>(); | ||
| private final long lingerMillis; | ||
| private final int capacity; | ||
| private final Supplier<Long> lingerMillisSupplier; | ||
| private final Supplier<Integer> capacitySupplier; | ||
| private final ReentrantLock rollingLock; | ||
|
|
||
| @Value | ||
|
|
@@ -62,8 +70,35 @@ public static class BatchingTask<T> { | |
| * tasks in past before reaching capacity are pushed to {@link BatchingTask#processBatchingTasks(List)}. | ||
| */ | ||
| protected BatchingProcessor(long lingerMillis, int capacity) { | ||
| this.lingerMillis = lingerMillis; | ||
| this.capacity = capacity; | ||
| this(() -> lingerMillis, () -> capacity); | ||
| } | ||
|
|
||
| /** | ||
| * Instantiate {@link BatchingProcessor} with dynamic lingerMillis and capacity config. | ||
| * <p> | ||
| * The suppliers are verified capable of returning positive numbers during instantiation. | ||
| * If not, this constructor will fail to instantiate the BatchingProcessor instance. | ||
| * <br> | ||
| * If the suppliers return non-positive numbers at processing time, the last-known-good value is used as a fallback. | ||
| * </p> | ||
| */ | ||
| protected BatchingProcessor(Supplier<Long> lingerMillisSupplier, Supplier<Integer> capacitySupplier) { | ||
| Objects.requireNonNull(lingerMillisSupplier, "lingerMillisSupplier must not be null."); | ||
| Objects.requireNonNull(capacitySupplier, "capacitySupplier must not be null."); | ||
|
|
||
| this.lingerMillisSupplier = validatedAndLKGWrappedSupplier( | ||
| "lingerMillisSupplier", | ||
| lingerMillisSupplier, | ||
| v -> v > 0); | ||
|
|
||
| this.capacitySupplier = validatedAndLKGWrappedSupplier( | ||
| "capacitySupplier", | ||
| capacitySupplier, | ||
| v -> v > 0); | ||
|
|
||
| // initialize last-known-good values or fail fast at constructor time | ||
| this.lingerMillisSupplier.get(); | ||
| this.capacitySupplier.get(); | ||
|
|
||
| ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor( | ||
| 1, | ||
|
|
@@ -108,14 +143,14 @@ private void periodicallyFlushTask() { | |
| } | ||
|
|
||
| private void scheduleFlush() { | ||
| executor.schedule(this::periodicallyFlushTask, lingerMillis, TimeUnit.MILLISECONDS); | ||
| executor.schedule(this::periodicallyFlushTask, this.lingerMillisSupplier.get(), TimeUnit.MILLISECONDS); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just getting the new value and using it might be unsafe when the supplier returns an invalid value, such as a negative one. I suggest adding validation and, when it fails, logging it and continuing to use the current value. WDYT? (Yeah, I know, even the current code doesn't have validation, though...)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. EDIT: |
||
| } | ||
|
|
||
| @Override | ||
| public void process(ProcessingContext<T> context, T task) throws InterruptedException { | ||
| rollingLock.lock(); | ||
| try { | ||
| if (currentBatch.size() >= this.capacity) { | ||
| if (currentBatch.size() >= this.capacitySupplier.get()) { | ||
| final List<BatchingTask<T>> batch = currentBatch; | ||
| executor.submit(() -> processBatchingTasks(batch)); | ||
| currentBatch = new ArrayList<>(); | ||
|
|
@@ -133,6 +168,46 @@ public void close() throws Exception { | |
| executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| private static <V> Supplier<V> validatedAndLKGWrappedSupplier( | ||
| String name, | ||
| Supplier<V> delegate, | ||
| Predicate<V> validator) { | ||
| final AtomicReference<V> lkg = new AtomicReference<>(); | ||
| return () -> { | ||
| final V value; | ||
| try { | ||
| value = delegate.get(); | ||
| } catch (Exception e) { | ||
| if (lkg.get() != null) { | ||
| V lkgValue = lkg.get(); | ||
| logger.warn("{} threw exception from get(), using last-known-good value: {}.", name, | ||
| lkgValue, e); | ||
| return lkgValue; | ||
| } | ||
| logger.error("{} threw exception from get(). No last-known-good value is available.", name); | ||
| throw new IllegalArgumentException(name + " threw exception from get().", e); | ||
| } | ||
|
|
||
| if (!validator.test(value)) { | ||
| if (lkg.get() != null) { | ||
| V lkgValue = lkg.get(); | ||
| logger.warn("{} returned invalid value: {}, using last-known-good value: {}.", | ||
| name, | ||
| value, | ||
| lkgValue); | ||
| return lkgValue; | ||
| } | ||
| logger.error("{} returned invalid value: {}. No last-known-good value is available.", | ||
| name, | ||
| value); | ||
| throw new IllegalArgumentException(name + " returned invalid value: " + value); | ||
| } | ||
|
|
||
| lkg.set(value); | ||
| return value; | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * After complete processing batch of tasks, | ||
| * *MUST* call {@link BatchingTask#completion}'s {@link DeferredCompletion#complete()} or | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest keeping the current constructor to avoid a breaking change, by adding an overload like
BatchingProcessor(lingerMillis, capacity) { this(() -> lingerMillis, () -> capacity) ... }