diff --git a/core/pkg/evaluator/json.go b/core/pkg/evaluator/json.go index bb3cdc223..b7242a89e 100644 --- a/core/pkg/evaluator/json.go +++ b/core/pkg/evaluator/json.go @@ -59,6 +59,14 @@ type flagdProperties struct { type variantEvaluator func(context.Context, string, string, map[string]any) ( variant string, variants map[string]interface{}, reason string, metadata map[string]interface{}, error error) +// WithStrictValidation enables strict schema validation. When enabled, flag configurations +// that do not conform to the schema will be rejected with an error instead of accepted with a warning. +func WithStrictValidation() JSONEvaluatorOption { + return func(je *JSON) { + je.strictValidation = true + } +} + // Deprecated - this will be remove in the next release func WithEvaluator(name string, evalFunc func(interface{}, interface{}) interface{}) JSONEvaluatorOption { return func(_ *JSON) { @@ -68,10 +76,11 @@ func WithEvaluator(name string, evalFunc func(interface{}, interface{}) interfac // JSON evaluator type JSON struct { - store store.IStore - Logger *logger.Logger - jsonEvalTracer trace.Tracer - jsonSchema *jsonschema.Schema + store store.IStore + Logger *logger.Logger + jsonEvalTracer trace.Tracer + jsonSchema *jsonschema.Schema + strictValidation bool Resolver } @@ -467,6 +476,10 @@ func (je *JSON) configToFlagDefinition(config string, definition *Definition) er return fmt.Errorf("failed to unmarshal JSON string: %v", err) } if err := je.jsonSchema.Validate(inst); err != nil { + if je.strictValidation { + return fmt.Errorf( + "flag definition does not conform to the schema; validation errors: %w", err) + } je.Logger.Warn(fmt.Sprintf( "flag definition does not conform to the schema; validation errors: %s", err), ) diff --git a/core/pkg/evaluator/json_test.go b/core/pkg/evaluator/json_test.go index 14993f0e0..a370b6a38 100644 --- a/core/pkg/evaluator/json_test.go +++ b/core/pkg/evaluator/json_test.go @@ -443,6 +443,57 @@ func TestSetState_Valid_NoError(t *testing.T) { } } +func TestSetState_StrictValidation_InvalidFlags_ReturnsError(t *testing.T) { + evaluator := flagdEvaluator.NewJSON( + logger.NewLogger(nil, false), store.NewFlags(), flagdEvaluator.WithStrictValidation(), + ) + + // set state with an invalid flag definition should return error in strict mode + err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags, Source: "testSource"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "flag definition does not conform to the schema") +} + +func TestSetState_StrictValidation_ValidFlags_NoError(t *testing.T) { + evaluator := flagdEvaluator.NewJSON( + logger.NewLogger(nil, false), store.NewFlags(), flagdEvaluator.WithStrictValidation(), + ) + + // set state with a valid flag definition should succeed in strict mode + err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"}) + require.NoError(t, err) +} + +func TestSetState_StrictValidation_PreservesExistingState(t *testing.T) { + evaluator := flagdEvaluator.NewJSON( + logger.NewLogger(nil, false), store.NewFlags(), flagdEvaluator.WithStrictValidation(), + ) + + // first, load valid flags + err := evaluator.SetState(sync.DataSync{FlagData: ValidFlags, Source: "testSource"}) + require.NoError(t, err) + + // verify the flag is accessible + val := evaluator.ResolveAsAnyValue(context.Background(), "", ValidFlag, nil) + require.NoError(t, val.Error) + + // now try to load invalid flags - should fail + err = evaluator.SetState(sync.DataSync{FlagData: InvalidFlags, Source: "testSource"}) + require.Error(t, err) + + // verify the original valid flag is still accessible (store was not modified) + val = evaluator.ResolveAsAnyValue(context.Background(), "", ValidFlag, nil) + require.NoError(t, val.Error) +} + +func TestSetState_WithoutStrictValidation_InvalidFlags_NoError(t *testing.T) { + // without strict validation, invalid flags should still be accepted (backward compatible) + evaluator := flagdEvaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags()) + + err := evaluator.SetState(sync.DataSync{FlagData: InvalidFlags, Source: "testSource"}) + require.NoError(t, err) +} + func TestResolveAllValues(t *testing.T) { evaluator := flagdEvaluator.NewJSON(logger.NewLogger(nil, false), store.NewFlags()) err := evaluator.SetState(sync.DataSync{FlagData: flagConfig, Source: "testSource"}) diff --git a/docs/reference/flagd-cli/flagd_start.md b/docs/reference/flagd-cli/flagd_start.md index 61bdf6422..73d0bd780 100644 --- a/docs/reference/flagd-cli/flagd_start.md +++ b/docs/reference/flagd-cli/flagd_start.md @@ -33,6 +33,7 @@ flagd start [flags] -d, --socket-path string Flagd unix socket path. With grpc the evaluations service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally. -s, --sources string JSON representation of an array of SourceConfig objects. Required fields: uri (string) and provider (string). Optional source-specific fields are also available, see https://flagd.dev/reference/sync-configuration/#source-configuration --stream-deadline duration Set a server-side deadline for flagd sync and event streams (default 0, means no deadline). + --strict-validation Enables strict schema validation. Invalid initial configurations cause flagd to exit on startup; readiness is gated on every source producing a valid configuration. WARNING: a bad configuration delivered to multiple flagd instances will cause all of them to exit, potentially leading to a cascading failure. -g, --sync-port int32 gRPC Sync port (default 8015) -e, --sync-socket-path string Flagd sync service socket path. With grpc the sync service will be available on this address. -f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC), FeatureFlag custom resource, or GCS, Azure Blob or S3. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension. diff --git a/flagd/cmd/start.go b/flagd/cmd/start.go index 5b6ed8499..f49125703 100644 --- a/flagd/cmd/start.go +++ b/flagd/cmd/start.go @@ -37,6 +37,7 @@ const ( syncSocketPathFlagName = "sync-socket-path" uriFlagName = "uri" disableSyncMetadata = "disable-sync-metadata" + strictValidationFlagName = "strict-validation" contextValueFlagName = "context-value" headerToContextKeyFlagName = "context-from-header" streamDeadlineFlagName = "stream-deadline" @@ -94,6 +95,7 @@ func init() { "header values to context values, where key is Header name, value is context key") flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).") flags.Bool(disableSyncMetadata, false, "Disables the getMetadata endpoint of the sync service. Defaults to false, but will default to true in later versions.") + flags.Bool(strictValidationFlagName, false, "Enables strict schema validation. Invalid initial configurations cause flagd to exit on startup; readiness is gated on every source producing a valid configuration. WARNING: a bad configuration delivered to multiple flagd instances will cause all of them to exit, potentially leading to a cascading failure.") flags.Int64P(maxRequestBodyFlagName, "B", 1_000_000, "Maximum allowed request body size in bytes. Requests exceeding this are rejected with HTTP 413 (OFREP) or 429 (connect). Set to 0 to disable. WARNING: disabling this limit may allow memory exhaustion from oversized requests.") flags.Int64P(maxRequestHeaderFlagName, "R", 1_000_000, "Maximum allowed request header size in bytes. Requests exceeding this are rejected with HTTP 431. Set to 0 to use Go's built-in default (1 MiB). WARNING: setting a very large or zero value may allow memory exhaustion from oversized headers.") @@ -122,6 +124,7 @@ func bindFlags(flags *pflag.FlagSet) { _ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName)) _ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName)) _ = viper.BindPFlag(disableSyncMetadata, flags.Lookup(disableSyncMetadata)) + _ = viper.BindPFlag(strictValidationFlagName, flags.Lookup(strictValidationFlagName)) _ = viper.BindPFlag(maxRequestBodyFlagName, flags.Lookup(maxRequestBodyFlagName)) _ = viper.BindPFlag(maxRequestHeaderFlagName, flags.Lookup(maxRequestHeaderFlagName)) } @@ -212,6 +215,7 @@ var startCmd = &cobra.Command{ HeaderToContextKeyMappings: headerToContextKeyMappings, MaxRequestBodyBytes: maxRequestBodyBytes, MaxRequestHeaderBytes: maxRequestHeaderBytes, + StrictValidation: viper.GetBool(strictValidationFlagName), }) if err != nil { rtLogger.Fatal(err.Error()) diff --git a/flagd/pkg/runtime/from_config.go b/flagd/pkg/runtime/from_config.go index ad4a98a28..a16e1fd23 100644 --- a/flagd/pkg/runtime/from_config.go +++ b/flagd/pkg/runtime/from_config.go @@ -48,6 +48,7 @@ type Config struct { HeaderToContextKeyMappings map[string]string MaxRequestBodyBytes int64 MaxRequestHeaderBytes int64 + StrictValidation bool } // FromConfig builds a runtime from startup configurations @@ -94,7 +95,11 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime, } // derive evaluator - jsonEvaluator := evaluator.NewJSON(logger, store) + var evaluatorOpts []evaluator.JSONEvaluatorOption + if config.StrictValidation { + evaluatorOpts = append(evaluatorOpts, evaluator.WithStrictValidation()) + } + jsonEvaluator := evaluator.NewJSON(logger, store, evaluatorOpts...) // derive services @@ -158,6 +163,8 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime, SyncService: flagSyncService, OfrepService: ofrepService, EvaluationService: connectService, + StrictValidation: config.StrictValidation, + ExpectedSources: sources, ServiceConfig: service.Configuration{ Port: config.ServicePort, ManagementPort: config.ManagementPort, diff --git a/flagd/pkg/runtime/runtime.go b/flagd/pkg/runtime/runtime.go index df03f2663..181bc7bee 100644 --- a/flagd/pkg/runtime/runtime.go +++ b/flagd/pkg/runtime/runtime.go @@ -26,8 +26,11 @@ type Runtime struct { EvaluationService service.IFlagEvaluationService ServiceConfig service.Configuration Syncs []sync.ISync + StrictValidation bool + ExpectedSources []string - mu msync.Mutex + mu msync.Mutex + validatedSources map[string]bool } //nolint:funlen @@ -45,12 +48,17 @@ func (r *Runtime) Start() error { defer cancel() g, gCtx := errgroup.WithContext(ctx) dataSync := make(chan sync.DataSync, len(r.Syncs)) + if r.validatedSources == nil { + r.validatedSources = make(map[string]bool, len(r.Syncs)) + } // Initialize DataSync channel watcher g.Go(func() error { for { select { case data := <-dataSync: - r.updateAndEmit(data) + if err := r.updateAndEmit(data); err != nil { + return err + } case <-gCtx.Done(): return nil } @@ -119,18 +127,45 @@ func (r *Runtime) isReady() bool { return false } } + // in strict-validation mode, readiness additionally requires every + // configured source to have produced at least one valid configuration. + if r.StrictValidation { + r.mu.Lock() + defer r.mu.Unlock() + for _, src := range r.ExpectedSources { + if !r.validatedSources[src] { + return false + } + } + } return true } -// updateAndEmit helps to update state, notify changes and trigger sync updates -func (r *Runtime) updateAndEmit(payload sync.DataSync) { +// updateAndEmit helps to update state, notify changes and trigger sync updates. +// In strict-validation mode, an error on the *first* payload from a given source +// (i.e. before that source has ever produced a valid state) is returned so that +// startup can fail fast. Once a source has produced at least one valid payload it +// is recorded in validatedSources; subsequent errors from that source are logged +// and the previous valid state is preserved (current behavior). +func (r *Runtime) updateAndEmit(payload sync.DataSync) error { r.mu.Lock() defer r.mu.Unlock() - err := r.Evaluator.SetState(payload) - if err != nil { + if r.validatedSources == nil { + r.validatedSources = make(map[string]bool) + } + + if err := r.Evaluator.SetState(payload); err != nil { r.Logger.Error(fmt.Sprintf("error setting state: %v", err)) - return + if r.StrictValidation && !r.validatedSources[payload.Source] { + return fmt.Errorf( + "strict validation: initial flag configuration from source %q is invalid: %w", + payload.Source, err, + ) + } + return nil } + r.validatedSources[payload.Source] = true r.SyncService.Emit(payload.Source) + return nil } diff --git a/flagd/pkg/runtime/runtime_test.go b/flagd/pkg/runtime/runtime_test.go new file mode 100644 index 000000000..30db4f197 --- /dev/null +++ b/flagd/pkg/runtime/runtime_test.go @@ -0,0 +1,175 @@ +package runtime + +import ( + "context" + "testing" + + "github.com/open-feature/flagd/core/pkg/evaluator" + "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/store" + "github.com/open-feature/flagd/core/pkg/sync" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const validFlags = `{ + "flags": { + "validFlag": { + "state": "ENABLED", + "variants": { + "on": true, + "off": false + }, + "defaultVariant": "on" + } + } +}` + +const invalidFlags = `{ + "flags": { + "invalidFlag": { + "notState": "ENABLED", + "notVariants": { + "on": true, + "off": false + }, + "notDefaultVariant": "on" + } + } +}` + +// fakeSyncService is a minimal stand-in for flagsync.ISyncService so that +// updateAndEmit can be exercised without spinning up the gRPC sync server. +type fakeSyncService struct { + emitted []string +} + +func (f *fakeSyncService) Start(_ context.Context) error { return nil } + +func (f *fakeSyncService) Emit(source string) { + f.emitted = append(f.emitted, source) +} + +func newRuntimeWithEvaluator(t *testing.T, strict bool) (*Runtime, *fakeSyncService) { + t.Helper() + log := logger.NewLogger(nil, false) + flagStore := store.NewFlags() + var opts []evaluator.JSONEvaluatorOption + if strict { + opts = append(opts, evaluator.WithStrictValidation()) + } + eval := evaluator.NewJSON(log, flagStore, opts...) + fakeSync := &fakeSyncService{} + return &Runtime{ + Logger: log, + Evaluator: eval, + SyncService: fakeSync, + StrictValidation: strict, + }, fakeSync +} + +func TestUpdateAndEmit_StrictValidation_FailsOnInvalidInitialConfig(t *testing.T) { + r, fakeSync := newRuntimeWithEvaluator(t, true) + + err := r.updateAndEmit(sync.DataSync{FlagData: invalidFlags, Source: "source-A"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "strict validation") + assert.Contains(t, err.Error(), "source-A") + assert.Empty(t, fakeSync.emitted, "no emit should happen when initial config is invalid") +} + +func TestUpdateAndEmit_StrictValidation_TolaratesRuntimeUpdateFailure(t *testing.T) { + r, fakeSync := newRuntimeWithEvaluator(t, true) + + // first, a valid bootstrap payload + require.NoError(t, r.updateAndEmit(sync.DataSync{FlagData: validFlags, Source: "source-A"})) + require.Equal(t, []string{"source-A"}, fakeSync.emitted) + + // later, an invalid runtime update from the same (already bootstrapped) source + // must NOT cause an error to propagate; the prior valid state is preserved. + err := r.updateAndEmit(sync.DataSync{FlagData: invalidFlags, Source: "source-A"}) + require.NoError(t, err) + require.Equal(t, []string{"source-A"}, fakeSync.emitted, "emit must not fire for the failed update") +} + +func TestUpdateAndEmit_StrictValidation_PerSourceBootstrap(t *testing.T) { + r, _ := newRuntimeWithEvaluator(t, true) + + // source-A bootstraps successfully + require.NoError(t, r.updateAndEmit(sync.DataSync{FlagData: validFlags, Source: "source-A"})) + + // source-B is brand new; an invalid first payload from it must still fail-fast + // even though source-A has already bootstrapped. + err := r.updateAndEmit(sync.DataSync{FlagData: invalidFlags, Source: "source-B"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "source-B") +} + +func TestUpdateAndEmit_NoStrictValidation_TolaratesInvalidInitial(t *testing.T) { + r, fakeSync := newRuntimeWithEvaluator(t, false) + + // without strict-validation, the legacy behavior is preserved: invalid initial + // payloads are logged and the runtime continues. The store is empty, so Emit + // still fires (mirroring pre-existing behavior of accepting with a warning). + err := r.updateAndEmit(sync.DataSync{FlagData: invalidFlags, Source: "source-A"}) + require.NoError(t, err) + // in non-strict mode, the underlying evaluator returns nil, so we treat the + // payload as bootstrapped and emit. + assert.Equal(t, []string{"source-A"}, fakeSync.emitted) +} + +// fakeSync is a minimal sync.ISync used to exercise isReady without spinning +// up real sync providers. +type fakeSync struct { + ready bool +} + +func (f *fakeSync) Init(_ context.Context) error { return nil } +func (f *fakeSync) Sync(_ context.Context, _ chan<- sync.DataSync) error { return nil } +func (f *fakeSync) ReSync(_ context.Context, _ chan<- sync.DataSync) error { + return nil +} +func (f *fakeSync) IsReady() bool { return f.ready } + +func TestIsReady_StrictValidation_AllSourcesValidated_True(t *testing.T) { + r, _ := newRuntimeWithEvaluator(t, true) + r.Syncs = []sync.ISync{&fakeSync{ready: true}, &fakeSync{ready: true}} + r.ExpectedSources = []string{"source-A", "source-B"} + + require.NoError(t, r.updateAndEmit(sync.DataSync{FlagData: validFlags, Source: "source-A"})) + require.NoError(t, r.updateAndEmit(sync.DataSync{FlagData: validFlags, Source: "source-B"})) + + assert.True(t, r.isReady()) +} + +func TestIsReady_StrictValidation_OneSourceUnvalidated_False(t *testing.T) { + r, _ := newRuntimeWithEvaluator(t, true) + r.Syncs = []sync.ISync{&fakeSync{ready: true}, &fakeSync{ready: true}} + r.ExpectedSources = []string{"source-A", "source-B"} + + // only source-A delivers a valid payload; source-B never has. + require.NoError(t, r.updateAndEmit(sync.DataSync{FlagData: validFlags, Source: "source-A"})) + + assert.False(t, r.isReady(), "must not be ready until every configured source has validated") +} + +func TestIsReady_NoStrictValidation_IgnoresValidation(t *testing.T) { + r, _ := newRuntimeWithEvaluator(t, false) + r.Syncs = []sync.ISync{&fakeSync{ready: true}, &fakeSync{ready: true}} + r.ExpectedSources = []string{"source-A", "source-B"} + + // no source has validated, but transports are ready and strict mode is off. + assert.True(t, r.isReady(), "non-strict mode must not gate readiness on validation") +} + +func TestIsReady_StrictValidation_TransportNotReady_False(t *testing.T) { + r, _ := newRuntimeWithEvaluator(t, true) + r.Syncs = []sync.ISync{&fakeSync{ready: true}, &fakeSync{ready: false}} + r.ExpectedSources = []string{"source-A", "source-B"} + + // even with all sources validated, transport-level not-ready wins. + require.NoError(t, r.updateAndEmit(sync.DataSync{FlagData: validFlags, Source: "source-A"})) + require.NoError(t, r.updateAndEmit(sync.DataSync{FlagData: validFlags, Source: "source-B"})) + + assert.False(t, r.isReady()) +}