From 3090b307c3bded422976805d5cc39ba6b4762467 Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Fri, 27 Feb 2026 19:57:57 +0000 Subject: [PATCH 1/5] feat(controller): hooking up merge controller --- entity/batch.go | 8 +- example/server/orchestrator/main.go | 2 + orchestrator/controller/batch/batch.go | 2 +- orchestrator/controller/merge/BUILD.bazel | 7 +- orchestrator/controller/merge/merge.go | 158 +++++++-- orchestrator/controller/merge/merge_test.go | 361 ++++++++++++++++---- 6 files changed, 439 insertions(+), 99 deletions(-) diff --git a/entity/batch.go b/entity/batch.go index 8d47c677..6ae2ff0d 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -12,8 +12,12 @@ const ( BatchStateCreated BatchState = "created" // BatchStateSpeculating is the state of a batch that is undergoing speculative execution. BatchStateSpeculating BatchState = "speculating" - // BatchStateFinalizing is the state of a batch that is being finalized after speculative execution. - BatchStateFinalizing BatchState = "finalizing" + // BatchStateLanding is the state of a batch that is being landed. + BatchStateLanding BatchState = "landing" + // BatchStateLandFailed is the state of a batch that has failed to land. + BatchStateLandFailed BatchState = "land_failed" + // BatchStateLandSucceeded is the state of a batch that has successfully landed. + BatchStateLandSucceeded BatchState = "land_succeeded" // BatchStateSucceeded is the terminal state of a batch that has been successfully landed. BatchStateSucceeded BatchState = "succeeded" // BatchStateFailed is the terminal state of a batch that has failed. diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 7e500b5f..0d2de08a 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -361,6 +361,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, + nil, // TODO: wire LandProvider implementation + nil, // TODO: wire Storage implementation consumer.TopicKeyToMerge, "orchestrator-merge", ) diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 3c08c61d..114cdeff 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -106,7 +106,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ entity.BatchStateCreated, entity.BatchStateSpeculating, - entity.BatchStateFinalizing, + entity.BatchStateLanding, }) if err != nil { c.logger.Errorw("failed to get active batches", diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index 3f2c16b5..047cd6b9 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -7,9 +7,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", + "//extension/landprovider", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -21,10 +22,12 @@ go_test( embed = [":merge"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", + "//extension/landprovider", + "//extension/landprovider/mock", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index f12869f0..6dc94871 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -6,19 +6,23 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/landprovider" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) // Controller handles merge queue messages. -// It consumes merge requests, performs merges, and publishes results to the merge signal stage. +// It consumes batches, fetches the contained requests from storage, +// lands the changes via the LandProvider, and publishes results to the merge signal stage. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry + landProvider landprovider.LandProvider + storage storage.Storage topicKey consumer.TopicKey consumerGroup string } @@ -31,6 +35,8 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, + landProvider landprovider.LandProvider, + storage storage.Storage, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -38,75 +44,175 @@ func NewController( logger: logger.Named("merge_controller"), metricsScope: scope.SubScope("merge_controller"), registry: registry, + landProvider: landProvider, + storage: storage, topicKey: topicKey, consumerGroup: consumerGroup, } } // Process processes a merge delivery from the queue. -// Deserializes the request, performs the merge, and publishes to the merge signal topic. +// Deserializes the batch, fetches contained requests, lands the changes, +// and publishes the batch to the merge signal topic. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize batch entity + batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Errorw("failed to deserialize batch", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), "error", err, ) c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + // Non-retryable: malformed messages will never succeed + return fmt.Errorf("failed to deserialize batch: %w", err) } c.logger.Infow("received merge event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, + "batch_id", batch.ID, + "queue", batch.Queue, + "request_count", len(batch.Contains), + "state", string(batch.State), + "version", batch.Version, "attempt", delivery.Attempt(), "partition_key", msg.PartitionKey, ) - // TODO: Add merge logic - // - Perform source control merge operation - // - Handle merge conflicts + // Idempotency guard: re-fetch the batch from storage to check current state. + // On retries (e.g., land succeeded but publish failed), this prevents calling + // Land again for changes that were already merged. + currentBatch, err := c.storage.GetBatchStore().Get(ctx, batch.ID) + if err != nil { + c.logger.Errorw("failed to fetch current batch state", + "batch_id", batch.ID, + "error", err, + ) + c.metricsScope.Counter("batch_fetch_errors").Inc(1) + return fmt.Errorf("failed to fetch batch %s: %w", batch.ID, err) + } + + // Use the current version from storage for subsequent operations + batch = currentBatch + + switch batch.State { + case entity.BatchStateLandSucceeded, entity.BatchStateLandFailed: + c.logger.Infow("batch already landed, skipping to publish", + "batch_id", batch.ID, + "state", string(batch.State), + ) + c.metricsScope.Counter("idempotent_skip").Inc(1) + case entity.BatchStateLanding: + newState, err := c.land(ctx, batch) + if err != nil { + return err + } + + if err := c.storage.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newState); err != nil { + c.logger.Errorw("failed to update batch state", + "batch_id", batch.ID, + "target_state", string(newState), + "error", err, + ) + c.metricsScope.Counter("batch_update_errors").Inc(1) + return fmt.Errorf("failed to update batch state: %w", err) + } + + batch.State = newState + batch.Version++ + default: + c.logger.Errorw("unexpected batch state for merge", + "batch_id", batch.ID, + "state", string(batch.State), + ) + c.metricsScope.Counter("unexpected_state").Inc(1) + return fmt.Errorf("unexpected batch state %s for batch %s", batch.State, batch.ID) + } // Publish to merge signal topic - if err := c.publish(ctx, consumer.TopicKeyMergeSignal, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyMergeSignal, batch); err != nil { c.logger.Errorw("failed to publish output", - "request_id", request.ID, + "batch_id", batch.ID, "topic_key", consumer.TopicKeyMergeSignal, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to merge-signal: %w", err)) + return fmt.Errorf("failed to publish to merge-signal: %w", err) } - c.logger.Infow("published request to next stage", - "request_id", request.ID, + c.logger.Infow("published batch to next stage", + "batch_id", batch.ID, + "state", string(batch.State), "topic_key", consumer.TopicKeyMergeSignal, ) c.metricsScope.Counter("processed").Inc(1) + return nil +} + +// land fetches the requests in the batch, lands them via the LandProvider, +// and classifies the outcome into a batch state. +func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.BatchState, error) { + requestStore := c.storage.GetRequestStore() + entries := make([]landprovider.LandEntry, 0, len(batch.Contains)) + + for _, requestID := range batch.Contains { + request, err := requestStore.Get(ctx, requestID) + if err != nil { + c.logger.Errorw("failed to fetch request", + "batch_id", batch.ID, + "request_id", requestID, + "error", err, + ) + c.metricsScope.Counter("request_fetch_errors").Inc(1) + return "", fmt.Errorf("failed to fetch request %s: %w", requestID, err) + } + + entries = append(entries, landprovider.LandEntry{ + Strategy: request.LandStrategy, + Change: request.Change, + }) + } - return nil // Success - message will be acked + landResult, err := c.landProvider.Land(ctx, batch.Queue, entries) + + switch { + case err == nil: + c.logger.Infow("land succeeded", + "batch_id", batch.ID, + "sha", landResult.SHA, + ) + return entity.BatchStateLandSucceeded, nil + case landprovider.IsLandRejected(err): + c.logger.Errorw("land rejected", + "batch_id", batch.ID, + "error", err, + ) + c.metricsScope.Counter("land_rejected").Inc(1) + return entity.BatchStateLandFailed, nil + default: + c.logger.Errorw("land failed", + "batch_id", batch.ID, + "error", err, + ) + c.metricsScope.Counter("land_errors").Inc(1) + return "", fmt.Errorf("land failed: %w", err) + } } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a batch to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { + payload, err := batch.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize batch: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index dd351180..f93d7e47 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -9,40 +9,29 @@ import ( "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/landprovider" + landprovidermock "github.com/uber/submitqueue/extension/landprovider/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) -// newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + lp := landprovidermock.NewMockLandProvider(ctrl) + mockStorage := storagemock.NewMockStorage(ctrl) + logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, topic string, msg queue.Message) error { - return publishErr - }, - ).AnyTimes() - - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}}, + []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}}, ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyToMerge, "orchestrator-merge") -} - -func TestNewController(t *testing.T) { - ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyToMerge, controller.TopicKey()) @@ -50,78 +39,314 @@ func TestNewController(t *testing.T) { assert.Equal(t, "merge", controller.Name()) } -func TestController_Process_Success(t *testing.T) { - ctrl := gomock.NewController(t) - - controller := newTestController(t, ctrl, nil) - - request := entity.Request{ +func TestController_Process(t *testing.T) { + testRequest := entity.Request{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateProcessing, Version: 1, } - payload, err := request.ToBytes() - require.NoError(t, err) + testBatch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + Contains: []string{"test-queue/123"}, + State: entity.BatchStateLanding, + Version: 1, + } - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + tests := []struct { + name string + setupMocks func(*gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) + payload []byte + wantErr bool + }{ + { + name: "success", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{SHA: "abc123def"}, nil) - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) -} + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) -func TestController_Process_InvalidJSON(t *testing.T) { - ctrl := gomock.NewController(t) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandSucceeded).Return(nil) - controller := newTestController(t, ctrl, nil) + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: false, + }, + { + name: "invalid json", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + mockStorage := storagemock.NewMockStorage(ctrl) + return lp, mockStorage, nil + }, + payload: []byte(`{"invalid": json"}`), + wantErr: true, + }, + { + name: "idempotent - already land_succeeded", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) - err := controller.Process(context.Background(), delivery) + succeededBatch := testBatch + succeededBatch.State = entity.BatchStateLandSucceeded + succeededBatch.Version = 2 - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) -} + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(succeededBatch, nil) -func TestController_Process_PublishFailure(t *testing.T) { - ctrl := gomock.NewController(t) + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: false, + }, + { + name: "idempotent - already land_failed", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + failedBatch := testBatch + failedBatch.State = entity.BatchStateLandFailed + failedBatch.Version = 2 + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(failedBatch, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: false, + }, + { + name: "request not found", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("not found")) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: true, + }, + { + name: "land rejected - batch marked as failed and published", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{}, landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandFailed).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: false, // Acked — terminal failure handled + }, + { + name: "land error - not rejected", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{}, fmt.Errorf("generic error")) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: true, + }, + { + name: "batch update error after successful land", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{SHA: "abc123def"}, nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandSucceeded).Return(fmt.Errorf("version mismatch")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: true, + }, + { + name: "batch update error after land rejection", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{}, landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandFailed).Return(fmt.Errorf("version mismatch")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: true, + }, + { + name: "publish failure", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{SHA: "abc123def"}, nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandSucceeded).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, fmt.Errorf("publish failed") + }, + payload: func() []byte { + b, _ := testBatch.ToBytes() + return b + }(), + wantErr: true, + }, } - payload, err := request.ToBytes() - require.NoError(t, err) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + lp, mockStorage, publishErr := tt.setupMocks(ctrl) + + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}}, + ) + require.NoError(t, err) + + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + msg := queue.NewMessage("test-batch-id", tt.payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) - assert.Error(t, err) + err = controller.Process(context.Background(), delivery) + + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + lp := landprovidermock.NewMockLandProvider(ctrl) + mockStorage := storagemock.NewMockStorage(ctrl) + + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}}, + ) + require.NoError(t, err) + + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") var _ consumer.Controller = controller } From fb603fc8eadf7f6d32314d208022f4e3bcbe272f Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Mon, 2 Mar 2026 07:53:54 +0000 Subject: [PATCH 2/5] WIP change message to batch ids --- orchestrator/controller/merge/merge.go | 66 ++++++--------- orchestrator/controller/merge/merge_test.go | 89 +++++++++------------ 2 files changed, 64 insertions(+), 91 deletions(-) diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 6dc94871..307aa2c0 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -52,54 +52,44 @@ func NewController( } // Process processes a merge delivery from the queue. -// Deserializes the batch, fetches contained requests, lands the changes, -// and publishes the batch to the merge signal topic. +// Extracts the batch ID from the message, fetches the batch from storage, +// lands the changes, and publishes the batch ID to the merge signal topic. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() + batchID := string(msg.Payload) - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) - if err != nil { - c.logger.Errorw("failed to deserialize batch", + c.logger.Infow("received merge event", + "batch_id", batchID, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + if batchID == "" { + c.logger.Errorw("empty batch ID in message", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), - "error", err, ) c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("empty batch ID in message %s", msg.ID) } - c.logger.Infow("received merge event", - "batch_id", batch.ID, - "queue", batch.Queue, - "request_count", len(batch.Contains), - "state", string(batch.State), - "version", batch.Version, - "attempt", delivery.Attempt(), - "partition_key", msg.PartitionKey, - ) - - // Idempotency guard: re-fetch the batch from storage to check current state. - // On retries (e.g., land succeeded but publish failed), this prevents calling - // Land again for changes that were already merged. - currentBatch, err := c.storage.GetBatchStore().Get(ctx, batch.ID) + // Fetch the batch from storage — this is the single source of truth. + // On retries (e.g., land succeeded but publish failed), reading current + // state prevents calling Land again for changes that were already merged. + batch, err := c.storage.GetBatchStore().Get(ctx, batchID) if err != nil { - c.logger.Errorw("failed to fetch current batch state", - "batch_id", batch.ID, + c.logger.Errorw("failed to fetch batch", + "batch_id", batchID, "error", err, ) c.metricsScope.Counter("batch_fetch_errors").Inc(1) - return fmt.Errorf("failed to fetch batch %s: %w", batch.ID, err) + return fmt.Errorf("failed to fetch batch %s: %w", batchID, err) } - // Use the current version from storage for subsequent operations - batch = currentBatch - switch batch.State { case entity.BatchStateLandSucceeded, entity.BatchStateLandFailed: c.logger.Infow("batch already landed, skipping to publish", @@ -134,8 +124,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return fmt.Errorf("unexpected batch state %s for batch %s", batch.State, batch.ID) } - // Publish to merge signal topic - if err := c.publish(ctx, consumer.TopicKeyMergeSignal, batch); err != nil { + // Publish batch ID to merge signal topic + if err := c.publish(ctx, consumer.TopicKeyMergeSignal, batch.ID, batch.Queue); err != nil { c.logger.Errorw("failed to publish output", "batch_id", batch.ID, "topic_key", consumer.TopicKeyMergeSignal, @@ -179,13 +169,12 @@ func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.Batch }) } - landResult, err := c.landProvider.Land(ctx, batch.Queue, entries) + err := c.landProvider.Land(ctx, batch.Queue, entries) switch { case err == nil: c.logger.Infow("land succeeded", "batch_id", batch.ID, - "sha", landResult.SHA, ) return entity.BatchStateLandSucceeded, nil case landprovider.IsLandRejected(err): @@ -205,14 +194,9 @@ func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.Batch } } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() - if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) - } - - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, id string, partitionKey string) error { + msg := entityqueue.NewMessage(id, []byte(id), partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index f93d7e47..b62877cf 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -60,14 +60,14 @@ func TestController_Process(t *testing.T) { tests := []struct { name string setupMocks func(*gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) - payload []byte + payload string wantErr bool }{ { name: "success", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) - lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{SHA: "abc123def"}, nil) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) @@ -82,21 +82,34 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), + payload: "test-queue/batch/1", wantErr: false, }, { - name: "invalid json", + name: "empty batch ID", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) mockStorage := storagemock.NewMockStorage(ctrl) return lp, mockStorage, nil }, - payload: []byte(`{"invalid": json"}`), - wantErr: true, + payload: "", + wantErr: true, + }, + { + name: "batch not found", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("not found")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: true, }, { name: "idempotent - already land_succeeded", @@ -115,10 +128,7 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), + payload: "test-queue/batch/1", wantErr: false, }, { @@ -138,10 +148,7 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), + payload: "test-queue/batch/1", wantErr: false, }, { @@ -161,17 +168,14 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), + payload: "test-queue/batch/1", wantErr: true, }, { name: "land rejected - batch marked as failed and published", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) - lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{}, landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) @@ -186,17 +190,14 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), - wantErr: false, // Acked — terminal failure handled + payload: "test-queue/batch/1", + wantErr: false, }, { name: "land error - not rejected", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) - lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{}, fmt.Errorf("generic error")) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("generic error")) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) @@ -210,17 +211,14 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), - wantErr: true, + payload: "test-queue/batch/1", + wantErr: true, }, { name: "batch update error after successful land", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) - lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{SHA: "abc123def"}, nil) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) @@ -235,17 +233,14 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), + payload: "test-queue/batch/1", wantErr: true, }, { name: "batch update error after land rejection", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) - lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{}, landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) @@ -260,17 +255,14 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, nil }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), + payload: "test-queue/batch/1", wantErr: true, }, { name: "publish failure", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) - lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.Result{SHA: "abc123def"}, nil) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) @@ -285,11 +277,8 @@ func TestController_Process(t *testing.T) { return lp, mockStorage, fmt.Errorf("publish failed") }, - payload: func() []byte { - b, _ := testBatch.ToBytes() - return b - }(), - wantErr: true, + payload: "test-queue/batch/1", + wantErr: true, }, } @@ -318,7 +307,7 @@ func TestController_Process(t *testing.T) { controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") - msg := queue.NewMessage("test-batch-id", tt.payload, "test-queue", nil) + msg := queue.NewMessage("test-msg-id", []byte(tt.payload), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() From 10669dae85acdf9e1faa14075ae67ac61d49017a Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Mon, 2 Mar 2026 18:24:54 +0000 Subject: [PATCH 3/5] added landProvider factory and wired up with orchestrator/mergecontroller --- example/server/orchestrator/BUILD.bazel | 2 ++ example/server/orchestrator/main.go | 34 ++++++++++++++++++++++--- orchestrator/controller/merge/merge.go | 4 +-- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index c342fa47..4675ad40 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -14,6 +14,8 @@ go_library( "//core/consumer", "//extension/counter", "//extension/counter/mysql", + "//extension/landprovider", + "//extension/landprovider/github", "//extension/mergechecker", "//extension/mergechecker/github", "//extension/queue", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 0d2de08a..554ac4fe 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -17,8 +17,10 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/extension/counter" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" + "github.com/uber/submitqueue/extension/landprovider" "github.com/uber/submitqueue/extension/mergechecker" githubchecker "github.com/uber/submitqueue/extension/mergechecker/github" + githublander "github.com/uber/submitqueue/extension/landprovider/github" extqueue "github.com/uber/submitqueue/extension/queue" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql" @@ -160,8 +162,11 @@ func run() error { // Create merge checker mc := newMergeChecker(logger, scope) + // Create land provider + lp := newLandProvider(logger, scope) + // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cnt, store); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, lp, cnt, store); err != nil { return err } @@ -298,7 +303,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // → merge → merge-signal // finalize (terminal) -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cnt counter.Counter, store storage.Storage) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, lp landprovider.LandProvider, cnt counter.Counter, store storage.Storage) error { requestController := request.NewController( logger, scope, @@ -361,8 +366,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - nil, // TODO: wire LandProvider implementation - nil, // TODO: wire Storage implementation + lp, + store, consumer.TopicKeyToMerge, "orchestrator-merge", ) @@ -420,6 +425,27 @@ func newMergeChecker(logger *zap.Logger, scope tally.Scope) mergechecker.MergeCh }) } +// newLandProvider creates a LandProvider for GitHub (github.com). +// Configured via GITHUB_TOKEN and GITHUB_API_URL environment variables. +func newLandProvider(logger *zap.Logger, scope tally.Scope) landprovider.LandProvider { + apiURL := os.Getenv("GITHUB_API_URL") + if apiURL == "" { + apiURL = "https://api.github.com" + } + + httpClient := &http.Client{} + if token := os.Getenv("GITHUB_TOKEN"); token != "" { + httpClient.Transport = &bearerTransport{token: token} + } + + return githublander.NewLandProvider(githublander.Params{ + HTTPClient: httpClient, + APIURL: apiURL, + Logger: logger.Sugar(), + MetricsScope: scope.SubScope("landprovider"), + }) +} + // bearerTransport is an http.RoundTripper that adds a Bearer token to requests. type bearerTransport struct { token string diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 307aa2c0..b6edc824 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -149,7 +149,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // and classifies the outcome into a batch state. func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.BatchState, error) { requestStore := c.storage.GetRequestStore() - entries := make([]landprovider.LandEntry, 0, len(batch.Contains)) + entries := make([]entity.LandEntry, 0, len(batch.Contains)) for _, requestID := range batch.Contains { request, err := requestStore.Get(ctx, requestID) @@ -163,7 +163,7 @@ func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.Batch return "", fmt.Errorf("failed to fetch request %s: %w", requestID, err) } - entries = append(entries, landprovider.LandEntry{ + entries = append(entries, entity.LandEntry{ Strategy: request.LandStrategy, Change: request.Change, }) From 93bcc1f50d58038e6cb52604730fa08a07f65b9a Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Mon, 2 Mar 2026 20:38:00 +0000 Subject: [PATCH 4/5] fixes: remove unncessary batch states, update errs to retryable, update logical flow for idempotency --- entity/batch.go | 6 - entity/batch_test.go | 1 - orchestrator/controller/batch/batch.go | 1 - orchestrator/controller/merge/BUILD.bazel | 2 + orchestrator/controller/merge/merge.go | 70 +++++---- orchestrator/controller/merge/merge_test.go | 160 +++++++++++++++++--- 6 files changed, 173 insertions(+), 67 deletions(-) diff --git a/entity/batch.go b/entity/batch.go index 6ae2ff0d..f0a57102 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -12,12 +12,6 @@ const ( BatchStateCreated BatchState = "created" // BatchStateSpeculating is the state of a batch that is undergoing speculative execution. BatchStateSpeculating BatchState = "speculating" - // BatchStateLanding is the state of a batch that is being landed. - BatchStateLanding BatchState = "landing" - // BatchStateLandFailed is the state of a batch that has failed to land. - BatchStateLandFailed BatchState = "land_failed" - // BatchStateLandSucceeded is the state of a batch that has successfully landed. - BatchStateLandSucceeded BatchState = "land_succeeded" // BatchStateSucceeded is the terminal state of a batch that has been successfully landed. BatchStateSucceeded BatchState = "succeeded" // BatchStateFailed is the terminal state of a batch that has failed. diff --git a/entity/batch_test.go b/entity/batch_test.go index e204b24f..2aa88a9c 100644 --- a/entity/batch_test.go +++ b/entity/batch_test.go @@ -16,7 +16,6 @@ func TestBatchState_IsTerminal(t *testing.T) { {name: "unknown", state: BatchStateUnknown, terminal: false}, {name: "created", state: BatchStateCreated, terminal: false}, {name: "speculating", state: BatchStateSpeculating, terminal: false}, - {name: "finalizing", state: BatchStateFinalizing, terminal: false}, {name: "succeeded", state: BatchStateSucceeded, terminal: true}, {name: "failed", state: BatchStateFailed, terminal: true}, {name: "cancelled", state: BatchStateCancelled, terminal: true}, diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 114cdeff..c7d23221 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -106,7 +106,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ entity.BatchStateCreated, entity.BatchStateSpeculating, - entity.BatchStateLanding, }) if err != nil { c.logger.Errorw("failed to get active batches", diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index 047cd6b9..c7e59db4 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/errs", "//entity", "//entity/queue", "//extension/landprovider", @@ -22,6 +23,7 @@ go_test( embed = [":merge"], deps = [ "//core/consumer", + "//core/errs", "//entity", "//entity/queue", "//extension/landprovider", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index b6edc824..7e82520f 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -6,6 +6,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/landprovider" @@ -15,7 +16,8 @@ import ( // Controller handles merge queue messages. // It consumes batches, fetches the contained requests from storage, -// lands the changes via the LandProvider, and publishes results to the merge signal stage. +// lands the changes via the LandProvider, and publishes results +// to the merge-signal and speculator stages. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger @@ -53,7 +55,7 @@ func NewController( // Process processes a merge delivery from the queue. // Extracts the batch ID from the message, fetches the batch from storage, -// lands the changes, and publishes the batch ID to the merge signal topic. +// lands the changes, and publishes the batch ID to downstream topics. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) @@ -78,8 +80,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Fetch the batch from storage — this is the single source of truth. - // On retries (e.g., land succeeded but publish failed), reading current - // state prevents calling Land again for changes that were already merged. + // Terminal states mean the land outcome was already persisted on a + // previous attempt; skip straight to publish. batch, err := c.storage.GetBatchStore().Get(ctx, batchID) if err != nil { c.logger.Errorw("failed to fetch batch", @@ -87,20 +89,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "error", err, ) c.metricsScope.Counter("batch_fetch_errors").Inc(1) - return fmt.Errorf("failed to fetch batch %s: %w", batchID, err) + return errs.NewRetryableError(fmt.Errorf("failed to fetch batch %s: %w", batchID, err)) } - switch batch.State { - case entity.BatchStateLandSucceeded, entity.BatchStateLandFailed: - c.logger.Infow("batch already landed, skipping to publish", - "batch_id", batch.ID, - "state", string(batch.State), - ) - c.metricsScope.Counter("idempotent_skip").Inc(1) - case entity.BatchStateLanding: + if !batch.State.IsTerminal() { newState, err := c.land(ctx, batch) if err != nil { - return err + return fmt.Errorf("batch %s: %w", batch.ID, err) } if err := c.storage.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newState); err != nil { @@ -110,35 +105,37 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "error", err, ) c.metricsScope.Counter("batch_update_errors").Inc(1) - return fmt.Errorf("failed to update batch state: %w", err) + return errs.NewRetryableError(fmt.Errorf("failed to update batch state: %w", err)) } batch.State = newState - batch.Version++ - default: - c.logger.Errorw("unexpected batch state for merge", + } else { + c.logger.Infow("batch already in terminal state, skipping to publish", "batch_id", batch.ID, "state", string(batch.State), ) - c.metricsScope.Counter("unexpected_state").Inc(1) - return fmt.Errorf("unexpected batch state %s for batch %s", batch.State, batch.ID) + c.metricsScope.Counter("idempotent_skip").Inc(1) } - // Publish batch ID to merge signal topic - if err := c.publish(ctx, consumer.TopicKeyMergeSignal, batch.ID, batch.Queue); err != nil { - c.logger.Errorw("failed to publish output", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyMergeSignal, - "error", err, - ) - c.metricsScope.Counter("publish_errors").Inc(1) - return fmt.Errorf("failed to publish to merge-signal: %w", err) + // Publish batch ID to merge-signal and speculator topics. + // On retry, a topic that already received this message may get a duplicate; + // this is safe because downstream consumers are idempotent. + for _, key := range []consumer.TopicKey{consumer.TopicKeyMergeSignal, consumer.TopicKeyBatched} { + if err := c.publish(ctx, key, batch.ID, batch.Queue); err != nil { + c.logger.Errorw("failed to publish output", + "batch_id", batch.ID, + "topic_key", key, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to publish to %s: %w", key, err)) + } } - c.logger.Infow("published batch to next stage", + c.logger.Infow("published batch to next stages", "batch_id", batch.ID, "state", string(batch.State), - "topic_key", consumer.TopicKeyMergeSignal, + "topic_keys", []string{consumer.TopicKeyMergeSignal.String(), consumer.TopicKeyBatched.String()}, ) c.metricsScope.Counter("processed").Inc(1) @@ -160,7 +157,7 @@ func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.Batch "error", err, ) c.metricsScope.Counter("request_fetch_errors").Inc(1) - return "", fmt.Errorf("failed to fetch request %s: %w", requestID, err) + return "", errs.NewRetryableError(fmt.Errorf("failed to fetch request %s: %w", requestID, err)) } entries = append(entries, entity.LandEntry{ @@ -172,25 +169,26 @@ func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.Batch err := c.landProvider.Land(ctx, batch.Queue, entries) switch { - case err == nil: + case err == nil, landprovider.IsAlreadyLanded(err): c.logger.Infow("land succeeded", "batch_id", batch.ID, + "already_landed", landprovider.IsAlreadyLanded(err), ) - return entity.BatchStateLandSucceeded, nil + return entity.BatchStateSucceeded, nil case landprovider.IsLandRejected(err): c.logger.Errorw("land rejected", "batch_id", batch.ID, "error", err, ) c.metricsScope.Counter("land_rejected").Inc(1) - return entity.BatchStateLandFailed, nil + return entity.BatchStateFailed, nil default: c.logger.Errorw("land failed", "batch_id", batch.ID, "error", err, ) c.metricsScope.Counter("land_errors").Inc(1) - return "", fmt.Errorf("land failed: %w", err) + return "", errs.NewRetryableError(fmt.Errorf("land failed: %w", err)) } } diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index b62877cf..bf389d34 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -3,12 +3,14 @@ package merge import ( "context" "fmt" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/landprovider" @@ -27,7 +29,10 @@ func TestNewController(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}}, + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: queuemock.NewMockQueue(ctrl)}, + }, ) require.NoError(t, err) @@ -53,15 +58,16 @@ func TestController_Process(t *testing.T) { ID: "test-queue/batch/1", Queue: "test-queue", Contains: []string{"test-queue/123"}, - State: entity.BatchStateLanding, + State: entity.BatchStateSpeculating, Version: 1, } tests := []struct { - name string - setupMocks func(*gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) - payload string - wantErr bool + name string + setupMocks func(*gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) + payload string + wantErr bool + wantRetryable bool }{ { name: "success", @@ -74,7 +80,7 @@ func TestController_Process(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) - mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandSucceeded).Return(nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) @@ -96,7 +102,7 @@ func TestController_Process(t *testing.T) { wantErr: true, }, { - name: "batch not found", + name: "batch not found - retryable", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) @@ -110,14 +116,15 @@ func TestController_Process(t *testing.T) { }, payload: "test-queue/batch/1", wantErr: true, + wantRetryable: true, }, { - name: "idempotent - already land_succeeded", + name: "idempotent - already succeeded", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) succeededBatch := testBatch - succeededBatch.State = entity.BatchStateLandSucceeded + succeededBatch.State = entity.BatchStateSucceeded succeededBatch.Version = 2 mockBatchStore := storagemock.NewMockBatchStore(ctrl) @@ -132,12 +139,12 @@ func TestController_Process(t *testing.T) { wantErr: false, }, { - name: "idempotent - already land_failed", + name: "idempotent - already failed", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) failedBatch := testBatch - failedBatch.State = entity.BatchStateLandFailed + failedBatch.State = entity.BatchStateFailed failedBatch.Version = 2 mockBatchStore := storagemock.NewMockBatchStore(ctrl) @@ -152,7 +159,7 @@ func TestController_Process(t *testing.T) { wantErr: false, }, { - name: "request not found", + name: "request not found - retryable", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) @@ -170,6 +177,7 @@ func TestController_Process(t *testing.T) { }, payload: "test-queue/batch/1", wantErr: true, + wantRetryable: true, }, { name: "land rejected - batch marked as failed and published", @@ -182,7 +190,29 @@ func TestController_Process(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) - mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandFailed).Return(nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateFailed).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + return lp, mockStorage, nil + }, + payload: "test-queue/batch/1", + wantErr: false, + }, + { + name: "already landed - classified as success", + setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.ErrAlreadyLanded) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) @@ -194,7 +224,7 @@ func TestController_Process(t *testing.T) { wantErr: false, }, { - name: "land error - not rejected", + name: "land error - retryable", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("generic error")) @@ -213,9 +243,10 @@ func TestController_Process(t *testing.T) { }, payload: "test-queue/batch/1", wantErr: true, + wantRetryable: true, }, { - name: "batch update error after successful land", + name: "batch update error after successful land - retryable", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) @@ -225,7 +256,7 @@ func TestController_Process(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) - mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandSucceeded).Return(fmt.Errorf("version mismatch")) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(fmt.Errorf("version mismatch")) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) @@ -235,9 +266,10 @@ func TestController_Process(t *testing.T) { }, payload: "test-queue/batch/1", wantErr: true, + wantRetryable: true, }, { - name: "batch update error after land rejection", + name: "batch update error after land rejection - retryable", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(landprovider.WrapLandRejected(fmt.Errorf("merge conflict"))) @@ -247,7 +279,7 @@ func TestController_Process(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) - mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandFailed).Return(fmt.Errorf("version mismatch")) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateFailed).Return(fmt.Errorf("version mismatch")) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) @@ -257,9 +289,10 @@ func TestController_Process(t *testing.T) { }, payload: "test-queue/batch/1", wantErr: true, + wantRetryable: true, }, { - name: "publish failure", + name: "publish failure - retryable", setupMocks: func(ctrl *gomock.Controller) (landprovider.LandProvider, *storagemock.MockStorage, error) { lp := landprovidermock.NewMockLandProvider(ctrl) lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) @@ -269,7 +302,7 @@ func TestController_Process(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) - mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateLandSucceeded).Return(nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) @@ -279,6 +312,7 @@ func TestController_Process(t *testing.T) { }, payload: "test-queue/batch/1", wantErr: true, + wantRetryable: true, }, } @@ -301,7 +335,10 @@ func TestController_Process(t *testing.T) { mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}}, + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: mockQ}, + }, ) require.NoError(t, err) @@ -316,6 +353,7 @@ func TestController_Process(t *testing.T) { if tt.wantErr { require.Error(t, err) + assert.Equal(t, tt.wantRetryable, errs.IsRetryable(err), "retryability mismatch") } else { require.NoError(t, err) } @@ -323,6 +361,79 @@ func TestController_Process(t *testing.T) { } } +func TestController_Process_PartialPublishFailure(t *testing.T) { + testRequest := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateProcessing, + Version: 1, + } + + testBatch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + Contains: []string{"test-queue/123"}, + State: entity.BatchStateSpeculating, + Version: 1, + } + + ctrl := gomock.NewController(t) + + lp := landprovidermock.NewMockLandProvider(ctrl) + lp.EXPECT().Land(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(testRequest, nil) + + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(testBatch, nil) + mockBatchStore.EXPECT().UpdateState(gomock.Any(), "test-queue/batch/1", int32(1), entity.BatchStateSucceeded).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + // First publish succeeds, second fails. + var publishCount atomic.Int32 + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + if publishCount.Add(1) == 2 { + return fmt.Errorf("second publish failed") + } + return nil + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + controller := NewController(logger, scope, registry, lp, mockStorage, consumer.TopicKeyToMerge, "orchestrator-merge") + + msg := queue.NewMessage("test-msg-id", []byte("test-queue/batch/1"), "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, errs.IsRetryable(err), "partial publish failure should be retryable") +} + func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) lp := landprovidermock.NewMockLandProvider(ctrl) @@ -331,7 +442,10 @@ func TestController_InterfaceImplementation(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}}, + []consumer.TopicConfig{ + {Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: queuemock.NewMockQueue(ctrl)}, + {Key: consumer.TopicKeyBatched, Name: "batched", Queue: queuemock.NewMockQueue(ctrl)}, + }, ) require.NoError(t, err) From f9a2c00d6e90947422a563ea2b97d583f0090a0d Mon Sep 17 00:00:00 2001 From: "kevin.new" Date: Mon, 2 Mar 2026 21:09:27 +0000 Subject: [PATCH 5/5] add request ids to logging in happy path --- orchestrator/controller/merge/merge.go | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 7e82520f..1725e1b7 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -172,6 +172,7 @@ func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.Batch case err == nil, landprovider.IsAlreadyLanded(err): c.logger.Infow("land succeeded", "batch_id", batch.ID, + "request_ids", batch.Contains, "already_landed", landprovider.IsAlreadyLanded(err), ) return entity.BatchStateSucceeded, nil