Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/consumer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/errs",
"//core/metrics",
"//entity/queue",
"//extension/queue",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
54 changes: 25 additions & 29 deletions core/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/extension/queue"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -329,8 +330,10 @@ func (m *consumer) processPartition(ctx context.Context, controller Controller,

// processDelivery calls the controller and performs ack/nack based on the result.
func (m *consumer) processDelivery(ctx context.Context, controller Controller, delivery queue.Delivery, controllerScope tally.Scope) {
const opName = "process"

start := time.Now()
controllerScope.Counter("messages_received").Inc(1)
metrics.NamedCounter(controllerScope, opName, "messages_received", 1)

msg := delivery.Message()
topicKey := controller.TopicKey()
Expand Down Expand Up @@ -364,10 +367,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
}
}

latencyScope := controllerScope.Tagged(map[string]string{
"success": successTag,
})
latencyScope.Timer("controller_latency").Record(elapsed)
metrics.NamedTimer(controllerScope, opName, "controller_latency", elapsed, metrics.NewTag("success", successTag))

if err != nil {
// Check if the error is non-retryable (poison pill message)
Expand All @@ -382,7 +382,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
"elapsed_ms", elapsed.Milliseconds(),
)

controllerScope.Counter("non_retryable_errors").Inc(1)
metrics.NamedCounter(controllerScope, opName, "non_retryable_errors", 1)

// Reject moves to DLQ (or acks if DLQ disabled)
if rejectErr := delivery.Reject(ctx, err.Error()); rejectErr != nil {
Expand All @@ -392,7 +392,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
"message_id", msg.ID,
"error", rejectErr,
)
controllerScope.Counter("reject_errors").Inc(1)
metrics.NamedCounter(controllerScope, opName, "reject_errors", 1)
}
return
}
Expand All @@ -414,7 +414,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
"elapsed_ms", elapsed.Milliseconds(),
)

controllerScope.Counter("controller_errors").Inc(1)
metrics.NamedCounter(controllerScope, opName, "controller_errors", 1)

// Nack with no delay - let visibility timeout handle retry delay
nackStart := time.Now()
Expand All @@ -425,14 +425,13 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
"message_id", msg.ID,
"error", nackErr,
)
controllerScope.Counter("nack_errors").Inc(1)
metrics.NamedCounter(controllerScope, opName, "nack_errors", 1)
} else {
controllerScope.Counter("nack_count").Inc(1)
nackScope := controllerScope.Tagged(map[string]string{
"operation": "nack",
"success": "true",
})
nackScope.Timer("ack_nack_latency").Record(time.Since(nackStart))
metrics.NamedCounter(controllerScope, opName, "nack_count", 1)
metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(nackStart),
metrics.NewTag("operation", "nack"),
metrics.NewTag("success", "true"),
)
}
return
}
Expand All @@ -446,23 +445,20 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
"message_id", msg.ID,
"error", ackErr,
)
controllerScope.Counter("ack_errors").Inc(1)
ackScope := controllerScope.Tagged(map[string]string{
"operation": "ack",
"success": "false",
})
ackScope.Timer("ack_nack_latency").Record(time.Since(ackStart))
metrics.NamedCounter(controllerScope, opName, "ack_errors", 1)
metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(ackStart),
metrics.NewTag("operation", "ack"),
metrics.NewTag("success", "false"),
)
return
}

controllerScope.Counter("messages_processed").Inc(1)
controllerScope.Counter("ack_count").Inc(1)

ackScope := controllerScope.Tagged(map[string]string{
"operation": "ack",
"success": "true",
})
ackScope.Timer("ack_nack_latency").Record(time.Since(ackStart))
metrics.NamedCounter(controllerScope, opName, "messages_processed", 1)
metrics.NamedCounter(controllerScope, opName, "ack_count", 1)
metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(ackStart),
metrics.NewTag("operation", "ack"),
metrics.NewTag("success", "true"),
)

