diff --git a/entity/batch.go b/entity/batch.go index 8d47c677..f0a57102 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -12,8 +12,6 @@ const ( BatchStateCreated BatchState = "created" // BatchStateSpeculating is the state of a batch that is undergoing speculative execution. BatchStateSpeculating BatchState = "speculating" - // BatchStateFinalizing is the state of a batch that is being finalized after speculative execution. - BatchStateFinalizing BatchState = "finalizing" // BatchStateSucceeded is the terminal state of a batch that has been successfully landed. BatchStateSucceeded BatchState = "succeeded" // BatchStateFailed is the terminal state of a batch that has failed. diff --git a/entity/batch_test.go b/entity/batch_test.go index e204b24f..2aa88a9c 100644 --- a/entity/batch_test.go +++ b/entity/batch_test.go @@ -16,7 +16,6 @@ func TestBatchState_IsTerminal(t *testing.T) { {name: "unknown", state: BatchStateUnknown, terminal: false}, {name: "created", state: BatchStateCreated, terminal: false}, {name: "speculating", state: BatchStateSpeculating, terminal: false}, - {name: "finalizing", state: BatchStateFinalizing, terminal: false}, {name: "succeeded", state: BatchStateSucceeded, terminal: true}, {name: "failed", state: BatchStateFailed, terminal: true}, {name: "cancelled", state: BatchStateCancelled, terminal: true}, diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index c342fa47..4675ad40 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -14,6 +14,8 @@ go_library( "//core/consumer", "//extension/counter", "//extension/counter/mysql", + "//extension/landprovider", + "//extension/landprovider/github", "//extension/mergechecker", "//extension/mergechecker/github", "//extension/queue", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 7e500b5f..554ac4fe 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -17,8 +17,10 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/extension/counter" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" + "github.com/uber/submitqueue/extension/landprovider" "github.com/uber/submitqueue/extension/mergechecker" githubchecker "github.com/uber/submitqueue/extension/mergechecker/github" + githublander "github.com/uber/submitqueue/extension/landprovider/github" extqueue "github.com/uber/submitqueue/extension/queue" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql" @@ -160,8 +162,11 @@ func run() error { // Create merge checker mc := newMergeChecker(logger, scope) + // Create land provider + lp := newLandProvider(logger, scope) + // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cnt, store); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, lp, cnt, store); err != nil { return err } @@ -298,7 +303,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // → merge → merge-signal // finalize (terminal) -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cnt counter.Counter, store storage.Storage) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, lp landprovider.LandProvider, cnt counter.Counter, store storage.Storage) error { requestController := request.NewController( logger, scope, @@ -361,6 +366,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, + lp, + store, consumer.TopicKeyToMerge, "orchestrator-merge", ) @@ -418,6 +425,27 @@ func newMergeChecker(logger *zap.Logger, scope tally.Scope) mergechecker.MergeCh }) } +// newLandProvider creates a LandProvider for GitHub (github.com). +// Configured via GITHUB_TOKEN and GITHUB_API_URL environment variables. +func newLandProvider(logger *zap.Logger, scope tally.Scope) landprovider.LandProvider { + apiURL := os.Getenv("GITHUB_API_URL") + if apiURL == "" { + apiURL = "https://api.github.com" + } + + httpClient := &http.Client{} + if token := os.Getenv("GITHUB_TOKEN"); token != "" { + httpClient.Transport = &bearerTransport{token: token} + } + + return githublander.NewLandProvider(githublander.Params{ + HTTPClient: httpClient, + APIURL: apiURL, + Logger: logger.Sugar(), + MetricsScope: scope.SubScope("landprovider"), + }) +} + // bearerTransport is an http.RoundTripper that adds a Bearer token to requests. type bearerTransport struct { token string diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 3c08c61d..c7d23221 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -106,7 +106,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ entity.BatchStateCreated, entity.BatchStateSpeculating, - entity.BatchStateFinalizing, }) if err != nil { c.logger.Errorw("failed to get active batches", diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index 3f2c16b5..c7e59db4 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -10,6 +10,8 @@ go_library( "//core/errs", "//entity", "//entity/queue", + "//extension/landprovider", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,7 +26,10 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/landprovider", + "//extension/landprovider/mock", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index f12869f0..1725e1b7 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -9,16 +9,22 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/landprovider" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) // Controller handles merge queue messages. -// It consumes merge requests, performs merges, and publishes results to the merge signal stage. +// It consumes batches, fetches the contained requests from storage, +// lands the changes via the LandProvider, and publishes results +// to the merge-signal and speculator stages. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry + landProvider landprovider.LandProvider + storage storage.Storage topicKey consumer.TopicKey consumerGroup string } @@ -31,6 +37,8 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, + landProvider landprovider.LandProvider, + storage storage.Storage, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -38,75 +46,156 @@ func NewController( logger: logger.Named("merge_controller"), metricsScope: scope.SubScope("merge_controller"), registry: registry, + landProvider: landProvider, + storage: storage, topicKey: topicKey, consumerGroup: consumerGroup, } } // Process processes a merge delivery from the queue. -// Deserializes the request, performs the merge, and publishes to the merge signal topic. +// Extracts the batch ID from the message, fetches the batch from storage, +// lands the changes, and publishes the batch ID to downstream topics. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() + batchID := string(msg.Payload) - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) - if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Infow("received merge event", + "batch_id", batchID, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + if batchID == "" { + c.logger.Errorw("empty batch ID in message", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), - "error", err, ) c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("empty batch ID in message %s", msg.ID) } - c.logger.Infow("received merge event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, - "attempt", delivery.Attempt(), - "partition_key", msg.PartitionKey, - ) - - // TODO: Add merge logic - // - Perform source control merge operation - // - Handle merge conflicts - - // Publish to merge signal topic - if err := c.publish(ctx, consumer.TopicKeyMergeSignal, request); err != nil { - c.logger.Errorw("failed to publish output", - "request_id", request.ID, - "topic_key", consumer.TopicKeyMergeSignal, + // Fetch the batch from storage — this is the single source of truth. + // Terminal states mean the land outcome was already persisted on a + // previous attempt; skip straight to publish. + batch, err := c.storage.GetBatchStore().Get(ctx, batchID) + if err != nil { + c.logger.Errorw("failed to fetch batch", + "batch_id", batchID, "error", err, ) - c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to merge-signal: %w", err)) + c.metricsScope.Counter("batch_fetch_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to fetch batch %s: %w", batchID, err)) } - c.logger.Infow("published request to next stage", - "request_id", request.ID, - "topic_key", consumer.TopicKeyMergeSignal, + if !batch.State.IsTerminal() { + newState, err := c.land(ctx, batch) + if err != nil { + return fmt.Errorf("batch %s: %w", batch.ID, err) + } + + if err := c.storage.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newState); err != nil { + c.logger.Errorw("failed to update batch state", + "batch_id", batch.ID, + "target_state", string(newState), + "error", err, + ) + c.metricsScope.Counter("batch_update_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to update batch state: %w", err)) + } + + batch.State = newState + } else { + c.logger.Infow("batch already in terminal state, skipping to publish", + "batch_id", batch.ID, + "state", string(batch.State), + ) + c.metricsScope.Counter("idempotent_skip").Inc(1) + } + + // Publish batch ID to merge-signal and speculator topics. + // On retry, a topic that already received this message may get a duplicate; + // this is safe because downstream consumers are idempotent. + for _, key := range []consumer.TopicKey{consumer.TopicKeyMergeSignal, consumer.TopicKeyBatched} { + if err := c.publish(ctx, key, batch.ID, batch.Queue); err != nil { + c.logger.Errorw("failed to publish output", + "batch_id", batch.ID, + "topic_key", key, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to publish to %s: %w", key, err)) + } + } + + c.logger.Infow("published batch to next stages", + "batch_id", batch.ID, + "state", string(batch.State), + "topic_keys", []string{consumer.TopicKeyMergeSignal.String(), consumer.TopicKeyBatched.String()}, ) c.metricsScope.Counter("processed").Inc(1) - - return nil // Success - message will be acked + return nil } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() - if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) +// land fetches the requests in the batch, lands them via the LandProvider, +// and classifies the outcome into a batch state. +func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.BatchState, error) { + requestStore := c.storage.GetRequestStore() + entries := make([]entity.LandEntry, 0, len(batch.Contains)) + + for _, requestID := range batch.Contains { + request, err := requestStore.Get(ctx, requestID) + if err != nil { + c.logger.Errorw("failed to fetch request", + "batch_id", batch.ID, + "request_id", requestID, + "error", err, + ) + c.metricsScope.Counter("request_fetch_errors").Inc(1) + return "", errs.NewRetryableError(fmt.Errorf("failed to fetch request %s: %w", requestID, err)) + } + + entries = append(entries, entity.LandEntry{ + Strategy: request.LandStrategy, + Change: request.Change, + }) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + err := c.landProvider.Land(ctx, batch.Queue, entries) + + switch { + case err == nil, landprovider.IsAlreadyLanded(err): + c.logger.Infow("land succeeded", + "batch_id", batch.ID, + "request_ids", batch.Contains, + "already_landed", landprovider.IsAlreadyLanded(err), + ) + return entity.BatchStateSucceeded, nil + case landprovider.IsLandRejected(err): + c.logger.Errorw("land rejected", + "batch_id", batch.ID, + "error", err, + ) + c.metricsScope.Counter("land_rejected").Inc(1) + return entity.BatchStateFailed, nil + default: + c.logger.Errorw("land failed", + "batch_id", batch.ID, + "error", err, + ) + c.metricsScope.Counter("land_errors").Inc(1) + return "", errs.NewRetryableError(fmt.Errorf("land failed: %w", err)) + } +} + +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, id string, partitionKey string) error { + msg := entityqueue.NewMessage(id, []byte(id), partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index dd351180..bf389d34 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -3,6 +3,7 @@ package merge import ( "context" "fmt" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -12,37 +13,30 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/landprovider" + landprovidermock "github.com/uber/submitqueue/extension/landprovider/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) -// newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + lp := landprovidermock.NewMockLandProvider(ctrl) + mockStorage := storagemock.NewMockStorage(ctrl) + logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, topic string, msg queue.Message) error { - return publishErr - }, - ).AnyTimes() - - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}}, + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: queuemock.NewMockQueue(ctrl)}, + }, ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyToMerge, "orchestrator-merge") -} - -func TestNewController(t *testing.T) { - ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyToMerge, controller.TopicKey()) @@ -50,78 +44,412 @@ func TestNewController(t *testing.T) { assert.Equal(t, "merge", controller.Name()) } -func TestController_Process_Success(t *testing.T) { - ctrl := gomock.NewController(t) - - controller := newTestController(t, ctrl, nil) - - request := entity.Request{ +func TestController_Process(t *testing.T) { + testRequest := entity.Request{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateProcessing, Version: 1, } - payload, err := request.ToBytes() - require.NoError(t, err) + testBatch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + Contains: []string{"test-queue/123"}, + State: entity.BatchStateSpeculating, + Version: 1, + } - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + tests := []struct { + name string + setupMocks func(*gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) + payload string + wantErr bool + wantRetryable bool + }{ + { + name: "success", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) -} + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) -func TestController_Process_InvalidJSON(t *testing.T) { - ctrl := gomock.NewController(t) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) - controller := newTestController(t, ctrl, nil) + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: false, + }, + { + name: "empty batch ID", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + mockStorage := storagemock.NewMockStorage(ctrl) + return lp, mockStorage, nil + }, + payload: "", + wantErr: true, + }, + { + name: "batch not found - retryable", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) - err := controller.Process(context.Background(), delivery) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("not found")) - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) -} + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() -func TestController_Process_PublishFailure(t *testing.T) { - ctrl := gomock.NewController(t) + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: true, + wantRetryable: true, + }, + { + name: "idempotent - already succeeded", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + + succeededBatch := testBatch + succeededBatch.State = entity.BatchStateSucceeded + succeededBatch.Version = 2 + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(succeededBatch, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: false, + }, + { + name: "idempotent - already failed", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + + failedBatch := testBatch + failedBatch.State = entity.BatchStateFailed + failedBatch.Version = 2 + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(failedBatch, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: false, + }, + { + name: "request not found - retryable", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("not found")) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: true, + wantRetryable: true, + }, + { + name: "land rejected - batch marked as failed and published", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateFailed).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: false, + }, + { + name: "already landed - classified as success", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.ErrAlreadyLanded) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: false, + }, + { + name: "land error - retryable", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("generic error")) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) - request := entity.Request{ + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: true, + wantRetryable: true, + }, + { + name: "batch update error after successful land - retryable", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(fmt.Errorf("version mismatch")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: true, + wantRetryable: true, + }, + { + name: "batch update error after land rejection - retryable", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateFailed).Return(fmt.Errorf("version mismatch")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: true, + wantRetryable: true, + }, + { + name: "publish failure - retryable", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, fmt.Errorf("publish failed") + }, + payload: "test-queue/batch/1", + wantErr: true, + wantRetryable: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + lp, mockStorage, publishErr := tt.setupMocks(ctrl) + + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") + + msg := queue.NewMessage("test-msg-id", []byte(tt.payload), "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + + if tt.wantErr { + require.Error(t, err) + assert.Equal(t, tt.wantRetryable, errs.IsRetryable(err), "retryability mismatch") + } else { + require.NoError(t, err) + } + }) + } +} + +func TestController_Process_PartialPublishFailure(t *testing.T) { + testRequest := entity.Request{ ID: "test-queue/123", Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateProcessing, Version: 1, } - payload, err := request.ToBytes() + testBatch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + Contains: []string{"test-queue/123"}, + State: entity.BatchStateSpeculating, + Version: 1, + } + + ctrl := gomock.NewController(t) + + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + // First publish succeeds, second fails. + var publishCount atomic.Int32 + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + if publishCount.Add(1) == 2 { + return fmt.Errorf("second publish failed") + } + return nil + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: mockQ}, + }, + ) require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") + + msg := queue.NewMessage("test-msg-id", []byte("test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err = controller.Process(context.Background(), delivery) - assert.Error(t, err) + + require.Error(t, err) + assert.True(t, errs.IsRetryable(err), "partial publish failure should be retryable") } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + lp := landprovidermock.NewMockLandProvider(ctrl) + mockStorage := storagemock.NewMockStorage(ctrl) + + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: queuemock.NewMockQueue(ctrl)}, + }, + ) + require.NoError(t, err) + + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") var _ consumer.Controller = controller }