Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"controller.go",
"distance_filter.go",
"errors.go",
"getchangedtargetgraph.go",
"getchangedtargets.go",
"getchangedtargetsandedges.go",
Expand Down
56 changes: 56 additions & 0 deletions controller/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"context"
"errors"

"github.com/uber-go/tally"
"github.com/uber/tango/core/common"
)

// failure_reason tag values for errors that originate in the controller itself.
// Errors from the orchestrator carry their own reason via common.ClassifiedError.
//
// Handlers attach these with common.WithReason at the point of failure;
// emitFailureMetric supplies the context and unknown values for errors that
// reach it without a classification.
const (
// The incoming request was rejected by its validate* function.
failureReasonValidation = "validation"
// Used by GetTargetGraph when its streaming loop receives an error before
// sending a chunk (presumably from the graph reader — confirm at call site).
failureReasonGraphFetch = "graph_fetch"
// Writing a response (cached or freshly computed) to the client stream failed.
failureReasonSend = "send"
// Comparing the two target graphs failed.
failureReasonCompare = "compare"
// The request context reported an error, or the error matches context.Canceled.
failureReasonCancelled = "cancelled"
// An otherwise unclassified error matches context.DeadlineExceeded.
failureReasonDeadlineExceeded = "deadline_exceeded"
// Fallback for errors that carry no classification and are not context errors.
failureReasonUnknown = "unknown"
)

// emitFailureMetric increments the "failure_type" counter on scope, tagged with
// the failure_type and failure_reason that describe err.
//
// Classification order:
//  1. If err, or any error it wraps, implements common.ClassifiedError, that
//     error's own Reason and Type are used unchanged.
//  2. Otherwise an error matching context.Canceled or context.DeadlineExceeded
//     is tagged as a user error with the corresponding controller reason.
//  3. Anything else is tagged unknown/infra.
//
// Because step 1 wins, an error that was already wrapped with a reason keeps
// it even when the underlying cause is a context error; for example, a
// deadline expiry wrapped as failureReasonCancelled is reported as cancelled.
//
// A nil err is not a failure, so nothing is emitted for it.
func emitFailureMetric(scope tally.Scope, err error) {
	if err == nil {
		// Guard against accidental calls on the success path; without this the
		// default branch below would count a nil error as unknown/infra.
		return
	}

	var ce common.ClassifiedError
	switch {
	case errors.As(err, &ce):
		// Already classified: keep the reason and type the error carries.
	case errors.Is(err, context.Canceled):
		ce = common.WithReason(failureReasonCancelled, common.ErrorTypeUser, err)
	case errors.Is(err, context.DeadlineExceeded):
		ce = common.WithReason(failureReasonDeadlineExceeded, common.ErrorTypeUser, err)
	default:
		ce = common.WithReason(failureReasonUnknown, common.ErrorTypeInfra, err)
	}

	tags := map[string]string{
		"failure_type":   ce.Type(),
		"failure_reason": ce.Reason(),
	}
	scope.Tagged(tags).Counter("failure_type").Inc(1)
}
15 changes: 8 additions & 7 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
defer func() {
if retErr != nil {
scope.Counter("failure").Inc(1)
emitFailureMetric(scope, retErr)
} else {
scope.Counter("success").Inc(1)
}
}()
if err := validateGetChangedTargetsRequest(request); err != nil {
c.logger.Error("GetChangedTargets: Invalid request", zap.Error(err))
return err
return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err)
}
scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())})
scope.Counter("calls").Inc(1)
ctx := stream.Context()
start := time.Now()
logger := c.logger.With(
Expand Down Expand Up @@ -107,7 +110,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
scope.Timer("cache_read_duration").Record(cacheReadDuration)
if sendErr := sendWithDistanceFilter(stream, cached, maxDist); sendErr != nil {
logger.Error("GetChangedTargets: Failed to send cached response", zap.Error(sendErr))
return fmt.Errorf("failed to send cached response: %w", sendErr)
return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("failed to send cached response: %w", sendErr))
}
totalDuration := time.Since(start)
logger.Info("GetChangedTargets: Successfully streamed from cache",
Expand Down Expand Up @@ -211,7 +214,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str

if ctx.Err() != nil {
// If the context was cancelled by the upstream, just return the original error without additional augmentation
return ctx.Err()
return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err())
}

