From a8cda1489fdf6e2955390e69dbb6941dc8c7a740 Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Fri, 20 Mar 2026 22:51:34 +0530 Subject: [PATCH 1/8] feat(policies): parallelize policy evaluations with bounded concurrency Use errgroup to evaluate policy attachments concurrently within VerifyStatement and VerifyMaterial, bounded by runtime.NumCPU(). This reduces evaluation time from O(n*t) to O(t) where n is the number of policies and t is the slowest single evaluation. Also fixes pre-existing concurrency safety issues: - Move remotePolicyCache/remoteGroupCache mutexes from instance-level to package-level to prevent data races under concurrent access - Move extism.SetLogLevel() from per-Verify() call to engine construction to avoid racing on global state Fixes #2899 Signed-off-by: Vibhav Bobade --- go.mod | 2 +- pkg/policies/engine/wasm/engine.go | 30 +++--- pkg/policies/group_loader.go | 11 +- pkg/policies/loader.go | 11 +- pkg/policies/policies.go | 85 ++++++++++++--- pkg/policies/policy_groups.go | 166 +++++++++++++++++------------ 6 files changed, 194 insertions(+), 111 deletions(-) diff --git a/go.mod b/go.mod index 118391f29..d1159f0d9 100644 --- a/go.mod +++ b/go.mod @@ -442,7 +442,7 @@ require ( golang.org/x/crypto v0.46.0 golang.org/x/mod v0.31.0 // indirect golang.org/x/net v0.48.0 // indirect - golang.org/x/sync v0.19.0 // indirect + golang.org/x/sync v0.19.0 golang.org/x/sys v0.40.0 // indirect golang.org/x/text v0.32.0 // indirect golang.org/x/time v0.14.0 // indirect diff --git a/pkg/policies/engine/wasm/engine.go b/pkg/policies/engine/wasm/engine.go index 9b191971e..0ecf42360 100644 --- a/pkg/policies/engine/wasm/engine.go +++ b/pkg/policies/engine/wasm/engine.go @@ -59,6 +59,21 @@ func NewEngine(opts ...engine.Option) *Engine { logger = &noopLogger } + // Set WASM plugin log level once at engine creation to avoid + // concurrent calls to the global extism.SetLogLevel from Verify(). + switch { + case logger.GetLevel() <= zerolog.TraceLevel: + extism.SetLogLevel(extism.LogLevelTrace) + case logger.GetLevel() <= zerolog.DebugLevel: + extism.SetLogLevel(extism.LogLevelDebug) + case logger.GetLevel() <= zerolog.InfoLevel: + extism.SetLogLevel(extism.LogLevelInfo) + case logger.GetLevel() <= zerolog.WarnLevel: + extism.SetLogLevel(extism.LogLevelWarn) + default: + extism.SetLogLevel(extism.LogLevelError) + } + return &Engine{ executionTimeout: executionTimeout, logger: logger, @@ -70,21 +85,6 @@ func NewEngine(opts ...engine.Option) *Engine { func (e *Engine) Verify(ctx context.Context, policy *engine.Policy, input []byte, args map[string]any) (*engine.EvaluationResult, error) { e.logger.Debug().Str("policy", policy.Name).Int("wasm_size", len(policy.Source)).Int("input_size", len(input)).Int("args_count", len(args)).Msg("Starting WASM policy execution") - // Enable WASM plugin logging based on logger level - // This allows LogInfo(), LogDebug(), etc. from the WASM policy to be visible - switch { - case e.logger.GetLevel() <= zerolog.TraceLevel: - extism.SetLogLevel(extism.LogLevelTrace) - case e.logger.GetLevel() <= zerolog.DebugLevel: - extism.SetLogLevel(extism.LogLevelDebug) - case e.logger.GetLevel() <= zerolog.InfoLevel: - extism.SetLogLevel(extism.LogLevelInfo) - case e.logger.GetLevel() <= zerolog.WarnLevel: - extism.SetLogLevel(extism.LogLevelWarn) - default: - extism.SetLogLevel(extism.LogLevelError) - } - // Prepare config with args if present configMap := make(map[string]string) if len(args) > 0 { diff --git a/pkg/policies/group_loader.go b/pkg/policies/group_loader.go index 5e24bd40b..f22177f44 100644 --- a/pkg/policies/group_loader.go +++ b/pkg/policies/group_loader.go @@ -99,8 +99,6 @@ func (l *HTTPSGroupLoader) Load(_ context.Context, attachment *v1.PolicyGroupAtt // ChainloopGroupLoader loads groups referenced with chainloop://provider/name URLs type ChainloopGroupLoader struct { Client pb.AttestationServiceClient - - cacheMutex sync.Mutex } type groupWithReference struct { @@ -108,7 +106,10 @@ type groupWithReference struct { reference *PolicyDescriptor } -var remoteGroupCache = make(map[string]*groupWithReference) +var ( + remoteGroupCache = make(map[string]*groupWithReference) + remoteGroupCacheMutex sync.Mutex +) func NewChainloopGroupLoader(client pb.AttestationServiceClient) *ChainloopGroupLoader { return &ChainloopGroupLoader{Client: client} @@ -117,8 +118,8 @@ func NewChainloopGroupLoader(client pb.AttestationServiceClient) *ChainloopGroup func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGroupAttachment) (*v1.PolicyGroup, *PolicyDescriptor, error) { ref := attachment.GetRef() - c.cacheMutex.Lock() - defer c.cacheMutex.Unlock() + remoteGroupCacheMutex.Lock() + defer remoteGroupCacheMutex.Unlock() if v, ok := remoteGroupCache[ref]; ok { return v.group, v.reference, nil diff --git a/pkg/policies/loader.go b/pkg/policies/loader.go index 995bd5461..2c4803170 100644 --- a/pkg/policies/loader.go +++ b/pkg/policies/loader.go @@ -158,8 +158,6 @@ func (l *HTTPSLoader) Load(_ context.Context, attachment *v1.PolicyAttachment) ( // ChainloopLoader loads policies referenced with chainloop://provider/name URLs type ChainloopLoader struct { Client pb.AttestationServiceClient - - cacheMutex sync.Mutex } type policyWithReference struct { @@ -167,7 +165,10 @@ type policyWithReference struct { reference *PolicyDescriptor } -var remotePolicyCache = make(map[string]*policyWithReference) +var ( + remotePolicyCache = make(map[string]*policyWithReference) + remotePolicyCacheMutex sync.Mutex +) func NewChainloopLoader(client pb.AttestationServiceClient) *ChainloopLoader { return &ChainloopLoader{Client: client} @@ -176,8 +177,8 @@ func NewChainloopLoader(client pb.AttestationServiceClient) *ChainloopLoader { func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachment) (*v1.Policy, *PolicyDescriptor, error) { ref := attachment.GetRef() - c.cacheMutex.Lock() - defer c.cacheMutex.Unlock() + remotePolicyCacheMutex.Lock() + defer remotePolicyCacheMutex.Unlock() if v, ok := remotePolicyCache[ref]; ok { return v.policy, v.reference, nil diff --git a/pkg/policies/policies.go b/pkg/policies/policies.go index 486cf58ee..510b61f93 100644 --- a/pkg/policies/policies.go +++ b/pkg/policies/policies.go @@ -22,6 +22,7 @@ import ( "fmt" "net/url" "path/filepath" + "runtime" "slices" "strings" @@ -40,6 +41,7 @@ import ( "github.com/chainloop-dev/chainloop/pkg/policies/engine" "github.com/chainloop-dev/chainloop/pkg/policies/engine/rego" "github.com/chainloop-dev/chainloop/pkg/policies/engine/wasm" + "golang.org/x/sync/errgroup" ) type PolicyError struct { @@ -72,6 +74,9 @@ const ( EvalPhasePush ) +// defaultMaxConcurrency is the default number of concurrent policy evaluations. +var defaultMaxConcurrency = runtime.NumCPU() + type PolicyVerifier struct { policies *v1.Policies logger *zerolog.Logger @@ -82,6 +87,7 @@ type PolicyVerifier struct { includeRawData bool enablePrint bool evalPhase EvalPhase + maxConcurrency int } var _ Verifier = (*PolicyVerifier)(nil) @@ -93,6 +99,7 @@ type PolicyVerifierOptions struct { EnablePrint bool GRPCConn *grpc.ClientConn EvalPhase EvalPhase + MaxConcurrency int } type PolicyVerifierOption func(*PolicyVerifierOptions) @@ -133,12 +140,23 @@ func WithEvalPhase(phase EvalPhase) PolicyVerifierOption { } } +func WithMaxConcurrency(n int) PolicyVerifierOption { + return func(o *PolicyVerifierOptions) { + o.MaxConcurrency = n + } +} + func NewPolicyVerifier(policies *v1.Policies, client v13.AttestationServiceClient, logger *zerolog.Logger, opts ...PolicyVerifierOption) *PolicyVerifier { options := &PolicyVerifierOptions{} for _, opt := range opts { opt(options) } + maxConcurrency := options.MaxConcurrency + if maxConcurrency <= 0 { + maxConcurrency = defaultMaxConcurrency + } + return &PolicyVerifier{ policies: policies, client: client, @@ -149,6 +167,7 @@ func NewPolicyVerifier(policies *v1.Policies, client v13.AttestationServiceClien includeRawData: options.IncludeRawData, enablePrint: options.EnablePrint, evalPhase: options.EvalPhase, + maxConcurrency: maxConcurrency, } } @@ -171,14 +190,29 @@ func (pv *PolicyVerifier) VerifyMaterial(ctx context.Context, material *v12.Atte return nil, NewPolicyError(err) } - for _, attachment := range attachments { - ev, err := pv.evaluatePolicyAttachment(ctx, attachment, subject, - &evalOpts{kind: material.MaterialType, name: material.GetId()}, - ) - if err != nil { - return nil, NewPolicyError(err) - } + results := make([]*v12.PolicyEvaluation, len(attachments)) + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(pv.maxConcurrency) + for i, attachment := range attachments { + g.Go(func() error { + ev, err := pv.evaluatePolicyAttachment(ctx, attachment, subject, + &evalOpts{kind: material.MaterialType, name: material.GetId()}, + ) + if err != nil { + return NewPolicyError(err) + } + results[i] = ev + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + + // Filter nil entries (skipped policies) + for _, ev := range results { if ev != nil { result = append(result, ev) } @@ -425,19 +459,36 @@ func ComputeArguments(name string, inputs []*v1.PolicyInput, args map[string]str // VerifyStatement verifies that the statement is compliant with the policies present in the schema func (pv *PolicyVerifier) VerifyStatement(ctx context.Context, statement *intoto.Statement) ([]*v12.PolicyEvaluation, error) { - result := make([]*v12.PolicyEvaluation, 0) policies := pv.policies.GetAttestation() - for _, policyAtt := range policies { - material, err := protojson.Marshal(statement) - if err != nil { - return nil, NewPolicyError(err) - } - ev, err := pv.evaluatePolicyAttachment(ctx, policyAtt, material, &evalOpts{kind: v1.CraftingSchema_Material_ATTESTATION}) - if err != nil { - return nil, NewPolicyError(err) - } + // Marshal statement once — it's read-only input shared across evaluations + statementJSON, err := protojson.Marshal(statement) + if err != nil { + return nil, NewPolicyError(err) + } + + results := make([]*v12.PolicyEvaluation, len(policies)) + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(pv.maxConcurrency) + + for i, policyAtt := range policies { + g.Go(func() error { + ev, err := pv.evaluatePolicyAttachment(ctx, policyAtt, statementJSON, &evalOpts{kind: v1.CraftingSchema_Material_ATTESTATION}) + if err != nil { + return NewPolicyError(err) + } + results[i] = ev + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } + // Filter nil entries (skipped policies) + result := make([]*v12.PolicyEvaluation, 0, len(policies)) + for _, ev := range results { if ev != nil { result = append(result, ev) } diff --git a/pkg/policies/policy_groups.go b/pkg/policies/policy_groups.go index f77bdd97f..bbd339278 100644 --- a/pkg/policies/policy_groups.go +++ b/pkg/policies/policy_groups.go @@ -27,6 +27,7 @@ import ( "github.com/chainloop-dev/chainloop/pkg/templates" intoto "github.com/in-toto/attestation/go/v1" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) @@ -74,44 +75,58 @@ func (pgv *PolicyGroupVerifier) VerifyMaterial(ctx context.Context, material *ap return nil, NewPolicyError(err) } - for _, policyAtt := range policyAtts { - // Check if policy should be skipped - skip, policyName, err := pgv.shouldSkipPolicy(ctx, policyAtt, groupAtt.GetSkip()) - if err != nil { - return nil, NewPolicyError(fmt.Errorf("failed to check if policy should be skipped: %w", err)) - } - - if skip { - pgv.logger.Debug().Str("policy", policyName).Msg("skipping attestation policy per skip list") - continue - } - - // Load material content - subject, err := material.GetEvaluableContent(path) - if err != nil { - return nil, NewPolicyError(err) - } + // Load material content once for all policies in this group + subject, err := material.GetEvaluableContent(path) + if err != nil { + return nil, NewPolicyError(err) + } - ev, err := pgv.evaluatePolicyAttachment(ctx, applyGroupGate(policyAtt, groupAtt), subject, - &evalOpts{kind: material.MaterialType, name: material.GetId(), bindings: groupArgs}, - ) - if err != nil { - return nil, NewPolicyError(err) - } + groupResults := make([]*api.PolicyEvaluation, len(policyAtts)) + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(pgv.maxConcurrency) + + for i, policyAtt := range policyAtts { + g.Go(func() error { + // Check if policy should be skipped + skip, policyName, err := pgv.shouldSkipPolicy(gCtx, policyAtt, groupAtt.GetSkip()) + if err != nil { + return NewPolicyError(fmt.Errorf("failed to check if policy should be skipped: %w", err)) + } + + if skip { + pgv.logger.Debug().Str("policy", policyName).Msg("skipping attestation policy per skip list") + return nil + } + + ev, err := pgv.evaluatePolicyAttachment(gCtx, applyGroupGate(policyAtt, groupAtt), subject, + &evalOpts{kind: material.MaterialType, name: material.GetId(), bindings: groupArgs}, + ) + if err != nil { + return NewPolicyError(err) + } + + if ev != nil { + // Assign group reference to this evaluation + ev.GroupReference = &api.PolicyEvaluation_Reference{ + Name: group.GetMetadata().GetName(), + Digest: desc.GetDigest(), + Uri: desc.GetURI(), + OrgName: desc.GetOrgName(), + } + } + groupResults[i] = ev + return nil + }) + } - if ev == nil { - // no evaluation, skip - continue - } + if err := g.Wait(); err != nil { + return nil, err + } - // Assign group reference to this evaluation - ev.GroupReference = &api.PolicyEvaluation_Reference{ - Name: group.GetMetadata().GetName(), - Digest: desc.GetDigest(), - Uri: desc.GetURI(), - OrgName: desc.GetOrgName(), + for _, ev := range groupResults { + if ev != nil { + result = append(result, ev) } - result = append(result, ev) } } @@ -138,44 +153,59 @@ func (pgv *PolicyGroupVerifier) VerifyStatement(ctx context.Context, statement * return nil, NewPolicyError(err) } - for _, attachment := range group.GetSpec().GetPolicies().GetAttestation() { - // Check if policy should be skipped - skip, policyName, err := pgv.shouldSkipPolicy(ctx, attachment, groupAtt.GetSkip()) - if err != nil { - return nil, NewPolicyError(fmt.Errorf("failed to check if policy should be skipped: %w", err)) - } - - if skip { - pgv.logger.Debug().Str("policy", policyName).Msg("skipping attestation policy per skip list") - continue - } - - material, err := protojson.Marshal(statement) - if err != nil { - return nil, NewPolicyError(err) - } + // Marshal statement once for all policies in this group + statementJSON, err := protojson.Marshal(statement) + if err != nil { + return nil, NewPolicyError(err) + } - ev, err := pgv.evaluatePolicyAttachment(ctx, applyGroupGate(attachment, groupAtt), material, - &evalOpts{kind: v1.CraftingSchema_Material_ATTESTATION, bindings: groupArgs}, - ) - if err != nil { - return nil, NewPolicyError(err) - } + attestationPolicies := group.GetSpec().GetPolicies().GetAttestation() + groupResults := make([]*api.PolicyEvaluation, len(attestationPolicies)) + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(pgv.maxConcurrency) + + for i, attachment := range attestationPolicies { + g.Go(func() error { + // Check if policy should be skipped + skip, policyName, err := pgv.shouldSkipPolicy(gCtx, attachment, groupAtt.GetSkip()) + if err != nil { + return NewPolicyError(fmt.Errorf("failed to check if policy should be skipped: %w", err)) + } + + if skip { + pgv.logger.Debug().Str("policy", policyName).Msg("skipping attestation policy per skip list") + return nil + } + + ev, err := pgv.evaluatePolicyAttachment(gCtx, applyGroupGate(attachment, groupAtt), statementJSON, + &evalOpts{kind: v1.CraftingSchema_Material_ATTESTATION, bindings: groupArgs}, + ) + if err != nil { + return NewPolicyError(err) + } + + if ev != nil { + // Assign group reference to this evaluation + ev.GroupReference = &api.PolicyEvaluation_Reference{ + Name: group.GetMetadata().GetName(), + Digest: desc.GetDigest(), + Uri: desc.GetURI(), + OrgName: desc.GetOrgName(), + } + } + groupResults[i] = ev + return nil + }) + } - if ev == nil { - // no evaluation, skip - continue - } + if err := g.Wait(); err != nil { + return nil, err + } - // Assign group reference to this evaluation - ev.GroupReference = &api.PolicyEvaluation_Reference{ - Name: group.GetMetadata().GetName(), - Digest: desc.GetDigest(), - Uri: desc.GetURI(), - OrgName: desc.GetOrgName(), + for _, ev := range groupResults { + if ev != nil { + result = append(result, ev) } - - result = append(result, ev) } } From 03026556666b2ef1fb50f4ca6d79c2fd099e976c Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Fri, 20 Mar 2026 23:01:57 +0530 Subject: [PATCH 2/8] feat(policies): parallelize policy evaluations with bounded concurrency Use errgroup to evaluate policy attachments concurrently within VerifyStatement and VerifyMaterial, bounded by runtime.NumCPU(). This reduces evaluation time from O(n*t) to O(t) where n is the number of policies and t is the slowest single evaluation. Also fixes pre-existing concurrency safety issues: - Move remotePolicyCache/remoteGroupCache mutexes from instance-level to package-level to prevent data races under concurrent access - Use check-lock-check pattern in cache loaders to avoid holding the mutex during gRPC calls, enabling true parallel cache-miss fetches - Move extism.SetLogLevel() to sync.Once to guarantee it is called exactly once across all concurrent WASM engine instances Fixes #2899 Signed-off-by: Vibhav Bobade --- pkg/policies/engine/wasm/engine.go | 35 ++++++++++++++++++------------ pkg/policies/group_loader.go | 12 +++++++--- pkg/policies/loader.go | 11 +++++++--- pkg/policies/policies.go | 8 +++---- 4 files changed, 42 insertions(+), 24 deletions(-) diff --git a/pkg/policies/engine/wasm/engine.go b/pkg/policies/engine/wasm/engine.go index 0ecf42360..f688b66a0 100644 --- a/pkg/policies/engine/wasm/engine.go +++ b/pkg/policies/engine/wasm/engine.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/chainloop-dev/chainloop/pkg/policies/engine" @@ -28,6 +29,10 @@ import ( "github.com/rs/zerolog" ) +// setLogLevelOnce ensures extism.SetLogLevel is called exactly once, +// since it modifies global state and is not safe for concurrent calls. +var setLogLevelOnce sync.Once + // Ensure Engine implements PolicyEngine interface var _ engine.PolicyEngine = (*Engine)(nil) @@ -59,20 +64,22 @@ func NewEngine(opts ...engine.Option) *Engine { logger = &noopLogger } - // Set WASM plugin log level once at engine creation to avoid - // concurrent calls to the global extism.SetLogLevel from Verify(). - switch { - case logger.GetLevel() <= zerolog.TraceLevel: - extism.SetLogLevel(extism.LogLevelTrace) - case logger.GetLevel() <= zerolog.DebugLevel: - extism.SetLogLevel(extism.LogLevelDebug) - case logger.GetLevel() <= zerolog.InfoLevel: - extism.SetLogLevel(extism.LogLevelInfo) - case logger.GetLevel() <= zerolog.WarnLevel: - extism.SetLogLevel(extism.LogLevelWarn) - default: - extism.SetLogLevel(extism.LogLevelError) - } + // Set WASM plugin log level exactly once across all engine instances. + // extism.SetLogLevel modifies global state and is not safe for concurrent calls. + setLogLevelOnce.Do(func() { + switch { + case logger.GetLevel() <= zerolog.TraceLevel: + extism.SetLogLevel(extism.LogLevelTrace) + case logger.GetLevel() <= zerolog.DebugLevel: + extism.SetLogLevel(extism.LogLevelDebug) + case logger.GetLevel() <= zerolog.InfoLevel: + extism.SetLogLevel(extism.LogLevelInfo) + case logger.GetLevel() <= zerolog.WarnLevel: + extism.SetLogLevel(extism.LogLevelWarn) + default: + extism.SetLogLevel(extism.LogLevelError) + } + }) return &Engine{ executionTimeout: executionTimeout, diff --git a/pkg/policies/group_loader.go b/pkg/policies/group_loader.go index f22177f44..d0489afaf 100644 --- a/pkg/policies/group_loader.go +++ b/pkg/policies/group_loader.go @@ -118,12 +118,13 @@ func NewChainloopGroupLoader(client pb.AttestationServiceClient) *ChainloopGroup func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGroupAttachment) (*v1.PolicyGroup, *PolicyDescriptor, error) { ref := attachment.GetRef() + // Check cache (read under lock, release before any I/O) remoteGroupCacheMutex.Lock() - defer remoteGroupCacheMutex.Unlock() - if v, ok := remoteGroupCache[ref]; ok { + remoteGroupCacheMutex.Unlock() return v.group, v.reference, nil } + remoteGroupCacheMutex.Unlock() if !IsProviderScheme(ref) { return nil, nil, fmt.Errorf("invalid group reference %q", ref) @@ -131,6 +132,7 @@ func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGr providerRef := ProviderParts(ref) + // gRPC call happens outside the lock resp, err := c.Client.GetPolicyGroup(ctx, &pb.AttestationServiceGetPolicyGroupRequest{ Provider: providerRef.Provider, GroupName: providerRef.Name, @@ -154,7 +156,11 @@ func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGr } reference := policyReferenceResourceDescriptor(providerRef.Name, resp.Reference.GetUrl(), orgName, h) - // cache result + + // Write to cache under lock + remoteGroupCacheMutex.Lock() remoteGroupCache[ref] = &groupWithReference{group: resp.GetGroup(), reference: reference} + remoteGroupCacheMutex.Unlock() + return resp.GetGroup(), reference, nil } diff --git a/pkg/policies/loader.go b/pkg/policies/loader.go index 2c4803170..acc8ba232 100644 --- a/pkg/policies/loader.go +++ b/pkg/policies/loader.go @@ -177,12 +177,13 @@ func NewChainloopLoader(client pb.AttestationServiceClient) *ChainloopLoader { func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachment) (*v1.Policy, *PolicyDescriptor, error) { ref := attachment.GetRef() + // Check cache (read under lock, release before any I/O) remotePolicyCacheMutex.Lock() - defer remotePolicyCacheMutex.Unlock() - if v, ok := remotePolicyCache[ref]; ok { + remotePolicyCacheMutex.Unlock() return v.policy, v.reference, nil } + remotePolicyCacheMutex.Unlock() if !IsProviderScheme(ref) { return nil, nil, fmt.Errorf("invalid policy reference %q", ref) @@ -190,6 +191,7 @@ func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachm providerRef := ProviderParts(ref) + // gRPC call happens outside the lock resp, err := c.Client.GetPolicy(ctx, &pb.AttestationServiceGetPolicyRequest{ Provider: providerRef.Provider, PolicyName: providerRef.Name, @@ -214,8 +216,11 @@ func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachm reference := policyReferenceResourceDescriptor(providerRef.Name, resp.Reference.GetUrl(), orgName, h) - // cache result + // Write to cache under lock + remotePolicyCacheMutex.Lock() remotePolicyCache[ref] = &policyWithReference{policy: resp.GetPolicy(), reference: reference} + remotePolicyCacheMutex.Unlock() + return resp.GetPolicy(), reference, nil } diff --git a/pkg/policies/policies.go b/pkg/policies/policies.go index 510b61f93..d37945842 100644 --- a/pkg/policies/policies.go +++ b/pkg/policies/policies.go @@ -191,12 +191,12 @@ func (pv *PolicyVerifier) VerifyMaterial(ctx context.Context, material *v12.Atte } results := make([]*v12.PolicyEvaluation, len(attachments)) - g, ctx := errgroup.WithContext(ctx) + g, gCtx := errgroup.WithContext(ctx) g.SetLimit(pv.maxConcurrency) for i, attachment := range attachments { g.Go(func() error { - ev, err := pv.evaluatePolicyAttachment(ctx, attachment, subject, + ev, err := pv.evaluatePolicyAttachment(gCtx, attachment, subject, &evalOpts{kind: material.MaterialType, name: material.GetId()}, ) if err != nil { @@ -468,12 +468,12 @@ func (pv *PolicyVerifier) VerifyStatement(ctx context.Context, statement *intoto } results := make([]*v12.PolicyEvaluation, len(policies)) - g, ctx := errgroup.WithContext(ctx) + g, gCtx := errgroup.WithContext(ctx) g.SetLimit(pv.maxConcurrency) for i, policyAtt := range policies { g.Go(func() error { - ev, err := pv.evaluatePolicyAttachment(ctx, policyAtt, statementJSON, &evalOpts{kind: v1.CraftingSchema_Material_ATTESTATION}) + ev, err := pv.evaluatePolicyAttachment(gCtx, policyAtt, statementJSON, &evalOpts{kind: v1.CraftingSchema_Material_ATTESTATION}) if err != nil { return NewPolicyError(err) } From 48419ef3fb51b9c33bab77d281f0b990c911a9fd Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Fri, 20 Mar 2026 23:27:45 +0530 Subject: [PATCH 3/8] feat(policies): parallelize policy evaluations with bounded concurrency Use errgroup to evaluate policy attachments concurrently within VerifyStatement and VerifyMaterial, bounded by max(NumCPU*2, 10). This reduces evaluation time from O(n*t) to O(t) where n is the number of policies and t is the slowest single evaluation. Concurrency safety fixes: - Move remotePolicyCache/remoteGroupCache mutexes from instance-level to package-level to prevent data races under concurrent access - Use singleflight to coalesce concurrent gRPC fetches for the same policy/group ref, ensuring exactly one fetch per key - Move extism.SetLogLevel() to sync.Once to guarantee it is called exactly once across all concurrent WASM engine instances Adds race detector tests covering: - Concurrent VerifyStatement from 10 goroutines - Concurrent VerifyMaterial from 10 goroutines - WithMaxConcurrency option validation - Sequential (maxConcurrency=1) vs parallel result equivalence - errgroup cancellation on policy load failure Fixes #2899 Signed-off-by: Vibhav Bobade --- pkg/policies/concurrency_test.go | 236 +++++++++++++++++++++++++++++ pkg/policies/engine/wasm/engine.go | 2 + pkg/policies/group_loader.go | 77 ++++++---- pkg/policies/loader.go | 78 ++++++---- pkg/policies/policies.go | 4 +- 5 files changed, 336 insertions(+), 61 deletions(-) create mode 100644 pkg/policies/concurrency_test.go diff --git a/pkg/policies/concurrency_test.go b/pkg/policies/concurrency_test.go new file mode 100644 index 000000000..d66c67f46 --- /dev/null +++ b/pkg/policies/concurrency_test.go @@ -0,0 +1,236 @@ +// +// Copyright 2024-2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package policies + +import ( + "context" + "os" + "sync" + "testing" + + v12 "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1" + v1 "github.com/chainloop-dev/chainloop/pkg/attestation/crafter/api/attestation/v1" + intoto "github.com/in-toto/attestation/go/v1" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" +) + +// TestConcurrentVerifyStatement runs VerifyStatement from multiple goroutines +// to exercise errgroup parallelism and catch race conditions. +// Run with: go test -race -count=1 -run TestConcurrentVerifyStatement ./pkg/policies/... +func TestConcurrentVerifyStatement(t *testing.T) { + logger := zerolog.Nop() + + schema := &v12.CraftingSchema{ + Policies: &v12.Policies{ + Attestation: []*v12.PolicyAttachment{ + {Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/workflow.yaml"}}, + {Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/materials.yaml"}}, + }, + }, + } + + statement := loadStatementForTest(t, "testdata/statement.json") + + pv := NewPolicyVerifier(schema.GetPolicies(), nil, &logger) + + // Run multiple VerifyStatement calls concurrently + const goroutines = 10 + var wg sync.WaitGroup + errs := make([]error, goroutines) + results := make([][]*v1.PolicyEvaluation, goroutines) + + for i := range goroutines { + wg.Add(1) + go func() { + defer wg.Done() + res, err := pv.VerifyStatement(context.Background(), statement) + errs[i] = err + results[i] = res + }() + } + + wg.Wait() + + // All calls should succeed + for i := range goroutines { + assert.NoError(t, errs[i], "goroutine %d failed", i) + } + + // All calls should return the same number of evaluations + for i := 1; i < goroutines; i++ { + assert.Equal(t, len(results[0]), len(results[i]), + "goroutine %d returned different number of evaluations", i) + } +} + +// TestConcurrentVerifyMaterial runs VerifyMaterial from multiple goroutines. +func TestConcurrentVerifyMaterial(t *testing.T) { + logger := zerolog.Nop() + + schema := &v12.CraftingSchema{ + Policies: &v12.Policies{ + Materials: []*v12.PolicyAttachment{ + { + Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/materials.yaml"}, + Selector: &v12.PolicyAttachment_MaterialSelector{Name: "sbom"}, + }, + }, + }, + } + + material := &v1.Attestation_Material{ + MaterialType: v12.CraftingSchema_Material_SBOM_CYCLONEDX_JSON, + M: &v1.Attestation_Material_String_{ + String_: &v1.Attestation_Material_KeyVal{ + Id: "sbom", + Value: "testdata/sbom.cyclonedx.json", + }, + }, + } + + pv := NewPolicyVerifier(schema.GetPolicies(), nil, &logger) + + const goroutines = 10 + var wg sync.WaitGroup + errs := make([]error, goroutines) + results := make([][]*v1.PolicyEvaluation, goroutines) + + for i := range goroutines { + wg.Add(1) + go func() { + defer wg.Done() + res, err := pv.VerifyMaterial(context.Background(), material, "") + errs[i] = err + results[i] = res + }() + } + + wg.Wait() + + for i := range goroutines { + assert.NoError(t, errs[i], "goroutine %d failed", i) + } + + for i := 1; i < goroutines; i++ { + assert.Equal(t, len(results[0]), len(results[i]), + "goroutine %d returned different number of evaluations", i) + } +} + +// TestWithMaxConcurrency verifies the WithMaxConcurrency option is applied. +func TestWithMaxConcurrency(t *testing.T) { + logger := zerolog.Nop() + + // Test default + pv := NewPolicyVerifier(nil, nil, &logger) + assert.Greater(t, pv.maxConcurrency, 0, "default maxConcurrency should be positive") + + // Test custom value + pv = NewPolicyVerifier(nil, nil, &logger, WithMaxConcurrency(5)) + assert.Equal(t, 5, pv.maxConcurrency) + + // Test zero defaults to computed default + pv = NewPolicyVerifier(nil, nil, &logger, WithMaxConcurrency(0)) + assert.Equal(t, defaultMaxConcurrency, pv.maxConcurrency) + + // Test negative defaults to computed default + pv = NewPolicyVerifier(nil, nil, &logger, WithMaxConcurrency(-1)) + assert.Equal(t, defaultMaxConcurrency, pv.maxConcurrency) +} + +// TestVerifyStatementWithConcurrencyOne ensures sequential mode (maxConcurrency=1) +// produces identical results to default parallel mode. +func TestVerifyStatementWithConcurrencyOne(t *testing.T) { + logger := zerolog.Nop() + + schema := &v12.CraftingSchema{ + Policies: &v12.Policies{ + Attestation: []*v12.PolicyAttachment{ + {Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/workflow.yaml"}}, + {Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/materials.yaml"}}, + }, + }, + } + + statement := loadStatementForTest(t, "testdata/statement.json") + + // Sequential + pvSeq := NewPolicyVerifier(schema.GetPolicies(), nil, &logger, WithMaxConcurrency(1)) + seqResults, seqErr := pvSeq.VerifyStatement(context.Background(), statement) + + // Parallel (default) + pvPar := NewPolicyVerifier(schema.GetPolicies(), nil, &logger) + parResults, parErr := pvPar.VerifyStatement(context.Background(), statement) + + require.NoError(t, seqErr) + require.NoError(t, parErr) + assert.Equal(t, len(seqResults), len(parResults), + "sequential and parallel should return same number of evaluations") + + // Build name→result maps for comparison (order may differ) + seqByName := make(map[string]*v1.PolicyEvaluation) + for _, ev := range seqResults { + seqByName[ev.Name] = ev + } + + for _, ev := range parResults { + seqEv, ok := seqByName[ev.Name] + assert.True(t, ok, "parallel result has policy %q not found in sequential", ev.Name) + if ok { + assert.Equal(t, len(seqEv.Violations), len(ev.Violations), + "policy %q: violation count mismatch", ev.Name) + assert.Equal(t, seqEv.Skipped, ev.Skipped, + "policy %q: skipped mismatch", ev.Name) + } + } +} + +// TestErrGroupCancellation verifies that when one policy evaluation fails, +// the errgroup context is cancelled and propagated. +func TestErrGroupCancellation(t *testing.T) { + logger := zerolog.Nop() + + schema := &v12.CraftingSchema{ + Policies: &v12.Policies{ + Attestation: []*v12.PolicyAttachment{ + {Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/workflow.yaml"}}, + // This policy ref does not exist — will cause an error + {Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/nonexistent_policy.yaml"}}, + }, + }, + } + + statement := loadStatementForTest(t, "testdata/statement.json") + + pv := NewPolicyVerifier(schema.GetPolicies(), nil, &logger) + _, err := pv.VerifyStatement(context.Background(), statement) + + assert.Error(t, err, "should fail when a policy file does not exist") + assert.IsType(t, &PolicyError{}, err, "error should be a PolicyError") +} + +func loadStatementForTest(t *testing.T, file string) *intoto.Statement { + t.Helper() + stContent, err := os.ReadFile(file) + require.NoError(t, err) + var statement intoto.Statement + err = protojson.Unmarshal(stContent, &statement) + require.NoError(t, err) + return &statement +} diff --git a/pkg/policies/engine/wasm/engine.go b/pkg/policies/engine/wasm/engine.go index f688b66a0..dc5f2b2fc 100644 --- a/pkg/policies/engine/wasm/engine.go +++ b/pkg/policies/engine/wasm/engine.go @@ -31,6 +31,8 @@ import ( // setLogLevelOnce ensures extism.SetLogLevel is called exactly once, // since it modifies global state and is not safe for concurrent calls. +// Note: the log level is determined by the first NewEngine() call in the +// process. Subsequent engines with different log levels will not override it. var setLogLevelOnce sync.Once // Ensure Engine implements PolicyEngine interface diff --git a/pkg/policies/group_loader.go b/pkg/policies/group_loader.go index d0489afaf..d0eeef7a3 100644 --- a/pkg/policies/group_loader.go +++ b/pkg/policies/group_loader.go @@ -28,6 +28,7 @@ import ( pb "github.com/chainloop-dev/chainloop/app/controlplane/api/controlplane/v1" v1 "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1" crv1 "github.com/google/go-containerregistry/pkg/v1" + "golang.org/x/sync/singleflight" ) // GroupLoader defines the interface for policy loaders from contract attachments @@ -109,6 +110,7 @@ type groupWithReference struct { var ( remoteGroupCache = make(map[string]*groupWithReference) remoteGroupCacheMutex sync.Mutex + remoteGroupFlight singleflight.Group ) func NewChainloopGroupLoader(client pb.AttestationServiceClient) *ChainloopGroupLoader { @@ -118,7 +120,7 @@ func NewChainloopGroupLoader(client pb.AttestationServiceClient) *ChainloopGroup func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGroupAttachment) (*v1.PolicyGroup, *PolicyDescriptor, error) { ref := attachment.GetRef() - // Check cache (read under lock, release before any I/O) + // Fast path: check cache under lock remoteGroupCacheMutex.Lock() if v, ok := remoteGroupCache[ref]; ok { remoteGroupCacheMutex.Unlock() @@ -126,41 +128,56 @@ func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGr } remoteGroupCacheMutex.Unlock() - if !IsProviderScheme(ref) { - return nil, nil, fmt.Errorf("invalid group reference %q", ref) - } + // Use singleflight to coalesce concurrent fetches for the same ref. + result, err, _ := remoteGroupFlight.Do(ref, func() (interface{}, error) { + // Re-check cache (another goroutine may have populated it) + remoteGroupCacheMutex.Lock() + if v, ok := remoteGroupCache[ref]; ok { + remoteGroupCacheMutex.Unlock() + return v, nil + } + remoteGroupCacheMutex.Unlock() - providerRef := ProviderParts(ref) + if !IsProviderScheme(ref) { + return nil, fmt.Errorf("invalid group reference %q", ref) + } - // gRPC call happens outside the lock - resp, err := c.Client.GetPolicyGroup(ctx, &pb.AttestationServiceGetPolicyGroupRequest{ - Provider: providerRef.Provider, - GroupName: providerRef.Name, - OrgName: providerRef.OrgName, - }) - if err != nil { - return nil, nil, fmt.Errorf("requesting remote group (provider: %s, name: %s): %w", providerRef.Provider, providerRef.Name, err) - } + providerRef := ProviderParts(ref) - h, err := crv1.NewHash(resp.Reference.GetDigest()) - if err != nil { - return nil, nil, fmt.Errorf("parsing digest: %w", err) - } + resp, err := c.Client.GetPolicyGroup(ctx, &pb.AttestationServiceGetPolicyGroupRequest{ + Provider: providerRef.Provider, + GroupName: providerRef.Name, + OrgName: providerRef.OrgName, + }) + if err != nil { + return nil, fmt.Errorf("requesting remote group (provider: %s, name: %s): %w", providerRef.Provider, providerRef.Name, err) + } - orgName := providerRef.OrgName - // Extract organization name from URL if present - if u, err := url.Parse(resp.Reference.GetUrl()); err == nil { - if orgParam := u.Query().Get("org"); orgParam != "" { - orgName = orgParam + h, err := crv1.NewHash(resp.Reference.GetDigest()) + if err != nil { + return nil, fmt.Errorf("parsing digest: %w", err) } - } - reference := policyReferenceResourceDescriptor(providerRef.Name, resp.Reference.GetUrl(), orgName, h) + orgName := providerRef.OrgName + if u, err := url.Parse(resp.Reference.GetUrl()); err == nil { + if orgParam := u.Query().Get("org"); orgParam != "" { + orgName = orgParam + } + } - // Write to cache under lock - remoteGroupCacheMutex.Lock() - remoteGroupCache[ref] = &groupWithReference{group: resp.GetGroup(), reference: reference} - remoteGroupCacheMutex.Unlock() + reference := policyReferenceResourceDescriptor(providerRef.Name, resp.Reference.GetUrl(), orgName, h) + cached := &groupWithReference{group: resp.GetGroup(), reference: reference} + + remoteGroupCacheMutex.Lock() + remoteGroupCache[ref] = cached + remoteGroupCacheMutex.Unlock() + + return cached, nil + }) + if err != nil { + return nil, nil, err + } - return resp.GetGroup(), reference, nil + cached := result.(*groupWithReference) + return cached.group, cached.reference, nil } diff --git a/pkg/policies/loader.go b/pkg/policies/loader.go index acc8ba232..597692776 100644 --- a/pkg/policies/loader.go +++ b/pkg/policies/loader.go @@ -28,6 +28,7 @@ import ( "sync" pb "github.com/chainloop-dev/chainloop/app/controlplane/api/controlplane/v1" + "golang.org/x/sync/singleflight" v1 "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1" "github.com/chainloop-dev/chainloop/app/controlplane/pkg/unmarshal" crv1 "github.com/google/go-containerregistry/pkg/v1" @@ -168,6 +169,7 @@ type policyWithReference struct { var ( remotePolicyCache = make(map[string]*policyWithReference) remotePolicyCacheMutex sync.Mutex + remotePolicyFlight singleflight.Group ) func NewChainloopLoader(client pb.AttestationServiceClient) *ChainloopLoader { @@ -177,7 +179,7 @@ func NewChainloopLoader(client pb.AttestationServiceClient) *ChainloopLoader { func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachment) (*v1.Policy, *PolicyDescriptor, error) { ref := attachment.GetRef() - // Check cache (read under lock, release before any I/O) + // Fast path: check cache under lock remotePolicyCacheMutex.Lock() if v, ok := remotePolicyCache[ref]; ok { remotePolicyCacheMutex.Unlock() @@ -185,43 +187,59 @@ func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachm } remotePolicyCacheMutex.Unlock() - if !IsProviderScheme(ref) { - return nil, nil, fmt.Errorf("invalid policy reference %q", ref) - } + // Use singleflight to coalesce concurrent fetches for the same ref. + // This ensures exactly one gRPC call per ref regardless of concurrency. + result, err, _ := remotePolicyFlight.Do(ref, func() (interface{}, error) { + // Re-check cache (another goroutine may have populated it) + remotePolicyCacheMutex.Lock() + if v, ok := remotePolicyCache[ref]; ok { + remotePolicyCacheMutex.Unlock() + return v, nil + } + remotePolicyCacheMutex.Unlock() - providerRef := ProviderParts(ref) + if !IsProviderScheme(ref) { + return nil, fmt.Errorf("invalid policy reference %q", ref) + } - // gRPC call happens outside the lock - resp, err := c.Client.GetPolicy(ctx, &pb.AttestationServiceGetPolicyRequest{ - Provider: providerRef.Provider, - PolicyName: providerRef.Name, - OrgName: providerRef.OrgName, - }) - if err != nil { - return nil, nil, fmt.Errorf("requesting remote policy (provider: %s, name: %s): %w", providerRef.Provider, providerRef.Name, err) - } + providerRef := ProviderParts(ref) - h, err := crv1.NewHash(resp.Reference.GetDigest()) - if err != nil { - return nil, nil, fmt.Errorf("parsing digest: %w", err) - } + resp, err := c.Client.GetPolicy(ctx, &pb.AttestationServiceGetPolicyRequest{ + Provider: providerRef.Provider, + PolicyName: providerRef.Name, + OrgName: providerRef.OrgName, + }) + if err != nil { + return nil, fmt.Errorf("requesting remote policy (provider: %s, name: %s): %w", providerRef.Provider, providerRef.Name, err) + } - orgName := providerRef.OrgName - // Extract organization name from URL if present - if u, err := url.Parse(resp.Reference.GetUrl()); err == nil { - if orgParam := u.Query().Get("org"); orgParam != "" { - orgName = orgParam + h, err := crv1.NewHash(resp.Reference.GetDigest()) + if err != nil { + return nil, fmt.Errorf("parsing digest: %w", err) } - } - reference := policyReferenceResourceDescriptor(providerRef.Name, resp.Reference.GetUrl(), orgName, h) + orgName := providerRef.OrgName + if u, err := url.Parse(resp.Reference.GetUrl()); err == nil { + if orgParam := u.Query().Get("org"); orgParam != "" { + orgName = orgParam + } + } - // Write to cache under lock - remotePolicyCacheMutex.Lock() - remotePolicyCache[ref] = &policyWithReference{policy: resp.GetPolicy(), reference: reference} - remotePolicyCacheMutex.Unlock() + reference := policyReferenceResourceDescriptor(providerRef.Name, resp.Reference.GetUrl(), orgName, h) + cached := &policyWithReference{policy: resp.GetPolicy(), reference: reference} + + remotePolicyCacheMutex.Lock() + remotePolicyCache[ref] = cached + remotePolicyCacheMutex.Unlock() + + return cached, nil + }) + if err != nil { + return nil, nil, err + } - return resp.GetPolicy(), reference, nil + cached := result.(*policyWithReference) + return cached.policy, cached.reference, nil } func unmarshallResource(raw []byte, ref string, digest string, dest proto.Message) (*PolicyDescriptor, error) { diff --git a/pkg/policies/policies.go b/pkg/policies/policies.go index d37945842..5b4d23837 100644 --- a/pkg/policies/policies.go +++ b/pkg/policies/policies.go @@ -75,7 +75,9 @@ const ( ) // defaultMaxConcurrency is the default number of concurrent policy evaluations. -var defaultMaxConcurrency = runtime.NumCPU() +// Set higher than NumCPU because policy evaluation is I/O-mixed (gRPC calls, +// file loads) rather than purely CPU-bound. Can be overridden via WithMaxConcurrency. +var defaultMaxConcurrency = max(runtime.NumCPU()*2, 10) type PolicyVerifier struct { policies *v1.Policies From 458d86c0993f42c86d9cfe5a28fb3861e0a89556 Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Fri, 20 Mar 2026 23:43:04 +0530 Subject: [PATCH 4/8] feat(policies): parallelize policy evaluations with bounded concurrency Use errgroup to evaluate policy attachments concurrently within VerifyStatement and VerifyMaterial, bounded by max(NumCPU*2, 10). This reduces evaluation time from O(n*t) to O(t) where n is the number of policies and t is the slowest single evaluation. Concurrency safety fixes: - Move remotePolicyCache/remoteGroupCache mutexes from instance-level to package-level to prevent data races under concurrent access - Use singleflight to coalesce concurrent gRPC fetches for the same policy/group ref, ensuring exactly one fetch per key - Use context.WithoutCancel inside singleflight callbacks to prevent the winning goroutine's cancellation from propagating to waiters - Move extism.SetLogLevel() to sync.Once to guarantee it is called exactly once across all concurrent WASM engine instances Adds 5 race detector tests covering: - Concurrent VerifyStatement from 10 goroutines - Concurrent VerifyMaterial from 10 goroutines (with real SPDX fixture) - WithMaxConcurrency option validation - Sequential (maxConcurrency=1) vs parallel result equivalence - errgroup cancellation on policy load failure Fixes #2899 Signed-off-by: Vibhav Bobade --- pkg/policies/concurrency_test.go | 15 ++++++--------- pkg/policies/group_loader.go | 13 ++++--------- pkg/policies/loader.go | 13 ++++--------- 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/pkg/policies/concurrency_test.go b/pkg/policies/concurrency_test.go index d66c67f46..95a4a9bc7 100644 --- a/pkg/policies/concurrency_test.go +++ b/pkg/policies/concurrency_test.go @@ -87,20 +87,17 @@ func TestConcurrentVerifyMaterial(t *testing.T) { Policies: &v12.Policies{ Materials: []*v12.PolicyAttachment{ { - Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/materials.yaml"}, - Selector: &v12.PolicyAttachment_MaterialSelector{Name: "sbom"}, + Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/sbom_syft.yaml"}, }, }, }, } material := &v1.Attestation_Material{ - MaterialType: v12.CraftingSchema_Material_SBOM_CYCLONEDX_JSON, - M: &v1.Attestation_Material_String_{ - String_: &v1.Attestation_Material_KeyVal{ - Id: "sbom", - Value: "testdata/sbom.cyclonedx.json", - }, + Id: "sbom", + MaterialType: v12.CraftingSchema_Material_SBOM_SPDX_JSON, + M: &v1.Attestation_Material_Artifact_{ + Artifact: &v1.Attestation_Material_Artifact{}, }, } @@ -115,7 +112,7 @@ func TestConcurrentVerifyMaterial(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - res, err := pv.VerifyMaterial(context.Background(), material, "") + res, err := pv.VerifyMaterial(context.Background(), material, "testdata/sbom-spdx.json") errs[i] = err results[i] = res }() diff --git a/pkg/policies/group_loader.go b/pkg/policies/group_loader.go index d0eeef7a3..a3cc137fb 100644 --- a/pkg/policies/group_loader.go +++ b/pkg/policies/group_loader.go @@ -129,22 +129,17 @@ func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGr remoteGroupCacheMutex.Unlock() // Use singleflight to coalesce concurrent fetches for the same ref. + // Use context.WithoutCancel so the winning goroutine's context cancellation + // doesn't propagate to waiters sharing the same singleflight key. result, err, _ := remoteGroupFlight.Do(ref, func() (interface{}, error) { - // Re-check cache (another goroutine may have populated it) - remoteGroupCacheMutex.Lock() - if v, ok := remoteGroupCache[ref]; ok { - remoteGroupCacheMutex.Unlock() - return v, nil - } - remoteGroupCacheMutex.Unlock() - if !IsProviderScheme(ref) { return nil, fmt.Errorf("invalid group reference %q", ref) } providerRef := ProviderParts(ref) + sfCtx := context.WithoutCancel(ctx) - resp, err := c.Client.GetPolicyGroup(ctx, &pb.AttestationServiceGetPolicyGroupRequest{ + resp, err := c.Client.GetPolicyGroup(sfCtx, &pb.AttestationServiceGetPolicyGroupRequest{ Provider: providerRef.Provider, GroupName: providerRef.Name, OrgName: providerRef.OrgName, diff --git a/pkg/policies/loader.go b/pkg/policies/loader.go index 597692776..1770e3322 100644 --- a/pkg/policies/loader.go +++ b/pkg/policies/loader.go @@ -189,22 +189,17 @@ func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachm // Use singleflight to coalesce concurrent fetches for the same ref. // This ensures exactly one gRPC call per ref regardless of concurrency. + // Use context.WithoutCancel so the winning goroutine's context cancellation + // doesn't propagate to waiters sharing the same singleflight key. result, err, _ := remotePolicyFlight.Do(ref, func() (interface{}, error) { - // Re-check cache (another goroutine may have populated it) - remotePolicyCacheMutex.Lock() - if v, ok := remotePolicyCache[ref]; ok { - remotePolicyCacheMutex.Unlock() - return v, nil - } - remotePolicyCacheMutex.Unlock() - if !IsProviderScheme(ref) { return nil, fmt.Errorf("invalid policy reference %q", ref) } providerRef := ProviderParts(ref) + sfCtx := context.WithoutCancel(ctx) - resp, err := c.Client.GetPolicy(ctx, &pb.AttestationServiceGetPolicyRequest{ + resp, err := c.Client.GetPolicy(sfCtx, &pb.AttestationServiceGetPolicyRequest{ Provider: providerRef.Provider, PolicyName: providerRef.Name, OrgName: providerRef.OrgName, From ca9e39c193214d811461e4b796f9c8e2c11b93b2 Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Fri, 20 Mar 2026 23:49:49 +0530 Subject: [PATCH 5/8] style: fix gofmt import grouping in loader.go Signed-off-by: Vibhav Bobade --- pkg/policies/loader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/policies/loader.go b/pkg/policies/loader.go index 1770e3322..da8d41057 100644 --- a/pkg/policies/loader.go +++ b/pkg/policies/loader.go @@ -28,10 +28,10 @@ import ( "sync" pb "github.com/chainloop-dev/chainloop/app/controlplane/api/controlplane/v1" - "golang.org/x/sync/singleflight" v1 "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1" "github.com/chainloop-dev/chainloop/app/controlplane/pkg/unmarshal" crv1 "github.com/google/go-containerregistry/pkg/v1" + "golang.org/x/sync/singleflight" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) From bb771521a35e0d0bf3419fbae725e7201e4827cd Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Sat, 21 Mar 2026 01:11:34 +0530 Subject: [PATCH 6/8] fix: preserve deadline in singleflight context to bound RPC lifetime context.WithoutCancel detaches cancellation but also drops the deadline. Re-apply the original deadline so gRPC calls inside singleflight still have a bounded lifetime under server outage conditions. Addresses cubic-dev-ai review feedback. Signed-off-by: Vibhav Bobade --- pkg/policies/group_loader.go | 7 +++++++ pkg/policies/loader.go | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/pkg/policies/group_loader.go b/pkg/policies/group_loader.go index a3cc137fb..13e0c0255 100644 --- a/pkg/policies/group_loader.go +++ b/pkg/policies/group_loader.go @@ -137,7 +137,14 @@ func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGr } providerRef := ProviderParts(ref) + // Detach from caller's cancellation (so one goroutine's cancel doesn't + // kill the shared singleflight call) but preserve a bounded deadline. sfCtx := context.WithoutCancel(ctx) + if deadline, ok := ctx.Deadline(); ok { + var cancel context.CancelFunc + sfCtx, cancel = context.WithDeadline(sfCtx, deadline) + defer cancel() + } resp, err := c.Client.GetPolicyGroup(sfCtx, &pb.AttestationServiceGetPolicyGroupRequest{ Provider: providerRef.Provider, diff --git a/pkg/policies/loader.go b/pkg/policies/loader.go index da8d41057..f7124cf5c 100644 --- a/pkg/policies/loader.go +++ b/pkg/policies/loader.go @@ -197,7 +197,14 @@ func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachm } providerRef := ProviderParts(ref) + // Detach from caller's cancellation (so one goroutine's cancel doesn't + // kill the shared singleflight call) but preserve a bounded deadline. sfCtx := context.WithoutCancel(ctx) + if deadline, ok := ctx.Deadline(); ok { + var cancel context.CancelFunc + sfCtx, cancel = context.WithDeadline(sfCtx, deadline) + defer cancel() + } resp, err := c.Client.GetPolicy(sfCtx, &pb.AttestationServiceGetPolicyRequest{ Provider: providerRef.Provider, From 74a0378b0fb58bb661efbce49b06e8765a6d27ed Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Sat, 21 Mar 2026 02:27:06 +0530 Subject: [PATCH 7/8] fix: use fixed timeout in singleflight instead of caller's deadline Replace context.WithoutCancel + inherited deadline with a fixed 30s timeout from context.Background(). This ensures all singleflight waiters get uniform timeout behavior regardless of which goroutine won the race, preventing one caller's short deadline from failing the shared RPC. Addresses cubic-dev-ai review feedback. Signed-off-by: Vibhav Bobade --- pkg/policies/group_loader.go | 19 +++++++++++-------- pkg/policies/loader.go | 20 ++++++++++++-------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/pkg/policies/group_loader.go b/pkg/policies/group_loader.go index 13e0c0255..9f2ccb496 100644 --- a/pkg/policies/group_loader.go +++ b/pkg/policies/group_loader.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "sync" + "time" pb "github.com/chainloop-dev/chainloop/app/controlplane/api/controlplane/v1" v1 "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1" @@ -107,6 +108,11 @@ type groupWithReference struct { reference *PolicyDescriptor } +// remoteGroupFetchTimeout bounds gRPC calls inside singleflight to prevent +// indefinite blocking. Independent of any caller's context deadline so that +// all singleflight waiters get uniform timeout behavior. +const remoteGroupFetchTimeout = 30 * time.Second + var ( remoteGroupCache = make(map[string]*groupWithReference) remoteGroupCacheMutex sync.Mutex @@ -137,14 +143,11 @@ func (c *ChainloopGroupLoader) Load(ctx context.Context, attachment *v1.PolicyGr } providerRef := ProviderParts(ref) - // Detach from caller's cancellation (so one goroutine's cancel doesn't - // kill the shared singleflight call) but preserve a bounded deadline. - sfCtx := context.WithoutCancel(ctx) - if deadline, ok := ctx.Deadline(); ok { - var cancel context.CancelFunc - sfCtx, cancel = context.WithDeadline(sfCtx, deadline) - defer cancel() - } + // Use a fixed timeout independent of any caller's context so that all + // singleflight waiters get uniform behavior regardless of which goroutine + // won the race. + sfCtx, cancel := context.WithTimeout(context.Background(), remoteGroupFetchTimeout) + defer cancel() resp, err := c.Client.GetPolicyGroup(sfCtx, &pb.AttestationServiceGetPolicyGroupRequest{ Provider: providerRef.Provider, diff --git a/pkg/policies/loader.go b/pkg/policies/loader.go index f7124cf5c..49745baa4 100644 --- a/pkg/policies/loader.go +++ b/pkg/policies/loader.go @@ -26,6 +26,7 @@ import ( "path/filepath" "strings" "sync" + "time" pb "github.com/chainloop-dev/chainloop/app/controlplane/api/controlplane/v1" v1 "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1" @@ -166,6 +167,11 @@ type policyWithReference struct { reference *PolicyDescriptor } +// remotePolicyFetchTimeout bounds gRPC calls inside singleflight to prevent +// indefinite blocking. Independent of any caller's context deadline so that +// all singleflight waiters get uniform timeout behavior. +const remotePolicyFetchTimeout = 30 * time.Second + var ( remotePolicyCache = make(map[string]*policyWithReference) remotePolicyCacheMutex sync.Mutex @@ -197,14 +203,12 @@ func (c *ChainloopLoader) Load(ctx context.Context, attachment *v1.PolicyAttachm } providerRef := ProviderParts(ref) - // Detach from caller's cancellation (so one goroutine's cancel doesn't - // kill the shared singleflight call) but preserve a bounded deadline. - sfCtx := context.WithoutCancel(ctx) - if deadline, ok := ctx.Deadline(); ok { - var cancel context.CancelFunc - sfCtx, cancel = context.WithDeadline(sfCtx, deadline) - defer cancel() - } + // Use a fixed timeout independent of any caller's context so that all + // singleflight waiters get uniform behavior regardless of which goroutine + // won the race. The caller's cancellation/deadline is intentionally not + // inherited to avoid one caller's short deadline failing the shared call. + sfCtx, cancel := context.WithTimeout(context.Background(), remotePolicyFetchTimeout) + defer cancel() resp, err := c.Client.GetPolicy(sfCtx, &pb.AttestationServiceGetPolicyRequest{ Provider: providerRef.Provider, From b79cb8441430b4797de2ab21a6742b8d5472d959 Mon Sep 17 00:00:00 2001 From: Vibhav Bobade Date: Tue, 24 Mar 2026 23:56:10 +0530 Subject: [PATCH 8/8] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20reduce=20concurrency=20limit,=20fix=20copyright?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change default concurrency from max(NumCPU*2, 10) to max(NumCPU, 5) to avoid contention on remote state optimistic locking and limit pressure on AI-enabled policies (per migmartri and jiparis feedback) - Fix copyright year in concurrency_test.go Signed-off-by: Vibhav Bobade --- pkg/policies/concurrency_test.go | 2 +- pkg/policies/policies.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/policies/concurrency_test.go b/pkg/policies/concurrency_test.go index 95a4a9bc7..0269d0f80 100644 --- a/pkg/policies/concurrency_test.go +++ b/pkg/policies/concurrency_test.go @@ -1,5 +1,5 @@ // -// Copyright 2024-2026 The Chainloop Authors. +// Copyright 2026 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/policies/policies.go b/pkg/policies/policies.go index 5b4d23837..2000f62b5 100644 --- a/pkg/policies/policies.go +++ b/pkg/policies/policies.go @@ -75,9 +75,9 @@ const ( ) // defaultMaxConcurrency is the default number of concurrent policy evaluations. -// Set higher than NumCPU because policy evaluation is I/O-mixed (gRPC calls, -// file loads) rather than purely CPU-bound. Can be overridden via WithMaxConcurrency. -var defaultMaxConcurrency = max(runtime.NumCPU()*2, 10) +// Kept conservative to avoid contention on remote state optimistic locking +// and to limit pressure on AI-enabled policies. Can be overridden via WithMaxConcurrency. +var defaultMaxConcurrency = max(runtime.NumCPU(), 5) type PolicyVerifier struct { policies *v1.Policies