Skip to content

Commit 8ff2679

Browse files
authored
feat(prometheus): Add Prometheus collector (#1101)
Signed-off-by: Javier Rodriguez <javier@chainloop.dev>
1 parent ca2b646 commit 8ff2679

17 files changed

Lines changed: 744 additions & 237 deletions

File tree

app/controlplane/cmd/wire.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func wireApp(*conf.Bootstrap, credentials.ReaderWriter, log.Logger, sdk.Availabl
4848
wire.Bind(new(biz.CASClient), new(*biz.CASClientUseCase)),
4949
serviceOpts,
5050
wire.Value([]biz.CASClientOpts{}),
51-
wire.FieldsOf(new(*conf.Bootstrap), "Server", "Auth", "Data", "CasServer", "ReferrerSharedIndex", "Onboarding"),
51+
wire.FieldsOf(new(*conf.Bootstrap), "Server", "Auth", "Data", "CasServer", "ReferrerSharedIndex", "Onboarding", "PrometheusIntegration"),
5252
wire.FieldsOf(new(*conf.Data), "Database"),
5353
dispatcher.New,
5454
authz.NewDatabaseEnforcer,

app/controlplane/cmd/wire_gen.go

Lines changed: 5 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/controlplane/internal/conf/controlplane/config/v1/conf.pb.go

Lines changed: 310 additions & 227 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/controlplane/internal/conf/controlplane/config/v1/conf.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ message Bootstrap {
6262

6363
// Configuration for onboarding users in organizations with specific roles
6464
repeated OnboardingSpec onboarding = 10;
65+
66+
// Configuration to enable Prometheus integration for the specified organizations
67+
repeated PrometheusIntegrationSpec prometheus_integration = 11;
6568
}
6669

6770
// Configuration used to enable a shared index API endpoint that can be used to discover metadata referrers
@@ -177,3 +180,9 @@ message OnboardingSpec {
177180
(buf.validate.field).enum.defined_only = true
178181
];
179182
}
183+
184+
// PrometheusIntegrationSpec enables the Prometheus integration for one
// organization, identified by its name.
message PrometheusIntegrationSpec {
  // Name of the organization the integration applies to; must not be empty.
  string org_name = 1 [
    (buf.validate.field).string.min_len = 1
  ];
}

app/controlplane/internal/server/grpc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ type Opts struct {
7979
AttestationStateSvc *service.AttestationStateService
8080
UserSvc *service.UserService
8181
SigningSvc *service.SigningService
82+
PrometheusSvc *service.PrometheusService
8283
// Utils
8384
Logger log.Logger
8485
ServerConfig *conf.Server

app/controlplane/internal/server/http.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ import (
1919
"context"
2020
"errors"
2121

22+
"github.com/chainloop-dev/chainloop/app/controlplane/pkg/jwt/apitoken"
23+
middlewares_http "github.com/chainloop-dev/chainloop/pkg/middlewares/http"
24+
"github.com/golang-jwt/jwt/v4"
25+
2226
"github.com/bufbuild/protovalidate-go"
2327
v1 "github.com/chainloop-dev/chainloop/app/controlplane/api/controlplane/v1"
2428
"github.com/chainloop-dev/chainloop/app/controlplane/internal/service"
@@ -34,7 +38,7 @@ import (
3438
"github.com/go-kratos/kratos/v2/transport/http"
3539
)
3640

37-
// NewHTTPServer new a HTTP server.
41+
// NewHTTPServer creates a new HTTP server.
3842
func NewHTTPServer(opts *Opts, grpcSrv *grpc.Server) (*http.Server, error) {
3943
middlewares := craftMiddleware(opts)
4044
// important, the validation middleware should be the last one
@@ -59,6 +63,12 @@ func NewHTTPServer(opts *Opts, grpcSrv *grpc.Server) (*http.Server, error) {
5963
// NOTE: these non-grpc transcoded methods DO NOT RUN the middlewares
6064
httpSrv.Handle(service.AuthLoginPath, opts.AuthSvc.RegisterLoginHandler())
6165
httpSrv.Handle(service.AuthCallbackPath, opts.AuthSvc.RegisterCallbackHandler())
66+
httpSrv.Handle(service.PrometheusMetricsPath, middlewares_http.AuthFromAuthorizationHeader(
67+
loadJWTKeyFunc(opts.AuthConfig.GetGeneratedJwsHmacSecret()),
68+
apiTokenCustomClaims(),
69+
apitoken.SigningMethod,
70+
opts.PrometheusSvc,
71+
))
6272
v1.RegisterStatusServiceHTTPServer(httpSrv, service.NewStatusService(opts.AuthSvc.AuthURLs.Login, Version, opts.CASClientUseCase))
6373
v1.RegisterReferrerServiceHTTPServer(httpSrv, service.NewReferrerService(opts.ReferrerUseCase))
6474

@@ -102,3 +112,17 @@ func protoValidateHTTPMiddleware(validator *protovalidate.Validator) middleware.
102112
}
103113
}
104114
}
115+
116+
// loadJWTKeyFunc returns a Keyfunc that returns the raw key
117+
func loadJWTKeyFunc(rawKey string) jwt.Keyfunc {
118+
return func(_ *jwt.Token) (interface{}, error) {
119+
return []byte(rawKey), nil
120+
}
121+
}
122+
123+
// apiTokenCustomClaims returns a ClaimsFunc that returns a custom claims struct
124+
func apiTokenCustomClaims() middlewares_http.ClaimsFunc {
125+
return func() jwt.Claims {
126+
return &apitoken.CustomClaims{}
127+
}
128+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
//
2+
// Copyright 2024 The Chainloop Authors.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package service
17+
18+
import (
	"bytes"
	"fmt"
	"net/http"

	"github.com/chainloop-dev/chainloop/app/controlplane/pkg/biz"
	"github.com/chainloop-dev/chainloop/app/controlplane/pkg/jwt/apitoken"

	jwtmiddleware "github.com/go-kratos/kratos/v2/middleware/auth/jwt"
	"github.com/gorilla/mux"
	"github.com/prometheus/common/expfmt"
)
29+
30+
const (
31+
// PrometheusMetricsPath is the path for the Prometheus metrics
32+
PrometheusMetricsPath = "/prom/{org_name}/metrics"
33+
)
34+
35+
// PrometheusService is the prometheus service
36+
type PrometheusService struct {
37+
*service
38+
// Use Cases
39+
prometheusUseCase *biz.PrometheusUseCase
40+
organizationUseCase *biz.OrganizationUseCase
41+
}
42+
43+
// NewPrometheusService creates a new prometheus service
44+
func NewPrometheusService(orgUseCase *biz.OrganizationUseCase, prometheusUseCase *biz.PrometheusUseCase, opts ...NewOpt) *PrometheusService {
45+
return &PrometheusService{
46+
organizationUseCase: orgUseCase,
47+
prometheusUseCase: prometheusUseCase,
48+
service: newService(opts...),
49+
}
50+
}
51+
52+
// MetricsRequestHandler is the handler for the metrics request. It fetches the Prometheus registry
53+
// and if found, retrieves all metrics in Prometheus format.
54+
func (p *PrometheusService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
55+
// Extract org_name from the URL path
56+
orgName, ok := mux.Vars(r)["org_name"]
57+
if !ok {
58+
http.Error(w, "Error extracting organization name from URL path", http.StatusBadRequest)
59+
return
60+
}
61+
62+
// Extracts the organization name from the request
63+
rawClaims, ok := jwtmiddleware.FromContext(r.Context())
64+
if !ok {
65+
http.Error(w, "Error extracting claims from context", http.StatusInternalServerError)
66+
return
67+
}
68+
69+
apiTokenClaims, ok := rawClaims.(*apitoken.CustomClaims)
70+
if !ok {
71+
http.Error(w, "Error extracting API Token claims", http.StatusInternalServerError)
72+
return
73+
}
74+
75+
// Check if the organization in the API Token matches the one in the request
76+
if apiTokenClaims.OrgName != orgName {
77+
http.Error(w, fmt.Sprintf("Organization [%v] on API Token does not match the organization in the request", apiTokenClaims.OrgName), http.StatusBadRequest)
78+
return
79+
}
80+
81+
// Checks if the organization has a Prometheus integration activated
82+
if !p.prometheusUseCase.OrganizationHasRegistry(orgName) {
83+
http.Error(w, "Organization does not have a Prometheus integration activated", http.StatusNotFound)
84+
return
85+
}
86+
87+
// Fetches the Prometheus registry for the organization
88+
reg := p.prometheusUseCase.GetRegistryByOrganizationName(orgName)
89+
if reg == nil {
90+
http.Error(w, "Error fetching Prometheus registry", http.StatusInternalServerError)
91+
return
92+
}
93+
94+
// Gathers the metrics
95+
gather, err := reg.Gather()
96+
if err != nil {
97+
http.Error(w, "Error gathering metrics", http.StatusInternalServerError)
98+
return
99+
}
100+
101+
// Encode and write the metrics to the response
102+
contentType := expfmt.Negotiate(r.Header)
103+
w.Header().Set("Content-Type", string(contentType))
104+
105+
enc := expfmt.NewEncoder(w, contentType)
106+
for _, mf := range gather {
107+
if err := enc.Encode(mf); err != nil {
108+
http.Error(w, "Error encoding metrics", http.StatusInternalServerError)
109+
return
110+
}
111+
}
112+
}

app/controlplane/internal/service/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ var ProviderSet = wire.NewSet(
5050
NewAttestationStateService,
5151
NewUserService,
5252
NewSigningService,
53+
NewPrometheusService,
5354
wire.Struct(new(NewWorkflowRunServiceOpts), "*"),
5455
wire.Struct(new(NewAttestationServiceOpts), "*"),
5556
wire.Struct(new(NewAttestationStateServiceOpt), "*"),

app/controlplane/pkg/biz/biz.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ var ProviderSet = wire.NewSet(
4949
NewAPITokenSyncerUseCase,
5050
NewAttestationStateUseCase,
5151
NewChainloopSigningUseCase,
52+
NewPrometheusUseCase,
5253
wire.Struct(new(NewIntegrationUseCaseOpts), "*"),
5354
wire.Struct(new(NewUserUseCaseParams), "*"),
5455
)

app/controlplane/pkg/biz/orgmetrics.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,19 @@ import (
2020
"fmt"
2121
"time"
2222

23+
prometheuscollector "github.com/chainloop-dev/chainloop/app/controlplane/pkg/metrics/prometheus"
24+
2325
"github.com/go-kratos/kratos/v2/log"
2426
"github.com/google/uuid"
2527
)
2628

2729
type OrgMetricsUseCase struct {
2830
logger *log.Helper
29-
repo OrgMetricsRepo
31+
// Repositories
32+
repo OrgMetricsRepo
33+
orgRepo OrganizationRepo
34+
// Use Cases
35+
wfUseCase *WorkflowUseCase
3036
}
3137

3238
type OrgMetricsRepo interface {
@@ -64,8 +70,13 @@ func (tw *TimeWindow) Validate() error {
6470
return nil
6571
}
6672

67-
func NewOrgMetricsUseCase(r OrgMetricsRepo, l log.Logger) (*OrgMetricsUseCase, error) {
68-
return &OrgMetricsUseCase{logger: log.NewHelper(l), repo: r}, nil
73+
func NewOrgMetricsUseCase(r OrgMetricsRepo, orgRepo OrganizationRepo, wfUseCase *WorkflowUseCase, l log.Logger) (*OrgMetricsUseCase, error) {
74+
return &OrgMetricsUseCase{
75+
orgRepo: orgRepo,
76+
wfUseCase: wfUseCase,
77+
logger: log.NewHelper(l),
78+
repo: r,
79+
}, nil
6980
}
7081

7182
func (uc *OrgMetricsUseCase) RunsTotal(ctx context.Context, orgID string, timeWindow *TimeWindow) (int32, error) {
@@ -149,6 +160,45 @@ func (uc *OrgMetricsUseCase) TopWorkflowsByRunsCount(ctx context.Context, orgID
149160
return uc.repo.TopWorkflowsByRunsCount(ctx, orgUUID, numWorkflows, timeWindow)
150161
}
151162

163+
// GetLastWorkflowStatusByRun returns the last status of each workflow by its last run
164+
// It only returns workflows with at least one run and skips workflows with initialized runs
165+
func (uc *OrgMetricsUseCase) GetLastWorkflowStatusByRun(ctx context.Context, orgName string) ([]*prometheuscollector.WorkflowLastStatusByRunReport, error) {
166+
// Find organization
167+
org, err := uc.orgRepo.FindByName(ctx, orgName)
168+
if err != nil {
169+
return nil, fmt.Errorf("finding organization: %w", err)
170+
}
171+
172+
// List all workflows
173+
wfs, err := uc.wfUseCase.List(ctx, org.ID)
174+
if err != nil {
175+
return nil, fmt.Errorf("listing workflows: %w", err)
176+
}
177+
178+
// Create reports
179+
// nolint:prealloc
180+
var reports []*prometheuscollector.WorkflowLastStatusByRunReport
181+
for _, wf := range wfs {
182+
// Skip workflows with no runs
183+
if wf.RunsCounter == 0 {
184+
continue
185+
}
186+
187+
// Skip workflows with initialized runs since they are not yet finished
188+
if wf.LastRun.State == string(WorkflowRunInitialized) {
189+
continue
190+
}
191+
192+
reports = append(reports, &prometheuscollector.WorkflowLastStatusByRunReport{
193+
OrgName: orgName,
194+
WorkflowName: wf.Name,
195+
Status: wf.LastRun.State,
196+
})
197+
}
198+
199+
return reports, nil
200+
}
201+
152202
// validateTimeWindowIsSet validates that the time window is set
153203
func validateTimeWindowIsSet(tw *TimeWindow) error {
154204
// Check if time window is set

0 commit comments

Comments
 (0)