// Process errors, only aggregating the ones that are original ones and not a result of the other job being cancelled
Expand All @@ -229,8 +232,6 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
if err != nil {
return err
}

// At this point we should have both graphs computed successfully. Let's compare them.
firstGraph := jobs[0].graphStreamChunks
secondGraph := jobs[1].graphStreamChunks
// Drop job references so the GC can reclaim them once the comparison is done.
Expand All @@ -244,7 +245,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
secondGraph = nil
if err != nil {
logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err))
return fmt.Errorf("failed to compare target graphs: %w", err)
return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err))
}
compareDuration := time.Since(compareStart)
logger.Info("GetChangedTargets: Target graphs compared",
Expand All @@ -271,7 +272,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
sendStart := time.Now()
if err := sendWithDistanceFilter(stream, changedTargetsResponses, maxDist); err != nil {
logger.Error("GetChangedTargets: Failed to send response", zap.Error(err))
return fmt.Errorf("failed to send response: %w", err)
return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("failed to send response: %w", err))
}
sendDuration := time.Since(sendStart)
scope.Timer("send_duration").Record(sendDuration)
Expand Down
13 changes: 8 additions & 5 deletions controller/getchangedtargetsandedges.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
defer func() {
if retErr != nil {
scope.Counter("failure").Inc(1)
emitFailureMetric(scope, retErr)
} else {
scope.Counter("success").Inc(1)
}
}()
if err := validateGetChangedTargetsAndEdgesRequest(request); err != nil {
c.logger.Error("GetChangedTargetsAndEdges: Invalid request", zap.Error(err))
return err
return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err)
}
scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())})
scope.Counter("calls").Inc(1)
ctx := stream.Context()
start := time.Now()
logger := c.logger.With(
Expand Down Expand Up @@ -101,7 +104,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
scope.Timer("cache_read_duration").Record(cacheReadDuration)
if err := sendWithDistanceFilterForEdges(stream, cached, maxDist); err != nil {
logger.Error("GetChangedTargetsAndEdges: Failed to send cached response", zap.Error(err))
return err
return common.WithReason(failureReasonSend, common.ErrorTypeInfra, err)
}
totalDuration := time.Since(start)
logger.Info("GetChangedTargetsAndEdges: Successfully streamed from cache",
Expand Down Expand Up @@ -192,7 +195,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
scope.Timer("graph_fetch_duration").Record(graphFetchDuration)

if ctx.Err() != nil {
return ctx.Err()
return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err())
}

var err error
Expand Down Expand Up @@ -221,7 +224,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
secondGraph = nil
if err != nil {
logger.Error("GetChangedTargetsAndEdges: Failed to compare target graphs", zap.Error(err))
return fmt.Errorf("failed to compare target graphs: %w", err)
return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err))
}
compareDuration := time.Since(compareStart)
logger.Info("GetChangedTargetsAndEdges: Target graphs compared",
Expand All @@ -245,7 +248,7 @@ func (c *controller) GetChangedTargetsAndEdges(request *pb.GetChangedTargetsAndE
sendStart := time.Now()
if err := sendWithDistanceFilterForEdges(stream, responses, maxDist); err != nil {
logger.Error("GetChangedTargetsAndEdges: Failed to send response", zap.Error(err))
return err
return common.WithReason(failureReasonSend, common.ErrorTypeInfra, err)
}
sendDuration := time.Since(sendStart)
scope.Timer("send_duration").Record(sendDuration)
Expand Down
7 changes: 5 additions & 2 deletions controller/gettargetgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb
defer func() {
if retErr != nil {
scope.Counter("failure").Inc(1)
emitFailureMetric(scope, retErr)
} else {
scope.Counter("success").Inc(1)
}
Expand All @@ -44,6 +45,8 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb
logger := c.logger.With(
zap.Any("build_description", request.GetBuildDescription()),
)
scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetBuildDescription().GetRemote())})
scope.Counter("calls").Inc(1)
graphReader, err := c.getGraph(ctx, request.GetBuildDescription(), request.GetOutputConfig(), request.GetRequestOptions(), request.GetBypassCache())
if err != nil {
return err
Expand All @@ -68,11 +71,11 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb
return nil
}
if err != nil {
return err
return common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err)
}
err = stream.Send(graphStreamChunk)
if err != nil {
return fmt.Errorf("send graph: %w", err)
return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("send graph: %w", err))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "common",
srcs = [
"errors.go",
"nameidmapper.go",
"utils.go",
],
Expand Down
51 changes: 51 additions & 0 deletions core/common/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

