diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index feb6184b..83bb0dfd 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -188,6 +188,8 @@ CompletableFuture load(K key, Object loadContext) { } ctx.onDispatched(); + loaderOptions.getDispatchStrategy().loadCalled(dataLoader, key, loadContext, loadCallFuture); + loadCallFuture = loadCallFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted(dataLoader, result, error)); loadCallFuture.whenComplete(ctx::onCompleted); return loadCallFuture; } @@ -195,6 +197,8 @@ CompletableFuture load(K key, Object loadContext) { private CompletableFuture incrementCacheHitAndReturnCF(DataLoaderInstrumentationContext ctx, K key, Object loadContext, CompletableFuture cachedFuture) { stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); ctx.onDispatched(); + loaderOptions.getDispatchStrategy().loadCalled(dataLoader, key, loadContext, cachedFuture); + cachedFuture = cachedFuture.whenComplete((result, error) -> loaderOptions.getDispatchStrategy().loadCompleted(dataLoader, result, error)); cachedFuture.whenComplete(ctx::onCompleted); return cachedFuture; } diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index f7c006fa..c374075c 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -55,6 +55,7 @@ public class DataLoaderOptions { private final ValueCacheOptions valueCacheOptions; private final BatchLoaderScheduler batchLoaderScheduler; private final DataLoaderInstrumentation instrumentation; + private final DispatchStrategy dispatchStrategy; /** * Creates a new data loader options with default settings. @@ -72,6 +73,7 @@ public DataLoaderOptions() { valueCacheOptions = DEFAULT_VALUE_CACHE_OPTIONS; batchLoaderScheduler = null; instrumentation = DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION; + dispatchStrategy = DispatchStrategy.NO_OP; } private DataLoaderOptions(Builder builder) { @@ -87,6 +89,7 @@ private DataLoaderOptions(Builder builder) { this.valueCacheOptions = builder.valueCacheOptions; this.batchLoaderScheduler = builder.batchLoaderScheduler; this.instrumentation = builder.instrumentation; + this.dispatchStrategy = builder.dispatchStrategy; } /** @@ -116,6 +119,7 @@ public static DataLoaderOptions.Builder newOptions(DataLoaderOptions otherOption * Will transform the current options in to a builder ands allow you to build a new set of options * * @param builderConsumer the consumer of a builder that has this objects starting values + * * @return a new {@link DataLoaderOptions} object */ public DataLoaderOptions transform(Consumer builderConsumer) { @@ -126,19 +130,22 @@ public DataLoaderOptions transform(Consumer builderConsumer) { @Override public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) return false; + if (o == null || getClass() != o.getClass()) { + return false; + } DataLoaderOptions that = (DataLoaderOptions) o; return batchingEnabled == that.batchingEnabled - && cachingEnabled == that.cachingEnabled - && cachingExceptionsEnabled == that.cachingExceptionsEnabled - && maxBatchSize == that.maxBatchSize - && Objects.equals(cacheKeyFunction, that.cacheKeyFunction) && - Objects.equals(cacheMap, that.cacheMap) && - Objects.equals(valueCache, that.valueCache) && - Objects.equals(statisticsCollector, that.statisticsCollector) && - Objects.equals(environmentProvider, that.environmentProvider) && - Objects.equals(valueCacheOptions, that.valueCacheOptions) && - Objects.equals(batchLoaderScheduler, that.batchLoaderScheduler); + && cachingEnabled == that.cachingEnabled + && cachingExceptionsEnabled == that.cachingExceptionsEnabled + && maxBatchSize == that.maxBatchSize + && Objects.equals(cacheKeyFunction, that.cacheKeyFunction) && + Objects.equals(cacheMap, that.cacheMap) && + Objects.equals(valueCache, that.valueCache) && + Objects.equals(statisticsCollector, that.statisticsCollector) && + Objects.equals(environmentProvider, that.environmentProvider) && + Objects.equals(valueCacheOptions, that.valueCacheOptions) && + Objects.equals(batchLoaderScheduler, that.batchLoaderScheduler) && + Objects.equals(dispatchStrategy, that.dispatchStrategy); } @@ -254,7 +261,15 @@ public DataLoaderInstrumentation getInstrumentation() { return instrumentation; } + /** + * @return the {@link DispatchStrategy} to use for dispatching batch loads + */ + public DispatchStrategy getDispatchStrategy() { + return dispatchStrategy; + } + public static class Builder { + private DispatchStrategy dispatchStrategy = DispatchStrategy.NO_OP; private boolean batchingEnabled; private boolean cachingEnabled; private boolean cachingExceptionsEnabled; @@ -285,12 +300,14 @@ public Builder() { this.valueCacheOptions = other.valueCacheOptions; this.batchLoaderScheduler = other.batchLoaderScheduler; this.instrumentation = other.instrumentation; + this.dispatchStrategy = other.dispatchStrategy; } /** * Sets the option that determines whether batch loading is enabled. * * @param batchingEnabled {@code true} to enable batch loading, {@code false} otherwise + * * @return this builder for fluent coding */ public Builder setBatchingEnabled(boolean batchingEnabled) { @@ -302,6 +319,7 @@ public Builder setBatchingEnabled(boolean batchingEnabled) { * Sets the option that determines whether caching is enabled. * * @param cachingEnabled {@code true} to enable caching, {@code false} otherwise + * * @return this builder for fluent coding */ public Builder setCachingEnabled(boolean cachingEnabled) { @@ -313,6 +331,7 @@ public Builder setCachingEnabled(boolean cachingEnabled) { * Sets the option that determines whether exceptional values are cache enabled. * * @param cachingExceptionsEnabled {@code true} to enable caching exceptional values, {@code false} otherwise + * * @return this builder for fluent coding */ public Builder setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) { @@ -324,6 +343,7 @@ public Builder setCachingExceptionsEnabled(boolean cachingExceptionsEnabled) { * Sets the function to use for creating the cache key, if caching is enabled. * * @param cacheKeyFunction the cache key function to use + * * @return this builder for fluent coding */ public Builder setCacheKeyFunction(CacheKey cacheKeyFunction) { @@ -335,6 +355,7 @@ public Builder setCacheKeyFunction(CacheKey cacheKeyFunction) { * Sets the cache map implementation to use for caching, if caching is enabled. * * @param cacheMap the cache map instance + * * @return this builder for fluent coding */ public Builder setCacheMap(CacheMap cacheMap) { @@ -346,6 +367,7 @@ public Builder setCacheMap(CacheMap cacheMap) { * Sets the value cache implementation to use for caching values, if caching is enabled. * * @param valueCache the value cache instance + * * @return this builder for fluent coding */ public Builder setValueCache(ValueCache valueCache) { @@ -358,6 +380,7 @@ public Builder setValueCache(ValueCache valueCache) { * before they are split into multiple class * * @param maxBatchSize the maximum batch size + * * @return this builder for fluent coding */ public Builder setMaxBatchSize(int maxBatchSize) { @@ -371,6 +394,7 @@ public Builder setMaxBatchSize(int maxBatchSize) { * a common value * * @param statisticsCollector the statistics collector to use + * * @return this builder for fluent coding */ public Builder setStatisticsCollector(Supplier statisticsCollector) { @@ -382,6 +406,7 @@ public Builder setStatisticsCollector(Supplier statisticsCo * Sets the batch loader environment provider that will be used to give context to batch load functions * * @param environmentProvider the batch loader context provider + * * @return this builder for fluent coding */ public Builder setBatchLoaderContextProvider(BatchLoaderContextProvider environmentProvider) { @@ -393,6 +418,7 @@ public Builder setBatchLoaderContextProvider(BatchLoaderContextProvider environm * Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used * * @param valueCacheOptions the value cache options + * * @return this builder for fluent coding */ public Builder setValueCacheOptions(ValueCacheOptions valueCacheOptions) { @@ -405,6 +431,7 @@ public Builder setValueCacheOptions(ValueCacheOptions valueCacheOptions) { * to some future time. * * @param batchLoaderScheduler the scheduler + * * @return this builder for fluent coding */ public Builder setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) { @@ -416,6 +443,7 @@ public Builder setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler * Sets in a new {@link DataLoaderInstrumentation} * * @param instrumentation the new {@link DataLoaderInstrumentation} + * * @return this builder for fluent coding */ public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) { @@ -423,6 +451,18 @@ public Builder setInstrumentation(DataLoaderInstrumentation instrumentation) { return this; } + /** + * Sets in a new {@link DispatchStrategy} + * + * @param dispatchStrategy the new {@link DispatchStrategy} + * + * @return the builder for fluent coding + */ + public Builder setDispatchStrategy(DispatchStrategy dispatchStrategy) { + this.dispatchStrategy = dispatchStrategy; + return this; + } + public DataLoaderOptions build() { return new DataLoaderOptions(this); } diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 6bc79f64..9b8cb778 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -46,24 +46,28 @@ public class DataLoaderRegistry { protected final Map> dataLoaders; protected final @Nullable DataLoaderInstrumentation instrumentation; + private final DispatchStrategy dispatchStrategy; + public DataLoaderRegistry() { - this(new ConcurrentHashMap<>(), null); + this(new ConcurrentHashMap<>(), null, DispatchStrategy.NO_OP); } private DataLoaderRegistry(Builder builder) { - this(builder.dataLoaders, builder.instrumentation); + this(builder.dataLoaders, builder.instrumentation, builder.dispatchStrategy); } - protected DataLoaderRegistry(Map> dataLoaders, @Nullable DataLoaderInstrumentation instrumentation) { - this.dataLoaders = instrumentDLs(dataLoaders, instrumentation); + protected DataLoaderRegistry(Map> dataLoaders, @Nullable DataLoaderInstrumentation instrumentation, DispatchStrategy dispatchStrategy) { + this.dataLoaders = instrumentDLs(dataLoaders, instrumentation, dispatchStrategy); this.instrumentation = instrumentation; + this.dispatchStrategy = dispatchStrategy; + dispatchStrategy.onRegistryCreation(this); } - private Map> instrumentDLs(Map> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation) { + private Map> instrumentDLs(Map> incomingDataLoaders, @Nullable DataLoaderInstrumentation registryInstrumentation, DispatchStrategy dispatchStrategy) { Map> dataLoaders = new ConcurrentHashMap<>(incomingDataLoaders); - if (registryInstrumentation != null) { - dataLoaders.replaceAll((k, existingDL) -> nameAndInstrumentDL(k, registryInstrumentation, existingDL)); + if (registryInstrumentation != null || dispatchStrategy != DispatchStrategy.NO_OP) { + dataLoaders.replaceAll((k, existingDL) -> nameAndInstrumentDL(k, registryInstrumentation, existingDL, dispatchStrategy)); } return dataLoaders; } @@ -74,12 +78,13 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl * @param key the key used to register the data loader * @param registryInstrumentation the common registry {@link DataLoaderInstrumentation} * @param existingDL the existing data loader + * * @return a new {@link DataLoader} or the same one if there is nothing to change */ - private static DataLoader nameAndInstrumentDL(String key, @Nullable DataLoaderInstrumentation registryInstrumentation, DataLoader existingDL) { + private static DataLoader nameAndInstrumentDL(String key, @Nullable DataLoaderInstrumentation registryInstrumentation, DataLoader existingDL, DispatchStrategy dispatchStrategy) { existingDL = checkAndSetName(key, existingDL); - if (registryInstrumentation == null) { + if (registryInstrumentation == null && dispatchStrategy == DispatchStrategy.NO_OP) { return existingDL; } DataLoaderOptions options = existingDL.getOptions(); @@ -92,18 +97,18 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl } if (existingInstrumentation == DataLoaderInstrumentationHelper.NOOP_INSTRUMENTATION) { // replace it with the registry one - return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation, dispatchStrategy); } if (existingInstrumentation instanceof ChainedDataLoaderInstrumentation) { // avoids calling a chained inside a chained DataLoaderInstrumentation newInstrumentation = ((ChainedDataLoaderInstrumentation) existingInstrumentation).prepend(registryInstrumentation); - return mkInstrumentedDataLoader(existingDL, options, newInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, newInstrumentation, dispatchStrategy); } else { DataLoaderInstrumentation newInstrumentation = new ChainedDataLoaderInstrumentation().add(registryInstrumentation).add(existingInstrumentation); - return mkInstrumentedDataLoader(existingDL, options, newInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, newInstrumentation, dispatchStrategy); } } else { - return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation); + return mkInstrumentedDataLoader(existingDL, options, registryInstrumentation, dispatchStrategy); } } @@ -116,12 +121,17 @@ protected DataLoaderRegistry(Map> dataLoaders, @Nullabl return dataLoader; } - private static DataLoader mkInstrumentedDataLoader(DataLoader existingDL, DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation) { - return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation))); + private static DataLoader mkInstrumentedDataLoader(DataLoader existingDL, DataLoaderOptions options, @Nullable DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { + return existingDL.transform(builder -> builder.options(setInInstrumentation(options, newInstrumentation, dispatchStrategy))); } - private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, DataLoaderInstrumentation newInstrumentation) { - return options.transform(optionsBuilder -> optionsBuilder.setInstrumentation(newInstrumentation)); + private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, @Nullable DataLoaderInstrumentation newInstrumentation, DispatchStrategy dispatchStrategy) { + return options.transform(optionsBuilder -> { + optionsBuilder.setDispatchStrategy(dispatchStrategy); + if (newInstrumentation != null) { + optionsBuilder.setInstrumentation(newInstrumentation); + } + }); } /** @@ -140,11 +150,12 @@ private static DataLoaderOptions setInInstrumentation(DataLoaderOptions options, * object that was registered. * * @param dataLoader the named data loader to register + * * @return this registry */ public DataLoaderRegistry register(DataLoader dataLoader) { String name = Assertions.nonNull(dataLoader.getName(), () -> "The DataLoader must have a non null name"); - dataLoaders.put(name, nameAndInstrumentDL(name, instrumentation, dataLoader)); + dataLoaders.put(name, nameAndInstrumentDL(name, instrumentation, dataLoader, dispatchStrategy)); return this; } @@ -157,10 +168,11 @@ public DataLoaderRegistry register(DataLoader dataLoader) { * * @param key the key to put the data loader under * @param dataLoader the data loader to register + * * @return this registry */ public DataLoaderRegistry register(String key, DataLoader dataLoader) { - dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader)); + dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader, dispatchStrategy)); return this; } @@ -173,10 +185,11 @@ public DataLoaderRegistry register(String key, DataLoader dataLoader) { * * @param key the key to put the data loader under * @param dataLoader the data loader to register + * * @return the data loader instance that was registered */ public DataLoader registerAndGet(String key, DataLoader dataLoader) { - dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader)); + dataLoaders.put(key, nameAndInstrumentDL(key, instrumentation, dataLoader, dispatchStrategy)); return Objects.requireNonNull(getDataLoader(key)); } @@ -195,6 +208,7 @@ public DataLoader registerAndGet(String key, DataLoader dataL * @param mappingFunction the function to compute a data loader * @param the type of keys * @param the type of values + * * @return a data loader */ @SuppressWarnings("unchecked") @@ -202,7 +216,7 @@ public DataLoader computeIfAbsent(final String key, final Function> mappingFunction) { return (DataLoader) dataLoaders.computeIfAbsent(key, (k) -> { DataLoader dl = mappingFunction.apply(k); - return nameAndInstrumentDL(key, instrumentation, dl); + return nameAndInstrumentDL(key, instrumentation, dl, dispatchStrategy); }); } @@ -211,6 +225,7 @@ public DataLoader computeIfAbsent(final String key, * and return a new combined registry * * @param registry the registry to combine into this registry + * * @return a new combined registry */ public DataLoaderRegistry combine(DataLoaderRegistry registry) { @@ -239,6 +254,7 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { * This will unregister a new dataloader * * @param key the key of the data loader to unregister + * * @return this registry */ public DataLoaderRegistry unregister(String key) { @@ -252,6 +268,7 @@ public DataLoaderRegistry unregister(String key) { * @param key the key of the data loader * @param the type of keys * @param the type of values + * * @return a data loader or null if it's not present */ @SuppressWarnings("unchecked") @@ -322,6 +339,7 @@ public static Builder newRegistry() { public static class Builder { private final Map> dataLoaders = new HashMap<>(); + private DispatchStrategy dispatchStrategy = DispatchStrategy.NO_OP; private @Nullable DataLoaderInstrumentation instrumentation; /** @@ -329,6 +347,7 @@ public static class Builder { * * @param key the key to put the data loader under * @param dataLoader the data loader to register + * * @return this builder for a fluent pattern */ public Builder register(String key, DataLoader dataLoader) { @@ -341,6 +360,7 @@ public Builder register(String key, DataLoader dataLoader) { * from a previous {@link DataLoaderRegistry} * * @param otherRegistry the previous {@link DataLoaderRegistry} + * * @return this builder for a fluent pattern */ public Builder registerAll(DataLoaderRegistry otherRegistry) { @@ -348,11 +368,30 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) { return this; } + /** + * The {@link DataLoaderInstrumentation} to use for this registry + * + * @param instrumentation instrumentation to use + * + * @return the builder for a fluent pattern + */ public Builder instrumentation(DataLoaderInstrumentation instrumentation) { this.instrumentation = instrumentation; return this; } + /** + * The {@link DispatchStrategy} to use for this registry + * + * @param dispatchStrategy strategy to use + * + * @return this builder for a fluent pattern + */ + public Builder dispatchStrategy(DispatchStrategy dispatchStrategy) { + this.dispatchStrategy = dispatchStrategy; + return this; + } + /** * @return the newly built {@link DataLoaderRegistry} */ diff --git a/src/main/java/org/dataloader/DispatchStrategy.java b/src/main/java/org/dataloader/DispatchStrategy.java new file mode 100644 index 00000000..c3016b84 --- /dev/null +++ b/src/main/java/org/dataloader/DispatchStrategy.java @@ -0,0 +1,44 @@ +package org.dataloader; + +import org.dataloader.annotations.PublicApi; +import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; + +import java.util.concurrent.CompletableFuture; + + +/** + * An interface to implement to allow for custom dispatch strategies when executing {@link DataLoader}s + */ +@NullMarked +@PublicApi +public interface DispatchStrategy { + + /** + * A {@link DispatchStrategy} that does nothing + */ + DispatchStrategy NO_OP = new DispatchStrategy() { + }; + + /** + * Lifecycle method called when the registry is created that this dispatch strategy is attached to + * @param registry the {@link DataLoaderRegistry} this dispatch strategy is attached to + */ + default void onRegistryCreation(DataLoaderRegistry registry) { + + } + + /** + * Called when a {@link DataLoader#load(Object)} is called on a dataloader + */ + default void loadCalled(DataLoader dataLoader, Object key, @Nullable Object loadContext, CompletableFuture result) { + + } + + /** + * Called when a {@link DataLoader#load(Object)} is executed and completed on a dataloader + */ + default void loadCompleted(DataLoader dataLoader, @Nullable Object result, @Nullable Throwable error) { + + } +} diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 4f62378d..33498b5c 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -2,6 +2,7 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderRegistry; +import org.dataloader.DispatchStrategy; import org.dataloader.annotations.ExperimentalApi; import org.dataloader.impl.Assertions; import org.dataloader.instrumentation.DataLoaderInstrumentation; @@ -69,7 +70,7 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { - super(builder.dataLoaders, builder.instrumentation); + super(builder.dataLoaders, builder.instrumentation, DispatchStrategy.NO_OP); this.scheduledExecutorService = Assertions.nonNull(builder.scheduledExecutorService); this.defaultExecutorUsed = builder.defaultExecutorUsed; this.schedule = builder.schedule; diff --git a/src/main/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategy.java b/src/main/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategy.java new file mode 100644 index 00000000..fb694249 --- /dev/null +++ b/src/main/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategy.java @@ -0,0 +1,170 @@ +package org.dataloader.strategy; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderRegistry; +import org.dataloader.DispatchStrategy; +import org.dataloader.annotations.PublicApi; +import org.dataloader.annotations.VisibleForTesting; +import org.dataloader.impl.Assertions; +import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A {@link DispatchStrategy} which balances batching and performance by dispatching level by level with minimal waiting. + *

+ * We use a fallback {@link ScheduledExecutorService} to handle when work is stuck due to async calls in the chain for + * chained dataloaders. This minimizes the amount of threads spawned to only be used when there is no known work to be + * done and the chain is not finished. + *

+ * Due to the concept of 'known' work we greedily walk the chain instead of waiting for async calls to finish before + * kicking off the next level. + *

+ * In practice this will greedily fill up DataLoader keys while walking the chain to provide a nice balance of + * batching/dedupe/caching while not needing to worry about manually dispatching the tree. + */ +@PublicApi +@NullMarked +public class GreedyLevelByLevelChainedDispatchStrategy implements DispatchStrategy { + + private static final Duration DEFAULT_FALLBACK_TIMEOUT = Duration.ofMillis(30); + + private final ScheduledExecutorService scheduledExecutorService; + private final AtomicInteger pendingLoadCount = new AtomicInteger(0); + private final AtomicInteger totalWorkCount = new AtomicInteger(0); + private final Object dispatchLock = new Object(); + + // only used for tests + Runnable onIteration = () -> { + + }; + + private final Duration fallbackTimeout; + @Nullable private ScheduledFuture fallbackDispatchFuture = null; + + @Nullable private Runnable dispatchCallback; + + private GreedyLevelByLevelChainedDispatchStrategy(Builder builder) { + this.scheduledExecutorService = builder.scheduledExecutorService; + this.fallbackTimeout = builder.fallbackTimeout; + } + + @Override + public void onRegistryCreation(DataLoaderRegistry registry) { + dispatchCallback = registry::dispatchAll; + } + + @Override + public void loadCalled(DataLoader dataLoader, Object key, @Nullable Object loaderContext, CompletableFuture result) { + // initial load called + pendingLoadCount.incrementAndGet(); + int previousTotal = totalWorkCount.getAndIncrement(); + if (previousTotal == 0) { + triggerDeterministicDispatch(); + } + } + + @Override + public void loadCompleted(DataLoader dataLoader, @Nullable Object result, @Nullable Throwable error) { + pendingLoadCount.decrementAndGet(); + } + + private void triggerDeterministicDispatch() { + synchronized (dispatchLock) { + if (dispatchCallback == null) { + throw new IllegalStateException("Dispatch strategy started without being registered to registry"); + } + + // sanity check + if (pendingLoadCount.get() == 0) { + return; + } + + while (pendingLoadCount.get() > 0) { + onIteration.run(); + + int workBefore = totalWorkCount.get(); + + dispatchCallback.run(); + + int workAfter = totalWorkCount.get(); + int pendingAfter = pendingLoadCount.get(); + + // no progress but not done - trigger async check + if (workAfter == workBefore && pendingAfter > 0) { + scheduleFallbackDispatch(); + break; + } + + // completed + if (pendingAfter == 0) { + resetState(); + } + } + } + } + + private synchronized void scheduleFallbackDispatch() { + // fallback already scheduled, don't reschedule + if (fallbackDispatchFuture != null && !fallbackDispatchFuture.isDone()) { + return; + } + + fallbackDispatchFuture = + scheduledExecutorService.schedule( + () -> { + // clear the future so we can start scheduling again + synchronized (this) { + fallbackDispatchFuture = null; + } + triggerDeterministicDispatch(); + }, + fallbackTimeout.toMillis(), + TimeUnit.MILLISECONDS + ); + } + + private synchronized void resetState() { + pendingLoadCount.set(0); + totalWorkCount.set(0); + if (fallbackDispatchFuture != null) { + fallbackDispatchFuture.cancel(false); + } + } + + @VisibleForTesting + void onIteration(Runnable onIteration) { + this.onIteration = onIteration; + } + + public static class Builder { + private Duration fallbackTimeout = DEFAULT_FALLBACK_TIMEOUT; + private final ScheduledExecutorService scheduledExecutorService; + + public Builder(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = Assertions.nonNull(scheduledExecutorService); + } + + public Builder setFallbackTimeout(Duration fallbackTimeout) { + if (fallbackTimeout == null) { + throw new IllegalArgumentException("fallbackTimeout must not be null"); + } + if (fallbackTimeout.isZero() || fallbackTimeout.isNegative()) { + throw new IllegalArgumentException("fallbackTimeout must be a positive duration"); + } + this.fallbackTimeout = Assertions.nonNull(fallbackTimeout); + return this; + } + + public GreedyLevelByLevelChainedDispatchStrategy build() { + return new GreedyLevelByLevelChainedDispatchStrategy(this); + } + + } +} diff --git a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java index d3de4aad..f24c6bd4 100644 --- a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java +++ b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java @@ -65,7 +65,8 @@ public void should_access_to_future_dependants() { Collection> futures = dataLoader.getCacheMap().getAll(); List> futuresList = new ArrayList<>(futures); - assertThat(futuresList.get(0).getNumberOfDependents(), equalTo(4)); // instrumentation is depending on the CF completing - assertThat(futuresList.get(1).getNumberOfDependents(), equalTo(2)); + // instrumentation is depending on the CF completing + dispatch strategy + assertThat(futuresList.get(0).getNumberOfDependents(), equalTo(6)); + assertThat(futuresList.get(1).getNumberOfDependents(), equalTo(3)); } } diff --git a/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyStressTest.java b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyStressTest.java new file mode 100644 index 00000000..7507b272 --- /dev/null +++ b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyStressTest.java @@ -0,0 +1,286 @@ +package org.dataloader.strategy; + +import org.dataloader.BatchLoader; +import org.dataloader.DataLoaderFactory; +import org.dataloader.DataLoaderRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class GreedyLevelByLevelChainedDispatchStrategyStressTest { + + private int iterationCount; + private List> dispatchOrder; + private List> queueOrder; + private List> completionOrder; + private DataLoaderRegistry registry; + private CountDownLatch bLatch; + private CountDownLatch gLatch; + private CountDownLatch aStarted; + private CountDownLatch gCompleted; + private CountDownLatch iCompleted; + + /* + Simulating tree with async conditions + + A + B (async) completed before G + E + F + C + G (async) completes last + H + D + I + J + */ + @BeforeEach + public void setup() { + dispatchOrder = new ArrayList<>(); + queueOrder = new ArrayList<>(); + completionOrder = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + dispatchOrder.add(new ArrayList<>()); + queueOrder.add(new ArrayList<>()); + completionOrder.add(new ArrayList<>()); + } + addAtIteration(queueOrder, "A"); + bLatch = new CountDownLatch(1); + gLatch = new CountDownLatch(1); + aStarted = new CountDownLatch(1); + gCompleted = new CountDownLatch(1); + iCompleted = new CountDownLatch(1); + iterationCount = 1; + GreedyLevelByLevelChainedDispatchStrategy greedyLevelByLevelChainedDispatchStrategy = + new GreedyLevelByLevelChainedDispatchStrategy.Builder(Executors.newSingleThreadScheduledExecutor()) + .setFallbackTimeout(Duration.ofMillis(300)).build(); + greedyLevelByLevelChainedDispatchStrategy.onIteration(() -> iterationCount += 1); + registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(greedyLevelByLevelChainedDispatchStrategy) + .build(); + + + // Loaders named after diagram above + BatchLoader eLoader = keys -> { + addAtIteration(dispatchOrder, "E"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader fLoader = keys -> { + addAtIteration(dispatchOrder, "F"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader hLoader = keys -> { + addAtIteration(dispatchOrder, "H"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader iLoader = keys -> { + addAtIteration(dispatchOrder, "I"); + return CompletableFuture.completedFuture(keys); + }; + BatchLoader jLoader = keys -> { + addAtIteration(dispatchOrder, "J"); + return CompletableFuture.completedFuture(keys); + }; + + BatchLoader gLoader = keys -> { + addAtIteration(dispatchOrder, "G"); + CompletableFuture> gFuture = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + gLatch.await(); + gFuture.complete(keys); + } catch (InterruptedException e) { + // do nothing + } + }); + return gFuture; + }; + + BatchLoader bLoader = keys -> { + addAtIteration(dispatchOrder, "B"); + CompletableFuture> bFuture = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + bLatch.await(); + CompletableFuture eResult = registry.getDataLoader("eLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "E")); + addAtIteration(queueOrder, "E"); + CompletableFuture fResult = registry.getDataLoader("fLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "F")); + addAtIteration(queueOrder, "F"); + eResult.thenCombine(fResult, (eNum, fNum) -> List.of(eNum + fNum)) + .thenAccept(bFuture::complete); + } catch (InterruptedException e) { + // do nothing + } + }); + return bFuture; + }; + + BatchLoader cLoader = keys -> { + addAtIteration(dispatchOrder, "C"); + CompletableFuture gResult = registry.getDataLoader("gLoader").load(keys.get(0)) + .whenComplete((result, error) -> { + addAtIteration(completionOrder, "G"); + gCompleted.countDown(); + }); + addAtIteration(queueOrder, "G"); + CompletableFuture hResult = registry.getDataLoader("hLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "H")); + addAtIteration(queueOrder, "H"); + + return gResult.thenCombine(hResult, (gNum, hNum) -> List.of(gNum + hNum)); + }; + + BatchLoader dLoader = keys -> { + addAtIteration(dispatchOrder, "D"); + CompletableFuture iResult = registry.getDataLoader("iLoader").load(keys.get(0)) + .whenComplete((result, error) -> { + addAtIteration(completionOrder, "I"); + iCompleted.countDown(); + }); + addAtIteration(queueOrder, "I"); + CompletableFuture jResult = registry.getDataLoader("jLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "J")); + addAtIteration(queueOrder, "J"); + + return iResult.thenCombine(jResult, (iNum, jNum) -> List.of(iNum + jNum)); + }; + + BatchLoader aLoader = keys -> { + aStarted.countDown(); + addAtIteration(dispatchOrder, "A"); + CompletableFuture bResult = registry.getDataLoader("bLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "B")); + addAtIteration(queueOrder, "B"); + CompletableFuture cResult = registry.getDataLoader("cLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "C")); + addAtIteration(queueOrder, "C"); + CompletableFuture dResult = registry.getDataLoader("dLoader").load(keys.get(0)) + .whenComplete((result, error) -> addAtIteration(completionOrder, "D")); + addAtIteration(queueOrder, "D"); + + return CompletableFuture.allOf(bResult, cResult, dResult).thenApply(unused -> { + int bNum = bResult.join(); + int cNum = cResult.join(); + int dNum = dResult.join(); + + return List.of(bNum + cNum + dNum); + }).whenComplete((result, error) -> addAtIteration(completionOrder, "A")); + }; + + registry.register("aLoader", DataLoaderFactory.newDataLoader(aLoader)); + registry.register("bLoader", DataLoaderFactory.newDataLoader(bLoader)); + registry.register("cLoader", DataLoaderFactory.newDataLoader(cLoader)); + registry.register("dLoader", DataLoaderFactory.newDataLoader(dLoader)); + registry.register("eLoader", DataLoaderFactory.newDataLoader(eLoader)); + registry.register("fLoader", DataLoaderFactory.newDataLoader(fLoader)); + registry.register("gLoader", DataLoaderFactory.newDataLoader(gLoader)); + registry.register("hLoader", DataLoaderFactory.newDataLoader(hLoader)); + registry.register("iLoader", DataLoaderFactory.newDataLoader(iLoader)); + registry.register("jLoader", DataLoaderFactory.newDataLoader(jLoader)); + } + + + /* + Explanation of assertions. + + G and B are async + G unlocked once leaf nodes have started + + B unlocked once G completes + + Dispatch order + Iteration 1: - Due to dataloader order in the registry C is dispatched and H is dispatched greedily + A, B, C, D H + Iteration 2: + G, I, J - E and F are blocked by B as they are async chained + Iteration 3: + E, F - B has unlocked and allowed dispatching of E and F + + Queue Order + Iteration 1: - + A + Iteration 2: + B, C, D, G, H, I, J - All but E and F queued as we get as much work as possible + Iteration 3 + E, F - B unlocks E and F once async call completes + + Completion Order + H, J, I, D, G, C, E, F, B, A + + Walk the tree up from roots greedily as calls finish. + D finishes first as no blocks + C finishes second as G is async + B finishes last as well as E and F leafs as they are blocked by async B finishing + */ + @Test + void verifyExecutionOrder() throws Exception { + CompletableFuture result = CompletableFuture.supplyAsync(() -> registry.getDataLoader("aLoader").load(1).join(), + Executors.newSingleThreadExecutor()); + + aStarted.await(); + + // do not release g until leaf level started + iCompleted.await(); + + // g call finished + gLatch.countDown(); + + // do not release b until leafs completed + gCompleted.await(); + + // b call finished + bLatch.countDown(); + + int resultNum = result.join(); + + // 6 leaf nodes added together + assertThat(resultNum, equalTo(6)); + + // clean up padded lists + dispatchOrder = dispatchOrder.stream().filter(list -> !list.isEmpty()).collect(Collectors.toList()); + queueOrder = queueOrder.stream().filter(list -> !list.isEmpty()).collect(Collectors.toList()); + List flatCompletionOrder = completionOrder.stream().flatMap(List::stream).collect(Collectors.toList()); + + // Due to DataLoaders queueing other dataloaders during dispatch more work is done than level by level + assertThat(dispatchOrder, equalTo(List.of( + List.of("A", "C", "B", "H", "D"), + List.of("G", "J", "I"), + List.of("E", "F") + ))); + // Greedily queues all known work capable + assertThat(queueOrder, equalTo(List.of( + List.of("A"), + List.of("B", "C", "D", "G", "H", "I", "J"), + List.of("E", "F") + ))); + + assertThat(completionOrder, equalTo(List.of( + "H", + "J", + "I", + "D", + "G", + "C", + "E", + "F", + "B", + "A" + ))); + } + + private void addAtIteration(List> aList, String toAdd) { + aList.get(iterationCount).add(toAdd); + } +} diff --git a/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyTest.java b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyTest.java new file mode 100644 index 00000000..4cb0ee9d --- /dev/null +++ b/src/test/java/org/dataloader/strategy/GreedyLevelByLevelChainedDispatchStrategyTest.java @@ -0,0 +1,229 @@ +package org.dataloader.strategy; + +import org.dataloader.BatchLoader; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderFactory; +import org.dataloader.DataLoaderRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class GreedyLevelByLevelChainedDispatchStrategyTest { + + private ScheduledExecutorService scheduledExecutorService; + + @BeforeEach + public void setUp() { + this.scheduledExecutorService = Executors.newScheduledThreadPool(2); + } + + @AfterEach + public void cleanUp() { + this.scheduledExecutorService.shutdownNow(); + } + + @Test + void singleDepthLoadSucceeds() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())) + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + DataLoader loader = registry.getDataLoader(SimpleLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + assertThat(result.isDone(), equalTo(true)); + assertThat(result.get(), equalTo(1)); + } + + @Test + void singleDepthLoadSucceedsMultipleTimes() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())) + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + DataLoader loader = registry.getDataLoader(SimpleLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + assertThat(result.isDone(), equalTo(true)); + assertThat(result.get(), equalTo(1)); + + // state reset, kick off another load + CompletableFuture result2 = loader.load(1); + assertThat(result2.isDone(), equalTo(true)); + assertThat(result2.get(), equalTo(1)); + } + + @Test + void chainedLoaderSucceeds() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + registry.register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())); + registry.register(ChainedLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new ChainedLoader(registry))); + DataLoader loader = registry.getDataLoader(ChainedLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + assertThat(result.isDone(), equalTo(true)); + assertThat(result.get(), equalTo(1)); + } + + @Test + void chainedAsyncLoaderSucceeds() { + CountDownLatch latch = new CountDownLatch(1); + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + registry.register(SimpleLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new SimpleLoader())); + registry.register(ChainedAsyncLoader.class.getSimpleName(), DataLoaderFactory.newDataLoader(new ChainedAsyncLoader(registry, latch))); + DataLoader loader = registry.getDataLoader(ChainedAsyncLoader.class.getSimpleName()); + CompletableFuture result = loader.load(1); + // future not done, fallback triggered + assertThat(result.isDone(), equalTo(false)); + // allow loader to continue, simulating async behavior + latch.countDown(); + + // blocking wait for fallback dispatch to trigger + Integer resultInteger = result.join(); + + assertThat(resultInteger, equalTo(1)); + } + + @Test + void dispatchGoesByLevel() throws Exception { + DataLoaderRegistry registry = DataLoaderRegistry.newRegistry() + .dispatchStrategy(new GreedyLevelByLevelChainedDispatchStrategy.Builder(scheduledExecutorService).build()) + .build(); + List> leafLevelSeenKeys = new ArrayList<>(); + BatchLoader leaf = keys -> { + leafLevelSeenKeys.add(keys); + return CompletableFuture.completedFuture(keys); + }; + + List> secondLevelSeenKeys = new ArrayList<>(); + BatchLoader secondLevel = keys -> { + secondLevelSeenKeys.add(keys); + List> futures = keys.stream().map(key -> registry.getDataLoader("leaf").load(key)).collect(Collectors.toList()); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(unused -> { + List results = new ArrayList<>(); + for (CompletableFuture future: futures) { + results.add(future.join()); + } + return results; + }); + }; + + // Call trigger 2 loads on root to secondLevel + BatchLoader root = keys -> { + DataLoader second = registry.getDataLoader("secondLevel"); + List> firstCall = keys.stream().map(second::load).collect(Collectors.toList()); + + // used to verify batching + List> secondCall = keys.stream().map(second::load).collect(Collectors.toList()); + + // used to verify multiple keys + List> thirdCall = keys.stream().map(key -> second.load(key + 1)).collect(Collectors.toList()); + CompletableFuture firstFinished = CompletableFuture.allOf(firstCall.toArray(new CompletableFuture[0])); + CompletableFuture secondFinished = CompletableFuture.allOf(secondCall.toArray(new CompletableFuture[0])); + CompletableFuture thirdFinished = CompletableFuture.allOf(thirdCall.toArray(new CompletableFuture[0])); + CompletableFuture allFinished = CompletableFuture.allOf(firstFinished, secondFinished, thirdFinished); + return allFinished.thenApply(unused -> { + List result = new ArrayList<>(); + for (int i = 0; i < firstCall.size(); i++) { + int firstResult = firstCall.get(i).join(); + int secondResult = secondCall.get(i).join(); + int thirdResult = thirdCall.get(i).join(); + result.add(firstResult + secondResult + thirdResult); + } + return result; + }); + }; + + registry.register("root", DataLoaderFactory.newDataLoader(root)); + registry.register("secondLevel", DataLoaderFactory.newDataLoader(secondLevel)); + registry.register("leaf", DataLoaderFactory.newDataLoader(leaf)); + + CompletableFuture result = registry.getDataLoader("root").load(1); + + assertThat(result.isDone(), equalTo(true)); + // 1 + 1 + 2 (first, second, third call) + assertThat(result.get(), equalTo(4)); + + // verify levels only called once even though multiple loads called with different arguments (level by level) + assertThat(secondLevelSeenKeys.size(), equalTo(1)); + assertThat(leafLevelSeenKeys.size(), equalTo(1)); + + // verify keys sent to levels are proper + assertThat(secondLevelSeenKeys.get(0), equalTo(List.of(1, 2))); + assertThat(leafLevelSeenKeys.get(0), equalTo(List.of(1, 2))); + } + + private static final class ChainedAsyncLoader implements BatchLoader { + private final DataLoaderRegistry dataLoaderRegistry; + private final CountDownLatch latch; + public ChainedAsyncLoader(DataLoaderRegistry dataLoaderRegistry, CountDownLatch latch) { + this.dataLoaderRegistry = dataLoaderRegistry; + this.latch = latch; + } + + @Override + public CompletionStage> load(List keys) { + return CompletableFuture.supplyAsync(() -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + List> futures = keys.stream().map(key -> + dataLoaderRegistry.getDataLoader(SimpleLoader.class.getSimpleName()).load(key) + ).collect(Collectors.toList()); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(unused -> { + List result = new ArrayList<>(); + for (CompletableFuture future: futures) { + result.add(future.join()); + } + return result; + }).join(); + }); + } + } + + private static final class ChainedLoader implements BatchLoader { + + private final DataLoaderRegistry dataLoaderRegistry; + public ChainedLoader(DataLoaderRegistry dataLoaderRegistry) { + this.dataLoaderRegistry = dataLoaderRegistry; + } + + @Override + public CompletionStage> load(List keys) { + List> futures = keys.stream().map(key -> + dataLoaderRegistry.getDataLoader(SimpleLoader.class.getSimpleName()).load(key) + ).collect(Collectors.toList()); + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(unused -> { + List result = new ArrayList<>(); + for (CompletableFuture future: futures) { + result.add(future.join()); + } + return result; + }); + } + } + + private static final class SimpleLoader implements BatchLoader { + + @Override + public CompletionStage> load(List keys) { + return CompletableFuture.completedFuture(keys); + } + } +}