From ffc198180c04dcbff6d395c4f6385f6844ac89cb Mon Sep 17 00:00:00 2001 From: Miguel Martinez Trivino Date: Sat, 28 Mar 2026 18:53:17 +0100 Subject: [PATCH] feat: resolve policy evaluations from CAS in workflow-run View API Update WorkflowRunService.View() to resolve policy evaluations from the CAS-stored bundle (via PolicyEvaluationsRef) instead of relying solely on inline attestation predicate data. This prepares consumers for the eventual removal of inline policy evaluation content. On any CAS resolution failure, gracefully falls back to inline evaluations with a warning log. Closes #2950 Signed-off-by: Miguel Martinez Trivino --- app/controlplane/cmd/wire.go | 17 +++- app/controlplane/cmd/wire_gen.go | 29 +++++- .../internal/service/attestation.go | 7 +- .../internal/service/workflowrun.go | 78 ++++++++++++++- pkg/attestation/renderer/chainloop/v02.go | 27 ++++-- .../renderer/chainloop/v02_test.go | 97 +++++++++++++++++++ pkg/cache/cache.go | 18 ++-- pkg/cache/natskv.go | 7 +- 8 files changed, 251 insertions(+), 29 deletions(-) diff --git a/app/controlplane/cmd/wire.go b/app/controlplane/cmd/wire.go index baffb49bb..f72db8db9 100644 --- a/app/controlplane/cmd/wire.go +++ b/app/controlplane/cmd/wire.go @@ -138,12 +138,13 @@ func newAuthAllowList(conf *conf.Bootstrap) *pkgConf.AllowList { var cacheProviderSet = wire.NewSet( newMembershipsCache, newClaimsCache, + newPolicyEvalBundleCache, ) func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapClaims], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for JWT claims")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-jwt-claims")) @@ -155,7 +156,7 @@ func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapCla func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entities.Membership], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for org memberships")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-memberships")) @@ -164,6 +165,18 @@ func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entit return cache.New[*entities.Membership](opts...) } +func newPolicyEvalBundleCache(conn *nats.Conn, logger log.Logger) (cache.Cache[[]byte], error) { + l := log.NewHelper(logger) + backend := "memory" + opts := []cache.Option{cache.WithTTL(24 * time.Hour), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for policy evaluation bundles from CAS")} + if conn != nil { + backend = "nats" + opts = append(opts, cache.WithNATS(conn, "chainloop-policy-eval-bundles")) + } + l.Infow("msg", "cache initialized", "bucket", "chainloop-policy-eval-bundles", "backend", backend, "ttl", "24h") + return cache.New[[]byte](opts...) +} + // kratosLogAdapter adapts kratos log.Helper (Debugw(...interface{})) to cache.Logger (Debugw(string, ...any)). type kratosLogAdapter struct{ h *log.Helper } diff --git a/app/controlplane/cmd/wire_gen.go b/app/controlplane/cmd/wire_gen.go index 8182c9b6a..8836763cc 100644 --- a/app/controlplane/cmd/wire_gen.go +++ b/app/controlplane/cmd/wire_gen.go @@ -179,19 +179,27 @@ func wireApp(bootstrap *conf.Bootstrap, readerWriter credentials.ReaderWriter, l cleanup() return nil, nil, err } + casMappingRepo := data.NewCASMappingRepo(dataData, casBackendRepo, logger) + casMappingUseCase := biz.NewCASMappingUseCase(casMappingRepo, membershipUseCase, logger) + cache2, err := newPolicyEvalBundleCache(conn, logger) + if err != nil { + cleanup() + return nil, nil, err + } newWorkflowRunServiceOpts := &service.NewWorkflowRunServiceOpts{ WorkflowRunUC: workflowRunUseCase, WorkflowUC: workflowUseCase, WorkflowContractUC: workflowContractUseCase, ProjectUC: projectUseCase, CredsReader: readerWriter, + CASClient: casClientUseCase, + CASMappingUC: casMappingUseCase, + PolicyEvalCache: cache2, Opts: v5, } workflowRunService := service.NewWorkflowRunService(newWorkflowRunServiceOpts) attestationUseCase := biz.NewAttestationUseCase(casClientUseCase, logger) fanOutDispatcher := dispatcher.New(integrationUseCase, workflowUseCase, workflowRunUseCase, readerWriter, casClientUseCase, availablePlugins, logger) - casMappingRepo := data.NewCASMappingRepo(dataData, casBackendRepo, logger) - casMappingUseCase := biz.NewCASMappingUseCase(casMappingRepo, membershipUseCase, logger) v6 := bootstrap.PrometheusIntegration orgMetricsRepo := data.NewOrgMetricsRepo(dataData, logger) orgMetricsUseCase, err := biz.NewOrgMetricsUseCase(orgMetricsRepo, organizationRepo, workflowUseCase, logger) @@ -398,12 +406,13 @@ func newAuthAllowList(conf2 *conf.Bootstrap) *v1.AllowList { var cacheProviderSet = wire.NewSet( newMembershipsCache, newClaimsCache, + newPolicyEvalBundleCache, ) func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapClaims], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for JWT claims")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-jwt-claims")) @@ -415,7 +424,7 @@ func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapCla func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entities.Membership], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for org memberships")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-memberships")) @@ -424,6 +433,18 @@ func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entit return cache.New[*entities.Membership](opts...) } +func newPolicyEvalBundleCache(conn *nats.Conn, logger log.Logger) (cache.Cache[[]byte], error) { + l := log.NewHelper(logger) + backend := "memory" + opts := []cache.Option{cache.WithTTL(24 * time.Hour), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for policy evaluation bundles from CAS")} + if conn != nil { + backend = "nats" + opts = append(opts, cache.WithNATS(conn, "chainloop-policy-eval-bundles")) + } + l.Infow("msg", "cache initialized", "bucket", "chainloop-policy-eval-bundles", "backend", backend, "ttl", "24h") + return cache.New[[]byte](opts...) +} + // kratosLogAdapter adapts kratos log.Helper (Debugw(...interface{})) to cache.Logger (Debugw(string, ...any)). type kratosLogAdapter struct{ h *log.Helper } diff --git a/app/controlplane/internal/service/attestation.go b/app/controlplane/internal/service/attestation.go index 815cc0eec..0d81bbd9e 100644 --- a/app/controlplane/internal/service/attestation.go +++ b/app/controlplane/internal/service/attestation.go @@ -533,7 +533,7 @@ func (s *AttestationService) GetPolicyGroup(ctx context.Context, req *cpAPI.Atte }}, nil } -func bizAttestationToPb(att *biz.Attestation) (*cpAPI.AttestationItem, error) { +func bizAttestationToPb(att *biz.Attestation, predicate chainloop.NormalizablePredicate) (*cpAPI.AttestationItem, error) { if att == nil || att.Envelope == nil { return nil, nil } @@ -543,11 +543,6 @@ func bizAttestationToPb(att *biz.Attestation) (*cpAPI.AttestationItem, error) { return nil, err } - predicate, err := chainloop.ExtractPredicate(att.Envelope) - if err != nil { - return nil, fmt.Errorf("error extracting predicate from attestation: %w", err) - } - materials, err := extractMaterials(predicate.GetMaterials()) if err != nil { return nil, fmt.Errorf("error extracting materials from attestation: %w", err) diff --git a/app/controlplane/internal/service/workflowrun.go b/app/controlplane/internal/service/workflowrun.go index 4c692ae18..3f5b39317 100644 --- a/app/controlplane/internal/service/workflowrun.go +++ b/app/controlplane/internal/service/workflowrun.go @@ -1,5 +1,5 @@ // -// Copyright 2024-2025 The Chainloop Authors. +// Copyright 2024-2026 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package service import ( + "bytes" "context" "fmt" "slices" @@ -25,9 +26,12 @@ import ( "github.com/chainloop-dev/chainloop/app/controlplane/pkg/authz" "github.com/chainloop-dev/chainloop/app/controlplane/pkg/biz" "github.com/chainloop-dev/chainloop/app/controlplane/pkg/pagination" + chainloop "github.com/chainloop-dev/chainloop/pkg/attestation/renderer/chainloop" + "github.com/chainloop-dev/chainloop/pkg/cache" "github.com/chainloop-dev/chainloop/pkg/credentials" errors "github.com/go-kratos/kratos/v2/errors" "github.com/google/uuid" + intoto "github.com/in-toto/attestation/go/v1" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -40,6 +44,9 @@ type WorkflowRunService struct { workflowContractUseCase *biz.WorkflowContractUseCase projectUseCase *biz.ProjectUseCase credsReader credentials.Reader + casClient biz.CASClient + casMappingUC *biz.CASMappingUseCase + policyEvalCache cache.Cache[[]byte] } type NewWorkflowRunServiceOpts struct { @@ -48,6 +55,9 @@ type NewWorkflowRunServiceOpts struct { WorkflowContractUC *biz.WorkflowContractUseCase ProjectUC *biz.ProjectUseCase CredsReader credentials.Reader + CASClient biz.CASClient + CASMappingUC *biz.CASMappingUseCase + PolicyEvalCache cache.Cache[[]byte] Opts []NewOpt } @@ -59,9 +69,56 @@ func NewWorkflowRunService(opts *NewWorkflowRunServiceOpts) *WorkflowRunService workflowContractUseCase: opts.WorkflowContractUC, projectUseCase: opts.ProjectUC, credsReader: opts.CredsReader, + casClient: opts.CASClient, + casMappingUC: opts.CASMappingUC, + policyEvalCache: opts.PolicyEvalCache, } } +type casResolvedPredicate struct { + chainloop.NormalizablePredicate + evals map[string][]*chainloop.PolicyEvaluation +} + +func (p *casResolvedPredicate) GetPolicyEvaluations() map[string][]*chainloop.PolicyEvaluation { + return p.evals +} + +func (s *WorkflowRunService) resolvePolicyEvaluations( + ctx context.Context, + ref *intoto.ResourceDescriptor, + orgID uuid.UUID, +) (map[string][]*chainloop.PolicyEvaluation, error) { + if ref == nil { + return nil, nil + } + + hexDigest, ok := ref.Digest["sha256"] + if !ok { + return nil, fmt.Errorf("no sha256 digest in policy evaluations ref") + } + digest := fmt.Sprintf("sha256:%s", hexDigest) + + if cached, found, err := s.policyEvalCache.Get(ctx, digest); err == nil && found { + return chainloop.PolicyEvaluationsFromBundle(cached) + } + + mapping, err := s.casMappingUC.FindCASMappingForDownloadByOrg(ctx, digest, []uuid.UUID{orgID}, nil) + if err != nil { + return nil, fmt.Errorf("finding CAS mapping: %w", err) + } + + var buf bytes.Buffer + if err := s.casClient.Download(ctx, string(mapping.CASBackend.Provider), mapping.CASBackend.SecretName, &buf, digest); err != nil { + return nil, fmt.Errorf("downloading policy eval bundle: %w", err) + } + + data := buf.Bytes() + _ = s.policyEvalCache.Set(ctx, digest, data) + + return chainloop.PolicyEvaluationsFromBundle(data) +} + func (s *WorkflowRunService) List(ctx context.Context, req *pb.WorkflowRunServiceListRequest) (*pb.WorkflowRunServiceListResponse, error) { currentOrg, err := requireCurrentOrg(ctx) if err != nil { @@ -185,7 +242,24 @@ func (s *WorkflowRunService) View(ctx context.Context, req *pb.WorkflowRunServic verificationResult = bizVerificationToPb(vr) } - attestation, err := bizAttestationToPb(run.Attestation) + var predicate chainloop.NormalizablePredicate + if run.Attestation != nil && run.Attestation.Envelope != nil { + predicate, err = chainloop.ExtractPredicate(run.Attestation.Envelope) + if err != nil { + return nil, handleUseCaseErr(err, s.log) + } + + if ref := predicate.GetPolicyEvaluationsRef(); ref != nil { + resolved, resolveErr := s.resolvePolicyEvaluations(ctx, ref, run.Workflow.OrgID) + if resolveErr != nil { + s.log.Warnw("msg", "failed to resolve policy evaluations from CAS, using inline", "err", resolveErr) + } else if resolved != nil { + predicate = &casResolvedPredicate{NormalizablePredicate: predicate, evals: resolved} + } + } + } + + attestation, err := bizAttestationToPb(run.Attestation, predicate) if err != nil { return nil, handleUseCaseErr(err, s.log) } diff --git a/pkg/attestation/renderer/chainloop/v02.go b/pkg/attestation/renderer/chainloop/v02.go index 44cdb3f73..56628c508 100644 --- a/pkg/attestation/renderer/chainloop/v02.go +++ b/pkg/attestation/renderer/chainloop/v02.go @@ -288,12 +288,27 @@ func (r *RendererV02) predicate() (*structpb.Struct, error) { return predicate, nil } -// collect all policy evaluations grouped by material and returns if there is a policy violation func mappedPolicyEvaluations(att *v1.Attestation) (map[string][]*PolicyEvaluation, bool, error) { - var hasPolicyViolations bool - result := map[string][]*PolicyEvaluation{} + return groupEvaluations(att.GetPolicyEvaluations()) +} + +// PolicyEvaluationsFromBundle deserializes a PolicyEvaluationBundle from protojson bytes +// and returns evaluations grouped by material name. +func PolicyEvaluationsFromBundle(data []byte) (map[string][]*PolicyEvaluation, error) { + var bundle v1.PolicyEvaluationBundle + if err := protojson.Unmarshal(data, &bundle); err != nil { + return nil, fmt.Errorf("unmarshaling policy evaluation bundle: %w", err) + } + + evals, _, err := groupEvaluations(bundle.GetEvaluations()) + return evals, err +} + +func groupEvaluations(evals []*v1.PolicyEvaluation) (map[string][]*PolicyEvaluation, bool, error) { + var hasViolations bool + result := make(map[string][]*PolicyEvaluation) - for _, p := range att.GetPolicyEvaluations() { + for _, p := range evals { keyName := p.MaterialName if keyName == "" { keyName = AttPolicyEvaluation @@ -305,13 +320,13 @@ func mappedPolicyEvaluations(att *v1.Attestation) (map[string][]*PolicyEvaluatio } if len(ev.Violations) > 0 { - hasPolicyViolations = true + hasViolations = true } result[keyName] = append(result[keyName], ev) } - return result, hasPolicyViolations, nil + return result, hasViolations, nil } func renderEvaluation(ev *v1.PolicyEvaluation) (*PolicyEvaluation, error) { diff --git a/pkg/attestation/renderer/chainloop/v02_test.go b/pkg/attestation/renderer/chainloop/v02_test.go index 73f0eff28..16195ef1f 100644 --- a/pkg/attestation/renderer/chainloop/v02_test.go +++ b/pkg/attestation/renderer/chainloop/v02_test.go @@ -395,6 +395,103 @@ func TestPredicatePolicyEvaluationsRef(t *testing.T) { } } +func TestPolicyEvaluationsFromBundle(t *testing.T) { + tests := []struct { + name string + bundle *api.PolicyEvaluationBundle + wantLen map[string]int // expected number of evaluations per material + wantErr bool + }{ + { + name: "groups evaluations by material name", + bundle: &api.PolicyEvaluationBundle{ + Evaluations: []*api.PolicyEvaluation{ + {Name: "check-sbom", MaterialName: "sbom"}, + {Name: "check-sarif", MaterialName: "sarif"}, + {Name: "check-sbom-format", MaterialName: "sbom"}, + }, + }, + wantLen: map[string]int{"sbom": 2, "sarif": 1}, + }, + { + name: "empty material name uses attestation key", + bundle: &api.PolicyEvaluationBundle{ + Evaluations: []*api.PolicyEvaluation{ + {Name: "att-policy", MaterialName: ""}, + }, + }, + wantLen: map[string]int{AttPolicyEvaluation: 1}, + }, + { + name: "empty bundle returns empty map", + bundle: &api.PolicyEvaluationBundle{}, + wantLen: map[string]int{}, + }, + { + name: "invalid data returns error", + bundle: nil, // we'll pass invalid bytes directly + wantErr: true, + }, + { + name: "preserves violations and fields", + bundle: &api.PolicyEvaluationBundle{ + Evaluations: []*api.PolicyEvaluation{ + { + Name: "vuln-check", + MaterialName: "image", + Description: "checks for vulns", + Violations: []*api.PolicyEvaluation_Violation{ + {Subject: "CVE-2024-1234", Message: "critical vuln found"}, + }, + Skipped: true, + SkipReasons: []string{"not applicable"}, + }, + }, + }, + wantLen: map[string]int{"image": 1}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var data []byte + if tc.bundle != nil { + var err error + data, err = protojson.Marshal(tc.bundle) + require.NoError(t, err) + } else if tc.wantErr { + data = []byte("not valid protojson") + } + + result, err := PolicyEvaluationsFromBundle(data) + if tc.wantErr { + assert.Error(t, err) + return + } + require.NoError(t, err) + + // Check grouping counts + assert.Len(t, result, len(tc.wantLen)) + for key, expectedCount := range tc.wantLen { + assert.Len(t, result[key], expectedCount, "key=%s", key) + } + + // For the violations test case, verify field mapping + if tc.name == "preserves violations and fields" { + ev := result["image"][0] + assert.Equal(t, "vuln-check", ev.Name) + assert.Equal(t, "image", ev.MaterialName) + assert.Equal(t, "checks for vulns", ev.Description) + assert.True(t, ev.Skipped) + assert.Equal(t, []string{"not applicable"}, ev.SkipReasons) + require.Len(t, ev.Violations, 1) + assert.Equal(t, "CVE-2024-1234", ev.Violations[0].Subject) + assert.Equal(t, "critical vuln found", ev.Violations[0].Message) + } + }) + } +} + func TestPolicyEvaluationsField(t *testing.T) { raw, err := os.ReadFile("testdata/attestation-pe-snake.json") require.NoError(t, err) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 40a33624f..8fdb4cb76 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -46,12 +46,13 @@ type Logger interface { const defaultMaxSize = 1000 type config struct { - ttl time.Duration - maxSize int - logger Logger - natsConn *nats.Conn - bucketName string - reconnCh <-chan struct{} + ttl time.Duration + maxSize int + logger Logger + natsConn *nats.Conn + bucketName string + description string + reconnCh <-chan struct{} } // Option configures cache construction. @@ -75,6 +76,11 @@ func WithNATS(conn *nats.Conn, bucketName string) Option { } } +// WithDescription sets the NATS KV bucket description. Ignored for in-memory backend. +func WithDescription(desc string) Option { + return func(c *config) { c.description = desc } +} + // WithReconnect provides a channel that signals NATS reconnection events. func WithReconnect(ch <-chan struct{}) Option { return func(c *config) { c.reconnCh = ch } diff --git a/pkg/cache/natskv.go b/pkg/cache/natskv.go index 109972db3..f9e275142 100644 --- a/pkg/cache/natskv.go +++ b/pkg/cache/natskv.go @@ -62,9 +62,10 @@ func (c *natsKVCache[T]) initBucket() error { } kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{ - Bucket: c.bucket, - TTL: c.cfg.ttl, - Storage: jetstream.MemoryStorage, + Bucket: c.bucket, + Description: c.cfg.description, + TTL: c.cfg.ttl, + Storage: jetstream.MemoryStorage, }) if err != nil { return err