-
Notifications
You must be signed in to change notification settings - Fork 11
chore: Connect FDv2 data system. #108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: rlamb/streaming-synchronizer
Are you sure you want to change the base?
Changes from all commits
91d2cb9
d610988
dab1ef1
68c490f
7bb6394
ae39996
02c4c98
776b4c6
d9dc5f0
20ec764
47e30b9
dd1146b
38f90c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.launchdarkly.sdk.server.datasources.Initializer; | ||
| import com.launchdarkly.sdk.server.datasources.Synchronizer; | ||
| import com.launchdarkly.sdk.server.integrations.FDv2PollingInitializerBuilder; | ||
| import com.launchdarkly.sdk.server.integrations.FDv2PollingSynchronizerBuilder; | ||
| import com.launchdarkly.sdk.server.integrations.FDv2StreamingSynchronizerBuilder; | ||
| import com.launchdarkly.sdk.server.integrations.PollingDataSourceBuilder; | ||
| import com.launchdarkly.sdk.server.interfaces.ServiceEndpoints; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSourceBuilderContext; | ||
|
|
||
| import java.net.URI; | ||
|
|
||
| import static com.launchdarkly.sdk.server.ComponentsImpl.toHttpProperties; | ||
|
|
||
| /** | ||
| * Components for use with the data system. | ||
| * <p> | ||
| * This class is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. | ||
| * It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode | ||
| * </p> | ||
| */ | ||
| public final class DataSystemComponents { | ||
|
|
||
| static class FDv2PollingInitializerBuilderImpl extends FDv2PollingInitializerBuilder { | ||
| @Override | ||
| public Initializer build(DataSourceBuilderContext context) { | ||
| ServiceEndpoints endpoints = serviceEndpointsOverride != null | ||
| ? serviceEndpointsOverride | ||
| : context.getServiceEndpoints(); | ||
| URI configuredBaseUri = StandardEndpoints.selectBaseUri( | ||
| endpoints.getPollingBaseUri(), | ||
| StandardEndpoints.DEFAULT_POLLING_BASE_URI, | ||
| "Polling", | ||
| context.getBaseLogger()); | ||
|
|
||
| DefaultFDv2Requestor requestor = new DefaultFDv2Requestor( | ||
| toHttpProperties(context.getHttp()), | ||
| configuredBaseUri, | ||
| StandardEndpoints.FDV2_POLLING_REQUEST_PATH, | ||
| context.getBaseLogger()); | ||
|
|
||
| return new PollingInitializerImpl( | ||
| requestor, | ||
| context.getBaseLogger(), | ||
| context.getSelectorSource() | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| static class FDv2PollingSynchronizerBuilderImpl extends FDv2PollingSynchronizerBuilder { | ||
| @Override | ||
| public Synchronizer build(DataSourceBuilderContext context) { | ||
| ServiceEndpoints endpoints = serviceEndpointsOverride != null | ||
| ? serviceEndpointsOverride | ||
| : context.getServiceEndpoints(); | ||
| URI configuredBaseUri = StandardEndpoints.selectBaseUri( | ||
| endpoints.getPollingBaseUri(), | ||
| StandardEndpoints.DEFAULT_POLLING_BASE_URI, | ||
| "Polling", | ||
| context.getBaseLogger()); | ||
|
|
||
| DefaultFDv2Requestor requestor = new DefaultFDv2Requestor( | ||
| toHttpProperties(context.getHttp()), | ||
| configuredBaseUri, | ||
| StandardEndpoints.FDV2_POLLING_REQUEST_PATH, | ||
| context.getBaseLogger()); | ||
|
|
||
| return new PollingSynchronizerImpl( | ||
| requestor, | ||
| context.getBaseLogger(), | ||
| context.getSelectorSource(), | ||
| context.getSharedExecutor(), | ||
| pollInterval | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| static class FDv2StreamingSynchronizerBuilderImpl extends FDv2StreamingSynchronizerBuilder { | ||
| @Override | ||
| public Synchronizer build(DataSourceBuilderContext context) { | ||
| ServiceEndpoints endpoints = serviceEndpointsOverride != null | ||
| ? serviceEndpointsOverride | ||
| : context.getServiceEndpoints(); | ||
| URI configuredBaseUri = StandardEndpoints.selectBaseUri( | ||
| endpoints.getStreamingBaseUri(), | ||
| StandardEndpoints.DEFAULT_STREAMING_BASE_URI, | ||
| "Streaming", | ||
| context.getBaseLogger()); | ||
|
|
||
| return new StreamingSynchronizerImpl( | ||
| toHttpProperties(context.getHttp()), | ||
| configuredBaseUri, | ||
| StandardEndpoints.FDV2_STREAMING_REQUEST_PATH, | ||
| context.getBaseLogger(), | ||
| context.getSelectorSource(), | ||
| null, | ||
| initialReconnectDelay | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| private DataSystemComponents() {} | ||
|
|
||
| /** | ||
| * Get a builder for a polling initializer. | ||
| * | ||
| * @return the polling initializer builder | ||
| */ | ||
| public static FDv2PollingInitializerBuilder pollingInitializer() { | ||
| return new FDv2PollingInitializerBuilderImpl(); | ||
| } | ||
|
|
||
| /** | ||
| * Get a builder for a polling synchronizer. | ||
| * | ||
| * @return the polling synchronizer builder | ||
| */ | ||
| public static FDv2PollingSynchronizerBuilder pollingSynchronizer() { | ||
| return new FDv2PollingSynchronizerBuilderImpl(); | ||
| } | ||
|
|
||
| /** | ||
| * Get a builder for a streaming synchronizer. | ||
| * | ||
| * @return the streaming synchronizer builder | ||
| */ | ||
| public static FDv2StreamingSynchronizerBuilder streamingSynchronizer() { | ||
| return new FDv2StreamingSynchronizerBuilderImpl(); | ||
| } | ||
|
|
||
| /** | ||
| * Get a builder for a FDv1 compatible polling data source. | ||
| * <p> | ||
| * This is intended for use as a fallback. | ||
| * </p> | ||
| * | ||
| * @return the FDv1 compatible polling data source builder | ||
| */ | ||
| public static PollingDataSourceBuilder fDv1Polling() { | ||
| return Components.pollingDataSource(); | ||
| } | ||
| } | ||
|
|
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a mistake in the implementation. So both polling and streaming have this update. |
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still lots of work to do here. Outside the scope of this PR. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,12 @@ | ||
| package com.launchdarkly.sdk.server; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; | ||
| import com.launchdarkly.sdk.server.datasources.Initializer; | ||
| import com.launchdarkly.sdk.server.datasources.Synchronizer; | ||
| import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSource; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink; | ||
| import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
|
|
@@ -18,10 +19,10 @@ | |
| import java.util.stream.Collectors; | ||
|
|
||
| class FDv2DataSource implements DataSource { | ||
| private final List<InitializerFactory> initializers; | ||
| private final List<DataSourceFactory<Initializer>> initializers; | ||
| private final List<SynchronizerFactoryWithState> synchronizers; | ||
|
|
||
| private final DataSourceUpdateSink dataSourceUpdates; | ||
| private final DataSourceUpdateSinkV2 dataSourceUpdates; | ||
|
|
||
| private final CompletableFuture<Boolean> startFuture = new CompletableFuture<>(); | ||
| private final AtomicBoolean started = new AtomicBoolean(false); | ||
|
|
@@ -46,12 +47,12 @@ public enum State { | |
| Blocked | ||
| } | ||
|
|
||
| private final SynchronizerFactory factory; | ||
| private final DataSourceFactory<Synchronizer> factory; | ||
|
|
||
| private State state = State.Available; | ||
|
|
||
|
|
||
| public SynchronizerFactoryWithState(SynchronizerFactory factory) { | ||
| public SynchronizerFactoryWithState(DataSourceFactory<Synchronizer> factory) { | ||
| this.factory = factory; | ||
| } | ||
|
|
||
|
|
@@ -68,19 +69,15 @@ public Synchronizer build() { | |
| } | ||
| } | ||
|
|
||
| public interface InitializerFactory { | ||
| Initializer build(); | ||
| } | ||
|
|
||
| public interface SynchronizerFactory { | ||
| Synchronizer build(); | ||
| public interface DataSourceFactory<T> { | ||
| T build(); | ||
| } | ||
|
|
||
|
|
||
| public FDv2DataSource( | ||
| List<InitializerFactory> initializers, | ||
| List<SynchronizerFactory> synchronizers, | ||
| DataSourceUpdateSink dataSourceUpdates | ||
| ImmutableList<DataSourceFactory<Initializer>> initializers, | ||
| ImmutableList<DataSourceFactory<Synchronizer>> synchronizers, | ||
| DataSourceUpdateSinkV2 dataSourceUpdates | ||
| ) { | ||
| this.initializers = initializers; | ||
| this.synchronizers = synchronizers | ||
|
|
@@ -116,6 +113,40 @@ private SynchronizerFactoryWithState getFirstAvailableSynchronizer() { | |
| } | ||
| } | ||
|
|
||
| private void runInitializers() { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved this for better logical organization. |
||
| boolean anyDataReceived = false; | ||
| for (DataSourceFactory<Initializer> factory : initializers) { | ||
| try { | ||
| Initializer initializer = factory.build(); | ||
| if (setActiveSource(initializer)) return; | ||
| FDv2SourceResult result = initializer.run().get(); | ||
| switch (result.getResultType()) { | ||
| case CHANGE_SET: | ||
| dataSourceUpdates.apply(result.getChangeSet()); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actual change to the method. Apply the data. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ignored return value of apply causes incorrect initializationMedium Severity The return value of Additional Locations (1)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am going to ignore this for now, until we work on other FDv2DataSource updates. |
||
| anyDataReceived = true; | ||
| if (!result.getChangeSet().getSelector().isEmpty()) { | ||
| // We received data with a selector, so we end the initialization process. | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| return; | ||
| } | ||
| break; | ||
| case STATUS: | ||
| // TODO: Implement. | ||
| break; | ||
| } | ||
| } catch (ExecutionException | InterruptedException | CancellationException e) { | ||
| // TODO: Log. | ||
| } | ||
| } | ||
| // We received data without a selector, and we have exhausted initializers, so we are going to | ||
| // consider ourselves initialized. | ||
| if (anyDataReceived) { | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| } | ||
| } | ||
|
|
||
| private void runSynchronizers() { | ||
| SynchronizerFactoryWithState availableSynchronizer = getFirstAvailableSynchronizer(); | ||
| // TODO: Add recovery handling. If there are no available synchronizers, but there are | ||
|
|
@@ -130,7 +161,7 @@ private void runSynchronizers() { | |
| FDv2SourceResult result = synchronizer.next().get(); | ||
| switch (result.getResultType()) { | ||
| case CHANGE_SET: | ||
| // TODO: Apply to the store. | ||
| dataSourceUpdates.apply(result.getChangeSet()); | ||
| // This could have been completed by any data source. But if it has not been completed before | ||
| // now, then we complete it. | ||
| startFuture.complete(true); | ||
|
|
@@ -186,40 +217,6 @@ private boolean setActiveSource(Closeable synchronizer) { | |
| return false; | ||
| } | ||
|
|
||
| private void runInitializers() { | ||
| boolean anyDataReceived = false; | ||
| for (InitializerFactory factory : initializers) { | ||
| try { | ||
| Initializer initializer = factory.build(); | ||
| if (setActiveSource(initializer)) return; | ||
| FDv2SourceResult res = initializer.run().get(); | ||
| switch (res.getResultType()) { | ||
| case CHANGE_SET: | ||
| // TODO: Apply to the store. | ||
| anyDataReceived = true; | ||
| if (!res.getChangeSet().getSelector().isEmpty()) { | ||
| // We received data with a selector, so we end the initialization process. | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| return; | ||
| } | ||
| break; | ||
| case STATUS: | ||
| // TODO: Implement. | ||
| break; | ||
| } | ||
| } catch (ExecutionException | InterruptedException | CancellationException e) { | ||
| // TODO: Log. | ||
| } | ||
| } | ||
| // We received data without a selector, and we have exhausted initializers, so we are going to | ||
| // consider ourselves initialized. | ||
| if (anyDataReceived) { | ||
| dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); | ||
| startFuture.complete(true); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Future<Void> start() { | ||
| if (!started.getAndSet(true)) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is a move because of package boundaries. Not sure how we want them to shake out.
The HttpConfig -> HttpProperties I think was the missing component, and moving that around created further complications.