diff --git a/controller/BUILD.bazel b/controller/BUILD.bazel index 849995e..3ccf7f9 100644 --- a/controller/BUILD.bazel +++ b/controller/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "controller.go", "distance_filter.go", + "errors.go", "getchangedtargetgraph.go", "getchangedtargets.go", "getchangedtargetsandedges.go", diff --git a/controller/errors.go b/controller/errors.go new file mode 100644 index 0000000..2a27585 --- /dev/null +++ b/controller/errors.go @@ -0,0 +1,56 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "errors" + + "github.com/uber-go/tally" + "github.com/uber/tango/core/common" +) + +// failure_reason tag values for errors that originate in the controller itself. +// Errors from the orchestrator carry their own reason via common.ClassifiedError. +const ( + failureReasonValidation = "validation" + failureReasonGraphFetch = "graph_fetch" + failureReasonSend = "send" + failureReasonCompare = "compare" + failureReasonCancelled = "cancelled" + failureReasonDeadlineExceeded = "deadline_exceeded" + failureReasonUnknown = "unknown" +) + +// emitFailureMetric tags the failure counter with the reason and type from the +// error's ClassifiedError. Context errors are recognised explicitly; everything +// else falls back to unknown/infra. 
+func emitFailureMetric(scope tally.Scope, err error) { + var ce common.ClassifiedError + switch { + case errors.As(err, &ce): + // already classified — use the error's own reason and type + case errors.Is(err, context.Canceled): + ce = common.WithReason(failureReasonCancelled, common.ErrorTypeUser, err) + case errors.Is(err, context.DeadlineExceeded): + ce = common.WithReason(failureReasonDeadlineExceeded, common.ErrorTypeUser, err) + default: + ce = common.WithReason(failureReasonUnknown, common.ErrorTypeInfra, err) + } + scope.Tagged(map[string]string{ + "failure_type": ce.Type(), + "failure_reason": ce.Reason(), + }).Counter("failure_type").Inc(1) +} diff --git a/controller/getchangedtargets.go b/controller/getchangedtargets.go index 29b0801..6a057f8 100644 --- a/controller/getchangedtargets.go +++ b/controller/getchangedtargets.go @@ -44,14 +44,17 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str defer func() { if retErr != nil { scope.Counter("failure").Inc(1) + emitFailureMetric(scope, retErr) } else { scope.Counter("success").Inc(1) } }() if err := validateGetChangedTargetsRequest(request); err != nil { c.logger.Error("GetChangedTargets: Invalid request", zap.Error(err)) - return err + return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err) } + scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())}) + scope.Counter("calls").Inc(1) ctx := stream.Context() start := time.Now() logger := c.logger.With( @@ -107,7 +110,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str scope.Timer("cache_read_duration").Record(cacheReadDuration) if sendErr := sendWithDistanceFilter(stream, cached, maxDist); sendErr != nil { logger.Error("GetChangedTargets: Failed to send cached response", zap.Error(sendErr)) - return fmt.Errorf("failed to send cached response: %w", sendErr) + return common.WithReason(failureReasonSend, 
common.ErrorTypeInfra, fmt.Errorf("failed to send cached response: %w", sendErr)) } totalDuration := time.Since(start) logger.Info("GetChangedTargets: Successfully streamed from cache", @@ -211,7 +214,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str if ctx.Err() != nil { // If the context was cancelled by the upstream, just return the original error without additional augmentation - return ctx.Err() + return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err()) } // Process errors, only aggregating the ones that are original ones and not a result of the other job being cancelled @@ -229,8 +232,6 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str if err != nil { return err } - - // At this point we should have both graphs computed successfully. Let's compare them. firstGraph := jobs[0].graphStreamChunks secondGraph := jobs[1].graphStreamChunks // Drop job references so the GC can reclaim them once the comparison is done. 
@@ -244,7 +245,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str secondGraph = nil if err != nil { logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err)) - return fmt.Errorf("failed to compare target graphs: %w", err) + return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err)) } compareDuration := time.Since(compareStart) logger.Info("GetChangedTargets: Target graphs compared", @@ -271,7 +272,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str sendStart := time.Now() if err := sendWithDistanceFilter(stream, changedTargetsResponses, maxDist); err != nil { logger.Error("GetChangedTargets: Failed to send response", zap.Error(err)) - return fmt.Errorf("failed to send response: %w", err) + return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("failed to send response: %w", err)) } sendDuration := time.Since(sendStart) scope.Timer("send_duration").Record(sendDuration) diff --git a/controller/getchangedtargetsandedges.go b/controller/getchangedtargetsandedges.go index 47c281d..015da21 100644 --- a/controller/getchangedtargetsandedges.go +++ b/controller/getchangedtargetsandedges.go @@ -44,14 +44,17 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE defer func() { if retErr != nil { scope.Counter("failure").Inc(1) + emitFailureMetric(scope, retErr) } else { scope.Counter("success").Inc(1) } }() if err := validateGetChangedTargetsAndEdgesRequest(request); err != nil { c.logger.Error("GetChangedTargetsAndEdges: Invalid request", zap.Error(err)) - return err + return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err) } + scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())}) + scope.Counter("calls").Inc(1) ctx := stream.Context() start := time.Now() logger := c.logger.With( @@ -101,7 
+104,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE scope.Timer("cache_read_duration").Record(cacheReadDuration) if err := sendWithDistanceFilterForEdges(stream, cached, maxDist); err != nil { logger.Error("GetChangedTargetsAndEdges: Failed to send cached response", zap.Error(err)) - return err + return common.WithReason(failureReasonSend, common.ErrorTypeInfra, err) } totalDuration := time.Since(start) logger.Info("GetChangedTargetsAndEdges: Successfully streamed from cache", @@ -192,7 +195,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE scope.Timer("graph_fetch_duration").Record(graphFetchDuration) if ctx.Err() != nil { - return ctx.Err() + return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err()) } var err error @@ -221,7 +224,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE secondGraph = nil if err != nil { logger.Error("GetChangedTargetsAndEdges: Failed to compare target graphs", zap.Error(err)) - return fmt.Errorf("failed to compare target graphs: %w", err) + return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err)) } compareDuration := time.Since(compareStart) logger.Info("GetChangedTargetsAndEdges: Target graphs compared", @@ -245,7 +248,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE sendStart := time.Now() if err := sendWithDistanceFilterForEdges(stream, responses, maxDist); err != nil { logger.Error("GetChangedTargetsAndEdges: Failed to send response", zap.Error(err)) - return err + return common.WithReason(failureReasonSend, common.ErrorTypeInfra, err) } sendDuration := time.Since(sendStart) scope.Timer("send_duration").Record(sendDuration) diff --git a/controller/gettargetgraph.go b/controller/gettargetgraph.go index 82ecdae..a335cd9 100644 --- a/controller/gettargetgraph.go +++ b/controller/gettargetgraph.go @@ 
-35,6 +35,7 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb defer func() { if retErr != nil { scope.Counter("failure").Inc(1) + emitFailureMetric(scope, retErr) } else { scope.Counter("success").Inc(1) } @@ -44,6 +45,8 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb logger := c.logger.With( zap.Any("build_description", request.GetBuildDescription()), ) + scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetBuildDescription().GetRemote())}) + scope.Counter("calls").Inc(1) graphReader, err := c.getGraph(ctx, request.GetBuildDescription(), request.GetOutputConfig(), request.GetRequestOptions(), request.GetBypassCache()) if err != nil { return err @@ -68,11 +71,11 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb return nil } if err != nil { - return err + return common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err) } err = stream.Send(graphStreamChunk) if err != nil { - return fmt.Errorf("send graph: %w", err) + return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("send graph: %w", err)) } } } diff --git a/core/common/BUILD.bazel b/core/common/BUILD.bazel index 67c6b26..0daeb3c 100644 --- a/core/common/BUILD.bazel +++ b/core/common/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "common", srcs = [ + "errors.go", "nameidmapper.go", "utils.go", ], diff --git a/core/common/errors.go b/core/common/errors.go new file mode 100644 index 0000000..6b4bc49 --- /dev/null +++ b/core/common/errors.go @@ -0,0 +1,51 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +// Error type values for the failure_type metrics tag. +const ( + ErrorTypeUser = "user" + ErrorTypeInfra = "infra" +) + +// ClassifiedError is an error that carries an explicit failure reason and type +// for metrics classification. External clients can implement this interface so +// that emitFailureMetric in the controller picks it up automatically via errors.As, +// without any changes to the classification logic. +type ClassifiedError interface { + error + // Reason returns the failure_reason tag value (e.g. "send", "graph_fetch"). + Reason() string + // Type returns the failure_type tag value: ErrorTypeUser or ErrorTypeInfra. + Type() string + Unwrap() error +} + +// classifiedErr is the package-internal concrete implementation. +type classifiedErr struct { + reason string + errorType string + err error +} + +func (e *classifiedErr) Error() string { return e.err.Error() } +func (e *classifiedErr) Unwrap() error { return e.err } +func (e *classifiedErr) Reason() string { return e.reason } +func (e *classifiedErr) Type() string { return e.errorType } + +// WithReason wraps err with an explicit failure reason and type for metrics classification. 
+func WithReason(reason, errorType string, err error) ClassifiedError { + return &classifiedErr{reason: reason, errorType: errorType, err: err} +} diff --git a/orchestrator/BUILD.bazel b/orchestrator/BUILD.bazel index e0bb29c..64ceb5c 100644 --- a/orchestrator/BUILD.bazel +++ b/orchestrator/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "orchestrator", srcs = [ + "errors.go", "native_orchestrator.go", "orchestrator.go", ], @@ -18,6 +19,7 @@ go_library( "//core/workspace", "//graphrunner", "//tangopb", + "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], ) diff --git a/orchestrator/errors.go b/orchestrator/errors.go new file mode 100644 index 0000000..61a3f82 --- /dev/null +++ b/orchestrator/errors.go @@ -0,0 +1,31 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +// failure_reason tag values emitted by the orchestrator. 
+const ( + failureReasonConfigParse = "config_parse" + failureReasonNoRepoConfig = "no_repo_config" + failureReasonWorkspaceLease = "workspace_lease" + failureReasonWorkspaceCheckout = "workspace_checkout" + failureReasonRequestCreate = "request_create" + failureReasonRequestApply = "request_apply" + failureReasonTreehashCompute = "treehash_compute" + failureReasonBazelClient = "bazel_client" + failureReasonGraphCompute = "graph_compute" + failureReasonGraphConvert = "graph_convert" + failureReasonStorage = "storage" + failureReasonUnknown = "unknown" +) diff --git a/orchestrator/native_orchestrator.go b/orchestrator/native_orchestrator.go index d21af32..bea74bf 100644 --- a/orchestrator/native_orchestrator.go +++ b/orchestrator/native_orchestrator.go @@ -17,11 +17,13 @@ package orchestrator import ( "bytes" "context" + "errors" "fmt" "os" "time" + "github.com/uber-go/tally" "github.com/uber/tango/config" "github.com/uber/tango/core/bazel" "github.com/uber/tango/core/common" @@ -38,6 +40,7 @@ type nativeOrchestrator struct { storage storage.Storage repoManager repomanager.RepoManager logger *zap.SugaredLogger + scope tally.Scope // gitFactory allows injecting a git.Interface constructor for testing gitFactory func(directory string) git.Interface graphRunner graphrunner.GraphRunner @@ -48,6 +51,7 @@ type Params struct { Storage storage.Storage RepoManager repomanager.RepoManager Logger *zap.SugaredLogger + Scope tally.Scope GitFactory func(directory string) git.Interface GraphRunner graphrunner.GraphRunner ConfigFilePath string @@ -55,10 +59,15 @@ type Params struct { // NewNativeOrchestrator creates a new native orchestrator with the given parameters. 
func NewNativeOrchestrator(p Params) Orchestrator { + scope := p.Scope + if scope == nil { + scope = tally.NoopScope + } return &nativeOrchestrator{ storage: p.Storage, repoManager: p.RepoManager, logger: p.Logger, + scope: scope.SubScope("orchestrator"), gitFactory: p.GitFactory, graphRunner: p.GraphRunner, configFilePath: p.ConfigFilePath, @@ -67,7 +76,24 @@ func NewNativeOrchestrator(p Params) Orchestrator { // GetTargetGraph is used to compute the target graph locally. // It leases a workspace, checks out the base revision, applies the change requests, and computes the target graph. -func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTargetGraphParam) (storage.GraphReader, error) { +func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTargetGraphParam) (_ storage.GraphReader, retErr error) { + scope := b.scope.SubScope("get_target_graph") + scope.Counter("calls").Inc(1) + defer func() { + if retErr != nil { + scope.Counter("failure").Inc(1) + var ce common.ClassifiedError + if !errors.As(retErr, &ce) { + ce = common.WithReason(failureReasonUnknown, common.ErrorTypeInfra, retErr) + } + scope.Tagged(map[string]string{ + "failure_type": ce.Type(), + "failure_reason": ce.Reason(), + }).Counter("failure_type").Inc(1) + } else { + scope.Counter("success").Inc(1) + } + }() logger := b.logger.With(zap.Any("build_description", param.Req.BuildDescription)) logger.Infow("GetTargetGraph: Processing request") @@ -75,17 +101,17 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget cfg, err := config.Parse(b.configFilePath) if err != nil { logger.Errorw("GetTargetGraph: Error parsing config file", zap.String("configFilePath", b.configFilePath), zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonConfigParse, common.ErrorTypeInfra, err) } remote := param.Req.BuildDescription.Remote repoCfg, ok := cfg.GetRepositoryConfig(remote) if !ok { - return nil, fmt.Errorf("no repository 
configuration found for remote %q", remote) + return nil, common.WithReason(failureReasonNoRepoConfig, common.ErrorTypeUser, fmt.Errorf("no repository configuration found for remote %q", remote)) } ws, err := b.repoManager.Lease(ctx, *param.Req.BuildDescription) if err != nil { logger.Errorw("GetTargetGraph: Error leasing workspace", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonWorkspaceLease, common.ErrorTypeInfra, err) } defer func() { err := ws.Release() @@ -99,7 +125,7 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget err = ws.Checkout(ctx, param.Req.BuildDescription.Remote, param.Req.BuildDescription.BaseSha) if err != nil { logger.Errorw("GetTargetGraph: Error checking out base revision", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonWorkspaceCheckout, common.ErrorTypeInfra, err) } logger.Infow("GetTargetGraph: Checked out base revision") @@ -114,14 +140,14 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget request, err := workspace.NewRequest(req.GetUrl(), gitModule, param.Req.BuildDescription.BaseSha, req.GetCommit(), logger) if err != nil { logger.Errorw("GetTargetGraph: Error creating request", zap.String("url", req.GetUrl()), zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonRequestCreate, common.ErrorTypeInfra, err) } requests = append(requests, request) } err = ws.ApplyRequests(ctx, requests) if err != nil { logger.Errorw("GetTargetGraph: Error applying requests to workspace", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonRequestApply, common.ErrorTypeInfra, err) } logger.Infow("GetTargetGraph: Applied requests", zap.Int("request_count", len(requests))) @@ -129,7 +155,7 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget treehash, err := gitModule.RevParse(ctx, "HEAD^{tree}") if err != nil { logger.Errorw("GetTargetGraph: 
Treehash computation failed", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonTreehashCompute, common.ErrorTypeInfra, err) } treehashPath := common.GetGraphByTreeHash(param.Req.BuildDescription.Remote, treehash, param.Req.BuildDescription.GetStrategy(), param.Req.GetRequestOptions()) if !param.BypassCache { @@ -139,9 +165,8 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget return graphReader, nil } if !storage.IsNotFound(err) { - // Other errors (network, infra issues) should be retried logger.Errorw("GetTargetGraph: Storage error", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) } logger.Infow("GetTargetGraph: Treehash not found, computing target graph", zap.String("treehash", treehash)) } else { @@ -158,7 +183,7 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget }) if err != nil { logger.Errorw("GetTargetGraph: Error creating bazel client", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonBazelClient, common.ErrorTypeInfra, err) } // Use default native graph runner runner = graphrunner.NewNativeGraphRunner(graphrunner.NativeGraphRunnerParams{ @@ -171,30 +196,29 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget result, err := runner.Compute(ctx, ws) if err != nil { logger.Errorw("GetTargetGraph: Error computing target graph", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonGraphCompute, common.ErrorTypeInfra, err) } responses, err := common.ResultToGetTargetGraphResponse(result) if err != nil { logger.Errorw("GetTargetGraph: Error converting target graph to response", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonGraphConvert, common.ErrorTypeInfra, err) } err = storage.WriteGraphStream(ctx, b.storage, treehashPath, responses) if err != nil { logger.Errorw("GetTargetGraph: 
Error writing target graph to storage", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) } - // Map build description to treehash for future lookup. treehashCachePath := common.GetTreehashCachePath(param.Req.BuildDescription) treehashReader := bytes.NewReader([]byte(treehash)) err = b.storage.Put(ctx, storage.UploadRequest{Key: treehashCachePath, Reader: treehashReader}) if err != nil { logger.Errorw("GetTargetGraph: Error storing treehash mapping", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) } graphReader, err := storage.NewGraphReader(ctx, b.storage, treehashPath) if err != nil { logger.Errorw("GetTargetGraph: Error creating graph reader", zap.Error(err)) - return nil, err + return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) } logger.Infow("GetTargetGraph: Done computing and storing target graph", zap.String("treehash", treehash)) return graphReader, nil