From df4b39c0990c97e134df6be551f91175368e5495 Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 3 Mar 2026 18:12:34 +0000 Subject: [PATCH] feat(controller) Move request store insert from gateway to orchestrator --- example/server/gateway/BUILD.bazel | 1 - example/server/gateway/main.go | 9 +- example/server/orchestrator/main.go | 1 + gateway/controller/BUILD.bazel | 2 - gateway/controller/land.go | 12 +-- gateway/controller/land_test.go | 97 ++----------------- orchestrator/controller/request/BUILD.bazel | 3 + orchestrator/controller/request/request.go | 17 +++- .../controller/request/request_test.go | 94 ++++++++++++++++-- 9 files changed, 120 insertions(+), 116 deletions(-) diff --git a/example/server/gateway/BUILD.bazel b/example/server/gateway/BUILD.bazel index 73027d30..c8a55621 100644 --- a/example/server/gateway/BUILD.bazel +++ b/example/server/gateway/BUILD.bazel @@ -13,7 +13,6 @@ go_library( deps = [ "//extension/counter/mysql", "//extension/queue/mysql", - "//extension/storage/mysql", "//gateway/controller", "//gateway/protopb", "@com_github_go_sql_driver_mysql//:mysql", diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index 9a7c6dbf..48eec7ba 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -15,7 +15,6 @@ import ( "github.com/uber-go/tally/v4" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" - "github.com/uber/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/gateway/controller" pb "github.com/uber/submitqueue/gateway/protopb" "go.uber.org/zap" @@ -102,11 +101,7 @@ func run() error { } defer appDB.Close() - // Initialize storage and counter from shared app database connection - store, err := mysql.NewStorage(appDB) - if err != nil { - return fmt.Errorf("failed to create storage: %w", err) - } + // Initialize counter from shared app database connection cnt := mysqlcounter.NewCounter(appDB) // Open queue database connection @@ -141,7 +136,7 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, mysqlQueue.Publisher(), "request") + landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), "request") gatewayServer := &GatewayServer{ pingController: pingController, landController: landController, diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index b1a48b7f..31158867 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -314,6 +314,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t requestController := request.NewController( logger, scope, + store, registry, consumer.TopicKeyRequest, "orchestrator-request", diff --git a/gateway/controller/BUILD.bazel b/gateway/controller/BUILD.bazel index 007fe5fd..5329be0e 100644 --- a/gateway/controller/BUILD.bazel +++ b/gateway/controller/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//entity/queue", "//extension/counter", "//extension/queue", - "//extension/storage", "//gateway/protopb", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -33,7 +32,6 @@ go_test( "//entity/queue", "//extension/counter/mock", "//extension/queue/mock", - "//extension/storage/mock", "//gateway/protopb", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/gateway/controller/land.go b/gateway/controller/land.go index 7ecee3c4..fe5d861d 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -12,7 +12,6 @@ import ( "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/counter" extqueue "github.com/uber/submitqueue/extension/queue" - "github.com/uber/submitqueue/extension/storage" pb "github.com/uber/submitqueue/gateway/protopb" "go.uber.org/zap" ) @@ -30,7 +29,6 @@ func IsInvalidRequest(err error) bool { type LandController struct { logger *zap.SugaredLogger metricsScope tally.Scope - store storage.Storage counter counter.Counter publisher extqueue.Publisher topic string // Topic to publish requests to (e.g., "request", "land_request") @@ -38,11 +36,10 @@ type LandController struct { // NewLandController creates a new instance of the gateway land controller. // topic: the queue topic to publish requests to (e.g., "request", "land_request") -func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, counter counter.Counter, publisher extqueue.Publisher, topic string) *LandController { +func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, publisher extqueue.Publisher, topic string) *LandController { return &LandController{ logger: logger, metricsScope: scope, - store: store, counter: counter, publisher: publisher, topic: topic, @@ -94,10 +91,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan Version: 1, } - if err := c.store.GetRequestStore().Create(ctx, request); err != nil { - return nil, fmt.Errorf("LandController failed to create request for queue=%s: %w", req.Queue, err) - } - c.logger.Debugw("land request created", "queue", req.Queue, "sqid", request.ID, @@ -106,6 +99,9 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan "strategy", string(strategy), ) + // TODO: Insert the created request to the + // event store + // Publish to queue for async processing if err := c.publishToQueue(ctx, request); err != nil { c.logger.Errorw("failed to publish request to queue", diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index cd014363..1d930909 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -12,7 +12,6 @@ import ( "github.com/uber/submitqueue/entity/queue" countermock "github.com/uber/submitqueue/extension/counter/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" - storagemock "github.com/uber/submitqueue/extension/storage/mock" pb "github.com/uber/submitqueue/gateway/protopb" "go.uber.org/mock/gomock" "go.uber.org/zap" @@ -27,23 +26,18 @@ func noopPublisher(ctrl *gomock.Controller) *queuemock.MockPublisher { func TestNewLandController(t *testing.T) { ctrl := gomock.NewController(t) - store := storagemock.NewMockStorage(ctrl) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") require.NotNil(t, controller) } func TestLand_ReturnsSqid(t *testing.T) { ctrl := gomock.NewController(t) - mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -56,70 +50,12 @@ func TestLand_ReturnsSqid(t *testing.T) { assert.Equal(t, "test-queue/1", resp.Sqid) } -func TestLand_PassesCorrectParametersToStore(t *testing.T) { - var capturedRequest entity.Request - - ctrl := gomock.NewController(t) - mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, request entity.Request) error { - capturedRequest = request - return nil - }, - ) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - - cnt := countermock.NewMockCounter(ctrl) - cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(42), nil) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") - ctx := context.Background() - - req := &pb.LandRequest{ - Queue: "my-queue", - Change: &pb.Change{Uris: []string{"github://uber/myservice/pull/1/abc111", "github://uber/myservice/pull/2/def222"}}, - Strategy: pb.Strategy_REBASE, - } - resp, err := controller.Land(ctx, req) - - require.NoError(t, err) - assert.Equal(t, "my-queue/42", capturedRequest.ID) - assert.Equal(t, "my-queue", capturedRequest.Queue) - assert.Equal(t, []string{"github://uber/myservice/pull/1/abc111", "github://uber/myservice/pull/2/def222"}, capturedRequest.Change.URIs) - assert.Equal(t, entity.RequestLandStrategyRebase, capturedRequest.LandStrategy) - assert.Equal(t, entity.RequestStateNew, capturedRequest.State) - assert.Equal(t, int32(1), capturedRequest.Version) - assert.Equal(t, "my-queue/42", resp.Sqid) -} - -func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) { - ctrl := gomock.NewController(t) - mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("database connection failed")) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - - cnt := countermock.NewMockCounter(ctrl) - cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") - ctx := context.Background() - - req := &pb.LandRequest{ - Queue: "test-queue", - Change: &pb.Change{Uris: []string{"github://uber/test-repo/pull/123/abc123def"}}, - } - _, err := controller.Land(ctx, req) - - require.Error(t, err) -} - func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { ctrl := gomock.NewController(t) - store := storagemock.NewMockStorage(ctrl) cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -135,10 +71,6 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { var capturedDomain string ctrl := gomock.NewController(t) - mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).DoAndReturn( @@ -147,7 +79,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { return 1, nil }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -162,10 +94,9 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) { ctrl := gomock.NewController(t) - store := storagemock.NewMockStorage(ctrl) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -180,10 +111,9 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) { func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) { ctrl := gomock.NewController(t) - store := storagemock.NewMockStorage(ctrl) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -198,10 +128,9 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) { func TestLand_ReturnsErrorOnNilChange(t *testing.T) { ctrl := gomock.NewController(t) - store := storagemock.NewMockStorage(ctrl) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -219,10 +148,6 @@ func TestLand_PublishesToQueue(t *testing.T) { var publishedMessage queue.Message ctrl := gomock.NewController(t) - mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(123), nil) @@ -235,7 +160,7 @@ func TestLand_PublishesToQueue(t *testing.T) { }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher, "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, "request") ctx := context.Background() req := &pb.LandRequest{ @@ -266,17 +191,13 @@ func TestLand_PublishesToQueue(t *testing.T) { func TestLand_ContinuesWhenPublishFails(t *testing.T) { ctrl := gomock.NewController(t) - mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(999), nil) publisher := queuemock.NewMockPublisher(ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher, "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, "request") ctx := context.Background() req := &pb.LandRequest{ diff --git a/orchestrator/controller/request/BUILD.bazel b/orchestrator/controller/request/BUILD.bazel index 7fd2d24b..d6855766 100644 --- a/orchestrator/controller/request/BUILD.bazel +++ b/orchestrator/controller/request/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//core/errs", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -25,6 +26,8 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/request/request.go b/orchestrator/controller/request/request.go index e2ca885f..550d40c9 100644 --- a/orchestrator/controller/request/request.go +++ b/orchestrator/controller/request/request.go @@ -2,6 +2,7 @@ package request import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" @@ -9,15 +10,17 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) // Controller handles request queue messages. -// It consumes requests and publishes to the validate stage. +// It consumes requests, persists them to storage, and publishes to the validate stage. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -30,6 +33,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -37,6 +41,7 @@ func NewController( return &Controller{ logger: logger.Named("request_controller"), metricsScope: scope.SubScope("request_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -77,6 +82,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) + // Persist request to storage (idempotent — ErrAlreadyExists means a retry) + if err := c.store.GetRequestStore().Create(ctx, request); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + c.logger.Errorw("failed to create request in storage", + "request_id", request.ID, + "error", err, + ) + c.metricsScope.Counter("storage_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to create request: %w", err)) + } + // Publish to validate topic if err := c.publish(ctx, consumer.TopicKeyValidate, request); err != nil { c.logger.Errorw("failed to publish output", diff --git a/orchestrator/controller/request/request_test.go b/orchestrator/controller/request/request_test.go index b0098e2e..9bf28501 100644 --- a/orchestrator/controller/request/request_test.go +++ b/orchestrator/controller/request/request_test.go @@ -13,12 +13,14 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + "github.com/uber/submitqueue/extension/storage" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -37,12 +39,22 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyRequest, "orchestrator-request") + return NewController(logger, scope, store, registry, consumer.TopicKeyRequest, "orchestrator-request") +} + +// newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create. +func newMockStorage(ctrl *gomock.Controller) *storagemock.MockStorage { + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + return store } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyRequest, controller.TopicKey()) @@ -53,7 +65,7 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) request := entity.Request{ ID: "test-queue/123", @@ -79,7 +91,7 @@ func TestController_Process_Success(t *testing.T) { func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -111,7 +123,7 @@ func TestController_Process_AllRequestStates(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) request := entity.Request{ ID: fmt.Sprintf("queue/%s", tt.state), @@ -139,7 +151,7 @@ func TestController_Process_AllRequestStates(t *testing.T) { func TestController_Process_MultipleChanges(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) request := entity.Request{ ID: "queue/999", @@ -171,7 +183,7 @@ func TestController_Process_MultipleChanges(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, newMockStorage(ctrl), fmt.Errorf("publish failed")) request := entity.Request{ ID: "test-queue/123", @@ -194,9 +206,73 @@ func TestController_Process_PublishFailure(t *testing.T) { assert.Error(t, err) } +func TestController_Process_StorageFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("database connection failed")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.True(t, errs.IsRetryable(err)) +} + +func TestController_Process_AlreadyExistsSucceeds(t *testing.T) { + ctrl := gomock.NewController(t) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("duplicate: %w", storage.ErrAlreadyExists)) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + // Should succeed even though Create returns ErrAlreadyExists (idempotent) + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) var _ consumer.Controller = controller }