diff --git a/core/consumer/README.md b/core/consumer/README.md index a1d72329..ff0ebac2 100644 --- a/core/consumer/README.md +++ b/core/consumer/README.md @@ -26,7 +26,7 @@ The top-level orchestrator. Register controllers, start consuming, and stop grac ```go registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ - {Key: consumer.TopicKeyRequest, Name: "request", Queue: q, Subscription: subConfig}, + {Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig}, }) c := consumer.New(logger, scope, registry) @@ -64,7 +64,7 @@ The `TopicRegistry` maps topic keys to queue backends, topic names, and subscrip ```go registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ { - Key: consumer.TopicKeyRequest, + Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator"), @@ -78,7 +78,7 @@ registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{ }) ``` -**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyRequest`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions. +**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions. ## Error Handling diff --git a/core/consumer/consumer_test.go b/core/consumer/consumer_test.go index ad07c1bf..31227717 100644 --- a/core/consumer/consumer_test.go +++ b/core/consumer/consumer_test.go @@ -103,7 +103,7 @@ func TestConsumer_Register(t *testing.T) { c := consumer.New(logger, tally.NoopScope, reg) handler1 := consumermock.NewMockController(ctrl) - setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil) + setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil) handler2 := consumermock.NewMockController(ctrl) setupController(handler2, "handler2", consumer.TopicKey("other-topic"), "group2", nil) @@ -123,10 +123,10 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) { c := consumer.New(logger, tally.NoopScope, reg) handler1 := consumermock.NewMockController(ctrl) - setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil) + setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil) handler2 := consumermock.NewMockController(ctrl) - setupController(handler2, "handler2", consumer.TopicKeyRequest, "group2", nil) + setupController(handler2, "handler2", consumer.TopicKeyStart, "group2", nil) err := c.Register(handler1) require.NoError(t, err) @@ -146,7 +146,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) { require.NoError(t, err) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil) + setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil) err = c.Register(handler) assert.Error(t, err) @@ -170,7 +170,7 @@ func TestConsumer_Start_AfterStop(t *testing.T) { c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil) + setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil) err := c.Register(handler) require.NoError(t, err) @@ -189,14 +189,14 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) // Registry has queue but no subscription config reg, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ}}, + []consumer.TopicConfig{{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ}}, ) require.NoError(t, err) c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil) + setupController(handler, "handler", consumer.TopicKeyStart, "group", nil) err = c.Register(handler) require.NoError(t, err) @@ -217,12 +217,12 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil) + setupController(handler, "handler", consumer.TopicKeyStart, "group", nil) err := c.Register(handler) require.NoError(t, err) @@ -243,13 +243,13 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handledMsg := "" handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { handledMsg = delivery.Message().ID return nil @@ -289,12 +289,12 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return errs.NewRetryableError(fmt.Errorf("processing failed")) }, @@ -331,12 +331,12 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return fmt.Errorf("bad payload") }, @@ -382,12 +382,12 @@ func TestConsumer_Stop(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", nil) + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", nil) err := c.Register(handler) require.NoError(t, err) @@ -440,12 +440,12 @@ func TestConsumer_ObservabilityTags(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") testC := consumer.New(logger, testScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return tt.handlerError }, @@ -515,12 +515,12 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, scope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return nil }, ) @@ -560,12 +560,12 @@ func TestConsumer_ErrorMetrics(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, scope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return errs.NewRetryableError(fmt.Errorf("processing failed")) }, @@ -616,7 +616,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg) @@ -626,7 +626,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) { var partBProcessed atomic.Bool handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { pk := delivery.Message().PartitionKey if pk == "partition-a" { @@ -701,7 +701,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg) @@ -712,7 +712,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) { allDone := make(chan struct{}) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { mu.Lock() order = append(order, delivery.Message().ID) @@ -770,14 +770,14 @@ func TestConsumer_PartitionWorkerCleanup(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group") c := consumer.New(logger, tally.NoopScope, reg) processedCount := int64(0) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { atomic.AddInt64(&processedCount, 1) return nil diff --git a/core/consumer/registry.go b/core/consumer/registry.go index aca84f2d..64955337 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -28,8 +28,8 @@ import ( type TopicKey string const ( - // TopicKeyRequest is the pipeline stage where new requests arrive from the gateway. - TopicKeyRequest TopicKey = "request" + // TopicKeyStart is the pipeline stage where new requests arrive from the gateway. + TopicKeyStart TopicKey = "start" // TopicKeyValidate is the pipeline stage where requests are published for validation. TopicKeyValidate TopicKey = "validate" // TopicKeyBatch is the pipeline stage where validated requests are published for batching. diff --git a/core/consumer/registry_test.go b/core/consumer/registry_test.go index 6fbe64a9..a66d644c 100644 --- a/core/consumer/registry_test.go +++ b/core/consumer/registry_test.go @@ -32,7 +32,7 @@ func TestNewTopicRegistry(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ { - Key: consumer.TopicKeyRequest, + Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ, Subscription: extqueue.DefaultSubscriptionConfig( @@ -43,15 +43,15 @@ func TestNewTopicRegistry(t *testing.T) { ) require.NoError(t, err) - q, ok := registry.Queue(consumer.TopicKeyRequest) + q, ok := registry.Queue(consumer.TopicKeyStart) require.True(t, ok) assert.Equal(t, mockQ, q) - name, ok := registry.TopicName(consumer.TopicKeyRequest) + name, ok := registry.TopicName(consumer.TopicKeyStart) require.True(t, ok) assert.Equal(t, "request", name) - cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyRequest, "group-a") + cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyStart, "group-a") require.True(t, ok) assert.Equal(t, "group-a", cfg.ConsumerGroup) } @@ -87,7 +87,7 @@ func TestNewTopicRegistry_InvalidTopicName(t *testing.T) { t.Run(tt.name, func(t *testing.T) { _, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyRequest, Name: tt.topicName}, + {Key: consumer.TopicKeyStart, Name: tt.topicName}, }, ) require.Error(t, err) @@ -108,14 +108,14 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { name: "found group-a", configs: []consumer.TopicConfig{ { - Key: consumer.TopicKeyRequest, + Key: consumer.TopicKeyStart, Name: "request", Subscription: extqueue.DefaultSubscriptionConfig( "worker-1", "group-a", ), }, }, - lookupKey: consumer.TopicKeyRequest, + lookupKey: consumer.TopicKeyStart, lookupGroup: "group-a", expectFound: true, expectedGroup: "group-a", @@ -124,14 +124,14 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { name: "not found by group", configs: []consumer.TopicConfig{ { - Key: consumer.TopicKeyRequest, + Key: consumer.TopicKeyStart, Name: "request", Subscription: extqueue.DefaultSubscriptionConfig( "worker-1", "group-a", ), }, }, - lookupKey: consumer.TopicKeyRequest, + lookupKey: consumer.TopicKeyStart, lookupGroup: "nonexistent", expectFound: false, }, @@ -139,7 +139,7 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { name: "not found by topic key", configs: []consumer.TopicConfig{ { - Key: consumer.TopicKeyRequest, + Key: consumer.TopicKeyStart, Name: "request", Subscription: extqueue.DefaultSubscriptionConfig( "worker-1", "group-a", @@ -175,13 +175,13 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ1}, + {Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ1}, {Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ2}, }, ) require.NoError(t, err) - q1, ok := registry.Queue(consumer.TopicKeyRequest) + q1, ok := registry.Queue(consumer.TopicKeyStart) require.True(t, ok) assert.Equal(t, mockQ1, q1) @@ -201,8 +201,8 @@ func TestTopicKey_String(t *testing.T) { }{ { name: "predefined topic key", - key: consumer.TopicKeyRequest, - expected: "request", + key: consumer.TopicKeyStart, + expected: "start", }, { name: "custom topic key", @@ -224,12 +224,12 @@ func TestTopicRegistry_TopicName(t *testing.T) { registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Key: consumer.TopicKeyRequest, Name: "my-custom-request", Queue: mockQ}, + {Key: consumer.TopicKeyStart, Name: "my-custom-request", Queue: mockQ}, }, ) require.NoError(t, err) - name, ok := registry.TopicName(consumer.TopicKeyRequest) + name, ok := registry.TopicName(consumer.TopicKeyStart) require.True(t, ok) assert.Equal(t, "my-custom-request", name) diff --git a/core/request/request_test.go b/core/request/request_test.go index 075fabad..3b84b7e4 100644 --- a/core/request/request_test.go +++ b/core/request/request_test.go @@ -37,14 +37,14 @@ func TestGetCurrentStateFromRequestLog(t *testing.T) { { name: "single record", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusStarted, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, }, - expected: CurrentState{Status: entity.RequestStatusNew, LastError: "", Metadata: map[string]string{}}, + expected: CurrentState{Status: entity.RequestStatusStarted, LastError: "", Metadata: map[string]string{}}, }, { name: "terminal status wins over later non-terminal", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusStarted, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusLanded, RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}}, {RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, }, @@ -53,7 +53,7 @@ func TestGetCurrentStateFromRequestLog(t *testing.T) { { name: "terminal error status with last error", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusStarted, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusError, RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, }, expected: CurrentState{Status: entity.RequestStatusError, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, @@ -86,7 +86,7 @@ func TestGetCurrentStateFromRequestLog(t *testing.T) { { name: "no terminal records falls back to latest timestamp", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusStarted, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, {RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusValidated, RequestVersion: 2, LastError: "", Metadata: map[string]string{}}, {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, }, diff --git a/entity/BUILD.bazel b/entity/BUILD.bazel index de5f2539..09c5a360 100644 --- a/entity/BUILD.bazel +++ b/entity/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "batch_dependent.go", "build.go", "change_provider.go", + "land_request.go", "queue_config.go", "request.go", "request_log.go", @@ -21,6 +22,7 @@ go_test( srcs = [ "batch_test.go", "build_test.go", + "land_request_test.go", "request_log_test.go", "request_test.go", ], diff --git a/entity/land_request.go b/entity/land_request.go new file mode 100644 index 00000000..c21b4700 --- /dev/null +++ b/entity/land_request.go @@ -0,0 +1,43 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import "encoding/json" + +// LandRequest represents the gateway-owned fields of a land request sent over the queue +// to the orchestrator. It contains only the validated inputs and generated ID — the orchestrator +// is responsible for constructing the full Request entity with state machine fields. +type LandRequest struct { + // ID is the globally unique identifier for the land request. Format: "/". + ID string `json:"id"` + // Queue is the name of the queue processing the land request. + Queue string `json:"queue"` + // Change is the set of code changes to land. + Change Change `json:"change"` + // LandStrategy is the source control integration strategy to use for this land operation. + LandStrategy RequestLandStrategy `json:"land_strategy"` +} + +// ToBytes serializes the LandRequest to JSON bytes for queue message payload. +func (r LandRequest) ToBytes() ([]byte, error) { + return json.Marshal(r) +} + +// LandRequestFromBytes deserializes a LandRequest from JSON bytes. +func LandRequestFromBytes(data []byte) (LandRequest, error) { + var req LandRequest + err := json.Unmarshal(data, &req) + return req, err +} diff --git a/entity/land_request_test.go b/entity/land_request_test.go new file mode 100644 index 00000000..5c57e034 --- /dev/null +++ b/entity/land_request_test.go @@ -0,0 +1,133 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLandRequest_ToBytes(t *testing.T) { + req := LandRequest{ + ID: "test-queue/123", + Queue: "test-queue", + Change: Change{URIs: []string{"github://uber/submitqueue/pull/456/abc123def", "github://uber/submitqueue/pull/789/def456abc"}}, + LandStrategy: RequestLandStrategyRebase, + } + + data, err := req.ToBytes() + require.NoError(t, err) + assert.NotEmpty(t, data) + + // Verify JSON contains expected fields + jsonStr := string(data) + assert.Contains(t, jsonStr, "test-queue/123") + assert.Contains(t, jsonStr, "github://uber/submitqueue/pull/456/abc123def") + assert.Contains(t, jsonStr, "rebase") +} + +func TestLandRequestFromBytes(t *testing.T) { + original := LandRequest{ + ID: "my-queue/999", + Queue: "my-queue", + Change: Change{URIs: []string{"code.uber.internal.com/D111"}}, + LandStrategy: RequestLandStrategyMerge, + } + + // Serialize + data, err := original.ToBytes() + require.NoError(t, err) + + // Deserialize + deserialized, err := LandRequestFromBytes(data) + require.NoError(t, err) + + // Verify all fields match + assert.Equal(t, original.ID, deserialized.ID) + assert.Equal(t, original.Queue, deserialized.Queue) + assert.Equal(t, original.Change.URIs, deserialized.Change.URIs) + assert.Equal(t, original.LandStrategy, deserialized.LandStrategy) +} + +func TestLandRequestFromBytes_InvalidJSON(t *testing.T) { + invalidJSON := []byte(`{"invalid": json"}`) + + _, err := LandRequestFromBytes(invalidJSON) + assert.Error(t, err) +} + +func TestLandRequestFromBytes_EmptyData(t *testing.T) { + emptyJSON := []byte(`{}`) + + req, err := LandRequestFromBytes(emptyJSON) + require.NoError(t, err) + + // Empty JSON should deserialize with zero values + assert.Empty(t, req.ID) + assert.Empty(t, req.Queue) + assert.Equal(t, RequestLandStrategyUnknown, req.LandStrategy) +} + +func TestLandRequest_SerializationRoundTrip(t *testing.T) { + tests := []struct { + name string + req LandRequest + }{ + { + name: "github stacked diff", + req: LandRequest{ + ID: "queue1/100", + Queue: "queue1", + Change: Change{URIs: []string{"github://uber/repo-a/pull/101/aaa111", "github://uber/repo-a/pull/102/bbb222", "github://uber/repo-a/pull/103/ccc333"}}, + LandStrategy: RequestLandStrategySquashRebase, + }, + }, + { + name: "phabricator revision", + req: LandRequest{ + ID: "queue2/200", + Queue: "queue2", + Change: Change{URIs: []string{"code.uber.internal.com/D12345"}}, + LandStrategy: RequestLandStrategyRebase, + }, + }, + { + name: "github enterprise request", + req: LandRequest{ + ID: "queue3/300", + Queue: "queue3", + Change: Change{URIs: []string{"github.uber.com/internal/service/999/deadbeef12"}}, + LandStrategy: RequestLandStrategyMerge, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Serialize + data, err := tt.req.ToBytes() + require.NoError(t, err) + + // Deserialize + deserialized, err := LandRequestFromBytes(data) + require.NoError(t, err) + + // Verify complete equality + assert.Equal(t, tt.req, deserialized) + }) + } +} diff --git a/entity/request.go b/entity/request.go index 4a299cd1..5054932b 100644 --- a/entity/request.go +++ b/entity/request.go @@ -36,8 +36,8 @@ type RequestState string const ( // RequestStateUnknown is the unreachable state. It is set by default when the structure is initialized. It should never be seen in the system. RequestStateUnknown RequestState = "" - // RequestStateNew is the initial state of a land request. It is confirmed by the system but the processing is not started yet. - RequestStateNew RequestState = "new" + // RequestStateStarted is the initial state of a land request. It is confirmed by the system but the processing is not started yet. + RequestStateStarted RequestState = "started" // RequestStateValidated indicates that the request has been validated (duplicate check, merge check etc.) successfully. RequestStateValidated RequestState = "validated" // RequestStateProcessing is the state of a land request that is being processed. diff --git a/entity/request_log.go b/entity/request_log.go index 285b2db0..7d85ee9f 100644 --- a/entity/request_log.go +++ b/entity/request_log.go @@ -34,8 +34,8 @@ const ( // RequestStatusAccepted indicates that the request has been accepted by the system. Typically a gateway service will set this status when the land request is received and persisted to the logging database. RequestStatusAccepted RequestStatus = "accepted" - // RequestStatusNew is the initial status of a request. It corresponds to the RequestStateNew state and typically set by the orchestrator service when the request is received and persisted to the operating database. - RequestStatusNew RequestStatus = "new" + // RequestStatusStarted is the initial status of a request. It corresponds to the RequestStateStarted state and typically set by the orchestrator service when the request is received and persisted to the operating database. + RequestStatusStarted RequestStatus = "started" // RequestStatusValidating indicates that the request is currently being validated (e.g., duplicate check, merge check, etc.). RequestStatusValidating RequestStatus = "validating" diff --git a/entity/request_log_test.go b/entity/request_log_test.go index 3283e2ea..602bf7e2 100644 --- a/entity/request_log_test.go +++ b/entity/request_log_test.go @@ -22,7 +22,7 @@ import ( ) func TestNewRequestLog_NilMetadata(t *testing.T) { - log := NewRequestLog("queue1/100", RequestStatusNew, 0, "", nil) + log := NewRequestLog("queue1/100", RequestStatusStarted, 0, "", nil) assert.NotNil(t, log.Metadata) assert.Empty(t, log.Metadata) @@ -32,7 +32,7 @@ func TestRequestLog_ToBytes(t *testing.T) { log := RequestLog{ RequestID: "test-queue/123", TimestampMs: 1709568000000, - Status: RequestStatusNew, + Status: RequestStatusStarted, RequestVersion: 1, LastError: "", Metadata: map[string]string{"source": "gateway"}, @@ -126,7 +126,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { log: RequestLog{ RequestID: "queue3/300", TimestampMs: 1709568002000, - Status: RequestStatusNew, + Status: RequestStatusStarted, RequestVersion: 0, LastError: "", Metadata: map[string]string{"key": "value"}, diff --git a/entity/request_test.go b/entity/request_test.go index 14fbc79b..f19b5887 100644 --- a/entity/request_test.go +++ b/entity/request_test.go @@ -27,7 +27,7 @@ func TestRequest_ToBytes(t *testing.T) { Queue: "test-queue", Change: Change{URIs: []string{"github://uber/submitqueue/pull/456/abc123def", "github://uber/submitqueue/pull/789/def456abc"}}, LandStrategy: RequestLandStrategyRebase, - State: RequestStateNew, + State: RequestStateStarted, Version: 1, } @@ -40,7 +40,7 @@ func TestRequest_ToBytes(t *testing.T) { assert.Contains(t, jsonStr, "test-queue/123") assert.Contains(t, jsonStr, "github://uber/submitqueue/pull/456/abc123def") assert.Contains(t, jsonStr, "rebase") - assert.Contains(t, jsonStr, "new") + assert.Contains(t, jsonStr, "started") } func TestRequestFromBytes(t *testing.T) { @@ -114,7 +114,7 @@ func TestRequest_SerializationRoundTrip(t *testing.T) { Queue: "queue2", Change: Change{URIs: []string{"code.uber.internal.com/D12345"}}, LandStrategy: RequestLandStrategyRebase, - State: RequestStateNew, + State: RequestStateStarted, Version: 1, }, }, diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 9ef77704..4a89170f 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -27,9 +27,9 @@ go_library( "//orchestrator/controller/conclude", "//orchestrator/controller/log", "//orchestrator/controller/merge", - "//orchestrator/controller/request", "//orchestrator/controller/score", "//orchestrator/controller/speculate", + "//orchestrator/controller/start", "//orchestrator/controller/validate", "//orchestrator/protopb", "@com_github_go_sql_driver_mysql//:mysql", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 0d0e0cf9..01f45cc4 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -45,9 +45,9 @@ import ( "github.com/uber/submitqueue/orchestrator/controller/conclude" logctrl "github.com/uber/submitqueue/orchestrator/controller/log" "github.com/uber/submitqueue/orchestrator/controller/merge" - "github.com/uber/submitqueue/orchestrator/controller/request" "github.com/uber/submitqueue/orchestrator/controller/score" "github.com/uber/submitqueue/orchestrator/controller/speculate" + "github.com/uber/submitqueue/orchestrator/controller/start" "github.com/uber/submitqueue/orchestrator/controller/validate" pb "github.com/uber/submitqueue/orchestrator/protopb" "go.uber.org/zap" @@ -283,7 +283,7 @@ func run() error { func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { return consumer.NewTopicRegistry([]consumer.TopicConfig{ { - Key: consumer.TopicKeyRequest, + Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( @@ -375,12 +375,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // └────────┴────────────────────────┘ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cnt counter.Counter, store storage.Storage) error { - requestController := request.NewController( + requestController := start.NewController( logger, scope, store, registry, - consumer.TopicKeyRequest, + consumer.TopicKeyStart, "orchestrator-request", ) if err := c.Register(requestController); err != nil { diff --git a/gateway/controller/land.go b/gateway/controller/land.go index d0ae3e84..1bbc82fc 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -99,36 +99,34 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan return nil, fmt.Errorf("LandController failed to generate request ID for queue=%s: %w", queue, err) } - request := entity.Request{ + landRequest := entity.LandRequest{ ID: fmt.Sprintf("%s/%d", queue, seq), Queue: queue, Change: change, LandStrategy: strategy, - State: entity.RequestStateNew, - Version: 1, } // Record the accepted status in the request log for reconciliation. Once the request materializes as a Request entity, the status might be updated to "new". // It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a queue. // Gateway has to stay consistent with the request log. - logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusAccepted, 0, "", nil) + logEntry := entity.NewRequestLog(landRequest.ID, entity.RequestStatusAccepted, 0, "", nil) if err := c.requestLogStore.Insert(ctx, logEntry); err != nil { - return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", request.ID, err) + return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", landRequest.ID, err) } c.logger.Debugw("land request created", "queue", req.Queue, - "sqid", request.ID, + "sqid", landRequest.ID, "change_uris", change.URIs, "change_count", len(change.URIs), "strategy", string(strategy), ) // Publish to queue for async processing - if err := c.publishToQueue(ctx, request); err != nil { + if err := c.publishToQueue(ctx, landRequest); err != nil { c.logger.Errorw("failed to publish request to queue", "queue", req.Queue, - "sqid", request.ID, + "sqid", landRequest.ID, "error", err, ) return nil, fmt.Errorf("LandController failed to publish request to queue: %w", err) @@ -136,33 +134,33 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan c.logger.Infow("request published to queue", "queue", req.Queue, - "sqid", request.ID, + "sqid", landRequest.ID, "topic", c.topic, ) c.metricsScope.Counter("publish_success").Inc(1) return &pb.LandResponse{ - Sqid: request.ID, + Sqid: landRequest.ID, }, nil } -// publishToQueue publishes a request to the request queue for async processing. -func (c *LandController) publishToQueue(ctx context.Context, request entity.Request) error { - // Serialize request entity to JSON - payload, err := request.ToBytes() +// publishToQueue publishes a land request to the request queue for async processing. +func (c *LandController) publishToQueue(ctx context.Context, landRequest entity.LandRequest) error { + // Serialize land request entity to JSON + payload, err := landRequest.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize land request: %w", err) } // Create queue message - // - Message ID: request.ID for idempotency - // - Payload: serialized Request entity - // - Partition key: request.Queue (ensures ordering per queue) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + // - Message ID: landRequest.ID for idempotency + // - Payload: serialized LandRequest entity + // - Partition key: landRequest.Queue (ensures ordering per queue) + msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) // Publish to request topic if err := c.publisher.Publish(ctx, c.topic, msg); err != nil { - return fmt.Errorf("failed to publish message: %w", err) + return fmt.Errorf("failed to publish land request message: %w", err) } return nil @@ -181,6 +179,6 @@ func resolveRequestLandStrategy(s pb.Strategy) (entity.RequestLandStrategy, erro case pb.Strategy_MERGE: return entity.RequestLandStrategyMerge, nil default: - return entity.RequestLandStrategyUnknown, fmt.Errorf("unknown proto strategy: %v", s) + return entity.RequestLandStrategyUnknown, fmt.Errorf("unknown land strategy in proto message: %v", s) } } diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index 82203f9a..469cb800 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -201,14 +201,12 @@ func TestLand_PublishesToQueue(t *testing.T) { assert.Equal(t, "test-queue", publishedMessage.PartitionKey) // Verify payload can be deserialized - deserializedReq, err := entity.RequestFromBytes(publishedMessage.Payload) + deserializedReq, err := entity.LandRequestFromBytes(publishedMessage.Payload) require.NoError(t, err) assert.Equal(t, "test-queue/123", deserializedReq.ID) assert.Equal(t, "test-queue", deserializedReq.Queue) assert.Equal(t, []string{"github://uber/backend/pull/456/fed987cba"}, deserializedReq.Change.URIs) assert.Equal(t, entity.RequestLandStrategyRebase, deserializedReq.LandStrategy) - assert.Equal(t, entity.RequestStateNew, deserializedReq.State) - assert.Equal(t, int32(1), deserializedReq.Version) } func TestLand_ContinuesWhenPublishFails(t *testing.T) { diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 37d5c0fe..88c3787d 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -99,7 +99,7 @@ func TestController_Process_Success(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } @@ -142,7 +142,7 @@ func TestController_Process_PublishFailure(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } @@ -170,7 +170,7 @@ func TestController_Process_CounterFailure(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } @@ -218,7 +218,7 @@ func TestController_Process_WithDependencies(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/789/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } diff --git a/orchestrator/controller/log/log_test.go b/orchestrator/controller/log/log_test.go index d82a0367..6c1237a6 100644 --- a/orchestrator/controller/log/log_test.go +++ b/orchestrator/controller/log/log_test.go @@ -49,7 +49,7 @@ func TestController_Process(t *testing.T) { { name: "success", logEntry: newRequestLog( - "test-queue/1", entity.RequestStatusNew, 1, "", nil, + "test-queue/1", entity.RequestStatusStarted, 1, "", nil, ), setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { mockLogStore := storagemock.NewMockRequestLogStore(ctrl) diff --git a/orchestrator/controller/request/BUILD.bazel b/orchestrator/controller/start/BUILD.bazel similarity index 85% rename from orchestrator/controller/request/BUILD.bazel rename to orchestrator/controller/start/BUILD.bazel index d6855766..4b42243d 100644 --- a/orchestrator/controller/request/BUILD.bazel +++ b/orchestrator/controller/start/BUILD.bazel @@ -1,9 +1,9 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "request", - srcs = ["request.go"], - importpath = "github.com/uber/submitqueue/orchestrator/controller/request", + name = "start", + srcs = ["start.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/start", visibility = ["//visibility:public"], deps = [ "//core/consumer", @@ -17,9 +17,9 @@ go_library( ) go_test( - name = "request_test", - srcs = ["request_test.go"], - embed = [":request"], + name = "start_test", + srcs = ["start_test.go"], + embed = [":start"], deps = [ "//core/consumer", "//core/errs", diff --git a/orchestrator/controller/request/request.go b/orchestrator/controller/start/start.go similarity index 88% rename from orchestrator/controller/request/request.go rename to orchestrator/controller/start/start.go index 37e82b78..cc29a87f 100644 --- a/orchestrator/controller/request/request.go +++ b/orchestrator/controller/start/start.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package request +package start import ( "context" @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -// Controller handles request queue messages. +// Controller handles start queue messages. // It consumes requests, persists them to storage, and publishes to the validate stage. // Implements consumer.Controller interface for integration with the consumer. type Controller struct { @@ -43,7 +43,7 @@ type Controller struct { // Verify Controller implements consumer.Controller interface at compile time. var _ consumer.Controller = (*Controller)(nil) -// NewController creates a new request controller for the orchestrator. +// NewController creates a new start controller for the orchestrator. func NewController( logger *zap.SugaredLogger, scope tally.Scope, @@ -53,8 +53,8 @@ func NewController( consumerGroup string, ) *Controller { return &Controller{ - logger: logger.Named("request_controller"), - metricsScope: scope.SubScope("request_controller"), + logger: logger.Named("start_controller"), + metricsScope: scope.SubScope("start_controller"), store: store, registry: registry, topicKey: topicKey, @@ -70,10 +70,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er msg := delivery.Message() - // Deserialize request entity - request, err := entity.RequestFromBytes(msg.Payload) + // Deserialize land request from gateway + landRequest, err := entity.LandRequestFromBytes(msg.Payload) if err != nil { - c.logger.Errorw("failed to deserialize request", + c.logger.Errorw("failed to deserialize land request", "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -81,7 +81,17 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er ) c.metricsScope.Counter("deserialize_errors").Inc(1) // Non-retryable: malformed messages will never succeed regardless of retry count - return fmt.Errorf("failed to deserialize request: %w", err) + return fmt.Errorf("failed to deserialize land request: %w", err) + } + + // Construct the full versioned Request entity with orchestrator-owned fields + request := entity.Request{ + ID: landRequest.ID, + Queue: landRequest.Queue, + Change: landRequest.Change, + LandStrategy: landRequest.LandStrategy, + State: entity.RequestStateStarted, + Version: 1, } c.logger.Infow("received land request event", @@ -107,7 +117,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Record the "new" status in the request log - logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusNew, request.Version, "", nil) + logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusStarted, request.Version, "", nil) // Using request.ID as the partition key to ensure ordering of log entries for the same request // and parallel processing of log entries for different requests. if err := c.publishLog(ctx, logEntry, request.ID); err != nil { @@ -190,7 +200,7 @@ func (c *Controller) publishLog(ctx context.Context, logEntry entity.RequestLog, // Name returns the controller name for logging and metrics. func (c *Controller) Name() string { - return "request" + return "start" } // TopicKey returns the topic key this controller subscribes to. diff --git a/orchestrator/controller/request/request_test.go b/orchestrator/controller/start/start_test.go similarity index 73% rename from orchestrator/controller/request/request_test.go rename to orchestrator/controller/start/start_test.go index e37a78c0..78a9c56e 100644 --- a/orchestrator/controller/request/request_test.go +++ b/orchestrator/controller/start/start_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package request +package start import ( "context" @@ -56,7 +56,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock ) require.NoError(t, err) - return NewController(logger, scope, store, registry, consumer.TopicKeyRequest, "orchestrator-request") + return NewController(logger, scope, store, registry, consumer.TopicKeyStart, "orchestrator-request") } // newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create. @@ -74,9 +74,9 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyRequest, controller.TopicKey()) + assert.Equal(t, consumer.TopicKeyStart, controller.TopicKey()) assert.Equal(t, "orchestrator-request", controller.ConsumerGroup()) - assert.Equal(t, "request", controller.Name()) + assert.Equal(t, "start", controller.Name()) } func TestController_Process_Success(t *testing.T) { @@ -84,16 +84,14 @@ func TestController_Process_Success(t *testing.T) { controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) - request := entity.Request{ + landRequest := entity.LandRequest{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, } - payload, err := request.ToBytes() + payload, err := landRequest.ToBytes() require.NoError(t, err) msg := queue.NewMessage("test-queue/123", payload, "test-queue", nil) @@ -124,16 +122,58 @@ func TestController_Process_InvalidJSON(t *testing.T) { assert.False(t, errs.IsRetryable(err)) } -func TestController_Process_AllRequestStates(t *testing.T) { +func TestController_Process_ConstructsRequestWithStateAndVersion(t *testing.T) { + ctrl := gomock.NewController(t) + + // Capture the request passed to Create + var capturedRequest entity.Request + mockReqStore := storagemock.NewMockRequestStore(ctrl) + mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req entity.Request) error { + capturedRequest = req + return nil + }, + ) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + + landRequest := entity.LandRequest{ + ID: "test-queue/42", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/1/abc123def"}}, + LandStrategy: entity.RequestLandStrategySquashRebase, + } + + payload, err := landRequest.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + require.NoError(t, err) + + // Verify the controller sets State and Version on the constructed Request + assert.Equal(t, landRequest.ID, capturedRequest.ID) + assert.Equal(t, landRequest.Queue, capturedRequest.Queue) + assert.Equal(t, landRequest.Change.URIs, capturedRequest.Change.URIs) + assert.Equal(t, landRequest.LandStrategy, capturedRequest.LandStrategy) + assert.Equal(t, entity.RequestStateStarted, capturedRequest.State) + assert.Equal(t, int32(1), capturedRequest.Version) +} + +func TestController_Process_AllStrategies(t *testing.T) { tests := []struct { name string - state entity.RequestState strategy entity.RequestLandStrategy }{ - {"new request", entity.RequestStateNew, entity.RequestLandStrategyRebase}, - {"processing request", entity.RequestStateProcessing, entity.RequestLandStrategySquashRebase}, - {"landed request", entity.RequestStateLanded, entity.RequestLandStrategyMerge}, - {"error request", entity.RequestStateError, entity.RequestLandStrategyRebase}, + {"rebase", entity.RequestLandStrategyRebase}, + {"squash rebase", entity.RequestLandStrategySquashRebase}, + {"merge", entity.RequestLandStrategyMerge}, } for _, tt := range tests { @@ -142,19 +182,17 @@ func TestController_Process_AllRequestStates(t *testing.T) { controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) - request := entity.Request{ - ID: fmt.Sprintf("queue/%s", tt.state), + landRequest := entity.LandRequest{ + ID: fmt.Sprintf("queue/%s", tt.strategy), Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/aaa111bbb"}}, LandStrategy: tt.strategy, - State: tt.state, - Version: 1, } - payload, err := request.ToBytes() + payload, err := landRequest.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -170,7 +208,7 @@ func TestController_Process_MultipleChanges(t *testing.T) { controller := newTestController(t, ctrl, newMockStorage(ctrl), nil) - request := entity.Request{ + landRequest := entity.LandRequest{ ID: "queue/999", Queue: "test-queue", Change: entity.Change{ @@ -181,14 +219,12 @@ func TestController_Process_MultipleChanges(t *testing.T) { }, }, LandStrategy: entity.RequestLandStrategySquashRebase, - State: entity.RequestStateNew, - Version: 1, } - payload, err := request.ToBytes() + payload, err := landRequest.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -202,19 +238,17 @@ func TestController_Process_PublishFailure(t *testing.T) { controller := newTestController(t, ctrl, newMockStorage(ctrl), fmt.Errorf("publish failed")) - request := entity.Request{ + landRequest := entity.LandRequest{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, } - payload, err := request.ToBytes() + payload, err := landRequest.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -233,19 +267,17 @@ func TestController_Process_StorageFailure(t *testing.T) { controller := newTestController(t, ctrl, store, nil) - request := entity.Request{ + landRequest := entity.LandRequest{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, } - payload, err := request.ToBytes() + payload, err := landRequest.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() @@ -265,19 +297,17 @@ func TestController_Process_AlreadyExistsSucceeds(t *testing.T) { controller := newTestController(t, ctrl, store, nil) - request := entity.Request{ + landRequest := entity.LandRequest{ ID: "test-queue/123", Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, - Version: 1, } - payload, err := request.ToBytes() + payload, err := landRequest.ToBytes() require.NoError(t, err) - msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) delivery := queuemock.NewMockDelivery(ctrl) delivery.EXPECT().Message().Return(msg).AnyTimes() delivery.EXPECT().Attempt().Return(1).AnyTimes() diff --git a/orchestrator/controller/validate/validate_test.go b/orchestrator/controller/validate/validate_test.go index 956bc937..50aef98f 100644 --- a/orchestrator/controller/validate/validate_test.go +++ b/orchestrator/controller/validate/validate_test.go @@ -85,7 +85,7 @@ func TestController_Process_Success(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } @@ -130,7 +130,7 @@ func TestController_Process_PublishFailure(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/service/pull/1/xyz789abc"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } @@ -167,7 +167,7 @@ func TestController_Process_NotMergeable(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/repo/1/abc123"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } @@ -197,7 +197,7 @@ func TestController_Process_MergeCheckError(t *testing.T) { Queue: "test-queue", Change: entity.Change{URIs: []string{"github://uber/repo/1/abc123"}}, LandStrategy: entity.RequestLandStrategyRebase, - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Version: 1, } diff --git a/test/integration/extension/storage/suite.go b/test/integration/extension/storage/suite.go index 0529b2e3..77825d5f 100644 --- a/test/integration/extension/storage/suite.go +++ b/test/integration/extension/storage/suite.go @@ -58,7 +58,7 @@ func (s *StorageContractSuite) TestStorage_CreateAndGet() { request := entity.Request{ ID: "test/create-get", Queue: "test-queue", - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Change: entity.Change{ URIs: []string{"github://uber/storage-test/pull/123/abc123def"}, }, @@ -101,7 +101,7 @@ func (s *StorageContractSuite) TestStorage_CreateAndGet_StackedPRs() { request := entity.Request{ ID: "test/stacked-prs", Queue: "test-queue", - State: entity.RequestStateNew, + State: entity.RequestStateStarted, Change: entity.Change{ URIs: stackedURIs, }, @@ -133,7 +133,7 @@ func (s *StorageContractSuite) TestStorage_UpdateState() { request := entity.Request{ ID: "test/update", Queue: "test-queue", - State: entity.RequestStateNew, + State: entity.RequestStateStarted, LandStrategy: entity.RequestLandStrategyMerge, Version: 1, } @@ -163,7 +163,7 @@ func (s *StorageContractSuite) TestStorage_OptimisticLocking() { request := entity.Request{ ID: "test/optimistic-lock", Queue: "test-queue", - State: entity.RequestStateNew, + State: entity.RequestStateStarted, LandStrategy: entity.RequestLandStrategyMerge, Version: 1, } @@ -211,7 +211,7 @@ func (s *StorageContractSuite) TestStorage_CreateDuplicate() { request := entity.Request{ ID: "test/duplicate", Queue: "test-queue", - State: entity.RequestStateNew, + State: entity.RequestStateStarted, LandStrategy: entity.RequestLandStrategyMerge, Version: 1, }