diff --git a/.gitignore b/.gitignore index d9451591..899dc31e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ MODULE.bazel.lock .vscode/ .idea/ .claude/ +.mcp.json .ijwb/ diff --git a/core/consumer/registry.go b/core/consumer/registry.go index 83f0d943..aca84f2d 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -46,6 +46,8 @@ const ( TopicKeyMerge TopicKey = "merge" // TopicKeyConclude is the pipeline stage where merged requests are published for conclusion. TopicKeyConclude TopicKey = "conclude" + // TopicKeyLog is the pipeline stage where per-request logs are written. + TopicKeyLog TopicKey = "log" ) // String returns the topic key as a string. diff --git a/core/request/request.go b/core/request/request.go index a41c3376..db58fa7e 100644 --- a/core/request/request.go +++ b/core/request/request.go @@ -25,7 +25,7 @@ import ( // CurrentState holds the current request status obtained from the request log. It is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds. type CurrentState struct { // Status is the current request status obtained from the request log. - Status string + Status entity.RequestStatus // LastError is the last error associated with the current status. LastError string // Metadata is the metadata associated with the current status. @@ -68,7 +68,7 @@ func GetCurrentStateFromRequestLog(ctx context.Context, store storage.RequestLog } // A terminal candidate must have a version from the Request entity and a terminal status. - if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(log.Status)) { + if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(string(log.Status))) { if bestTerminal == nil || log.RequestVersion > bestTerminal.RequestVersion || (log.RequestVersion == bestTerminal.RequestVersion && log.TimestampMs > bestTerminal.TimestampMs) { diff --git a/core/request/request_test.go b/core/request/request_test.go index 3ed4f11b..075fabad 100644 --- a/core/request/request_test.go +++ b/core/request/request_test.go @@ -37,60 +37,60 @@ func TestGetCurrentStateFromRequestLog(t *testing.T) { { name: "single record", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, }, - expected: CurrentState{Status: "new", LastError: "", Metadata: map[string]string{}}, + expected: CurrentState{Status: entity.RequestStatusNew, LastError: "", Metadata: map[string]string{}}, }, { name: "terminal status wins over later non-terminal", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, - {RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}}, - {RequestID: "q/1", TimestampMs: 3000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusLanded, RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}}, + {RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, }, - expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"batch": "b1"}}, + expected: CurrentState{Status: entity.RequestStatusLanded, LastError: "", Metadata: map[string]string{"batch": "b1"}}, }, { name: "terminal error status with last error", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, - {RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusError, RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, }, - expected: CurrentState{Status: "error", LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, + expected: CurrentState{Status: entity.RequestStatusError, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, }, { name: "multiple terminal records picks highest version", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}}, - {RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}}, - {RequestID: "q/1", TimestampMs: 3000, Status: "error", RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusError, RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusLanded, RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}}, + {RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}}, }, - expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"final": "true"}}, + expected: CurrentState{Status: entity.RequestStatusLanded, LastError: "", Metadata: map[string]string{"final": "true"}}, }, { name: "same version terminal records uses timestamp tiebreaker", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 3, LastError: "first", Metadata: map[string]string{}}, - {RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 3, LastError: "second", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "first", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "second", Metadata: map[string]string{}}, }, - expected: CurrentState{Status: "error", LastError: "second", Metadata: map[string]string{}}, + expected: CurrentState{Status: entity.RequestStatusError, LastError: "second", Metadata: map[string]string{}}, }, { name: "terminal status without version is not terminal", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: "landed", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, - {RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusLanded, RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}}, }, - expected: CurrentState{Status: "processing", LastError: "", Metadata: map[string]string{"source": "gw"}}, + expected: CurrentState{Status: entity.RequestStatusProcessing, LastError: "", Metadata: map[string]string{"source": "gw"}}, }, { name: "no terminal records falls back to latest timestamp", logs: []entity.RequestLog{ - {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, - {RequestID: "q/1", TimestampMs: 3000, Status: "validated", RequestVersion: 2, LastError: "", Metadata: map[string]string{}}, - {RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusValidated, RequestVersion: 2, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, }, - expected: CurrentState{Status: "validated", LastError: "", Metadata: map[string]string{}}, + expected: CurrentState{Status: entity.RequestStatusValidated, LastError: "", Metadata: map[string]string{}}, }, } diff --git a/entity/request.go b/entity/request.go index 70060f92..4a299cd1 100644 --- a/entity/request.go +++ b/entity/request.go @@ -30,7 +30,7 @@ const ( RequestLandStrategyMerge RequestLandStrategy = "merge" ) -// RequestState defines the possible states of a land request. +// RequestState defines the possible states of a land request. They are internal and used to implement a state machine. A separate RequestStatus type is used to track the customer-friendly status of a request. type RequestState string const ( diff --git a/entity/request_log.go b/entity/request_log.go index 96228f58..285b2db0 100644 --- a/entity/request_log.go +++ b/entity/request_log.go @@ -19,6 +19,64 @@ import ( "time" ) +// RequestLogStatus defines the possible status of a request. Status is customer-friendly and can be displayed to the user. +// It is different from the request state, which is internal and used to implement a state machine. Request statuses can be +// generally added freely by the system without breaking the state machine. +// Some statuses correspond to the request state, in which case they should be supplemented with the request state version to be used for reconciliation. +// Other statuses are purely informational and can be added freely. +// Every status may be accompanied by a last error message and free-formmetadata in the Request Log. It will only be used for display or debugging purposes. +type RequestStatus string + +const ( + // RequestStatusUnknown is the unknown sentinel status. It is set by default when the structure is initialized. It should never be seen in the system. + RequestStatusUnknown RequestStatus = "" + + // RequestStatusAccepted indicates that the request has been accepted by the system. Typically a gateway service will set this status when the land request is received and persisted to the logging database. + RequestStatusAccepted RequestStatus = "accepted" + + // RequestStatusNew is the initial status of a request. It corresponds to the RequestStateNew state and typically set by the orchestrator service when the request is received and persisted to the operating database. + RequestStatusNew RequestStatus = "new" + + // RequestStatusValidating indicates that the request is currently being validated (e.g., duplicate check, merge check, etc.). + RequestStatusValidating RequestStatus = "validating" + + // RequestStatusValidated indicates that the request has been validated (duplicate check, merge check etc.) successfully. It corresponds to the RequestStateValidated state. + RequestStatusValidated RequestStatus = "validated" + + // RequestStatusBatching indicates that the request is waiting to be included in a batch. + RequestStatusBatching RequestStatus = "batching" + + // RequestStatusBatched indicates that the request has been included in a new batch and will be sent to speculation. + RequestStatusBatched RequestStatus = "batched" + + // RequestStatusSpeculating indicates that the request is currently being speculated (e.g., speculative merge/rebase, etc.). + RequestStatusSpeculating RequestStatus = "speculating" + + // RequestStatusSpeculated indicates that the request has been successfully speculated and is ready to be validated via a build system. + RequestStatusSpeculated RequestStatus = "speculated" + + // RequestStatusBuilding indicates that the request is currently being built (e.g., CI/CD system is building the change on top of the speculation path). + RequestStatusBuilding RequestStatus = "building" + + // RequestStatusBuilt indicates that the request has finished the build step successfully and can move to the next phase, either wait for other requests to finish or move to the land phase. + RequestStatusBuilt RequestStatus = "built" + + // RequestStatusWaitingPath indicates that the request is waiting for other preceiding request in the same speculation path to finish. + RequestStatusWaitingPath RequestStatus = "waitingpath" + + // RequestStatusLanding indicates that the request is actively being landed (e.g., source control operation is in progress to push the change to the target branch). + RequestStatusLanding RequestStatus = "landing" + + // RequestStatusProcessing is the status of a request that is being processed. It corresponds to the RequestStateProcessing state. + RequestStatusProcessing RequestStatus = "processing" + + // RequestStatusLanded indicates that the request has been successfully processed and landed. It corresponds to the RequestStateLanded state. + RequestStatusLanded RequestStatus = "landed" + + // RequestStatusError indicates that the request has encountered an error. It corresponds to the RequestStateError state. + RequestStatusError RequestStatus = "error" +) + // RequestLog is an append-only record that captures a point-in-time snapshot of a request's status // for reconciliation purposes. It is stored in a separate database from the request store to support // eventual consistency reconciliation. @@ -27,8 +85,8 @@ type RequestLog struct { RequestID string `json:"request_id"` // TimestampMs is the time this log entry was created, in milliseconds since Unix epoch. TimestampMs int64 `json:"timestamp_ms"` - // Status is the request status at the time this log entry was created. It does not have to correspond to the request status. For example, it may contain intermediate statuses like "validated" or "processing". - Status string `json:"status"` + // Status is the request status at the time this log entry was created. It may contain requests states from the state machine and also display-friendly intermediate statuses. + Status RequestStatus `json:"status"` // RequestVersion is the version of the request at the time this log entry was created. // Zero if the version is not available. RequestVersion int32 `json:"request_version"` @@ -42,7 +100,10 @@ type RequestLog struct { // NewRequestLog creates a new RequestLog with the given fields. // TimestampMs is set to the current time. If metadata is nil, it will be initialized as an empty map. -func NewRequestLog(requestID string, status string, requestVersion int32, lastError string, metadata map[string]string) RequestLog { +// requestVersion is the version of the request entity, should only be set if reporting a request state as a status, otherwise it should be 0. +// lastError is the last error message associated with the status at the time of this log entry, empty string if no error. +// metadata is a set of key-value pairs providing additional context for this log entry. Not constrained to any specific format or schema, used for display or debugging purposes. +func NewRequestLog(requestID string, status RequestStatus, requestVersion int32, lastError string, metadata map[string]string) RequestLog { if metadata == nil { metadata = make(map[string]string) } diff --git a/entity/request_log_test.go b/entity/request_log_test.go index b8a18f1f..3283e2ea 100644 --- a/entity/request_log_test.go +++ b/entity/request_log_test.go @@ -22,7 +22,7 @@ import ( ) func TestNewRequestLog_NilMetadata(t *testing.T) { - log := NewRequestLog("queue1/100", "new", 0, "", nil) + log := NewRequestLog("queue1/100", RequestStatusNew, 0, "", nil) assert.NotNil(t, log.Metadata) assert.Empty(t, log.Metadata) @@ -32,7 +32,7 @@ func TestRequestLog_ToBytes(t *testing.T) { log := RequestLog{ RequestID: "test-queue/123", TimestampMs: 1709568000000, - Status: "new", + Status: RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{"source": "gateway"}, @@ -52,7 +52,7 @@ func TestRequestLogFromBytes(t *testing.T) { original := RequestLog{ RequestID: "my-queue/999", TimestampMs: 1709568000000, - Status: "processing", + Status: RequestStatusProcessing, RequestVersion: 3, LastError: "timeout", Metadata: map[string]string{"step": "validation", "attempt": "2"}, @@ -104,7 +104,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { log: RequestLog{ RequestID: "queue1/100", TimestampMs: 1709568000000, - Status: "landed", + Status: RequestStatusLanded, RequestVersion: 5, LastError: "", Metadata: map[string]string{"source": "orchestrator", "batch_id": "b-1"}, @@ -115,7 +115,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { log: RequestLog{ RequestID: "queue2/200", TimestampMs: 1709568001000, - Status: "error", + Status: RequestStatusError, RequestVersion: 2, LastError: "merge conflict detected", Metadata: map[string]string{}, @@ -126,7 +126,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) { log: RequestLog{ RequestID: "queue3/300", TimestampMs: 1709568002000, - Status: "new", + Status: RequestStatusNew, RequestVersion: 0, LastError: "", Metadata: map[string]string{"key": "value"}, diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 26ad6f18..9ef77704 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//orchestrator/controller/build", "//orchestrator/controller/buildsignal", "//orchestrator/controller/conclude", + "//orchestrator/controller/log", "//orchestrator/controller/merge", "//orchestrator/controller/request", "//orchestrator/controller/score", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index dfa964d4..0d0e0cf9 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -43,6 +43,7 @@ import ( "github.com/uber/submitqueue/orchestrator/controller/build" "github.com/uber/submitqueue/orchestrator/controller/buildsignal" "github.com/uber/submitqueue/orchestrator/controller/conclude" + logctrl "github.com/uber/submitqueue/orchestrator/controller/log" "github.com/uber/submitqueue/orchestrator/controller/merge" "github.com/uber/submitqueue/orchestrator/controller/request" "github.com/uber/submitqueue/orchestrator/controller/score" @@ -353,6 +354,14 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe subscriberName, "orchestrator-conclude", ), }, + { + Key: consumer.TopicKeyLog, + Name: "log", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-log", + ), + }, }) } @@ -470,6 +479,17 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t return fmt.Errorf("failed to register conclude controller: %w", err) } + logController := logctrl.NewController( + logger, + scope, + store, + consumer.TopicKeyLog, + "orchestrator-log", + ) + if err := c.Register(logController); err != nil { + return fmt.Errorf("failed to register log controller: %w", err) + } + return nil } diff --git a/gateway/controller/land.go b/gateway/controller/land.go index fff66cf5..d0ae3e84 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -109,7 +109,9 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan } // Record the accepted status in the request log for reconciliation. Once the request materializes as a Request entity, the status might be updated to "new". - logEntry := entity.NewRequestLog(request.ID, "accepted", 0, "", nil) + // It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a queue. + // Gateway has to stay consistent with the request log. + logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusAccepted, 0, "", nil) if err := c.requestLogStore.Insert(ctx, logEntry); err != nil { return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", request.ID, err) } diff --git a/orchestrator/controller/log/BUILD.bazel b/orchestrator/controller/log/BUILD.bazel new file mode 100644 index 00000000..d891b39c --- /dev/null +++ b/orchestrator/controller/log/BUILD.bazel @@ -0,0 +1,32 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "log", + srcs = ["log.go"], + importpath = "github.com/uber/submitqueue/orchestrator/controller/log", + visibility = ["//visibility:public"], + deps = [ + "//core/consumer", + "//entity", + "//extension/storage", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "log_test", + srcs = ["log_test.go"], + embed = [":log"], + deps = [ + "//core/consumer", + "//entity", + "//entity/queue", + "//extension/queue/mock", + "//extension/storage/mock", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/orchestrator/controller/log/log.go b/orchestrator/controller/log/log.go new file mode 100644 index 00000000..3edd412e --- /dev/null +++ b/orchestrator/controller/log/log.go @@ -0,0 +1,106 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "fmt" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" + "go.uber.org/zap" +) + +// Controller handles log queue messages. +// It consumes request log entries and persists them to storage. +// Implements consumer.Controller interface for integration with the consumer. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + store storage.Storage + topicKey consumer.TopicKey + consumerGroup string +} + +// Verify Controller implements consumer.Controller interface at compile time. +var _ consumer.Controller = (*Controller)(nil) + +// NewController creates a new log controller for the orchestrator. +func NewController( + logger *zap.SugaredLogger, + scope tally.Scope, + store storage.Storage, + topicKey consumer.TopicKey, + consumerGroup string, +) *Controller { + return &Controller{ + logger: logger.Named("log_controller"), + metricsScope: scope.SubScope("log_controller"), + store: store, + topicKey: topicKey, + consumerGroup: consumerGroup, + } +} + +// Process processes a log delivery from the queue. +// Deserializes the request log entry and persists it to storage. +// Returns nil to ack (success), or error to nack (retry). +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { + c.metricsScope.Counter("received").Inc(1) + + msg := delivery.Message() + + // Deserialize request log entry + logEntry, err := entity.RequestLogFromBytes(msg.Payload) + if err != nil { + c.metricsScope.Counter("deserialize_errors").Inc(1) + // Non-retryable: malformed messages will never succeed regardless of retry count + return fmt.Errorf("failed to deserialize request log: %w", err) + } + + c.logger.Debugw("received request log entry", + "request_id", logEntry.RequestID, + "status", string(logEntry.Status), + "request_version", logEntry.RequestVersion, + "attempt", delivery.Attempt(), + ) + + // Persist request log to storage + if err := c.store.GetRequestLogStore().Insert(ctx, logEntry); err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to insert request log: %w", err) + } + + c.metricsScope.Counter("processed").Inc(1) + + return nil // Success - message will be acked +} + +// Name returns the controller name for logging and metrics. +func (c *Controller) Name() string { + return "log" +} + +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey +} + +// ConsumerGroup returns the consumer group for offset tracking. +func (c *Controller) ConsumerGroup() string { + return c.consumerGroup +} diff --git a/orchestrator/controller/log/log_test.go b/orchestrator/controller/log/log_test.go new file mode 100644 index 00000000..d82a0367 --- /dev/null +++ b/orchestrator/controller/log/log_test.go @@ -0,0 +1,123 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/entity/queue" + queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +// newTestController creates a controller with test dependencies. +func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage) *Controller { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + return NewController(logger, scope, store, consumer.TopicKeyLog, "orchestrator-log") +} + +func TestController_Process(t *testing.T) { + tests := []struct { + name string + logEntry *entity.RequestLog // nil means use rawPayload instead + rawPayload []byte // used when logEntry is nil (e.g. invalid JSON) + setupStore func(*gomock.Controller) *storagemock.MockStorage + wantErr bool + }{ + { + name: "success", + logEntry: newRequestLog( + "test-queue/1", entity.RequestStatusNew, 1, "", nil, + ), + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockLogStore := storagemock.NewMockRequestLogStore(ctrl) + mockLogStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestLogStore().Return(mockLogStore).AnyTimes() + return store + }, + wantErr: false, + }, + { + name: "invalid JSON", + rawPayload: []byte(`{"invalid": json"}`), + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + return storagemock.NewMockStorage(ctrl) + }, + wantErr: true, + }, + { + name: "storage failure", + logEntry: newRequestLog( + "test-queue/2", entity.RequestStatusError, 3, "merge conflict", map[string]string{"step": "merge"}, + ), + setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage { + mockLogStore := storagemock.NewMockRequestLogStore(ctrl) + mockLogStore.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(fmt.Errorf("database connection failed")) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestLogStore().Return(mockLogStore).AnyTimes() + return store + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + + var payload []byte + if tt.logEntry != nil { + var err error + payload, err = tt.logEntry.ToBytes() + require.NoError(t, err) + } else { + payload = tt.rawPayload + } + + store := tt.setupStore(ctrl) + controller := newTestController(t, ctrl, store) + + msg := queue.NewMessage("test-queue/1", payload, "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err := controller.Process(context.Background(), delivery) + + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +// newRequestLog is a helper that returns a pointer to a RequestLog for use in test tables. +func newRequestLog(requestID string, status entity.RequestStatus, requestVersion int32, lastError string, metadata map[string]string) *entity.RequestLog { + log := entity.NewRequestLog(requestID, status, requestVersion, lastError, metadata) + return &log +} diff --git a/orchestrator/controller/request/request.go b/orchestrator/controller/request/request.go index d5335137..37e82b78 100644 --- a/orchestrator/controller/request/request.go +++ b/orchestrator/controller/request/request.go @@ -106,6 +106,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return errs.NewRetryableError(fmt.Errorf("failed to create request: %w", err)) } + // Record the "new" status in the request log + logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusNew, request.Version, "", nil) + // Using request.ID as the partition key to ensure ordering of log entries for the same request + // and parallel processing of log entries for different requests. + if err := c.publishLog(ctx, logEntry, request.ID); err != nil { + c.metricsScope.Counter("request_log_errors").Inc(1) + return fmt.Errorf("failed to publish request log: %w", err) + } + // Publish to validate topic if err := c.publish(ctx, consumer.TopicKeyValidate, request); err != nil { c.logger.Errorw("failed to publish output", @@ -153,6 +162,32 @@ func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request return nil } +// publishLog publishes a request log entry to the log topic for async persistence. +func (c *Controller) publishLog(ctx context.Context, logEntry entity.RequestLog, partitionKey string) error { + payload, err := logEntry.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize request log: %w", err) + } + + msg := entityqueue.NewMessage(logEntry.RequestID, payload, partitionKey, nil) + + q, ok := c.registry.Queue(consumer.TopicKeyLog) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyLog) + } + + topicName, ok := c.registry.TopicName(consumer.TopicKeyLog) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyLog) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + // Name returns the controller name for logging and metrics. func (c *Controller) Name() string { return "request" diff --git a/orchestrator/controller/request/request_test.go b/orchestrator/controller/request/request_test.go index 748efa5c..e37a78c0 100644 --- a/orchestrator/controller/request/request_test.go +++ b/orchestrator/controller/request/request_test.go @@ -49,7 +49,10 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ}}, + []consumer.TopicConfig{ + {Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ}, + {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, + }, ) require.NoError(t, err)