diff --git a/app/controlplane/cmd/wire.go b/app/controlplane/cmd/wire.go index baffb49bb..f72db8db9 100644 --- a/app/controlplane/cmd/wire.go +++ b/app/controlplane/cmd/wire.go @@ -138,12 +138,13 @@ func newAuthAllowList(conf *conf.Bootstrap) *pkgConf.AllowList { var cacheProviderSet = wire.NewSet( newMembershipsCache, newClaimsCache, + newPolicyEvalBundleCache, ) func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapClaims], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for JWT claims")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-jwt-claims")) @@ -155,7 +156,7 @@ func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapCla func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entities.Membership], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for org memberships")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-memberships")) @@ -164,6 +165,18 @@ func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entit return cache.New[*entities.Membership](opts...) } +func newPolicyEvalBundleCache(conn *nats.Conn, logger log.Logger) (cache.Cache[[]byte], error) { + l := log.NewHelper(logger) + backend := "memory" + opts := []cache.Option{cache.WithTTL(24 * time.Hour), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for policy evaluation bundles from CAS")} + if conn != nil { + backend = "nats" + opts = append(opts, cache.WithNATS(conn, "chainloop-policy-eval-bundles")) + } + l.Infow("msg", "cache initialized", "bucket", "chainloop-policy-eval-bundles", "backend", backend, "ttl", "24h") + return cache.New[[]byte](opts...) +} + // kratosLogAdapter adapts kratos log.Helper (Debugw(...interface{})) to cache.Logger (Debugw(string, ...any)). type kratosLogAdapter struct{ h *log.Helper } diff --git a/app/controlplane/cmd/wire_gen.go b/app/controlplane/cmd/wire_gen.go index 8182c9b6a..8836763cc 100644 --- a/app/controlplane/cmd/wire_gen.go +++ b/app/controlplane/cmd/wire_gen.go @@ -179,19 +179,27 @@ func wireApp(bootstrap *conf.Bootstrap, readerWriter credentials.ReaderWriter, l cleanup() return nil, nil, err } + casMappingRepo := data.NewCASMappingRepo(dataData, casBackendRepo, logger) + casMappingUseCase := biz.NewCASMappingUseCase(casMappingRepo, membershipUseCase, logger) + cache2, err := newPolicyEvalBundleCache(conn, logger) + if err != nil { + cleanup() + return nil, nil, err + } newWorkflowRunServiceOpts := &service.NewWorkflowRunServiceOpts{ WorkflowRunUC: workflowRunUseCase, WorkflowUC: workflowUseCase, WorkflowContractUC: workflowContractUseCase, ProjectUC: projectUseCase, CredsReader: readerWriter, + CASClient: casClientUseCase, + CASMappingUC: casMappingUseCase, + PolicyEvalCache: cache2, Opts: v5, } workflowRunService := service.NewWorkflowRunService(newWorkflowRunServiceOpts) attestationUseCase := biz.NewAttestationUseCase(casClientUseCase, logger) fanOutDispatcher := dispatcher.New(integrationUseCase, workflowUseCase, workflowRunUseCase, readerWriter, casClientUseCase, availablePlugins, logger) - casMappingRepo := data.NewCASMappingRepo(dataData, casBackendRepo, logger) - casMappingUseCase := biz.NewCASMappingUseCase(casMappingRepo, membershipUseCase, logger) v6 := bootstrap.PrometheusIntegration orgMetricsRepo := data.NewOrgMetricsRepo(dataData, logger) orgMetricsUseCase, err := biz.NewOrgMetricsUseCase(orgMetricsRepo, organizationRepo, workflowUseCase, logger) @@ -398,12 +406,13 @@ func newAuthAllowList(conf2 *conf.Bootstrap) *v1.AllowList { var cacheProviderSet = wire.NewSet( newMembershipsCache, newClaimsCache, + newPolicyEvalBundleCache, ) func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapClaims], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(10 * time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for JWT claims")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-jwt-claims")) @@ -415,7 +424,7 @@ func newClaimsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*jwt.MapCla func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entities.Membership], error) { l := log.NewHelper(logger) backend := "memory" - opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l})} + opts := []cache.Option{cache.WithTTL(time.Second), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for org memberships")} if conn != nil { backend = "nats" opts = append(opts, cache.WithNATS(conn, "chainloop-memberships")) @@ -424,6 +433,18 @@ func newMembershipsCache(conn *nats.Conn, logger log.Logger) (cache.Cache[*entit return cache.New[*entities.Membership](opts...) } +func newPolicyEvalBundleCache(conn *nats.Conn, logger log.Logger) (cache.Cache[[]byte], error) { + l := log.NewHelper(logger) + backend := "memory" + opts := []cache.Option{cache.WithTTL(24 * time.Hour), cache.WithLogger(&kratosLogAdapter{h: l}), cache.WithDescription("Cache for policy evaluation bundles from CAS")} + if conn != nil { + backend = "nats" + opts = append(opts, cache.WithNATS(conn, "chainloop-policy-eval-bundles")) + } + l.Infow("msg", "cache initialized", "bucket", "chainloop-policy-eval-bundles", "backend", backend, "ttl", "24h") + return cache.New[[]byte](opts...) +} + // kratosLogAdapter adapts kratos log.Helper (Debugw(...interface{})) to cache.Logger (Debugw(string, ...any)). type kratosLogAdapter struct{ h *log.Helper } diff --git a/app/controlplane/internal/service/attestation.go b/app/controlplane/internal/service/attestation.go index 815cc0eec..0d81bbd9e 100644 --- a/app/controlplane/internal/service/attestation.go +++ b/app/controlplane/internal/service/attestation.go @@ -533,7 +533,7 @@ func (s *AttestationService) GetPolicyGroup(ctx context.Context, req *cpAPI.Atte }}, nil } -func bizAttestationToPb(att *biz.Attestation) (*cpAPI.AttestationItem, error) { +func bizAttestationToPb(att *biz.Attestation, predicate chainloop.NormalizablePredicate) (*cpAPI.AttestationItem, error) { if att == nil || att.Envelope == nil { return nil, nil } @@ -543,11 +543,6 @@ func bizAttestationToPb(att *biz.Attestation) (*cpAPI.AttestationItem, error) { return nil, err } - predicate, err := chainloop.ExtractPredicate(att.Envelope) - if err != nil { - return nil, fmt.Errorf("error extracting predicate from attestation: %w", err) - } - materials, err := extractMaterials(predicate.GetMaterials()) if err != nil { return nil, fmt.Errorf("error extracting materials from attestation: %w", err) diff --git a/app/controlplane/internal/service/workflowrun.go b/app/controlplane/internal/service/workflowrun.go index 4c692ae18..3f5b39317 100644 --- a/app/controlplane/internal/service/workflowrun.go +++ b/app/controlplane/internal/service/workflowrun.go @@ -1,5 +1,5 @@ // -// Copyright 2024-2025 The Chainloop Authors. +// Copyright 2024-2026 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package service import ( + "bytes" "context" "fmt" "slices" @@ -25,9 +26,12 @@ import ( "github.com/chainloop-dev/chainloop/app/controlplane/pkg/authz" "github.com/chainloop-dev/chainloop/app/controlplane/pkg/biz" "github.com/chainloop-dev/chainloop/app/controlplane/pkg/pagination" + chainloop "github.com/chainloop-dev/chainloop/pkg/attestation/renderer/chainloop" + "github.com/chainloop-dev/chainloop/pkg/cache" "github.com/chainloop-dev/chainloop/pkg/credentials" errors "github.com/go-kratos/kratos/v2/errors" "github.com/google/uuid" + intoto "github.com/in-toto/attestation/go/v1" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -40,6 +44,9 @@ type WorkflowRunService struct { workflowContractUseCase *biz.WorkflowContractUseCase projectUseCase *biz.ProjectUseCase credsReader credentials.Reader + casClient biz.CASClient + casMappingUC *biz.CASMappingUseCase + policyEvalCache cache.Cache[[]byte] } type NewWorkflowRunServiceOpts struct { @@ -48,6 +55,9 @@ type NewWorkflowRunServiceOpts struct { WorkflowContractUC *biz.WorkflowContractUseCase ProjectUC *biz.ProjectUseCase CredsReader credentials.Reader + CASClient biz.CASClient + CASMappingUC *biz.CASMappingUseCase + PolicyEvalCache cache.Cache[[]byte] Opts []NewOpt } @@ -59,9 +69,56 @@ func NewWorkflowRunService(opts *NewWorkflowRunServiceOpts) *WorkflowRunService workflowContractUseCase: opts.WorkflowContractUC, projectUseCase: opts.ProjectUC, credsReader: opts.CredsReader, + casClient: opts.CASClient, + casMappingUC: opts.CASMappingUC, + policyEvalCache: opts.PolicyEvalCache, } } +type casResolvedPredicate struct { + chainloop.NormalizablePredicate + evals map[string][]*chainloop.PolicyEvaluation +} + +func (p *casResolvedPredicate) GetPolicyEvaluations() map[string][]*chainloop.PolicyEvaluation { + return p.evals +} + +func (s *WorkflowRunService) resolvePolicyEvaluations( + ctx context.Context, + ref *intoto.ResourceDescriptor, + orgID uuid.UUID, +) (map[string][]*chainloop.PolicyEvaluation, error) { + if ref == nil { + return nil, nil + } + + hexDigest, ok := ref.Digest["sha256"] + if !ok { + return nil, fmt.Errorf("no sha256 digest in policy evaluations ref") + } + digest := fmt.Sprintf("sha256:%s", hexDigest) + + if cached, found, err := s.policyEvalCache.Get(ctx, digest); err == nil && found { + return chainloop.PolicyEvaluationsFromBundle(cached) + } + + mapping, err := s.casMappingUC.FindCASMappingForDownloadByOrg(ctx, digest, []uuid.UUID{orgID}, nil) + if err != nil { + return nil, fmt.Errorf("finding CAS mapping: %w", err) + } + + var buf bytes.Buffer + if err := s.casClient.Download(ctx, string(mapping.CASBackend.Provider), mapping.CASBackend.SecretName, &buf, digest); err != nil { + return nil, fmt.Errorf("downloading policy eval bundle: %w", err) + } + + data := buf.Bytes() + _ = s.policyEvalCache.Set(ctx, digest, data) + + return chainloop.PolicyEvaluationsFromBundle(data) +} + func (s *WorkflowRunService) List(ctx context.Context, req *pb.WorkflowRunServiceListRequest) (*pb.WorkflowRunServiceListResponse, error) { currentOrg, err := requireCurrentOrg(ctx) if err != nil { @@ -185,7 +242,24 @@ func (s *WorkflowRunService) View(ctx context.Context, req *pb.WorkflowRunServic verificationResult = bizVerificationToPb(vr) } - attestation, err := bizAttestationToPb(run.Attestation) + var predicate chainloop.NormalizablePredicate + if run.Attestation != nil && run.Attestation.Envelope != nil { + predicate, err = chainloop.ExtractPredicate(run.Attestation.Envelope) + if err != nil { + return nil, handleUseCaseErr(err, s.log) + } + + if ref := predicate.GetPolicyEvaluationsRef(); ref != nil { + resolved, resolveErr := s.resolvePolicyEvaluations(ctx, ref, run.Workflow.OrgID) + if resolveErr != nil { + s.log.Warnw("msg", "failed to resolve policy evaluations from CAS, using inline", "err", resolveErr) + } else if resolved != nil { + predicate = &casResolvedPredicate{NormalizablePredicate: predicate, evals: resolved} + } + } + } + + attestation, err := bizAttestationToPb(run.Attestation, predicate) if err != nil { return nil, handleUseCaseErr(err, s.log) } diff --git a/pkg/attestation/renderer/chainloop/v02.go b/pkg/attestation/renderer/chainloop/v02.go index 44cdb3f73..56628c508 100644 --- a/pkg/attestation/renderer/chainloop/v02.go +++ b/pkg/attestation/renderer/chainloop/v02.go @@ -288,12 +288,27 @@ func (r *RendererV02) predicate() (*structpb.Struct, error) { return predicate, nil } -// collect all policy evaluations grouped by material and returns if there is a policy violation func mappedPolicyEvaluations(att *v1.Attestation) (map[string][]*PolicyEvaluation, bool, error) { - var hasPolicyViolations bool - result := map[string][]*PolicyEvaluation{} + return groupEvaluations(att.GetPolicyEvaluations()) +} + +// PolicyEvaluationsFromBundle deserializes a PolicyEvaluationBundle from protojson bytes +// and returns evaluations grouped by material name. +func PolicyEvaluationsFromBundle(data []byte) (map[string][]*PolicyEvaluation, error) { + var bundle v1.PolicyEvaluationBundle + if err := protojson.Unmarshal(data, &bundle); err != nil { + return nil, fmt.Errorf("unmarshaling policy evaluation bundle: %w", err) + } + + evals, _, err := groupEvaluations(bundle.GetEvaluations()) + return evals, err +} + +func groupEvaluations(evals []*v1.PolicyEvaluation) (map[string][]*PolicyEvaluation, bool, error) { + var hasViolations bool + result := make(map[string][]*PolicyEvaluation) - for _, p := range att.GetPolicyEvaluations() { + for _, p := range evals { keyName := p.MaterialName if keyName == "" { keyName = AttPolicyEvaluation @@ -305,13 +320,13 @@ func mappedPolicyEvaluations(att *v1.Attestation) (map[string][]*PolicyEvaluatio } if len(ev.Violations) > 0 { - hasPolicyViolations = true + hasViolations = true } result[keyName] = append(result[keyName], ev) } - return result, hasPolicyViolations, nil + return result, hasViolations, nil } func renderEvaluation(ev *v1.PolicyEvaluation) (*PolicyEvaluation, error) { diff --git a/pkg/attestation/renderer/chainloop/v02_test.go b/pkg/attestation/renderer/chainloop/v02_test.go index 73f0eff28..16195ef1f 100644 --- a/pkg/attestation/renderer/chainloop/v02_test.go +++ b/pkg/attestation/renderer/chainloop/v02_test.go @@ -395,6 +395,103 @@ func TestPredicatePolicyEvaluationsRef(t *testing.T) { } } +func TestPolicyEvaluationsFromBundle(t *testing.T) { + tests := []struct { + name string + bundle *api.PolicyEvaluationBundle + wantLen map[string]int // expected number of evaluations per material + wantErr bool + }{ + { + name: "groups evaluations by material name", + bundle: &api.PolicyEvaluationBundle{ + Evaluations: []*api.PolicyEvaluation{ + {Name: "check-sbom", MaterialName: "sbom"}, + {Name: "check-sarif", MaterialName: "sarif"}, + {Name: "check-sbom-format", MaterialName: "sbom"}, + }, + }, + wantLen: map[string]int{"sbom": 2, "sarif": 1}, + }, + { + name: "empty material name uses attestation key", + bundle: &api.PolicyEvaluationBundle{ + Evaluations: []*api.PolicyEvaluation{ + {Name: "att-policy", MaterialName: ""}, + }, + }, + wantLen: map[string]int{AttPolicyEvaluation: 1}, + }, + { + name: "empty bundle returns empty map", + bundle: &api.PolicyEvaluationBundle{}, + wantLen: map[string]int{}, + }, + { + name: "invalid data returns error", + bundle: nil, // we'll pass invalid bytes directly + wantErr: true, + }, + { + name: "preserves violations and fields", + bundle: &api.PolicyEvaluationBundle{ + Evaluations: []*api.PolicyEvaluation{ + { + Name: "vuln-check", + MaterialName: "image", + Description: "checks for vulns", + Violations: []*api.PolicyEvaluation_Violation{ + {Subject: "CVE-2024-1234", Message: "critical vuln found"}, + }, + Skipped: true, + SkipReasons: []string{"not applicable"}, + }, + }, + }, + wantLen: map[string]int{"image": 1}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var data []byte + if tc.bundle != nil { + var err error + data, err = protojson.Marshal(tc.bundle) + require.NoError(t, err) + } else if tc.wantErr { + data = []byte("not valid protojson") + } + + result, err := PolicyEvaluationsFromBundle(data) + if tc.wantErr { + assert.Error(t, err) + return + } + require.NoError(t, err) + + // Check grouping counts + assert.Len(t, result, len(tc.wantLen)) + for key, expectedCount := range tc.wantLen { + assert.Len(t, result[key], expectedCount, "key=%s", key) + } + + // For the violations test case, verify field mapping + if tc.name == "preserves violations and fields" { + ev := result["image"][0] + assert.Equal(t, "vuln-check", ev.Name) + assert.Equal(t, "image", ev.MaterialName) + assert.Equal(t, "checks for vulns", ev.Description) + assert.True(t, ev.Skipped) + assert.Equal(t, []string{"not applicable"}, ev.SkipReasons) + require.Len(t, ev.Violations, 1) + assert.Equal(t, "CVE-2024-1234", ev.Violations[0].Subject) + assert.Equal(t, "critical vuln found", ev.Violations[0].Message) + } + }) + } +} + func TestPolicyEvaluationsField(t *testing.T) { raw, err := os.ReadFile("testdata/attestation-pe-snake.json") require.NoError(t, err) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 40a33624f..8fdb4cb76 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -46,12 +46,13 @@ type Logger interface { const defaultMaxSize = 1000 type config struct { - ttl time.Duration - maxSize int - logger Logger - natsConn *nats.Conn - bucketName string - reconnCh <-chan struct{} + ttl time.Duration + maxSize int + logger Logger + natsConn *nats.Conn + bucketName string + description string + reconnCh <-chan struct{} } // Option configures cache construction. @@ -75,6 +76,11 @@ func WithNATS(conn *nats.Conn, bucketName string) Option { } } +// WithDescription sets the NATS KV bucket description. Ignored for in-memory backend. +func WithDescription(desc string) Option { + return func(c *config) { c.description = desc } +} + // WithReconnect provides a channel that signals NATS reconnection events. func WithReconnect(ch <-chan struct{}) Option { return func(c *config) { c.reconnCh = ch } diff --git a/pkg/cache/natskv.go b/pkg/cache/natskv.go index 109972db3..f9e275142 100644 --- a/pkg/cache/natskv.go +++ b/pkg/cache/natskv.go @@ -62,9 +62,10 @@ func (c *natsKVCache[T]) initBucket() error { } kv, err := js.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{ - Bucket: c.bucket, - TTL: c.cfg.ttl, - Storage: jetstream.MemoryStorage, + Bucket: c.bucket, + Description: c.cfg.description, + TTL: c.cfg.ttl, + Storage: jetstream.MemoryStorage, }) if err != nil { return err