Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan

// Publish to queue for async processing
if err := c.publishToQueue(ctx, landRequest); err != nil {
c.logger.Errorw("failed to publish request to queue",
"queue", req.Queue,
"sqid", landRequest.ID,
"error", err,
)
return nil, fmt.Errorf("LandController failed to publish request to queue: %w", err)
}

Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/batch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"//extension/counter",
Expand Down
28 changes: 1 addition & 27 deletions orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/counter"
Expand Down Expand Up @@ -76,12 +75,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// Deserialize request entity
request, err := entity.RequestFromBytes(msg.Payload)
if err != nil {
c.logger.Errorw("failed to deserialize request",
"message_id", msg.ID,
"partition_key", msg.PartitionKey,
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize request: %w", err)
Expand All @@ -99,11 +92,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// Generate a globally unique batch ID.
seq, err := c.counter.Next(ctx, "batch/"+request.Queue)
if err != nil {
c.logger.Errorw("failed to generate batch ID",
"request_id", request.ID,
"queue", request.Queue,
"error", err,
)
c.metricsScope.Counter("counter_errors").Inc(1)
return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)
}
Expand All @@ -123,11 +111,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
entity.BatchStateFinalizing,
})
if err != nil {
c.logger.Errorw("failed to get active batches",
"request_id", request.ID,
"queue", request.Queue,
"error", err,
)
c.metricsScope.Counter("batch_store_errors").Inc(1)
return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)
}
Expand All @@ -150,10 +133,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID)
if err != nil && !storage.IsNotFound(err) {
c.logger.Errorw("failed to get existing batch dependent",
"batch_id", dep.ID,
"error", err,
)
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err)
}
Expand All @@ -175,13 +154,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

