Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ MODULE.bazel.lock
.vscode/
.idea/
.claude/
.mcp.json
.ijwb/


Expand Down
2 changes: 2 additions & 0 deletions core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
TopicKeyMerge TopicKey = "merge"
// TopicKeyConclude is the pipeline stage where merged requests are published for conclusion.
TopicKeyConclude TopicKey = "conclude"
// TopicKeyLog is the pipeline stage where per-request logs are written.
TopicKeyLog TopicKey = "log"
)

// String returns the topic key as a string.
Expand Down
4 changes: 2 additions & 2 deletions core/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// CurrentState holds the current request status obtained from the request log. It is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds.
type CurrentState struct {
// Status is the current request status obtained from the request log.
Status string
Status entity.RequestStatus
// LastError is the last error associated with the current status.
LastError string
// Metadata is the metadata associated with the current status.
Expand Down Expand Up @@ -68,7 +68,7 @@ func GetCurrentStateFromRequestLog(ctx context.Context, store storage.RequestLog
}

// A terminal candidate must have a version from the Request entity and a terminal status.
if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(log.Status)) {
if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(string(log.Status))) {
if bestTerminal == nil ||
log.RequestVersion > bestTerminal.RequestVersion ||
(log.RequestVersion == bestTerminal.RequestVersion && log.TimestampMs > bestTerminal.TimestampMs) {
Expand Down
46 changes: 23 additions & 23 deletions core/request/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,60 +37,60 @@ func TestGetCurrentStateFromRequestLog(t *testing.T) {
{
name: "single record",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "new", LastError: "", Metadata: map[string]string{}},
expected: CurrentState{Status: entity.RequestStatusNew, LastError: "", Metadata: map[string]string{}},
},
{
name: "terminal status wins over later non-terminal",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}},
{RequestID: "q/1", TimestampMs: 3000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusLanded, RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}},
{RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"batch": "b1"}},
expected: CurrentState{Status: entity.RequestStatusLanded, LastError: "", Metadata: map[string]string{"batch": "b1"}},
},
{
name: "terminal error status with last error",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusError, RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
},
expected: CurrentState{Status: "error", LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
expected: CurrentState{Status: entity.RequestStatusError, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
},
{
name: "multiple terminal records picks highest version",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}},
{RequestID: "q/1", TimestampMs: 3000, Status: "error", RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusError, RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusLanded, RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}},
{RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"final": "true"}},
expected: CurrentState{Status: entity.RequestStatusLanded, LastError: "", Metadata: map[string]string{"final": "true"}},
},
{
name: "same version terminal records uses timestamp tiebreaker",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 3, LastError: "first", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 3, LastError: "second", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "first", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "second", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "error", LastError: "second", Metadata: map[string]string{}},
expected: CurrentState{Status: entity.RequestStatusError, LastError: "second", Metadata: map[string]string{}},
},
{
name: "terminal status without version is not terminal",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "landed", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}},
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusLanded, RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}},
},
expected: CurrentState{Status: "processing", LastError: "", Metadata: map[string]string{"source": "gw"}},
expected: CurrentState{Status: entity.RequestStatusProcessing, LastError: "", Metadata: map[string]string{"source": "gw"}},
},
{
name: "no terminal records falls back to latest timestamp",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 3000, Status: "validated", RequestVersion: 2, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusValidated, RequestVersion: 2, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "validated", LastError: "", Metadata: map[string]string{}},
expected: CurrentState{Status: entity.RequestStatusValidated, LastError: "", Metadata: map[string]string{}},
},
}

Expand Down
2 changes: 1 addition & 1 deletion entity/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
RequestLandStrategyMerge RequestLandStrategy = "merge"
)

// RequestState defines the possible states of a land request.
// RequestState defines the possible states of a land request. They are internal and used to implement a state machine. A separate RequestStatus type is used to track the customer-friendly status of a request.
type RequestState string

