Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions core/request/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "request",
srcs = ["request.go"],
importpath = "github.com/uber/submitqueue/core/request",
visibility = ["//visibility:public"],
deps = [
"//entity",
"//extension/storage",
],
)

go_test(
name = "request_test",
srcs = ["request_test.go"],
embed = [":request"],
deps = [
"//entity",
"//extension/storage",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_uber_go_mock//gomock",
],
)
76 changes: 76 additions & 0 deletions core/request/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package request

import (
"context"
"fmt"

"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
)

// CurrentState holds the current request status obtained from the request log. It is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds.
type CurrentState struct {
// Status is the current request status obtained from the request log.
Status string
// LastError is the last error associated with the current status.
LastError string
// Metadata is the metadata associated with the current status.
Metadata map[string]string
}

// GetCurrentStateFromRequestLog returns the current reconciled state for a request by reading the
// request log. Returns ErrNotFound if the request ID has no records in the log database. The state is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds.
func GetCurrentStateFromRequestLog(ctx context.Context, store storage.RequestLogStore, requestID string) (CurrentState, error) {
logs, err := store.List(ctx, requestID)
if err != nil {
return CurrentState{}, fmt.Errorf("failed to list request logs for request_id=%s: %w", requestID, err)
}

// Reconciliation strategy:
//
// Timestamps in request log records are client-generated and may not be consistent with the
// actual order of state modifications (e.g. clock skew, concurrent writers). Therefore we
// cannot rely on timestamps alone to determine the most current status.
//
// Records that originate from the Request entity carry a RequestVersion, which is
// monotonically incremented by the storage layer under optimistic locking. Version ordering
// is authoritative and guaranteed by the Request data model.
//
// The algorithm:
// 1. If any record has a terminal status (landed, error) AND a version (RequestVersion > 0),
// pick the one with the highest version. Timestamp breaks ties between equal versions, even though it should not happen.
// 2. Otherwise, fall back to the record with the largest timestamp.

var bestTerminal *entity.RequestLog
var bestLatest *entity.RequestLog

for i := range logs {
// iterate over all log records, storage contract guarantees that the records are ordered by timestamp ascending.
Comment thread
sbalabanov marked this conversation as resolved.
log := &logs[i]

// Track the record with the largest timestamp as fallback.
if bestLatest == nil || log.TimestampMs > bestLatest.TimestampMs {
bestLatest = log
}

// A terminal candidate must have a version from the Request entity and a terminal status.
if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(log.Status)) {
if bestTerminal == nil ||
log.RequestVersion > bestTerminal.RequestVersion ||
(log.RequestVersion == bestTerminal.RequestVersion && log.TimestampMs > bestTerminal.TimestampMs) {
bestTerminal = log
}
}
}

winner := bestLatest
if bestTerminal != nil {
winner = bestTerminal
}
Comment thread
sbalabanov marked this conversation as resolved.

return CurrentState{
Status: winner.Status,
LastError: winner.LastError,
Metadata: winner.Metadata,
}, nil
}
113 changes: 113 additions & 0 deletions core/request/request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package request

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
storagemock "github.com/uber/submitqueue/extension/storage/mock"
)

func TestGetCurrentStateFromRequestLog(t *testing.T) {
tests := []struct {
name string
logs []entity.RequestLog
expected CurrentState
}{
{
name: "single record",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "new", LastError: "", Metadata: map[string]string{}},
},
{
name: "terminal status wins over later non-terminal",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}},
{RequestID: "q/1", TimestampMs: 3000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"batch": "b1"}},
},
{
name: "terminal error status with last error",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
},
expected: CurrentState{Status: "error", LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}},
},
{
name: "multiple terminal records picks highest version",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}},
{RequestID: "q/1", TimestampMs: 3000, Status: "error", RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"final": "true"}},
},
{
name: "same version terminal records uses timestamp tiebreaker",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 3, LastError: "first", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 3, LastError: "second", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "error", LastError: "second", Metadata: map[string]string{}},
},
{
name: "terminal status without version is not terminal",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "landed", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}},
},
expected: CurrentState{Status: "processing", LastError: "", Metadata: map[string]string{"source": "gw"}},
},
{
name: "no terminal records falls back to latest timestamp",
logs: []entity.RequestLog{
{RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 3000, Status: "validated", RequestVersion: 2, LastError: "", Metadata: map[string]string{}},
{RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}},
},
expected: CurrentState{Status: "validated", LastError: "", Metadata: map[string]string{}},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := storagemock.NewMockRequestLogStore(ctrl)
mockStore.EXPECT().List(gomock.Any(), "q/1").Return(tt.logs, nil)

result, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1")
require.NoError(t, err)
assert.Equal(t, tt.expected, result)
})
}
}

func TestGetCurrentStateFromRequestLog_NoRecords(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := storagemock.NewMockRequestLogStore(ctrl)
mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, storage.ErrNotFound)

_, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1")
assert.Error(t, err)
assert.True(t, storage.IsNotFound(err))
}

func TestGetCurrentStateFromRequestLog_StoreError(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := storagemock.NewMockRequestLogStore(ctrl)
mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, fmt.Errorf("db connection lost"))

_, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1")
assert.Error(t, err)
}
2 changes: 2 additions & 0 deletions entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"change_provider.go",
"queue_config.go",
"request.go",
"request_log.go",
"speculation_tree.go",
],
importpath = "github.com/uber/submitqueue/entity",
Expand All @@ -20,6 +21,7 @@ go_test(
srcs = [
"batch_test.go",
"build_test.go",
"request_log_test.go",
"request_test.go",
],
embed = [":entity"],
Expand Down
5 changes: 5 additions & 0 deletions entity/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ const (
RequestStateError RequestState = "error"
)

// IsRequestStateTerminal returns true if the state represents a final, irreversible state (landed or error).
func IsRequestStateTerminal(s RequestState) bool {
return s == RequestStateLanded || s == RequestStateError
Comment thread
sbalabanov marked this conversation as resolved.
}

// Change represents a code change identified by URIs from a code change provider (e.g., GitHub Pull Request, Phabricator Diff).
// The provider is extracted from the URI scheme. The object is immutable after creation.
type Change struct {
Expand Down
62 changes: 62 additions & 0 deletions entity/request_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package entity

import (
"encoding/json"
"time"
)

// RequestLog is an append-only record that captures a point-in-time snapshot of a request's status
// for reconciliation purposes. It is stored in a separate database from the request store to support
// eventual consistency reconciliation.
type RequestLog struct {
// RequestID is the ID of the request this log entry belongs to. References entity.Request.ID.
RequestID string `json:"request_id"`
// TimestampMs is the time this log entry was created, in milliseconds since Unix epoch.
TimestampMs int64 `json:"timestamp_ms"`
// Status is the request status at the time this log entry was created. It does not have to correspond to the request status. For example, it may contain intermediate statuses like "validated" or "processing".
Status string `json:"status"`
// RequestVersion is the version of the request at the time this log entry was created.
// Zero if the version is not available.
RequestVersion int32 `json:"request_version"`
Comment thread
sbalabanov marked this conversation as resolved.
// LastError is the last error message associated with the status at the time of this log entry.
// Empty string if no error.
LastError string `json:"last_error"`
// Metadata is a set of key-value pairs providing additional context for this log entry.
// Empty map if no metadata.
Metadata map[string]string `json:"metadata"`
}

// NewRequestLog creates a new RequestLog with the given fields.
// TimestampMs is set to the current time. If metadata is nil, it will be initialized as an empty map.
func NewRequestLog(requestID string, status string, requestVersion int32, lastError string, metadata map[string]string) RequestLog {
if metadata == nil {
metadata = make(map[string]string)
}
return RequestLog{
RequestID: requestID,
TimestampMs: time.Now().UnixMilli(),
Status: status,
RequestVersion: requestVersion,
LastError: lastError,
Metadata: metadata,
}
}

// ToBytes serializes the RequestLog to JSON bytes for queue message payload.
func (r RequestLog) ToBytes() ([]byte, error) {
return json.Marshal(r)
}

// RequestLogFromBytes deserializes a RequestLog from JSON bytes.
// If metadata is absent from the JSON, it will be initialized as an empty map.
func RequestLogFromBytes(data []byte) (RequestLog, error) {
var log RequestLog
err := json.Unmarshal(data, &log)
if err != nil {
return log, err
}
if log.Metadata == nil {
log.Metadata = make(map[string]string)
}
return log, nil
}
Loading