From 316aa7df226e404369757fedb445a3fc2374a3e2 Mon Sep 17 00:00:00 2001 From: sergeyb Date: Tue, 10 Mar 2026 21:46:20 +0000 Subject: [PATCH] refactor(id): Let queue operate on IDs and controller on database Instead of serializing all fo the Request or Batch entity to the queue, only serialize the appropriate ID. Let controller to get the latest state of the entity from the database directly. This prevents stale entities and improves optimistic concurrency timing. --- entity/batch.go | 18 +++ entity/request.go | 18 +++ example/server/orchestrator/main.go | 6 + orchestrator/controller/batch/BUILD.bazel | 1 - orchestrator/controller/batch/batch.go | 41 ++++-- orchestrator/controller/batch/batch_test.go | 126 +++++++++--------- orchestrator/controller/build/BUILD.bazel | 2 + orchestrator/controller/build/build.go | 18 ++- orchestrator/controller/build/build_test.go | 89 ++++++++----- .../controller/buildsignal/BUILD.bazel | 2 + .../controller/buildsignal/buildsignal.go | 26 ++-- .../buildsignal/buildsignal_test.go | 33 ++++- orchestrator/controller/conclude/conclude.go | 14 +- .../controller/conclude/conclude_test.go | 111 +++++++++++++-- orchestrator/controller/merge/BUILD.bazel | 2 + orchestrator/controller/merge/merge.go | 33 +++-- orchestrator/controller/merge/merge_test.go | 89 ++++++++----- orchestrator/controller/score/BUILD.bazel | 2 + orchestrator/controller/score/score.go | 31 +++-- orchestrator/controller/score/score_test.go | 89 ++++++++----- orchestrator/controller/speculate/BUILD.bazel | 2 + .../controller/speculate/speculate.go | 33 +++-- .../controller/speculate/speculate_test.go | 89 ++++++++----- orchestrator/controller/start/start.go | 13 +- orchestrator/controller/validate/BUILD.bazel | 2 + orchestrator/controller/validate/validate.go | 31 +++-- .../controller/validate/validate_test.go | 90 ++++++++----- 27 files changed, 678 insertions(+), 333 deletions(-) diff --git a/entity/batch.go b/entity/batch.go index f68bcdcb..5d80c3a8 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -89,3 +89,21 @@ func BatchFromBytes(data []byte) (Batch, error) { err := json.Unmarshal(data, &batch) return batch, err } + +// BatchID is a lightweight entity for publishing and consuming just the batch identifier via the queue. +type BatchID struct { + // ID is the globally unique identifier for the batch. + ID string `json:"id"` +} + +// ToBytes serializes the BatchID to JSON bytes for queue message payload. +func (b BatchID) ToBytes() ([]byte, error) { + return json.Marshal(b) +} + +// BatchIDFromBytes deserializes a BatchID from JSON bytes. +func BatchIDFromBytes(data []byte) (BatchID, error) { + var bid BatchID + err := json.Unmarshal(data, &bid) + return bid, err +} diff --git a/entity/request.go b/entity/request.go index 5054932b..3736bec9 100644 --- a/entity/request.go +++ b/entity/request.go @@ -104,3 +104,21 @@ func RequestFromBytes(data []byte) (Request, error) { err := json.Unmarshal(data, &req) return req, err } + +// RequestID is a lightweight entity for publishing and consuming just the request identifier via the queue. +type RequestID struct { + // ID is the globally unique identifier for the land request. + ID string `json:"id"` +} + +// ToBytes serializes the RequestID to JSON bytes for queue message payload. +func (r RequestID) ToBytes() ([]byte, error) { + return json.Marshal(r) +} + +// RequestIDFromBytes deserializes a RequestID from JSON bytes. +func RequestIDFromBytes(data []byte) (RequestID, error) { + var rid RequestID + err := json.Unmarshal(data, &rid) + return rid, err +} diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 01f45cc4..25f13364 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -390,6 +390,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t validateController := validate.NewController( logger, scope, + store, registry, mc, consumer.TopicKeyValidate, @@ -415,6 +416,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scoreController := score.NewController( logger, scope, + store, registry, consumer.TopicKeyScore, "orchestrator-score", @@ -426,6 +428,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t speculateController := speculate.NewController( logger, scope, + store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate", @@ -437,6 +440,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildController := build.NewController( logger, scope, + store, registry, consumer.TopicKeyBuild, "orchestrator-build", @@ -448,6 +452,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t buildsignalController := buildsignal.NewController( logger, scope, + store, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", @@ -459,6 +464,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t mergeController := merge.NewController( logger, scope, + store, registry, consumer.TopicKeyMerge, "orchestrator-merge", diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index 5138cc41..1633aa71 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -22,7 +22,6 @@ go_test( embed = [":batch"], deps = [ "//core/consumer", - "//core/errs", "//entity", "//entity/queue", "//extension/counter/mock", diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 60767ba3..803de2db 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -16,6 +16,7 @@ package batch import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" @@ -72,12 +73,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize request ID from payload + rid, err := entity.RequestIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize request ID: %w", err) + } + + // Fetch request from storage + request, err := c.store.GetRequestStore().Get(ctx, rid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get request %s: %w", rid.ID, err) } c.logger.Infow("received batch event", @@ -139,11 +146,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er if err == nil { bd.Dependents = append(existing.Dependents, bd.Dependents...) } + + if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) + return fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, err) + } } - // TODO: - // - Add batch to DB - // - Add to batch dependent DB + // Persist batch to storage (idempotent — ErrAlreadyExists means a retry) + if err := c.store.GetBatchStore().Create(ctx, batch); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + c.metricsScope.Counter("batch_store_errors").Inc(1) + return fmt.Errorf("failed to create batch: %w", err) + } c.logger.Infow("batch created", "batch_id", batch.ID, @@ -153,7 +167,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Publish to score topic - if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to score: %w", err) } @@ -168,14 +182,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 88c3787d..70c47751 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" countermock "github.com/uber/submitqueue/extension/counter/mock" @@ -35,6 +34,13 @@ import ( "go.uber.org/zap/zaptest" ) +// requestIDPayload serializes a RequestID to JSON bytes for test message payloads. +func requestIDPayload(t *testing.T, id string) []byte { + payload, err := entity.RequestID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + // newSequentialCounter returns a mock counter that returns incrementing values starting at 1. func newSequentialCounter(ctrl *gomock.Controller) *countermock.MockCounter { var seq int64 @@ -47,6 +53,18 @@ func newSequentialCounter(ctrl *gomock.Controller) *countermock.MockCounter { return cnt } +// testRequest returns a standard test request for batch tests. +func testRequest() entity.Request { + return entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } +} + // newTestController creates a controller with test dependencies. // If mockStorage is nil, a default MockStorage with an empty batch store is created. func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.MockCounter, mockStorage *storagemock.MockStorage, publishErr error) *Controller { @@ -56,9 +74,15 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M if mockStorage == nil { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + req := testRequest() + mockReqStore.EXPECT().Get(gomock.Any(), req.ID).Return(req, nil).AnyTimes() mockStorage = storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() } mockPub := queuemock.NewMockPublisher(ctrl) @@ -94,42 +118,34 @@ func TestController_Process_Success(t *testing.T) { controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + request := testRequest() + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("db connection lost")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) + assert.Error(t, err) } func TestController_Process_PublishFailure(t *testing.T) { @@ -137,24 +153,13 @@ func TestController_Process_PublishFailure(t *testing.T) { controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, fmt.Errorf("publish failed")) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + request := testRequest() + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } @@ -165,30 +170,28 @@ func TestController_Process_CounterFailure(t *testing.T) { cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) controller := newTestController(t, ctrl, cnt, nil, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + request := testRequest() + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_Process_WithDependencies(t *testing.T) { ctrl := gomock.NewController(t) + request := entity.Request{ + ID: "test-queue/456", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/789/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } + // Set up storage with active batches to become dependencies. activeBatches := []entity.Batch{ {ID: "test-queue/batch/1", Queue: "test-queue", State: entity.BatchStateCreated, Version: 1}, @@ -197,40 +200,35 @@ func TestController_Process_WithDependencies(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(activeBatches, nil) + mockBatchStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) // batch/1 has no existing dependents. mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{}, storage.ErrNotFound) + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) // batch/2 already has an existing dependent. mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.BatchDependent{ BatchID: "test-queue/batch/2", Dependents: []string{"test-queue/batch/99"}, }, nil) + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() + mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil) - request := entity.Request{ - ID: "test-queue/456", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/789/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateStarted, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } diff --git a/orchestrator/controller/build/BUILD.bazel b/orchestrator/controller/build/BUILD.bazel index a47bcb7e..9767bc59 100644 --- a/orchestrator/controller/build/BUILD.bazel +++ b/orchestrator/controller/build/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index eab2b780..8bef78fa 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received build event", diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go index 9edae378..0e595195 100644 --- a/orchestrator/controller/build/build_test.go +++ b/orchestrator/controller/build/build_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for build tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -51,12 +79,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyBuild, "orchestrator-build") + return NewController(logger, scope, store, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey()) @@ -67,40 +97,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -108,30 +133,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel index 0ca992b7..73c973a8 100644 --- a/orchestrator/controller/buildsignal/BUILD.bazel +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index 95a0a686..c7b4db87 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("buildsignal_controller"), metricsScope: scope.SubScope("buildsignal_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -84,19 +88,22 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Evaluate build result (pass/fail) // - Update batch state based on build outcome - batch := entity.Batch{ - ID: build.BatchID, + // Fetch batch from storage to get the partition key (queue) + batch, err := c.store.GetBatchStore().Get(ctx, build.BatchID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", build.BatchID, err) } // Publish batch to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to speculate: %w", err) } c.logger.Infow("published batch to speculate", "build_id", build.ID, - "batch_id", build.BatchID, + "batch_id", batch.ID, "topic_key", consumer.TopicKeySpeculate, ) @@ -105,14 +112,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/buildsignal/buildsignal_test.go b/orchestrator/controller/buildsignal/buildsignal_test.go index e6157373..44db77a9 100644 --- a/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/orchestrator/controller/buildsignal/buildsignal_test.go @@ -27,12 +27,26 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// newMockStorage creates a MockStorage with a MockBatchStore that returns a batch for the given batchID. +func newMockStorage(ctrl *gomock.Controller, batchID string) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batchID).Return(entity.Batch{ + ID: batchID, + Queue: "test-queue", + }, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -51,12 +65,13 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal") + return NewController(logger, scope, store, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuildSignal, controller.TopicKey()) @@ -67,7 +82,8 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) build := entity.Build{ ID: "build-123", @@ -90,7 +106,8 @@ func TestController_Process_Success(t *testing.T) { func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -107,7 +124,8 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + store := newMockStorage(ctrl, "test-queue/batch/2") + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) build := entity.Build{ ID: "build-456", @@ -129,7 +147,8 @@ func TestController_Process_PublishFailure(t *testing.T) { func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + store := newMockStorage(ctrl, "test-queue/batch/1") + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/conclude/conclude.go b/orchestrator/controller/conclude/conclude.go index f98b537f..1c454e27 100644 --- a/orchestrator/controller/conclude/conclude.go +++ b/orchestrator/controller/conclude/conclude.go @@ -69,12 +69,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - // Non-retryable: malformed messages will never succeed regardless of retry count metrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1) - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + metrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received conclude event", diff --git a/orchestrator/controller/conclude/conclude_test.go b/orchestrator/controller/conclude/conclude_test.go index 7d1439ae..990e139d 100644 --- a/orchestrator/controller/conclude/conclude_test.go +++ b/orchestrator/controller/conclude/conclude_test.go @@ -33,6 +33,13 @@ import ( "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + // newTestController creates a controller with test dependencies. func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage) *Controller { logger := zaptest.NewLogger(t).Sugar() @@ -40,8 +47,10 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora if mockStorage == nil { mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockStorage = storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() } registry, err := consumer.NewTopicRegistry(nil) @@ -78,6 +87,15 @@ func TestController_Process(t *testing.T) { Version: 3, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + Contains: []string{"test-queue/1", "test-queue/2"}, + State: entity.BatchStateSucceeded, + Version: 3, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, @@ -89,6 +107,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/2", int32(3), entity.RequestStateLanded).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -103,6 +122,15 @@ func TestController_Process(t *testing.T) { Version: 2, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.Batch{ + ID: "test-queue/batch/2", + Queue: "test-queue", + Contains: []string{"test-queue/5"}, + State: entity.BatchStateFailed, + Version: 2, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/5").Return(entity.Request{ ID: "test-queue/5", Version: 1, State: entity.RequestStateProcessing, @@ -110,6 +138,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/5", int32(1), entity.RequestStateError).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -123,6 +152,20 @@ func TestController_Process(t *testing.T) { State: entity.BatchStateCancelled, Version: 2, }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/3").Return(entity.Batch{ + ID: "test-queue/batch/3", + Queue: "test-queue", + Contains: []string{"test-queue/10"}, + State: entity.BatchStateCancelled, + Version: 2, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return mockStorage + }, wantErr: true, retryable: false, }, @@ -135,6 +178,20 @@ func TestController_Process(t *testing.T) { State: entity.BatchStateCreated, Version: 1, }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/4").Return(entity.Batch{ + ID: "test-queue/batch/4", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateCreated, + Version: 1, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return mockStorage + }, wantErr: true, retryable: false, }, @@ -148,10 +205,20 @@ func TestController_Process(t *testing.T) { Version: 2, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/5").Return(entity.Batch{ + ID: "test-queue/batch/5", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{}, fmt.Errorf("db connection lost")) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -168,6 +235,15 @@ func TestController_Process(t *testing.T) { Version: 2, }, setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/6").Return(entity.Batch{ + ID: "test-queue/batch/6", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, nil) + mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, @@ -175,6 +251,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), entity.RequestStateLanded).Return(storage.ErrVersionMismatch) mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() return mockStorage }, @@ -189,6 +266,19 @@ func TestController_Process(t *testing.T) { State: entity.BatchStateSucceeded, Version: 1, }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/7").Return(entity.Batch{ + ID: "test-queue/batch/7", + Queue: "test-queue", + State: entity.BatchStateSucceeded, + Version: 1, + }, nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return mockStorage + }, }, } @@ -203,15 +293,12 @@ func TestController_Process(t *testing.T) { controller := newTestController(t, ctrl, mockStorage) - payload, err := tt.batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(tt.batch.ID, payload, tt.batch.Queue, nil) + msg := queue.NewMessage(tt.batch.ID, batchIDPayload(t, tt.batch.ID), tt.batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) if tt.wantErr { require.Error(t, err) @@ -223,19 +310,23 @@ func TestController_Process(t *testing.T) { } } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + controller := newTestController(t, ctrl, mockStorage) + + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index f388af3f..93333685 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 7d1f1949..395e4c4c 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("merge_controller"), metricsScope: scope.SubScope("merge_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received merge event", @@ -86,7 +96,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Handle merge conflicts // Publish to conclude topic - if err := c.publish(ctx, consumer.TopicKeyConclude, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to conclude: %w", err) } @@ -97,7 +107,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to speculate: %w", err) } @@ -112,14 +122,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index 1677dc79..aa9f0738 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for merge tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -54,12 +82,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyMerge, "orchestrator-merge") + return NewController(logger, scope, store, registry, consumer.TopicKeyMerge, "orchestrator-merge") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyMerge, controller.TopicKey()) @@ -70,40 +100,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -111,30 +136,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/score/BUILD.bazel b/orchestrator/controller/score/BUILD.bazel index 525da050..d7fb2254 100644 --- a/orchestrator/controller/score/BUILD.bazel +++ b/orchestrator/controller/score/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/score/score.go b/orchestrator/controller/score/score.go index 2750c795..4a916caf 100644 --- a/orchestrator/controller/score/score.go +++ b/orchestrator/controller/score/score.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("score_controller"), metricsScope: scope.SubScope("score_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received score event", @@ -86,7 +96,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Apply scoring heuristics // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to speculate: %w", err) } @@ -101,14 +111,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/score/score_test.go b/orchestrator/controller/score/score_test.go index 6850e775..763dc46e 100644 --- a/orchestrator/controller/score/score_test.go +++ b/orchestrator/controller/score/score_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for score tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -51,12 +79,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyScore, "orchestrator-score") + return NewController(logger, scope, store, registry, consumer.TopicKeyScore, "orchestrator-score") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyScore, controller.TopicKey()) @@ -67,40 +97,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -108,30 +133,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/speculate/BUILD.bazel b/orchestrator/controller/speculate/BUILD.bazel index 013ff592..955780c4 100644 --- a/orchestrator/controller/speculate/BUILD.bazel +++ b/orchestrator/controller/speculate/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -24,6 +25,7 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go index 7caf8b3c..6d1a7df7 100644 --- a/orchestrator/controller/speculate/speculate.go +++ b/orchestrator/controller/speculate/speculate.go @@ -22,6 +22,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -31,6 +32,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -43,6 +45,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -50,6 +53,7 @@ func NewController( return &Controller{ logger: logger.Named("speculate_controller"), metricsScope: scope.SubScope("speculate_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -64,12 +68,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize batch entity - batch, err := entity.BatchFromBytes(msg.Payload) + // Deserialize batch ID from payload + bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize batch: %w", err) + return fmt.Errorf("failed to deserialize batch ID: %w", err) + } + + // Fetch batch from storage + batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } c.logger.Infow("received speculate event", @@ -88,7 +98,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Publish to merge only if speculation is complete and successful (ready to land) // Publish to build topic - if err := c.publish(ctx, consumer.TopicKeyBuild, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBuild, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to build: %w", err) } @@ -99,7 +109,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) // Publish to merge topic - if err := c.publish(ctx, consumer.TopicKeyMerge, batch); err != nil { + if err := c.publish(ctx, consumer.TopicKeyMerge, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to merge: %w", err) } @@ -114,14 +124,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a batch to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { - payload, err := batch.ToBytes() +// publish publishes a batch ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { + bid := entity.BatchID{ID: batchID} + payload, err := bid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize batch: %w", err) + return fmt.Errorf("failed to serialize batch ID: %w", err) } - msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/speculate/speculate_test.go b/orchestrator/controller/speculate/speculate_test.go index 51466b30..dd651f67 100644 --- a/orchestrator/controller/speculate/speculate_test.go +++ b/orchestrator/controller/speculate/speculate_test.go @@ -27,12 +27,40 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// batchIDPayload serializes a BatchID to JSON bytes for test message payloads. +func batchIDPayload(t *testing.T, id string) []byte { + payload, err := entity.BatchID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + +// testBatch returns a standard test batch for speculate tests. +func testBatch() entity.Batch { + return entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } +} + +// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. +func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -54,12 +82,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + return NewController(logger, scope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeySpeculate, controller.TopicKey()) @@ -70,40 +100,35 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + mockBatchStore := storagemock.NewMockBatchStore(ctrl) + mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -111,30 +136,24 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } - - payload, err := batch.ToBytes() - require.NoError(t, err) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) - msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + batch := testBatch() + store := newMockStorage(ctrl, batch) + controller := newTestController(t, ctrl, store, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/start/start.go b/orchestrator/controller/start/start.go index 1bb3e584..be8ddf97 100644 --- a/orchestrator/controller/start/start.go +++ b/orchestrator/controller/start/start.go @@ -115,7 +115,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Publish to validate topic - if err := c.publish(ctx, consumer.TopicKeyValidate, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyValidate, request.ID, request.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to validate: %w", err) } @@ -130,14 +130,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a request ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { + rid := entity.RequestID{ID: requestID} + payload, err := rid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize request ID: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(requestID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/validate/BUILD.bazel b/orchestrator/controller/validate/BUILD.bazel index 3635cd57..0ea6244a 100644 --- a/orchestrator/controller/validate/BUILD.bazel +++ b/orchestrator/controller/validate/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//entity", "//entity/queue", "//extension/mergechecker", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -28,6 +29,7 @@ go_test( "//extension/mergechecker", "//extension/mergechecker/mock", "//extension/queue/mock", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/validate/validate.go b/orchestrator/controller/validate/validate.go index e17fb009..2ee8d8b1 100644 --- a/orchestrator/controller/validate/validate.go +++ b/orchestrator/controller/validate/validate.go @@ -24,6 +24,7 @@ import ( "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/mergechecker" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -34,6 +35,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry mergeChecker mergechecker.MergeChecker topicKey consumer.TopicKey @@ -47,6 +49,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, mergeChecker mergechecker.MergeChecker, topicKey consumer.TopicKey, @@ -55,6 +58,7 @@ func NewController( return &Controller{ logger: logger.Named("validate_controller"), metricsScope: scope.SubScope("validate_controller"), + store: store, registry: registry, mergeChecker: mergeChecker, topicKey: topicKey, @@ -70,12 +74,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize request ID from payload + rid, err := entity.RequestIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize request ID: %w", err) + } + + // Fetch request from storage + request, err := c.store.GetRequestStore().Get(ctx, rid.ID) + if err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to get request %s: %w", rid.ID, err) } c.logger.Infow("received validate event", @@ -104,7 +114,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Publish to batch topic - if err := c.publish(ctx, consumer.TopicKeyBatch, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBatch, request.ID, request.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to batch: %w", err) } @@ -119,14 +129,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a request ID to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, requestID string, partitionKey string) error { + rid := entity.RequestID{ID: requestID} + payload, err := rid.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize request ID: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(requestID, payload, partitionKey, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/validate/validate_test.go b/orchestrator/controller/validate/validate_test.go index 50aef98f..cba0500d 100644 --- a/orchestrator/controller/validate/validate_test.go +++ b/orchestrator/controller/validate/validate_test.go @@ -29,10 +29,18 @@ import ( "github.com/uber/submitqueue/extension/mergechecker" mergecheckermock "github.com/uber/submitqueue/extension/mergechecker/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +// requestIDPayload serializes a RequestID to JSON bytes for test message payloads. +func requestIDPayload(t *testing.T, id string) []byte { + payload, err := entity.RequestID{ID: id}.ToBytes() + require.NoError(t, err) + return payload +} + // newMergeableMock returns a mock MergeChecker that always returns mergeable. func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecker { mc := mergecheckermock.NewMockMergeChecker(ctrl) @@ -40,8 +48,18 @@ func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecke return mc } +// newMockStorage creates a MockStorage with a MockRequestStore that returns the given request on Get. +func newMockStorage(ctrl *gomock.Controller, request entity.Request) *storagemock.MockStorage { + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + return store +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.MergeChecker, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, mc mergechecker.MergeChecker, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -60,13 +78,22 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.Me ) require.NoError(t, err) - return NewController(logger, scope, registry, mc, consumer.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, registry, mc, consumer.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateStarted, + Version: 1, + } + store := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, mc, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyValidate, controller.TopicKey()) @@ -78,8 +105,6 @@ func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -88,33 +113,36 @@ func TestController_Process_Success(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, nil) - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + msg := queue.NewMessage("test-queue/123", requestIDPayload(t, request.ID), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.NoError(t, err) } -func TestController_Process_InvalidJSON(t *testing.T) { +func TestController_Process_StorageFailure(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Get(gomock.Any(), "test-queue/123").Return(entity.Request{}, fmt.Errorf("db connection lost")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + controller := newTestController(t, ctrl, store, mc, nil) - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + msg := queue.NewMessage("test-queue/123", requestIDPayload(t, "test-queue/123"), "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() err := controller.Process(context.Background(), delivery) - require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -123,8 +151,6 @@ func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, fmt.Errorf("publish failed")) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -133,23 +159,25 @@ func TestController_Process_PublishFailure(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, fmt.Errorf("publish failed")) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) assert.Error(t, err) } func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + request := entity.Request{ID: "test-queue/123", Queue: "test-queue"} + store := newMockStorage(ctrl, request) + controller := newTestController(t, ctrl, store, mc, nil) var _ consumer.Controller = controller } @@ -160,8 +188,6 @@ func TestController_Process_NotMergeable(t *testing.T) { mc := mergecheckermock.NewMockMergeChecker(ctrl) mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: false}, nil) - controller := newTestController(t, ctrl, mc, nil) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -170,16 +196,16 @@ func TestController_Process_NotMergeable(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, nil) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } @@ -190,8 +216,6 @@ func TestController_Process_MergeCheckError(t *testing.T) { mc := mergecheckermock.NewMockMergeChecker(ctrl) mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{}, fmt.Errorf("merge check failed")) - controller := newTestController(t, ctrl, mc, nil) - request := entity.Request{ ID: "test-queue/123", Queue: "test-queue", @@ -200,16 +224,16 @@ func TestController_Process_MergeCheckError(t *testing.T) { State: entity.RequestStateStarted, Version: 1, } + store := newMockStorage(ctrl, request) - payload, err := request.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, store, mc, nil) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(request.ID, requestIDPayload(t, request.ID), request.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() - err = controller.Process(context.Background(), delivery) + err := controller.Process(context.Background(), delivery) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) }