Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions providers/go-feature-flag/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ You can configure the provider with several options to customize its behavior. T
| **`exporterMetadata`** | `false` | exporterMetadata is the metadata we send to the GO Feature Flag relay proxy when we report the evaluation data usage. |
| **`evaluationFlagList`** | `false` | If you are using in process evaluation, by default we will load in memory all the flags available in the relay proxy. If you want to limit the number of flags loaded in memory, you can use this parameter. By setting this parameter, you will only load the flags available in the list. <p>If null or empty, all the flags available in the relay proxy will be loaded.</p> |
| **`flagChangePollingIntervalMs`** | `false` | interval time we poll the proxy to check if the configuration has changed. It is used for the in process evaluation to check if we should refresh our internal cache. default: `120000` |
| **`wasmEvaluatorPoolSize`** | `false` | _(IN_PROCESS only)_ Number of WASM instances kept in the evaluation pool. Each instance owns independent memory, allowing fully concurrent flag evaluations without serialisation. Must be `>= 1`. _(default: number of available CPU cores)_ |

### Evaluate a feature flag
The OpenFeature client is used to retrieve values for the current `EvaluationContext`. For example, retrieving a boolean value for the flag **"my-flag"**:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ public class GoFeatureFlagProviderOptions {
*/
private Long flagChangePollingIntervalMs;

/**
* (optional) Number of WASM instances kept in the evaluation pool for in-process evaluation.
* Each instance owns independent WASM linear memory, allowing fully concurrent evaluations
* without serialisation. Must be &gt;= 1 when set explicitly.
* Default: number of available CPU cores.
*/
private Integer wasmEvaluatorPoolSize;

/**
* Validate the options provided to the provider.
*
Expand All @@ -107,6 +115,10 @@ public void validate() throws InvalidOptions {
throw new InvalidEndpoint("malformed endpoint: " + getEndpoint());
}

if (getWasmEvaluatorPoolSize() != null && getWasmEvaluatorPoolSize() < 1) {
throw new InvalidOptions("wasmEvaluatorPoolSize must be at least 1");
}

if (getExporterMetadata() != null) {
val acceptableExporterMetadataTypes = List.of("String", "Boolean", "Integer", "Double");
for (Map.Entry<String, Object> entry : getExporterMetadata().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import dev.openfeature.contrib.providers.gofeatureflag.bean.FlagConfigResponse;
import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse;
import dev.openfeature.contrib.providers.gofeatureflag.util.Const;
import dev.openfeature.contrib.providers.gofeatureflag.wasm.EvaluationWasm;
import dev.openfeature.contrib.providers.gofeatureflag.wasm.WasmEvaluatorPool;
import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.FlagContext;
import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput;
import dev.openfeature.sdk.ErrorCode;
Expand All @@ -16,7 +16,6 @@
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
Expand All @@ -35,23 +34,35 @@
public class InProcessEvaluator implements IEvaluator {
/** API to contact GO Feature Flag. */
private final GoFeatureFlagApi api;
/** WASM evaluation engine. */
private final EvaluationWasm evaluationEngine;
/** Pool of WASM evaluation engine instances for thread-safe concurrent evaluation. */
private final WasmEvaluatorPool evaluationPool;
/** Options to configure the provider. */
private final GoFeatureFlagProviderOptions options;
/** Method to call when we have a configuration change. */
private final Consumer<ProviderEventDetails> emitProviderConfigurationChanged;
/** Local copy of the flags' configuration. */
private Map<String, Flag> flags;
/** Evaluation context enrichment. */
private Map<String, Object> evaluationContextEnrichment;
/** Last hash of the flags' configuration. */
private String etag;
/** Last update of the flags' configuration. */
private Date lastUpdate;
/** Immutable snapshot of all flag configuration state; updated atomically by the polling daemon. */
private volatile EvaluatorState state;
/** disposable which manage the polling of the flag configurations. */
private Disposable configurationDisposable;

private static final class EvaluatorState {
final Map<String, Flag> flags;
final Map<String, Object> evaluationContextEnrichment;
final String etag;
final Date lastUpdate;

EvaluatorState(
Map<String, Flag> flags,
Map<String, Object> evaluationContextEnrichment,
String etag,
Date lastUpdate) {
this.flags = flags;
this.evaluationContextEnrichment = evaluationContextEnrichment;
this.etag = etag;
this.lastUpdate = lastUpdate;
}
}

/**
* Constructor of the InProcessEvaluator.
*
Expand All @@ -64,17 +75,19 @@ public InProcessEvaluator(
GoFeatureFlagProviderOptions options,
Consumer<ProviderEventDetails> emitProviderConfigurationChanged) {
this.api = api;
this.flags = Collections.emptyMap();
this.etag = "";
this.options = options;
this.lastUpdate = new Date(0);
this.emitProviderConfigurationChanged = emitProviderConfigurationChanged;
this.evaluationEngine = new EvaluationWasm();
this.state = new EvaluatorState(Collections.emptyMap(), null, "", new Date(0));
int poolSize = options.getWasmEvaluatorPoolSize() != null
? options.getWasmEvaluatorPoolSize()
: Const.DEFAULT_WASM_EVALUATOR_POOL_SIZE;
this.evaluationPool = new WasmEvaluatorPool(poolSize);
}

@Override
public GoFeatureFlagResponse evaluate(String key, Object defaultValue, EvaluationContext evaluationContext) {
if (this.flags.get(key) == null) {
EvaluatorState current = this.state;
if (current.flags.get(key) == null) {
val err = new GoFeatureFlagResponse();
err.setReason(Reason.ERROR.name());
err.setErrorCode(ErrorCode.FLAG_NOT_FOUND.name());
Expand All @@ -85,30 +98,29 @@ public GoFeatureFlagResponse evaluate(String key, Object defaultValue, Evaluatio
val wasmInput = WasmInput.builder()
.flagContext(FlagContext.builder()
.defaultSdkValue(defaultValue)
.evaluationContextEnrichment(this.evaluationContextEnrichment)
.evaluationContextEnrichment(current.evaluationContextEnrichment)
.build())
.evalContext(evaluationContext.asObjectMap())
.flag(this.flags.get(key))
.flag(current.flags.get(key))
.flagKey(key)
.build();
return this.evaluationEngine.evaluate(wasmInput);
return this.evaluationPool.evaluate(wasmInput);
}

@Override
public boolean isFlagTrackable(final String flagKey) {
Flag flag = this.flags.get(flagKey);
Flag flag = this.state.flags.get(flagKey);
return flag != null && (flag.getTrackEvents() == null || flag.getTrackEvents());
}

@Override
public void init() {
val configFlags = api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList());
this.flags = configFlags.getFlags();
this.etag = configFlags.getEtag();
this.lastUpdate = configFlags.getLastUpdated();
this.evaluationContextEnrichment = configFlags.getEvaluationContextEnrichment();
// We call the WASM engine to avoid a cold start at the 1st evaluation
this.evaluationEngine.preWarmWasm();
val configFlags = api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList());
this.state = new EvaluatorState(
configFlags.getFlags(),
configFlags.getEvaluationContextEnrichment(),
configFlags.getEtag(),
configFlags.getLastUpdated());

// start the polling of the flag configuration
this.configurationDisposable = startCheckFlagConfigurationChangesDaemon();
Expand All @@ -131,13 +143,11 @@ private Disposable startCheckFlagConfigurationChangesDaemon() {
? options.getFlagChangePollingIntervalMs()
: Const.DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS;

PublishSubject<Object> stopSignal = PublishSubject.create();
Observable<Long> intervalObservable = Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS);
Observable<Long> intervalObservable =
Observable.interval(pollingIntervalMs, TimeUnit.MILLISECONDS, Schedulers.io());
Observable<FlagConfigResponse> apiCallObservable = intervalObservable
// as soon something is published in stopSignal, the interval will stop
.takeUntil(stopSignal)
.flatMap(tick -> Observable.fromCallable(
() -> this.api.retrieveFlagConfiguration(this.etag, options.getEvaluationFlagList()))
.flatMap(tick -> Observable.fromCallable(() ->
this.api.retrieveFlagConfiguration(this.state.etag, options.getEvaluationFlagList()))
.onErrorResumeNext(e -> {
log.error("error while calling flag configuration API", e);
return Observable.empty();
Expand All @@ -146,22 +156,24 @@ private Disposable startCheckFlagConfigurationChangesDaemon() {

return apiCallObservable.subscribe(
response -> {
if (response.getEtag().equals(this.etag)) {
EvaluatorState current = this.state;
if (response.getEtag().equals(current.etag)) {
log.debug("flag configuration has not changed: {}", response);
return;
}

if (response.getLastUpdated().before(this.lastUpdate)) {
if (response.getLastUpdated().before(current.lastUpdate)) {
log.info("configuration received is older than the current one");
return;
}

log.info("flag configuration has changed");
this.etag = response.getEtag();
this.lastUpdate = response.getLastUpdated();
val flagChanges = findFlagConfigurationChanges(this.flags, response.getFlags());
this.flags = response.getFlags();
this.evaluationContextEnrichment = response.getEvaluationContextEnrichment();
val flagChanges = findFlagConfigurationChanges(current.flags, response.getFlags());
this.state = new EvaluatorState(
response.getFlags(),
response.getEvaluationContextEnrichment(),
response.getEtag(),
response.getLastUpdated());
val changeDetails = ProviderEventDetails.builder()
.flagsChanged(flagChanges)
.message("flag configuration has changed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class Const {
public static final long DEFAULT_POLLING_CONFIG_FLAG_CHANGE_INTERVAL_MS = 2L * 60L * 1000L;
public static final long DEFAULT_FLUSH_INTERVAL_MS = Duration.ofMinutes(1).toMillis();
public static final int DEFAULT_MAX_PENDING_EVENTS = 10000;
public static final int DEFAULT_WASM_EVALUATOR_POOL_SIZE =
Runtime.getRuntime().availableProcessors();
// MAPPERS
public static final ObjectMapper DESERIALIZE_OBJECT_MAPPER =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package dev.openfeature.contrib.providers.gofeatureflag.wasm;

import dev.openfeature.contrib.providers.gofeatureflag.bean.GoFeatureFlagResponse;
import dev.openfeature.contrib.providers.gofeatureflag.exception.WasmFileNotFound;
import dev.openfeature.contrib.providers.gofeatureflag.wasm.bean.WasmInput;
import dev.openfeature.sdk.ErrorCode;
import dev.openfeature.sdk.Reason;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import lombok.extern.slf4j.Slf4j;

/**
* WasmEvaluatorPool manages a fixed pool of EvaluationWasm instances.
* Each instance owns independent WASM linear memory, allowing concurrent
* evaluate() calls without interleaving memory operations.
*/
@Slf4j
public final class WasmEvaluatorPool {
private final BlockingQueue<EvaluationWasm> pool;

/**
* Creates a pool of {@code size} independent EvaluationWasm instances.
* All instances are allocated eagerly so that first-call latency is
* absorbed at provider initialisation time.
*
* @param size number of WASM instances; must be >= 1
* @throws WasmFileNotFound if the embedded WASM module cannot be loaded
*/
public WasmEvaluatorPool(int size) throws WasmFileNotFound {
this.pool = new ArrayBlockingQueue<>(size);
for (int i = 0; i < size; i++) {
EvaluationWasm instance = new EvaluationWasm();
instance.preWarmWasm();
pool.add(instance);
}
}
Comment thread
thomaspoignant marked this conversation as resolved.

/**
* Evaluates a feature flag by borrowing one WASM instance from the pool,
* delegating to it, and returning it when done.
* Blocks if all instances are busy until one becomes available.
*
* @param wasmInput evaluation input
* @return evaluation result
*/
public GoFeatureFlagResponse evaluate(WasmInput wasmInput) {
EvaluationWasm instance;
try {
instance = pool.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
GoFeatureFlagResponse err = new GoFeatureFlagResponse();
err.setErrorCode(ErrorCode.GENERAL.name());
err.setReason(Reason.ERROR.name());
err.setErrorDetails("WASM evaluator pool interrupted while waiting for an available instance");
return err;
}
try {
return instance.evaluate(wasmInput);
} finally {
if (!pool.offer(instance)) {
log.error("Failed to return WASM instance to pool — instance leaked, pool capacity may be compromised");
}
Comment thread
thomaspoignant marked this conversation as resolved.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
Expand Down Expand Up @@ -596,6 +599,49 @@ void shouldNotApplyAScheduledRolloutStepIfTheDateIsInTheFuture() {
assertEquals(want, got);
}
}

@DisplayName("Should evaluate flags correctly under concurrent access")
@SneakyThrows
@Test
void shouldEvaluateFlagsCorrectlyUnderConcurrentAccess() {
GoFeatureFlagProvider provider = new GoFeatureFlagProvider(GoFeatureFlagProviderOptions.builder()
.endpoint(baseUrl.toString())
.evaluationType(EvaluationType.IN_PROCESS)
.flagChangePollingIntervalMs(999999L)
.build());
OpenFeatureAPI.getInstance().setProviderAndWait(testName, provider);
val client = OpenFeatureAPI.getInstance().getClient(testName);

int threadCount = 20;
int evaluationsPerThread = 100;
AtomicInteger errorCount = new AtomicInteger(0);
CountDownLatch startGate = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(threadCount);

for (int t = 0; t < threadCount; t++) {
new Thread(() -> {
try {
startGate.await();
for (int i = 0; i < evaluationsPerThread; i++) {
FlagEvaluationDetails<Boolean> result = client.getBooleanDetails(
"bool_targeting_match", false, TestUtils.defaultEvaluationContext);
if (result.getErrorCode() != null) {
errorCount.incrementAndGet();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneLatch.countDown();
}
})
.start();
}

startGate.countDown();
assertTrue(doneLatch.await(30, TimeUnit.SECONDS), "Threads did not finish in time");
assertEquals(0, errorCount.get(), "Concurrent evaluations produced errors");
}
}

@Nested
Expand Down
Loading