Skip to content
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ require (
golang.org/x/crypto v0.46.0
golang.org/x/mod v0.31.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sync v0.19.0
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.32.0 // indirect
golang.org/x/time v0.14.0 // indirect
Expand Down
233 changes: 233 additions & 0 deletions pkg/policies/concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//
// Copyright 2026 The Chainloop Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package policies

import (
"context"
"os"
"sync"
"testing"

v12 "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1"
v1 "github.com/chainloop-dev/chainloop/pkg/attestation/crafter/api/attestation/v1"
intoto "github.com/in-toto/attestation/go/v1"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/protojson"
)

// TestConcurrentVerifyStatement runs VerifyStatement from multiple goroutines
// to exercise errgroup parallelism and catch race conditions.
// Run with: go test -race -count=1 -run TestConcurrentVerifyStatement ./pkg/policies/...
func TestConcurrentVerifyStatement(t *testing.T) {
logger := zerolog.Nop()

schema := &v12.CraftingSchema{
Policies: &v12.Policies{
Attestation: []*v12.PolicyAttachment{
{Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/workflow.yaml"}},
{Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/materials.yaml"}},
},
},
}

statement := loadStatementForTest(t, "testdata/statement.json")

pv := NewPolicyVerifier(schema.GetPolicies(), nil, &logger)

// Run multiple VerifyStatement calls concurrently
const goroutines = 10
var wg sync.WaitGroup
errs := make([]error, goroutines)
results := make([][]*v1.PolicyEvaluation, goroutines)

for i := range goroutines {
wg.Add(1)
go func() {
defer wg.Done()
res, err := pv.VerifyStatement(context.Background(), statement)
errs[i] = err
results[i] = res
}()
}

wg.Wait()

// All calls should succeed
for i := range goroutines {
assert.NoError(t, errs[i], "goroutine %d failed", i)
}

// All calls should return the same number of evaluations
for i := 1; i < goroutines; i++ {
assert.Equal(t, len(results[0]), len(results[i]),
"goroutine %d returned different number of evaluations", i)
}
}

// TestConcurrentVerifyMaterial runs VerifyMaterial from multiple goroutines.
func TestConcurrentVerifyMaterial(t *testing.T) {
logger := zerolog.Nop()

schema := &v12.CraftingSchema{
Policies: &v12.Policies{
Materials: []*v12.PolicyAttachment{
{
Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/sbom_syft.yaml"},
},
},
},
}

material := &v1.Attestation_Material{
Id: "sbom",
MaterialType: v12.CraftingSchema_Material_SBOM_SPDX_JSON,
M: &v1.Attestation_Material_Artifact_{
Artifact: &v1.Attestation_Material_Artifact{},
},
}

pv := NewPolicyVerifier(schema.GetPolicies(), nil, &logger)

const goroutines = 10
var wg sync.WaitGroup
errs := make([]error, goroutines)
results := make([][]*v1.PolicyEvaluation, goroutines)

for i := range goroutines {
wg.Add(1)
go func() {
defer wg.Done()
res, err := pv.VerifyMaterial(context.Background(), material, "testdata/sbom-spdx.json")
errs[i] = err
results[i] = res
}()
}

wg.Wait()

for i := range goroutines {
assert.NoError(t, errs[i], "goroutine %d failed", i)
}

for i := 1; i < goroutines; i++ {
assert.Equal(t, len(results[0]), len(results[i]),
"goroutine %d returned different number of evaluations", i)
}
}

// TestWithMaxConcurrency verifies the WithMaxConcurrency option is applied.
func TestWithMaxConcurrency(t *testing.T) {
logger := zerolog.Nop()

// Test default
pv := NewPolicyVerifier(nil, nil, &logger)
assert.Greater(t, pv.maxConcurrency, 0, "default maxConcurrency should be positive")

// Test custom value
pv = NewPolicyVerifier(nil, nil, &logger, WithMaxConcurrency(5))
assert.Equal(t, 5, pv.maxConcurrency)

// Test zero defaults to computed default
pv = NewPolicyVerifier(nil, nil, &logger, WithMaxConcurrency(0))
assert.Equal(t, defaultMaxConcurrency, pv.maxConcurrency)

// Test negative defaults to computed default
pv = NewPolicyVerifier(nil, nil, &logger, WithMaxConcurrency(-1))
assert.Equal(t, defaultMaxConcurrency, pv.maxConcurrency)
}

// TestVerifyStatementWithConcurrencyOne ensures sequential mode (maxConcurrency=1)
// produces identical results to default parallel mode.
func TestVerifyStatementWithConcurrencyOne(t *testing.T) {
logger := zerolog.Nop()

schema := &v12.CraftingSchema{
Policies: &v12.Policies{
Attestation: []*v12.PolicyAttachment{
{Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/workflow.yaml"}},
{Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/materials.yaml"}},
},
},
}

statement := loadStatementForTest(t, "testdata/statement.json")

// Sequential
pvSeq := NewPolicyVerifier(schema.GetPolicies(), nil, &logger, WithMaxConcurrency(1))
seqResults, seqErr := pvSeq.VerifyStatement(context.Background(), statement)

// Parallel (default)
pvPar := NewPolicyVerifier(schema.GetPolicies(), nil, &logger)
parResults, parErr := pvPar.VerifyStatement(context.Background(), statement)

require.NoError(t, seqErr)
require.NoError(t, parErr)
assert.Equal(t, len(seqResults), len(parResults),
"sequential and parallel should return same number of evaluations")

// Build name→result maps for comparison (order may differ)
seqByName := make(map[string]*v1.PolicyEvaluation)
for _, ev := range seqResults {
seqByName[ev.Name] = ev
}

for _, ev := range parResults {
seqEv, ok := seqByName[ev.Name]
assert.True(t, ok, "parallel result has policy %q not found in sequential", ev.Name)
if ok {
assert.Equal(t, len(seqEv.Violations), len(ev.Violations),
"policy %q: violation count mismatch", ev.Name)
assert.Equal(t, seqEv.Skipped, ev.Skipped,
"policy %q: skipped mismatch", ev.Name)
}
}
}

// TestErrGroupCancellation verifies that when one policy evaluation fails,
// the errgroup context is cancelled and propagated.
func TestErrGroupCancellation(t *testing.T) {
logger := zerolog.Nop()

schema := &v12.CraftingSchema{
Policies: &v12.Policies{
Attestation: []*v12.PolicyAttachment{
{Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/workflow.yaml"}},
// This policy ref does not exist — will cause an error
{Policy: &v12.PolicyAttachment_Ref{Ref: "file://testdata/nonexistent_policy.yaml"}},
},
},
}

statement := loadStatementForTest(t, "testdata/statement.json")

pv := NewPolicyVerifier(schema.GetPolicies(), nil, &logger)
_, err := pv.VerifyStatement(context.Background(), statement)

assert.Error(t, err, "should fail when a policy file does not exist")
assert.IsType(t, &PolicyError{}, err, "error should be a PolicyError")
}

func loadStatementForTest(t *testing.T, file string) *intoto.Statement {
t.Helper()
stContent, err := os.ReadFile(file)
require.NoError(t, err)
var statement intoto.Statement
err = protojson.Unmarshal(stContent, &statement)
require.NoError(t, err)
return &statement
}
39 changes: 24 additions & 15 deletions pkg/policies/engine/wasm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/chainloop-dev/chainloop/pkg/policies/engine"
Expand All @@ -28,6 +29,12 @@ import (
"github.com/rs/zerolog"
)

// setLogLevelOnce ensures extism.SetLogLevel is called exactly once,
// since it modifies global state and is not safe for concurrent calls.
// Note: the log level is determined by the first NewEngine() call in the
// process. Subsequent engines with different log levels will not override it.
var setLogLevelOnce sync.Once

// Ensure Engine implements PolicyEngine interface
var _ engine.PolicyEngine = (*Engine)(nil)

Expand Down Expand Up @@ -59,6 +66,23 @@ func NewEngine(opts ...engine.Option) *Engine {
logger = &noopLogger
}

// Set WASM plugin log level exactly once across all engine instances.
// extism.SetLogLevel modifies global state and is not safe for concurrent calls.
setLogLevelOnce.Do(func() {
switch {
case logger.GetLevel() <= zerolog.TraceLevel:
extism.SetLogLevel(extism.LogLevelTrace)
case logger.GetLevel() <= zerolog.DebugLevel:
extism.SetLogLevel(extism.LogLevelDebug)
case logger.GetLevel() <= zerolog.InfoLevel:
extism.SetLogLevel(extism.LogLevelInfo)
case logger.GetLevel() <= zerolog.WarnLevel:
extism.SetLogLevel(extism.LogLevelWarn)
default:
extism.SetLogLevel(extism.LogLevelError)
}
})

return &Engine{
executionTimeout: executionTimeout,
logger: logger,
Expand All @@ -70,21 +94,6 @@ func NewEngine(opts ...engine.Option) *Engine {
func (e *Engine) Verify(ctx context.Context, policy *engine.Policy, input []byte, args map[string]any) (*engine.EvaluationResult, error) {
e.logger.Debug().Str("policy", policy.Name).Int("wasm_size", len(policy.Source)).Int("input_size", len(input)).Int("args_count", len(args)).Msg("Starting WASM policy execution")

// Enable WASM plugin logging based on logger level
// This allows LogInfo(), LogDebug(), etc. from the WASM policy to be visible
switch {
case e.logger.GetLevel() <= zerolog.TraceLevel:
extism.SetLogLevel(extism.LogLevelTrace)
case e.logger.GetLevel() <= zerolog.DebugLevel:
extism.SetLogLevel(extism.LogLevelDebug)
case e.logger.GetLevel() <= zerolog.InfoLevel:
extism.SetLogLevel(extism.LogLevelInfo)
case e.logger.GetLevel() <= zerolog.WarnLevel:
extism.SetLogLevel(extism.LogLevelWarn)
default:
extism.SetLogLevel(extism.LogLevelError)
}

// Prepare config with args if present
configMap := make(map[string]string)
if len(args) > 0 {
Expand Down
Loading
Loading