Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is a move because of package boundaries. Not sure how we want them to shake out.

The HttpConfig -> HttpProperties I think was the missing component, and moving that around created further complications.

Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.launchdarkly.sdk.server;

import com.launchdarkly.sdk.server.datasources.Initializer;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder;
import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder;
import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder;
import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder;
import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints;
import com.launchdarkly.sdk.server.subsystems.DataSourceBuilderContext;

import java.net.URI;

import static com.launchdarkly.sdk.server.ComponentsImpl.toHttpProperties;

/**
* Components for use with the data system.
* <p>
* This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
* </p>
*/
public final class DataSystemComponents {

static class FDv2PollingInitializerBuilderImpl extends FDv2PollingInitializerBuilder {
@Override
public Initializer build(DataSourceBuilderContext context) {
ServiceEndpoints endpoints = serviceEndpointsOverride != null
? serviceEndpointsOverride
: context.getServiceEndpoints();
URI configuredBaseUri = StandardEndpoints.selectBaseUri(
endpoints.getPollingBaseUri(),
StandardEndpoints.DEFAULT_POLLING_BASE_URI,
"Polling",
context.getBaseLogger());

DefaultFDv2Requestor requestor = new DefaultFDv2Requestor(
toHttpProperties(context.getHttp()),
configuredBaseUri,
StandardEndpoints.FDV2_POLLING_REQUEST_PATH,
context.getBaseLogger());

return new PollingInitializerImpl(
requestor,
context.getBaseLogger(),
context.getSelectorSource()
);
}
}

static class FDv2PollingSynchronizerBuilderImpl extends FDv2PollingSynchronizerBuilder {
@Override
public Synchronizer build(DataSourceBuilderContext context) {
ServiceEndpoints endpoints = serviceEndpointsOverride != null
? serviceEndpointsOverride
: context.getServiceEndpoints();
URI configuredBaseUri = StandardEndpoints.selectBaseUri(
endpoints.getPollingBaseUri(),
StandardEndpoints.DEFAULT_POLLING_BASE_URI,
"Polling",
context.getBaseLogger());

DefaultFDv2Requestor requestor = new DefaultFDv2Requestor(
toHttpProperties(context.getHttp()),
configuredBaseUri,
StandardEndpoints.FDV2_POLLING_REQUEST_PATH,
context.getBaseLogger());

return new PollingSynchronizerImpl(
requestor,
context.getBaseLogger(),
context.getSelectorSource(),
context.getSharedExecutor(),
pollInterval
);
}
}

static class FDv2StreamingSynchronizerBuilderImpl extends FDv2StreamingSynchronizerBuilder {
@Override
public Synchronizer build(DataSourceBuilderContext context) {
ServiceEndpoints endpoints = serviceEndpointsOverride != null
? serviceEndpointsOverride
: context.getServiceEndpoints();
URI configuredBaseUri = StandardEndpoints.selectBaseUri(
endpoints.getStreamingBaseUri(),
StandardEndpoints.DEFAULT_STREAMING_BASE_URI,
"Streaming",
context.getBaseLogger());

return new StreamingSynchronizerImpl(
toHttpProperties(context.getHttp()),
configuredBaseUri,
StandardEndpoints.FDV2_STREAMING_REQUEST_PATH,
context.getBaseLogger(),
context.getSelectorSource(),
null,
initialReconnectDelay
);
}
}

private DataSystemComponents() {}

/**
* Get a builder for a polling initializer.
*
* @return the polling initializer builder
*/
public static FDv2PollingInitializerBuilder pollingInitializer() {
return new FDv2PollingInitializerBuilderImpl();
}

/**
* Get a builder for a polling synchronizer.
*
* @return the polling synchronizer builder
*/
public static FDv2PollingSynchronizerBuilder pollingSynchronizer() {
return new FDv2PollingSynchronizerBuilderImpl();
}

/**
* Get a builder for a streaming synchronizer.
*
* @return the streaming synchronizer builder
*/
public static FDv2StreamingSynchronizerBuilder streamingSynchronizer() {
return new FDv2StreamingSynchronizerBuilderImpl();
}

/**
* Get a builder for a FDv1 compatible polling data source.
* <p>
* This is intended for use as a fallback.
* </p>
*
* @return the FDv1 compatible polling data source builder
*/
public static PollingDataSourceBuilder fDv1Polling() {
return Components.pollingDataSource();
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a mistake in the implementation. So both polling and streaming have this update.

Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
* Implementation of FDv2Requestor for polling feature flag data via FDv2 protocol.
*/
public class DefaultFDv2Requestor implements FDv2Requestor, Closeable {
private static final String VERSION_QUERY_PARAM = "version";
private static final String STATE_QUERY_PARAM = "state";
private static final String BASIS_QUERY_PARAM = "basis";

private final OkHttpClient httpClient;
private final URI pollingUri;
Expand Down Expand Up @@ -67,11 +66,7 @@ public CompletableFuture<FDv2PayloadResponse> Poll(Selector selector) {
URI requestUri = pollingUri;

if (!selector.isEmpty()) {
requestUri = HttpHelpers.addQueryParam(requestUri, VERSION_QUERY_PARAM, String.valueOf(selector.getVersion()));
}

if (selector.getState() != null && !selector.getState().isEmpty()) {
requestUri = HttpHelpers.addQueryParam(requestUri, STATE_QUERY_PARAM, selector.getState());
requestUri = HttpHelpers.addQueryParam(requestUri, BASIS_QUERY_PARAM, selector.getState());
}

logger.debug("Making FDv2 polling request to: {}", requestUri);
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still lots of work to do here. Outside the scope of this PR.

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.launchdarkly.sdk.server;

import com.google.common.collect.ImmutableList;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.Initializer;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataSource;
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -18,10 +19,10 @@
import java.util.stream.Collectors;

class FDv2DataSource implements DataSource {
private final List<InitializerFactory> initializers;
private final List<DataSourceFactory<Initializer>> initializers;
private final List<SynchronizerFactoryWithState> synchronizers;

private final DataSourceUpdateSink dataSourceUpdates;
private final DataSourceUpdateSinkV2 dataSourceUpdates;

private final CompletableFuture<Boolean> startFuture = new CompletableFuture<>();
private final AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -46,12 +47,12 @@ public enum State {
Blocked
}

private final SynchronizerFactory factory;
private final DataSourceFactory<Synchronizer> factory;

private State state = State.Available;


public SynchronizerFactoryWithState(SynchronizerFactory factory) {
public SynchronizerFactoryWithState(DataSourceFactory<Synchronizer> factory) {
this.factory = factory;
}

Expand All @@ -68,19 +69,15 @@ public Synchronizer build() {
}
}

public interface InitializerFactory {
Initializer build();
}

public interface SynchronizerFactory {
Synchronizer build();
public interface DataSourceFactory<T> {
T build();
}


public FDv2DataSource(
List<InitializerFactory> initializers,
List<SynchronizerFactory> synchronizers,
DataSourceUpdateSink dataSourceUpdates
ImmutableList<DataSourceFactory<Initializer>> initializers,
ImmutableList<DataSourceFactory<Synchronizer>> synchronizers,
DataSourceUpdateSinkV2 dataSourceUpdates
) {
this.initializers = initializers;
this.synchronizers = synchronizers
Expand Down Expand Up @@ -116,6 +113,40 @@ private SynchronizerFactoryWithState getFirstAvailableSynchronizer() {
}
}

private void runInitializers() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this for better logical organization.

boolean anyDataReceived = false;
for (DataSourceFactory<Initializer> factory : initializers) {
try {
Initializer initializer = factory.build();
if (setActiveSource(initializer)) return;
FDv2SourceResult result = initializer.run().get();
switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual change to the method. Apply the data.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignored return value of apply causes incorrect initialization

Medium Severity

The return value of dataSourceUpdates.apply() is ignored in both runInitializers and runSynchronizers. The apply() method returns false when the store operation fails, but the code proceeds to set anyDataReceived = true and complete startFuture with true regardless. This causes the SDK to report successful initialization even when data failed to persist to the store, leading to an inconsistent state where isInitialized() returns true but the store lacks the expected data.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to ignore this for now, until we work on other FDv2DataSource updates.

anyDataReceived = true;
if (!result.getChangeSet().getSelector().isEmpty()) {
// We received data with a selector, so we end the initialization process.
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
return;
}
break;
case STATUS:
// TODO: Implement.
break;
}
} catch (ExecutionException | InterruptedException | CancellationException e) {
// TODO: Log.
}
}
// We received data without a selector, and we have exhausted initializers, so we are going to
// consider ourselves initialized.
if (anyDataReceived) {
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
}
}

private void runSynchronizers() {
SynchronizerFactoryWithState availableSynchronizer = getFirstAvailableSynchronizer();
// TODO: Add recovery handling. If there are no available synchronizers, but there are
Expand All @@ -130,7 +161,7 @@ private void runSynchronizers() {
FDv2SourceResult result = synchronizer.next().get();
switch (result.getResultType()) {
case CHANGE_SET:
// TODO: Apply to the store.
dataSourceUpdates.apply(result.getChangeSet());
// This could have been completed by any data source. But if it has not been completed before
// now, then we complete it.
startFuture.complete(true);
Expand Down Expand Up @@ -186,40 +217,6 @@ private boolean setActiveSource(Closeable synchronizer) {
return false;
}

private void runInitializers() {
boolean anyDataReceived = false;
for (InitializerFactory factory : initializers) {
try {
Initializer initializer = factory.build();
if (setActiveSource(initializer)) return;
FDv2SourceResult res = initializer.run().get();
switch (res.getResultType()) {
case CHANGE_SET:
// TODO: Apply to the store.
anyDataReceived = true;
if (!res.getChangeSet().getSelector().isEmpty()) {
// We received data with a selector, so we end the initialization process.
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
return;
}
break;
case STATUS:
// TODO: Implement.
break;
}
} catch (ExecutionException | InterruptedException | CancellationException e) {
// TODO: Log.
}
}
// We received data without a selector, and we have exhausted initializers, so we are going to
// consider ourselves initialized.
if (anyDataReceived) {
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
}
}

@Override
public Future<Void> start() {
if (!started.getAndSet(true)) {
Expand Down
Loading