From cca3577a3f838e63d3889a15c6264f6db0a664dd Mon Sep 17 00:00:00 2001 From: sergeyb Date: Wed, 4 Mar 2026 19:01:55 +0000 Subject: [PATCH] feat(requestlog): Implement Request Log and state reconciliation --- core/request/BUILD.bazel | 26 ++++ core/request/request.go | 76 ++++++++++ core/request/request_test.go | 113 +++++++++++++++ entity/BUILD.bazel | 2 + entity/request.go | 5 + entity/request_log.go | 62 ++++++++ entity/request_log_test.go | 134 ++++++++++++++++++ example/server/gateway/BUILD.bazel | 1 + example/server/gateway/main.go | 6 +- extension/storage/BUILD.bazel | 2 + extension/storage/mock/BUILD.bazel | 10 ++ extension/storage/mysql/BUILD.bazel | 1 + extension/storage/mysql/request_log_store.go | 98 +++++++++++++ .../storage/mysql/schema/request_log.sql | 10 ++ extension/storage/mysql/storage.go | 7 + extension/storage/request_log_store.go | 20 +++ extension/storage/storage.go | 3 + gateway/controller/BUILD.bazel | 2 + gateway/controller/land.go | 34 +++-- gateway/controller/land_test.go | 26 ++-- 20 files changed, 614 insertions(+), 24 deletions(-) create mode 100644 core/request/BUILD.bazel create mode 100644 core/request/request.go create mode 100644 core/request/request_test.go create mode 100644 entity/request_log.go create mode 100644 entity/request_log_test.go create mode 100644 extension/storage/mysql/request_log_store.go create mode 100644 extension/storage/mysql/schema/request_log.sql create mode 100644 extension/storage/request_log_store.go diff --git a/core/request/BUILD.bazel b/core/request/BUILD.bazel new file mode 100644 index 00000000..9fb5ed72 --- /dev/null +++ b/core/request/BUILD.bazel @@ -0,0 +1,26 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "request", + srcs = ["request.go"], + importpath = "github.com/uber/submitqueue/core/request", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "//extension/storage", + ], +) + +go_test( + name = "request_test", + srcs = ["request_test.go"], + embed = [":request"], + deps = [ + "//entity", + "//extension/storage", + "//extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_uber_go_mock//gomock", + ], +) diff --git a/core/request/request.go b/core/request/request.go new file mode 100644 index 00000000..996dd065 --- /dev/null +++ b/core/request/request.go @@ -0,0 +1,76 @@ +package request + +import ( + "context" + "fmt" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" +) + +// CurrentState holds the current request status obtained from the request log. It is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds. +type CurrentState struct { + // Status is the current request status obtained from the request log. + Status string + // LastError is the last error associated with the current status. + LastError string + // Metadata is the metadata associated with the current status. + Metadata map[string]string +} + +// GetCurrentStateFromRequestLog returns the current reconciled state for a request by reading the +// request log. Returns ErrNotFound if the request ID has no records in the log database. The state is eventually consistent with the request status in the request store. It might take some time to converge, typically no more than a few seconds. +func GetCurrentStateFromRequestLog(ctx context.Context, store storage.RequestLogStore, requestID string) (CurrentState, error) { + logs, err := store.List(ctx, requestID) + if err != nil { + return CurrentState{}, fmt.Errorf("failed to list request logs for request_id=%s: %w", requestID, err) + } + + // Reconciliation strategy: + // + // Timestamps in request log records are client-generated and may not be consistent with the + // actual order of state modifications (e.g. clock skew, concurrent writers). Therefore we + // cannot rely on timestamps alone to determine the most current status. + // + // Records that originate from the Request entity carry a RequestVersion, which is + // monotonically incremented by the storage layer under optimistic locking. Version ordering + // is authoritative and guaranteed by the Request data model. + // + // The algorithm: + // 1. If any record has a terminal status (landed, error) AND a version (RequestVersion > 0), + // pick the one with the highest version. Timestamp breaks ties between equal versions, even though it should not happen. + // 2. Otherwise, fall back to the record with the largest timestamp. + + var bestTerminal *entity.RequestLog + var bestLatest *entity.RequestLog + + for i := range logs { + // iterate over all log records, storage contract guarantees that the records are ordered by timestamp ascending. + log := &logs[i] + + // Track the record with the largest timestamp as fallback. + if bestLatest == nil || log.TimestampMs > bestLatest.TimestampMs { + bestLatest = log + } + + // A terminal candidate must have a version from the Request entity and a terminal status. + if log.RequestVersion > 0 && entity.IsRequestStateTerminal(entity.RequestState(log.Status)) { + if bestTerminal == nil || + log.RequestVersion > bestTerminal.RequestVersion || + (log.RequestVersion == bestTerminal.RequestVersion && log.TimestampMs > bestTerminal.TimestampMs) { + bestTerminal = log + } + } + } + + winner := bestLatest + if bestTerminal != nil { + winner = bestTerminal + } + + return CurrentState{ + Status: winner.Status, + LastError: winner.LastError, + Metadata: winner.Metadata, + }, nil +} diff --git a/core/request/request_test.go b/core/request/request_test.go new file mode 100644 index 00000000..8cf489b4 --- /dev/null +++ b/core/request/request_test.go @@ -0,0 +1,113 @@ +package request + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" + storagemock "github.com/uber/submitqueue/extension/storage/mock" +) + +func TestGetCurrentStateFromRequestLog(t *testing.T) { + tests := []struct { + name string + logs []entity.RequestLog + expected CurrentState + }{ + { + name: "single record", + logs: []entity.RequestLog{ + {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + }, + expected: CurrentState{Status: "new", LastError: "", Metadata: map[string]string{}}, + }, + { + name: "terminal status wins over later non-terminal", + logs: []entity.RequestLog{ + {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 3, LastError: "", Metadata: map[string]string{"batch": "b1"}}, + {RequestID: "q/1", TimestampMs: 3000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, + }, + expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"batch": "b1"}}, + }, + { + name: "terminal error status with last error", + logs: []entity.RequestLog{ + {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 4, LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, + }, + expected: CurrentState{Status: "error", LastError: "merge conflict", Metadata: map[string]string{"step": "merge"}}, + }, + { + name: "multiple terminal records picks highest version", + logs: []entity.RequestLog{ + {RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 2, LastError: "timeout", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: "landed", RequestVersion: 5, LastError: "", Metadata: map[string]string{"final": "true"}}, + {RequestID: "q/1", TimestampMs: 3000, Status: "error", RequestVersion: 3, LastError: "conflict", Metadata: map[string]string{}}, + }, + expected: CurrentState{Status: "landed", LastError: "", Metadata: map[string]string{"final": "true"}}, + }, + { + name: "same version terminal records uses timestamp tiebreaker", + logs: []entity.RequestLog{ + {RequestID: "q/1", TimestampMs: 1000, Status: "error", RequestVersion: 3, LastError: "first", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: "error", RequestVersion: 3, LastError: "second", Metadata: map[string]string{}}, + }, + expected: CurrentState{Status: "error", LastError: "second", Metadata: map[string]string{}}, + }, + { + name: "terminal status without version is not terminal", + logs: []entity.RequestLog{ + {RequestID: "q/1", TimestampMs: 1000, Status: "landed", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{"source": "gw"}}, + }, + expected: CurrentState{Status: "processing", LastError: "", Metadata: map[string]string{"source": "gw"}}, + }, + { + name: "no terminal records falls back to latest timestamp", + logs: []entity.RequestLog{ + {RequestID: "q/1", TimestampMs: 1000, Status: "new", RequestVersion: 1, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 3000, Status: "validated", RequestVersion: 2, LastError: "", Metadata: map[string]string{}}, + {RequestID: "q/1", TimestampMs: 2000, Status: "processing", RequestVersion: 0, LastError: "", Metadata: map[string]string{}}, + }, + expected: CurrentState{Status: "validated", LastError: "", Metadata: map[string]string{}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := storagemock.NewMockRequestLogStore(ctrl) + mockStore.EXPECT().List(gomock.Any(), "q/1").Return(tt.logs, nil) + + result, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1") + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetCurrentStateFromRequestLog_NoRecords(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := storagemock.NewMockRequestLogStore(ctrl) + mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, storage.ErrNotFound) + + _, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1") + assert.Error(t, err) + assert.True(t, storage.IsNotFound(err)) +} + +func TestGetCurrentStateFromRequestLog_StoreError(t *testing.T) { + ctrl := gomock.NewController(t) + mockStore := storagemock.NewMockRequestLogStore(ctrl) + mockStore.EXPECT().List(gomock.Any(), "q/1").Return(nil, fmt.Errorf("db connection lost")) + + _, err := GetCurrentStateFromRequestLog(context.Background(), mockStore, "q/1") + assert.Error(t, err) +} diff --git a/entity/BUILD.bazel b/entity/BUILD.bazel index ccd288d0..de5f2539 100644 --- a/entity/BUILD.bazel +++ b/entity/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "change_provider.go", "queue_config.go", "request.go", + "request_log.go", "speculation_tree.go", ], importpath = "github.com/uber/submitqueue/entity", @@ -20,6 +21,7 @@ go_test( srcs = [ "batch_test.go", "build_test.go", + "request_log_test.go", "request_test.go", ], embed = [":entity"], diff --git a/entity/request.go b/entity/request.go index c85b67dc..aa115c69 100644 --- a/entity/request.go +++ b/entity/request.go @@ -35,6 +35,11 @@ const ( RequestStateError RequestState = "error" ) +// IsRequestStateTerminal returns true if the state represents a final, irreversible state (landed or error). +func IsRequestStateTerminal(s RequestState) bool { + return s == RequestStateLanded || s == RequestStateError +} + // Change represents a code change identified by URIs from a code change provider (e.g., GitHub Pull Request, Phabricator Diff). // The provider is extracted from the URI scheme. The object is immutable after creation. type Change struct { diff --git a/entity/request_log.go b/entity/request_log.go new file mode 100644 index 00000000..b8e09a8e --- /dev/null +++ b/entity/request_log.go @@ -0,0 +1,62 @@ +package entity + +import ( + "encoding/json" + "time" +) + +// RequestLog is an append-only record that captures a point-in-time snapshot of a request's status +// for reconciliation purposes. It is stored in a separate database from the request store to support +// eventual consistency reconciliation. +type RequestLog struct { + // RequestID is the ID of the request this log entry belongs to. References entity.Request.ID. + RequestID string `json:"request_id"` + // TimestampMs is the time this log entry was created, in milliseconds since Unix epoch. + TimestampMs int64 `json:"timestamp_ms"` + // Status is the request status at the time this log entry was created. It does not have to correspond to the request status. For example, it may contain intermediate statuses like "validated" or "processing". + Status string `json:"status"` + // RequestVersion is the version of the request at the time this log entry was created. + // Zero if the version is not available. + RequestVersion int32 `json:"request_version"` + // LastError is the last error message associated with the status at the time of this log entry. + // Empty string if no error. + LastError string `json:"last_error"` + // Metadata is a set of key-value pairs providing additional context for this log entry. + // Empty map if no metadata. + Metadata map[string]string `json:"metadata"` +} + +// NewRequestLog creates a new RequestLog with the given fields. +// TimestampMs is set to the current time. If metadata is nil, it will be initialized as an empty map. +func NewRequestLog(requestID string, status string, requestVersion int32, lastError string, metadata map[string]string) RequestLog { + if metadata == nil { + metadata = make(map[string]string) + } + return RequestLog{ + RequestID: requestID, + TimestampMs: time.Now().UnixMilli(), + Status: status, + RequestVersion: requestVersion, + LastError: lastError, + Metadata: metadata, + } +} + +// ToBytes serializes the RequestLog to JSON bytes for queue message payload. +func (r RequestLog) ToBytes() ([]byte, error) { + return json.Marshal(r) +} + +// RequestLogFromBytes deserializes a RequestLog from JSON bytes. +// If metadata is absent from the JSON, it will be initialized as an empty map. +func RequestLogFromBytes(data []byte) (RequestLog, error) { + var log RequestLog + err := json.Unmarshal(data, &log) + if err != nil { + return log, err + } + if log.Metadata == nil { + log.Metadata = make(map[string]string) + } + return log, nil +} diff --git a/entity/request_log_test.go b/entity/request_log_test.go new file mode 100644 index 00000000..a0fbc01a --- /dev/null +++ b/entity/request_log_test.go @@ -0,0 +1,134 @@ +package entity + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewRequestLog_NilMetadata(t *testing.T) { + log := NewRequestLog("queue1/100", "new", 0, "", nil) + + assert.NotNil(t, log.Metadata) + assert.Empty(t, log.Metadata) +} + +func TestRequestLog_ToBytes(t *testing.T) { + log := RequestLog{ + RequestID: "test-queue/123", + TimestampMs: 1709568000000, + Status: "new", + RequestVersion: 1, + LastError: "", + Metadata: map[string]string{"source": "gateway"}, + } + + data, err := log.ToBytes() + require.NoError(t, err) + assert.NotEmpty(t, data) + + jsonStr := string(data) + assert.Contains(t, jsonStr, "test-queue/123") + assert.Contains(t, jsonStr, "1709568000000") + assert.Contains(t, jsonStr, "gateway") +} + +func TestRequestLogFromBytes(t *testing.T) { + original := RequestLog{ + RequestID: "my-queue/999", + TimestampMs: 1709568000000, + Status: "processing", + RequestVersion: 3, + LastError: "timeout", + Metadata: map[string]string{"step": "validation", "attempt": "2"}, + } + + data, err := original.ToBytes() + require.NoError(t, err) + + deserialized, err := RequestLogFromBytes(data) + require.NoError(t, err) + + assert.Equal(t, original.RequestID, deserialized.RequestID) + assert.Equal(t, original.TimestampMs, deserialized.TimestampMs) + assert.Equal(t, original.Status, deserialized.Status) + assert.Equal(t, original.RequestVersion, deserialized.RequestVersion) + assert.Equal(t, original.LastError, deserialized.LastError) + assert.Equal(t, original.Metadata, deserialized.Metadata) +} + +func TestRequestLogFromBytes_InvalidJSON(t *testing.T) { + invalidJSON := []byte(`{"invalid": json"}`) + + _, err := RequestLogFromBytes(invalidJSON) + assert.Error(t, err) +} + +func TestRequestLogFromBytes_EmptyData(t *testing.T) { + emptyJSON := []byte(`{}`) + + log, err := RequestLogFromBytes(emptyJSON) + require.NoError(t, err) + + assert.Empty(t, log.RequestID) + assert.Equal(t, int64(0), log.TimestampMs) + assert.Empty(t, log.Status) + assert.Equal(t, int32(0), log.RequestVersion) + assert.Empty(t, log.LastError) + assert.NotNil(t, log.Metadata) + assert.Empty(t, log.Metadata) +} + +func TestRequestLog_SerializationRoundTrip(t *testing.T) { + tests := []struct { + name string + log RequestLog + }{ + { + name: "with all fields populated", + log: RequestLog{ + RequestID: "queue1/100", + TimestampMs: 1709568000000, + Status: "landed", + RequestVersion: 5, + LastError: "", + Metadata: map[string]string{"source": "orchestrator", "batch_id": "b-1"}, + }, + }, + { + name: "with error", + log: RequestLog{ + RequestID: "queue2/200", + TimestampMs: 1709568001000, + Status: "error", + RequestVersion: 2, + LastError: "merge conflict detected", + Metadata: map[string]string{}, + }, + }, + { + name: "with zero version", + log: RequestLog{ + RequestID: "queue3/300", + TimestampMs: 1709568002000, + Status: "new", + RequestVersion: 0, + LastError: "", + Metadata: map[string]string{"key": "value"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data, err := tt.log.ToBytes() + require.NoError(t, err) + + deserialized, err := RequestLogFromBytes(data) + require.NoError(t, err) + + assert.Equal(t, tt.log, deserialized) + }) + } +} diff --git a/example/server/gateway/BUILD.bazel b/example/server/gateway/BUILD.bazel index c8a55621..73027d30 100644 --- a/example/server/gateway/BUILD.bazel +++ b/example/server/gateway/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//extension/counter/mysql", "//extension/queue/mysql", + "//extension/storage/mysql", "//gateway/controller", "//gateway/protopb", "@com_github_go_sql_driver_mysql//:mysql", diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index 09f46c53..7e04b6e1 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -15,6 +15,7 @@ import ( "github.com/uber-go/tally/v4" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" + mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql" "github.com/uber/submitqueue/gateway/controller" pb "github.com/uber/submitqueue/gateway/protopb" "go.uber.org/zap" @@ -134,9 +135,12 @@ func run() error { // Create gRPC server grpcServer := grpc.NewServer() + // Initialize request log store from shared app database connection + requestLogStore := mysqlstorage.NewRequestLogStore(appDB, scope.SubScope("request_log_store")) + // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), "request") + landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), requestLogStore, "request") gatewayServer := &GatewayServer{ pingController: pingController, landController: landController, diff --git a/extension/storage/BUILD.bazel b/extension/storage/BUILD.bazel index 5278e5b8..3bcbff1f 100644 --- a/extension/storage/BUILD.bazel +++ b/extension/storage/BUILD.bazel @@ -6,6 +6,7 @@ exports_files( "batch_store.go", "build_store.go", "change_provider_store.go", + "request_log_store.go", "request_store.go", "speculation_tree_store.go", "storage.go", @@ -20,6 +21,7 @@ go_library( "batch_store.go", "build_store.go", "change_provider_store.go", + "request_log_store.go", "request_store.go", "speculation_tree_store.go", "storage.go", diff --git a/extension/storage/mock/BUILD.bazel b/extension/storage/mock/BUILD.bazel index 49da6756..d3ebd886 100644 --- a/extension/storage/mock/BUILD.bazel +++ b/extension/storage/mock/BUILD.bazel @@ -66,6 +66,15 @@ gomock( source_importpath = "github.com/uber/submitqueue/extension/storage", ) +gomock( + name = "mock_request_log_store_src", + out = "request_log_store_mock.go", + mockgen_tool = _MOCKGEN, + package = "mock", + source = "//extension/storage:request_log_store.go", + source_importpath = "github.com/uber/submitqueue/extension/storage", +) + # gazelle:ignore go_library( name = "mock", @@ -74,6 +83,7 @@ go_library( ":mock_batch_store_src", ":mock_build_store_src", ":mock_change_provider_store_src", + ":mock_request_log_store_src", ":mock_request_store_src", ":mock_speculation_tree_store_src", ":mock_storage_src", diff --git a/extension/storage/mysql/BUILD.bazel b/extension/storage/mysql/BUILD.bazel index 113086fe..df8f6f8f 100644 --- a/extension/storage/mysql/BUILD.bazel +++ b/extension/storage/mysql/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "batch_store.go", "build_store.go", "change_provider_store.go", + "request_log_store.go", "request_store.go", "speculation_tree_store.go", "storage.go", diff --git a/extension/storage/mysql/request_log_store.go b/extension/storage/mysql/request_log_store.go new file mode 100644 index 00000000..6b2f548c --- /dev/null +++ b/extension/storage/mysql/request_log_store.go @@ -0,0 +1,98 @@ +package mysql + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "math/rand/v2" + + "github.com/uber-go/tally/v4" + + "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" +) + +type requestLogStore struct { + db *sql.DB + scope tally.Scope +} + +// NewRequestLogStore creates a new MySQL-backed RequestLogStore. +func NewRequestLogStore(db *sql.DB, scope tally.Scope) storage.RequestLogStore { + return &requestLogStore{db: db, scope: scope} +} + +// Insert appends a new request log record. The primary key is (request_id, timestamp_ms, salt). +// Multiple log entries for the same request can share a timestamp (e.g. concurrent writers or +// millisecond-precision collisions), so a random salt is generated to guarantee uniqueness +// without requiring the caller to manage deduplication. +func (r *requestLogStore) Insert(ctx context.Context, log entity.RequestLog) (retErr error) { + op := metrics.Begin(r.scope, "insert") + defer func() { op.Complete(retErr) }() + + metadataJSON, err := json.Marshal(log.Metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata for request log request_id=%s: %w", log.RequestID, err) + } + + // Generate a random salt to break primary key ties when two inserts share the same + // (request_id, timestamp_ms). The salt is part of the composite primary key in MySQL + // but is never exposed through the storage interface or returned to callers. + salt := rand.Int64() + + _, err = r.db.ExecContext(ctx, + "INSERT INTO request_log (request_id, timestamp_ms, salt, status, request_version, last_error, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)", + log.RequestID, log.TimestampMs, salt, log.Status, log.RequestVersion, log.LastError, metadataJSON, + ) + if err != nil { + return fmt.Errorf("failed to insert request log for request_id=%s timestamp_ms=%d: %w", log.RequestID, log.TimestampMs, err) + } + + return nil +} + +// List retrieves all request log records for a given request ID, ordered by timestamp ascending. +// Salt is used as a secondary sort key to provide stable ordering for entries that share a +// timestamp, but it is not included in the SELECT columns and never returned to callers. +func (r *requestLogStore) List(ctx context.Context, requestID string) (ret []entity.RequestLog, retErr error) { + op := metrics.Begin(r.scope, "list") + defer func() { op.Complete(retErr) }() + + rows, err := r.db.QueryContext(ctx, + "SELECT request_id, timestamp_ms, status, request_version, last_error, metadata FROM request_log WHERE request_id = ? ORDER BY timestamp_ms ASC, salt ASC", + requestID, + ) + if err != nil { + return nil, fmt.Errorf("failed to list request logs for request_id=%s: %w", requestID, err) + } + defer rows.Close() + + var logs []entity.RequestLog + for rows.Next() { + var log entity.RequestLog + var metadataJSON []byte + + err := rows.Scan(&log.RequestID, &log.TimestampMs, &log.Status, &log.RequestVersion, &log.LastError, &metadataJSON) + if err != nil { + return nil, fmt.Errorf("failed to scan request log row for request_id=%s: %w", requestID, err) + } + + if err := json.Unmarshal(metadataJSON, &log.Metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata for request log request_id=%s: %w", requestID, err) + } + + logs = append(logs, log) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate request log rows for request_id=%s: %w", requestID, err) + } + + if len(logs) == 0 { + return nil, fmt.Errorf("no request log records for request_id=%s: %w", requestID, storage.ErrNotFound) + } + + return logs, nil +} diff --git a/extension/storage/mysql/schema/request_log.sql b/extension/storage/mysql/schema/request_log.sql new file mode 100644 index 00000000..8866eeac --- /dev/null +++ b/extension/storage/mysql/schema/request_log.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS request_log ( + request_id VARCHAR(255) NOT NULL, + timestamp_ms BIGINT NOT NULL, + salt BIGINT NOT NULL, + status VARCHAR(64) NOT NULL, + request_version INT NOT NULL, + last_error TEXT NOT NULL, + metadata JSON NOT NULL, + PRIMARY KEY (request_id, timestamp_ms, salt) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/extension/storage/mysql/storage.go b/extension/storage/mysql/storage.go index 4fee579a..ca1d6510 100644 --- a/extension/storage/mysql/storage.go +++ b/extension/storage/mysql/storage.go @@ -17,6 +17,7 @@ type mysqlStorage struct { batchDependentStore storage.BatchDependentStore buildStore storage.BuildStore speculationTreeStore storage.SpeculationTreeStore + requestLogStore storage.RequestLogStore } // NewStorage creates a new MySQL storage. @@ -29,6 +30,7 @@ func NewStorage(db *sql.DB, scope tally.Scope) (storage.Storage, error) { batchDependentStore: NewBatchDependentStore(db, scope.SubScope("batch_dependent_store")), buildStore: NewBuildStore(db, scope.SubScope("build_store")), speculationTreeStore: NewSpeculationTreeStore(db, scope.SubScope("speculation_tree_store")), + requestLogStore: NewRequestLogStore(db, scope.SubScope("request_log_store")), }, nil } @@ -62,6 +64,11 @@ func (f *mysqlStorage) GetSpeculationTreeStore() storage.SpeculationTreeStore { return f.speculationTreeStore } +// GetRequestLogStore returns the MySQL-backed RequestLogStore. +func (f *mysqlStorage) GetRequestLogStore() storage.RequestLogStore { + return f.requestLogStore +} + // Close closes the underlying database connection. func (f *mysqlStorage) Close() error { return f.db.Close() diff --git a/extension/storage/request_log_store.go b/extension/storage/request_log_store.go new file mode 100644 index 00000000..65b98d58 --- /dev/null +++ b/extension/storage/request_log_store.go @@ -0,0 +1,20 @@ +package storage + +//go:generate mockgen -source=request_log_store.go -destination=mock/request_log_store.go -package=mock + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// RequestLogStore is an interface that defines methods for managing request log records in an append-only database. +// Request logs are used to reconcile request statuses with eventual consistency into a separate database from RequestStore. +type RequestLogStore interface { + // Insert appends a new request log record. Timestamps should be generated by the caller and not modified by the implementation. + Insert(ctx context.Context, log entity.RequestLog) error + + // List retrieves all request log records for a given request ID, ordered by timestamp ascending. + // Returns ErrNotFound if no records exist for the given request ID. + List(ctx context.Context, requestID string) ([]entity.RequestLog, error) +} diff --git a/extension/storage/storage.go b/extension/storage/storage.go index 8aeef18f..68901e50 100644 --- a/extension/storage/storage.go +++ b/extension/storage/storage.go @@ -46,6 +46,9 @@ type Storage interface { // GetSpeculationTreeStore returns the SpeculationTreeStore instance. GetSpeculationTreeStore() SpeculationTreeStore + // GetRequestLogStore returns the RequestLogStore instance. + GetRequestLogStore() RequestLogStore + // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } diff --git a/gateway/controller/BUILD.bazel b/gateway/controller/BUILD.bazel index 5329be0e..007fe5fd 100644 --- a/gateway/controller/BUILD.bazel +++ b/gateway/controller/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//entity/queue", "//extension/counter", "//extension/queue", + "//extension/storage", "//gateway/protopb", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -32,6 +33,7 @@ go_test( "//entity/queue", "//extension/counter/mock", "//extension/queue/mock", + "//extension/storage/mock", "//gateway/protopb", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/gateway/controller/land.go b/gateway/controller/land.go index fe5d861d..faf0777e 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -12,6 +12,7 @@ import ( "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/counter" extqueue "github.com/uber/submitqueue/extension/queue" + "github.com/uber/submitqueue/extension/storage" pb "github.com/uber/submitqueue/gateway/protopb" "go.uber.org/zap" ) @@ -27,22 +28,24 @@ func IsInvalidRequest(err error) bool { // LandController handles land business logic for the gateway type LandController struct { - logger *zap.SugaredLogger - metricsScope tally.Scope - counter counter.Counter - publisher extqueue.Publisher - topic string // Topic to publish requests to (e.g., "request", "land_request") + logger *zap.SugaredLogger + metricsScope tally.Scope + counter counter.Counter + publisher extqueue.Publisher + requestLogStore storage.RequestLogStore + topic string // Topic to publish requests to (e.g., "request", "land_request") } // NewLandController creates a new instance of the gateway land controller. // topic: the queue topic to publish requests to (e.g., "request", "land_request") -func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, publisher extqueue.Publisher, topic string) *LandController { +func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, publisher extqueue.Publisher, requestLogStore storage.RequestLogStore, topic string) *LandController { return &LandController{ - logger: logger, - metricsScope: scope, - counter: counter, - publisher: publisher, - topic: topic, + logger: logger, + metricsScope: scope, + counter: counter, + publisher: publisher, + requestLogStore: requestLogStore, + topic: topic, } } @@ -91,6 +94,12 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan Version: 1, } + // Record the accepted status in the request log for reconciliation. Once the request materializes as a Request entity, the status might be updated to "new". + logEntry := entity.NewRequestLog(request.ID, "accepted", 0, "", nil) + if err := c.requestLogStore.Insert(ctx, logEntry); err != nil { + return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", request.ID, err) + } + c.logger.Debugw("land request created", "queue", req.Queue, "sqid", request.ID, @@ -99,9 +108,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan "strategy", string(strategy), ) - // TODO: Insert the created request to the - // event store - // Publish to queue for async processing if err := c.publishToQueue(ctx, request); err != nil { c.logger.Errorw("failed to publish request to queue", diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index 1d930909..9c5bf575 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -12,6 +12,7 @@ import ( "github.com/uber/submitqueue/entity/queue" countermock "github.com/uber/submitqueue/extension/counter/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" + storagemock "github.com/uber/submitqueue/extension/storage/mock" pb "github.com/uber/submitqueue/gateway/protopb" "go.uber.org/mock/gomock" "go.uber.org/zap" @@ -24,11 +25,18 @@ func noopPublisher(ctrl *gomock.Controller) *queuemock.MockPublisher { return pub } +// noopRequestLogStore returns a mock RequestLogStore that succeeds silently. +func noopRequestLogStore(ctrl *gomock.Controller) *storagemock.MockRequestLogStore { + s := storagemock.NewMockRequestLogStore(ctrl) + s.EXPECT().Insert(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + return s +} + func TestNewLandController(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") require.NotNil(t, controller) } @@ -37,7 +45,7 @@ func TestLand_ReturnsSqid(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -55,7 +63,7 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -79,7 +87,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { return 1, nil }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -96,7 +104,7 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -113,7 +121,7 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -130,7 +138,7 @@ func TestLand_ReturnsErrorOnNilChange(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -160,7 +168,7 @@ func TestLand_PublishesToQueue(t *testing.T) { }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{ @@ -197,7 +205,7 @@ func TestLand_ContinuesWhenPublishFails(t *testing.T) { publisher := queuemock.NewMockPublisher(ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request") ctx := context.Background() req := &pb.LandRequest{