builder, DataSourceBuildInputs context) {
+ this.builder = builder;
+ this.context = context;
+ }
+
+ @Override
+ public TDataSource build() {
+ return builder.build(context);
+ }
+ }
+
/**
* Creates a new FDv2DataSystem instance.
*
@@ -55,18 +80,79 @@ private FDv2DataSystem(
* @throws UnsupportedOperationException since this is not yet fully implemented
*/
static FDv2DataSystem create(
- LDLogger logger,
- LDConfig config,
- ClientContextImpl clientContext,
- LoggingConfiguration logConfig
+ LDLogger logger,
+ LDConfig config,
+ ClientContextImpl clientContext,
+ LoggingConfiguration logConfig
) {
if (config.dataSystem == null) {
throw new IllegalArgumentException("DataSystem configuration is required for FDv2DataSystem");
}
-
- // TODO: Implement FDv2DataSystem once all dependencies are available
-
- throw new UnsupportedOperationException("FDv2DataSystem is not yet fully implemented");
+ DataStoreUpdatesImpl dataStoreUpdates = new DataStoreUpdatesImpl(
+ EventBroadcasterImpl.forDataStoreStatus(clientContext.sharedExecutor, logger));
+
+ InMemoryDataStore store = new InMemoryDataStore();
+
+ DataStoreStatusProvider dataStoreStatusProvider = new DataStoreStatusProviderImpl(store, dataStoreUpdates);
+
+ // Create a single flag change broadcaster to be shared between DataSourceUpdatesImpl and FlagTrackerImpl
+ EventBroadcasterImpl flagChangeBroadcaster =
+ EventBroadcasterImpl.forFlagChangeEvents(clientContext.sharedExecutor, logger);
+
+ // Create a single data source status broadcaster to be shared between DataSourceUpdatesImpl and DataSourceStatusProviderImpl
+ EventBroadcasterImpl dataSourceStatusBroadcaster =
+ EventBroadcasterImpl.forDataSourceStatus(clientContext.sharedExecutor, logger);
+
+ DataSourceUpdatesImpl dataSourceUpdates = new DataSourceUpdatesImpl(
+ store,
+ dataStoreStatusProvider,
+ flagChangeBroadcaster,
+ dataSourceStatusBroadcaster,
+ clientContext.sharedExecutor,
+ logConfig.getLogDataSourceOutageAsErrorAfter(),
+ logger
+ );
+
+ DataSystemConfiguration dataSystemConfiguration = config.dataSystem.build();
+ SelectorSource selectorSource = new SelectorSourceFacade(store);
+
+ DataSourceBuildInputs builderContext = new DataSourceBuildInputs(
+ clientContext.getBaseLogger(),
+ clientContext.getThreadPriority(),
+ dataSourceUpdates,
+ clientContext.getServiceEndpoints(),
+ clientContext.getHttp(),
+ clientContext.sharedExecutor,
+ clientContext.diagnosticStore,
+ selectorSource
+ );
+
+ ImmutableList> initializerFactories = dataSystemConfiguration.getInitializers().stream()
+ .map(initializer -> new FactoryWrapper<>(initializer, builderContext))
+ .collect(ImmutableList.toImmutableList());
+
+ ImmutableList> synchronizerFactories = dataSystemConfiguration.getSynchronizers().stream()
+ .map(synchronizer -> new FactoryWrapper<>(synchronizer, builderContext))
+ .collect(ImmutableList.toImmutableList());
+
+ DataSource dataSource = new FDv2DataSource(
+ initializerFactories,
+ synchronizerFactories,
+ dataSourceUpdates
+ );
+ DataSourceStatusProvider dataSourceStatusProvider = new DataSourceStatusProviderImpl(
+ dataSourceStatusBroadcaster,
+ dataSourceUpdates);
+
+ FlagChangeNotifier flagChanged = new FlagChangedFacade(dataSourceUpdates);
+
+ return new FDv2DataSystem(
+ store,
+ dataSource,
+ dataSourceStatusProvider,
+ dataStoreStatusProvider,
+ flagChanged
+ );
}
@Override
@@ -76,8 +162,7 @@ public ReadOnlyStore getStore() {
@Override
public Future start() {
- // TODO: Implement FDv2DataSystem.start() once all dependencies are available
- throw new UnsupportedOperationException("FDv2DataSystem.start() is not yet implemented");
+ return dataSource.start();
}
@Override
@@ -106,12 +191,8 @@ public void close() throws IOException {
return;
}
try {
- if (dataSource instanceof Closeable) {
- ((Closeable) dataSource).close();
- }
- if (store instanceof Closeable) {
- ((Closeable) store).close();
- }
+ dataSource.close();
+ store.close();
} finally {
disposed = true;
}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SelectorSourceFacade.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SelectorSourceFacade.java
new file mode 100644
index 0000000..6ce01af
--- /dev/null
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/SelectorSourceFacade.java
@@ -0,0 +1,17 @@
+package com.launchdarkly.sdk.server;
+
+import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
+import com.launchdarkly.sdk.server.datasources.SelectorSource;
+import com.launchdarkly.sdk.server.subsystems.TransactionalDataStore;
+
+class SelectorSourceFacade implements SelectorSource {
+ private final TransactionalDataStore store;
+ public SelectorSourceFacade(TransactionalDataStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public Selector getSelector() {
+ return store.getSelector();
+ }
+}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java
index 7d5b558..c5d52f3 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/StreamingSynchronizerImpl.java
@@ -98,10 +98,7 @@ private void startStream() {
// Add selector query parameters if the selector is not empty
if (!selector.isEmpty()) {
- updatedUri = HttpHelpers.addQueryParam(updatedUri, "version", String.valueOf(selector.getVersion()));
- if (selector.getState() != null && !selector.getState().isEmpty()) {
- updatedUri = HttpHelpers.addQueryParam(updatedUri, "state", selector.getState());
- }
+ updatedUri = HttpHelpers.addQueryParam(updatedUri, "basis", selector.getState());
}
// Add the payloadFilter query parameter if present and non-empty
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemBuilder.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemBuilder.java
index cf06377..ccf018c 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemBuilder.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemBuilder.java
@@ -1,12 +1,12 @@
package com.launchdarkly.sdk.server.integrations;
import com.google.common.collect.ImmutableList;
-import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
-import com.launchdarkly.sdk.server.subsystems.DataSource;
-import com.launchdarkly.sdk.server.subsystems.DataStore;
-import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration;
+import com.launchdarkly.sdk.server.datasources.Initializer;
+import com.launchdarkly.sdk.server.datasources.Synchronizer;
+import com.launchdarkly.sdk.server.subsystems.*;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
@@ -18,67 +18,63 @@
*/
public final class DataSystemBuilder {
- private final List> initializers = new ArrayList<>();
- private final List> synchronizers = new ArrayList<>();
+ private final List> initializers = new ArrayList<>();
+ private final List> synchronizers = new ArrayList<>();
private ComponentConfigurer fDv1FallbackSynchronizer;
private ComponentConfigurer persistentStore;
private DataSystemConfiguration.DataStoreMode persistentDataStoreMode;
/**
* Add one or more initializers to the builder.
- * To replace initializers, please refer to {@link #replaceInitializers(ComponentConfigurer[])}.
- *
+ * To replace initializers, please refer to {@link #replaceInitializers(DataSourceBuilder[])}.
+ *
* @param initializers the initializers to add
* @return a reference to the builder
*/
- public DataSystemBuilder initializers(ComponentConfigurer... initializers) {
- for (ComponentConfigurer initializer : initializers) {
- this.initializers.add(initializer);
- }
+ @SafeVarargs
+ public final DataSystemBuilder initializers(DataSourceBuilder... initializers) {
+ this.initializers.addAll(Arrays.asList(initializers));
return this;
}
/**
* Replaces any existing initializers with the given initializers.
- * To add initializers, please refer to {@link #initializers(ComponentConfigurer[])}.
- *
+ * To add initializers, please refer to {@link #initializers(DataSourceBuilder[])}.
+ *
* @param initializers the initializers to replace the current initializers with
* @return a reference to this builder
*/
- public DataSystemBuilder replaceInitializers(ComponentConfigurer... initializers) {
+ @SafeVarargs
+ public final DataSystemBuilder replaceInitializers(DataSourceBuilder... initializers) {
this.initializers.clear();
- for (ComponentConfigurer initializer : initializers) {
- this.initializers.add(initializer);
- }
+ this.initializers.addAll(Arrays.asList(initializers));
return this;
}
/**
* Add one or more synchronizers to the builder.
- * To replace synchronizers, please refer to {@link #replaceSynchronizers(ComponentConfigurer[])}.
- *
+ * To replace synchronizers, please refer to {@link #replaceSynchronizers(DataSourceBuilder[])}.
+ *
* @param synchronizers the synchronizers to add
* @return a reference to the builder
*/
- public DataSystemBuilder synchronizers(ComponentConfigurer... synchronizers) {
- for (ComponentConfigurer synchronizer : synchronizers) {
- this.synchronizers.add(synchronizer);
- }
+ @SafeVarargs
+ public final DataSystemBuilder synchronizers(DataSourceBuilder... synchronizers) {
+ this.synchronizers.addAll(Arrays.asList(synchronizers));
return this;
}
/**
* Replaces any existing synchronizers with the given synchronizers.
- * To add synchronizers, please refer to {@link #synchronizers(ComponentConfigurer[])}.
- *
+ * To add synchronizers, please refer to {@link #synchronizers(DataSourceBuilder[])}.
+ *
* @param synchronizers the synchronizers to replace the current synchronizers with
* @return a reference to this builder
*/
- public DataSystemBuilder replaceSynchronizers(ComponentConfigurer... synchronizers) {
+ @SafeVarargs
+ public final DataSystemBuilder replaceSynchronizers(DataSourceBuilder... synchronizers) {
this.synchronizers.clear();
- for (ComponentConfigurer synchronizer : synchronizers) {
- this.synchronizers.add(synchronizer);
- }
+ this.synchronizers.addAll(Arrays.asList(synchronizers));
return this;
}
@@ -87,11 +83,14 @@ public DataSystemBuilder replaceSynchronizers(ComponentConfigurer...
*
* LaunchDarkly can instruct the SDK to fall back to this synchronizer.
*
- *
+ *
* @param fDv1FallbackSynchronizer the FDv1 fallback synchronizer
* @return a reference to the builder
*/
+ @SuppressWarnings("unchecked")
public DataSystemBuilder fDv1FallbackSynchronizer(ComponentConfigurer fDv1FallbackSynchronizer) {
+ // Legacy DataSource configurers are used for FDv1 backward compatibility
+ // This is safe because DataSource is only used in the fallback context
this.fDv1FallbackSynchronizer = fDv1FallbackSynchronizer;
return this;
}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemComponents.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemComponents.java
deleted file mode 100644
index 43d5854..0000000
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemComponents.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.launchdarkly.sdk.server.integrations;
-
-import com.launchdarkly.sdk.server.Components;
-
-/**
- * Components for use with the data system.
- *
- * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
- * It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
- *
- */
-public final class DataSystemComponents {
-
- private DataSystemComponents() {}
-
- /**
- * Get a builder for a polling data source.
- *
- * @return the polling data source builder
- */
- public static FDv2PollingDataSourceBuilder polling() {
- return new FDv2PollingDataSourceBuilder();
- }
-
- /**
- * Get a builder for a streaming data source.
- *
- * @return the streaming data source builder
- */
- public static FDv2StreamingDataSourceBuilder streaming() {
- return new FDv2StreamingDataSourceBuilder();
- }
-
- /**
- * Get a builder for a FDv1 compatible polling data source.
- *
- * This is intended for use as a fallback.
- *
- *
- * @return the FDv1 compatible polling data source builder
- */
- public static PollingDataSourceBuilder fDv1Polling() {
- return Components.pollingDataSource();
- }
-}
-
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemModes.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemModes.java
index 8254415..8d2d9b4 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemModes.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/DataSystemModes.java
@@ -1,5 +1,6 @@
package com.launchdarkly.sdk.server.integrations;
+import com.launchdarkly.sdk.server.DataSystemComponents;
import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
import com.launchdarkly.sdk.server.subsystems.DataStore;
import com.launchdarkly.sdk.server.subsystems.DataSystemConfiguration;
@@ -39,8 +40,8 @@ public final class DataSystemModes {
*/
public DataSystemBuilder defaultMode() {
return custom()
- .initializers(DataSystemComponents.polling())
- .synchronizers(DataSystemComponents.streaming(), DataSystemComponents.polling())
+ .initializers(DataSystemComponents.pollingInitializer())
+ .synchronizers(DataSystemComponents.streamingSynchronizer(), DataSystemComponents.pollingSynchronizer())
.fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling());
}
@@ -66,7 +67,7 @@ public DataSystemBuilder defaultMode() {
*/
public DataSystemBuilder streaming() {
return custom()
- .synchronizers(DataSystemComponents.streaming())
+ .synchronizers(DataSystemComponents.streamingSynchronizer())
.fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling());
}
@@ -88,7 +89,7 @@ public DataSystemBuilder streaming() {
*/
public DataSystemBuilder polling() {
return custom()
- .synchronizers(DataSystemComponents.polling())
+ .synchronizers(DataSystemComponents.pollingSynchronizer())
.fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling());
}
@@ -144,8 +145,8 @@ public DataSystemBuilder persistentStore(ComponentConfigurer persiste
*
* LDConfig config = new LDConfig.Builder("my-sdk-key")
* .dataSystem(Components.dataSystem().custom()
- * .initializers(DataSystemComponents.polling())
- * .synchronizers(DataSystemComponents.streaming(), DataSystemComponents.polling())
+ * .initializers(DataSystemComponents.pollingInitializer())
+ * .synchronizers(DataSystemComponents.streamingSynchronizer(), DataSystemComponents.pollingSynchronizer())
* .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling()));
*
*
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingInitializerBuilder.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingInitializerBuilder.java
new file mode 100644
index 0000000..1dcc5f6
--- /dev/null
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingInitializerBuilder.java
@@ -0,0 +1,63 @@
+package com.launchdarkly.sdk.server.integrations;
+
+import com.launchdarkly.sdk.LDValue;
+import com.launchdarkly.sdk.internal.events.DiagnosticConfigProperty;
+import com.launchdarkly.sdk.server.StandardEndpoints;
+import com.launchdarkly.sdk.server.datasources.Initializer;
+import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
+import com.launchdarkly.sdk.server.subsystems.ClientContext;
+import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder;
+import com.launchdarkly.sdk.server.subsystems.DiagnosticDescription;
+
+
+/**
+ * Contains methods for configuring the polling initializer.
+ *
+ * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
+ * It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
+ *
+ *
+ * Example:
+ *
+ *
+ * LDConfig config = new LDConfig.Builder("my-sdk-key")
+ * .dataSystem(Components.dataSystem().custom()
+ * // DataSystemComponents.pollingInitializer() returns an instance of this builder.
+ * .initializers(DataSystemComponents.pollingInitializer()
+ * .pollInterval(Duration.ofMinutes(10)))
+ * .synchronizers(DataSystemComponents.streamingSynchronizer(), DataSystemComponents.pollingSynchronizer())
+ * .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling()));
+ *
+ */
+public abstract class FDv2PollingInitializerBuilder implements DataSourceBuilder, DiagnosticDescription {
+ protected ServiceEndpoints serviceEndpointsOverride;
+
+ /**
+ * Sets overrides for the service endpoints. In typical usage, the initializer will use the commonly defined
+ * service endpoints, but for cases where they need to be controlled at the source level, this method can
+ * be used. This initializer will only use the endpoints applicable to it.
+ *
+ * @param serviceEndpointsOverride the service endpoints to override the base endpoints
+ * @return the builder
+ */
+ public FDv2PollingInitializerBuilder serviceEndpointsOverride(ServiceEndpointsBuilder serviceEndpointsOverride) {
+ this.serviceEndpointsOverride = serviceEndpointsOverride.createServiceEndpoints();
+ return this;
+ }
+
+ @Override
+ public LDValue describeConfiguration(ClientContext context) {
+ ServiceEndpoints endpoints = serviceEndpointsOverride != null
+ ? serviceEndpointsOverride
+ : context.getServiceEndpoints();
+
+ boolean customPollingBaseUri = StandardEndpoints.isCustomBaseUri(
+ endpoints.getPollingBaseUri(), StandardEndpoints.DEFAULT_POLLING_BASE_URI);
+
+ return LDValue.buildObject()
+ .put(DiagnosticConfigProperty.STREAMING_DISABLED.name, true)
+ .put(DiagnosticConfigProperty.CUSTOM_BASE_URI.name, customPollingBaseUri)
+ .put(DiagnosticConfigProperty.USING_RELAY_DAEMON.name, false)
+ .build();
+ }
+}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingDataSourceBuilder.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingSynchronizerBuilder.java
similarity index 58%
rename from lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingDataSourceBuilder.java
rename to lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingSynchronizerBuilder.java
index 81a40aa..f3ec521 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingDataSourceBuilder.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2PollingSynchronizerBuilder.java
@@ -3,17 +3,17 @@
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.internal.events.DiagnosticConfigProperty;
import com.launchdarkly.sdk.server.StandardEndpoints;
+import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
import com.launchdarkly.sdk.server.subsystems.ClientContext;
-import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
-import com.launchdarkly.sdk.server.subsystems.DataSource;
+import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder;
import com.launchdarkly.sdk.server.subsystems.DiagnosticDescription;
import java.net.URI;
import java.time.Duration;
/**
- * Contains methods for configuring the polling data source.
+ * Contains methods for configuring the polling synchronizer.
*
* This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
@@ -24,22 +24,23 @@
*
* LDConfig config = new LDConfig.Builder("my-sdk-key")
* .dataSystem(Components.dataSystem().custom()
- * // DataSystemComponents.polling() returns an instance of this builder.
- * .initializers(DataSystemComponents.polling()
- * .pollInterval(Duration.ofMinutes(10)))
- * .synchronizers(DataSystemComponents.streaming(), DataSystemComponents.polling())
+ * .initializers(DataSystemComponents.pollingInitializer())
+ * // DataSystemComponents.pollingSynchronizer() returns an instance of this builder.
+ * .synchronizers(DataSystemComponents.streamingSynchronizer(),
+ * DataSystemComponents.pollingSynchronizer()
+ * .pollInterval(Duration.ofMinutes(10)))
* .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling()));
*
*/
-public final class FDv2PollingDataSourceBuilder implements ComponentConfigurer, DiagnosticDescription {
+public abstract class FDv2PollingSynchronizerBuilder implements DataSourceBuilder, DiagnosticDescription {
/**
* The default value for {@link #pollInterval(Duration)}: 30 seconds.
*/
public static final Duration DEFAULT_POLL_INTERVAL = Duration.ofSeconds(30);
- Duration pollInterval = DEFAULT_POLL_INTERVAL;
+ protected Duration pollInterval = DEFAULT_POLL_INTERVAL;
- private ServiceEndpoints serviceEndpointsOverride;
+ protected ServiceEndpoints serviceEndpointsOverride;
/**
* Sets the interval at which the SDK will poll for feature flag updates.
@@ -47,11 +48,11 @@ public final class FDv2PollingDataSourceBuilder implements ComponentConfigurer
- *
+ *
* @param pollInterval the polling interval
* @return the builder
*/
- public FDv2PollingDataSourceBuilder pollInterval(Duration pollInterval) {
+ public FDv2PollingSynchronizerBuilder pollInterval(Duration pollInterval) {
this.pollInterval = pollInterval != null && pollInterval.compareTo(DEFAULT_POLL_INTERVAL) >= 0
? pollInterval
: DEFAULT_POLL_INTERVAL;
@@ -60,64 +61,37 @@ public FDv2PollingDataSourceBuilder pollInterval(Duration pollInterval) {
/**
* Exposed internally for testing.
- *
+ *
* @param pollInterval the polling interval
* @return the builder
*/
- FDv2PollingDataSourceBuilder pollIntervalNoMinimum(Duration pollInterval) {
+ FDv2PollingSynchronizerBuilder pollIntervalNoMinimum(Duration pollInterval) {
this.pollInterval = pollInterval;
return this;
}
/**
- * Sets overrides for the service endpoints. In typical usage, the data source will use the commonly defined
+ * Sets overrides for the service endpoints. In typical usage, the synchronizer will use the commonly defined
* service endpoints, but for cases where they need to be controlled at the source level, this method can
- * be used. This data source will only use the endpoints applicable to it.
- *
+ * be used. This synchronizer will only use the endpoints applicable to it.
+ *
* @param serviceEndpointsOverride the service endpoints to override the base endpoints
* @return the builder
*/
- public FDv2PollingDataSourceBuilder serviceEndpointsOverride(ServiceEndpointsBuilder serviceEndpointsOverride) {
+ public FDv2PollingSynchronizerBuilder serviceEndpointsOverride(ServiceEndpointsBuilder serviceEndpointsOverride) {
this.serviceEndpointsOverride = serviceEndpointsOverride.createServiceEndpoints();
return this;
}
- @Override
- public DataSource build(ClientContext context) {
- ServiceEndpoints endpoints = serviceEndpointsOverride != null
- ? serviceEndpointsOverride
- : context.getServiceEndpoints();
- URI configuredBaseUri = StandardEndpoints.selectBaseUri(
- endpoints.getPollingBaseUri(),
- StandardEndpoints.DEFAULT_POLLING_BASE_URI,
- "Polling",
- context.getBaseLogger());
-
- // TODO: Implement FDv2PollingRequestor
- // var requestor = new FDv2PollingRequestor(context, configuredBaseUri);
-
- // TODO: Implement FDv2PollingDataSource
- // return new FDv2PollingDataSource(
- // context,
- // context.getDataSourceUpdateSink(),
- // requestor,
- // pollInterval,
- // () -> context.getSelectorSource() != null ? context.getSelectorSource().getSelector() : Selector.empty()
- // );
-
- // Placeholder - this will not compile until FDv2PollingDataSource is implemented
- throw new UnsupportedOperationException("FDv2PollingDataSource is not yet implemented");
- }
-
@Override
public LDValue describeConfiguration(ClientContext context) {
ServiceEndpoints endpoints = serviceEndpointsOverride != null
? serviceEndpointsOverride
: context.getServiceEndpoints();
-
+
boolean customPollingBaseUri = StandardEndpoints.isCustomBaseUri(
endpoints.getPollingBaseUri(), StandardEndpoints.DEFAULT_POLLING_BASE_URI);
-
+
return LDValue.buildObject()
.put(DiagnosticConfigProperty.STREAMING_DISABLED.name, true)
.put(DiagnosticConfigProperty.CUSTOM_BASE_URI.name, customPollingBaseUri)
@@ -126,4 +100,3 @@ public LDValue describeConfiguration(ClientContext context) {
.build();
}
}
-
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2StreamingDataSourceBuilder.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2StreamingSynchronizerBuilder.java
similarity index 62%
rename from lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2StreamingDataSourceBuilder.java
rename to lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2StreamingSynchronizerBuilder.java
index 8e08531..5464acf 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2StreamingDataSourceBuilder.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/FDv2StreamingSynchronizerBuilder.java
@@ -3,17 +3,17 @@
import com.launchdarkly.sdk.LDValue;
import com.launchdarkly.sdk.internal.events.DiagnosticConfigProperty;
import com.launchdarkly.sdk.server.StandardEndpoints;
+import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
import com.launchdarkly.sdk.server.subsystems.ClientContext;
-import com.launchdarkly.sdk.server.subsystems.ComponentConfigurer;
-import com.launchdarkly.sdk.server.subsystems.DataSource;
+import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder;
import com.launchdarkly.sdk.server.subsystems.DiagnosticDescription;
import java.net.URI;
import java.time.Duration;
/**
- * Contains methods for configuring the streaming data source.
+ * Contains methods for configuring the streaming synchronizer.
*
* This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
@@ -24,22 +24,22 @@
*
* LDConfig config = new LDConfig.Builder("my-sdk-key")
* .dataSystem(Components.dataSystem().custom()
- * .initializers(DataSystemComponents.polling())
- * // DataSystemComponents.streaming() returns an instance of this builder.
- * .synchronizers(DataSystemComponents.streaming()
- * .initialReconnectDelay(Duration.ofSeconds(5)), DataSystemComponents.polling())
+ * .initializers(DataSystemComponents.pollingInitializer())
+ * // DataSystemComponents.streamingSynchronizer() returns an instance of this builder.
+ * .synchronizers(DataSystemComponents.streamingSynchronizer()
+ * .initialReconnectDelay(Duration.ofSeconds(5)), DataSystemComponents.pollingSynchronizer())
* .fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling()));
*
*/
-public final class FDv2StreamingDataSourceBuilder implements ComponentConfigurer, DiagnosticDescription {
+public abstract class FDv2StreamingSynchronizerBuilder implements DataSourceBuilder, DiagnosticDescription {
/**
* The default value for {@link #initialReconnectDelay(Duration)}: 1000 milliseconds.
*/
public static final Duration DEFAULT_INITIAL_RECONNECT_DELAY = Duration.ofSeconds(1);
- private Duration initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+ protected Duration initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
- private ServiceEndpoints serviceEndpointsOverride;
+ protected ServiceEndpoints serviceEndpointsOverride;
/**
* Sets the initial reconnect delay for the streaming connection.
@@ -51,63 +51,39 @@ public final class FDv2StreamingDataSourceBuilder implements ComponentConfigurer
*
* The default value is {@link #DEFAULT_INITIAL_RECONNECT_DELAY}.
*
- *
+ *
* @param initialReconnectDelay the reconnect time base value
* @return the builder
*/
- public FDv2StreamingDataSourceBuilder initialReconnectDelay(Duration initialReconnectDelay) {
+ public FDv2StreamingSynchronizerBuilder initialReconnectDelay(Duration initialReconnectDelay) {
this.initialReconnectDelay = initialReconnectDelay != null ? initialReconnectDelay : DEFAULT_INITIAL_RECONNECT_DELAY;
return this;
}
/**
- * Sets overrides for the service endpoints. In typical usage, the data source will use the commonly defined
+ * Sets overrides for the service endpoints. In typical usage, the synchronizer will use the commonly defined
* service endpoints, but for cases where they need to be controlled at the source level, this method can
- * be used. This data source will only use the endpoints applicable to it.
- *
+ * be used. This synchronizer will only use the endpoints applicable to it.
+ *
* @param serviceEndpointsOverride the service endpoints to override the base endpoints
* @return the builder
*/
- public FDv2StreamingDataSourceBuilder serviceEndpointsOverride(ServiceEndpointsBuilder serviceEndpointsOverride) {
+ public FDv2StreamingSynchronizerBuilder serviceEndpointsOverride(ServiceEndpointsBuilder serviceEndpointsOverride) {
this.serviceEndpointsOverride = serviceEndpointsOverride.createServiceEndpoints();
return this;
}
- @Override
- public DataSource build(ClientContext context) {
- ServiceEndpoints endpoints = serviceEndpointsOverride != null
- ? serviceEndpointsOverride
- : context.getServiceEndpoints();
- URI configuredBaseUri = StandardEndpoints.selectBaseUri(
- endpoints.getStreamingBaseUri(),
- StandardEndpoints.DEFAULT_STREAMING_BASE_URI,
- "Streaming",
- context.getBaseLogger());
-
- // TODO: Implement FDv2StreamingDataSource
- // return new FDv2StreamingDataSource(
- // context,
- // context.getDataSourceUpdateSink(),
- // configuredBaseUri,
- // initialReconnectDelay,
- // () -> context.getSelectorSource() != null ? context.getSelectorSource().getSelector() : Selector.empty()
- // );
-
- // Placeholder - this will not compile until FDv2StreamingDataSource is implemented
- throw new UnsupportedOperationException("FDv2StreamingDataSource is not yet implemented");
- }
-
@Override
public LDValue describeConfiguration(ClientContext context) {
ServiceEndpoints endpoints = serviceEndpointsOverride != null
? serviceEndpointsOverride
: context.getServiceEndpoints();
-
+
boolean customStreamingBaseUri = StandardEndpoints.isCustomBaseUri(
endpoints.getStreamingBaseUri(), StandardEndpoints.DEFAULT_STREAMING_BASE_URI);
boolean customPollingBaseUri = StandardEndpoints.isCustomBaseUri(
endpoints.getPollingBaseUri(), StandardEndpoints.DEFAULT_POLLING_BASE_URI);
-
+
return LDValue.buildObject()
.put(DiagnosticConfigProperty.STREAMING_DISABLED.name, false)
.put(DiagnosticConfigProperty.CUSTOM_BASE_URI.name, customPollingBaseUri)
@@ -117,4 +93,3 @@ public LDValue describeConfiguration(ClientContext context) {
.build();
}
}
-
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceBuildInputs.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceBuildInputs.java
new file mode 100644
index 0000000..ac2d4da
--- /dev/null
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceBuildInputs.java
@@ -0,0 +1,132 @@
+package com.launchdarkly.sdk.server.subsystems;
+
+import com.launchdarkly.logging.LDLogger;
+import com.launchdarkly.sdk.internal.events.DiagnosticStore;
+import com.launchdarkly.sdk.server.datasources.SelectorSource;
+import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Build information (dependencies and configuration) provided to initializer and synchronizer builders.
+ *
+ * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
+ * It is in early access. If you want access to this feature, please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
+ *
+ * This consolidates all the parameters needed to construct data source components,
+ * including HTTP configuration, logging, scheduling, and selector state.
+ */
+public final class DataSourceBuildInputs {
+ private final LDLogger baseLogger;
+ private final int threadPriority;
+ private final DataSourceUpdateSink dataSourceUpdates;
+ private final ServiceEndpoints serviceEndpoints;
+ private final HttpConfiguration http;
+ private final ScheduledExecutorService sharedExecutor;
+ private final DiagnosticStore diagnosticStore;
+ private final SelectorSource selectorSource;
+
+ /**
+ * Constructs a DataSourceBuilderContext.
+ *
+ * @param baseLogger the base logger instance
+ * @param threadPriority the thread priority for worker threads
+ * @param dataSourceUpdates the data source update sink
+ * @param serviceEndpoints the service endpoint URIs
+ * @param http HTTP configuration properties
+ * @param sharedExecutor shared executor service for scheduling
+ * @param diagnosticStore diagnostic data accumulator (may be null)
+ * @param selectorSource source for obtaining selectors
+ */
+ public DataSourceBuildInputs(
+ LDLogger baseLogger,
+ int threadPriority,
+ DataSourceUpdateSink dataSourceUpdates,
+ ServiceEndpoints serviceEndpoints,
+ HttpConfiguration http,
+ ScheduledExecutorService sharedExecutor,
+ DiagnosticStore diagnosticStore,
+ SelectorSource selectorSource
+ ) {
+ this.baseLogger = baseLogger;
+ this.threadPriority = threadPriority;
+ this.dataSourceUpdates = dataSourceUpdates;
+ this.serviceEndpoints = serviceEndpoints;
+ this.http = http;
+ this.sharedExecutor = sharedExecutor;
+ this.diagnosticStore = diagnosticStore;
+ this.selectorSource = selectorSource;
+ }
+
+ /**
+ * Returns the base logger instance.
+ *
+ * @return the base logger
+ */
+ public LDLogger getBaseLogger() {
+ return baseLogger;
+ }
+
+ /**
+ * Returns the thread priority for worker threads.
+ *
+ * @return the thread priority
+ */
+ public int getThreadPriority() {
+ return threadPriority;
+ }
+
+ /**
+ * Returns the data source update sink.
+ *
+ * @return the data source update sink
+ */
+ public DataSourceUpdateSink getDataSourceUpdates() {
+ return dataSourceUpdates;
+ }
+
+ /**
+ * Returns the service endpoint URIs.
+ *
+ * @return the service endpoints
+ */
+ public ServiceEndpoints getServiceEndpoints() {
+ return serviceEndpoints;
+ }
+
+ /**
+ * Returns the HTTP configuration properties.
+ *
+ * @return the HTTP configuration
+ */
+ public HttpConfiguration getHttp() {
+ return http;
+ }
+
+ /**
+ * Returns the shared executor service for scheduling.
+ *
+ * @return the shared executor
+ */
+ public ScheduledExecutorService getSharedExecutor() {
+ return sharedExecutor;
+ }
+
+ /**
+ * Returns the diagnostic data accumulator.
+ *
+ * @return the diagnostic store, or null if diagnostics are disabled
+ */
+ public DiagnosticStore getDiagnosticStore() {
+ return diagnosticStore;
+ }
+
+ /**
+ * Returns the selector source.
+ *
+ * @return the selector source
+ */
+ public SelectorSource getSelectorSource() {
+ return selectorSource;
+ }
+}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceBuilder.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceBuilder.java
new file mode 100644
index 0000000..70fdeee
--- /dev/null
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSourceBuilder.java
@@ -0,0 +1,19 @@
+package com.launchdarkly.sdk.server.subsystems;
+
+
+/**
+ * Interface for building synchronizers and initializers.
+ *
+ * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
+ * It is in early access. If you want access to this feature, please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
+ * @param
+ */
+public interface DataSourceBuilder {
+ /**
+ * Builds a data source instance based on the provided context.
+ *
+ * @param context the context for building the data source
+ * @return the built data source instance
+ */
+ TDataSource build(DataSourceBuildInputs context);
+}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSystemConfiguration.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSystemConfiguration.java
index 489cae8..271a819 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSystemConfiguration.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/subsystems/DataSystemConfiguration.java
@@ -1,6 +1,8 @@
package com.launchdarkly.sdk.server.subsystems;
import com.google.common.collect.ImmutableList;
+import com.launchdarkly.sdk.server.datasources.Initializer;
+import com.launchdarkly.sdk.server.datasources.Synchronizer;
/**
* Configuration for the SDK's data acquisition and storage strategy.
@@ -35,8 +37,8 @@ public enum DataStoreMode {
READ_WRITE
}
- private final ImmutableList> initializers;
- private final ImmutableList> synchronizers;
+ private final ImmutableList> initializers;
+ private final ImmutableList> synchronizers;
private final ComponentConfigurer fDv1FallbackSynchronizer;
private final ComponentConfigurer persistentStore;
private final DataStoreMode persistentDataStoreMode;
@@ -54,8 +56,8 @@ public enum DataStoreMode {
* @param persistentDataStoreMode see {@link #getPersistentDataStoreMode()}
*/
public DataSystemConfiguration(
- ImmutableList> initializers,
- ImmutableList> synchronizers,
+ ImmutableList> initializers,
+ ImmutableList> synchronizers,
ComponentConfigurer fDv1FallbackSynchronizer,
ComponentConfigurer persistentStore,
DataStoreMode persistentDataStoreMode) {
@@ -67,26 +69,26 @@ public DataSystemConfiguration(
}
/**
- * A list of factories for creating data sources for initialization.
- *
+ * A list of factories for creating initializers for initialization.
+ *
* @return the list of initializer configurers
*/
- public ImmutableList> getInitializers() {
+ public ImmutableList> getInitializers() {
return initializers;
}
/**
- * A list of factories for creating data sources for synchronization.
- *
+ * A list of factories for creating synchronizers for synchronization.
+ *
* @return the list of synchronizer configurers
*/
- public ImmutableList> getSynchronizers() {
+ public ImmutableList> getSynchronizers() {
return synchronizers;
}
/**
* A synchronizer to fall back to when FDv1 fallback has been requested.
- *
+ *
* @return the FDv1 fallback synchronizer configurer, or null
*/
public ComponentConfigurer getFDv1FallbackSynchronizer() {
@@ -98,7 +100,7 @@ public ComponentConfigurer getFDv1FallbackSynchronizer() {
* null.
*
* The persistent store itself will implement {@link PersistentDataStore}, but we expect that to be wrapped by a factory which can
- * operates at the {@link DataStore} level.
+ * operate at the {@link DataStore} level.
*
*
* @return the persistent store configurer, or null
diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/ConfigurationTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/ConfigurationTest.java
index 13aa825..76aa11e 100644
--- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/ConfigurationTest.java
+++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/ConfigurationTest.java
@@ -1,10 +1,10 @@
package com.launchdarkly.sdk.server;
import com.launchdarkly.sdk.server.integrations.DataSystemBuilder;
-import com.launchdarkly.sdk.server.integrations.DataSystemComponents;
import com.launchdarkly.sdk.server.integrations.DataSystemModes;
-import com.launchdarkly.sdk.server.integrations.FDv2PollingDataSourceBuilder;
-import com.launchdarkly.sdk.server.integrations.FDv2StreamingDataSourceBuilder;
+import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder;
+import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder;
+import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder;
import com.launchdarkly.sdk.server.integrations.MockPersistentDataStore;
import com.launchdarkly.sdk.server.integrations.PersistentDataStoreBuilder;
import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder;
@@ -61,12 +61,12 @@ public void canConfigureDefaultDataSystem() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.defaultMode();
DataSystemConfiguration dataSystemConfig = builder.build();
-
+
assertEquals(1, dataSystemConfig.getInitializers().size());
- assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingInitializerBuilder);
assertEquals(2, dataSystemConfig.getSynchronizers().size());
- assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingDataSourceBuilder);
- assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2PollingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingSynchronizerBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2PollingSynchronizerBuilder);
assertTrue(dataSystemConfig.getFDv1FallbackSynchronizer() instanceof PollingDataSourceBuilder);
assertNull(dataSystemConfig.getPersistentStore());
}
@@ -79,7 +79,7 @@ public void canConfigureStreamingDataSystem() {
assertTrue(dataSystemConfig.getInitializers().isEmpty());
assertEquals(1, dataSystemConfig.getSynchronizers().size());
- assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingSynchronizerBuilder);
assertTrue(dataSystemConfig.getFDv1FallbackSynchronizer() instanceof PollingDataSourceBuilder);
assertNull(dataSystemConfig.getPersistentStore());
}
@@ -89,10 +89,10 @@ public void canConfigurePollingDataSystem() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.polling();
DataSystemConfiguration dataSystemConfig = builder.build();
-
+
assertTrue(dataSystemConfig.getInitializers().isEmpty());
assertEquals(1, dataSystemConfig.getSynchronizers().size());
- assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2PollingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2PollingSynchronizerBuilder);
assertTrue(dataSystemConfig.getFDv1FallbackSynchronizer() instanceof PollingDataSourceBuilder);
assertNull(dataSystemConfig.getPersistentStore());
}
@@ -122,16 +122,16 @@ public void canConfigurePersistentStoreDataSystem() {
ComponentConfigurer storeConfigurer = TestComponents.specificComponent(mockStore);
ComponentConfigurer dataStoreConfigurer = TestComponents.specificComponent(
Components.persistentDataStore(storeConfigurer).build(clientContextWithDataStoreUpdateSink()));
-
+
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.persistentStore(dataStoreConfigurer);
DataSystemConfiguration dataSystemConfig = builder.build();
-
+
assertEquals(1, dataSystemConfig.getInitializers().size());
- assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingInitializerBuilder);
assertEquals(2, dataSystemConfig.getSynchronizers().size());
- assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingDataSourceBuilder);
- assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2PollingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingSynchronizerBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2PollingSynchronizerBuilder);
assertTrue(dataSystemConfig.getFDv1FallbackSynchronizer() instanceof PollingDataSourceBuilder);
assertNotNull(dataSystemConfig.getPersistentStore());
assertEquals(DataSystemConfiguration.DataStoreMode.READ_WRITE, dataSystemConfig.getPersistentDataStoreMode());
@@ -146,21 +146,21 @@ public void canConfigureCustomDataSystemWithAllOptions() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.custom()
- .initializers(DataSystemComponents.polling())
- .synchronizers(DataSystemComponents.streaming(), DataSystemComponents.polling())
+ .initializers(DataSystemComponents.pollingInitializer())
+ .synchronizers(DataSystemComponents.streamingSynchronizer(), DataSystemComponents.pollingSynchronizer())
.fDv1FallbackSynchronizer(DataSystemComponents.fDv1Polling())
.persistentStore(dataStoreConfigurer, DataSystemConfiguration.DataStoreMode.READ_WRITE);
-
+
DataSystemConfiguration dataSystemConfig = builder.build();
// Verify initializers
assertEquals(1, dataSystemConfig.getInitializers().size());
- assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingInitializerBuilder);
// Verify synchronizers
assertEquals(2, dataSystemConfig.getSynchronizers().size());
- assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingDataSourceBuilder);
- assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2PollingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingSynchronizerBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2PollingSynchronizerBuilder);
// Verify FDv1 fallback
assertTrue(dataSystemConfig.getFDv1FallbackSynchronizer() instanceof PollingDataSourceBuilder);
@@ -174,52 +174,50 @@ public void canConfigureCustomDataSystemWithAllOptions() {
public void canReplaceInitializersInCustomDataSystem() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.custom()
- .initializers(DataSystemComponents.polling())
- .replaceInitializers(DataSystemComponents.streaming());
-
+ .initializers(DataSystemComponents.pollingInitializer())
+ .replaceInitializers(DataSystemComponents.pollingInitializer());
+
DataSystemConfiguration dataSystemConfig = builder.build();
assertEquals(1, dataSystemConfig.getInitializers().size());
- assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2StreamingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingInitializerBuilder);
}
@Test
public void canReplaceSynchronizersInCustomDataSystem() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.custom()
- .synchronizers(DataSystemComponents.polling())
- .replaceSynchronizers(DataSystemComponents.streaming());
-
+ .synchronizers(DataSystemComponents.pollingSynchronizer())
+ .replaceSynchronizers(DataSystemComponents.streamingSynchronizer());
+
DataSystemConfiguration dataSystemConfig = builder.build();
assertEquals(1, dataSystemConfig.getSynchronizers().size());
- assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2StreamingSynchronizerBuilder);
}
@Test
public void canAddMultipleInitializersToCustomDataSystem() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.custom()
- .initializers(DataSystemComponents.polling())
- .initializers(DataSystemComponents.streaming());
-
+ .initializers(DataSystemComponents.pollingInitializer())
+ .initializers(DataSystemComponents.pollingInitializer());
+
DataSystemConfiguration dataSystemConfig = builder.build();
assertEquals(2, dataSystemConfig.getInitializers().size());
- assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingDataSourceBuilder);
- assertTrue(dataSystemConfig.getInitializers().get(1) instanceof FDv2StreamingDataSourceBuilder);
+ assertTrue(dataSystemConfig.getInitializers().get(0) instanceof FDv2PollingInitializerBuilder);
+ assertTrue(dataSystemConfig.getInitializers().get(1) instanceof FDv2PollingInitializerBuilder);
}
@Test
public void canAddMultipleSynchronizersToCustomDataSystem() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.custom()
- .synchronizers(DataSystemComponents.polling())
- .synchronizers(DataSystemComponents.streaming())
- .synchronizers(DataSystemComponents.fDv1Polling());
-
+ .synchronizers(DataSystemComponents.pollingSynchronizer())
+ .synchronizers(DataSystemComponents.streamingSynchronizer());
+
DataSystemConfiguration dataSystemConfig = builder.build();
- assertEquals(3, dataSystemConfig.getSynchronizers().size());
- assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2PollingDataSourceBuilder);
- assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2StreamingDataSourceBuilder);
- assertTrue(dataSystemConfig.getSynchronizers().get(2) instanceof PollingDataSourceBuilder);
+ assertEquals(2, dataSystemConfig.getSynchronizers().size());
+ assertTrue(dataSystemConfig.getSynchronizers().get(0) instanceof FDv2PollingSynchronizerBuilder);
+ assertTrue(dataSystemConfig.getSynchronizers().get(1) instanceof FDv2StreamingSynchronizerBuilder);
}
@Test
@@ -262,7 +260,7 @@ public void canConfigureCustomDataSystemWithReadWritePersistentStore() {
DataSystemModes modes = new DataSystemModes();
DataSystemBuilder builder = modes.custom()
.persistentStore(dataStoreConfigurer, DataSystemConfiguration.DataStoreMode.READ_WRITE)
- .synchronizers(DataSystemComponents.streaming());
+ .synchronizers(DataSystemComponents.streamingSynchronizer());
DataSystemConfiguration dataSystemConfig = builder.build();
assertNotNull(dataSystemConfig.getPersistentStore());
diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java
index ea3a77a..5ce321b 100644
--- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java
+++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/DefaultFDv2RequestorTest.java
@@ -136,12 +136,12 @@ public void emptyEventsArray() throws Exception {
}
@Test
- public void requestWithVersionQueryParameter() throws Exception {
+ public void requestWithBasisQueryParameter() throws Exception {
Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON);
try (HttpServer server = HttpServer.start(resp)) {
try (DefaultFDv2Requestor requestor = makeRequestor(server)) {
- Selector selector = Selector.make(42, null);
+ Selector selector = Selector.make(42, "test-state");
CompletableFuture future =
requestor.Poll(selector);
@@ -150,18 +150,18 @@ public void requestWithVersionQueryParameter() throws Exception {
RequestInfo req = server.getRecorder().requireRequest();
assertEquals(REQUEST_PATH, req.getPath());
- assertThat(req.getQuery(), containsString("version=42"));
+ assertThat(req.getQuery(), containsString("basis=test-state"));
}
}
}
@Test
- public void requestWithStateQueryParameter() throws Exception {
+ public void requestWithBasisContainingState() throws Exception {
Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON);
try (HttpServer server = HttpServer.start(resp)) {
try (DefaultFDv2Requestor requestor = makeRequestor(server)) {
- Selector selector = Selector.make(0, "test-state");
+ Selector selector = Selector.make(0, "(p:payload-1:100)");
CompletableFuture future =
requestor.Poll(selector);
@@ -170,18 +170,18 @@ public void requestWithStateQueryParameter() throws Exception {
RequestInfo req = server.getRecorder().requireRequest();
assertEquals(REQUEST_PATH, req.getPath());
- assertThat(req.getQuery(), containsString("state=test-state"));
+ assertThat(req.getQuery(), containsString("basis=%28p%3Apayload-1%3A100%29"));
}
}
}
@Test
- public void requestWithBothQueryParameters() throws Exception {
+ public void requestWithComplexBasisState() throws Exception {
Handler resp = Handlers.bodyJson(EMPTY_EVENTS_JSON);
try (HttpServer server = HttpServer.start(resp)) {
try (DefaultFDv2Requestor requestor = makeRequestor(server)) {
- Selector selector = Selector.make(100, "my-state");
+ Selector selector = Selector.make(100, "(p:my-payload:200)");
CompletableFuture future =
requestor.Poll(selector);
@@ -190,8 +190,7 @@ public void requestWithBothQueryParameters() throws Exception {
RequestInfo req = server.getRecorder().requireRequest();
assertEquals(REQUEST_PATH, req.getPath());
- assertThat(req.getQuery(), containsString("version=100"));
- assertThat(req.getQuery(), containsString("state=my-state"));
+ assertThat(req.getQuery(), containsString("basis=%28p%3Amy-payload%3A200%29"));
}
}
}
@@ -403,8 +402,8 @@ public void differentSelectorsUseDifferentEtags() throws Exception {
try (HttpServer server = HttpServer.start(resp)) {
try (DefaultFDv2Requestor requestor = makeRequestor(server)) {
- Selector selector1 = Selector.make(100, "state1");
- Selector selector2 = Selector.make(200, "state2");
+ Selector selector1 = Selector.make(100, "(p:payload-1:100)");
+ Selector selector2 = Selector.make(200, "(p:payload-2:200)");
// First request with selector1
requestor.Poll(selector1).get(5, TimeUnit.SECONDS);
diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java
index b720a26..eaf305c 100644
--- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java
+++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/StreamingSynchronizerImplTest.java
@@ -403,11 +403,9 @@ public void selectorWithVersionAndState() throws Exception {
// Verify selector was fetched when connecting
verify(selectorSource, atLeastOnce()).getSelector();
- // Verify the request had the correct query parameters
assertEquals(1, server.getRecorder().count());
RequestInfo request = server.getRecorder().requireRequest();
- assertThat(request.getQuery(), containsString("version=50"));
- assertThat(request.getQuery(), containsString("state="));
+ assertThat(request.getQuery(), containsString("basis=%28p%3Aold%3A50%29"));
synchronizer.close();
}
@@ -536,7 +534,7 @@ public void selectorWithVersionOnly() throws Exception {
HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp());
SelectorSource selectorSource = mock(SelectorSource.class);
- when(selectorSource.getSelector()).thenReturn(Selector.make(75, null));
+ when(selectorSource.getSelector()).thenReturn(Selector.make(75, "(p:test:75)"));
StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl(
httpProperties,
@@ -554,11 +552,9 @@ public void selectorWithVersionOnly() throws Exception {
assertNotNull(result);
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType());
- // Verify the request had version but not state parameter
assertEquals(1, server.getRecorder().count());
RequestInfo request = server.getRecorder().requireRequest();
- assertThat(request.getQuery(), containsString("version=75"));
- // State should not be present (or if present, not have an actual state value)
+ assertThat(request.getQuery(), containsString("basis=%28p%3Atest%3A75%29"));
synchronizer.close();
}
@@ -578,7 +574,7 @@ public void selectorWithEmptyState() throws Exception {
HttpProperties httpProperties = toHttpProperties(clientContext("sdk-key", baseConfig().build()).getHttp());
SelectorSource selectorSource = mock(SelectorSource.class);
- when(selectorSource.getSelector()).thenReturn(Selector.make(80, ""));
+ when(selectorSource.getSelector()).thenReturn(Selector.make(80, "(p:empty-test:80)"));
StreamingSynchronizerImpl synchronizer = new StreamingSynchronizerImpl(
httpProperties,
@@ -596,10 +592,9 @@ public void selectorWithEmptyState() throws Exception {
assertNotNull(result);
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType());
- // Verify the request had version but not state parameter (empty string shouldn't add state)
assertEquals(1, server.getRecorder().count());
RequestInfo request = server.getRecorder().requireRequest();
- assertThat(request.getQuery(), containsString("version=80"));
+ assertThat(request.getQuery(), containsString("basis=%28p%3Aempty-test%3A80%29"));
synchronizer.close();
}
@@ -742,12 +737,10 @@ public void payloadFilterWithSelectorBothAddedToRequest() throws Exception {
assertNotNull(result);
assertEquals(FDv2SourceResult.ResultType.CHANGE_SET, result.getResultType());
- // Verify the request had both filter and selector parameters
assertEquals(1, server.getRecorder().count());
RequestInfo request = server.getRecorder().requireRequest();
assertThat(request.getQuery(), containsString("filter=testFilter"));
- assertThat(request.getQuery(), containsString("version=42"));
- assertThat(request.getQuery(), containsString("state="));
+ assertThat(request.getQuery(), containsString("basis=%28p%3Atest%3A42%29"));
synchronizer.close();
}