// Publish to score topic
if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil {
c.logger.Errorw("failed to publish output",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyScore,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to score: %w", err))
return fmt.Errorf("failed to publish to score: %w", err)
}

c.logger.Infow("published batch to score",
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/build/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
15 changes: 1 addition & 14 deletions orchestrator/controller/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"go.uber.org/zap"
Expand Down Expand Up @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// Deserialize batch entity
batch, err := entity.BatchFromBytes(msg.Payload)
if err != nil {
c.logger.Errorw("failed to deserialize batch",
"message_id", msg.ID,
"partition_key", msg.PartitionKey,
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize batch: %w", err)
Expand All @@ -100,14 +93,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

// Publish build to build signal topic
if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil {
c.logger.Errorw("failed to publish output",
"batch_id", batch.ID,
"build_id", build.ID,
"topic_key", consumer.TopicKeyBuildSignal,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to buildsignal: %w", err))
return fmt.Errorf("failed to publish to buildsignal: %w", err)
}

c.logger.Infow("published build to buildsignal",
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/buildsignal/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
15 changes: 1 addition & 14 deletions orchestrator/controller/buildsignal/buildsignal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"go.uber.org/zap"
Expand Down Expand Up @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// Deserialize build entity
build, err := entity.BuildFromBytes(msg.Payload)
if err != nil {
c.logger.Errorw("failed to deserialize build",
"message_id", msg.ID,
"partition_key", msg.PartitionKey,
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize build: %w", err)
Expand All @@ -97,14 +90,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

// Publish batch to speculate topic
if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil {
c.logger.Errorw("failed to publish output",
"build_id", build.ID,
"batch_id", build.BatchID,
"topic_key", consumer.TopicKeySpeculate,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err))
return fmt.Errorf("failed to publish to speculate: %w", err)
}

c.logger.Infow("published batch to speculate",
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/conclude/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//core/metrics",
"//entity",
"//extension/storage",
Expand Down
27 changes: 2 additions & 25 deletions orchestrator/controller/conclude/conclude.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
Expand Down Expand Up @@ -73,12 +72,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
// Deserialize batch entity
batch, err := entity.BatchFromBytes(msg.Payload)
if err != nil {
c.logger.Errorw("failed to deserialize batch",
"message_id", msg.ID,
"partition_key", msg.PartitionKey,
"attempt", delivery.Attempt(),
"error", err,
)
// Non-retryable: malformed messages will never succeed regardless of retry count
metrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1)
return fmt.Errorf("failed to deserialize batch: %w", err)
Expand All @@ -100,10 +93,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
// as updated by the merge controller.
requestState, err := batchStateToRequestState(batch.State)
if err != nil {
c.logger.Errorw("unexpected batch state",
"batch_id", batch.ID,
"state", string(batch.State),
)
metrics.NamedCounter(c.metricsScope, "process", "unexpected_state_errors", 1)
return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err)
}
Expand All @@ -112,25 +101,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
for _, requestID := range batch.Contains {
request, err := c.store.GetRequestStore().Get(ctx, requestID)
if err != nil {
c.logger.Errorw("failed to get request from storage",
"batch_id", batch.ID,
"request_id", requestID,
"error", err,
)
metrics.NamedCounter(c.metricsScope, "process", "request_store_errors", 1)
return errs.NewRetryableError(fmt.Errorf("failed to get request %s: %w", requestID, err))
return fmt.Errorf("failed to get request %s: %w", requestID, err)
}

if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, requestState); err != nil {
c.logger.Errorw("failed to update request state",
"batch_id", batch.ID,
"request_id", requestID,
"from_version", request.Version,
"to_state", string(requestState),
"error", err,
)
metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1)
return errs.NewRetryableError(fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err))
return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)
}

c.logger.Infow("updated request state",
Expand Down
4 changes: 2 additions & 2 deletions orchestrator/controller/conclude/conclude_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestController_Process(t *testing.T) {
return mockStorage
},
wantErr: true,
retryable: true,
retryable: false,
},
{
name: "request store update failure is retryable",
Expand All @@ -179,7 +179,7 @@ func TestController_Process(t *testing.T) {
return mockStorage
},
wantErr: true,
retryable: true,
retryable: false,
},
{
name: "empty contains list succeeds",
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/merge/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
21 changes: 2 additions & 19 deletions orchestrator/controller/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"go.uber.org/zap"
Expand Down Expand Up @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// Deserialize batch entity
batch, err := entity.BatchFromBytes(msg.Payload)
if err != nil {
c.logger.Errorw("failed to deserialize batch",
"message_id", msg.ID,
"partition_key", msg.PartitionKey,
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize batch: %w", err)
Expand All @@ -94,13 +87,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

// Publish to conclude topic
if err := c.publish(ctx, consumer.TopicKeyConclude, batch); err != nil {
c.logger.Errorw("failed to publish to conclude",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyConclude,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to conclude: %w", err))
return fmt.Errorf("failed to publish to conclude: %w", err)
}

c.logger.Infow("published batch to conclude",
Expand All @@ -110,13 +98,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

// Publish to speculate topic
if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil {
c.logger.Errorw("failed to publish to speculate",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeySpeculate,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err))
return fmt.Errorf("failed to publish to speculate: %w", err)
}

c.logger.Infow("published batch to speculate",
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/score/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
14 changes: 1 addition & 13 deletions orchestrator/controller/score/score.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"go.uber.org/zap"
Expand Down Expand Up @@ -68,12 +67,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// Deserialize batch entity
batch, err := entity.BatchFromBytes(msg.Payload)
if err != nil {
c.logger.Errorw("failed to deserialize batch",
"message_id", msg.ID,
"partition_key", msg.PartitionKey,
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize batch: %w", err)
Expand All @@ -94,13 +87,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

// Publish to speculate topic
if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil {
c.logger.Errorw("failed to publish output",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeySpeculate,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err))
return fmt.Errorf("failed to publish to speculate: %w", err)
}

c.logger.Infow("published batch to speculate",
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/speculate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
Loading