Skip to content

Commit 38c4c9e

Browse files
committed
feat(requestlog): Request Log Event Bus, States and Statuses
1 parent b7b27ab commit 38c4c9e

15 files changed

Lines changed: 422 additions & 37 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ MODULE.bazel.lock
55
.vscode/
66
.idea/
77
.claude/
8+
.mcp.json
89
.ijwb/
910

1011

core/consumer/registry.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ const (
4646
TopicKeyMerge TopicKey = "merge"
4747
// TopicKeyConclude is the pipeline stage where merged requests are published for conclusion.
4848
TopicKeyConclude TopicKey = "conclude"
49+
// TopicKeyLog is the pipeline stage where per-request logs are written.
50+
TopicKeyLog TopicKey = "log"
4951
)
5052

5153
// String returns the topic key as a string.

core/request/request.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
// CurrentState holds the current request status obtained from the request log. It is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds.
2626
type CurrentState struct {
2727
// Status is the current request status obtained from the request log.
28-
Status string
28+
Status entity.RequestStatus
2929
// LastError is the last error associated with the current status.
3030
LastError string
3131
// Metadata is the metadata associated with the current status.
@@ -68,7 +68,7 @@ func GetCurrentStateFromRequestLog(ctx context.Context, store storage.RequestLog
6868
}
6969

7070
// A terminal candidate must have a version from the Request entity and a terminal status.
71-
if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(log.Status)) {
71+
if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(string(log.Status))) {
7272
if bestTerminal == nil ||
7373
log.RequestVersion > bestTerminal.RequestVersion ||
7474
(log.RequestVersion == bestTerminal.RequestVersion && log.TimestampMs > bestTerminal.TimestampMs) {

core/request/request_test.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,60 +37,60 @@ func TestGetCurrentStateFromRequestLog(t *testing.T) {
3737
{
3838
name: "single record",
3939
logs: []entity.RequestLog{
40-
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
40+
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
4141
},
42-
expected: CurrentState{Status: "new", LastError: "", Metadata: map[string]string{}},
42+
expected: CurrentState{Status: entity.RequestStatusNew, LastError: "", Metadata: map[string]string{}},
4343
},
4444
{
4545
name: "terminal status wins over later non-terminal",
4646
logs: []entity.RequestLog{
47-
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
48-
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}},
49-
{RequestID: "q/1", TimestampMs: 3000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
47+
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
48+
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusLanded, RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}},
49+
{RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
5050
},
51-
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"batch": "b1"}},
51+
expected: CurrentState{Status: entity.RequestStatusLanded, LastError: "", Metadata: map[string]string{"batch": "b1"}},
5252
},
5353
{
5454
name: "terminal error status with last error",
5555
logs: []entity.RequestLog{
56-
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
57-
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
56+
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
57+
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusError, RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
5858
},
59-
expected: CurrentState{Status: "error", LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
59+
expected: CurrentState{Status: entity.RequestStatusError, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
6060
},
6161
{
6262
name: "multiple terminal records picks highest version",
6363
logs: []entity.RequestLog{
64-
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}},
65-
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}},
66-
{RequestID: "q/1", TimestampMs: 3000, Status: "error", RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}},
64+
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusError, RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}},
65+
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusLanded, RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}},
66+
{RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}},
6767
},
68-
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"final": "true"}},
68+
expected: CurrentState{Status: entity.RequestStatusLanded, LastError: "", Metadata: map[string]string{"final": "true"}},
6969
},
7070
{
7171
name: "same version terminal records uses timestamp tiebreaker",
7272
logs: []entity.RequestLog{
73-
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 3, LastError: "first", Metadata: map[string]string{}},
74-
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 3, LastError: "second", Metadata: map[string]string{}},
73+
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "first", Metadata: map[string]string{}},
74+
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusError, RequestVersion: 3, LastError: "second", Metadata: map[string]string{}},
7575
},
76-
expected: CurrentState{Status: "error", LastError: "second", Metadata: map[string]string{}},
76+
expected: CurrentState{Status: entity.RequestStatusError, LastError: "second", Metadata: map[string]string{}},
7777
},
7878
{
7979
name: "terminal status without version is not terminal",
8080
logs: []entity.RequestLog{
81-
{RequestID: "q/1", TimestampMs: 1000, Status: "landed", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
82-
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}},
81+
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusLanded, RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
82+
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}},
8383
},
84-
expected: CurrentState{Status: "processing", LastError: "", Metadata: map[string]string{"source": "gw"}},
84+
expected: CurrentState{Status: entity.RequestStatusProcessing, LastError: "", Metadata: map[string]string{"source": "gw"}},
8585
},
8686
{
8787
name: "no terminal records falls back to latest timestamp",
8888
logs: []entity.RequestLog{
89-
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
90-
{RequestID: "q/1", TimestampMs: 3000, Status: "validated", RequestVersion: 2, LastError: "", Metadata: map[string]string{}},
91-
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
89+
{RequestID: "q/1", TimestampMs: 1000, Status: entity.RequestStatusNew, RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
90+
{RequestID: "q/1", TimestampMs: 3000, Status: entity.RequestStatusValidated, RequestVersion: 2, LastError: "", Metadata: map[string]string{}},
91+
{RequestID: "q/1", TimestampMs: 2000, Status: entity.RequestStatusProcessing, RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
9292
},
93-
expected: CurrentState{Status: "validated", LastError: "", Metadata: map[string]string{}},
93+
expected: CurrentState{Status: entity.RequestStatusValidated, LastError: "", Metadata: map[string]string{}},
9494
},
9595
}
9696

entity/request.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const (
3030
RequestLandStrategyMerge RequestLandStrategy = "merge"
3131
)
3232

33-
// RequestState defines the possible states of a land request.
33+
// RequestState defines the possible states of a land request. They are internal and used to implement a state machine. A separate RequestStatus type is used to track the customer-friendly status of a request.
3434
type RequestState string
3535

3636
const (

entity/request_log.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,64 @@ import (
1919
"time"
2020
)
2121

22+
// RequestLogStatus defines the possible status of a request. Status is customer-friendly and can be displayed to the user.
23+
// It is different from the request state, which is internal and used to implement a state machine. Request statuses can be
24+
// generally added freely by the system without breaking the state machine.
25+
// Some statuses correspond to the request state, in which case they should be supplemented with the request state version to be used for reconciliation.
26+
// Other statuses are purely informational and can be added freely.
27+
// Every status may be accompanied by a last error message and free-formmetadata in the Request Log. It will only be used for display or debugging purposes.
28+
type RequestStatus string
29+
30+
const (
31+
// RequestStatusUnknown is the unknown sentinel status. It is set by default when the structure is initialized. It should never be seen in the system.
32+
RequestStatusUnknown RequestStatus = ""
33+
34+
// RequestStatusAccepted indicates that the request has been accepted by the system. Typically a gateway service will set this status when the land request is received and persisted to the logging database.
35+
RequestStatusAccepted RequestStatus = "accepted"
36+
37+
// RequestStatusNew is the initial status of a request. It corresponds to the RequestStateNew state and typically set by the orchestrator service when the request is received and persisted to the operating database.
38+
RequestStatusNew RequestStatus = "new"
39+
40+
// RequestStatusValidating indicates that the request is currently being validated (e.g., duplicate check, merge check, etc.).
41+
RequestStatusValidating RequestStatus = "validating"
42+
43+
// RequestStatusValidated indicates that the request has been validated (duplicate check, merge check etc.) successfully. It corresponds to the RequestStateValidated state.
44+
RequestStatusValidated RequestStatus = "validated"
45+
46+
// RequestStatusBatching indicates that the request is waiting to be included in a batch.
47+
RequestStatusBatching RequestStatus = "batching"
48+
49+
// RequestStatusBatched indicates that the request has been included in a new batch and will be sent to speculation.
50+
RequestStatusBatched RequestStatus = "batched"
51+
52+
// RequestStatusSpeculating indicates that the request is currently being speculated (e.g., speculative merge/rebase, etc.).
53+
RequestStatusSpeculating RequestStatus = "speculating"
54+
55+
// RequestStatusSpeculated indicates that the request has been successfully speculated and is ready to be validated via a build system.
56+
RequestStatusSpeculated RequestStatus = "speculated"
57+
58+
// RequestStatusBuilding indicates that the request is currently being built (e.g., CI/CD system is building the change on top of the speculation path).
59+
RequestStatusBuilding RequestStatus = "building"
60+
61+
// RequestStatusBuilt indicates that the request has finished the build step successfully and can move to the next phase, either wait for other requests to finish or move to the land phase.
62+
RequestStatusBuilt RequestStatus = "built"
63+
64+
// RequestStatusWaitingPath indicates that the request is waiting for other preceiding request in the same speculation path to finish.
65+
RequestStatusWaitingPath RequestStatus = "waitingpath"
66+
67+
// RequestStatusLanding indicates that the request is actively being landed (e.g., source control operation is in progress to push the change to the target branch).
68+
RequestStatusLanding RequestStatus = "landing"
69+
70+
// RequestStatusProcessing is the status of a request that is being processed. It corresponds to the RequestStateProcessing state.
71+
RequestStatusProcessing RequestStatus = "processing"
72+
73+
// RequestStatusLanded indicates that the request has been successfully processed and landed. It corresponds to the RequestStateLanded state.
74+
RequestStatusLanded RequestStatus = "landed"
75+
76+
// RequestStatusError indicates that the request has encountered an error. It corresponds to the RequestStateError state.
77+
RequestStatusError RequestStatus = "error"
78+
)
79+
2280
// RequestLog is an append-only record that captures a point-in-time snapshot of a request's status
2381
// for reconciliation purposes. It is stored in a separate database from the request store to support
2482
// eventual consistency reconciliation.
@@ -27,8 +85,8 @@ type RequestLog struct {
2785
RequestID string `json:"request_id"`
2886
// TimestampMs is the time this log entry was created, in milliseconds since Unix epoch.
2987
TimestampMs int64 `json:"timestamp_ms"`
30-
// Status is the request status at the time this log entry was created. It does not have to correspond to the request status. For example, it may contain intermediate statuses like "validated" or "processing".
31-
Status string `json:"status"`
88+
// Status is the request status at the time this log entry was created. It may contain requests states from the state machine and also display-friendly intermediate statuses.
89+
Status RequestStatus `json:"status"`
3290
// RequestVersion is the version of the request at the time this log entry was created.
3391
// Zero if the version is not available.
3492
RequestVersion int32 `json:"request_version"`
@@ -42,7 +100,10 @@ type RequestLog struct {
42100

43101
// NewRequestLog creates a new RequestLog with the given fields.
44102
// TimestampMs is set to the current time. If metadata is nil, it will be initialized as an empty map.
45-
func NewRequestLog(requestID string, status string, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
103+
// requestVersion is the version of the request entity, should only be set if reporting a request state as a status, otherwise it should be 0.
104+
// lastError is the last error message associated with the status at the time of this log entry, empty string if no error.
105+
// metadata is a set of key-value pairs providing additional context for this log entry. Not constrained to any specific format or schema, used for display or debugging purposes.
106+
func NewRequestLog(requestID string, status RequestStatus, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
46107
if metadata == nil {
47108
metadata = make(map[string]string)
48109
}

entity/request_log_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
)
2323

2424
func TestNewRequestLog_NilMetadata(t *testing.T) {
25-
log := NewRequestLog("queue1/100", "new", 0, "", nil)
25+
log := NewRequestLog("queue1/100", RequestStatusNew, 0, "", nil)
2626

2727
assert.NotNil(t, log.Metadata)
2828
assert.Empty(t, log.Metadata)
@@ -32,7 +32,7 @@ func TestRequestLog_ToBytes(t *testing.T) {
3232
log := RequestLog{
3333
RequestID: "test-queue/123",
3434
TimestampMs: 1709568000000,
35-
Status: "new",
35+
Status: RequestStatusNew,
3636
RequestVersion: 1,
3737
LastError: "",
3838
Metadata: map[string]string{"source": "gateway"},
@@ -52,7 +52,7 @@ func TestRequestLogFromBytes(t *testing.T) {
5252
original := RequestLog{
5353
RequestID: "my-queue/999",
5454
TimestampMs: 1709568000000,
55-
Status: "processing",
55+
Status: RequestStatusProcessing,
5656
RequestVersion: 3,
5757
LastError: "timeout",
5858
Metadata: map[string]string{"step": "validation", "attempt": "2"},
@@ -104,7 +104,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
104104
log: RequestLog{
105105
RequestID: "queue1/100",
106106
TimestampMs: 1709568000000,
107-
Status: "landed",
107+
Status: RequestStatusLanded,
108108
RequestVersion: 5,
109109
LastError: "",
110110
Metadata: map[string]string{"source": "orchestrator", "batch_id": "b-1"},
@@ -115,7 +115,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
115115
log: RequestLog{
116116
RequestID: "queue2/200",
117117
TimestampMs: 1709568001000,
118-
Status: "error",
118+
Status: RequestStatusError,
119119
RequestVersion: 2,
120120
LastError: "merge conflict detected",
121121
Metadata: map[string]string{},
@@ -126,7 +126,7 @@ func TestRequestLog_SerializationRoundTrip(t *testing.T) {
126126
log: RequestLog{
127127
RequestID: "queue3/300",
128128
TimestampMs: 1709568002000,
129-
Status: "new",
129+
Status: RequestStatusNew,
130130
RequestVersion: 0,
131131
LastError: "",
132132
Metadata: map[string]string{"key": "value"},

example/server/orchestrator/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ go_library(
2525
"//orchestrator/controller/build",
2626
"//orchestrator/controller/buildsignal",
2727
"//orchestrator/controller/conclude",
28+
"//orchestrator/controller/log",
2829
"//orchestrator/controller/merge",
2930
"//orchestrator/controller/request",
3031
"//orchestrator/controller/score",

example/server/orchestrator/main.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/uber/submitqueue/orchestrator/controller/build"
4444
"github.com/uber/submitqueue/orchestrator/controller/buildsignal"
4545
"github.com/uber/submitqueue/orchestrator/controller/conclude"
46+
logctrl "github.com/uber/submitqueue/orchestrator/controller/log"
4647
"github.com/uber/submitqueue/orchestrator/controller/merge"
4748
"github.com/uber/submitqueue/orchestrator/controller/request"
4849
"github.com/uber/submitqueue/orchestrator/controller/score"
@@ -353,6 +354,14 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
353354
subscriberName, "orchestrator-conclude",
354355
),
355356
},
357+
{
358+
Key: consumer.TopicKeyLog,
359+
Name: "log",
360+
Queue: q,
361+
Subscription: extqueue.DefaultSubscriptionConfig(
362+
subscriberName, "orchestrator-log",
363+
),
364+
},
356365
})
357366
}
358367

@@ -470,6 +479,17 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
470479
return fmt.Errorf("failed to register conclude controller: %w", err)
471480
}
472481

482+
logController := logctrl.NewController(
483+
logger,
484+
scope,
485+
store,
486+
consumer.TopicKeyLog,
487+
"orchestrator-log",
488+
)
489+
if err := c.Register(logController); err != nil {
490+
return fmt.Errorf("failed to register log controller: %w", err)
491+
}
492+
473493
return nil
474494
}
475495

gateway/controller/land.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
109109
}
110110

111111
// Record the accepted status in the request log for reconciliation. Once the request materializes as a Request entity, the status might be updated to "new".
112-
logEntry := entity.NewRequestLog(request.ID, "accepted", 0, "", nil)
112+
// It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a queue.
113+
// Gateway has to stay consistent with the request log.
114+
logEntry := entity.NewRequestLog(request.ID, entity.RequestStatusAccepted, 0, "", nil)
113115
if err := c.requestLogStore.Insert(ctx, logEntry); err != nil {
114116
return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", request.ID, err)
115117
}

0 commit comments

Comments
 (0)