From d2a2bfb51f1fbf2d04c99e09b898bef1490689f1 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 22 Apr 2026 17:14:53 +0200 Subject: [PATCH 1/7] fix(gofeatureflag): issue when using inProcess with high concurency Signed-off-by: Thomas Poignant --- providers/go-feature-flag/README.md | 1 + .../GoFeatureFlagProviderOptions.java | 12 ++++ .../evaluator/InProcessEvaluator.java | 32 ++++----- .../providers/gofeatureflag/util/Const.java | 1 + .../gofeatureflag/wasm/WasmEvaluatorPool.java | 72 +++++++++++++++++++ .../GoFeatureFlagProviderTest.java | 45 ++++++++++++ 6 files changed, 147 insertions(+), 16 deletions(-) create mode 100644 providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java diff --git a/providers/go-feature-flag/README.md b/providers/go-feature-flag/README.md index e2eaa6bb1..ddadb5ab8 100644 --- a/providers/go-feature-flag/README.md +++ b/providers/go-feature-flag/README.md @@ -82,6 +82,7 @@ You can configure the provider with several options to customize its behavior. T | **`exporterMetadata`** | `false` | exporterMetadata is the metadata we send to the GO Feature Flag relay proxy when we report the evaluation data usage. | | **`evaluationFlagList`** | `false` | If you are using in process evaluation, by default we will load in memory all the flags available in the relay proxy. If you want to limit the number of flags loaded in memory, you can use this parameter. By setting this parameter, you will only load the flags available in the list.

If null or empty, all the flags available in the relay proxy will be loaded.

