diff --git a/core/consumer/registry.go b/core/consumer/registry.go index 0bbfbaec..945d9ade 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -16,20 +16,22 @@ type TopicKey string const ( // TopicKeyRequest is the pipeline stage where new requests arrive from the gateway. TopicKeyRequest TopicKey = "request" - // TopicKeyToBatch is the pipeline stage where validated requests are published for batching. - TopicKeyToBatch TopicKey = "to-batch" - // TopicKeyBatched is the pipeline stage where batched requests are published for speculation. - TopicKeyBatched TopicKey = "batched" - // TopicKeyBuild is the pipeline stage where requests are published for builds. + // TopicKeyValidate is the pipeline stage where requests are published for validation. + TopicKeyValidate TopicKey = "validate" + // TopicKeyBatch is the pipeline stage where validated requests are published for batching. + TopicKeyBatch TopicKey = "batch" + // TopicKeyScore is the pipeline stage where batches are published for scoring. + TopicKeyScore TopicKey = "score" + // TopicKeySpeculate is the pipeline stage where scored batches are published for speculation. + TopicKeySpeculate TopicKey = "speculate" + // TopicKeyBuild is the pipeline stage where speculated batches are published for builds. TopicKeyBuild TopicKey = "build" - // TopicKeyBuildSignal is the pipeline stage where build signals are published for processing. - TopicKeyBuildSignal TopicKey = "build-signal" - // TopicKeyToMerge is the pipeline stage where requests are published for merging. - TopicKeyToMerge TopicKey = "to-merge" - // TopicKeyMergeSignal is the pipeline stage where merge signals are published for processing. - TopicKeyMergeSignal TopicKey = "merge-signal" - // TopicKeyFinalize is the pipeline stage where requests are published for finalization. - TopicKeyFinalize TopicKey = "finalize" + // TopicKeyBuildSignal is the pipeline stage where builds are published for build signal processing. + TopicKeyBuildSignal TopicKey = "buildsignal" + // TopicKeyMerge is the pipeline stage where speculated batches are published for merging. + TopicKeyMerge TopicKey = "merge" + // TopicKeyConclude is the pipeline stage where merged requests are published for conclusion. + TopicKeyConclude TopicKey = "conclude" ) // String returns the topic key as a string. diff --git a/core/consumer/registry_test.go b/core/consumer/registry_test.go index b5ee39f5..c5a36884 100644 --- a/core/consumer/registry_test.go +++ b/core/consumer/registry_test.go @@ -162,7 +162,7 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ {Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ1}, - {Key: consumer.TopicKeyToBatch, Name: "to-batch", Queue: mockQ2}, + {Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ2}, }, ) require.NoError(t, err) @@ -171,7 +171,7 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) { require.True(t, ok) assert.Equal(t, mockQ1, q1) - q2, ok := registry.Queue(consumer.TopicKeyToBatch) + q2, ok := registry.Queue(consumer.TopicKeyValidate) require.True(t, ok) assert.Equal(t, mockQ2, q2) diff --git a/entity/build.go b/entity/build.go index c6253502..bb848340 100644 --- a/entity/build.go +++ b/entity/build.go @@ -1,5 +1,7 @@ package entity +import "encoding/json" + // BuildStatus defines the possible states of a build. type BuildStatus string @@ -61,3 +63,15 @@ type Build struct { // Status represents the state of the build lifecycle this build is in. Status BuildStatus } + +// ToBytes serializes the Build to JSON bytes for queue message payload. +func (b Build) ToBytes() ([]byte, error) { + return json.Marshal(b) +} + +// BuildFromBytes deserializes a Build from JSON bytes. +func BuildFromBytes(data []byte) (Build, error) { + var build Build + err := json.Unmarshal(data, &build) + return build, err +} diff --git a/entity/build_test.go b/entity/build_test.go index cf0aa0ac..41c98cf2 100644 --- a/entity/build_test.go +++ b/entity/build_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestBuildStatus_IsTerminal(t *testing.T) { @@ -55,3 +56,128 @@ func TestBuildStatus_IsTerminal(t *testing.T) { }) } } + +func TestBuild_ToBytes(t *testing.T) { + build := Build{ + ID: "build-1", + BatchID: "batch-1", + SpeculationPath: SpeculationPathInfo{ + Base: []string{"batch-0", "batch-prev"}, + }, + Score: 0.85, + Status: BuildStatusQueued, + } + + data, err := build.ToBytes() + require.NoError(t, err) + assert.NotEmpty(t, data) + + // Verify JSON contains expected fields + jsonStr := string(data) + assert.Contains(t, jsonStr, "build-1") + assert.Contains(t, jsonStr, "batch-1") + assert.Contains(t, jsonStr, "queued") +} + +func TestBuildFromBytes(t *testing.T) { + original := Build{ + ID: "build-42", + BatchID: "batch-7", + SpeculationPath: SpeculationPathInfo{ + Base: []string{"batch-5", "batch-6"}, + }, + Score: 0.92, + Status: BuildStatusRunning, + } + + // Serialize + data, err := original.ToBytes() + require.NoError(t, err) + + // Deserialize + deserialized, err := BuildFromBytes(data) + require.NoError(t, err) + + // Verify all fields match + assert.Equal(t, original.ID, deserialized.ID) + assert.Equal(t, original.BatchID, deserialized.BatchID) + assert.Equal(t, original.SpeculationPath.Base, deserialized.SpeculationPath.Base) + assert.Equal(t, original.Score, deserialized.Score) + assert.Equal(t, original.Status, deserialized.Status) +} + +func TestBuildFromBytes_InvalidJSON(t *testing.T) { + invalidJSON := []byte(`{"invalid": json"}`) + + _, err := BuildFromBytes(invalidJSON) + assert.Error(t, err) +} + +func TestBuildFromBytes_EmptyData(t *testing.T) { + emptyJSON := []byte(`{}`) + + build, err := BuildFromBytes(emptyJSON) + require.NoError(t, err) + + // Empty JSON should deserialize with zero values + assert.Empty(t, build.ID) + assert.Empty(t, build.BatchID) + assert.Equal(t, BuildStatusUnknown, build.Status) + assert.Equal(t, float32(0), build.Score) +} + +func TestBuild_SerializationRoundTrip(t *testing.T) { + tests := []struct { + name string + build Build + }{ + { + name: "queued build with speculation path", + build: Build{ + ID: "build-100", + BatchID: "batch-50", + SpeculationPath: SpeculationPathInfo{ + Base: []string{"batch-48", "batch-49"}, + }, + Score: 0.75, + Status: BuildStatusQueued, + }, + }, + { + name: "passed build with no speculation base", + build: Build{ + ID: "build-200", + BatchID: "batch-60", + Score: 1.0, + Status: BuildStatusPassed, + }, + }, + { + name: "failed build with zero score", + build: Build{ + ID: "build-300", + BatchID: "batch-70", + SpeculationPath: SpeculationPathInfo{ + Base: []string{"batch-65"}, + }, + Score: 0, + Status: BuildStatusFailed, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Serialize + data, err := tt.build.ToBytes() + require.NoError(t, err) + + // Deserialize + deserialized, err := BuildFromBytes(data) + require.NoError(t, err) + + // Verify complete equality + assert.Equal(t, tt.build, deserialized) + }) + } +} diff --git a/example/README.md b/example/README.md index 2581de27..87fd7d04 100644 --- a/example/README.md +++ b/example/README.md @@ -5,7 +5,7 @@ Example gRPC servers and clients for running the submitqueue services locally. ## Services - **Gateway** (port 8081) — entry point for land requests. Exposes `Ping` and `Land` RPCs. -- **Orchestrator** (port 8082) — coordinates the pipeline. Exposes `Ping` RPC and consumes queue messages across 8 pipeline topics. +- **Orchestrator** (port 8082) — coordinates the pipeline. Exposes `Ping` RPC and consumes queue messages across 9 pipeline topics. Both services require MySQL (app database + queue database). Docker Compose handles this automatically. diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index c342fa47..26ad6f18 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -24,11 +24,12 @@ go_library( "//orchestrator/controller/batch", "//orchestrator/controller/build", "//orchestrator/controller/buildsignal", - "//orchestrator/controller/finalize", + "//orchestrator/controller/conclude", "//orchestrator/controller/merge", - "//orchestrator/controller/mergesignal", "//orchestrator/controller/request", + "//orchestrator/controller/score", "//orchestrator/controller/speculate", + "//orchestrator/controller/validate", "//orchestrator/protopb", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_uber_go_tally_v4//:tally", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 7e500b5f..b1a48b7f 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -26,12 +26,13 @@ import ( "github.com/uber/submitqueue/orchestrator/controller" "github.com/uber/submitqueue/orchestrator/controller/batch" "github.com/uber/submitqueue/orchestrator/controller/build" - "github.com/uber/submitqueue/orchestrator/controller/buildsignal" - "github.com/uber/submitqueue/orchestrator/controller/finalize" + "github.com/uber/submitqueue/orchestrator/controller/conclude" "github.com/uber/submitqueue/orchestrator/controller/merge" - "github.com/uber/submitqueue/orchestrator/controller/mergesignal" + "github.com/uber/submitqueue/orchestrator/controller/buildsignal" "github.com/uber/submitqueue/orchestrator/controller/request" + "github.com/uber/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/orchestrator/controller/speculate" + "github.com/uber/submitqueue/orchestrator/controller/validate" pb "github.com/uber/submitqueue/orchestrator/protopb" "go.uber.org/zap" "google.golang.org/grpc" @@ -165,7 +166,7 @@ func run() error { return err } - logger.Info("controllers registered", zap.Int("count", 8)) + logger.Info("controllers registered", zap.Int("count", 9)) // Start consumers if err := c.Start(ctx); err != nil { @@ -234,16 +235,32 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe ), }, { - Key: consumer.TopicKeyToBatch, - Name: "to-batch", + Key: consumer.TopicKeyValidate, + Name: "validate", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-validate", + ), + }, + { + Key: consumer.TopicKeyBatch, + Name: "batch", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( subscriberName, "orchestrator-batch", ), }, { - Key: consumer.TopicKeyBatched, - Name: "batched", + Key: consumer.TopicKeyScore, + Name: "score", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-score", + ), + }, + { + Key: consumer.TopicKeySpeculate, + Name: "speculate", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( subscriberName, "orchestrator-speculate", @@ -259,51 +276,45 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe }, { Key: consumer.TopicKeyBuildSignal, - Name: "build-signal", + Name: "buildsignal", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( subscriberName, "orchestrator-buildsignal", ), }, { - Key: consumer.TopicKeyToMerge, - Name: "to-merge", + Key: consumer.TopicKeyMerge, + Name: "merge", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( subscriberName, "orchestrator-merge", ), }, { - Key: consumer.TopicKeyMergeSignal, - Name: "merge-signal", + Key: consumer.TopicKeyConclude, + Name: "conclude", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-mergesignal", - ), - }, - { - Key: consumer.TopicKeyFinalize, - Name: "finalize", - Queue: q, - Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-finalize", + subscriberName, "orchestrator-conclude", ), }, }) } // registerControllers creates all pipeline controllers and registers them with the consumer. -// Pipeline: request → batch → speculate → build → build-signal +// Pipeline: // -// → merge → merge-signal -// finalize (terminal) +// request → validate → batch → score → speculate → build → buildsignal ─┐ +// ↑ ↘ │ +// │ merge → conclude │ +// │ │ │ +// └────────┴────────────────────────┘ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cnt counter.Counter, store storage.Storage) error { requestController := request.NewController( logger, scope, registry, - mc, consumer.TopicKeyRequest, "orchestrator-request", ) @@ -311,24 +322,47 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t return fmt.Errorf("failed to register request controller: %w", err) } + validateController := validate.NewController( + logger, + scope, + registry, + mc, + consumer.TopicKeyValidate, + "orchestrator-validate", + ) + if err := c.Register(validateController); err != nil { + return fmt.Errorf("failed to register validate controller: %w", err) + } + batchController := batch.NewController( logger, scope, registry, cnt, store, - consumer.TopicKeyToBatch, + consumer.TopicKeyBatch, "orchestrator-batch", ) if err := c.Register(batchController); err != nil { return fmt.Errorf("failed to register batch controller: %w", err) } + scoreController := score.NewController( + logger, + scope, + registry, + consumer.TopicKeyScore, + "orchestrator-score", + ) + if err := c.Register(scoreController); err != nil { + return fmt.Errorf("failed to register score controller: %w", err) + } + speculateController := speculate.NewController( logger, scope, registry, - consumer.TopicKeyBatched, + consumer.TopicKeySpeculate, "orchestrator-speculate", ) if err := c.Register(speculateController); err != nil { @@ -346,14 +380,14 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t return fmt.Errorf("failed to register build controller: %w", err) } - buildSignalController := buildsignal.NewController( + buildsignalController := buildsignal.NewController( logger, scope, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", ) - if err := c.Register(buildSignalController); err != nil { + if err := c.Register(buildsignalController); err != nil { return fmt.Errorf("failed to register buildsignal controller: %w", err) } @@ -361,33 +395,22 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - consumer.TopicKeyToMerge, + consumer.TopicKeyMerge, "orchestrator-merge", ) if err := c.Register(mergeController); err != nil { return fmt.Errorf("failed to register merge controller: %w", err) } - mergeSignalController := mergesignal.NewController( - logger, - scope, - registry, - consumer.TopicKeyMergeSignal, - "orchestrator-mergesignal", - ) - if err := c.Register(mergeSignalController); err != nil { - return fmt.Errorf("failed to register mergesignal controller: %w", err) - } - - finalizeController := finalize.NewController( + concludeController := conclude.NewController( logger, scope, registry, - consumer.TopicKeyFinalize, - "orchestrator-finalize", + consumer.TopicKeyConclude, + "orchestrator-conclude", ) - if err := c.Register(finalizeController); err != nil { - return fmt.Errorf("failed to register finalize controller: %w", err) + if err := c.Register(concludeController); err != nil { + return fmt.Errorf("failed to register conclude controller: %w", err) } return nil diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 3c08c61d..1b0b3faf 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -15,7 +15,7 @@ import ( ) // Controller handles batch queue messages. -// It consumes validated requests, groups them into batches, and publishes to the speculate stage. +// It consumes validated requests, groups them into batches, and publishes to the score stage. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger @@ -52,7 +52,7 @@ func NewController( } // Process processes a batch delivery from the queue. -// Deserializes the request, groups into batch, and publishes to the speculate topic. +// Deserializes the request, groups into batch, and publishes to the score topic. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) @@ -159,20 +159,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "dependency_count", len(batch.Dependencies), ) - // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeyBatched, batch); err != nil { + // Publish to score topic + if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil { c.logger.Errorw("failed to publish output", "batch_id", batch.ID, - "topic_key", consumer.TopicKeyBatched, + "topic_key", consumer.TopicKeyScore, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err)) + return errs.NewRetryableError(fmt.Errorf("failed to publish to score: %w", err)) } - c.logger.Infow("published batch to next stage", + c.logger.Infow("published batch to score", "batch_id", batch.ID, - "topic_key", consumer.TopicKeyBatched, + "topic_key", consumer.TopicKeyScore, ) c.metricsScope.Counter("processed").Inc(1) diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 39d25e15..fb9df901 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -58,11 +58,11 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyBatched, Name: "batched", Queue: mockQ}}, + []consumer.TopicConfig{{Key: consumer.TopicKeyScore, Name: "score", Queue: mockQ}}, ) require.NoError(t, err) - return NewController(logger, scope, registry, cnt, mockStorage, consumer.TopicKeyToBatch, "orchestrator-batch") + return NewController(logger, scope, registry, cnt, mockStorage, consumer.TopicKeyBatch, "orchestrator-batch") } func TestNewController(t *testing.T) { @@ -70,7 +70,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, newSequentialCounter(ctrl), nil, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyToBatch, controller.TopicKey()) + assert.Equal(t, consumer.TopicKeyBatch, controller.TopicKey()) assert.Equal(t, "orchestrator-batch", controller.ConsumerGroup()) assert.Equal(t, "batch", controller.Name()) } diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index 096625b3..2bc5ba5a 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -13,7 +13,7 @@ import ( ) // Controller handles build queue messages. -// It consumes build requests, triggers builds, and publishes results to the build signal stage. +// It consumes batches, triggers builds, and publishes scheduled builds to the build signal stage (which processes build results). // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger @@ -44,17 +44,17 @@ func NewController( } // Process processes a build delivery from the queue. -// Deserializes the request, triggers a build, and publishes to the build signal topic. +// Deserializes the batch, triggers a build, and publishes a build entity to the build signal topic. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize batch entity + batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Errorw("failed to deserialize batch", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -62,14 +62,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize batch: %w", err) } c.logger.Infow("received build event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, + "batch_id", batch.ID, + "queue", batch.Queue, + "state", string(batch.State), + "version", batch.Version, "attempt", delivery.Attempt(), "partition_key", msg.PartitionKey, ) @@ -78,19 +78,27 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Trigger CI build // - Track build status - // Publish to build signal topic - if err := c.publish(ctx, consumer.TopicKeyBuildSignal, request); err != nil { + build := entity.Build{ + ID: batch.ID, + BatchID: batch.ID, + Status: entity.BuildStatusQueued, + } + + // Publish build to build signal topic + if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil { c.logger.Errorw("failed to publish output", - "request_id", request.ID, + "batch_id", batch.ID, + "build_id", build.ID, "topic_key", consumer.TopicKeyBuildSignal, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to build-signal: %w", err)) + return errs.NewRetryableError(fmt.Errorf("failed to publish to buildsignal: %w", err)) } - c.logger.Infow("published request to next stage", - "request_id", request.ID, + c.logger.Infow("published build to buildsignal", + "batch_id", batch.ID, + "build_id", build.ID, "topic_key", consumer.TopicKeyBuildSignal, ) @@ -99,14 +107,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a build to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, build entity.Build) error { + payload, err := build.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize build: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(build.ID, payload, build.BatchID, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go index 6241d582..1feefbd6 100644 --- a/orchestrator/controller/build/build_test.go +++ b/orchestrator/controller/build/build_test.go @@ -33,7 +33,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "build-signal", Queue: mockQ}}, + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "buildsignal", Queue: mockQ}}, ) require.NoError(t, err) @@ -55,19 +55,17 @@ func TestController_Process_Success(t *testing.T) { controller := newTestController(t, ctrl, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, } - payload, err := request.ToBytes() + payload, err := batch.ToBytes() require.NoError(t, err) - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -98,19 +96,17 @@ func TestController_Process_PublishFailure(t *testing.T) { controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, } - payload, err := request.ToBytes() + payload, err := batch.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() diff --git a/orchestrator/controller/buildsignal/BUILD.bazel b/orchestrator/controller/buildsignal/BUILD.bazel index 2e8e9e95..ed57d1a2 100644 --- a/orchestrator/controller/buildsignal/BUILD.bazel +++ b/orchestrator/controller/buildsignal/BUILD.bazel @@ -7,7 +7,9 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/errs", "//entity", + "//entity/queue", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index bcf16729..06e7b2c3 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -6,12 +6,14 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" + entityqueue "github.com/uber/submitqueue/entity/queue" "go.uber.org/zap" ) // Controller handles build signal queue messages. -// It consumes build signals and processes build results. +// It consumes builds from the build signal topic and publishes batch results to the speculate stage only if the build has reached a terminal state. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger @@ -42,17 +44,17 @@ func NewController( } // Process processes a build signal delivery from the queue. -// Deserializes the request and processes the build result. +// Deserializes the build and publishes a batch result to the speculate topic. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize build entity + build, err := entity.BuildFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Errorw("failed to deserialize build", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -60,27 +62,74 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize build: %w", err) } c.logger.Infow("received build signal event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, + "build_id", build.ID, + "batch_id", build.BatchID, + "status", string(build.Status), "attempt", delivery.Attempt(), "partition_key", msg.PartitionKey, ) // TODO: Add build signal processing logic // - Evaluate build result (pass/fail) - // - Update request state based on build outcome + // - Update batch state based on build outcome + + batch := entity.Batch{ + ID: build.BatchID, + } + + // Publish batch to speculate topic + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + c.logger.Errorw("failed to publish output", + "build_id", build.ID, + "batch_id", build.BatchID, + "topic_key", consumer.TopicKeySpeculate, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err)) + } + + c.logger.Infow("published batch to speculate", + "build_id", build.ID, + "batch_id", build.BatchID, + "topic_key", consumer.TopicKeySpeculate, + ) c.metricsScope.Counter("processed").Inc(1) return nil // Success - message will be acked } +// publish publishes a batch to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { + payload, err := batch.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize batch: %w", err) + } + + msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + // Name returns the controller name for logging and metrics. func (c *Controller) Name() string { return "buildsignal" diff --git a/orchestrator/controller/buildsignal/buildsignal_test.go b/orchestrator/controller/buildsignal/buildsignal_test.go index 697b94df..cca9eca0 100644 --- a/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/orchestrator/controller/buildsignal/buildsignal_test.go @@ -2,6 +2,7 @@ package buildsignal import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -17,18 +18,31 @@ import ( ) // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - registry, err := consumer.NewTopicRegistry(nil) + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}}, + ) require.NoError(t, err) return NewController(logger, scope, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal") } func TestNewController(t *testing.T) { - controller := newTestController(t) + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyBuildSignal, controller.TopicKey()) @@ -39,21 +53,18 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t) + controller := newTestController(t, ctrl, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + build := entity.Build{ + ID: "build-123", + BatchID: "test-queue/batch/1", + Status: entity.BuildStatusQueued, } - payload, err := request.ToBytes() + payload, err := build.ToBytes() require.NoError(t, err) - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + msg := queue.NewMessage("build-123", payload, "test-queue/batch/1", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -65,7 +76,7 @@ func TestController_Process_Success(t *testing.T) { func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t) + controller := newTestController(t, ctrl, nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -79,8 +90,32 @@ func TestController_Process_InvalidJSON(t *testing.T) { assert.False(t, errs.IsRetryable(err)) } +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + + build := entity.Build{ + ID: "build-456", + BatchID: "test-queue/batch/2", + Status: entity.BuildStatusRunning, + } + + payload, err := build.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(build.ID, payload, build.BatchID, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + func TestController_InterfaceImplementation(t *testing.T) { - controller := newTestController(t) + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) var _ consumer.Controller = controller } diff --git a/orchestrator/controller/finalize/BUILD.bazel b/orchestrator/controller/conclude/BUILD.bazel similarity index 81% rename from orchestrator/controller/finalize/BUILD.bazel rename to orchestrator/controller/conclude/BUILD.bazel index 32a4fcbb..09ae5e72 100644 --- a/orchestrator/controller/finalize/BUILD.bazel +++ b/orchestrator/controller/conclude/BUILD.bazel @@ -1,9 +1,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "finalize", - srcs = ["finalize.go"], - importpath = "github.com/uber/submitqueue/orchestrator/controller/finalize", + name = "conclude", + srcs = ["conclude.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/conclude", visibility = ["//visibility:public"], deps = [ "//core/consumer", @@ -14,9 +14,9 @@ go_library( ) go_test( - name = "finalize_test", - srcs = ["finalize_test.go"], - embed = [":finalize"], + name = "conclude_test", + srcs = ["conclude_test.go"], + embed = [":conclude"], deps = [ "//core/consumer", "//core/errs", diff --git a/orchestrator/controller/finalize/finalize.go b/orchestrator/controller/conclude/conclude.go similarity index 68% rename from orchestrator/controller/finalize/finalize.go rename to orchestrator/controller/conclude/conclude.go index 88d60b5d..6beae488 100644 --- a/orchestrator/controller/finalize/finalize.go +++ b/orchestrator/controller/conclude/conclude.go @@ -1,4 +1,4 @@ -package finalize +package conclude import ( "context" @@ -10,8 +10,8 @@ import ( "go.uber.org/zap" ) -// Controller handles finalize queue messages. -// It consumes finalization requests and completes the pipeline. +// Controller handles conclude queue messages. +// It consumes batches and completes the pipeline. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger @@ -24,7 +24,7 @@ type Controller struct { // Verify Controller implements consumer.Controller interface at compile time. var _ consumer.Controller = (*Controller)(nil) -// NewController creates a new finalize controller for the orchestrator. +// NewController creates a new conclude controller for the orchestrator. func NewController( logger *zap.SugaredLogger, scope tally.Scope, @@ -33,26 +33,26 @@ func NewController( consumerGroup string, ) *Controller { return &Controller{ - logger: logger.Named("finalize_controller"), - metricsScope: scope.SubScope("finalize_controller"), + logger: logger.Named("conclude_controller"), + metricsScope: scope.SubScope("conclude_controller"), registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, } } -// Process processes a finalize delivery from the queue. -// Deserializes the request and completes the pipeline processing. +// Process processes a conclude delivery from the queue. +// Deserializes the batch and completes the pipeline processing. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize batch entity + batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Errorw("failed to deserialize batch", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -60,20 +60,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize batch: %w", err) } - c.logger.Infow("received finalize event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, + c.logger.Infow("received conclude event", + "batch_id", batch.ID, + "queue", batch.Queue, + "state", string(batch.State), + "version", batch.Version, "attempt", delivery.Attempt(), "partition_key", msg.PartitionKey, ) - // TODO: Add finalization logic - // - Mark request as landed or error + // TODO: Add conclusion logic + // - Mark batch as succeeded or failed // - Send notifications // - Clean up resources @@ -84,7 +84,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // Name returns the controller name for logging and metrics. func (c *Controller) Name() string { - return "finalize" + return "conclude" } // TopicKey returns the topic key this controller subscribes to. diff --git a/orchestrator/controller/finalize/finalize_test.go b/orchestrator/controller/conclude/conclude_test.go similarity index 75% rename from orchestrator/controller/finalize/finalize_test.go rename to orchestrator/controller/conclude/conclude_test.go index 61091a9f..ac0e783e 100644 --- a/orchestrator/controller/finalize/finalize_test.go +++ b/orchestrator/controller/conclude/conclude_test.go @@ -1,4 +1,4 @@ -package finalize +package conclude import ( "context" @@ -24,16 +24,16 @@ func newTestController(t *testing.T) *Controller { registry, err := consumer.NewTopicRegistry(nil) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyFinalize, "orchestrator-finalize") + return NewController(logger, scope, registry, consumer.TopicKeyConclude, "orchestrator-conclude") } func TestNewController(t *testing.T) { controller := newTestController(t) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyFinalize, controller.TopicKey()) - assert.Equal(t, "orchestrator-finalize", controller.ConsumerGroup()) - assert.Equal(t, "finalize", controller.Name()) + assert.Equal(t, consumer.TopicKeyConclude, controller.TopicKey()) + assert.Equal(t, "orchestrator-conclude", controller.ConsumerGroup()) + assert.Equal(t, "conclude", controller.Name()) } func TestController_Process_Success(t *testing.T) { @@ -41,19 +41,17 @@ func TestController_Process_Success(t *testing.T) { controller := newTestController(t) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, } - payload, err := request.ToBytes() + payload, err := batch.ToBytes() require.NoError(t, err) - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index f12869f0..314f3b09 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -13,7 +13,7 @@ import ( ) // Controller handles merge queue messages. -// It consumes merge requests, performs merges, and publishes results to the merge signal stage. +// It consumes batches, performs merges, and publishes to both conclude and speculate stages. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger @@ -44,17 +44,17 @@ func NewController( } // Process processes a merge delivery from the queue. -// Deserializes the request, performs the merge, and publishes to the merge signal topic. +// Deserializes the batch, performs the merge, and publishes to both conclude and speculate topics. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize batch entity + batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Errorw("failed to deserialize batch", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -62,14 +62,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize batch: %w", err) } c.logger.Infow("received merge event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, + "batch_id", batch.ID, + "queue", batch.Queue, + "state", string(batch.State), + "version", batch.Version, "attempt", delivery.Attempt(), "partition_key", msg.PartitionKey, ) @@ -78,20 +78,36 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Perform source control merge operation // - Handle merge conflicts - // Publish to merge signal topic - if err := c.publish(ctx, consumer.TopicKeyMergeSignal, request); err != nil { - c.logger.Errorw("failed to publish output", - "request_id", request.ID, - "topic_key", consumer.TopicKeyMergeSignal, + // Publish to conclude topic + if err := c.publish(ctx, consumer.TopicKeyConclude, batch); err != nil { + c.logger.Errorw("failed to publish to conclude", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeyConclude, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to merge-signal: %w", err)) + return errs.NewRetryableError(fmt.Errorf("failed to publish to conclude: %w", err)) } - c.logger.Infow("published request to next stage", - "request_id", request.ID, - "topic_key", consumer.TopicKeyMergeSignal, + c.logger.Infow("published batch to conclude", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeyConclude, + ) + + // Publish to speculate topic + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + c.logger.Errorw("failed to publish to speculate", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeySpeculate, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err)) + } + + c.logger.Infow("published batch to speculate", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeySpeculate, ) c.metricsScope.Counter("processed").Inc(1) @@ -99,14 +115,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a batch to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { + payload, err := batch.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize batch: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index dd351180..97687a9d 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -33,11 +33,14 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}}, + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + }, ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyToMerge, "orchestrator-merge") + return NewController(logger, scope, registry, consumer.TopicKeyMerge, "orchestrator-merge") } func TestNewController(t *testing.T) { @@ -45,7 +48,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyToMerge, controller.TopicKey()) + assert.Equal(t, consumer.TopicKeyMerge, controller.TopicKey()) assert.Equal(t, "orchestrator-merge", controller.ConsumerGroup()) assert.Equal(t, "merge", controller.Name()) } @@ -55,19 +58,17 @@ func TestController_Process_Success(t *testing.T) { controller := newTestController(t, ctrl, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, } - payload, err := request.ToBytes() + payload, err := batch.ToBytes() require.NoError(t, err) - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -98,19 +99,17 @@ func TestController_Process_PublishFailure(t *testing.T) { controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, } - payload, err := request.ToBytes() + payload, err := batch.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() diff --git a/orchestrator/controller/mergesignal/mergesignal.go b/orchestrator/controller/mergesignal/mergesignal.go deleted file mode 100644 index a136d24e..00000000 --- a/orchestrator/controller/mergesignal/mergesignal.go +++ /dev/null @@ -1,97 +0,0 @@ -package mergesignal - -import ( - "context" - "fmt" - - "github.com/uber-go/tally/v4" - "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/entity" - "go.uber.org/zap" -) - -// Controller handles merge signal queue messages. -// It consumes merge signals and processes merge results. -// Implements consumer.Controller interface for integration with the consumer. -type Controller struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - registry consumer.TopicRegistry - topicKey consumer.TopicKey - consumerGroup string -} - -// Verify Controller implements consumer.Controller interface at compile time. -var _ consumer.Controller = (*Controller)(nil) - -// NewController creates a new merge signal controller for the orchestrator. -func NewController( - logger *zap.SugaredLogger, - scope tally.Scope, - registry consumer.TopicRegistry, - topicKey consumer.TopicKey, - consumerGroup string, -) *Controller { - return &Controller{ - logger: logger.Named("mergesignal_controller"), - metricsScope: scope.SubScope("mergesignal_controller"), - registry: registry, - topicKey: topicKey, - consumerGroup: consumerGroup, - } -} - -// Process processes a merge signal delivery from the queue. -// Deserializes the request and processes the merge result. -// Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) - - msg := delivery.Message() - - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) - if err != nil { - c.logger.Errorw("failed to deserialize request", - "message_id", msg.ID, - "partition_key", msg.PartitionKey, - "attempt", delivery.Attempt(), - "error", err, - ) - c.metricsScope.Counter("deserialize_errors").Inc(1) - // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) - } - - c.logger.Infow("received merge signal event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, - "attempt", delivery.Attempt(), - "partition_key", msg.PartitionKey, - ) - - // TODO: Add merge signal processing logic - // - Evaluate merge result (success/failure) - // - Update request state based on merge outcome - - c.metricsScope.Counter("processed").Inc(1) - - return nil // Success - message will be acked -} - -// Name returns the controller name for logging and metrics. -func (c *Controller) Name() string { - return "mergesignal" -} - -// TopicKey returns the topic key this controller subscribes to. -func (c *Controller) TopicKey() consumer.TopicKey { - return c.topicKey -} - -// ConsumerGroup returns the consumer group for offset tracking. -func (c *Controller) ConsumerGroup() string { - return c.consumerGroup -} diff --git a/orchestrator/controller/mergesignal/mergesignal_test.go b/orchestrator/controller/mergesignal/mergesignal_test.go deleted file mode 100644 index cd31a2d1..00000000 --- a/orchestrator/controller/mergesignal/mergesignal_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package mergesignal - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/uber-go/tally/v4" - "github.com/uber/submitqueue/core/consumer" - "github.com/uber/submitqueue/core/errs" - "github.com/uber/submitqueue/entity" - "github.com/uber/submitqueue/entity/queue" - queuemock "github.com/uber/submitqueue/extension/queue/mock" - "go.uber.org/mock/gomock" - "go.uber.org/zap/zaptest" -) - -// newTestController creates a controller with test dependencies. -func newTestController(t *testing.T) *Controller { - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope - - registry, err := consumer.NewTopicRegistry(nil) - require.NoError(t, err) - - return NewController(logger, scope, registry, consumer.TopicKeyMergeSignal, "orchestrator-mergesignal") -} - -func TestNewController(t *testing.T) { - controller := newTestController(t) - - require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyMergeSignal, controller.TopicKey()) - assert.Equal(t, "orchestrator-mergesignal", controller.ConsumerGroup()) - assert.Equal(t, "mergesignal", controller.Name()) -} - -func TestController_Process_Success(t *testing.T) { - ctrl := gomock.NewController(t) - - controller := newTestController(t) - - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err = controller.Process(context.Background(), delivery) - require.NoError(t, err) -} - -func TestController_Process_InvalidJSON(t *testing.T) { - ctrl := gomock.NewController(t) - - controller := newTestController(t) - - invalidPayload := []byte(`{"invalid": json"}`) - msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err := controller.Process(context.Background(), delivery) - - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) -} - -func TestController_InterfaceImplementation(t *testing.T) { - controller := newTestController(t) - - var _ consumer.Controller = controller -} diff --git a/orchestrator/controller/request/BUILD.bazel b/orchestrator/controller/request/BUILD.bazel index 8ae98813..7fd2d24b 100644 --- a/orchestrator/controller/request/BUILD.bazel +++ b/orchestrator/controller/request/BUILD.bazel @@ -10,7 +10,6 @@ go_library( "//core/errs", "//entity", "//entity/queue", - "//extension/mergechecker", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], @@ -25,8 +24,6 @@ go_test( "//core/errs", "//entity", "//entity/queue", - "//extension/mergechecker", - "//extension/mergechecker/mock", "//extension/queue/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/orchestrator/controller/request/request.go b/orchestrator/controller/request/request.go index c74c315d..e2ca885f 100644 --- a/orchestrator/controller/request/request.go +++ b/orchestrator/controller/request/request.go @@ -9,18 +9,16 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" - "github.com/uber/submitqueue/extension/mergechecker" "go.uber.org/zap" ) // Controller handles request queue messages. -// It consumes requests, validates them, and publishes to the next stage. +// It consumes requests and publishes to the validate stage. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry - mergeChecker mergechecker.MergeChecker topicKey consumer.TopicKey consumerGroup string } @@ -33,7 +31,6 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, - mergeChecker mergechecker.MergeChecker, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -41,14 +38,13 @@ func NewController( logger: logger.Named("request_controller"), metricsScope: scope.SubScope("request_controller"), registry: registry, - mergeChecker: mergeChecker, topicKey: topicKey, consumerGroup: consumerGroup, } } // Process processes a request delivery from the queue. -// Deserializes the request, validates it, and publishes to the batch topic. +// Deserializes the request and publishes to the validate topic. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) @@ -81,40 +77,20 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) - // Merge conflict check - mergeResult, err := c.mergeChecker.Check(ctx, request.Queue, request.Change) - if err != nil { - c.logger.Errorw("merge check failed", - "request_id", request.ID, - "error", err, - ) - c.metricsScope.Counter("merge_check_errors").Inc(1) - return fmt.Errorf("merge check failed: %w", err) - } - if !mergeResult.Mergeable { - c.logger.Infow("request not mergeable", - "request_id", request.ID, - "queue", request.Queue, - "reason", mergeResult.Reason, - ) - c.metricsScope.Counter("not_mergeable").Inc(1) - return errs.NewUserError(fmt.Errorf("request %s is not mergeable: %s", request.ID, mergeResult.Reason)) - } - - // Publish to batch topic - if err := c.publish(ctx, consumer.TopicKeyToBatch, request); err != nil { + // Publish to validate topic + if err := c.publish(ctx, consumer.TopicKeyValidate, request); err != nil { c.logger.Errorw("failed to publish output", "request_id", request.ID, - "topic_key", "to-batch", + "topic_key", consumer.TopicKeyValidate, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) - return errs.NewRetryableError(fmt.Errorf("failed to publish to batch: %w", err)) + return errs.NewRetryableError(fmt.Errorf("failed to publish to validate: %w", err)) } - c.logger.Infow("published request to next stage", + c.logger.Infow("published request to validate", "request_id", request.ID, - "topic_key", "to-batch", + "topic_key", consumer.TopicKeyValidate, ) c.metricsScope.Counter("processed").Inc(1) diff --git a/orchestrator/controller/request/request_test.go b/orchestrator/controller/request/request_test.go index e6a6144c..b0098e2e 100644 --- a/orchestrator/controller/request/request_test.go +++ b/orchestrator/controller/request/request_test.go @@ -12,22 +12,13 @@ import ( "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" - "github.com/uber/submitqueue/extension/mergechecker" - mergecheckermock "github.com/uber/submitqueue/extension/mergechecker/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) -// newMergeableMock returns a mock MergeChecker that always returns mergeable. -func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecker { - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: true}, nil).AnyTimes() - return mc -} - // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.MergeChecker, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -42,17 +33,16 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.Me mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyToBatch, Name: "to-batch", Queue: mockQ}}, + []consumer.TopicConfig{{Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ}}, ) require.NoError(t, err) - return NewController(logger, scope, registry, mc, consumer.TopicKeyRequest, "orchestrator-request") + return NewController(logger, scope, registry, consumer.TopicKeyRequest, "orchestrator-request") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeyRequest, controller.TopicKey()) @@ -62,9 +52,8 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + controller := newTestController(t, ctrl, nil) request := entity.Request{ ID: "test-queue/123", @@ -89,9 +78,8 @@ func TestController_Process_Success(t *testing.T) { func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + controller := newTestController(t, ctrl, nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -122,9 +110,8 @@ func TestController_Process_AllRequestStates(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + controller := newTestController(t, ctrl, nil) request := entity.Request{ ID: fmt.Sprintf("queue/%s", tt.state), @@ -151,9 +138,8 @@ func TestController_Process_AllRequestStates(t *testing.T) { func TestController_Process_MultipleChanges(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + controller := newTestController(t, ctrl, nil) request := entity.Request{ ID: "queue/999", @@ -184,9 +170,8 @@ func TestController_Process_MultipleChanges(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) request := entity.Request{ ID: "test-queue/123", @@ -211,68 +196,7 @@ func TestController_Process_PublishFailure(t *testing.T) { func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - mc := newMergeableMock(ctrl) - controller := newTestController(t, ctrl, mc, nil) + controller := newTestController(t, ctrl, nil) var _ consumer.Controller = controller } - -func TestController_Process_NotMergeable(t *testing.T) { - ctrl := gomock.NewController(t) - - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: false}, nil) - - controller := newTestController(t, ctrl, mc, nil) - - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/repo/1/abc123"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err = controller.Process(context.Background(), delivery) - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) -} - -func TestController_Process_MergeCheckError(t *testing.T) { - ctrl := gomock.NewController(t) - - mc := mergecheckermock.NewMockMergeChecker(ctrl) - mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{}, fmt.Errorf("merge check failed")) - - controller := newTestController(t, ctrl, mc, nil) - - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/repo/1/abc123"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, - } - - payload, err := request.ToBytes() - require.NoError(t, err) - - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() - - err = controller.Process(context.Background(), delivery) - require.Error(t, err) - assert.False(t, errs.IsRetryable(err)) -} diff --git a/orchestrator/controller/mergesignal/BUILD.bazel b/orchestrator/controller/score/BUILD.bazel similarity index 79% rename from orchestrator/controller/mergesignal/BUILD.bazel rename to orchestrator/controller/score/BUILD.bazel index c62db22e..43829396 100644 --- a/orchestrator/controller/mergesignal/BUILD.bazel +++ b/orchestrator/controller/score/BUILD.bazel @@ -1,22 +1,24 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "mergesignal", - srcs = ["mergesignal.go"], - importpath = "github.com/uber/submitqueue/orchestrator/controller/mergesignal", + name = "score", + srcs = ["score.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/score", visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/errs", "//entity", + "//entity/queue", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], ) go_test( - name = "mergesignal_test", - srcs = ["mergesignal_test.go"], - embed = [":mergesignal"], + name = "score_test", + srcs = ["score_test.go"], + embed = [":score"], deps = [ "//core/consumer", "//core/errs", diff --git a/orchestrator/controller/score/score.go b/orchestrator/controller/score/score.go new file mode 100644 index 00000000..f6e07ed8 --- /dev/null +++ b/orchestrator/controller/score/score.go @@ -0,0 +1,141 @@ +package score + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/entity" + entityqueue "github.com/uber/submitqueue/entity/queue" + "go.uber.org/zap" +) + +// Controller handles score queue messages. +// It consumes batches, scores them, and publishes to the speculate stage. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new score controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("score_controller"), + metricsScope: scope.SubScope("score_controller"), + registry: registry, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process processes a score delivery from the queue. +// Deserializes the batch, scores it, and publishes to the speculate topic. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize batch entity + batch, err := entity.BatchFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize batch", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return fmt.Errorf("failed to deserialize batch: %w", err) + } + + c.logger.Infow("received score event", + "batch_id", batch.ID, + "queue", batch.Queue, + "state", string(batch.State), + "version", batch.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // TODO: Add scoring logic + // - Evaluate batch priority + // - Apply scoring heuristics + + // Publish to speculate topic + if err := c.publish(ctx, consumer.TopicKeySpeculate, batch); err != nil { + c.logger.Errorw("failed to publish output", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeySpeculate, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to publish to speculate: %w", err)) + } + + c.logger.Infow("published batch to speculate", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeySpeculate, + ) + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// publish publishes a batch to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { + payload, err := batch.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize batch: %w", err) + } + + msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "score" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/score/score_test.go b/orchestrator/controller/score/score_test.go new file mode 100644 index 00000000..e10d5006 --- /dev/null +++ b/orchestrator/controller/score/score_test.go @@ -0,0 +1,123 @@ +package score + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}}, + ) + require.NoError(t, err) + + return NewController(logger, scope, registry, consumer.TopicKeyScore, "orchestrator-score") +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicKeyScore, controller.TopicKey()) + assert.Equal(t, "orchestrator-score", controller.ConsumerGroup()) + assert.Equal(t, "score", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } + + payload, err := batch.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, nil) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} + +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, + } + + payload, err := batch.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + +func TestController_InterfaceImplementation(t *testing.T) { + ctrl := gomock.NewController(t) + controller := newTestController(t, ctrl, nil) + + var _ consumer.Controller = controller +} diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go index 14d9cc43..93be3257 100644 --- a/orchestrator/controller/speculate/speculate.go +++ b/orchestrator/controller/speculate/speculate.go @@ -13,7 +13,7 @@ import ( ) // Controller handles speculate queue messages. -// It consumes batched requests, performs speculation, and publishes to both build and merge stages. +// It consumes batches, performs speculation, and publishes to both build and merge stages. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { logger *zap.SugaredLogger @@ -44,17 +44,17 @@ func NewController( } // Process processes a speculate delivery from the queue. -// Deserializes the request, performs speculation, and publishes to both build and merge topics. +// Deserializes the batch, performs speculation, and publishes to both build and merge topics. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize batch entity + batch, err := entity.BatchFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Errorw("failed to deserialize batch", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -62,14 +62,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize batch: %w", err) } c.logger.Infow("received speculate event", - "request_id", request.ID, - "queue", request.Queue, - "state", string(request.State), - "version", request.Version, + "batch_id", batch.ID, + "queue", batch.Queue, + "state", string(batch.State), + "version", batch.Version, "attempt", delivery.Attempt(), "partition_key", msg.PartitionKey, ) @@ -77,11 +77,13 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // TODO: Add speculation logic // - Speculative merge/rebase // - Conflict detection + // - Publish to build only if speculation is in progress (needs CI verification) + // - Publish to merge only if speculation is complete and successful (ready to land) // Publish to build topic - if err := c.publish(ctx, consumer.TopicKeyBuild, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBuild, batch); err != nil { c.logger.Errorw("failed to publish to build", - "request_id", request.ID, + "batch_id", batch.ID, "topic_key", consumer.TopicKeyBuild, "error", err, ) @@ -89,20 +91,25 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return errs.NewRetryableError(fmt.Errorf("failed to publish to build: %w", err)) } + c.logger.Infow("published batch to build", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeyBuild, + ) + // Publish to merge topic - if err := c.publish(ctx, consumer.TopicKeyToMerge, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyMerge, batch); err != nil { c.logger.Errorw("failed to publish to merge", - "request_id", request.ID, - "topic_key", consumer.TopicKeyToMerge, + "batch_id", batch.ID, + "topic_key", consumer.TopicKeyMerge, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) return errs.NewRetryableError(fmt.Errorf("failed to publish to merge: %w", err)) } - c.logger.Infow("published request to next stages", - "request_id", request.ID, - "topic_keys", []string{consumer.TopicKeyBuild.String(), consumer.TopicKeyToMerge.String()}, + c.logger.Infow("published batch to merge", + "batch_id", batch.ID, + "topic_key", consumer.TopicKeyMerge, ) c.metricsScope.Counter("processed").Inc(1) @@ -110,14 +117,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a batch to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { + payload, err := batch.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize batch: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) q, ok := c.registry.Queue(key) if !ok { diff --git a/orchestrator/controller/speculate/speculate_test.go b/orchestrator/controller/speculate/speculate_test.go index ad0e89a0..1ace8b2e 100644 --- a/orchestrator/controller/speculate/speculate_test.go +++ b/orchestrator/controller/speculate/speculate_test.go @@ -35,12 +35,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ {Key: consumer.TopicKeyBuild, Name: "build", Queue: mockQ}, - {Key: consumer.TopicKeyToMerge, Name: "to-merge", Queue: mockQ}, + {Key: consumer.TopicKeyMerge, Name: "merge", Queue: mockQ}, }, ) require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicKeyBatched, "orchestrator-speculate") + return NewController(logger, scope, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") } func TestNewController(t *testing.T) { @@ -48,7 +48,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyBatched, controller.TopicKey()) + assert.Equal(t, consumer.TopicKeySpeculate, controller.TopicKey()) assert.Equal(t, "orchestrator-speculate", controller.ConsumerGroup()) assert.Equal(t, "speculate", controller.Name()) } @@ -58,19 +58,17 @@ func TestController_Process_Success(t *testing.T) { controller := newTestController(t, ctrl, nil) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, } - payload, err := request.ToBytes() + payload, err := batch.ToBytes() require.NoError(t, err) - msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + msg := queue.NewMessage("test-queue/batch/1", payload, "test-queue", nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -101,19 +99,17 @@ func TestController_Process_PublishFailure(t *testing.T) { controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) - request := entity.Request{ - ID: "test-queue/123", - Queue: "test-queue", - Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, - LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, + batch := entity.Batch{ + ID: "test-queue/batch/1", + Queue: "test-queue", + State: entity.BatchStateCreated, + Version: 1, } - payload, err := request.ToBytes() + payload, err := batch.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(batch.ID, payload, batch.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() diff --git a/orchestrator/controller/validate/BUILD.bazel b/orchestrator/controller/validate/BUILD.bazel new file mode 100644 index 00000000..3635cd57 --- /dev/null +++ b/orchestrator/controller/validate/BUILD.bazel @@ -0,0 +1,37 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "validate", + srcs = ["validate.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/validate", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//core/errs", + "//entity", + "//entity/queue", + "//extension/mergechecker", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "validate_test", + srcs = ["validate_test.go"], + embed = [":validate"], + deps = [ + "//core/consumer", + "//core/errs", + "//entity", + "//entity/queue", + "//extension/mergechecker", + "//extension/mergechecker/mock", + "//extension/queue/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/validate/validate.go b/orchestrator/controller/validate/validate.go new file mode 100644 index 00000000..4411baa2 --- /dev/null +++ b/orchestrator/controller/validate/validate.go @@ -0,0 +1,162 @@ +package validate + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/entity" + entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/mergechecker" + "go.uber.org/zap" +) + +// Controller handles validate queue messages. +// It consumes requests, performs validation checks (merge conflicts, duplicate requests, etc.), +// and publishes to the batch stage. Validation logic is extensible to support additional checks. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + mergeChecker mergechecker.MergeChecker + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new validate controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + registry consumer.TopicRegistry, + mergeChecker mergechecker.MergeChecker, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("validate_controller"), + metricsScope: scope.SubScope("validate_controller"), + registry: registry, + mergeChecker: mergeChecker, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process processes a validate delivery from the queue. +// Deserializes the request and publishes to the batch topic. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request entity + request, err := entity.RequestFromBytes(msg.Payload) + if err != nil { + c.logger.Errorw("failed to deserialize request", + "message_id", msg.ID, + "partition_key", msg.PartitionKey, + "attempt", delivery.Attempt(), + "error", err, + ) + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return fmt.Errorf("failed to deserialize request: %w", err) + } + + c.logger.Infow("received validate event", + "request_id", request.ID, + "queue", request.Queue, + "state", string(request.State), + "version", request.Version, + "attempt", delivery.Attempt(), + "partition_key", msg.PartitionKey, + ) + + // Merge conflict check + mergeResult, err := c.mergeChecker.Check(ctx, request.Queue, request.Change) + if err != nil { + c.logger.Errorw("merge check failed", + "request_id", request.ID, + "error", err, + ) + c.metricsScope.Counter("merge_check_errors").Inc(1) + return fmt.Errorf("merge check failed: %w", err) + } + if !mergeResult.Mergeable { + c.logger.Infow("request not mergeable", + "request_id", request.ID, + "queue", request.Queue, + "reason", mergeResult.Reason, + ) + c.metricsScope.Counter("not_mergeable").Inc(1) + return errs.NewUserError(fmt.Errorf("request %s is not mergeable: %s", request.ID, mergeResult.Reason)) + } + + // Publish to batch topic + if err := c.publish(ctx, consumer.TopicKeyBatch, request); err != nil { + c.logger.Errorw("failed to publish output", + "request_id", request.ID, + "topic_key", consumer.TopicKeyBatch, + "error", err, + ) + c.metricsScope.Counter("publish_errors").Inc(1) + return errs.NewRetryableError(fmt.Errorf("failed to publish to batch: %w", err)) + } + + c.logger.Infow("published request to batch", + "request_id", request.ID, + "topic_key", consumer.TopicKeyBatch, + ) + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// publish publishes a request to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { + payload, err := request.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request: %w", err) + } + + msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "validate" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/validate/validate_test.go b/orchestrator/controller/validate/validate_test.go new file mode 100644 index 00000000..d22f82ed --- /dev/null +++ b/orchestrator/controller/validate/validate_test.go @@ -0,0 +1,201 @@ +package validate + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/core/errs" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/mergechecker" + mergecheckermock "github.com/uber/submitqueue/extension/mergechecker/mock" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newMergeableMock returns a mock MergeChecker that always returns mergeable. +func newMergeableMock(ctrl *gomock.Controller) *mergecheckermock.MockMergeChecker { + mc := mergecheckermock.NewMockMergeChecker(ctrl) + mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: true}, nil).AnyTimes() + return mc +} + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.MergeChecker, publishErr error) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, topic string, msg queue.Message) error { + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBatch, Name: "batch", Queue: mockQ}}, + ) + require.NoError(t, err) + + return NewController(logger, scope, registry, mc, consumer.TopicKeyValidate, "orchestrator-validate") +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + mc := newMergeableMock(ctrl) + controller := newTestController(t, ctrl, mc, nil) + + require.NotNil(t, controller) + assert.Equal(t, consumer.TopicKeyValidate, controller.TopicKey()) + assert.Equal(t, "orchestrator-validate", controller.ConsumerGroup()) + assert.Equal(t, "validate", controller.Name()) +} + +func TestController_Process_Success(t *testing.T) { + ctrl := gomock.NewController(t) + mc := newMergeableMock(ctrl) + + controller := newTestController(t, ctrl, mc, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) +} + +func TestController_Process_InvalidJSON(t *testing.T) { + ctrl := gomock.NewController(t) + mc := newMergeableMock(ctrl) + + controller := newTestController(t, ctrl, mc, nil) + + invalidPayload := []byte(`{"invalid": json"}`) + msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} + +func TestController_Process_PublishFailure(t *testing.T) { + ctrl := gomock.NewController(t) + mc := newMergeableMock(ctrl) + + controller := newTestController(t, ctrl, mc, fmt.Errorf("publish failed")) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + +func TestController_InterfaceImplementation(t *testing.T) { + ctrl := gomock.NewController(t) + mc := newMergeableMock(ctrl) + controller := newTestController(t, ctrl, mc, nil) + + var _ consumer.Controller = controller +} + +func TestController_Process_NotMergeable(t *testing.T) { + ctrl := gomock.NewController(t) + + mc := mergecheckermock.NewMockMergeChecker(ctrl) + mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{Mergeable: false}, nil) + + controller := newTestController(t, ctrl, mc, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/repo/1/abc123"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +} + +func TestController_Process_MergeCheckError(t *testing.T) { + ctrl := gomock.NewController(t) + + mc := mergecheckermock.NewMockMergeChecker(ctrl) + mc.EXPECT().Check(gomock.Any(), gomock.Any(), gomock.Any()).Return(mergechecker.Result{}, fmt.Errorf("merge check failed")) + + controller := newTestController(t, ctrl, mc, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/repo/1/abc123"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.Error(t, err) + assert.False(t, errs.IsRetryable(err)) +}