-
Notifications
You must be signed in to change notification settings - Fork 0
feat(requestlog): Implement Request Log and state reconciliation #117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| load("@rules_go//go:def.bzl", "go_library", "go_test") | ||
|
|
||
| go_library( | ||
| name = "request", | ||
| srcs = ["request.go"], | ||
| importpath = "github.com/uber/submitqueue/core/request", | ||
| visibility = ["//visibility:public"], | ||
| deps = [ | ||
| "//entity", | ||
| "//extension/storage", | ||
| ], | ||
| ) | ||
|
|
||
| go_test( | ||
| name = "request_test", | ||
| srcs = ["request_test.go"], | ||
| embed = [":request"], | ||
| deps = [ | ||
| "//entity", | ||
| "//extension/storage", | ||
| "//extension/storage/mock", | ||
| "@com_github_stretchr_testify//assert", | ||
| "@com_github_stretchr_testify//require", | ||
| "@org_uber_go_mock//gomock", | ||
| ], | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| package request | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/uber/submitqueue/entity" | ||
| "github.com/uber/submitqueue/extension/storage" | ||
| ) | ||
|
|
||
| // CurrentState holds the current request status obtained from the request log. It is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds. | ||
| type CurrentState struct { | ||
| // Status is the current request status obtained from the request log. | ||
| Status string | ||
| // LastError is the last error associated with the current status. | ||
| LastError string | ||
| // Metadata is the metadata associated with the current status. | ||
| Metadata map[string]string | ||
| } | ||
|
|
||
| // GetCurrentStateFromRequestLog returns the current reconciled state for a request by reading the | ||
| // request log. Returns ErrNotFound if the request ID has no records in the log database. The state is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds. | ||
| func GetCurrentStateFromRequestLog(ctx context.Context, store storage.RequestLogStore, requestID string) (CurrentState, error) { | ||
| logs, err := store.List(ctx, requestID) | ||
| if err != nil { | ||
| return CurrentState{}, fmt.Errorf("failed to list request logs for request_id=%s: %w", requestID, err) | ||
| } | ||
|
|
||
| // Reconciliation strategy: | ||
| // | ||
| // Timestamps in request log records are client-generated and may not be consistent with the | ||
| // actual order of state modifications (e.g. clock skew, concurrent writers). Therefore we | ||
| // cannot rely on timestamps alone to determine the most current status. | ||
| // | ||
| // Records that originate from the Request entity carry a RequestVersion, which is | ||
| // monotonically incremented by the storage layer under optimistic locking. Version ordering | ||
| // is authoritative and guaranteed by the Request data model. | ||
| // | ||
| // The algorithm: | ||
| // 1. If any record has a terminal status (landed, error) AND a version (RequestVersion > 0), | ||
| // pick the one with the highest version. Timestamp breaks ties between equal versions, even though it should not happen. | ||
| // 2. Otherwise, fall back to the record with the largest timestamp. | ||
|
|
||
| var bestTerminal *entity.RequestLog | ||
| var bestLatest *entity.RequestLog | ||
|
|
||
| for i := range logs { | ||
| // iterate over all log records, storage contract guarantees that the records are ordered by timestamp ascending. | ||
| log := &logs[i] | ||
|
|
||
| // Track the record with the largest timestamp as fallback. | ||
| if bestLatest == nil || log.TimestampMs > bestLatest.TimestampMs { | ||
| bestLatest = log | ||
| } | ||
|
|
||
| // A terminal candidate must have a version from the Request entity and a terminal status. | ||
| if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(log.Status)) { | ||
| if bestTerminal == nil || | ||
| log.RequestVersion > bestTerminal.RequestVersion || | ||
| (log.RequestVersion == bestTerminal.RequestVersion && log.TimestampMs > bestTerminal.TimestampMs) { | ||
| bestTerminal = log | ||
| } | ||
| } | ||
| } | ||
|
|
||
| winner := bestLatest | ||
| if bestTerminal != nil { | ||
| winner = bestTerminal | ||
| } | ||
|
sbalabanov marked this conversation as resolved.
|
||
|
|
||
| return CurrentState{ | ||
| Status: winner.Status, | ||
| LastError: winner.LastError, | ||
| Metadata: winner.Metadata, | ||
| }, nil | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| package request | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "go.uber.org/mock/gomock" | ||
|
|
||
| "github.com/uber/submitqueue/entity" | ||
| "github.com/uber/submitqueue/extension/storage" | ||
| storagemock "github.com/uber/submitqueue/extension/storage/mock" | ||
| ) | ||
|
|
||
| func TestGetCurrentStateFromRequestLog(t *testing.T) { | ||
| tests := []struct { | ||
| name string | ||
| logs []entity.RequestLog | ||
| expected CurrentState | ||
| }{ | ||
| { | ||
| name: "single record", | ||
| logs: []entity.RequestLog{ | ||
| {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, | ||
| }, | ||
| expected: CurrentState{Status: "new", LastError: "", Metadata: map[string]string{}}, | ||
| }, | ||
| { | ||
| name: "terminal status wins over later non-terminal", | ||
| logs: []entity.RequestLog{ | ||
| {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, | ||
| {RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}}, | ||
| {RequestID: "q/1", TimestampMs: 3000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, | ||
| }, | ||
| expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"batch": "b1"}}, | ||
| }, | ||
| { | ||
| name: "terminal error status with last error", | ||
| logs: []entity.RequestLog{ | ||
| {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, | ||
| {RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, | ||
| }, | ||
| expected: CurrentState{Status: "error", LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, | ||
| }, | ||
| { | ||
| name: "multiple terminal records picks highest version", | ||
| logs: []entity.RequestLog{ | ||
| {RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}}, | ||
| {RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}}, | ||
| {RequestID: "q/1", TimestampMs: 3000, Status: "error", RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}}, | ||
| }, | ||
| expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"final": "true"}}, | ||
| }, | ||
| { | ||
| name: "same version terminal records uses timestamp tiebreaker", | ||
| logs: []entity.RequestLog{ | ||
| {RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 3, LastError: "first", Metadata: map[string]string{}}, | ||
| {RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 3, LastError: "second", Metadata: map[string]string{}}, | ||
| }, | ||
| expected: CurrentState{Status: "error", LastError: "second", Metadata: map[string]string{}}, | ||
| }, | ||
| { | ||
| name: "terminal status without version is not terminal", | ||
| logs: []entity.RequestLog{ | ||
| {RequestID: "q/1", TimestampMs: 1000, Status: "landed", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, | ||
| {RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}}, | ||
| }, | ||
| expected: CurrentState{Status: "processing", LastError: "", Metadata: map[string]string{"source": "gw"}}, | ||
| }, | ||
| { | ||
| name: "no terminal records falls back to latest timestamp", | ||
| logs: []entity.RequestLog{ | ||
| {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, | ||
| {RequestID: "q/1", TimestampMs: 3000, Status: "validated", RequestVersion: 2, LastError: "", Metadata: map[string]string{}}, | ||
| {RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, | ||
| }, | ||
| expected: CurrentState{Status: "validated", LastError: "", Metadata: map[string]string{}}, | ||
| }, | ||
| } | ||
|
|
||
| for _, tt := range tests { | ||
| t.Run(tt.name, func(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| mockStore := storagemock.NewMockRequestLogStore(ctrl) | ||
| mockStore.EXPECT().List(gomock.Any(), "q/1").Return(tt.logs, nil) | ||
|
|
||
| result, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1") | ||
| require.NoError(t, err) | ||
| assert.Equal(t, tt.expected, result) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestGetCurrentStateFromRequestLog_NoRecords(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| mockStore := storagemock.NewMockRequestLogStore(ctrl) | ||
| mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, storage.ErrNotFound) | ||
|
|
||
| _, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1") | ||
| assert.Error(t, err) | ||
| assert.True(t, storage.IsNotFound(err)) | ||
| } | ||
|
|
||
| func TestGetCurrentStateFromRequestLog_StoreError(t *testing.T) { | ||
| ctrl := gomock.NewController(t) | ||
| mockStore := storagemock.NewMockRequestLogStore(ctrl) | ||
| mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, fmt.Errorf("db connection lost")) | ||
|
|
||
| _, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1") | ||
| assert.Error(t, err) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| package entity | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "time" | ||
| ) | ||
|
|
||
| // RequestLog is an append-only record that captures a point-in-time snapshot of a request's status | ||
| // for reconciliation purposes. It is stored in a separate database from the request store to support | ||
| // eventual consistency reconciliation. | ||
| type RequestLog struct { | ||
| // RequestID is the ID of the request this log entry belongs to. References entity.Request.ID. | ||
| RequestID string `json:"request_id"` | ||
| // TimestampMs is the time this log entry was created, in milliseconds since Unix epoch. | ||
| TimestampMs int64 `json:"timestamp_ms"` | ||
| // Status is the request status at the time this log entry was created. It does not have to correspond to the request status. For example, it may contain intermediate statuses like "validated" or "processing". | ||
| Status string `json:"status"` | ||
| // RequestVersion is the version of the request at the time this log entry was created. | ||
| // Zero if the version is not available. | ||
| RequestVersion int32 `json:"request_version"` | ||
|
sbalabanov marked this conversation as resolved.
|
||
| // LastError is the last error message associated with the status at the time of this log entry. | ||
| // Empty string if no error. | ||
| LastError string `json:"last_error"` | ||
| // Metadata is a set of key-value pairs providing additional context for this log entry. | ||
| // Empty map if no metadata. | ||
| Metadata map[string]string `json:"metadata"` | ||
| } | ||
|
|
||
| // NewRequestLog creates a new RequestLog with the given fields. | ||
| // TimestampMs is set to the current time. If metadata is nil, it will be initialized as an empty map. | ||
| func NewRequestLog(requestID string, status string, requestVersion int32, lastError string, metadata map[string]string) RequestLog { | ||
| if metadata == nil { | ||
| metadata = make(map[string]string) | ||
| } | ||
| return RequestLog{ | ||
| RequestID: requestID, | ||
| TimestampMs: time.Now().UnixMilli(), | ||
| Status: status, | ||
| RequestVersion: requestVersion, | ||
| LastError: lastError, | ||
| Metadata: metadata, | ||
| } | ||
| } | ||
|
|
||
| // ToBytes serializes the RequestLog to JSON bytes for queue message payload. | ||
| func (r RequestLog) ToBytes() ([]byte, error) { | ||
| return json.Marshal(r) | ||
| } | ||
|
|
||
| // RequestLogFromBytes deserializes a RequestLog from JSON bytes. | ||
| // If metadata is absent from the JSON, it will be initialized as an empty map. | ||
| func RequestLogFromBytes(data []byte) (RequestLog, error) { | ||
| var log RequestLog | ||
| err := json.Unmarshal(data, &log) | ||
| if err != nil { | ||
| return log, err | ||
| } | ||
| if log.Metadata == nil { | ||
| log.Metadata = make(map[string]string) | ||
| } | ||
| return log, nil | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.