// Error type values for the failure_type metrics tag.
const (
// ErrorTypeUser marks failures attributed to the caller. In the controller it
// is used for request validation failures and for cancelled or expired
// request contexts.
ErrorTypeUser = "user"
// ErrorTypeInfra marks failures attributed to the service itself. In the
// controller it is used for send, compare and graph-fetch failures and as the
// fallback for unclassified errors.
ErrorTypeInfra = "infra"
)

// ClassifiedError is an error that carries an explicit failure reason and type
// for metrics classification. External clients can implement this interface so
// that emitFailureMetric in the controller picks it up automatically via
// errors.As, without any changes to the classification logic.
type ClassifiedError interface {
error
// Reason returns the failure_reason tag value (e.g. "send", "graph_fetch").
Reason() string
// Type returns the failure_type tag value: ErrorTypeUser or ErrorTypeInfra.
Type() string
// Unwrap returns the underlying error so that errors.Is and errors.As can
// see through the classification to the original cause.
Unwrap() error
}

// classifiedErr is the package-internal concrete implementation of an error
// that carries a failure reason and type alongside the error it wraps.
type classifiedErr struct {
	reason    string // failure_reason tag value
	errorType string // failure_type tag value
	err       error  // wrapped cause; may be nil if the caller supplied none
}

// Error returns the wrapped error's message. When there is no wrapped error it
// falls back to the reason, so calling Error never dereferences a nil error.
func (e *classifiedErr) Error() string {
	if e.err == nil {
		return e.reason
	}
	return e.err.Error()
}

// Unwrap returns the wrapped error (nil if there is none) so that errors.Is
// and errors.As can inspect the original cause.
func (e *classifiedErr) Unwrap() error { return e.err }

// Reason returns the failure_reason tag value.
func (e *classifiedErr) Reason() string { return e.reason }

// Type returns the failure_type tag value.
func (e *classifiedErr) Type() string { return e.errorType }

// WithReason attaches a failure reason and failure type to err so that metrics
// code can classify it. reason becomes the value returned by Reason and
// errorType the value returned by Type; err is what Unwrap returns.
//
// The result is always non-nil, even when err is nil, so callers should only
// wrap actual failures.
func WithReason(reason, errorType string, err error) ClassifiedError {
	classified := classifiedErr{
		reason:    reason,
		errorType: errorType,
		err:       err,
	}
	return &classified
}
2 changes: 2 additions & 0 deletions orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "orchestrator",
srcs = [
"errors.go",
"native_orchestrator.go",
"orchestrator.go",
],
Expand All @@ -18,6 +19,7 @@ go_library(
"//core/workspace",
"//graphrunner",
"//tangopb",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
)
Expand Down
31 changes: 31 additions & 0 deletions orchestrator/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package orchestrator

// failure_reason tag values emitted by the orchestrator.
//
// NOTE(review): the call sites that attach these values are not in this file;
// each name presumably identifies the orchestrator step that failed — confirm
// against the usages before relying on a specific meaning.
const (
failureReasonConfigParse = "config_parse"
failureReasonNoRepoConfig = "no_repo_config"
failureReasonWorkspaceLease = "workspace_lease"
failureReasonWorkspaceCheckout = "workspace_checkout"
failureReasonRequestCreate = "request_create"
failureReasonRequestApply = "request_apply"
failureReasonTreehashCompute = "treehash_compute"
failureReasonBazelClient = "bazel_client"
failureReasonGraphCompute = "graph_compute"
failureReasonGraphConvert = "graph_convert"
failureReasonStorage = "storage"
// Same tag value as the controller's failureReasonUnknown.
failureReasonUnknown = "unknown"
)
Loading
Loading