| | **`flagChangePollingIntervalMs`** | `false` | interval time we poll the proxy to check if the configuration has changed. It is used for the in process evaluation to check if we should refresh our internal cache. default: `120000` | +| **`wasmEvaluatorPoolSize`** | `false` | _(IN_PROCESS only)_ Number of WASM instances kept in the evaluation pool. Each instance owns independent memory, allowing fully concurrent flag evaluations without serialisation. Must be `>= 1`. _(default: number of available CPU cores)_ | ### Evaluate a feature flag The OpenFeature client is used to retrieve values for the current `EvaluationContext`. For example, retrieving a boolean value for the flag **"my-flag"**: diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java index cfa45664a..9cd620553 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderOptions.java @@ -91,6 +91,14 @@ public class GoFeatureFlagProviderOptions { */ private Long flagChangePollingIntervalMs; + /** + * (optional) Number of WASM instances kept in the evaluation pool for in-process evaluation. + * Each instance owns independent WASM linear memory, allowing fully concurrent evaluations + * without serialisation. Must be >= 1 when set explicitly. + * Default: number of available CPU cores. + */ + private Integer wasmEvaluatorPoolSize; + /** * Validate the options provided to the provider. * @@ -107,6 +115,10 @@ public void validate() throws InvalidOptions { throw new InvalidEndpoint("malformed endpoint: " + getEndpoint()); } + if (getWasmEvaluatorPoolSize() != null && getWasmEvaluatorPoolSize() < 1) { + throw new InvalidOptions("wasmEvaluatorPoolSize must be at least 1"); + } + if (getExporterMetadata() != null) { val acceptableExporterMetadataTypes = List.of("String", "Boolean", "Integer", "Double"); for (Map.Entry entry : getExporterMetadata().entrySet()) { diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java index d8c752db7..fb3267e41 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java @@ -6,7 +6,7 @@ import dev.openfeature.contrib.providers.gofeatureflag.bean.FlagConfigResponse; import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse; import dev.openfeature.contrib.providers.gofeatureflag.util.Const; -import dev.openfeature.contrib.providers.gofeatureflag.wasm.EvaluationWasm; +import dev.openfeature.contrib.providers.gofeatureflag.wasm.WasmEvaluatorPool; import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.FlagContext; import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput; import dev.openfeature.sdk.ErrorCode; @@ -16,7 +16,6 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.schedulers.Schedulers; -import io.reactivex.rxjava3.subjects.PublishSubject; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -35,20 +34,20 @@ public class InProcessEvaluator implements IEvaluator { /** API to contact GO Feature Flag. */ private final GoFeatureFlagApi api; - /** WASM evaluation engine. */ - private final EvaluationWasm evaluationEngine; + /** Pool of WASM evaluation engine instances for thread-safe concurrent evaluation. */ + private final WasmEvaluatorPool evaluationPool; /** Options to configure the provider. */ private final GoFeatureFlagProviderOptions options; /** Method to call when we have a configuration change. */ private final Consumer emitProviderConfigurationChanged; /** Local copy of the flags' configuration. */ - private Map flags; + private volatile Map flags; /** Evaluation context enrichment. */ - private Map evaluationContextEnrichment; + private volatile Map evaluationContextEnrichment; /** Last hash of the flags' configuration. */ - private String etag; + private volatile String etag; /** Last update of the flags' configuration. */ - private Date lastUpdate; + private volatile Date lastUpdate; /** disposable which manage the polling of the flag configurations. */ private Disposable configurationDisposable; @@ -69,12 +68,16 @@ public InProcessEvaluator( this.options = options; this.lastUpdate = new Date(0); this.emitProviderConfigurationChanged = emitProviderConfigurationChanged; - this.evaluationEngine = new EvaluationWasm(); + int poolSize = options.getWasmEvaluatorPoolSize() != null + ? options.getWasmEvaluatorPoolSize() + : Const.DEFAULT_WASM_EVALUATOR_POOL_SIZE; + this.evaluationPool = new WasmEvaluatorPool(poolSize); } @Override public GoFeatureFlagResponse evaluate(String key, Object defaultValue, EvaluationContext evaluationContext) { - if (this.flags.get(key) == null) { + Map currentFlags = this.flags; + if (currentFlags.get(key) == null) { val err = new GoFeatureFlagResponse(); err.setReason(Reason.ERROR.name()); err.setErrorCode(ErrorCode.FLAG_NOT_FOUND.name()); @@ -88,10 +91,10 @@ public GoFeatureFlagResponse evaluate(String key, Object defaultValue, Evaluatio .evaluationContextEnrichment(this.evaluationContextEnrichment) .build()) .evalContext(evaluationContext.asObjectMap()) - .flag(this.flags.get(key)) + .flag(currentFlags.get(key)) .flagKey(key) .build(); - return this.evaluationEngine.evaluate(wasmInput); + return this.evaluationPool.evaluate(wasmInput); } @Override @@ -108,7 +111,7 @@ public void init() { this.lastUpdate = configFlags.getLastUpdated(); this.evaluationContextEnrichment = configFlags.getEvaluationContextEnrichment(); // We call the WASM engine to avoid a cold start at the 1st evaluation - this.evaluationEngine.preWarmWasm(); + this.evaluationPool.preWarmWasm(); // start the polling of the flag configuration this.configurationDisposable = startCheckFlagConfigurationChangesDaemon(); @@ -131,11 +134,8 @@ private Disposable startCheckFlagConfigurationChangesDaemon() { ? options.getFlagChangePollingIntervalMs() : Const.DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS; - PublishSubject stopSignal = PublishSubject.create(); Observable intervalObservable = Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS); Observable apiCallObservable = intervalObservable - // as soon something is published in stopSignal, the interval will stop - .takeUntil(stopSignal) .flatMap(tick -> Observable.fromCallable( () -> this.api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList())) .onErrorResumeNext(e -> { diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java index 38b4f8e8a..bdd45fcd1 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java @@ -21,6 +21,7 @@ public class Const { public static final long DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS = 2L * 60L * 1000L; public static final long DEFAULT_FLUSH_INTERVAL_MS = Duration.ofMinutes(1).toMillis(); public static final int DEFAULT_MAX_PENDING_EVENTS = 10000; + public static final int DEFAULT_WASM_EVALUATOR_POOL_SIZE = Runtime.getRuntime().availableProcessors(); // MAPPERS public static final ObjectMapper DESERIALIZE_OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java new file mode 100644 index 000000000..c8c98df6a --- /dev/null +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java @@ -0,0 +1,72 @@ +package dev.openfeature.contrib.providers.gofeatureflag.wasm; + +import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse; +import dev.openfeature.contrib.providers.gofeatureflag.exception.WasmFileNotFound; +import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput; +import dev.openfeature.sdk.ErrorCode; +import dev.openfeature.sdk.Reason; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import lombok.extern.slf4j.Slf4j; + +/** + * WasmEvaluatorPool manages a fixed pool of EvaluationWasm instances. + * Each instance owns independent WASM linear memory, allowing concurrent + * evaluate() calls without interleaving memory operations. + */ +@Slf4j +public final class WasmEvaluatorPool { + private final BlockingQueue pool; + + /** + * Creates a pool of {@code size} independent EvaluationWasm instances. + * All instances are allocated eagerly so that first-call latency is + * absorbed at provider initialisation time. + * + * @param size number of WASM instances; must be >= 1 + * @throws WasmFileNotFound if the embedded WASM module cannot be loaded + */ + public WasmEvaluatorPool(int size) throws WasmFileNotFound { + this.pool = new ArrayBlockingQueue<>(size); + for (int i = 0; i < size; i++) { + pool.offer(new EvaluationWasm()); + } + } + + /** + * Evaluates a feature flag by borrowing one WASM instance from the pool, + * delegating to it, and returning it when done. + * Blocks if all instances are busy until one becomes available. + * + * @param wasmInput evaluation input + * @return evaluation result + */ + public GoFeatureFlagResponse evaluate(WasmInput wasmInput) { + EvaluationWasm instance; + try { + instance = pool.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + GoFeatureFlagResponse err = new GoFeatureFlagResponse(); + err.setErrorCode(ErrorCode.GENERAL.name()); + err.setReason(Reason.ERROR.name()); + err.setErrorDetails("WASM evaluator pool interrupted while waiting for an available instance"); + return err; + } + try { + return instance.evaluate(wasmInput); + } finally { + if (!pool.offer(instance)) { + log.warn("Failed to return WASM instance to pool — pool may be exhausted"); + } + } + } + + /** + * Pre-warms all instances in the pool to avoid cold-start latency on + * the first evaluation after provider initialisation. + */ + public void preWarmWasm() { + pool.forEach(EvaluationWasm::preWarmWasm); + } +} diff --git a/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java b/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java index b11593e97..4cff06c27 100644 --- a/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java +++ b/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java @@ -28,7 +28,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import lombok.val; @@ -596,6 +599,48 @@ void shouldNotApplyAScheduledRolloutStepIfTheDateIsInTheFuture() { assertEquals(want, got); } } + + @DisplayName("Should evaluate flags correctly under concurrent access") + @SneakyThrows + @Test + void shouldEvaluateFlagsCorrectlyUnderConcurrentAccess() { + GoFeatureFlagProvider provider = new GoFeatureFlagProvider(GoFeatureFlagProviderOptions.builder() + .endpoint(baseUrl.toString()) + .evaluationType(EvaluationType.IN_PROCESS) + .flagChangePollingIntervalMs(999999L) + .build()); + OpenFeatureAPI.getInstance().setProviderAndWait(testName, provider); + val client = OpenFeatureAPI.getInstance().getClient(testName); + + int threadCount = 20; + int evaluationsPerThread = 100; + AtomicInteger errorCount = new AtomicInteger(0); + CountDownLatch startGate = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + + for (int t = 0; t < threadCount; t++) { + new Thread(() -> { + try { + startGate.await(); + for (int i = 0; i < evaluationsPerThread; i++) { + FlagEvaluationDetails result = client.getBooleanDetails( + "bool_targeting_match", false, TestUtils.defaultEvaluationContext); + if (result.getErrorCode() != null) { + errorCount.incrementAndGet(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneLatch.countDown(); + } + }).start(); + } + + startGate.countDown(); + assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Threads did not finish in time"); + assertEquals(0, errorCount.get(), "Concurrent evaluations produced errors"); + } } @Nested From 1062126ffc2d21bad915ee192ede5e80c10e8405 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 22 Apr 2026 17:21:11 +0200 Subject: [PATCH 2/7] typo in comment Signed-off-by: Thomas Poignant --- .../contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java index c8c98df6a..17a919b0f 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java @@ -23,7 +23,7 @@ public final class WasmEvaluatorPool { * All instances are allocated eagerly so that first-call latency is * absorbed at provider initialisation time. * - * @param size number of WASM instances; must be >= 1 + * @param size number of WASM instances; must be >= 1 * @throws WasmFileNotFound if the embedded WASM module cannot be loaded */ public WasmEvaluatorPool(int size) throws WasmFileNotFound { From 5f57a5e91ea76153223ab6b81d5dc9d379a5e30d Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 22 Apr 2026 17:50:54 +0200 Subject: [PATCH 3/7] fixing spotbugs errors Signed-off-by: Thomas Poignant --- .../go-feature-flag/spotbugs-exclusions.xml | 39 +++++++++++++++++++ .../providers/gofeatureflag/util/Const.java | 3 +- .../gofeatureflag/wasm/WasmEvaluatorPool.java | 7 +++- .../GoFeatureFlagProviderTest.java | 29 +++++++------- 4 files changed, 62 insertions(+), 16 deletions(-) create mode 100644 providers/go-feature-flag/spotbugs-exclusions.xml diff --git a/providers/go-feature-flag/spotbugs-exclusions.xml b/providers/go-feature-flag/spotbugs-exclusions.xml new file mode 100644 index 000000000..7f1d7ae12 --- /dev/null +++ b/providers/go-feature-flag/spotbugs-exclusions.xml @@ -0,0 +1,39 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java index bdd45fcd1..6b3c6a326 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/util/Const.java @@ -21,7 +21,8 @@ public class Const { public static final long DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS = 2L * 60L * 1000L; public static final long DEFAULT_FLUSH_INTERVAL_MS = Duration.ofMinutes(1).toMillis(); public static final int DEFAULT_MAX_PENDING_EVENTS = 10000; - public static final int DEFAULT_WASM_EVALUATOR_POOL_SIZE = Runtime.getRuntime().availableProcessors(); + public static final int DEFAULT_WASM_EVALUATOR_POOL_SIZE = + Runtime.getRuntime().availableProcessors(); // MAPPERS public static final ObjectMapper DESERIALIZE_OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java index 17a919b0f..f010a6857 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java @@ -29,7 +29,12 @@ public final class WasmEvaluatorPool { public WasmEvaluatorPool(int size) throws WasmFileNotFound { this.pool = new ArrayBlockingQueue<>(size); for (int i = 0; i < size; i++) { - pool.offer(new EvaluationWasm()); + if (!pool.offer(new EvaluationWasm())) { + log.warn( + "Failed to add WASM instance {} to pool during initialisation" + + " — pool capacity may be exceeded", + i); + } } } diff --git a/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java b/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java index 4cff06c27..dd837f796 100644 --- a/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java +++ b/providers/go-feature-flag/src/test/java/dev/openfeature/contrib/providers/gofeatureflag/GoFeatureFlagProviderTest.java @@ -620,21 +620,22 @@ void shouldEvaluateFlagsCorrectlyUnderConcurrentAccess() { for (int t = 0; t < threadCount; t++) { new Thread(() -> { - try { - startGate.await(); - for (int i = 0; i < evaluationsPerThread; i++) { - FlagEvaluationDetails result = client.getBooleanDetails( - "bool_targeting_match", false, TestUtils.defaultEvaluationContext); - if (result.getErrorCode() != null) { - errorCount.incrementAndGet(); + try { + startGate.await(); + for (int i = 0; i < evaluationsPerThread; i++) { + FlagEvaluationDetails result = client.getBooleanDetails( + "bool_targeting_match", false, TestUtils.defaultEvaluationContext); + if (result.getErrorCode() != null) { + errorCount.incrementAndGet(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneLatch.countDown(); } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - doneLatch.countDown(); - } - }).start(); + }) + .start(); } startGate.countDown(); From d6b623dcfb3992ea34faab5b0f2b2024145ab7ed Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 22 Apr 2026 18:11:41 +0200 Subject: [PATCH 4/7] Use a shared volatile Signed-off-by: Thomas Poignant --- .../evaluator/InProcessEvaluator.java | 75 +++++++++++-------- .../gofeatureflag/wasm/WasmEvaluatorPool.java | 14 +--- 2 files changed, 47 insertions(+), 42 deletions(-) diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java index fb3267e41..fe206fc73 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java @@ -40,17 +40,29 @@ public class InProcessEvaluator implements IEvaluator { private final GoFeatureFlagProviderOptions options; /** Method to call when we have a configuration change. */ private final Consumer emitProviderConfigurationChanged; - /** Local copy of the flags' configuration. */ - private volatile Map flags; - /** Evaluation context enrichment. */ - private volatile Map evaluationContextEnrichment; - /** Last hash of the flags' configuration. */ - private volatile String etag; - /** Last update of the flags' configuration. */ - private volatile Date lastUpdate; + /** Immutable snapshot of all flag configuration state; updated atomically by the polling daemon. */ + private volatile EvaluatorState state; /** disposable which manage the polling of the flag configurations. */ private Disposable configurationDisposable; + private static final class EvaluatorState { + final Map flags; + final Map evaluationContextEnrichment; + final String etag; + final Date lastUpdate; + + EvaluatorState( + Map flags, + Map evaluationContextEnrichment, + String etag, + Date lastUpdate) { + this.flags = flags; + this.evaluationContextEnrichment = evaluationContextEnrichment; + this.etag = etag; + this.lastUpdate = lastUpdate; + } + } + /** * Constructor of the InProcessEvaluator. * @@ -63,11 +75,9 @@ public InProcessEvaluator( GoFeatureFlagProviderOptions options, Consumer emitProviderConfigurationChanged) { this.api = api; - this.flags = Collections.emptyMap(); - this.etag = ""; this.options = options; - this.lastUpdate = new Date(0); this.emitProviderConfigurationChanged = emitProviderConfigurationChanged; + this.state = new EvaluatorState(Collections.emptyMap(), null, "", new Date(0)); int poolSize = options.getWasmEvaluatorPoolSize() != null ? options.getWasmEvaluatorPoolSize() : Const.DEFAULT_WASM_EVALUATOR_POOL_SIZE; @@ -76,8 +86,8 @@ public InProcessEvaluator( @Override public GoFeatureFlagResponse evaluate(String key, Object defaultValue, EvaluationContext evaluationContext) { - Map currentFlags = this.flags; - if (currentFlags.get(key) == null) { + EvaluatorState current = this.state; + if (current.flags.get(key) == null) { val err = new GoFeatureFlagResponse(); err.setReason(Reason.ERROR.name()); err.setErrorCode(ErrorCode.FLAG_NOT_FOUND.name()); @@ -88,10 +98,10 @@ public GoFeatureFlagResponse evaluate(String key, Object defaultValue, Evaluatio val wasmInput = WasmInput.builder() .flagContext(FlagContext.builder() .defaultSdkValue(defaultValue) - .evaluationContextEnrichment(this.evaluationContextEnrichment) + .evaluationContextEnrichment(current.evaluationContextEnrichment) .build()) .evalContext(evaluationContext.asObjectMap()) - .flag(currentFlags.get(key)) + .flag(current.flags.get(key)) .flagKey(key) .build(); return this.evaluationPool.evaluate(wasmInput); @@ -99,19 +109,18 @@ public GoFeatureFlagResponse evaluate(String key, Object defaultValue, Evaluatio @Override public boolean isFlagTrackable(final String flagKey) { - Flag flag = this.flags.get(flagKey); + Flag flag = this.state.flags.get(flagKey); return flag != null && (flag.getTrackEvents() == null || flag.getTrackEvents()); } @Override public void init() { - val configFlags = api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList()); - this.flags = configFlags.getFlags(); - this.etag = configFlags.getEtag(); - this.lastUpdate = configFlags.getLastUpdated(); - this.evaluationContextEnrichment = configFlags.getEvaluationContextEnrichment(); - // We call the WASM engine to avoid a cold start at the 1st evaluation - this.evaluationPool.preWarmWasm(); + val configFlags = api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList()); + this.state = new EvaluatorState( + configFlags.getFlags(), + configFlags.getEvaluationContextEnrichment(), + configFlags.getEtag(), + configFlags.getLastUpdated()); // start the polling of the flag configuration this.configurationDisposable = startCheckFlagConfigurationChangesDaemon(); @@ -136,8 +145,8 @@ private Disposable startCheckFlagConfigurationChangesDaemon() { Observable intervalObservable = Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS); Observable apiCallObservable = intervalObservable - .flatMap(tick -> Observable.fromCallable( - () -> this.api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList())) + .flatMap(tick -> Observable.fromCallable(() -> + this.api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList())) .onErrorResumeNext(e -> { log.error("error while calling flag configuration API", e); return Observable.empty(); @@ -146,22 +155,24 @@ private Disposable startCheckFlagConfigurationChangesDaemon() { return apiCallObservable.subscribe( response -> { - if (response.getEtag().equals(this.etag)) { + EvaluatorState current = this.state; + if (response.getEtag().equals(current.etag)) { log.debug("flag configuration has not changed: {}", response); return; } - if (response.getLastUpdated().before(this.lastUpdate)) { + if (response.getLastUpdated().before(current.lastUpdate)) { log.info("configuration received is older than the current one"); return; } log.info("flag configuration has changed"); - this.etag = response.getEtag(); - this.lastUpdate = response.getLastUpdated(); - val flagChanges = findFlagConfigurationChanges(this.flags, response.getFlags()); - this.flags = response.getFlags(); - this.evaluationContextEnrichment = response.getEvaluationContextEnrichment(); + val flagChanges = findFlagConfigurationChanges(current.flags, response.getFlags()); + this.state = new EvaluatorState( + response.getFlags(), + response.getEvaluationContextEnrichment(), + response.getEtag(), + response.getLastUpdated()); val changeDetails = ProviderEventDetails.builder() .flagsChanged(flagChanges) .message("flag configuration has changed") diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java index f010a6857..fed0b683d 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java @@ -29,7 +29,9 @@ public final class WasmEvaluatorPool { public WasmEvaluatorPool(int size) throws WasmFileNotFound { this.pool = new ArrayBlockingQueue<>(size); for (int i = 0; i < size; i++) { - if (!pool.offer(new EvaluationWasm())) { + EvaluationWasm instance = new EvaluationWasm(); + instance.preWarmWasm(); + if (!pool.offer(instance)) { log.warn( "Failed to add WASM instance {} to pool during initialisation" + " — pool capacity may be exceeded", @@ -62,16 +64,8 @@ public GoFeatureFlagResponse evaluate(WasmInput wasmInput) { return instance.evaluate(wasmInput); } finally { if (!pool.offer(instance)) { - log.warn("Failed to return WASM instance to pool — pool may be exhausted"); + log.error("Failed to return WASM instance to pool — instance leaked, pool capacity may be compromised"); } } } - - /** - * Pre-warms all instances in the pool to avoid cold-start latency on - * the first evaluation after provider initialisation. - */ - public void preWarmWasm() { - pool.forEach(EvaluationWasm::preWarmWasm); - } } From 8fb30b3ca70b305f9a0e09e4a50ca229929f6ca1 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 22 Apr 2026 18:12:40 +0200 Subject: [PATCH 5/7] remove spotbugs Signed-off-by: Thomas Poignant --- .../go-feature-flag/spotbugs-exclusions.xml | 39 ------------------- 1 file changed, 39 deletions(-) delete mode 100644 providers/go-feature-flag/spotbugs-exclusions.xml diff --git a/providers/go-feature-flag/spotbugs-exclusions.xml b/providers/go-feature-flag/spotbugs-exclusions.xml deleted file mode 100644 index 7f1d7ae12..000000000 --- a/providers/go-feature-flag/spotbugs-exclusions.xml +++ /dev/null @@ -1,39 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - From b4eb4b61d52e6b2b2c9dc581b38db5b2486d2d3c Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 22 Apr 2026 18:34:30 +0200 Subject: [PATCH 6/7] addressing PR reviews Signed-off-by: Thomas Poignant --- .../gofeatureflag/evaluator/InProcessEvaluator.java | 3 ++- .../providers/gofeatureflag/wasm/WasmEvaluatorPool.java | 7 +------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java index fe206fc73..f97806d70 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java @@ -143,7 +143,8 @@ private Disposable startCheckFlagConfigurationChangesDaemon() { ? options.getFlagChangePollingIntervalMs() : Const.DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS; - Observable intervalObservable = Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS); + Observable intervalObservable = Observable.interval( + pollingIntervalMs, TimeUnit.MILLISECONDS, Schedulers.io()); Observable apiCallObservable = intervalObservable .flatMap(tick -> Observable.fromCallable(() -> this.api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList())) diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java index fed0b683d..dd7a2a99f 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/wasm/WasmEvaluatorPool.java @@ -31,12 +31,7 @@ public WasmEvaluatorPool(int size) throws WasmFileNotFound { for (int i = 0; i < size; i++) { EvaluationWasm instance = new EvaluationWasm(); instance.preWarmWasm(); - if (!pool.offer(instance)) { - log.warn( - "Failed to add WASM instance {} to pool during initialisation" - + " — pool capacity may be exceeded", - i); - } + pool.add(instance); } } From 5ff52af2cd3e76213945fa1a25d22a399c1984f6 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Wed, 22 Apr 2026 18:46:14 +0200 Subject: [PATCH 7/7] code style issue Signed-off-by: Thomas Poignant --- .../providers/gofeatureflag/evaluator/InProcessEvaluator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java index f97806d70..4a63cb3ad 100644 --- a/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java +++ b/providers/go-feature-flag/src/main/java/dev/openfeature/contrib/providers/gofeatureflag/evaluator/InProcessEvaluator.java @@ -143,8 +143,8 @@ private Disposable startCheckFlagConfigurationChangesDaemon() { ? options.getFlagChangePollingIntervalMs() : Const.DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS; - Observable intervalObservable = Observable.interval( - pollingIntervalMs, TimeUnit.MILLISECONDS, Schedulers.io()); + Observable intervalObservable = + Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS, Schedulers.io()); Observable apiCallObservable = intervalObservable .flatMap(tick -> Observable.fromCallable(() -> this.api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList()))