diff --git a/entity/batch.go b/entity/batch.go index 8d47c677..e84bf0a5 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -10,6 +10,8 @@ const ( BatchStateUnknown BatchState = "" // BatchStateCreated is the state of a batch that has been created for processing. BatchStateCreated BatchState = "created" + // BatchStateReady is the state of a batch that has been persisted and is ready for scoring. + BatchStateReady BatchState = "ready" // BatchStateSpeculating is the state of a batch that is undergoing speculative execution. BatchStateSpeculating BatchState = "speculating" // BatchStateFinalizing is the state of a batch that is being finalized after speculative execution. diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index f5a824d3..491849ae 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//core/consumer", "//core/errs", + "//core/metrics", "//entity", "//entity/queue", "//extension/counter", diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 1b0b3faf..2bbe69a0 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -2,11 +2,13 @@ package batch import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/counter" @@ -54,8 +56,9 @@ func NewController( // Process processes a batch delivery from the queue. // Deserializes the request, groups into batch, and publishes to the score topic. // Returns nil to ack (success), or error to nack (retry). 
-func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + op := metrics.Begin(c.metricsScope, "process") + defer func() { op.Complete(retErr) }() msg := delivery.Message() @@ -68,7 +71,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "attempt", delivery.Attempt(), "error", err, ) - c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count return fmt.Errorf("failed to deserialize request: %w", err) } @@ -83,15 +85,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Generate a globally unique batch ID. + counterOp := metrics.Begin(c.metricsScope, "counter_next") seq, err := c.counter.Next(ctx, "batch/"+request.Queue) + counterOp.Complete(err) if err != nil { c.logger.Errorw("failed to generate batch ID", "request_id", request.ID, "queue", request.Queue, "error", err, ) - c.metricsScope.Counter("counter_errors").Inc(1) - return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err) + return errs.NewRetryableError(fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)) } batch := entity.Batch{ @@ -102,20 +105,37 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er Version: 1, } + // Persist batch to storage. + // ErrAlreadyExists should never happen since batch IDs are generated from a unique counter. 
+ batchCreateOp := metrics.Begin(c.metricsScope, "batch_store_create") + if err := c.store.GetBatchStore().Create(ctx, batch); err != nil { + batchCreateOp.Complete(err) + c.logger.Errorw("failed to create batch in storage", + "batch_id", batch.ID, + "error", err, + ) + if errors.Is(err, storage.ErrAlreadyExists) { + return fmt.Errorf("unexpected duplicate batch ID=%s: %w", batch.ID, err) + } + return errs.NewRetryableError(fmt.Errorf("failed to create batch: %w", err)) + } + batchCreateOp.Complete(nil) + // Get active batches for this queue to set as dependencies. + getActiveOp := metrics.Begin(c.metricsScope, "batch_store_get_active") activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ - entity.BatchStateCreated, + entity.BatchStateReady, entity.BatchStateSpeculating, entity.BatchStateFinalizing, }) + getActiveOp.Complete(err) if err != nil { c.logger.Errorw("failed to get active batches", "request_id", request.ID, "queue", request.Queue, "error", err, ) - c.metricsScope.Counter("batch_store_errors").Inc(1) - return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) + return errs.NewRetryableError(fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)) } for _, dep := range activeBatches { @@ -132,27 +152,76 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er bd := entity.BatchDependent{ BatchID: dep.ID, Dependents: []string{batch.ID}, + Version: 1, } + // Get existing dependents for this batch (already in store) + getDepOp := metrics.Begin(c.metricsScope, "batch_dependent_get") existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID) if err != nil && !storage.IsNotFound(err) { + getDepOp.Complete(err) c.logger.Errorw("failed to get existing batch dependent", "batch_id", dep.ID, "error", err, ) - c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) - return fmt.Errorf("failed to get batch dependent for 
batchID=%s: %w", dep.ID, err) + return errs.NewRetryableError(fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err)) } + getDepOp.Complete(nil) + if err == nil { + // Existing record found — update with merged dependents list. + // Note: existing.Dependents may have batches that are in "created" state + // indicating errors in previous batch creation pipeline. "created" + // should not be considered an active state for further processing. The + // callers of the batch dependents store should check for this. bd.Dependents = append(existing.Dependents, bd.Dependents...) + updateOp := metrics.Begin(c.metricsScope, "batch_dependent_update") + if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, bd.Dependents); err != nil { + updateOp.Complete(err) + c.logger.Errorw("failed to update batch dependent", + "batch_id", dep.ID, + "error", err, + ) + return errs.NewRetryableError(fmt.Errorf("failed to update batch dependent for batchID=%s: %w", dep.ID, err)) + } + updateOp.Complete(nil) + c.logger.Debugw("updated batch dependent", + "batch_id", dep.ID, + "dependent_count", len(bd.Dependents), + ) + } else { + // No existing record — create new batch dependent entry. + createDepOp := metrics.Begin(c.metricsScope, "batch_dependent_create") + createErr := c.store.GetBatchDependentStore().Create(ctx, bd) + if createErr != nil && !errors.Is(createErr, storage.ErrAlreadyExists) { + createDepOp.Complete(createErr) + c.logger.Errorw("failed to create batch dependent", + "batch_id", dep.ID, + "error", createErr, + ) + return errs.NewRetryableError(fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, createErr)) + } + createDepOp.Complete(nil) + c.logger.Debugw("created batch dependent", + "batch_id", dep.ID, + "dependent_batch_id", batch.ID, + ) } } - // TODO: - // - Add batch to DB - // - Add to batch dependent DB + // Transition batch state from created to ready. 
+ updateStateOp := metrics.Begin(c.metricsScope, "batch_store_update_state") + if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, entity.BatchStateReady); err != nil { + updateStateOp.Complete(err) + c.logger.Errorw("failed to update batch state to ready", + "batch_id", batch.ID, + "error", err, + ) + return errs.NewRetryableError(fmt.Errorf("failed to update batch state to ready: %w", err)) + } + updateStateOp.Complete(nil) - c.logger.Infow("batch created", + c.logger.Infow("batch ready", "batch_id", batch.ID, "request_id", request.ID, "queue", request.Queue, @@ -160,23 +229,23 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Publish to score topic + publishOp := metrics.Begin(c.metricsScope, "publish") if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil { + publishOp.Complete(err) c.logger.Errorw("failed to publish output", "batch_id", batch.ID, "topic_key", consumer.TopicKeyScore, "error", err, ) - c.metricsScope.Counter("publish_errors").Inc(1) return errs.NewRetryableError(fmt.Errorf("failed to publish to score: %w", err)) } + publishOp.Complete(nil) c.logger.Infow("published batch to score", "batch_id", batch.ID, "topic_key", consumer.TopicKeyScore, ) - c.metricsScope.Counter("processed").Inc(1) - return nil // Success - message will be acked } diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index fb9df901..31991207 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -42,6 +42,8 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M if mockStorage == nil { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + 
mockBatchStore.EXPECT().UpdateState(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockStorage = storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() @@ -170,6 +172,7 @@ func TestController_Process_CounterFailure(t *testing.T) { err = controller.Process(context.Background(), delivery) assert.Error(t, err) + assert.True(t, errs.IsRetryable(err)) } func TestController_Process_WithDependencies(t *testing.T) { @@ -183,15 +186,20 @@ func TestController_Process_WithDependencies(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), gomock.Any(), int32(1), entity.BatchStateReady).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) - // batch/1 has no existing dependents. + // batch/1 has no existing dependents — will be created. mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{}, storage.ErrNotFound) - // batch/2 already has an existing dependent. + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + // batch/2 already has an existing dependent — will be updated. 
mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.BatchDependent{ BatchID: "test-queue/batch/2", Dependents: []string{"test-queue/batch/99"}, + Version: 1, }, nil) + mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/2", int32(1), gomock.Any()).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() @@ -220,6 +228,274 @@ func TestController_Process_WithDependencies(t *testing.T) { require.NoError(t, err) } +func TestController_Process_BatchCreateFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("db connection lost")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + +func TestController_Process_BatchCreateAlreadyExists(t *testing.T) { + ctrl := gomock.NewController(t) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(storage.ErrAlreadyExists) + + mockStorage := 
storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrAlreadyExists) + assert.False(t, errs.IsRetryable(err)) +} + +func TestController_Process_GetActiveBatchesFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, fmt.Errorf("db timeout")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + 
delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + +func TestController_Process_GetBatchDependentFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + activeBatches := []entity.Batch{ + {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateReady, Version: 1}, + } + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + + mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) + mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{}, fmt.Errorf("db error")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + +func TestController_Process_UpdateDependentsFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + activeBatches := []entity.Batch{ + 
{ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateReady, Version: 1}, + } + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + + mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) + mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{ + BatchID: "test-queue/batch/1", + Dependents: []string{"test-queue/batch/99"}, + Version: 1, + }, nil) + mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/1", int32(1), gomock.Any()).Return(fmt.Errorf("version conflict")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + +func TestController_Process_CreateBatchDependentFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + activeBatches := []entity.Batch{ + {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateReady, Version: 1}, 
+ } + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + + mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) + mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{}, storage.ErrNotFound) + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("db error")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + +func TestController_Process_UpdateStateFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), gomock.Any(), int32(1), entity.BatchStateReady).Return(fmt.Errorf("version 
conflict")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil)