From d37a9f7d036c2eee3711ae0f660773c674863c0e Mon Sep 17 00:00:00 2001 From: manjari Date: Wed, 4 Mar 2026 23:00:47 +0000 Subject: [PATCH 1/3] feat(controller) Conclude controller updates request state to reflect batch outcome --- orchestrator/controller/conclude/BUILD.bazel | 4 + orchestrator/controller/conclude/conclude.go | 67 +++++- .../controller/conclude/conclude_test.go | 194 ++++++++++++++++-- 3 files changed, 239 insertions(+), 26 deletions(-) diff --git a/orchestrator/controller/conclude/BUILD.bazel b/orchestrator/controller/conclude/BUILD.bazel index 09ae5e72..20768679 100644 --- a/orchestrator/controller/conclude/BUILD.bazel +++ b/orchestrator/controller/conclude/BUILD.bazel @@ -7,7 +7,9 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/errs", "//entity", + "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -23,6 +25,8 @@ go_test( "//entity", "//entity/queue", "//extension/queue/mock", + "//extension/storage", + "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/conclude/conclude.go b/orchestrator/controller/conclude/conclude.go index 6beae488..2602ddde 100644 --- a/orchestrator/controller/conclude/conclude.go +++ b/orchestrator/controller/conclude/conclude.go @@ -6,7 +6,9 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -16,6 +18,7 @@ import ( type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + store storage.Storage registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -28,6 +31,7 @@ var _ consumer.Controller = (*Controller)(nil) func NewController( logger *zap.SugaredLogger, scope tally.Scope, + store storage.Storage, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -35,6 +39,7 @@ func NewController( return &Controller{ logger: logger.Named("conclude_controller"), metricsScope: scope.SubScope("conclude_controller"), + store: store, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -72,10 +77,52 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) - // TODO: Add conclusion logic - // - Mark batch as succeeded or failed - // - Send notifications - // - Clean up resources + // TODO: Handle cancellation + + // Map batch terminal state to request state. + // We expect the batch to be in a terminal state + // as updated by the merge controller. + requestState, err := batchStateToRequestState(batch.State) + if err != nil { + c.logger.Errorw("unexpected batch state", + "batch_id", batch.ID, + "state", string(batch.State), + ) + c.metricsScope.Counter("unexpected_state_errors").Inc(1) + return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err) + } + + // Update each request's state to reflect the batch outcome. + for _, requestID := range batch.Contains { + request, err := c.store.GetRequestStore().Get(ctx, requestID) + if err != nil { + c.logger.Errorw("failed to get request from storage", + "batch_id", batch.ID, + "request_id", requestID, + "error", err, + ) + c.metricsScope.Counter("request_store_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to get request %s: %w", requestID, err)) + } + + if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, requestState); err != nil { + c.logger.Errorw("failed to update request state", + "batch_id", batch.ID, + "request_id", requestID, + "from_version", request.Version, + "to_state", string(requestState), + "error", err, + ) + c.metricsScope.Counter("request_update_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)) + } + + c.logger.Infow("updated request state", + "batch_id", batch.ID, + "request_id", requestID, + "new_state", string(requestState), + ) + } c.metricsScope.Counter("processed").Inc(1) @@ -96,3 +143,15 @@ func (c *Controller) TopicKey() consumer.TopicKey { func (c *Controller) ConsumerGroup() string { return c.consumerGroup } + +// batchStateToRequestState maps a terminal batch state to the corresponding request state. +func batchStateToRequestState(state entity.BatchState) (entity.RequestState, error) { + switch state { + case entity.BatchStateSucceeded: + return entity.RequestStateLanded, nil + case entity.BatchStateFailed: + return entity.RequestStateError, nil + default: + return entity.RequestStateUnknown, fmt.Errorf("non-terminal batch state: %s", state) + } +} diff --git a/orchestrator/controller/conclude/conclude_test.go b/orchestrator/controller/conclude/conclude_test.go index ac0e783e..8e6ba20f 100644 --- a/orchestrator/controller/conclude/conclude_test.go +++ b/orchestrator/controller/conclude/conclude_test.go @@ -2,6 +2,7 @@ package conclude import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -12,23 +13,32 @@ import ( "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" queuemock "github.com/uber/submitqueue/extension/queue/mock" + "github.com/uber/submitqueue/extension/storage" + storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope + if mockStorage == nil { + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockStorage = storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + } + registry, err := consumer.NewTopicRegistry(nil) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyConclude, "orchestrator-conclude") + return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude") } func TestNewController(t *testing.T) { - controller := newTestController(t) + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyConclude, controller.TopicKey()) @@ -36,34 +46,173 @@ func TestNewController(t *testing.T) { assert.Equal(t, "conclude", controller.Name()) } -func TestController_Process_Success(t *testing.T) { - ctrl := gomock.NewController(t) +func TestController_Process(t *testing.T) { + tests := []struct { + name string + batch entity.Batch + setupStore func(*gomock.Controller) *storagemock.MockStorage + wantErr bool + retryable bool + }{ + { + name: "succeeded batch lands requests", + batch: entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + Contains: []string{"test-queue/1", "test-queue/2"}, + State: entity.BatchStateSucceeded, + Version: 3, + }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ + ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, + }, nil) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), entity.RequestStateLanded).Return(nil) + mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/2").Return(entity.Request{ + ID: "test-queue/2", Version: 3, State: entity.RequestStateProcessing, + }, nil) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/2", int32(3), entity.RequestStateLanded).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + return mockStorage + }, + }, + { + name: "failed batch errors requests", + batch: entity.Batch{ + ID: "test-queue/batch/2", + Queue: "test-queue", + Contains: []string{"test-queue/5"}, + State: entity.BatchStateFailed, + Version: 2, + }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/5").Return(entity.Request{ + ID: "test-queue/5", Version: 1, State: entity.RequestStateProcessing, + }, nil) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/5", int32(1), entity.RequestStateError).Return(nil) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + return mockStorage + }, + }, + { + name: "cancelled batch returns error", + batch: entity.Batch{ + ID: "test-queue/batch/3", + Queue: "test-queue", + Contains: []string{"test-queue/10"}, + State: entity.BatchStateCancelled, + Version: 2, + }, + wantErr: true, + retryable: false, + }, + { + name: "non-terminal batch state returns error", + batch: entity.Batch{ + ID: "test-queue/batch/4", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateCreated, + Version: 1, + }, + wantErr: true, + retryable: false, + }, + { + name: "request store get failure is retryable", + batch: entity.Batch{ + ID: "test-queue/batch/5", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{}, fmt.Errorf("db connection lost")) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + return mockStorage + }, + wantErr: true, + retryable: true, + }, + { + name: "request store update failure is retryable", + batch: entity.Batch{ + ID: "test-queue/batch/6", + Queue: "test-queue", + Contains: []string{"test-queue/1"}, + State: entity.BatchStateSucceeded, + Version: 2, + }, + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockRequestStore := storagemock.NewMockRequestStore(ctrl) + mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ + ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, + }, nil) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), entity.RequestStateLanded).Return(storage.ErrVersionMismatch) + + mockStorage := storagemock.NewMockStorage(ctrl) + mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + return mockStorage + }, + wantErr: true, + retryable: true, + }, + { + name: "empty contains list succeeds", + batch: entity.Batch{ + ID: "test-queue/batch/7", + Queue: "test-queue", + State: entity.BatchStateSucceeded, + Version: 1, + }, + }, + } - controller := newTestController(t) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) - batch := entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } + var mockStorage *storagemock.MockStorage + if tt.setupStore != nil { + mockStorage = tt.setupStore(ctrl) + } - payload, err := batch.ToBytes() - require.NoError(t, err) + controller := newTestController(t, ctrl, mockStorage) - msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + payload, err := tt.batch.ToBytes() + require.NoError(t, err) - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) + msg := queue.NewMessage(tt.batch.ID, payload, tt.batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + + if tt.wantErr { + require.Error(t, err) + assert.Equal(t, tt.retryable, errs.IsRetryable(err)) + } else { + require.NoError(t, err) + } + }) + } } func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t) + controller := newTestController(t, ctrl, nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -78,7 +227,8 @@ func TestController_Process_InvalidJSON(t *testing.T) { } func TestController_InterfaceImplementation(t *testing.T) { - controller := newTestController(t) + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) var _ consumer.Controller = controller } From c320650ff33cacbd4c8b1d687ee2607062730360 Mon Sep 17 00:00:00 2001 From: manjari Date: Wed, 4 Mar 2026 23:15:24 +0000 Subject: [PATCH 2/3] use core/metrics --- orchestrator/controller/conclude/BUILD.bazel | 1 + orchestrator/controller/conclude/conclude.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/orchestrator/controller/conclude/BUILD.bazel b/orchestrator/controller/conclude/BUILD.bazel index 20768679..df2075f9 100644 --- a/orchestrator/controller/conclude/BUILD.bazel +++ b/orchestrator/controller/conclude/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//core/consumer", "//core/errs", + "//core/metrics", "//entity", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", diff --git a/orchestrator/controller/conclude/conclude.go b/orchestrator/controller/conclude/conclude.go index 2602ddde..94138708 100644 --- a/orchestrator/controller/conclude/conclude.go +++ b/orchestrator/controller/conclude/conclude.go @@ -7,6 +7,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" "go.uber.org/zap" @@ -49,8 +50,9 @@ func NewController( // Process processes a conclude delivery from the queue. // Deserializes the batch and completes the pipeline processing. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + op := metrics.Begin(c.metricsScope, "process") + defer func() { op.Complete(retErr) }() msg := delivery.Message() @@ -63,8 +65,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "attempt", delivery.Attempt(), "error", err, ) - c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count + metrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch: %w", err) } @@ -88,7 +90,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "batch_id", batch.ID, "state", string(batch.State), ) - c.metricsScope.Counter("unexpected_state_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, "process", "unexpected_state_errors", 1) return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err) } @@ -101,7 +103,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "request_id", requestID, "error", err, ) - c.metricsScope.Counter("request_store_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, "process", "request_store_errors", 1) return errs.NewRetryableError(fmt.Errorf("failed to get request %s: %w", requestID, err)) } @@ -113,7 +115,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "to_state", string(requestState), "error", err, ) - c.metricsScope.Counter("request_update_errors").Inc(1) + metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1) return errs.NewRetryableError(fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)) } @@ -124,8 +126,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) } - c.metricsScope.Counter("processed").Inc(1) - return nil // Success - message will be acked } From f8e29dc7e36c116c85002843a50cfa8706343e89 Mon Sep 17 00:00:00 2001 From: manjari Date: Wed, 4 Mar 2026 23:29:57 +0000 Subject: [PATCH 3/3] fix example --- example/server/orchestrator/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index f59b9eb9..c776210a 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -406,6 +406,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t concludeController := conclude.NewController( logger, scope, + store, registry, consumer.TopicKeyConclude, "orchestrator-conclude",