const (
Expand Down
67 changes: 64 additions & 3 deletions entity/request_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,64 @@ import (
"time"
)

// RequestLogStatus defines the possible status of a request. Status is customer-friendly and can be displayed to the user.
// It is different from the request state, which is internal and used to implement a state machine. Request statuses can be
// generally added freely by the system without breaking the state machine.
// Some statuses correspond to the request state, in which case they should be supplemented with the request state version to be used for reconciliation.
// Other statuses are purely informational and can be added freely.
// Every status may be accompanied by a last error message and free-formmetadata in the Request Log. It will only be used for display or debugging purposes.
type RequestStatus string

const (
// RequestStatusUnknown is the unknown sentinel status. It is set by default when the structure is initialized. It should never be seen in the system.
RequestStatusUnknown RequestStatus = ""

// RequestStatusAccepted indicates that the request has been accepted by the system. Typically a gateway service will set this status when the land request is received and persisted to the logging database.
RequestStatusAccepted RequestStatus = "accepted"

// RequestStatusNew is the initial status of a request. It corresponds to the RequestStateNew state and typically set by the orchestrator service when the request is received and persisted to the operating database.
RequestStatusNew RequestStatus = "new"
Comment thread
sbalabanov marked this conversation as resolved.

// RequestStatusValidating indicates that the request is currently being validated (e.g., duplicate check, merge check, etc.).
RequestStatusValidating RequestStatus = "validating"

// RequestStatusValidated indicates that the request has been validated (duplicate check, merge check etc.) successfully. It corresponds to the RequestStateValidated state.
RequestStatusValidated RequestStatus = "validated"

// RequestStatusBatching indicates that the request is waiting to be included in a batch.
RequestStatusBatching RequestStatus = "batching"

// RequestStatusBatched indicates that the request has been included in a new batch and will be sent to speculation.
RequestStatusBatched RequestStatus = "batched"

// RequestStatusSpeculating indicates that the request is currently being speculated (e.g., speculative merge/rebase, etc.).
RequestStatusSpeculating RequestStatus = "speculating"

// RequestStatusSpeculated indicates that the request has been successfully speculated and is ready to be validated via a build system.
RequestStatusSpeculated RequestStatus = "speculated"

// RequestStatusBuilding indicates that the request is currently being built (e.g., CI/CD system is building the change on top of the speculation path).
RequestStatusBuilding RequestStatus = "building"

// RequestStatusBuilt indicates that the request has finished the build step successfully and can move to the next phase, either wait for other requests to finish or move to the land phase.
RequestStatusBuilt RequestStatus = "built"
Comment thread
sbalabanov marked this conversation as resolved.

// RequestStatusWaitingPath indicates that the request is waiting for other preceiding request in the same speculation path to finish.
RequestStatusWaitingPath RequestStatus = "waitingpath"

// RequestStatusLanding indicates that the request is actively being landed (e.g., source control operation is in progress to push the change to the target branch).
RequestStatusLanding RequestStatus = "landing"

// RequestStatusProcessing is the status of a request that is being processed. It corresponds to the RequestStateProcessing state.
RequestStatusProcessing RequestStatus = "processing"

Comment thread
sbalabanov marked this conversation as resolved.
// RequestStatusLanded indicates that the request has been successfully processed and landed. It corresponds to the RequestStateLanded state.
RequestStatusLanded RequestStatus = "landed"

// RequestStatusError indicates that the request has encountered an error. It corresponds to the RequestStateError state.
RequestStatusError RequestStatus = "error"
)

// RequestLog is an append-only record that captures a point-in-time snapshot of a request's status
// for reconciliation purposes. It is stored in a separate database from the request store to support
// eventual consistency reconciliation.
Expand All @@ -27,8 +85,8 @@ type RequestLog struct {
RequestID string `json:"request_id"`
// TimestampMs is the time this log entry was created, in milliseconds since Unix epoch.
TimestampMs int64 `json:"timestamp_ms"`
// Status is the request status at the time this log entry was created. It does not have to correspond to the request status. For example, it may contain intermediate statuses like "validated" or "processing".
Status string `json:"status"`
// Status is the request status at the time this log entry was created. It may contain requests states from the state machine and also display-friendly intermediate statuses.
Status RequestStatus `json:"status"`
// RequestVersion is the version of the request at the time this log entry was created.
// Zero if the version is not available.
RequestVersion int32 `json:"request_version"`
Expand All @@ -42,7 +100,10 @@ type RequestLog struct {

// NewRequestLog creates a new RequestLog with the given fields.
// TimestampMs is set to the current time. If metadata is nil, it will be initialized as an empty map.
func NewRequestLog(requestID string, status string, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
// requestVersion is the version of the request entity, should only be set if reporting a request state as a status, otherwise it should be 0.
// lastError is the last error message associated with the status at the time of this log entry, empty string if no error.
// metadata is a set of key-value pairs providing additional context for this log entry. Not constrained to any specific format or schema, used for display or debugging purposes.
func NewRequestLog(requestID string, status RequestStatus, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
if metadata == nil {
metadata = make(map[string]string)
}
Expand Down
12 changes: 6 additions & 6 deletions entity/request_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func TestNewRequestLog_NilMetadata(t *testing.T) {
log := NewRequestLog("queue1/100", "new", 0, "", nil)
log := NewRequestLog("queue1/100", RequestStatusNew, 0, "", nil)

assert.NotNil(t, log.Metadata)
assert.Empty(t, log.Metadata)
Expand All @@ -32,7 +32,7 @@ func TestRequestLog_ToBytes(t *testing.T) {
log := RequestLog{
RequestID: "test-queue/123",
TimestampMs: 1709568000000,
Status: "new",
Status: RequestStatusNew,
RequestVersion: 1,
LastError: "",
Metadata: map[string]string{"source": "gateway"},
Expand All @@ -52,7 +52,7 @@ func TestRequestLogFromBytes(t *testing.T) {
original := RequestLog{
RequestID: "my-queue/999",
TimestampMs: 1709568000000,
Status: "processing",
Status: RequestStatusProcessing,
RequestVersion: 3,
LastError: "timeout",
Metadata: map[string]string{"step": "validation", "attempt": "2"},
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
log: RequestLog{
RequestID: "queue1/100",
TimestampMs: 1709568000000,
Status: "landed",
Status: RequestStatusLanded,
RequestVersion: 5,
LastError: "",
Metadata: map[string]string{"source": "orchestrator", "batch_id": "b-1"},
Expand All @@ -115,7 +115,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
log: RequestLog{
RequestID: "queue2/200",
TimestampMs: 1709568001000,
Status: "error",
Status: RequestStatusError,
RequestVersion: 2,
LastError: "merge conflict detected",
Metadata: map[string]string{},
Expand All @@ -126,7 +126,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
log: RequestLog{
RequestID: "queue3/300",
TimestampMs: 1709568002000,
Status: "new",
Status: RequestStatusNew,
RequestVersion: 0,
LastError: "",
Metadata: map[string]string{"key": "value"},
Expand Down
1 change: 1 addition & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//orchestrator/controller/build",
"//orchestrator/controller/buildsignal",
"//orchestrator/controller/conclude",
"//orchestrator/controller/log",
"//orchestrator/controller/merge",
"//orchestrator/controller/request",
"//orchestrator/controller/score",
Expand Down
20 changes: 20 additions & 0 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/uber/submitqueue/orchestrator/controller/build"
"github.com/uber/submitqueue/orchestrator/controller/buildsignal"
"github.com/uber/submitqueue/orchestrator/controller/conclude"
logctrl "github.com/uber/submitqueue/orchestrator/controller/log"
"github.com/uber/submitqueue/orchestrator/controller/merge"
"github.com/uber/submitqueue/orchestrator/controller/request"
"github.com/uber/submitqueue/orchestrator/controller/score"
Expand Down Expand Up @@ -353,6 +354,14 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
subscriberName, "orchestrator-conclude",
),
},
{
Key: consumer.TopicKeyLog,
Name: "log",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig(
subscriberName, "orchestrator-log",
),
},
})
}

Expand Down Expand Up @@ -470,6 +479,17 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
return fmt.Errorf("failed to register conclude controller: %w", err)
}

logController := logctrl.NewController(
logger,
scope,
store,
consumer.TopicKeyLog,
"orchestrator-log",
)
if err := c.Register(logController); err != nil {
return fmt.Errorf("failed to register log controller: %w", err)
}

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
}

// Record the accepted status in the request log for reconciliation. Once the request materializes as a Request entity, the status might be updated to "new".
logEntry := entity.NewRequestLog(request.ID, "accepted", 0, "", nil)
// It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a queue.
// Gateway has to stay consistent with the request log.
logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusAccepted, 0, "", nil)
if err := c.requestLogStore.Insert(ctx, logEntry); err != nil {
return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", request.ID, err)
}
Expand Down
32 changes: 32 additions & 0 deletions orchestrator/controller/log/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "log",
srcs = ["log.go"],
importpath = "github.com/uber/submitqueue/orchestrator/controller/log",
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//entity",
"//extension/storage",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "log_test",
srcs = ["log_test.go"],
embed = [":log"],
deps = [
"//core/consumer",
"//entity",
"//entity/queue",
"//extension/queue/mock",
"//extension/storage/mock",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//zaptest",
],
)
Loading