m.logger.Debugw("message processed successfully",
"controller", controller.Name(),
Expand Down
1 change: 1 addition & 0 deletions extension/mergechecker/github/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
importpath = "github.com/uber/submitqueue/extension/mergechecker/github",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//entity",
"//entity/github",
"//extension/mergechecker",
Expand Down
19 changes: 11 additions & 8 deletions extension/mergechecker/github/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
entitygithub "github.com/uber/submitqueue/entity/github"
"github.com/uber/submitqueue/extension/mergechecker"
Expand Down Expand Up @@ -60,17 +61,19 @@ func NewMergeChecker(params Params) mergechecker.MergeChecker {
}

// Check assesses whether a change can merge cleanly using the GitHub GraphQL API.
func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Change) (mergechecker.Result, error) {
c.metricsScope.Counter("check_started").Inc(1)
func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Change) (result mergechecker.Result, retErr error) {
const opName = "check"

op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

result := mergechecker.Result{}
// Parse all change IDs
// TODO: classify parse errors as user errors (non-retryable) vs system errors.
changeIDs := make([]entitygithub.ChangeID, 0, len(change.URIs))
for _, rawID := range change.URIs {
cid, err := entitygithub.ParseChangeID(rawID)
if err != nil {
c.metricsScope.Counter("parse_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "parse_errors", 1)
return result, fmt.Errorf("failed to parse change ID %q: %w", rawID, err)
}
changeIDs = append(changeIDs, cid)
Expand All @@ -79,26 +82,26 @@ func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Ch
// Fetch PR info from GitHub GraphQL API
prInfoMap, err := c.fetchPRInfo(ctx, changeIDs)
if err != nil {
c.metricsScope.Counter("graphql_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "graphql_errors", 1)
return result, fmt.Errorf("failed to fetch PR info: %w", err)
}

// Validate PR mergeability
mergeable, reason, err := validatePRs(changeIDs, prInfoMap)
if err != nil {
c.metricsScope.Counter("validation_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "validation_errors", 1)
return result, err
}

if !mergeable {
c.metricsScope.Counter("not_mergeable").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "not_mergeable", 1)
c.logger.Infow("change not mergeable",
"queue", queue,
"reason", reason,
"change_uris", change.URIs,
)
} else {
c.metricsScope.Counter("mergeable").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "mergeable", 1)
}

result.Mergeable = mergeable
Expand Down
1 change: 1 addition & 0 deletions gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//core/consumer",
"//core/errs",
"//core/metrics",
"//entity",
"//entity/queue",
"//extension/counter",
Expand Down
16 changes: 7 additions & 9 deletions gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/counter"
Expand Down Expand Up @@ -55,21 +55,19 @@ type LandController struct {
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *LandController {
return &LandController{
logger: logger,
metricsScope: scope,
metricsScope: scope.SubScope("land_controller"),
counter: counter,
requestLogStore: requestLogStore,
registry: registry,
}
}

// Land handles the land request and returns a response
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
start := time.Now()
defer func() {
c.metricsScope.Timer("land_request_latency").Record(time.Since(start))
}()
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *pb.LandResponse, retErr error) {
const opName = "land"

c.metricsScope.Counter("land_request_count").Inc(1)
op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

// Validate required fields.
if req.Queue == "" {
Expand Down Expand Up @@ -131,7 +129,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
"sqid", landRequest.ID,
"topic_key", consumer.TopicKeyStart,
)
c.metricsScope.Counter("publish_success").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "publish_success", 1)

return &pb.LandResponse{
Sqid: landRequest.ID,
Expand Down
1 change: 1 addition & 0 deletions orchestrator/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/uber/submitqueue/orchestrator/controller",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//orchestrator/protopb",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
Expand Down
1 change: 1 addition & 0 deletions orchestrator/controller/batch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/metrics",
"//entity",
"//entity/queue",
"//extension/conflict",
Expand Down
30 changes: 16 additions & 14 deletions orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/conflict"
Expand Down Expand Up @@ -71,22 +72,25 @@ func NewController(
// Process processes a batch delivery from the queue.
// Deserializes the request, groups into batch, and publishes to the score topic.
// Returns nil to ack (success), or error to nack (retry).
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
c.metricsScope.Counter("received").Inc(1)
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

msg := delivery.Message()

// Deserialize request ID from payload
rid, err := entity.RequestIDFromBytes(msg.Payload)
if err != nil {
c.metricsScope.Counter("deserialize_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
return fmt.Errorf("failed to deserialize request ID: %w", err)
}

// Fetch request from storage
request, err := c.store.GetRequestStore().Get(ctx, rid.ID)
if err != nil {
c.metricsScope.Counter("storage_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
return fmt.Errorf("failed to get request %s: %w", rid.ID, err)
}

Expand All @@ -104,7 +108,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// Generate a globally unique batch ID.
seq, err := c.counter.Next(ctx, "batch/"+request.Queue)
if err != nil {
c.metricsScope.Counter("counter_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "counter_errors", 1)
return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)
}

Expand All @@ -125,7 +129,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
entity.BatchStateMerging,
})
if err != nil {
c.metricsScope.Counter("batch_store_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1)
return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)
}

Expand All @@ -134,7 +138,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// apply; the dependency graph only tracks the relation.
conflicts, err := c.analyzer.Analyze(ctx, batch, activeBatches)
if err != nil {
c.metricsScope.Counter("conflict_analyzer_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "conflict_analyzer_errors", 1)
return fmt.Errorf("failed to analyze conflicts for batchID=%s: %w", batch.ID, err)
}

Expand All @@ -155,15 +159,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
for _, depID := range conflictingIDs {
existing, err := c.store.GetBatchDependentStore().Get(ctx, depID)
if err != nil {
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1)
return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", depID, err)
}

dependents := append(existing.Dependents, batch.ID)

newVersion := existing.Version + 1
if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, depID, existing.Version, newVersion, dependents); err != nil {
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1)
return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", depID, batch.ID, err)
}
}
Expand All @@ -176,15 +180,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
}

if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil {
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1)
return fmt.Errorf("failed to create batch dependent index for new batchID=%s: %w", batch.ID, err)
}

// Persist batch to storage.
// This is the final operation that concludes the batch creation process. If it fails, BatchDependents will be pointing to a batch id that does not exist.
// We do not reuse batch ids, a retry of this operation will create a new batch with a new ID. The downstream logic that operates on BatchDependent should be able to handle stale entries.
if err := c.store.GetBatchStore().Create(ctx, batch); err != nil {
c.metricsScope.Counter("batch_store_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1)
return fmt.Errorf("failed to create batch in batch store: %w", err)
}

Expand All @@ -199,7 +203,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// If it fails and the controller retries, a new batch will be created with the new batch ID but the same request ID.
// The downstream logic should be able to handle stale entries by looking at the state of the batch.
if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish batch ID to score topic: %w", err)
}

Expand All @@ -208,8 +212,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"topic_key", consumer.TopicKeyScore,
)

c.metricsScope.Counter("processed").Inc(1)

return nil // Success - message will be acked
}

Expand Down
1 change: 1 addition & 0 deletions orchestrator/controller/build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/metrics",
"//entity",
"//entity/queue",
"//extension/storage",
Expand Down
Loading
Loading