diff --git a/core/consumer/registry.go b/core/consumer/registry.go index af8535fb..51b42ec0 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -10,6 +10,18 @@ const ( TopicRequest Topic = "request" // TopicToBatch is where validated requests are published for batching. TopicToBatch Topic = "to-batch" + // TopicBatched is where batched requests are published for speculation. + TopicBatched Topic = "batched" + // TopicBuild is where requests are published for builds. + TopicBuild Topic = "build" + // TopicBuildSignal is where build signals are published for processing. + TopicBuildSignal Topic = "build-signal" + // TopicToMerge is where requests are published for merging. + TopicToMerge Topic = "to-merge" + // TopicMergeSignal is where merge signals are published for processing. + TopicMergeSignal Topic = "merge-signal" + // TopicFinalize is where requests are published for finalization. + TopicFinalize Topic = "finalize" ) // String returns the topic name as a string. diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 42f081af..7cf1a272 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -15,7 +15,14 @@ go_library( "//extension/queue", "//extension/queue/mysql", "//orchestrator/controller", + "//orchestrator/controller/batch", + "//orchestrator/controller/build", + "//orchestrator/controller/buildsignal", + "//orchestrator/controller/finalize", + "//orchestrator/controller/merge", + "//orchestrator/controller/mergesignal", "//orchestrator/controller/request", + "//orchestrator/controller/speculate", "//orchestrator/protopb", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally_v4//:tally", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 5f229c5a..e838591d 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -17,7 +17,14 @@ import ( extqueue "github.com/uber/submitqueue/extension/queue" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/orchestrator/controller" + "github.com/uber/submitqueue/orchestrator/controller/batch" + "github.com/uber/submitqueue/orchestrator/controller/build" + "github.com/uber/submitqueue/orchestrator/controller/buildsignal" + "github.com/uber/submitqueue/orchestrator/controller/finalize" + "github.com/uber/submitqueue/orchestrator/controller/merge" + "github.com/uber/submitqueue/orchestrator/controller/mergesignal" "github.com/uber/submitqueue/orchestrator/controller/request" + "github.com/uber/submitqueue/orchestrator/controller/speculate" pb "github.com/uber/submitqueue/orchestrator/protopb" "go.uber.org/zap" "google.golang.org/grpc" @@ -116,37 +123,17 @@ func run() error { subscriberName = fmt.Sprintf("orchestrator-%d", time.Now().Unix()) } - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{ - {Topic: consumer.TopicRequest, Queue: mysqlQueue}, - {Topic: consumer.TopicToBatch, Queue: mysqlQueue}, - }, - []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig( - consumer.TopicRequest.String(), - subscriberName, - "orchestrator-request", - ), - }, - ) + registry := newTopicRegistry(mysqlQueue, subscriberName) // Create consumer c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry) - // Register request controller - // Pipeline: request → batch → speculation → build → merge - requestController := request.NewController( - logger.Sugar(), - scope, - registry, - consumer.TopicRequest, - "orchestrator-request", - ) - if err := c.Register(requestController); err != nil { - return fmt.Errorf("failed to register request controller: %w", err) + // Register controllers + if err := registerControllers(c, logger.Sugar(), scope, registry); err != nil { + return err } - logger.Info("controllers registered", zap.Int("count", 1)) + logger.Info("controllers registered", zap.Int("count", 8)) // Start consumers if err := c.Start(ctx); err != nil { @@ -202,3 +189,158 @@ func run() error { return err } + +// newTopicRegistry builds the TopicRegistry with all topic and subscription configs. +func newTopicRegistry(q extqueue.Queue, subscriberName string) consumer.TopicRegistry { + return consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Topic: consumer.TopicRequest, Queue: q}, + {Topic: consumer.TopicToBatch, Queue: q}, + {Topic: consumer.TopicBatched, Queue: q}, + {Topic: consumer.TopicBuild, Queue: q}, + {Topic: consumer.TopicBuildSignal, Queue: q}, + {Topic: consumer.TopicToMerge, Queue: q}, + {Topic: consumer.TopicMergeSignal, Queue: q}, + {Topic: consumer.TopicFinalize, Queue: q}, + }, + []extqueue.SubscriptionConfig{ + extqueue.DefaultSubscriptionConfig( + consumer.TopicRequest.String(), + subscriberName, + "orchestrator-request", + ), + extqueue.DefaultSubscriptionConfig( + consumer.TopicToBatch.String(), + subscriberName, + "orchestrator-batch", + ), + extqueue.DefaultSubscriptionConfig( + consumer.TopicBatched.String(), + subscriberName, + "orchestrator-speculate", + ), + extqueue.DefaultSubscriptionConfig( + consumer.TopicBuild.String(), + subscriberName, + "orchestrator-build", + ), + extqueue.DefaultSubscriptionConfig( + consumer.TopicBuildSignal.String(), + subscriberName, + "orchestrator-buildsignal", + ), + extqueue.DefaultSubscriptionConfig( + consumer.TopicToMerge.String(), + subscriberName, + "orchestrator-merge", + ), + extqueue.DefaultSubscriptionConfig( + consumer.TopicMergeSignal.String(), + subscriberName, + "orchestrator-mergesignal", + ), + extqueue.DefaultSubscriptionConfig( + consumer.TopicFinalize.String(), + subscriberName, + "orchestrator-finalize", + ), + }, + ) +} + +// registerControllers creates all pipeline controllers and registers them with the consumer. +// Pipeline: request → batch → speculate → build → build-signal +// +// → merge → merge-signal +// finalize (terminal) +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry) error { + requestController := request.NewController( + logger, + scope, + registry, + consumer.TopicRequest, + "orchestrator-request", + ) + if err := c.Register(requestController); err != nil { + return fmt.Errorf("failed to register request controller: %w", err) + } + + batchController := batch.NewController( + logger, + scope, + registry, + consumer.TopicToBatch, + "orchestrator-batch", + ) + if err := c.Register(batchController); err != nil { + return fmt.Errorf("failed to register batch controller: %w", err) + } + + speculateController := speculate.NewController( + logger, + scope, + registry, + consumer.TopicBatched, + "orchestrator-speculate", + ) + if err := c.Register(speculateController); err != nil { + return fmt.Errorf("failed to register speculate controller: %w", err) + } + + buildController := build.NewController( + logger, + scope, + registry, + consumer.TopicBuild, + "orchestrator-build", + ) + if err := c.Register(buildController); err != nil { + return fmt.Errorf("failed to register build controller: %w", err) + } + + buildSignalController := buildsignal.NewController( + logger, + scope, + registry, + consumer.TopicBuildSignal, + "orchestrator-buildsignal", + ) + if err := c.Register(buildSignalController); err != nil { + return fmt.Errorf("failed to register buildsignal controller: %w", err) + } + + mergeController := merge.NewController( + logger, + scope, + registry, + consumer.TopicToMerge, + "orchestrator-merge", + ) + if err := c.Register(mergeController); err != nil { + return fmt.Errorf("failed to register merge controller: %w", err) + } + + mergeSignalController := mergesignal.NewController( + logger, + scope, + registry, + consumer.TopicMergeSignal, + "orchestrator-mergesignal", + ) + if err := c.Register(mergeSignalController); err != nil { + return fmt.Errorf("failed to register mergesignal controller: %w", err) + } + + finalizeController := finalize.NewController( + logger, + scope, + registry, + consumer.TopicFinalize, + "orchestrator-finalize", + ) + if err := c.Register(finalizeController); err != nil { + return fmt.Errorf("failed to register finalize controller: %w", err) + } + + return nil +} diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel new file mode 100644 index 00000000..dba155a0 --- /dev/null +++ b/orchestrator/controller/batch/BUILD.bazel @@ -0,0 +1,32 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "batch", + srcs = ["batch.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/batch", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "batch_test", + srcs = ["batch_test.go"], + embed = [":batch"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go new file mode 100644 index 00000000..cc3bc7eb --- /dev/null +++ b/orchestrator/controller/batch/batch.go @@ -0,0 +1,135 @@ +package batch + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + entityqueue "github.com/uber/submitqueue/entity/queue" + "go.uber.org/zap" +) + +// Controller handles batch queue messages. +// It consumes validated requests, groups them into batches, and publishes to the speculate stage. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new batch controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("batch_controller"), + metricsScope: scope.SubScope("batch_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, + } +} + +// Process processes a batch delivery from the queue. +// Deserializes the request, groups into batch, and publishes to the speculate topic. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return consumer.NewNonRetryableError(fmt.Errorf("failed to deserialize request: %w", err)) + } + + c.logger.Infow("received batch event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add batching logic + // - Group requests into batches + // - Create batch entity with dependencies + + // Publish to speculate topic + if err := c.publish(ctx, consumer.TopicBatched, request); err != nil { + c.logger.Errorw("failed to publish output", + "request_id", request.ID, + "topic", consumer.TopicBatched, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to speculate: %w", err) + } + + c.logger.Infow("published request to next stage", + "request_id", request.ID, + "topic", consumer.TopicBatched, + ) + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// publish publishes a request to the specified topic. +func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + + q, ok := c.registry.Queue(topic) + if !ok { + return fmt.Errorf("no queue registered for topic %s", topic) + } + + if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "batch" +} + +// Topic returns the topic this controller subscribes to. +func (c *Controller) Topic() consumer.Topic { + return c.topic +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go new file mode 100644 index 00000000..2383e6db --- /dev/null +++ b/orchestrator/controller/batch/batch_test.go @@ -0,0 +1,126 @@ +package batch + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicBatched, Queue: mockQ}}, + nil, + ) + + return NewController(logger, scope, registry, consumer.TopicToBatch, "orchestrator-batch") +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicToBatch, controller.Topic()) + assert.Equal(t, "orchestrator-batch", controller.ConsumerGroup()) + assert.Equal(t, "batch", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-456"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, consumer.IsNonRetryable(err)) +} + +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-1"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + +func TestController_InterfaceImplementation(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + var _ consumer.Controller = controller +} diff --git a/orchestrator/controller/build/BUILD.bazel b/orchestrator/controller/build/BUILD.bazel new file mode 100644 index 00000000..89a82b6d --- /dev/null +++ b/orchestrator/controller/build/BUILD.bazel @@ -0,0 +1,32 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "build", + srcs = ["build.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/build", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "build_test", + srcs = ["build_test.go"], + embed = [":build"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go new file mode 100644 index 00000000..eb923e3a --- /dev/null +++ b/orchestrator/controller/build/build.go @@ -0,0 +1,135 @@ +package build + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + entityqueue "github.com/uber/submitqueue/entity/queue" + "go.uber.org/zap" +) + +// Controller handles build queue messages. +// It consumes build requests, triggers builds, and publishes results to the build signal stage. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new build controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("build_controller"), + metricsScope: scope.SubScope("build_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, + } +} + +// Process processes a build delivery from the queue. +// Deserializes the request, triggers a build, and publishes to the build signal topic. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return consumer.NewNonRetryableError(fmt.Errorf("failed to deserialize request: %w", err)) + } + + c.logger.Infow("received build event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add build logic + // - Trigger CI build + // - Track build status + + // Publish to build signal topic + if err := c.publish(ctx, consumer.TopicBuildSignal, request); err != nil { + c.logger.Errorw("failed to publish output", + "request_id", request.ID, + "topic", consumer.TopicBuildSignal, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to build-signal: %w", err) + } + + c.logger.Infow("published request to next stage", + "request_id", request.ID, + "topic", consumer.TopicBuildSignal, + ) + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// publish publishes a request to the specified topic. +func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + + q, ok := c.registry.Queue(topic) + if !ok { + return fmt.Errorf("no queue registered for topic %s", topic) + } + + if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "build" +} + +// Topic returns the topic this controller subscribes to. +func (c *Controller) Topic() consumer.Topic { + return c.topic +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go new file mode 100644 index 00000000..fa7b8d36 --- /dev/null +++ b/orchestrator/controller/build/build_test.go @@ -0,0 +1,126 @@ +package build + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicBuildSignal, Queue: mockQ}}, + nil, + ) + + return NewController(logger, scope, registry, consumer.TopicBuild, "orchestrator-build") +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicBuild, controller.Topic()) + assert.Equal(t, "orchestrator-build", controller.ConsumerGroup()) + assert.Equal(t, "build", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-456"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, consumer.IsNonRetryable(err)) +} + +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-1"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + +func TestController_InterfaceImplementation(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + var _ consumer.Controller = controller +} diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel new file mode 100644 index 00000000..2170f6e2 --- /dev/null +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -0,0 +1,31 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "buildsignal", + srcs = ["buildsignal.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/buildsignal", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "buildsignal_test", + srcs = ["buildsignal_test.go"], + embed = [":buildsignal"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go new file mode 100644 index 00000000..326104c3 --- /dev/null +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -0,0 +1,97 @@ +package buildsignal + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "go.uber.org/zap" +) + +// Controller handles build signal queue messages. +// It consumes build signals and processes build results. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new build signal controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("buildsignal_controller"), + metricsScope: scope.SubScope("buildsignal_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, + } +} + +// Process processes a build signal delivery from the queue. +// Deserializes the request and processes the build result. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return consumer.NewNonRetryableError(fmt.Errorf("failed to deserialize request: %w", err)) + } + + c.logger.Infow("received build signal event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add build signal processing logic + // - Evaluate build result (pass/fail) + // - Update request state based on build outcome + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "buildsignal" +} + +// Topic returns the topic this controller subscribes to. +func (c *Controller) Topic() consumer.Topic { + return c.topic +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/buildsignal/buildsignal_test.go b/orchestrator/controller/buildsignal/buildsignal_test.go new file mode 100644 index 00000000..41792c59 --- /dev/null +++ b/orchestrator/controller/buildsignal/buildsignal_test.go @@ -0,0 +1,84 @@ +package buildsignal + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + registry := consumer.NewTopicRegistry(nil, nil) + + return NewController(logger, scope, registry, consumer.TopicBuildSignal, "orchestrator-buildsignal") +} + +func TestNewController(t *testing.T) { + controller := newTestController(t) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicBuildSignal, controller.Topic()) + assert.Equal(t, "orchestrator-buildsignal", controller.ConsumerGroup()) + assert.Equal(t, "buildsignal", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-456"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, consumer.IsNonRetryable(err)) +} + +func TestController_InterfaceImplementation(t *testing.T) { + controller := newTestController(t) + + var _ consumer.Controller = controller +} diff --git a/orchestrator/controller/finalize/BUILD.bazel b/orchestrator/controller/finalize/BUILD.bazel new file mode 100644 index 00000000..a84dc5fb --- /dev/null +++ b/orchestrator/controller/finalize/BUILD.bazel @@ -0,0 +1,31 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "finalize", + srcs = ["finalize.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/finalize", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "finalize_test", + srcs = ["finalize_test.go"], + embed = [":finalize"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/finalize/finalize.go b/orchestrator/controller/finalize/finalize.go new file mode 100644 index 00000000..aa1c6da1 --- /dev/null +++ b/orchestrator/controller/finalize/finalize.go @@ -0,0 +1,98 @@ +package finalize + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "go.uber.org/zap" +) + +// Controller handles finalize queue messages. +// It consumes finalization requests and completes the pipeline. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new finalize controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("finalize_controller"), + metricsScope: scope.SubScope("finalize_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, + } +} + +// Process processes a finalize delivery from the queue. +// Deserializes the request and completes the pipeline processing. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return consumer.NewNonRetryableError(fmt.Errorf("failed to deserialize request: %w", err)) + } + + c.logger.Infow("received finalize event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add finalization logic + // - Mark request as landed or error + // - Send notifications + // - Clean up resources + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "finalize" +} + +// Topic returns the topic this controller subscribes to. +func (c *Controller) Topic() consumer.Topic { + return c.topic +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/finalize/finalize_test.go b/orchestrator/controller/finalize/finalize_test.go new file mode 100644 index 00000000..609c0011 --- /dev/null +++ b/orchestrator/controller/finalize/finalize_test.go @@ -0,0 +1,84 @@ +package finalize + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + registry := consumer.NewTopicRegistry(nil, nil) + + return NewController(logger, scope, registry, consumer.TopicFinalize, "orchestrator-finalize") +} + +func TestNewController(t *testing.T) { + controller := newTestController(t) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicFinalize, controller.Topic()) + assert.Equal(t, "orchestrator-finalize", controller.ConsumerGroup()) + assert.Equal(t, "finalize", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-456"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, consumer.IsNonRetryable(err)) +} + +func TestController_InterfaceImplementation(t *testing.T) { + controller := newTestController(t) + + var _ consumer.Controller = controller +} diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel new file mode 100644 index 00000000..15063c30 --- /dev/null +++ b/orchestrator/controller/merge/BUILD.bazel @@ -0,0 +1,32 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "merge", + srcs = ["merge.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/merge", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "merge_test", + srcs = ["merge_test.go"], + embed = [":merge"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go new file mode 100644 index 00000000..968c89dd --- /dev/null +++ b/orchestrator/controller/merge/merge.go @@ -0,0 +1,135 @@ +package merge + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + entityqueue "github.com/uber/submitqueue/entity/queue" + "go.uber.org/zap" +) + +// Controller handles merge queue messages. +// It consumes merge requests, performs merges, and publishes results to the merge signal stage. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new merge controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("merge_controller"), + metricsScope: scope.SubScope("merge_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, + } +} + +// Process processes a merge delivery from the queue. +// Deserializes the request, performs the merge, and publishes to the merge signal topic. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return consumer.NewNonRetryableError(fmt.Errorf("failed to deserialize request: %w", err)) + } + + c.logger.Infow("received merge event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add merge logic + // - Perform source control merge operation + // - Handle merge conflicts + + // Publish to merge signal topic + if err := c.publish(ctx, consumer.TopicMergeSignal, request); err != nil { + c.logger.Errorw("failed to publish output", + "request_id", request.ID, + "topic", consumer.TopicMergeSignal, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to merge-signal: %w", err) + } + + c.logger.Infow("published request to next stage", + "request_id", request.ID, + "topic", consumer.TopicMergeSignal, + ) + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// publish publishes a request to the specified topic. +func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + + q, ok := c.registry.Queue(topic) + if !ok { + return fmt.Errorf("no queue registered for topic %s", topic) + } + + if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "merge" +} + +// Topic returns the topic this controller subscribes to. +func (c *Controller) Topic() consumer.Topic { + return c.topic +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go new file mode 100644 index 00000000..efbe4146 --- /dev/null +++ b/orchestrator/controller/merge/merge_test.go @@ -0,0 +1,126 @@ +package merge + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Topic: consumer.TopicMergeSignal, Queue: mockQ}}, + nil, + ) + + return NewController(logger, scope, registry, consumer.TopicToMerge, "orchestrator-merge") +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicToMerge, controller.Topic()) + assert.Equal(t, "orchestrator-merge", controller.ConsumerGroup()) + assert.Equal(t, "merge", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-456"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, consumer.IsNonRetryable(err)) +} + +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-1"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + +func TestController_InterfaceImplementation(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + var _ consumer.Controller = controller +} diff --git a/orchestrator/controller/mergesignal/BUILD.bazel b/orchestrator/controller/mergesignal/BUILD.bazel new file mode 100644 index 00000000..92439fb4 --- /dev/null +++ b/orchestrator/controller/mergesignal/BUILD.bazel @@ -0,0 +1,31 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergesignal", + srcs = ["mergesignal.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/mergesignal", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergesignal_test", + srcs = ["mergesignal_test.go"], + embed = [":mergesignal"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/mergesignal/mergesignal.go b/orchestrator/controller/mergesignal/mergesignal.go new file mode 100644 index 00000000..034cea37 --- /dev/null +++ b/orchestrator/controller/mergesignal/mergesignal.go @@ -0,0 +1,97 @@ +package mergesignal + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "go.uber.org/zap" +) + +// Controller handles merge signal queue messages. +// It consumes merge signals and processes merge results. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new merge signal controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("mergesignal_controller"), + metricsScope: scope.SubScope("mergesignal_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, + } +} + +// Process processes a merge signal delivery from the queue. +// Deserializes the request and processes the merge result. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return consumer.NewNonRetryableError(fmt.Errorf("failed to deserialize request: %w", err)) + } + + c.logger.Infow("received merge signal event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add merge signal processing logic + // - Evaluate merge result (success/failure) + // - Update request state based on merge outcome + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "mergesignal" +} + +// Topic returns the topic this controller subscribes to. +func (c *Controller) Topic() consumer.Topic { + return c.topic +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/mergesignal/mergesignal_test.go b/orchestrator/controller/mergesignal/mergesignal_test.go new file mode 100644 index 00000000..228d4f42 --- /dev/null +++ b/orchestrator/controller/mergesignal/mergesignal_test.go @@ -0,0 +1,84 @@ +package mergesignal + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + registry := consumer.NewTopicRegistry(nil, nil) + + return NewController(logger, scope, registry, consumer.TopicMergeSignal, "orchestrator-mergesignal") +} + +func TestNewController(t *testing.T) { + controller := newTestController(t) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicMergeSignal, controller.Topic()) + assert.Equal(t, "orchestrator-mergesignal", controller.ConsumerGroup()) + assert.Equal(t, "mergesignal", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-456"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, consumer.IsNonRetryable(err)) +} + +func TestController_InterfaceImplementation(t *testing.T) { + controller := newTestController(t) + + var _ consumer.Controller = controller +} diff --git a/orchestrator/controller/speculate/BUILD.bazel b/orchestrator/controller/speculate/BUILD.bazel new file mode 100644 index 00000000..802eb84c --- /dev/null +++ b/orchestrator/controller/speculate/BUILD.bazel @@ -0,0 +1,32 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "speculate", + srcs = ["speculate.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/speculate", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "speculate_test", + srcs = ["speculate_test.go"], + embed = [":speculate"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go new file mode 100644 index 00000000..8695464d --- /dev/null +++ b/orchestrator/controller/speculate/speculate.go @@ -0,0 +1,146 @@ +package speculate + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + entityqueue "github.com/uber/submitqueue/entity/queue" + "go.uber.org/zap" +) + +// Controller handles speculate queue messages. +// It consumes batched requests, performs speculation, and publishes to both build and merge stages. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topic consumer.Topic + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new speculate controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topic consumer.Topic, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("speculate_controller"), + metricsScope: scope.SubScope("speculate_controller"), + registry: registry, + topic: topic, + consumerGroup: consumerGroup, + } +} + +// Process processes a speculate delivery from the queue. +// Deserializes the request, performs speculation, and publishes to both build and merge topics. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return consumer.NewNonRetryableError(fmt.Errorf("failed to deserialize request: %w", err)) + } + + c.logger.Infow("received speculate event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add speculation logic + // - Speculative merge/rebase + // - Conflict detection + + // Publish to build topic + if err := c.publish(ctx, consumer.TopicBuild, request); err != nil { + c.logger.Errorw("failed to publish to build", + "request_id", request.ID, + "topic", consumer.TopicBuild, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to build: %w", err) + } + + // Publish to merge topic + if err := c.publish(ctx, consumer.TopicToMerge, request); err != nil { + c.logger.Errorw("failed to publish to merge", + "request_id", request.ID, + "topic", consumer.TopicToMerge, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to merge: %w", err) + } + + c.logger.Infow("published request to next stages", + "request_id", request.ID, + "topics", []string{consumer.TopicBuild.String(), consumer.TopicToMerge.String()}, + ) + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// publish publishes a request to the specified topic. +func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + + q, ok := c.registry.Queue(topic) + if !ok { + return fmt.Errorf("no queue registered for topic %s", topic) + } + + if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "speculate" +} + +// Topic returns the topic this controller subscribes to. +func (c *Controller) Topic() consumer.Topic { + return c.topic +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/speculate/speculate_test.go b/orchestrator/controller/speculate/speculate_test.go new file mode 100644 index 00000000..56e38de3 --- /dev/null +++ b/orchestrator/controller/speculate/speculate_test.go @@ -0,0 +1,129 @@ +package speculate + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Topic: consumer.TopicBuild, Queue: mockQ}, + {Topic: consumer.TopicToMerge, Queue: mockQ}, + }, + nil, + ) + + return NewController(logger, scope, registry, consumer.TopicBatched, "orchestrator-speculate") +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicBatched, controller.Topic()) + assert.Equal(t, "orchestrator-speculate", controller.ConsumerGroup()) + assert.Equal(t, "speculate", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-456"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.True(t, consumer.IsNonRetryable(err)) +} + +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{Source: "github", IDs: []string{"PR-1"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + +func TestController_InterfaceImplementation(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + var _ consumer.Controller = controller +}