diff --git a/entity/batch.go b/entity/batch.go index f68bcdcb..5d80c3a8 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -89,3 +89,21 @@ func BatchFromBytes(data []byte) (Batch, error) { err := json.Unmarshal(data, &batch) return batch, err } + +// BatchID is a lightweight entity for publishing and consuming just the batch identifier via the queue. +type BatchID struct { + // ID is the globally unique identifier for the batch. + ID string `json:"id"` +} + +// ToBytes serializes the BatchID to JSON bytes for queue message payload. +func (b BatchID) ToBytes() ([]byte, error) { + return json.Marshal(b) +} + +// BatchIDFromBytes deserializes a BatchID from JSON bytes. +func BatchIDFromBytes(data []byte) (BatchID, error) { + var bid BatchID + err := json.Unmarshal(data, &bid) + return bid, err +} diff --git a/entity/request.go b/entity/request.go index 5054932b..3736bec9 100644 --- a/entity/request.go +++ b/entity/request.go @@ -104,3 +104,21 @@ func RequestFromBytes(data []byte) (Request, error) { err := json.Unmarshal(data, &req) return req, err } + +// RequestID is a lightweight entity for publishing and consuming just the request identifier via the queue. +type RequestID struct { + // ID is the globally unique identifier for the land request. + ID string `json:"id"` +} + +// ToBytes serializes the RequestID to JSON bytes for queue message payload. +func (r RequestID) ToBytes() ([]byte, error) { + return json.Marshal(r) +} + +// RequestIDFromBytes deserializes a RequestID from JSON bytes. +func RequestIDFromBytes(data []byte) (RequestID, error) { + var rid RequestID + err := json.Unmarshal(data, &rid) + return rid, err +} diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 01f45cc4..25f13364 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -390,6 +390,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t validateController := validate.NewController( logger, scope, + store, registry, mc, consumer.TopicKeyValidate, @@ -415,6 +416,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scoreController := score.NewController( logger, scope, + store, registry, consumer.TopicKeyScore, "orchestrator-score", @@ -426,6 +428,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t speculateController := speculate.NewController( logger, scope, + store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate", @@ -437,6 +440,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildController := build.NewController( logger, scope, + store, registry, consumer.TopicKeyBuild, "orchestrator-build", @@ -448,6 +452,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildsignalController := buildsignal.NewController( logger, scope, + store, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", @@ -459,6 +464,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t mergeController := merge.NewController( logger, scope, + store, registry, consumer.TopicKeyMerge, "orchestrator-merge", diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index 5138cc41..1633aa71 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -22,7 +22,6 @@ go_test( embed = [":batch"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "//extension/counter/mock", diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 60767ba3..803de2db 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -16,6 +16,7 @@ package batch import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" @@ -72,12 +73,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize request ID from payload + rid, err := entity.RequestIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize request ID: %w", err) + } + + // Fetch request from storage + request, err := c.store.GetRequestStore().Get(ctx, rid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get request %s: %w", rid.ID, err) } c.logger.Infow("received batch event", @@ -139,11 +146,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er if err == nil { bd.Dependents = append(existing.Dependents, bd.Dependents...) } + + if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) + return fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, err) + } } - // TODO: - // - Add batch to DB - // - Add to batch dependent DB + // Persist batch to storage (idempotent — ErrAlreadyExists means a retry) + if err := c.store.GetBatchStore().Create(ctx, batch); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + c.metricsScope.Counter("batch_store_errors").Inc(1) + return fmt.Errorf("failed to create batch: %w", err) + } c.logger.Infow("batch created", "batch_id", batch.ID, @@ -153,7 +167,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Publish to score topic - if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to score: %w", err) } @@ -168,14 +182,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 88c3787d..70c47751 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" countermock "github.com/uber/submitqueue/extension/counter/mock" @@ -35,6 +34,13 @@ import ( "go.uber.org/zap/zaptest" ) +// requestIDPayload serializes a RequestID to JSON bytes for test message payloads. +func requestIDPayload(t *testing.T, id string) []byte { + payload, err := entity.RequestID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + // newSequentialCounter returns a mock counter that returns incrementing values starting at 1. func newSequentialCounter(ctrl *gomock.Controller) *countermock.MockCounter { var seq int64 @@ -47,6 +53,18 @@ func newSequentialCounter(ctrl *gomock.Controller) *countermock.MockCounter { return cnt } +// testRequest returns a standard test request for batch tests. +func testRequest() entity.Request { + return entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } +} + // newTestController creates a controller with test dependencies. // If mockStorage is nil, a default MockStorage with an empty batch store is created. func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.MockCounter, mockStorage *storagemock.MockStorage, publishErr error) *Controller { @@ -56,9 +74,15 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M if mockStorage == nil { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + req := testRequest() + mockReqStore.EXPECT().Get(gomock.Any(), req.ID).Return(req, nil).AnyTimes() mockStorage = storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() } mockPub := queuemock.NewMockPublisher(ctrl) @@ -94,42 +118,34 @@ func TestController_Process_Success(t *testing.T) { controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + request := testRequest() + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("db connection lost")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) + assert.Error(t, err) } func TestController_Process_PublishFailure(t *testing.T) { @@ -137,24 +153,13 @@ func TestController_Process_PublishFailure(t *testing.T) { controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, fmt.Errorf("publish failed")) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + request := testRequest() + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } @@ -165,30 +170,28 @@ func TestController_Process_CounterFailure(t *testing.T) { cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) controller := newTestController(t, ctrl, cnt, nil, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + request := testRequest() + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_Process_WithDependencies(t *testing.T) { ctrl := gomock.NewController(t) + request := entity.Request{ + ID: "test-queue/456", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/789/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } + // Set up storage with active batches to become dependencies. activeBatches := []entity.Batch{ {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateCreated, Version: 1}, @@ -197,40 +200,35 @@ func TestController_Process_WithDependencies(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) // batch/1 has no existing dependents. mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{}, storage.ErrNotFound) + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) // batch/2 already has an existing dependent. mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.BatchDependent{ BatchID: "test-queue/batch/2", Dependents: []string{"test-queue/batch/99"}, }, nil) + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) - request := entity.Request{ - ID: "test-queue/456", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/789/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } diff --git a/orchestrator/controller/build/BUILD.bazel b/orchestrator/controller/build/BUILD.bazel index a47bcb7e..9767bc59 100644 --- a/orchestrator/controller/build/BUILD.bazel +++ b/orchestrator/controller/build/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index eab2b780..8bef78fa 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received build event", diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go index 9edae378..0e595195 100644 --- a/orchestrator/controller/build/build_test.go +++ b/orchestrator/controller/build/build_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for build tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -51,12 +79,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, store, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey()) @@ -67,40 +97,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -108,30 +133,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel index 0ca992b7..73c973a8 100644 --- a/orchestrator/controller/buildsignal/BUILD.bazel +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index 95a0a686..c7b4db87 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("buildsignal_controller"), metricsScope: scope.SubScope("buildsignal_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -84,19 +88,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Evaluate build result (pass/fail) // - Update batch state based on build outcome - batch := entity.Batch{ - ID: build.BatchID, + // Fetch batch from storage to get the partition key (queue) + batch, err := c.store.GetBatchStore().Get(ctx, build.BatchID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", build.BatchID, err) } // Publish batch to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to speculate: %w", err) } c.logger.Infow("published batch to speculate", "build_id", build.ID, - "batch_id", build.BatchID, + "batch_id", batch.ID, "topic_key", consumer.TopicKeySpeculate, ) @@ -105,14 +112,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/buildsignal/buildsignal_test.go b/orchestrator/controller/buildsignal/buildsignal_test.go index e6157373..44db77a9 100644 --- a/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/orchestrator/controller/buildsignal/buildsignal_test.go @@ -27,12 +27,26 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// newMockStorage creates a MockStorage with a MockBatchStore that returns a batch for the given batchID. +func newMockStorage(ctrl *gomock.Controller, batchID string) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batchID).Return(entity.Batch{ + ID: batchID, + Queue: "test-queue", + }, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -51,12 +65,13 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal") + return NewController(logger, scope, store, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuildSignal, controller.TopicKey()) @@ -67,7 +82,8 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) build := entity.Build{ ID: "build-123", @@ -90,7 +106,8 @@ func TestController_Process_Success(t *testing.T) { func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -107,7 +124,8 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + store := newMockStorage(ctrl, "test-queue/batch/2") + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) build := entity.Build{ ID: "build-456", @@ -129,7 +147,8 @@ func TestController_Process_PublishFailure(t *testing.T) { func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/conclude/conclude.go b/orchestrator/controller/conclude/conclude.go index f98b537f..1c454e27 100644 --- a/orchestrator/controller/conclude/conclude.go +++ b/orchestrator/controller/conclude/conclude.go @@ -69,12 +69,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - // Non-retryable: malformed messages will never succeed regardless of retry count metrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1) - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + metrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received conclude event", diff --git a/orchestrator/controller/conclude/conclude_test.go b/orchestrator/controller/conclude/conclude_test.go index 7d1439ae..990e139d 100644 --- a/orchestrator/controller/conclude/conclude_test.go +++ b/orchestrator/controller/conclude/conclude_test.go @@ -33,6 +33,13 @@ import ( "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + // newTestController creates a controller with test dependencies. func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage) *Controller { logger := zaptest.NewLogger(t).Sugar() @@ -40,8 +47,10 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora if mockStorage == nil { mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockStorage = storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() } registry, err := consumer.NewTopicRegistry(nil) @@ -78,6 +87,15 @@ func TestController_Process(t *testing.T) { Version: 3, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + Contains: []string{"test-queue/1", "test-queue/2"}, + State: entity.BatchStateSucceeded, + Version: 3, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, @@ -89,6 +107,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/2", int32(3), entity.RequestStateLanded).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -103,6 +122,15 @@ func TestController_Process(t *testing.T) { Version: 2, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.Batch{ + ID: "test-queue/batch/2", + Queue: "test-queue", + Contains: []string{"test-queue/5"}, + State: entity.BatchStateFailed, + Version: 2, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/5").Return(entity.Request{ ID: "test-queue/5", Version: 1, State: entity.RequestStateProcessing, @@ -110,6 +138,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/5", int32(1), entity.RequestStateError).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -123,6 +152,20 @@ func TestController_Process(t *testing.T) { State: entity.BatchStateCancelled, Version: 2, }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/3").Return(entity.Batch{ + ID: "test-queue/batch/3", + Queue: "test-queue", + Contains: []string{"test-queue/10"}, + State: entity.BatchStateCancelled, + Version: 2, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return mockStorage + }, wantErr: true, retryable: false, }, @@ -135,6 +178,20 @@ func TestController_Process(t *testing.T) { State: entity.BatchStateCreated, Version: 1, }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/4").Return(entity.Batch{ + ID: "test-queue/batch/4", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateCreated, + Version: 1, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return mockStorage + }, wantErr: true, retryable: false, }, @@ -148,10 +205,20 @@ func TestController_Process(t *testing.T) { Version: 2, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/5").Return(entity.Batch{ + ID: "test-queue/batch/5", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{}, fmt.Errorf("db connection lost")) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -168,6 +235,15 @@ func TestController_Process(t *testing.T) { Version: 2, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/6").Return(entity.Batch{ + ID: "test-queue/batch/6", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, @@ -175,6 +251,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), entity.RequestStateLanded).Return(storage.ErrVersionMismatch) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -189,6 +266,19 @@ func TestController_Process(t *testing.T) { State: entity.BatchStateSucceeded, Version: 1, }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/7").Return(entity.Batch{ + ID: "test-queue/batch/7", + Queue: "test-queue", + State: entity.BatchStateSucceeded, + Version: 1, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return mockStorage + }, }, } @@ -203,15 +293,12 @@ func TestController_Process(t *testing.T) { controller := newTestController(t, ctrl, mockStorage) - payload, err := tt.batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(tt.batch.ID, payload, tt.batch.Queue, nil) + msg := queue.NewMessage(tt.batch.ID, batchIDPayload(t, tt.batch.ID), tt.batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) if tt.wantErr { require.Error(t, err) @@ -223,19 +310,23 @@ func TestController_Process(t *testing.T) { } } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + controller := newTestController(t, ctrl, mockStorage) + + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index f388af3f..93333685 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 7d1f1949..395e4c4c 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("merge_controller"), metricsScope: scope.SubScope("merge_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received merge event", @@ -86,7 +96,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Handle merge conflicts // Publish to conclude topic - if err := c.publish(ctx, consumer.TopicKeyConclude, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to conclude: %w", err) } @@ -97,7 +107,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to speculate: %w", err) } @@ -112,14 +122,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index 1677dc79..aa9f0738 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for merge tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -54,12 +82,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyMerge, "orchestrator-merge") + return NewController(logger, scope, store, registry, consumer.TopicKeyMerge, "orchestrator-merge") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyMerge, controller.TopicKey()) @@ -70,40 +100,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -111,30 +136,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/score/BUILD.bazel b/orchestrator/controller/score/BUILD.bazel index 525da050..d7fb2254 100644 --- a/orchestrator/controller/score/BUILD.bazel +++ b/orchestrator/controller/score/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/score/score.go b/orchestrator/controller/score/score.go index 2750c795..4a916caf 100644 --- a/orchestrator/controller/score/score.go +++ b/orchestrator/controller/score/score.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("score_controller"), metricsScope: scope.SubScope("score_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received score event", @@ -86,7 +96,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Apply scoring heuristics // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to speculate: %w", err) } @@ -101,14 +111,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/score/score_test.go b/orchestrator/controller/score/score_test.go index 6850e775..763dc46e 100644 --- a/orchestrator/controller/score/score_test.go +++ b/orchestrator/controller/score/score_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for score tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -51,12 +79,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyScore, "orchestrator-score") + return NewController(logger, scope, store, registry, consumer.TopicKeyScore, "orchestrator-score") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyScore, controller.TopicKey()) @@ -67,40 +97,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -108,30 +133,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/speculate/BUILD.bazel b/orchestrator/controller/speculate/BUILD.bazel index 013ff592..955780c4 100644 --- a/orchestrator/controller/speculate/BUILD.bazel +++ b/orchestrator/controller/speculate/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go index 7caf8b3c..6d1a7df7 100644 --- a/orchestrator/controller/speculate/speculate.go +++ b/orchestrator/controller/speculate/speculate.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("speculate_controller"), metricsScope: scope.SubScope("speculate_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received speculate event", @@ -88,7 +98,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Publish to merge only if speculation is complete and successful (ready to land) // Publish to build topic - if err := c.publish(ctx, consumer.TopicKeyBuild, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBuild, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to build: %w", err) } @@ -99,7 +109,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Publish to merge topic - if err := c.publish(ctx, consumer.TopicKeyMerge, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyMerge, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to merge: %w", err) } @@ -114,14 +124,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/speculate/speculate_test.go b/orchestrator/controller/speculate/speculate_test.go index 51466b30..dd651f67 100644 --- a/orchestrator/controller/speculate/speculate_test.go +++ b/orchestrator/controller/speculate/speculate_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for speculate tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -54,12 +82,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + return NewController(logger, scope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeySpeculate, controller.TopicKey()) @@ -70,40 +100,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -111,30 +136,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/start/start.go b/orchestrator/controller/start/start.go index 1bb3e584..be8ddf97 100644 --- a/orchestrator/controller/start/start.go +++ b/orchestrator/controller/start/start.go @@ -115,7 +115,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Publish to validate topic - if err := c.publish(ctx, consumer.TopicKeyValidate, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyValidate, request.ID, request.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to validate: %w", err) } @@ -130,14 +130,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a request ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { + rid := entity.RequestID{ID: requestID} + payload, err := rid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize request ID: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(requestID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/validate/BUILD.bazel b/orchestrator/controller/validate/BUILD.bazel index 3635cd57..0ea6244a 100644 --- a/orchestrator/controller/validate/BUILD.bazel +++ b/orchestrator/controller/validate/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//entity", "//entity/queue", "//extension/mergechecker", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -28,6 +29,7 @@ go_test( "//extension/mergechecker", "//extension/mergechecker/mock", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/validate/validate.go b/orchestrator/controller/validate/validate.go index e17fb009..2ee8d8b1 100644 --- a/orchestrator/controller/validate/validate.go +++ b/orchestrator/controller/validate/validate.go @@ -24,6 +24,7 @@ import ( "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/mergechecker" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -34,6 +35,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry mergeChecker mergechecker.MergeChecker topicKey consumer.TopicKey @@ -47,6 +49,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, mergeChecker mergechecker.MergeChecker, topicKey consumer.TopicKey, @@ -55,6 +58,7 @@ func NewController( return &Controller{ logger: logger.Named("validate_controller"), metricsScope: scope.SubScope("validate_controller"), + store: store, registry: registry, mergeChecker: mergeChecker, topicKey: topicKey, @@ -70,12 +74,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize request ID from payload + rid, err := entity.RequestIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize request ID: %w", err) + } + + // Fetch request from storage + request, err := c.store.GetRequestStore().Get(ctx, rid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get request %s: %w", rid.ID, err) } c.logger.Infow("received validate event", @@ -104,7 +114,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Publish to batch topic - if err := c.publish(ctx, consumer.TopicKeyBatch, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBatch, request.ID, request.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to batch: %w", err) } @@ -119,14 +129,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a request ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { + rid := entity.RequestID{ID: requestID} + payload, err := rid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize request ID: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(requestID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/validate/validate_test.go b/orchestrator/controller/validate/validate_test.go index 50aef98f..cba0500d 100644 --- a/orchestrator/controller/validate/validate_test.go +++ b/orchestrator/controller/validate/validate_test.go @@ -29,10 +29,18 @@ import ( "github.com/uber/submitqueue/extension/mergechecker" mergecheckermock "github.com/uber/submitqueue/extension/mergechecker/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// requestIDPayload serializes a RequestID to JSON bytes for test message payloads. +func requestIDPayload(t *testing.T, id string) []byte { + payload, err := entity.RequestID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + // newMergeableMock returns a mock MergeChecker that always returns mergeable. func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecker { mc := mergecheckermock.NewMockMergeChecker(ctrl) @@ -40,8 +48,18 @@ func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecke return mc } +// newMockStorage creates a MockStorage with a MockRequestStore that returns the given request on Get. +func newMockStorage(ctrl *gomock.Controller, request entity.Request) *storagemock.MockStorage { + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.MergeChecker, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, mc mergechecker.MergeChecker, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -60,13 +78,22 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.Me ) require.NoError(t, err) - return NewController(logger, scope, registry, mc, consumer.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, registry, mc, consumer.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } + store := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, mc, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyValidate, controller.TopicKey()) @@ -78,8 +105,6 @@ func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -88,33 +113,36 @@ func TestController_Process_Success(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, nil) - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + msg := queue.NewMessage("test-queue/123", requestIDPayload(t, request.ID), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + controller := newTestController(t, ctrl, store, mc, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -123,8 +151,6 @@ func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, fmt.Errorf("publish failed")) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -133,23 +159,25 @@ func TestController_Process_PublishFailure(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, fmt.Errorf("publish failed")) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + request := entity.Request{ID: "test-queue/123", Queue: "test-queue"} + store := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, mc, nil) var _ consumer.Controller = controller } @@ -160,8 +188,6 @@ func TestController_Process_NotMergeable(t *testing.T) { mc := mergecheckermock.NewMockMergeChecker(ctrl) mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: false}, nil) - controller := newTestController(t, ctrl, mc, nil) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -170,16 +196,16 @@ func TestController_Process_NotMergeable(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, nil) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -190,8 +216,6 @@ func TestController_Process_MergeCheckError(t *testing.T) { mc := mergecheckermock.NewMockMergeChecker(ctrl) mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{}, fmt.Errorf("merge check failed")) - controller := newTestController(t, ctrl, mc, nil) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -200,16 +224,16 @@ func TestController_Process_MergeCheckError(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, nil) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) }