diff --git a/providers/go-feature-flag/README.md b/providers/go-feature-flag/README.md index e2eaa6bb1..ddadb5ab8 100644 --- a/providers/go-feature-flag/README.md +++ b/providers/go-feature-flag/README.md @@ -82,6 +82,7 @@ You can configure the provider with several options to customize its behavior. T | **`exporterMetadata`** | `false` | exporterMetadata is the metadata we send to the GO Feature Flag relay proxy when we report the evaluation data usage. | | **`evaluationFlagList`** | `false` | If you are using in process evaluation, by default we will load in memory all the flags available in the relay proxy. If you want to limit the number of flags loaded in memory, you can use this parameter. By setting this parameter, you will only load the flags available in the list.

If null or empty, all the flags available in the relay proxy will be loaded.

| | **`flagChangePollingIntervalMs`** | `false` | interval time we poll the proxy to check if the configuration has changed. It is used for the in process evaluation to check if we should refresh our internal cache. default: `120000` | +| **`wasmEvaluatorPoolSize`** | `false` | _(IN_PROCESS only)_ Number of WASM instances kept in the evaluation pool. Each instance owns independent memory, allowing fully concurrent flag evaluations without serialisation. Must be `>= 1`. _(default: number of available CPU cores)_ | ### Evaluate a feature flag The OpenFeature client is used to retrieve values for the current `EvaluationContext`. For example, retrieving a boolean value for the flag **"my-flag"**: diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java index cfa45664a..9cd620553 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java @@ -91,6 +91,14 @@ public class GoFeatureFlagProviderOptions { */ private Long flagChangePollingIntervalMs; + /** + * (optional) Number of WASM instances kept in the evaluation pool for in-process evaluation. + * Each instance owns independent WASM linear memory, allowing fully concurrent evaluations + * without serialisation. Must be >= 1 when set explicitly. + * Default: number of available CPU cores. + */ + private Integer wasmEvaluatorPoolSize; + /** * Validate the options provided to the provider. * @@ -107,6 +115,10 @@ public void validate() throws InvalidOptions { throw new InvalidEndpoint("malformed endpoint: " + getEndpoint()); } + if (getWasmEvaluatorPoolSize() != null && getWasmEvaluatorPoolSize() < 1) { + throw new InvalidOptions("wasmEvaluatorPoolSize must be at least 1"); + } + if (getExporterMetadata() != null) { val acceptableExporterMetadataTypes = List.of("String", "Boolean", "Integer", "Double"); for (Map.Entry entry : getExporterMetadata().entrySet()) { diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java index d8c752db7..4a63cb3ad 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java @@ -6,7 +6,7 @@ import dev.openfeature.contrib.providers.gofeatureflag.bean.FlagConfigResponse; import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse; import dev.openfeature.contrib.providers.gofeatureflag.util.Const; -import dev.openfeature.contrib.providers.gofeatureflag.wasm.EvaluationWasm; +import dev.openfeature.contrib.providers.gofeatureflag.wasm.WasmEvaluatorPool; import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.FlagContext; import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput; import dev.openfeature.sdk.ErrorCode; @@ -16,7 +16,6 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.schedulers.Schedulers; -import io.reactivex.rxjava3.subjects.PublishSubject; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -35,23 +34,35 @@ public class InProcessEvaluator implements IEvaluator { /** API to contact GO Feature Flag. */ private final GoFeatureFlagApi api; - /** WASM evaluation engine. */ - private final EvaluationWasm evaluationEngine; + /** Pool of WASM evaluation engine instances for thread-safe concurrent evaluation. */ + private final WasmEvaluatorPool evaluationPool; /** Options to configure the provider. */ private final GoFeatureFlagProviderOptions options; /** Method to call when we have a configuration change. */ private final Consumer emitProviderConfigurationChanged; - /** Local copy of the flags' configuration. */ - private Map flags; - /** Evaluation context enrichment. */ - private Map evaluationContextEnrichment; - /** Last hash of the flags' configuration. */ - private String etag; - /** Last update of the flags' configuration. */ - private Date lastUpdate; + /** Immutable snapshot of all flag configuration state; updated atomically by the polling daemon. */ + private volatile EvaluatorState state; /** disposable which manage the polling of the flag configurations. */ private Disposable configurationDisposable; + private static final class EvaluatorState { + final Map flags; + final Map evaluationContextEnrichment; + final String etag; + final Date lastUpdate; + + EvaluatorState( + Map flags, + Map evaluationContextEnrichment, + String etag, + Date lastUpdate) { + this.flags = flags; + this.evaluationContextEnrichment = evaluationContextEnrichment; + this.etag = etag; + this.lastUpdate = lastUpdate; + } + } + /** * Constructor of the InProcessEvaluator. * @@ -64,17 +75,19 @@ public InProcessEvaluator( GoFeatureFlagProviderOptions options, Consumer emitProviderConfigurationChanged) { this.api = api; - this.flags = Collections.emptyMap(); - this.etag = ""; this.options = options; - this.lastUpdate = new Date(0); this.emitProviderConfigurationChanged = emitProviderConfigurationChanged; - this.evaluationEngine = new EvaluationWasm(); + this.state = new EvaluatorState(Collections.emptyMap(), null, "", new Date(0)); + int poolSize = options.getWasmEvaluatorPoolSize() != null + ? options.getWasmEvaluatorPoolSize() + : Const.DEFAULT_WASM_EVALUATOR_POOL_SIZE; + this.evaluationPool = new WasmEvaluatorPool(poolSize); } @Override public GoFeatureFlagResponse evaluate(String key, Object defaultValue, EvaluationContext evaluationContext) { - if (this.flags.get(key) == null) { + EvaluatorState current = this.state; + if (current.flags.get(key) == null) { val err = new GoFeatureFlagResponse(); err.setReason(Reason.ERROR.name()); err.setErrorCode(ErrorCode.FLAG_NOT_FOUND.name()); @@ -85,30 +98,29 @@ public GoFeatureFlagResponse evaluate(String key, Object defaultValue, Evaluatio val wasmInput = WasmInput.builder() .flagContext(FlagContext.builder() .defaultSdkValue(defaultValue) - .evaluationContextEnrichment(this.evaluationContextEnrichment) + .evaluationContextEnrichment(current.evaluationContextEnrichment) .build()) .evalContext(evaluationContext.asObjectMap()) - .flag(this.flags.get(key)) + .flag(current.flags.get(key)) .flagKey(key) .build(); - return this.evaluationEngine.evaluate(wasmInput); + return this.evaluationPool.evaluate(wasmInput); } @Override public boolean isFlagTrackable(final String flagKey) { - Flag flag = this.flags.get(flagKey); + Flag flag = this.state.flags.get(flagKey); return flag != null && (flag.getTrackEvents() == null || flag.getTrackEvents()); } @Override public void init() { - val configFlags = api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList()); - this.flags = configFlags.getFlags(); - this.etag = configFlags.getEtag(); - this.lastUpdate = configFlags.getLastUpdated(); - this.evaluationContextEnrichment = configFlags.getEvaluationContextEnrichment(); - // We call the WASM engine to avoid a cold start at the 1st evaluation - this.evaluationEngine.preWarmWasm(); + val configFlags = api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList()); + this.state = new EvaluatorState( + configFlags.getFlags(), + configFlags.getEvaluationContextEnrichment(), + configFlags.getEtag(), + configFlags.getLastUpdated()); // start the polling of the flag configuration this.configurationDisposable = startCheckFlagConfigurationChangesDaemon(); @@ -131,13 +143,11 @@ private Disposable startCheckFlagConfigurationChangesDaemon() { ? options.getFlagChangePollingIntervalMs() : Const.DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS; - PublishSubject stopSignal = PublishSubject.create(); - Observable intervalObservable = Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS); + Observable intervalObservable = + Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS, Schedulers.io()); Observable apiCallObservable = intervalObservable - // as soon something is published in stopSignal, the interval will stop - .takeUntil(stopSignal) - .flatMap(tick -> Observable.fromCallable( - () -> this.api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList())) + .flatMap(tick -> Observable.fromCallable(() -> + this.api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList())) .onErrorResumeNext(e -> { log.error("error while calling flag configuration API", e); return Observable.empty(); @@ -146,22 +156,24 @@ private Disposable startCheckFlagConfigurationChangesDaemon() { return apiCallObservable.subscribe( response -> { - if (response.getEtag().equals(this.etag)) { + EvaluatorState current = this.state; + if (response.getEtag().equals(current.etag)) { log.debug("flag configuration has not changed: {}", response); return; } - if (response.getLastUpdated().before(this.lastUpdate)) { + if (response.getLastUpdated().before(current.lastUpdate)) { log.info("configuration received is older than the current one"); return; } log.info("flag configuration has changed"); - this.etag = response.getEtag(); - this.lastUpdate = response.getLastUpdated(); - val flagChanges = findFlagConfigurationChanges(this.flags, response.getFlags()); - this.flags = response.getFlags(); - this.evaluationContextEnrichment = response.getEvaluationContextEnrichment(); + val flagChanges = findFlagConfigurationChanges(current.flags, response.getFlags()); + this.state = new EvaluatorState( + response.getFlags(), + response.getEvaluationContextEnrichment(), + response.getEtag(), + response.getLastUpdated()); val changeDetails = ProviderEventDetails.builder() .flagsChanged(flagChanges) .message("flag configuration has changed") diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java index 38b4f8e8a..6b3c6a326 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java @@ -21,6 +21,8 @@ public class Const { public static final long DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS = 2L * 60L * 1000L; public static final long DEFAULT_FLUSH_INTERVAL_MS = Duration.ofMinutes(1).toMillis(); public static final int DEFAULT_MAX_PENDING_EVENTS = 10000; + public static final int DEFAULT_WASM_EVALUATOR_POOL_SIZE = + Runtime.getRuntime().availableProcessors(); // MAPPERS public static final ObjectMapper DESERIALIZE_OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java new file mode 100644 index 000000000..dd7a2a99f --- /dev/null +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java @@ -0,0 +1,66 @@ +package dev.openfeature.contrib.providers.gofeatureflag.wasm; + +import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse; +import dev.openfeature.contrib.providers.gofeatureflag.exception.WasmFileNotFound; +import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput; +import dev.openfeature.sdk.ErrorCode; +import dev.openfeature.sdk.Reason; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import lombok.extern.slf4j.Slf4j; + +/** + * WasmEvaluatorPool manages a fixed pool of EvaluationWasm instances. + * Each instance owns independent WASM linear memory, allowing concurrent + * evaluate() calls without interleaving memory operations. + */ +@Slf4j +public final class WasmEvaluatorPool { + private final BlockingQueue pool; + + /** + * Creates a pool of {@code size} independent EvaluationWasm instances. + * All instances are allocated eagerly so that first-call latency is + * absorbed at provider initialisation time. + * + * @param size number of WASM instances; must be >= 1 + * @throws WasmFileNotFound if the embedded WASM module cannot be loaded + */ + public WasmEvaluatorPool(int size) throws WasmFileNotFound { + this.pool = new ArrayBlockingQueue<>(size); + for (int i = 0; i < size; i++) { + EvaluationWasm instance = new EvaluationWasm(); + instance.preWarmWasm(); + pool.add(instance); + } + } + + /** + * Evaluates a feature flag by borrowing one WASM instance from the pool, + * delegating to it, and returning it when done. + * Blocks if all instances are busy until one becomes available. + * + * @param wasmInput evaluation input + * @return evaluation result + */ + public GoFeatureFlagResponse evaluate(WasmInput wasmInput) { + EvaluationWasm instance; + try { + instance = pool.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + GoFeatureFlagResponse err = new GoFeatureFlagResponse(); + err.setErrorCode(ErrorCode.GENERAL.name()); + err.setReason(Reason.ERROR.name()); + err.setErrorDetails("WASM evaluator pool interrupted while waiting for an available instance"); + return err; + } + try { + return instance.evaluate(wasmInput); + } finally { + if (!pool.offer(instance)) { + log.error("Failed to return WASM instance to pool — instance leaked, pool capacity may be compromised"); + } + } + } +} diff --git a/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java b/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java index b11593e97..dd837f796 100644 --- a/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java +++ b/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java @@ -28,7 +28,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import lombok.val; @@ -596,6 +599,49 @@ void shouldNotApplyAScheduledRolloutStepIfTheDateIsInTheFuture() { assertEquals(want, got); } } + + @DisplayName("Should evaluate flags correctly under concurrent access") + @SneakyThrows + @Test + void shouldEvaluateFlagsCorrectlyUnderConcurrentAccess() { + GoFeatureFlagProvider provider = new GoFeatureFlagProvider(GoFeatureFlagProviderOptions.builder() + .endpoint(baseUrl.toString()) + .evaluationType(EvaluationType.IN_PROCESS) + .flagChangePollingIntervalMs(999999L) + .build()); + OpenFeatureAPI.getInstance().setProviderAndWait(testName, provider); + val client = OpenFeatureAPI.getInstance().getClient(testName); + + int threadCount = 20; + int evaluationsPerThread = 100; + AtomicInteger errorCount = new AtomicInteger(0); + CountDownLatch startGate = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + + for (int t = 0; t < threadCount; t++) { + new Thread(() -> { + try { + startGate.await(); + for (int i = 0; i < evaluationsPerThread; i++) { + FlagEvaluationDetails result = client.getBooleanDetails( + "bool_targeting_match", false, TestUtils.defaultEvaluationContext); + if (result.getErrorCode() != null) { + errorCount.incrementAndGet(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneLatch.countDown(); + } + }) + .start(); + } + + startGate.countDown(); + assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Threads did not finish in time"); + assertEquals(0, errorCount.get(), "Concurrent evaluations produced errors"); + } } @Nested