Skip to content

Commit d930811

Browse files
authored
Merge branch 'main' into manjari/batch-controller
2 parents e505f38 + 555d294 commit d930811

30 files changed

Lines changed: 735 additions & 145 deletions

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ make run-client-gateway MESSAGE="hello"
3939
go run example/client/gateway/main.go -message "hello"
4040

4141
# Or using grpcurl
42-
grpcurl -plaintext -d '{"message": "hello"}' localhost:8081 uber.devexp.submitqueue.gateway.SubmitQueueGateway/Ping
42+
grpcurl -plaintext -d '{"message": "hello"}' localhost:8081 uber.submitqueue.gateway.SubmitQueueGateway/Ping
4343
```
4444

4545
For detailed instructions, see [example/README.md](example/README.md).
@@ -336,7 +336,7 @@ make proto
336336
3. **Or use grpcurl:**
337337
```bash
338338
grpcurl -plaintext -d '{"message": "hello"}' \
339-
localhost:8081 uber.devexp.submitqueue.gateway.SubmitQueueGateway/Ping
339+
localhost:8081 uber.submitqueue.gateway.SubmitQueueGateway/Ping
340340
```
341341

342342
#### Testing All Services

core/request/BUILD.bazel

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "request",
5+
srcs = ["request.go"],
6+
importpath = "github.com/uber/submitqueue/core/request",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//entity",
10+
"//extension/storage",
11+
],
12+
)
13+
14+
go_test(
15+
name = "request_test",
16+
srcs = ["request_test.go"],
17+
embed = [":request"],
18+
deps = [
19+
"//entity",
20+
"//extension/storage",
21+
"//extension/storage/mock",
22+
"@com_github_stretchr_testify//assert",
23+
"@com_github_stretchr_testify//require",
24+
"@org_uber_go_mock//gomock",
25+
],
26+
)

core/request/request.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package request
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/uber/submitqueue/entity"
8+
"github.com/uber/submitqueue/extension/storage"
9+
)
10+
11+
// CurrentState holds the current request status obtained from the request log. It is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds.
12+
type CurrentState struct {
13+
// Status is the current request status obtained from the request log.
14+
Status string
15+
// LastError is the last error associated with the current status.
16+
LastError string
17+
// Metadata is the metadata associated with the current status.
18+
Metadata map[string]string
19+
}
20+
21+
// GetCurrentStateFromRequestLog returns the current reconciled state for a request by reading the
22+
// request log. Returns ErrNotFound if the request ID has no records in the log database. The state is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds.
23+
func GetCurrentStateFromRequestLog(ctx context.Context, store storage.RequestLogStore, requestID string) (CurrentState, error) {
24+
logs, err := store.List(ctx, requestID)
25+
if err != nil {
26+
return CurrentState{}, fmt.Errorf("failed to list request logs for request_id=%s: %w", requestID, err)
27+
}
28+
29+
// Reconciliation strategy:
30+
//
31+
// Timestamps in request log records are client-generated and may not be consistent with the
32+
// actual order of state modifications (e.g. clock skew, concurrent writers). Therefore we
33+
// cannot rely on timestamps alone to determine the most current status.
34+
//
35+
// Records that originate from the Request entity carry a RequestVersion, which is
36+
// monotonically incremented by the storage layer under optimistic locking. Version ordering
37+
// is authoritative and guaranteed by the Request data model.
38+
//
39+
// The algorithm:
40+
// 1. If any record has a terminal status (landed, error) AND a version (RequestVersion > 0),
41+
// pick the one with the highest version. Timestamp breaks ties between equal versions, even though it should not happen.
42+
// 2. Otherwise, fall back to the record with the largest timestamp.
43+
44+
var bestTerminal *entity.RequestLog
45+
var bestLatest *entity.RequestLog
46+
47+
for i := range logs {
48+
// iterate over all log records, storage contract guarantees that the records are ordered by timestamp ascending.
49+
log := &logs[i]
50+
51+
// Track the record with the largest timestamp as fallback.
52+
if bestLatest == nil || log.TimestampMs > bestLatest.TimestampMs {
53+
bestLatest = log
54+
}
55+
56+
// A terminal candidate must have a version from the Request entity and a terminal status.
57+
if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(log.Status)) {
58+
if bestTerminal == nil ||
59+
log.RequestVersion > bestTerminal.RequestVersion ||
60+
(log.RequestVersion == bestTerminal.RequestVersion && log.TimestampMs > bestTerminal.TimestampMs) {
61+
bestTerminal = log
62+
}
63+
}
64+
}
65+
66+
winner := bestLatest
67+
if bestTerminal != nil {
68+
winner = bestTerminal
69+
}
70+
71+
return CurrentState{
72+
Status: winner.Status,
73+
LastError: winner.LastError,
74+
Metadata: winner.Metadata,
75+
}, nil
76+
}

core/request/request_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package request
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
"go.uber.org/mock/gomock"
11+
12+
"github.com/uber/submitqueue/entity"
13+
"github.com/uber/submitqueue/extension/storage"
14+
storagemock "github.com/uber/submitqueue/extension/storage/mock"
15+
)
16+
17+
func TestGetCurrentStateFromRequestLog(t *testing.T) {
18+
tests := []struct {
19+
name string
20+
logs []entity.RequestLog
21+
expected CurrentState
22+
}{
23+
{
24+
name: "single record",
25+
logs: []entity.RequestLog{
26+
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
27+
},
28+
expected: CurrentState{Status: "new", LastError: "", Metadata: map[string]string{}},
29+
},
30+
{
31+
name: "terminal status wins over later non-terminal",
32+
logs: []entity.RequestLog{
33+
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
34+
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}},
35+
{RequestID: "q/1", TimestampMs: 3000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
36+
},
37+
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"batch": "b1"}},
38+
},
39+
{
40+
name: "terminal error status with last error",
41+
logs: []entity.RequestLog{
42+
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
43+
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
44+
},
45+
expected: CurrentState{Status: "error", LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
46+
},
47+
{
48+
name: "multiple terminal records picks highest version",
49+
logs: []entity.RequestLog{
50+
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}},
51+
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}},
52+
{RequestID: "q/1", TimestampMs: 3000, Status: "error", RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}},
53+
},
54+
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"final": "true"}},
55+
},
56+
{
57+
name: "same version terminal records uses timestamp tiebreaker",
58+
logs: []entity.RequestLog{
59+
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 3, LastError: "first", Metadata: map[string]string{}},
60+
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 3, LastError: "second", Metadata: map[string]string{}},
61+
},
62+
expected: CurrentState{Status: "error", LastError: "second", Metadata: map[string]string{}},
63+
},
64+
{
65+
name: "terminal status without version is not terminal",
66+
logs: []entity.RequestLog{
67+
{RequestID: "q/1", TimestampMs: 1000, Status: "landed", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
68+
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}},
69+
},
70+
expected: CurrentState{Status: "processing", LastError: "", Metadata: map[string]string{"source": "gw"}},
71+
},
72+
{
73+
name: "no terminal records falls back to latest timestamp",
74+
logs: []entity.RequestLog{
75+
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
76+
{RequestID: "q/1", TimestampMs: 3000, Status: "validated", RequestVersion: 2, LastError: "", Metadata: map[string]string{}},
77+
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
78+
},
79+
expected: CurrentState{Status: "validated", LastError: "", Metadata: map[string]string{}},
80+
},
81+
}
82+
83+
for _, tt := range tests {
84+
t.Run(tt.name, func(t *testing.T) {
85+
ctrl := gomock.NewController(t)
86+
mockStore := storagemock.NewMockRequestLogStore(ctrl)
87+
mockStore.EXPECT().List(gomock.Any(), "q/1").Return(tt.logs, nil)
88+
89+
result, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1")
90+
require.NoError(t, err)
91+
assert.Equal(t, tt.expected, result)
92+
})
93+
}
94+
}
95+
96+
func TestGetCurrentStateFromRequestLog_NoRecords(t *testing.T) {
97+
ctrl := gomock.NewController(t)
98+
mockStore := storagemock.NewMockRequestLogStore(ctrl)
99+
mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, storage.ErrNotFound)
100+
101+
_, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1")
102+
assert.Error(t, err)
103+
assert.True(t, storage.IsNotFound(err))
104+
}
105+
106+
func TestGetCurrentStateFromRequestLog_StoreError(t *testing.T) {
107+
ctrl := gomock.NewController(t)
108+
mockStore := storagemock.NewMockRequestLogStore(ctrl)
109+
mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, fmt.Errorf("db connection lost"))
110+
111+
_, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1")
112+
assert.Error(t, err)
113+
}

entity/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"change_provider.go",
1010
"queue_config.go",
1111
"request.go",
12+
"request_log.go",
1213
"speculation_tree.go",
1314
],
1415
importpath = "github.com/uber/submitqueue/entity",
@@ -20,6 +21,7 @@ go_test(
2021
srcs = [
2122
"batch_test.go",
2223
"build_test.go",
24+
"request_log_test.go",
2325
"request_test.go",
2426
],
2527
embed = [":entity"],

entity/request.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ const (
3535
RequestStateError RequestState = "error"
3636
)
3737

38+
// IsRequestStateTerminal returns true if the state represents a final, irreversible state (landed or error).
39+
func IsRequestStateTerminal(s RequestState) bool {
40+
return s == RequestStateLanded || s == RequestStateError
41+
}
42+
3843
// Change represents a code change identified by URIs from a code change provider (e.g., GitHub Pull Request, Phabricator Diff).
3944
// The provider is extracted from the URI scheme. The object is immutable after creation.
4045
type Change struct {

entity/request_log.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package entity
2+
3+
import (
4+
"encoding/json"
5+
"time"
6+
)
7+
8+
// RequestLog is an append-only record that captures a point-in-time snapshot of a request's status
9+
// for reconciliation purposes. It is stored in a separate database from the request store to support
10+
// eventual consistency reconciliation.
11+
type RequestLog struct {
12+
// RequestID is the ID of the request this log entry belongs to. References entity.Request.ID.
13+
RequestID string `json:"request_id"`
14+
// TimestampMs is the time this log entry was created, in milliseconds since Unix epoch.
15+
TimestampMs int64 `json:"timestamp_ms"`
16+
// Status is the request status at the time this log entry was created. It does not have to correspond to the request status. For example, it may contain intermediate statuses like "validated" or "processing".
17+
Status string `json:"status"`
18+
// RequestVersion is the version of the request at the time this log entry was created.
19+
// Zero if the version is not available.
20+
RequestVersion int32 `json:"request_version"`
21+
// LastError is the last error message associated with the status at the time of this log entry.
22+
// Empty string if no error.
23+
LastError string `json:"last_error"`
24+
// Metadata is a set of key-value pairs providing additional context for this log entry.
25+
// Empty map if no metadata.
26+
Metadata map[string]string `json:"metadata"`
27+
}
28+
29+
// NewRequestLog creates a new RequestLog with the given fields.
30+
// TimestampMs is set to the current time. If metadata is nil, it will be initialized as an empty map.
31+
func NewRequestLog(requestID string, status string, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
32+
if metadata == nil {
33+
metadata = make(map[string]string)
34+
}
35+
return RequestLog{
36+
RequestID: requestID,
37+
TimestampMs: time.Now().UnixMilli(),
38+
Status: status,
39+
RequestVersion: requestVersion,
40+
LastError: lastError,
41+
Metadata: metadata,
42+
}
43+
}
44+
45+
// ToBytes serializes the RequestLog to JSON bytes for queue message payload.
46+
func (r RequestLog) ToBytes() ([]byte, error) {
47+
return json.Marshal(r)
48+
}
49+
50+
// RequestLogFromBytes deserializes a RequestLog from JSON bytes.
51+
// If metadata is absent from the JSON, it will be initialized as an empty map.
52+
func RequestLogFromBytes(data []byte) (RequestLog, error) {
53+
var log RequestLog
54+
err := json.Unmarshal(data, &log)
55+
if err != nil {
56+
return log, err
57+
}
58+
if log.Metadata == nil {
59+
log.Metadata = make(map[string]string)
60+
}
61+
return log, nil
62+
}

0 commit comments

Comments
 (0)