diff --git a/app/artifact-cas/cmd/wire.go b/app/artifact-cas/cmd/wire.go index d9c2d823f..da2a3735e 100644 --- a/app/artifact-cas/cmd/wire.go +++ b/app/artifact-cas/cmd/wire.go @@ -24,8 +24,10 @@ import ( "github.com/chainloop-dev/chainloop/app/artifact-cas/internal/conf" "github.com/chainloop-dev/chainloop/app/artifact-cas/internal/server" "github.com/chainloop-dev/chainloop/app/artifact-cas/internal/service" + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor" "github.com/chainloop-dev/chainloop/pkg/blobmanager/loader" "github.com/chainloop-dev/chainloop/pkg/credentials" + "github.com/chainloop-dev/chainloop/pkg/natsconn" "github.com/go-kratos/kratos/v2/log" "github.com/google/wire" ) @@ -40,12 +42,37 @@ func wireApp(*conf.Bootstrap, *conf.Server, *conf.Auth, credentials.Reader, log. newApp, serviceOpts, newProtoValidator, + newNatsConfig, + natsconn.New, + // publish-only: the control plane owns the chainloop-audit stream configuration + auditor.NewPublishOnlyAuditLogPublisher, + service.NewAuditDispatcher, ), ) } -func serviceOpts(l log.Logger) []service.NewOpt { +func serviceOpts(l log.Logger, audit *service.AuditDispatcher) []service.NewOpt { return []service.NewOpt{ service.WithLogger(l), + service.WithAuditDispatcher(audit), } } + +// newNatsConfig converts the proto config to a plain natsconn.Config, nil when unset +func newNatsConfig(bc *conf.Bootstrap) *natsconn.Config { + c := bc.GetNatsServer() + if c.GetUri() == "" { + return nil + } + + cfg := &natsconn.Config{ + URI: c.GetUri(), + Name: "chainloop-artifact-cas", + } + + if c.GetToken() != "" { + cfg.Token = c.GetToken() + } + + return cfg +} diff --git a/app/artifact-cas/cmd/wire_gen.go b/app/artifact-cas/cmd/wire_gen.go index 07dba05dc..6ae068607 100644 --- a/app/artifact-cas/cmd/wire_gen.go +++ b/app/artifact-cas/cmd/wire_gen.go @@ -10,8 +10,10 @@ import ( "github.com/chainloop-dev/chainloop/app/artifact-cas/internal/conf" "github.com/chainloop-dev/chainloop/app/artifact-cas/internal/server" "github.com/chainloop-dev/chainloop/app/artifact-cas/internal/service" + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor" "github.com/chainloop-dev/chainloop/pkg/blobmanager/loader" "github.com/chainloop-dev/chainloop/pkg/credentials" + "github.com/chainloop-dev/chainloop/pkg/natsconn" "github.com/go-kratos/kratos/v2/log" ) @@ -24,38 +26,74 @@ import ( // wireApp init kratos application. func wireApp(bootstrap *conf.Bootstrap, confServer *conf.Server, auth *conf.Auth, reader credentials.Reader, logger log.Logger) (*app, func(), error) { providers := loader.LoadProviders(reader) - v := serviceOpts(logger) + config := newNatsConfig(bootstrap) + reloadableConnection, cleanup, err := natsconn.New(config, logger) + if err != nil { + return nil, nil, err + } + auditLogPublisher, err := auditor.NewPublishOnlyAuditLogPublisher(reloadableConnection, logger) + if err != nil { + cleanup() + return nil, nil, err + } + auditDispatcher := service.NewAuditDispatcher(auditLogPublisher, logger) + v := serviceOpts(logger, auditDispatcher) byteStreamService := service.NewByteStreamService(providers, v...) resourceService := service.NewResourceService(providers, v...) validator, err := newProtoValidator() if err != nil { + cleanup() return nil, nil, err } grpcServer, err := server.NewGRPCServer(confServer, auth, byteStreamService, resourceService, providers, validator, logger) if err != nil { + cleanup() return nil, nil, err } downloadService := service.NewDownloadService(providers, v...) httpServer, err := server.NewHTTPServer(confServer, auth, downloadService, providers, logger) if err != nil { + cleanup() return nil, nil, err } httpMetricsServer, err := server.NewHTTPMetricsServer(confServer) if err != nil { + cleanup() return nil, nil, err } - tracerProvider, cleanup, err := server.NewTracerProvider(bootstrap, logger) + tracerProvider, cleanup2, err := server.NewTracerProvider(bootstrap, logger) if err != nil { + cleanup() return nil, nil, err } mainApp := newApp(logger, grpcServer, httpServer, httpMetricsServer, providers, tracerProvider) return mainApp, func() { + cleanup2() cleanup() }, nil } // wire.go: -func serviceOpts(l log.Logger) []service.NewOpt { - return []service.NewOpt{service.WithLogger(l)} +func serviceOpts(l log.Logger, audit *service.AuditDispatcher) []service.NewOpt { + return []service.NewOpt{service.WithLogger(l), service.WithAuditDispatcher(audit)} +} + +// newNatsConfig converts the proto config to a plain natsconn.Config, nil when unset +func newNatsConfig(bc *conf.Bootstrap) *natsconn.Config { + c := bc.GetNatsServer() + if c.GetUri() == "" { + return nil + } + + cfg := &natsconn.Config{ + URI: c.GetUri(), + Name: "chainloop-artifact-cas", + } + + if c.GetToken() != "" { + cfg.Token = c.GetToken() + } + + return cfg } diff --git a/app/artifact-cas/internal/conf/conf.pb.go b/app/artifact-cas/internal/conf/conf.pb.go index a29ee28e9..e724c6098 100644 --- a/app/artifact-cas/internal/conf/conf.pb.go +++ b/app/artifact-cas/internal/conf/conf.pb.go @@ -44,8 +44,11 @@ type Bootstrap struct { Auth *Auth `protobuf:"bytes,2,opt,name=auth,proto3" json:"auth,omitempty"` Observability *Bootstrap_Observability `protobuf:"bytes,3,opt,name=observability,proto3" json:"observability,omitempty"` CredentialsService *v1.Credentials `protobuf:"bytes,4,opt,name=credentials_service,json=credentialsService,proto3" json:"credentials_service,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Optional NATS server configuration to publish audit events to the + // control-plane-owned stream. When unset, event publishing is disabled. + NatsServer *Bootstrap_NatsServer `protobuf:"bytes,5,opt,name=nats_server,json=natsServer,proto3" json:"nats_server,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Bootstrap) Reset() { @@ -106,6 +109,13 @@ func (x *Bootstrap) GetCredentialsService() *v1.Credentials { return nil } +func (x *Bootstrap) GetNatsServer() *Bootstrap_NatsServer { + if x != nil { + return x.NatsServer + } + return nil +} + type Server struct { state protoimpl.MessageState `protogen:"open.v1"` // Regular HTTP endpoint @@ -227,6 +237,81 @@ func (x *Auth) GetPublicKeyPath() string { return "" } +type Bootstrap_NatsServer struct { + state protoimpl.MessageState `protogen:"open.v1"` + // NATS server URI, e.g. "nats://localhost:4222" + Uri string `protobuf:"bytes,1,opt,name=uri,proto3" json:"uri,omitempty"` + // Types that are valid to be assigned to Authentication: + // + // *Bootstrap_NatsServer_Token + Authentication isBootstrap_NatsServer_Authentication `protobuf_oneof:"authentication"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Bootstrap_NatsServer) Reset() { + *x = Bootstrap_NatsServer{} + mi := &file_conf_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Bootstrap_NatsServer) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Bootstrap_NatsServer) ProtoMessage() {} + +func (x *Bootstrap_NatsServer) ProtoReflect() protoreflect.Message { + mi := &file_conf_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Bootstrap_NatsServer.ProtoReflect.Descriptor instead. +func (*Bootstrap_NatsServer) Descriptor() ([]byte, []int) { + return file_conf_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *Bootstrap_NatsServer) GetUri() string { + if x != nil { + return x.Uri + } + return "" +} + +func (x *Bootstrap_NatsServer) GetAuthentication() isBootstrap_NatsServer_Authentication { + if x != nil { + return x.Authentication + } + return nil +} + +func (x *Bootstrap_NatsServer) GetToken() string { + if x != nil { + if x, ok := x.Authentication.(*Bootstrap_NatsServer_Token); ok { + return x.Token + } + } + return "" +} + +type isBootstrap_NatsServer_Authentication interface { + isBootstrap_NatsServer_Authentication() +} + +type Bootstrap_NatsServer_Token struct { + Token string `protobuf:"bytes,2,opt,name=token,proto3,oneof"` +} + +func (*Bootstrap_NatsServer_Token) isBootstrap_NatsServer_Authentication() {} + type Bootstrap_Observability struct { state protoimpl.MessageState `protogen:"open.v1"` Sentry *Bootstrap_Observability_Sentry `protobuf:"bytes,1,opt,name=sentry,proto3" json:"sentry,omitempty"` @@ -237,7 +322,7 @@ type Bootstrap_Observability struct { func (x *Bootstrap_Observability) Reset() { *x = Bootstrap_Observability{} - mi := &file_conf_proto_msgTypes[3] + mi := &file_conf_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -249,7 +334,7 @@ func (x *Bootstrap_Observability) String() string { func (*Bootstrap_Observability) ProtoMessage() {} func (x *Bootstrap_Observability) ProtoReflect() protoreflect.Message { - mi := &file_conf_proto_msgTypes[3] + mi := &file_conf_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -262,7 +347,7 @@ func (x *Bootstrap_Observability) ProtoReflect() protoreflect.Message { // Deprecated: Use Bootstrap_Observability.ProtoReflect.Descriptor instead. func (*Bootstrap_Observability) Descriptor() ([]byte, []int) { - return file_conf_proto_rawDescGZIP(), []int{0, 0} + return file_conf_proto_rawDescGZIP(), []int{0, 1} } func (x *Bootstrap_Observability) GetSentry() *Bootstrap_Observability_Sentry { @@ -290,7 +375,7 @@ type Bootstrap_Observability_Sentry struct { func (x *Bootstrap_Observability_Sentry) Reset() { *x = Bootstrap_Observability_Sentry{} - mi := &file_conf_proto_msgTypes[4] + mi := &file_conf_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -302,7 +387,7 @@ func (x *Bootstrap_Observability_Sentry) String() string { func (*Bootstrap_Observability_Sentry) ProtoMessage() {} func (x *Bootstrap_Observability_Sentry) ProtoReflect() protoreflect.Message { - mi := &file_conf_proto_msgTypes[4] + mi := &file_conf_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -315,7 +400,7 @@ func (x *Bootstrap_Observability_Sentry) ProtoReflect() protoreflect.Message { // Deprecated: Use Bootstrap_Observability_Sentry.ProtoReflect.Descriptor instead. func (*Bootstrap_Observability_Sentry) Descriptor() ([]byte, []int) { - return file_conf_proto_rawDescGZIP(), []int{0, 0, 0} + return file_conf_proto_rawDescGZIP(), []int{0, 1, 0} } func (x *Bootstrap_Observability_Sentry) GetDsn() string { @@ -349,7 +434,7 @@ type Bootstrap_Observability_Tracing struct { func (x *Bootstrap_Observability_Tracing) Reset() { *x = Bootstrap_Observability_Tracing{} - mi := &file_conf_proto_msgTypes[5] + mi := &file_conf_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -361,7 +446,7 @@ func (x *Bootstrap_Observability_Tracing) String() string { func (*Bootstrap_Observability_Tracing) ProtoMessage() {} func (x *Bootstrap_Observability_Tracing) ProtoReflect() protoreflect.Message { - mi := &file_conf_proto_msgTypes[5] + mi := &file_conf_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -374,7 +459,7 @@ func (x *Bootstrap_Observability_Tracing) ProtoReflect() protoreflect.Message { // Deprecated: Use Bootstrap_Observability_Tracing.ProtoReflect.Descriptor instead. func (*Bootstrap_Observability_Tracing) Descriptor() ([]byte, []int) { - return file_conf_proto_rawDescGZIP(), []int{0, 0, 1} + return file_conf_proto_rawDescGZIP(), []int{0, 1, 1} } func (x *Bootstrap_Observability_Tracing) GetEnabled() bool { @@ -414,7 +499,7 @@ type Server_CORS struct { func (x *Server_CORS) Reset() { *x = Server_CORS{} - mi := &file_conf_proto_msgTypes[6] + mi := &file_conf_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -426,7 +511,7 @@ func (x *Server_CORS) String() string { func (*Server_CORS) ProtoMessage() {} func (x *Server_CORS) ProtoReflect() protoreflect.Message { - mi := &file_conf_proto_msgTypes[6] + mi := &file_conf_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -462,7 +547,7 @@ type Server_HTTP struct { func (x *Server_HTTP) Reset() { *x = Server_HTTP{} - mi := &file_conf_proto_msgTypes[7] + mi := &file_conf_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -474,7 +559,7 @@ func (x *Server_HTTP) String() string { func (*Server_HTTP) ProtoMessage() {} func (x *Server_HTTP) ProtoReflect() protoreflect.Message { - mi := &file_conf_proto_msgTypes[7] + mi := &file_conf_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -529,7 +614,7 @@ type Server_TLS struct { func (x *Server_TLS) Reset() { *x = Server_TLS{} - mi := &file_conf_proto_msgTypes[8] + mi := &file_conf_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -541,7 +626,7 @@ func (x *Server_TLS) String() string { func (*Server_TLS) ProtoMessage() {} func (x *Server_TLS) ProtoReflect() protoreflect.Message { - mi := &file_conf_proto_msgTypes[8] + mi := &file_conf_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -583,7 +668,7 @@ type Server_GRPC struct { func (x *Server_GRPC) Reset() { *x = Server_GRPC{} - mi := &file_conf_proto_msgTypes[9] + mi := &file_conf_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -595,7 +680,7 @@ func (x *Server_GRPC) String() string { func (*Server_GRPC) ProtoMessage() {} func (x *Server_GRPC) ProtoReflect() protoreflect.Message { - mi := &file_conf_proto_msgTypes[9] + mi := &file_conf_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -644,12 +729,19 @@ var File_conf_proto protoreflect.FileDescriptor const file_conf_proto_rawDesc = "" + "\n" + "\n" + - "conf.proto\x1a\x1bcredentials/v1/config.proto\x1a\x1egoogle/protobuf/duration.proto\"\xb7\x04\n" + + "conf.proto\x1a\x1bcredentials/v1/config.proto\x1a\x1egoogle/protobuf/duration.proto\"\xb9\x05\n" + "\tBootstrap\x12\x1f\n" + "\x06server\x18\x01 \x01(\v2\a.ServerR\x06server\x12\x19\n" + "\x04auth\x18\x02 \x01(\v2\x05.AuthR\x04auth\x12>\n" + "\robservability\x18\x03 \x01(\v2\x18.Bootstrap.ObservabilityR\robservability\x12L\n" + - "\x13credentials_service\x18\x04 \x01(\v2\x1b.credentials.v1.CredentialsR\x12credentialsService\x1a\xdf\x02\n" + + "\x13credentials_service\x18\x04 \x01(\v2\x1b.credentials.v1.CredentialsR\x12credentialsService\x126\n" + + "\vnats_server\x18\x05 \x01(\v2\x15.Bootstrap.NatsServerR\n" + + "natsServer\x1aH\n" + + "\n" + + "NatsServer\x12\x10\n" + + "\x03uri\x18\x01 \x01(\tR\x03uri\x12\x16\n" + + "\x05token\x18\x02 \x01(\tH\x00R\x05tokenB\x10\n" + + "\x0eauthentication\x1a\xdf\x02\n" + "\rObservability\x127\n" + "\x06sentry\x18\x01 \x01(\v2\x1f.Bootstrap.Observability.SentryR\x06sentry\x12:\n" + "\atracing\x18\x02 \x01(\v2 .Bootstrap.Observability.TracingR\atracing\x1a<\n" + @@ -699,40 +791,42 @@ func file_conf_proto_rawDescGZIP() []byte { return file_conf_proto_rawDescData } -var file_conf_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_conf_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_conf_proto_goTypes = []any{ (*Bootstrap)(nil), // 0: Bootstrap (*Server)(nil), // 1: Server (*Auth)(nil), // 2: Auth - (*Bootstrap_Observability)(nil), // 3: Bootstrap.Observability - (*Bootstrap_Observability_Sentry)(nil), // 4: Bootstrap.Observability.Sentry - (*Bootstrap_Observability_Tracing)(nil), // 5: Bootstrap.Observability.Tracing - (*Server_CORS)(nil), // 6: Server.CORS - (*Server_HTTP)(nil), // 7: Server.HTTP - (*Server_TLS)(nil), // 8: Server.TLS - (*Server_GRPC)(nil), // 9: Server.GRPC - (*v1.Credentials)(nil), // 10: credentials.v1.Credentials - (*durationpb.Duration)(nil), // 11: google.protobuf.Duration + (*Bootstrap_NatsServer)(nil), // 3: Bootstrap.NatsServer + (*Bootstrap_Observability)(nil), // 4: Bootstrap.Observability + (*Bootstrap_Observability_Sentry)(nil), // 5: Bootstrap.Observability.Sentry + (*Bootstrap_Observability_Tracing)(nil), // 6: Bootstrap.Observability.Tracing + (*Server_CORS)(nil), // 7: Server.CORS + (*Server_HTTP)(nil), // 8: Server.HTTP + (*Server_TLS)(nil), // 9: Server.TLS + (*Server_GRPC)(nil), // 10: Server.GRPC + (*v1.Credentials)(nil), // 11: credentials.v1.Credentials + (*durationpb.Duration)(nil), // 12: google.protobuf.Duration } var file_conf_proto_depIdxs = []int32{ 1, // 0: Bootstrap.server:type_name -> Server 2, // 1: Bootstrap.auth:type_name -> Auth - 3, // 2: Bootstrap.observability:type_name -> Bootstrap.Observability - 10, // 3: Bootstrap.credentials_service:type_name -> credentials.v1.Credentials - 7, // 4: Server.http:type_name -> Server.HTTP - 9, // 5: Server.grpc:type_name -> Server.GRPC - 7, // 6: Server.http_metrics:type_name -> Server.HTTP - 4, // 7: Bootstrap.Observability.sentry:type_name -> Bootstrap.Observability.Sentry - 5, // 8: Bootstrap.Observability.tracing:type_name -> Bootstrap.Observability.Tracing - 11, // 9: Server.HTTP.timeout:type_name -> google.protobuf.Duration - 6, // 10: Server.HTTP.cors:type_name -> Server.CORS - 11, // 11: Server.GRPC.timeout:type_name -> google.protobuf.Duration - 8, // 12: Server.GRPC.tls_config:type_name -> Server.TLS - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 4, // 2: Bootstrap.observability:type_name -> Bootstrap.Observability + 11, // 3: Bootstrap.credentials_service:type_name -> credentials.v1.Credentials + 3, // 4: Bootstrap.nats_server:type_name -> Bootstrap.NatsServer + 8, // 5: Server.http:type_name -> Server.HTTP + 10, // 6: Server.grpc:type_name -> Server.GRPC + 8, // 7: Server.http_metrics:type_name -> Server.HTTP + 5, // 8: Bootstrap.Observability.sentry:type_name -> Bootstrap.Observability.Sentry + 6, // 9: Bootstrap.Observability.tracing:type_name -> Bootstrap.Observability.Tracing + 12, // 10: Server.HTTP.timeout:type_name -> google.protobuf.Duration + 7, // 11: Server.HTTP.cors:type_name -> Server.CORS + 12, // 12: Server.GRPC.timeout:type_name -> google.protobuf.Duration + 9, // 13: Server.GRPC.tls_config:type_name -> Server.TLS + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_conf_proto_init() } @@ -740,14 +834,17 @@ func file_conf_proto_init() { if File_conf_proto != nil { return } - file_conf_proto_msgTypes[5].OneofWrappers = []any{} + file_conf_proto_msgTypes[3].OneofWrappers = []any{ + (*Bootstrap_NatsServer_Token)(nil), + } + file_conf_proto_msgTypes[6].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_conf_proto_rawDesc), len(file_conf_proto_rawDesc)), NumEnums: 0, - NumMessages: 10, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/app/artifact-cas/internal/conf/conf.proto b/app/artifact-cas/internal/conf/conf.proto index 63c26a9bc..9905e1c03 100644 --- a/app/artifact-cas/internal/conf/conf.proto +++ b/app/artifact-cas/internal/conf/conf.proto @@ -25,6 +25,17 @@ message Bootstrap { Auth auth = 2; Observability observability = 3; credentials.v1.Credentials credentials_service = 4; + // Optional NATS server configuration to publish audit events to the + // control-plane-owned stream. When unset, event publishing is disabled. + NatsServer nats_server = 5; + + message NatsServer { + // NATS server URI, e.g. "nats://localhost:4222" + string uri = 1; + oneof authentication { + string token = 2; + } + } message Observability { Sentry sentry = 1; diff --git a/app/artifact-cas/internal/service/auditor.go b/app/artifact-cas/internal/service/auditor.go new file mode 100644 index 000000000..56e2f7405 --- /dev/null +++ b/app/artifact-cas/internal/service/auditor.go @@ -0,0 +1,87 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "fmt" + + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor" + casJWT "github.com/chainloop-dev/chainloop/internal/robotaccount/cas" + "github.com/chainloop-dev/chainloop/pkg/servicelogger" + "github.com/getsentry/sentry-go" + "github.com/go-kratos/kratos/v2/log" + "github.com/google/uuid" +) + +// eventPublisher is the subset of auditor.AuditLogPublisher used by the dispatcher +type eventPublisher interface { + Publish(data *auditor.EventPayload) error +} + +// AuditDispatcher publishes CAS audit events. Unlike the control plane's +// biz.AuditorUseCase, the actor is always SYSTEM (CAS JWTs carry no user +// identity) and the org comes from the JWT claims instead of the request context. +type AuditDispatcher struct { + // nil when NATS is not configured, making the dispatcher a no-op + publisher eventPublisher + log *log.Helper +} + +func NewAuditDispatcher(publisher *auditor.AuditLogPublisher, logger log.Logger) *AuditDispatcher { + d := &AuditDispatcher{log: servicelogger.ScopedHelper(logger, "audit-dispatcher")} + // keep the interface nil when the publisher is disabled so shouldEmit short-circuits + if publisher != nil { + d.publisher = publisher + } + + return d +} + +// shouldEmit returns true when Dispatch would actually publish an event for the +// given claims. Hooks use it to skip extra work (e.g. backend Describe round-trips). +func (d *AuditDispatcher) shouldEmit(claims *casJWT.Claims) bool { + return d != nil && d.publisher != nil && claims != nil && !claims.SourceInternal +} + +// Dispatch generates and publishes an audit event with a SYSTEM actor and the +// organization from the JWT claims. Best-effort: errors are logged, never returned. +// Internal control plane traffic (SourceInternal claim) emits no events. +func (d *AuditDispatcher) Dispatch(entry auditor.LogEntry, claims *casJWT.Claims) { + if !d.shouldEmit(claims) { + return + } + + orgID, err := uuid.Parse(claims.OrgID) + if err != nil { + d.log.Warnw("msg", "skipping audit event, invalid org id", "org_id", claims.OrgID, "error", err) + return + } + + payload, err := auditor.GenerateAuditEvent(entry, + auditor.WithActor(auditor.ActorTypeSystem, uuid.Nil, "", ""), + auditor.WithOrgID(orgID), + ) + if err != nil { + d.log.Errorw("msg", "failed to generate audit event", "error", err) + sentry.CaptureException(fmt.Errorf("failed to generate audit event: %w", err)) + return + } + + if err := d.publisher.Publish(payload); err != nil { + d.log.Errorw("msg", "failed to publish audit event", "error", err) + sentry.CaptureException(fmt.Errorf("failed to publish audit event: %w", err)) + } +} diff --git a/app/artifact-cas/internal/service/auditor_test.go b/app/artifact-cas/internal/service/auditor_test.go new file mode 100644 index 000000000..fd20aefda --- /dev/null +++ b/app/artifact-cas/internal/service/auditor_test.go @@ -0,0 +1,179 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor" + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor/events" + casJWT "github.com/chainloop-dev/chainloop/internal/robotaccount/cas" + "github.com/chainloop-dev/chainloop/pkg/servicelogger" + "github.com/go-kratos/kratos/v2/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// fakePublisher records published payloads and optionally fails +type fakePublisher struct { + published []*auditor.EventPayload + err error +} + +func (f *fakePublisher) Publish(data *auditor.EventPayload) error { + if f.err != nil { + return f.err + } + + f.published = append(f.published, data) + return nil +} + +func newTestDispatcher(p eventPublisher) *AuditDispatcher { + return &AuditDispatcher{publisher: p, log: servicelogger.ScopedHelper(log.DefaultLogger, "test")} +} + +func testUploadedEntry() auditor.LogEntry { + return &events.CASArtifactUploaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9", + SizeBytes: 11, + FileName: "test.txt", + BackendType: "OCI", + }, + } +} + +const testOrgID = "1089bb36-e27b-428b-8009-d015c8737c54" + +func TestAuditDispatcherDispatch(t *testing.T) { + tests := []struct { + name string + dispatcher *AuditDispatcher + entry auditor.LogEntry + claims *casJWT.Claims + wantPublished int + }{ + { + name: "nil dispatcher is a no-op", + dispatcher: nil, + entry: testUploadedEntry(), + claims: &casJWT.Claims{OrgID: testOrgID}, + }, + { + name: "nil publisher is a no-op", + dispatcher: newTestDispatcher(nil), + entry: testUploadedEntry(), + claims: &casJWT.Claims{OrgID: testOrgID}, + }, + { + name: "internal control plane traffic is skipped", + dispatcher: newTestDispatcher(&fakePublisher{}), + entry: testUploadedEntry(), + claims: &casJWT.Claims{OrgID: testOrgID, SourceInternal: true}, + }, + { + name: "invalid org id is skipped", + dispatcher: newTestDispatcher(&fakePublisher{}), + entry: testUploadedEntry(), + claims: &casJWT.Claims{OrgID: "not-an-uuid"}, + }, + { + name: "invalid entry is skipped", + dispatcher: newTestDispatcher(&fakePublisher{}), + entry: &events.CASArtifactUploaded{CASArtifactBase: &events.CASArtifactBase{}}, + claims: &casJWT.Claims{OrgID: testOrgID}, + }, + { + name: "publish errors are swallowed", + dispatcher: newTestDispatcher(&fakePublisher{err: errors.New("nats is down")}), + entry: testUploadedEntry(), + claims: &casJWT.Claims{OrgID: testOrgID}, + }, + { + name: "client traffic is published", + dispatcher: newTestDispatcher(&fakePublisher{}), + entry: testUploadedEntry(), + claims: &casJWT.Claims{OrgID: testOrgID}, + wantPublished: 1, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // must never panic nor return an error + tc.dispatcher.Dispatch(tc.entry, tc.claims) + + if tc.dispatcher == nil || tc.dispatcher.publisher == nil { + return + } + fake := tc.dispatcher.publisher.(*fakePublisher) + + require.Len(t, fake.published, tc.wantPublished) + if tc.wantPublished == 0 { + return + } + + got := fake.published[0] + assert.Equal(t, auditor.AuditEventType, got.EventType) + assert.Equal(t, events.CASArtifactUploadedActionType, got.Data.ActionType) + assert.Equal(t, events.CASArtifactType, got.Data.TargetType) + assert.Equal(t, auditor.ActorType(auditor.ActorTypeSystem), got.Data.ActorType) + require.NotNil(t, got.Data.OrgID) + assert.Equal(t, testOrgID, got.Data.OrgID.String()) + }) + } +} + +func TestAuditDispatcherShouldEmit(t *testing.T) { + tests := []struct { + name string + dispatcher *AuditDispatcher + claims *casJWT.Claims + want bool + }{ + {name: "nil dispatcher", dispatcher: nil, claims: &casJWT.Claims{}, want: false}, + {name: "nil publisher", dispatcher: newTestDispatcher(nil), claims: &casJWT.Claims{}, want: false}, + {name: "nil claims", dispatcher: newTestDispatcher(&fakePublisher{}), claims: nil, want: false}, + {name: "internal traffic", dispatcher: newTestDispatcher(&fakePublisher{}), claims: &casJWT.Claims{SourceInternal: true}, want: false}, + {name: "client traffic", dispatcher: newTestDispatcher(&fakePublisher{}), claims: &casJWT.Claims{}, want: true}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, tc.dispatcher.shouldEmit(tc.claims)) + }) + } +} + +// artifactEventInfo mirrors the action info payload of CAS artifact events for assertions +type artifactEventInfo struct { + Digest string `json:"digest"` + SizeBytes int64 `json:"size_bytes"` + FileName string `json:"file_name"` + BackendType string `json:"backend_type"` + Skipped bool `json:"skipped"` +} + +func decodeArtifactEvent(t *testing.T, payload *auditor.EventPayload) *artifactEventInfo { + t.Helper() + + info := &artifactEventInfo{} + require.NoError(t, json.Unmarshal(payload.Data.Info, info)) + return info +} diff --git a/app/artifact-cas/internal/service/bytestream.go b/app/artifact-cas/internal/service/bytestream.go index c167827dd..b533c1cf6 100644 --- a/app/artifact-cas/internal/service/bytestream.go +++ b/app/artifact-cas/internal/service/bytestream.go @@ -29,6 +29,7 @@ import ( "errors" v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1" + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor/events" casJWT "github.com/chainloop-dev/chainloop/internal/robotaccount/cas" backend "github.com/chainloop-dev/chainloop/pkg/blobmanager" "github.com/chainloop-dev/chainloop/pkg/otelx" @@ -93,6 +94,23 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro return sl.LogAndMaskErr(err, s.log) } else if exists { s.log.Infow("msg", "artifact already exists", "digest", req.resource.Digest) + if s.audit.shouldEmit(info) { + // the stored size is not known at the dedup point, look it up best-effort + var size int64 + if r, err := storageBackend.Describe(ctx, req.resource.Digest); err == nil { + size = r.Size + } + + s.audit.Dispatch(&events.CASArtifactUploaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: req.resource.Digest, + SizeBytes: size, + FileName: req.resource.FileName, + BackendType: info.BackendType, + }, + Skipped: true, + }, info) + } return stream.SendAndClose(&bytestream.WriteResponse{}) } @@ -136,6 +154,15 @@ func (s *ByteStreamService) Write(stream bytestream.ByteStream_WriteServer) erro } s.log.Infow("msg", "upload finished", "name", req.resource.FileName, "digest", req.resource.Digest, "size", buffer.size) + s.audit.Dispatch(&events.CASArtifactUploaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: req.resource.Digest, + SizeBytes: buffer.size, + FileName: req.resource.FileName, + BackendType: info.BackendType, + }, + }, info) + return stream.SendAndClose(&bytestream.WriteResponse{CommittedSize: buffer.size}) } @@ -170,7 +197,7 @@ func (s *ByteStreamService) Read(req *bytestream.ReadRequest, stream bytestream. } // streamwriter will stream chunks of data to the client - sw := &streamWriter{stream, s.log, req.ResourceName, sha256.New()} + sw := &streamWriter{stream: stream, log: s.log, wantChecksum: req.ResourceName, gotChecksum: sha256.New()} if err := backend.Download(ctx, sw, req.ResourceName); err != nil { if isClientDisconnect(err) { s.log.Infow("msg", "download canceled", "digest", req.ResourceName) @@ -186,6 +213,13 @@ func (s *ByteStreamService) Read(req *bytestream.ReadRequest, stream bytestream. } s.log.Infow("msg", "download finished", "digest", req.ResourceName) + s.audit.Dispatch(&events.CASArtifactDownloaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: req.ResourceName, + SizeBytes: sw.size, + BackendType: info.BackendType, + }, + }, info) return nil } @@ -312,6 +346,8 @@ type streamWriter struct { wantChecksum string // calculated gotChecksum of the data sent gotChecksum hash.Hash + // total number of bytes sent + size int64 } // Send the chunk of data through the bytestream @@ -322,6 +358,8 @@ func (sw *streamWriter) Write(data []byte) (int, error) { if _, err := sw.gotChecksum.Write(data); err != nil { return 0, err } + + sw.size += int64(len(data)) return len(data), sw.stream.Send(&bytestream.ReadResponse{Data: data}) } diff --git a/app/artifact-cas/internal/service/bytestream_test.go b/app/artifact-cas/internal/service/bytestream_test.go index 5f5149cb1..45c2cf451 100644 --- a/app/artifact-cas/internal/service/bytestream_test.go +++ b/app/artifact-cas/internal/service/bytestream_test.go @@ -146,6 +146,9 @@ func (s *bytestreamSuite) TestWrite() { // NOTE: separated test cases for each error case to make sure the context and stubs are re-set func (s *bytestreamSuite) TestWriteExist() { s.ociBackend.On("Exists", mock.Anything, s.resource.Digest).Return(true, nil) + s.ociBackend.On("Describe", mock.Anything, s.resource.Digest).Return(&v1.CASResource{ + FileName: s.resource.FileName, Digest: s.resource.Digest, Size: 1024, + }, nil) stream, err := s.client.Write(s.upCtx) s.NoError(err) @@ -156,6 +159,51 @@ func (s *bytestreamSuite) TestWriteExist() { got, err := stream.CloseAndRecv() s.NoError(err) s.Equal(int64(0), got.CommittedSize) + + // a skipped upload event is emitted with the stored size + s.Require().Len(s.audit.published, 1) + info := decodeArtifactEvent(s.T(), s.audit.published[0]) + s.True(info.Skipped) + s.Equal(s.resource.Digest, info.Digest) + s.Equal(int64(1024), info.SizeBytes) + s.Equal(s.resource.FileName, info.FileName) +} + +func (s *bytestreamSuite) TestWriteExistDescribeFails() { + s.ociBackend.On("Exists", mock.Anything, s.resource.Digest).Return(true, nil) + s.ociBackend.On("Describe", mock.Anything, s.resource.Digest).Return(nil, errors.New("describe failed")) + + stream, err := s.client.Write(s.upCtx) + s.NoError(err) + s.NoError(stream.Send(&bytestream.WriteRequest{ + ResourceName: encodeResource(s.T(), s.resource), + })) + + _, err = stream.CloseAndRecv() + s.NoError(err) + + // the event is still emitted, with unknown (0) size + s.Require().Len(s.audit.published, 1) + info := decodeArtifactEvent(s.T(), s.audit.published[0]) + s.True(info.Skipped) + s.Equal(int64(0), info.SizeBytes) +} + +func (s *bytestreamSuite) TestWriteExistInternalTraffic() { + // internal control plane traffic emits no events nor extra Describe calls + s.ociBackend.On("Exists", mock.Anything, s.resource.Digest).Return(true, nil) + + ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("role", "uploader", "source-internal", "true")) + stream, err := s.client.Write(ctx) + s.NoError(err) + s.NoError(stream.Send(&bytestream.WriteRequest{ + ResourceName: encodeResource(s.T(), s.resource), + })) + + _, err = stream.CloseAndRecv() + s.NoError(err) + s.Empty(s.audit.published) + s.ociBackend.AssertNotCalled(s.T(), "Describe") } func (s *bytestreamSuite) TestWriteOK() { @@ -179,6 +227,14 @@ func (s *bytestreamSuite) TestWriteOK() { got, err := stream.CloseAndRecv() s.NoError(err) s.Equal(int64(len(data)), got.CommittedSize) + + // an upload event is emitted with the uploaded size + s.Require().Len(s.audit.published, 1) + info := decodeArtifactEvent(s.T(), s.audit.published[0]) + s.False(info.Skipped) + s.Equal(s.resource.Digest, info.Digest) + s.Equal(int64(len(data)), info.SizeBytes) + s.Equal(s.resource.FileName, info.FileName) } func (s *bytestreamSuite) TestWriteErrorUploading() { @@ -194,6 +250,8 @@ func (s *bytestreamSuite) TestWriteErrorUploading() { _, err = stream.CloseAndRecv() assertGRPCError(s.T(), err, codes.Internal, "") + // failed uploads emit no events + s.Empty(s.audit.published) } func (s *bytestreamSuite) TestRead() { @@ -235,14 +293,15 @@ func (s *bytestreamSuite) TestReadErrorDownloading() { } func (s *bytestreamSuite) TestDownloadOk() { - s.ociBackend.On("Download", mock.Anything, mock.Anything, "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"). + const digest = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9" + s.ociBackend.On("Download", mock.Anything, mock.Anything, digest). Return(nil).Run(func(args mock.Arguments) { buf := bytes.NewBuffer([]byte("hello world")) _, err := io.Copy(args.Get(1).(io.Writer), buf) s.NoError(err) }) - reader, err := s.client.Read(s.downCtx, &bytestream.ReadRequest{ResourceName: "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"}) + reader, err := s.client.Read(s.downCtx, &bytestream.ReadRequest{ResourceName: digest}) s.NoError(err) // receive the data, it should contain all of it since the buffer is server side is 1MB @@ -253,6 +312,13 @@ func (s *bytestreamSuite) TestDownloadOk() { got, err = reader.Recv() s.ErrorIs(err, io.EOF) s.Nil(got) + + // a download event is emitted with the transferred size + s.Require().Len(s.audit.published, 1) + info := decodeArtifactEvent(s.T(), s.audit.published[0]) + s.Equal(digest, info.Digest) + s.Equal(int64(len("hello world")), info.SizeBytes) + s.False(info.Skipped) } func (s *bytestreamSuite) TestDownloadFoundMistmathedDigest() { @@ -274,6 +340,8 @@ func (s *bytestreamSuite) TestDownloadFoundMistmathedDigest() { got, err = reader.Recv() s.ErrorContains(err, "checksum mismatch:") s.Nil(got) + // tampered downloads emit no events + s.Empty(s.audit.published) } func assertGRPCError(t *testing.T, err error, code codes.Code, errMsg string) { @@ -293,6 +361,7 @@ type bytestreamSuite struct { client bytestream.ByteStreamClient ociBackend *mocks.UploaderDownloader resource *v1.CASResource + audit *fakePublisher upCtx context.Context downCtx context.Context } @@ -318,7 +387,10 @@ func (s *bytestreamSuite) SetupTest() { } claims := &casJWT.Claims{ - StoredSecretID: "secret-id", BackendType: backendType, + StoredSecretID: "secret-id", BackendType: backendType, OrgID: testOrgID, + } + if v := md.Get("source-internal"); len(v) > 0 { + claims.SourceInternal = true } if maxBytes := md.Get("max-bytes"); len(maxBytes) > 0 { parsedMaxBytes, err := strconv.ParseInt(maxBytes[0], 10, 64) @@ -327,9 +399,10 @@ func (s *bytestreamSuite) SetupTest() { } if roles := md.Get("role"); len(roles) > 0 { - if roles[0] == "downloader" { + switch roles[0] { + case "downloader": claims.Role = casJWT.Downloader - } else if roles[0] == "uploader" { + case "uploader": claims.Role = casJWT.Uploader } } @@ -343,11 +416,12 @@ func (s *bytestreamSuite) SetupTest() { ociBackend := mocks.NewUploaderDownloader(s.T()) ociBackendProvider.On("FromCredentials", mock.Anything, mock.Anything).Maybe().Return(ociBackend, nil) + s.audit = &fakePublisher{} bytestream.RegisterByteStreamServer( server, NewByteStreamService(backend.Providers{ backendType: ociBackendProvider, - }, WithLogger(log.DefaultLogger)), + }, WithLogger(log.DefaultLogger), WithAuditDispatcher(newTestDispatcher(s.audit))), ) go func() { _ = server.Serve(l) diff --git a/app/artifact-cas/internal/service/download.go b/app/artifact-cas/internal/service/download.go index 1da7693bb..dcc6f8f34 100644 --- a/app/artifact-cas/internal/service/download.go +++ b/app/artifact-cas/internal/service/download.go @@ -25,6 +25,7 @@ import ( "strconv" "code.cloudfoundry.org/bytefmt" + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor/events" casJWT "github.com/chainloop-dev/chainloop/internal/robotaccount/cas" backend "github.com/chainloop-dev/chainloop/pkg/blobmanager" "github.com/chainloop-dev/chainloop/pkg/otelx" @@ -133,6 +134,16 @@ func (s *DownloadService) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // the backend egress completed and the content was verified, record the download + s.audit.Dispatch(&events.CASArtifactDownloaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: wantChecksum.Hex, + SizeBytes: info.Size, + FileName: filename, + BackendType: auth.BackendType, + }, + }, auth) + if _, err := io.Copy(w, buf); err != nil { if isClientDisconnect(err) { s.log.Infow("msg", "download canceled during response write", "digest", wantChecksum) diff --git a/app/artifact-cas/internal/service/download_test.go b/app/artifact-cas/internal/service/download_test.go new file mode 100644 index 000000000..c6a219a89 --- /dev/null +++ b/app/artifact-cas/internal/service/download_test.go @@ -0,0 +1,123 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + + v1 "github.com/chainloop-dev/chainloop/app/artifact-cas/api/cas/v1" + casJWT "github.com/chainloop-dev/chainloop/internal/robotaccount/cas" + backend "github.com/chainloop-dev/chainloop/pkg/blobmanager" + "github.com/chainloop-dev/chainloop/pkg/blobmanager/mocks" + "github.com/go-kratos/kratos/v2/log" + jwtMiddleware "github.com/go-kratos/kratos/v2/middleware/auth/jwt" + "github.com/gorilla/mux" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestDownloadServiceAuditEvents(t *testing.T) { + const ( + backendType = "backend-type" + // sha256 of "hello world" + digestHex = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9" + ) + + downloaderClaims := func(sourceInternal bool) *casJWT.Claims { + return &casJWT.Claims{ + Role: casJWT.Downloader, + StoredSecretID: "secret-id", + BackendType: backendType, + OrgID: testOrgID, + SourceInternal: sourceInternal, + } + } + + tests := []struct { + name string + content string + claims *casJWT.Claims + wantStatus int + wantEvents int + }{ + { + name: "successful download emits an event", + content: "hello world", + claims: downloaderClaims(false), + wantStatus: http.StatusOK, + wantEvents: 1, + }, + { + name: "checksum mismatch emits no event", + content: "tampered content", + claims: downloaderClaims(false), + wantStatus: http.StatusUnauthorized, + }, + { + name: "internal control plane traffic emits no event", + content: "hello world", + claims: downloaderClaims(true), + wantStatus: http.StatusOK, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + provider := mocks.NewProvider(t) + uploaderDownloader := mocks.NewUploaderDownloader(t) + provider.On("FromCredentials", mock.Anything, mock.Anything).Return(uploaderDownloader, nil) + uploaderDownloader.On("Describe", mock.Anything, digestHex).Return(&v1.CASResource{ + FileName: "test.txt", Digest: digestHex, Size: int64(len(tc.content)), + }, nil) + uploaderDownloader.On("Download", mock.Anything, mock.Anything, digestHex).Return(nil). + Run(func(args mock.Arguments) { + _, err := io.WriteString(args.Get(1).(io.Writer), tc.content) + require.NoError(t, err) + }) + + audit := &fakePublisher{} + svc := NewDownloadService( + backend.Providers{backendType: provider}, + WithLogger(log.DefaultLogger), + WithAuditDispatcher(newTestDispatcher(audit)), + ) + + req := httptest.NewRequest(http.MethodGet, "/download/sha256:"+digestHex, nil) + req = mux.SetURLVars(req, map[string]string{"digest": "sha256:" + digestHex}) + req = req.WithContext(jwtMiddleware.NewContext(req.Context(), tc.claims)) + + w := httptest.NewRecorder() + svc.ServeHTTP(w, req) + + assert.Equal(t, tc.wantStatus, w.Code) + require.Len(t, audit.published, tc.wantEvents) + if tc.wantEvents == 0 { + return + } + + info := decodeArtifactEvent(t, audit.published[0]) + assert.Equal(t, digestHex, info.Digest) + assert.Equal(t, int64(len(tc.content)), info.SizeBytes) + assert.Equal(t, "test.txt", info.FileName) + assert.Equal(t, backendType, info.BackendType) + assert.False(t, info.Skipped) + }) + } +} diff --git a/app/artifact-cas/internal/service/service.go b/app/artifact-cas/internal/service/service.go index acc1f453b..a1a24c2ef 100644 --- a/app/artifact-cas/internal/service/service.go +++ b/app/artifact-cas/internal/service/service.go @@ -36,6 +36,8 @@ var ProviderSet = wire.NewSet(NewByteStreamService, NewResourceService, NewDownl type commonService struct { log *log.Helper backends backend.Providers + // best-effort audit events publisher, nil-safe + audit *AuditDispatcher } func (s *commonService) loadBackend(ctx context.Context, providerType, secretID string) (backend.UploaderDownloader, error) { @@ -64,6 +66,12 @@ func WithLogger(logger log.Logger) NewOpt { } } +func WithAuditDispatcher(d *AuditDispatcher) NewOpt { + return func(s *commonService) { + s.audit = d + } +} + func newCommonService(backends backend.Providers, opts ...NewOpt) *commonService { s := &commonService{ log: servicelogger.EmptyLogger(), diff --git a/app/controlplane/pkg/auditor/events/casartifact.go b/app/controlplane/pkg/auditor/events/casartifact.go new file mode 100644 index 000000000..b5f75a49b --- /dev/null +++ b/app/controlplane/pkg/auditor/events/casartifact.go @@ -0,0 +1,118 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor" + "github.com/google/uuid" +) + +var ( + _ auditor.LogEntry = (*CASArtifactUploaded)(nil) + _ auditor.LogEntry = (*CASArtifactDownloaded)(nil) +) + +const ( + CASArtifactType auditor.TargetType = "CASArtifact" + CASArtifactUploadedActionType string = "CASArtifactUploaded" + CASArtifactDownloadedActionType string = "CASArtifactDownloaded" +) + +// CASArtifactBase contains the common fields for all CAS artifact events. +// These events are emitted by the Artifact CAS data plane, where only the +// organization (not the user) is known, so they don't require an actor. +type CASArtifactBase struct { + // Digest is the sha256 hex digest of the artifact + Digest string `json:"digest"` + // SizeBytes is the size of the artifact, 0 when unknown + SizeBytes int64 `json:"size_bytes"` + FileName string `json:"file_name,omitempty"` + BackendType string `json:"backend_type,omitempty"` +} + +func (c *CASArtifactBase) RequiresActor() bool { + return false +} + +func (c *CASArtifactBase) TargetType() auditor.TargetType { + return CASArtifactType +} + +// TargetID is nil since artifacts are identified by their digest, carried in the action info +func (c *CASArtifactBase) TargetID() *uuid.UUID { + return nil +} + +func (c *CASArtifactBase) validate() error { + if c.Digest == "" { + return errors.New("digest is required") + } + + return nil +} + +// CASArtifactUploaded represents an artifact upload to the CAS. +// Skipped is true when the upload was deduplicated: the artifact already +// existed in the backend and no bytes were transferred or stored. +type CASArtifactUploaded struct { + *CASArtifactBase + Skipped bool `json:"skipped"` +} + +func (c *CASArtifactUploaded) ActionType() string { + return CASArtifactUploadedActionType +} + +func (c *CASArtifactUploaded) ActionInfo() (json.RawMessage, error) { + if err := c.validate(); err != nil { + return nil, err + } + + return json.Marshal(&c) +} + +func (c *CASArtifactUploaded) Description() string { + if c.Skipped { + return fmt.Sprintf("upload of artifact %s skipped, already exists", c.Digest) + } + + return fmt.Sprintf("artifact %s (%d bytes) was uploaded", c.Digest, c.SizeBytes) +} + +// CASArtifactDownloaded represents an artifact download from the CAS +type CASArtifactDownloaded struct { + *CASArtifactBase +} + +func (c *CASArtifactDownloaded) ActionType() string { + return CASArtifactDownloadedActionType +} + +func (c *CASArtifactDownloaded) ActionInfo() (json.RawMessage, error) { + if err := c.validate(); err != nil { + return nil, err + } + + return json.Marshal(&c) +} + +func (c *CASArtifactDownloaded) Description() string { + return fmt.Sprintf("artifact %s (%d bytes) was downloaded", c.Digest, c.SizeBytes) +} diff --git a/app/controlplane/pkg/auditor/events/casartifact_test.go b/app/controlplane/pkg/auditor/events/casartifact_test.go new file mode 100644 index 000000000..96bcd497a --- /dev/null +++ b/app/controlplane/pkg/auditor/events/casartifact_test.go @@ -0,0 +1,169 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events_test + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor" + "github.com/chainloop-dev/chainloop/app/controlplane/pkg/auditor/events" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCASArtifactEvents(t *testing.T) { + orgUUID, err := uuid.Parse("1089bb36-e27b-428b-8009-d015c8737c54") + require.NoError(t, err) + + const ( + digest = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9" + fileName = "sbom.cyclonedx.json" + backendType = "OCI" + ) + + tests := []struct { + name string + event auditor.LogEntry + expected string + }{ + { + name: "artifact uploaded", + event: &events.CASArtifactUploaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: digest, + SizeBytes: 1024, + FileName: fileName, + BackendType: backendType, + }, + }, + expected: "testdata/casartifacts/casartifact_uploaded.json", + }, + { + name: "artifact upload skipped (deduplicated)", + event: &events.CASArtifactUploaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: digest, + SizeBytes: 1024, + FileName: fileName, + BackendType: backendType, + }, + Skipped: true, + }, + expected: "testdata/casartifacts/casartifact_upload_skipped.json", + }, + { + name: "artifact upload skipped with unknown size", + event: &events.CASArtifactUploaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: digest, + BackendType: backendType, + }, + Skipped: true, + }, + expected: "testdata/casartifacts/casartifact_upload_skipped_unknown_size.json", + }, + { + name: "artifact downloaded", + event: &events.CASArtifactDownloaded{ + CASArtifactBase: &events.CASArtifactBase{ + Digest: digest, + SizeBytes: 1024, + FileName: fileName, + BackendType: backendType, + }, + }, + expected: "testdata/casartifacts/casartifact_downloaded.json", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // CAS artifact events are system-generated, no actor identity available + require.False(t, tt.event.RequiresActor()) + require.Nil(t, tt.event.TargetID()) + require.Equal(t, events.CASArtifactType, tt.event.TargetType()) + + eventPayload, err := auditor.GenerateAuditEvent(tt.event, + auditor.WithOrgID(orgUUID), + auditor.WithActor(auditor.ActorTypeSystem, uuid.Nil, "", ""), + ) + require.NoError(t, err) + + want, err := json.MarshalIndent(eventPayload.Data, "", " ") + require.NoError(t, err) + + if updateGolden { + err := os.MkdirAll(filepath.Dir(tt.expected), 0755) + require.NoError(t, err) + err = os.WriteFile(filepath.Clean(tt.expected), want, 0600) + require.NoError(t, err) + } + + gotRaw, err := os.ReadFile(filepath.Clean(tt.expected)) + require.NoError(t, err) + + var gotPayload auditor.AuditEventPayload + err = json.Unmarshal(gotRaw, &gotPayload) + require.NoError(t, err) + got, err := json.MarshalIndent(gotPayload, "", " ") + require.NoError(t, err) + + assert.Equal(t, string(want), string(got)) + }) + } +} + +// TestCASArtifactEventsFailed tests the behavior of CAS artifact events when they are expected to fail +func TestCASArtifactEventsFailed(t *testing.T) { + tests := []struct { + name string + event auditor.LogEntry + expectedErr string + }{ + { + name: "artifact uploaded with missing digest", + event: &events.CASArtifactUploaded{ + CASArtifactBase: &events.CASArtifactBase{ + SizeBytes: 1024, + FileName: "sbom.cyclonedx.json", + }, + }, + expectedErr: "digest is required", + }, + { + name: "artifact downloaded with missing digest", + event: &events.CASArtifactDownloaded{ + CASArtifactBase: &events.CASArtifactBase{ + SizeBytes: 1024, + }, + }, + expectedErr: "digest is required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := tt.event.ActionInfo() + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedErr) + }) + } +} diff --git a/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_downloaded.json b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_downloaded.json new file mode 100644 index 000000000..1a6ea51ea --- /dev/null +++ b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_downloaded.json @@ -0,0 +1,18 @@ +{ + "ActionType": "CASArtifactDownloaded", + "TargetType": "CASArtifact", + "TargetID": null, + "ActorType": "SYSTEM", + "ActorID": null, + "ActorEmail": "", + "ActorName": "", + "OrgID": "1089bb36-e27b-428b-8009-d015c8737c54", + "Description": "artifact b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9 (1024 bytes) was downloaded", + "Info": { + "digest": "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9", + "size_bytes": 1024, + "file_name": "sbom.cyclonedx.json", + "backend_type": "OCI" + }, + "Digest": "sha256:10edea2ce083e958285ccfae58b3d07e5697f74d3f87b32950b9dc821aa74baf" +} \ No newline at end of file diff --git a/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_upload_skipped.json b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_upload_skipped.json new file mode 100644 index 000000000..3c151a10b --- /dev/null +++ b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_upload_skipped.json @@ -0,0 +1,19 @@ +{ + "ActionType": "CASArtifactUploaded", + "TargetType": "CASArtifact", + "TargetID": null, + "ActorType": "SYSTEM", + "ActorID": null, + "ActorEmail": "", + "ActorName": "", + "OrgID": "1089bb36-e27b-428b-8009-d015c8737c54", + "Description": "upload of artifact b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9 skipped, already exists", + "Info": { + "digest": "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9", + "size_bytes": 1024, + "file_name": "sbom.cyclonedx.json", + "backend_type": "OCI", + "skipped": true + }, + "Digest": "sha256:ad38191fe0592e1856981ea6d7d6e5fa3b0d4c0eca95f006d195c28aca5cc253" +} \ No newline at end of file diff --git a/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_upload_skipped_unknown_size.json b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_upload_skipped_unknown_size.json new file mode 100644 index 000000000..73fc066e8 --- /dev/null +++ b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_upload_skipped_unknown_size.json @@ -0,0 +1,18 @@ +{ + "ActionType": "CASArtifactUploaded", + "TargetType": "CASArtifact", + "TargetID": null, + "ActorType": "SYSTEM", + "ActorID": null, + "ActorEmail": "", + "ActorName": "", + "OrgID": "1089bb36-e27b-428b-8009-d015c8737c54", + "Description": "upload of artifact b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9 skipped, already exists", + "Info": { + "digest": "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9", + "size_bytes": 0, + "backend_type": "OCI", + "skipped": true + }, + "Digest": "sha256:53267f375f5380485824fb9b9894df38ac0abeb286c8213aa201dc42b6c08627" +} \ No newline at end of file diff --git a/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_uploaded.json b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_uploaded.json new file mode 100644 index 000000000..95842ee1e --- /dev/null +++ b/app/controlplane/pkg/auditor/events/testdata/casartifacts/casartifact_uploaded.json @@ -0,0 +1,19 @@ +{ + "ActionType": "CASArtifactUploaded", + "TargetType": "CASArtifact", + "TargetID": null, + "ActorType": "SYSTEM", + "ActorID": null, + "ActorEmail": "", + "ActorName": "", + "OrgID": "1089bb36-e27b-428b-8009-d015c8737c54", + "Description": "artifact b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9 (1024 bytes) was uploaded", + "Info": { + "digest": "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9", + "size_bytes": 1024, + "file_name": "sbom.cyclonedx.json", + "backend_type": "OCI", + "skipped": false + }, + "Digest": "sha256:071f2a3d5546c88472102c0681e0305a9a4f2cc58d8d44a655c767d23c3b3491" +} \ No newline at end of file diff --git a/app/controlplane/pkg/auditor/nats.go b/app/controlplane/pkg/auditor/nats.go index e32480997..a11349c00 100644 --- a/app/controlplane/pkg/auditor/nats.go +++ b/app/controlplane/pkg/auditor/nats.go @@ -41,15 +41,14 @@ type AuditLogPublisher struct { logger *log.Helper } +// NewAuditLogPublisher creates a publisher that owns the JetStream stream: +// it creates or updates it on boot and after every NATS reconnection. func NewAuditLogPublisher(ctx context.Context, rc *natsconn.ReloadableConnection, logger log.Logger) (*AuditLogPublisher, error) { - l := log.NewHelper(log.With(logger, "component", "natsAuditLogPublisher")) - if rc == nil { - l.Infow("msg", "NATS connection not set, audit log publisher disabled") + p := newPublisher(rc, logger) + if p == nil { return nil, nil } - p := &AuditLogPublisher{rc: rc, logger: l} - if err := p.initJetStream(); err != nil { return nil, err } @@ -59,6 +58,30 @@ func NewAuditLogPublisher(ctx context.Context, rc *natsconn.ReloadableConnection return p, nil } +// NewPublishOnlyAuditLogPublisher creates a publisher that never creates or +// updates the JetStream stream. Meant for components (e.g. the Artifact CAS) +// that publish to the stream owned and configured by the control plane, so they +// can't accidentally override its configuration (e.g. downgrade the replica count). +func NewPublishOnlyAuditLogPublisher(rc *natsconn.ReloadableConnection, logger log.Logger) (*AuditLogPublisher, error) { + p := newPublisher(rc, logger) + if p != nil { + p.logger.Infow("msg", "stream management disabled, running in publish-only mode") + } + + return p, nil +} + +// newPublisher returns nil when NATS is not configured (the publisher is disabled) +func newPublisher(rc *natsconn.ReloadableConnection, logger log.Logger) *AuditLogPublisher { + l := log.NewHelper(log.With(logger, "component", "natsAuditLogPublisher")) + if rc == nil { + l.Infow("msg", "NATS connection not set, audit log publisher disabled") + return nil + } + + return &AuditLogPublisher{rc: rc, logger: l} +} + func (p *AuditLogPublisher) initJetStream() error { js, err := jetstream.New(p.rc.Conn) if err != nil { diff --git a/app/controlplane/pkg/auditor/nats_test.go b/app/controlplane/pkg/auditor/nats_test.go new file mode 100644 index 000000000..ebe4f06bf --- /dev/null +++ b/app/controlplane/pkg/auditor/nats_test.go @@ -0,0 +1,77 @@ +// +// Copyright 2026 The Chainloop Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auditor + +import ( + "context" + "testing" + + "github.com/chainloop-dev/chainloop/pkg/natsconn" + "github.com/go-kratos/kratos/v2/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewAuditLogPublisher(t *testing.T) { + tests := []struct { + name string + rc *natsconn.ReloadableConnection + constructor func(*natsconn.ReloadableConnection) (*AuditLogPublisher, error) + // nil publisher means disabled (no NATS configured) + wantNil bool + }{ + { + name: "nil connection disables the publisher", + rc: nil, + constructor: func(rc *natsconn.ReloadableConnection) (*AuditLogPublisher, error) { + return NewAuditLogPublisher(context.Background(), rc, log.DefaultLogger) + }, + wantNil: true, + }, + { + name: "nil connection disables the publish-only publisher", + rc: nil, + constructor: func(rc *natsconn.ReloadableConnection) (*AuditLogPublisher, error) { + return NewPublishOnlyAuditLogPublisher(rc, log.DefaultLogger) + }, + wantNil: true, + }, + { + // publish-only mode skips stream creation/updates so it doesn't + // need a live JetStream context at construction time + name: "publish-only mode skips stream management", + rc: &natsconn.ReloadableConnection{}, + constructor: func(rc *natsconn.ReloadableConnection) (*AuditLogPublisher, error) { + return NewPublishOnlyAuditLogPublisher(rc, log.DefaultLogger) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + p, err := tc.constructor(tc.rc) + require.NoError(t, err) + if tc.wantNil { + assert.Nil(t, p) + // nil publisher is a no-op + assert.NoError(t, p.Publish(&EventPayload{})) + return + } + + assert.NotNil(t, p) + }) + } +} diff --git a/app/controlplane/pkg/biz/casclient.go b/app/controlplane/pkg/biz/casclient.go index 5e5d9b9f2..f09a3f0f7 100644 --- a/app/controlplane/pkg/biz/casclient.go +++ b/app/controlplane/pkg/biz/casclient.go @@ -110,7 +110,8 @@ func (uc *CASClientUseCase) Upload(ctx context.Context, backendType, secretID st uc.logger.Infow("msg", "upload initialized", "filename", filename, "digest", digest) // client with temporary set of credentials - client, closeFn, err := uc.casAPIClient(&CASCredsOpts{BackendType: backendType, SecretPath: secretID, Role: casJWT.Uploader, OrgID: orgID}) + // SourceInternal flags this as the control plane's own traffic so the CAS doesn't emit audit events for it + client, closeFn, err := uc.casAPIClient(&CASCredsOpts{BackendType: backendType, SecretPath: secretID, Role: casJWT.Uploader, OrgID: orgID, SourceInternal: true}) if err != nil { return fmt.Errorf("failed to create cas client: %w", err) } @@ -132,7 +133,8 @@ func (uc *CASClientUseCase) Download(ctx context.Context, backendType, secretID uc.logger.Infow("msg", "download initialized", "digest", digest) - client, closeFn, err := uc.casAPIClient(&CASCredsOpts{BackendType: backendType, SecretPath: secretID, Role: casJWT.Downloader, OrgID: orgID}) + // SourceInternal flags this as the control plane's own traffic so the CAS doesn't emit audit events for it + client, closeFn, err := uc.casAPIClient(&CASCredsOpts{BackendType: backendType, SecretPath: secretID, Role: casJWT.Downloader, OrgID: orgID, SourceInternal: true}) if err != nil { return fmt.Errorf("failed to create cas client: %w", err) } diff --git a/app/controlplane/pkg/biz/cascredentials.go b/app/controlplane/pkg/biz/cascredentials.go index 41e1360bb..cf7e36f15 100644 --- a/app/controlplane/pkg/biz/cascredentials.go +++ b/app/controlplane/pkg/biz/cascredentials.go @@ -55,11 +55,20 @@ type CASCredsOpts struct { // it to scope per-tenant STS sessions, and non-managed providers // still carry it for audit traceability. OrgID uuid.UUID + // SourceInternal flags tokens minted for the control plane's own CAS + // client so the CAS can skip audit events for internal traffic + SourceInternal bool } func (uc *CASCredentialsUseCase) GenerateTemporaryCredentials(backendRef *CASCredsOpts) (string, error) { if backendRef.OrgID == uuid.Nil { return "", fmt.Errorf("org id is required") } - return uc.jwtBuilder.GenerateJWT(backendRef.BackendType, backendRef.SecretPath, jwt.CASAudience, backendRef.Role, backendRef.MaxBytes, backendRef.OrgID.String()) + + var opts []robotaccount.GenerateOpt + if backendRef.SourceInternal { + opts = append(opts, robotaccount.WithSourceInternal()) + } + + return uc.jwtBuilder.GenerateJWT(backendRef.BackendType, backendRef.SecretPath, jwt.CASAudience, backendRef.Role, backendRef.MaxBytes, backendRef.OrgID.String(), opts...) } diff --git a/deployment/chainloop/Chart.yaml b/deployment/chainloop/Chart.yaml index 911c7d2ea..3104beaac 100644 --- a/deployment/chainloop/Chart.yaml +++ b/deployment/chainloop/Chart.yaml @@ -7,7 +7,7 @@ description: Chainloop is an open source software supply chain control plane, a type: application # Bump the patch (not minor, not major) version on each change in the Chart Source code -version: 1.393.0 +version: 1.393.1 # Do not update appVersion, this is handled automatically by the release process appVersion: v1.100.3 diff --git a/deployment/chainloop/README.md b/deployment/chainloop/README.md index 1293d86e2..c9e30d54d 100644 --- a/deployment/chainloop/README.md +++ b/deployment/chainloop/README.md @@ -786,6 +786,11 @@ Once done, you can access with [two predefined users](https://github.com/chainlo | `cas.replicaCount` | Number of replicas | `2` | | `cas.defaultMaxEntrySize` | Maximum size for each entry in the CAS backend, default 100MB | | | `cas.externalURL` | Optional External URL for the CAS service. If not set it will be derived from the ingress and service configuration | | +| `cas.nats` | optional NATS configuration for audit events publishing. The stream is owned and configured by the control plane, the CAS only publishes to it. | | +| `cas.nats.enabled` | Enable audit events publishing through a Nats stream | `false` | +| `cas.nats.host` | NATS Host | `""` | +| `cas.nats.port` | NATS Port | `4222` | +| `cas.nats.token` | NATS Client authentication token | `""` | | `cas.image.registry` | Image registry | `REGISTRY_NAME` | | `cas.image.repository` | Image repository | `REPOSITORY_NAME` | | `cas.containerPorts.http` | controlplane HTTP container port | `8000` | diff --git a/deployment/chainloop/templates/_helpers.tpl b/deployment/chainloop/templates/_helpers.tpl index 37447d265..7dcd8e264 100644 --- a/deployment/chainloop/templates/_helpers.tpl +++ b/deployment/chainloop/templates/_helpers.tpl @@ -483,3 +483,12 @@ Return the Nats connection string {{- printf "nats://%s:%d" $host ($port | int) }} {{- end -}} + +{{/* +Return the Nats connection string for the CAS +*/}} +{{- define "cas.nats.connection_string" -}} +{{- $host := required "nats server hostname not set" .Values.cas.nats.host }} +{{- $port := required "nats server port not set" .Values.cas.nats.port }} +{{- printf "nats://%s:%d" $host ($port | int) }} +{{- end -}} diff --git a/deployment/chainloop/templates/cas/secret-config.yaml b/deployment/chainloop/templates/cas/secret-config.yaml index 4d0ea5790..e037358fd 100644 --- a/deployment/chainloop/templates/cas/secret-config.yaml +++ b/deployment/chainloop/templates/cas/secret-config.yaml @@ -24,4 +24,11 @@ stringData: public_key_path: "/tmp/cas.public.pem" # Deprecated, use public_key_path instead. Remove option once release of the app 0.15+ is out. robot_account_public_key_path: "/tmp/cas.public.pem" + {{- if .Values.cas.nats.enabled }} + nats_server: + uri: {{ include "cas.nats.connection_string" . | quote }} + {{- if ne .Values.cas.nats.token "" }} + token: {{ .Values.cas.nats.token | quote }} + {{- end }} + {{- end }} # TODO: add observability \ No newline at end of file diff --git a/deployment/chainloop/values.yaml b/deployment/chainloop/values.yaml index c44e009ea..eda3457a6 100644 --- a/deployment/chainloop/values.yaml +++ b/deployment/chainloop/values.yaml @@ -1024,6 +1024,17 @@ cas: ## allowedOrigins: "https://app.chainloop.dev,https://console.example.com" allowedOrigins: [] + ## @extra cas.nats optional NATS configuration for audit events publishing. The stream is owned and configured by the control plane, the CAS only publishes to it. + ## @param cas.nats.enabled Enable audit events publishing through a Nats stream + ## @param cas.nats.host NATS Host + ## @param cas.nats.port NATS Port + ## @param cas.nats.token NATS Client authentication token + nats: + enabled: false + host: "" + port: 4222 + token: "" + ## @param cas.image.registry [default: REGISTRY_NAME] Image registry ## @param cas.image.repository [default: REPOSITORY_NAME] Image repository ## @skip cas.image.tag diff --git a/internal/robotaccount/cas/robotaccount.go b/internal/robotaccount/cas/robotaccount.go index 223f69b58..9f7e33726 100644 --- a/internal/robotaccount/cas/robotaccount.go +++ b/internal/robotaccount/cas/robotaccount.go @@ -46,6 +46,11 @@ type Claims struct { // per-tenant STS sessions; the non-managed providers ignore it but // it is still carried for audit traceability. OrgID string `json:"org-id"` + // SourceInternal is true when the token was minted for the control plane's + // own CAS client (e.g. attestation storage, policy material reads). + // The CAS skips audit event emission for this traffic so it doesn't + // pollute per-org usage numbers. The zero value (false) means client traffic. + SourceInternal bool `json:"source-internal,omitempty"` } type Role string @@ -111,12 +116,23 @@ func NewBuilder(opts ...NewOpt) (*Builder, error) { return b, nil } +// GenerateOpt tweaks optional claims of the minted token +type GenerateOpt func(c *Claims) + +// WithSourceInternal flags the token as minted for the control plane's own +// CAS client, so the CAS can tell internal traffic apart from client traffic +func WithSourceInternal() GenerateOpt { + return func(c *Claims) { + c.SourceInternal = true + } +} + // GenerateJWT mints a CAS token. All fields are required, including // orgID — managed providers (e.g. AWS-S3-ACCESS-POINT) need it to scope // per-tenant STS sessions and other providers still record it for // audit. The token always carries the CAS audience and a short expiry // window. -func (ra *Builder) GenerateJWT(backendType, secretID, audience string, role Role, maxBytes int64, orgID string) (string, error) { +func (ra *Builder) GenerateJWT(backendType, secretID, audience string, role Role, maxBytes int64, orgID string, opts ...GenerateOpt) (string, error) { if backendType == "" { return "", fmt.Errorf("backend type is required") } @@ -159,6 +175,10 @@ func (ra *Builder) GenerateJWT(backendType, secretID, audience string, role Role claims.ExpiresAt = jwt.NewNumericDate(time.Now().Add(*ra.expiration)) } + for _, opt := range opts { + opt(claims) + } + resultToken := jwt.NewWithClaims(SigningMethod, claims) return resultToken.SignedString(ra.pk) } diff --git a/internal/robotaccount/cas/robotaccount_test.go b/internal/robotaccount/cas/robotaccount_test.go index d627a5633..eee0b868e 100644 --- a/internal/robotaccount/cas/robotaccount_test.go +++ b/internal/robotaccount/cas/robotaccount_test.go @@ -1,5 +1,5 @@ // -// Copyright 2023 The Chainloop Authors. +// Copyright 2023-2026 The Chainloop Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -150,27 +150,41 @@ func TestGenerateJWT(t *testing.T) { WithPrivateKey("testdata/test-key.ec.pem"), WithExpiration(5*time.Second), ) - require.NoError(t, err) - token, err := b.GenerateJWT("OCI", "secret-id", JWTAudience, Uploader, 123, "org-uuid") - assert.NoError(t, err) - assert.NotEmpty(t, token) - // Verify signature and check claims - rawKey, err := os.ReadFile("testdata/test-key.ec.pub") - require.NoError(t, err) + tests := []struct { + name string + opts []GenerateOpt + wantSourceInternal bool + }{ + {name: "default, client traffic"}, + {name: "internal controlplane traffic", opts: []GenerateOpt{WithSourceInternal()}, wantSourceInternal: true}, + } - claims := &Claims{} - tokenInfo, err := jwt.ParseWithClaims(token, claims, loadPublicKey(rawKey)) - require.NoError(t, err) - assert.True(t, tokenInfo.Valid) - assert.Equal(t, "secret-id", claims.StoredSecretID) - assert.Equal(t, Uploader, claims.Role) - assert.Equal(t, "my-issuer", claims.Issuer) - assert.Contains(t, claims.Audience, "artifact-cas.chainloop") - assert.Equal(t, claims.MaxBytes, int64(123)) - assert.Equal(t, "org-uuid", claims.OrgID) - assert.WithinDuration(t, time.Now(), claims.ExpiresAt.Time, 10*time.Second) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + token, err := b.GenerateJWT("OCI", "secret-id", JWTAudience, Uploader, 123, "org-uuid", tc.opts...) + assert.NoError(t, err) + assert.NotEmpty(t, token) + + // Verify signature and check claims + rawKey, err := os.ReadFile("testdata/test-key.ec.pub") + require.NoError(t, err) + + claims := &Claims{} + tokenInfo, err := jwt.ParseWithClaims(token, claims, loadPublicKey(rawKey)) + require.NoError(t, err) + assert.True(t, tokenInfo.Valid) + assert.Equal(t, "secret-id", claims.StoredSecretID) + assert.Equal(t, Uploader, claims.Role) + assert.Equal(t, "my-issuer", claims.Issuer) + assert.Contains(t, claims.Audience, "artifact-cas.chainloop") + assert.Equal(t, claims.MaxBytes, int64(123)) + assert.Equal(t, "org-uuid", claims.OrgID) + assert.Equal(t, tc.wantSourceInternal, claims.SourceInternal) + assert.WithinDuration(t, time.Now(), claims.ExpiresAt.Time, 10*time.Second) + }) + } } // load key for verification