From 45262671d6b81eff359c6cfc64ddb99018245981 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 4 Jun 2026 07:53:06 -0700 Subject: [PATCH] refactor(storage): fold ChangeStore into storage, drop dead store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary ### Why? Two "change"-related stores were in the wrong shape. `ChangeStore` — the real, used store that records per-URI claims for in-flight requests and backs `start`'s URI claiming and `validate`'s overlap detection — lived as its own top-level extension and was injected into controllers as a separate dependency, bypassing the `storage.Storage` aggregator that owns every other entity store. Meanwhile `ChangeProviderStore` (exposed via `Storage.GetChangeProviderStore()`, with a mysql impl, mock, and schema) was dead code: no controller ever called it, and its `entity.ChangeProvider` was orphaned alongside it. ### What? - Move `ChangeStore` into `package storage`: the interface, mysql impl, and `change.sql` schema now live under `extension/storage[/mysql]`, and `ChangeStore` is a first-class member of the `Storage` aggregator via `GetChangeStore()` — matching every other store. - `start` and `validate` controllers drop their separate `changeStore` constructor param and read `store.GetChangeStore()`; the example orchestrator no longer constructs/injects it separately. - Delete the dead `ChangeProviderStore` (interface, mysql, mock, schema), `GetChangeProviderStore()`, and the orphaned `entity/change_provider.go`. - Fold the standalone changestore integration suite into the shared `StorageContractSuite` (driven through `GetChangeStore()`); e2e drops the now-redundant changestore schema apply. ## Test Plan - ✅ `make build`, `make test` (start/validate controllers pass against the storage-package mock) - ✅ `make lint`, `make check-gazelle`, `make check-mocks`, `make check-tidy` (no drift; `go.mod` / `MODULE.bazel` unchanged) - ✅ `make integration-test` (storage mysql suite now exercises the change store via `GetChangeStore()`) - ✅ `make e2e-test` (full land→validate flow: URI claim + overlap detection, `change` table applied from the storage schema dir) --- Makefile | 2 +- .../orchestrator/server/BUILD.bazel | 2 - .../submitqueue/orchestrator/server/main.go | 10 +- submitqueue/entity/BUILD.bazel | 1 - submitqueue/entity/change_provider.go | 29 --- submitqueue/extension/changestore/BUILD.bazel | 9 - submitqueue/extension/changestore/README.md | 24 --- .../extension/changestore/mock/BUILD.bazel | 12 -- .../extension/changestore/mysql/BUILD.bazel | 14 -- .../changestore/mysql/schema/BUILD.bazel | 5 - submitqueue/extension/storage/BUILD.bazel | 2 +- .../storage/change_provider_store.go | 42 ---- .../{changestore => storage}/change_store.go | 2 +- .../extension/storage/mock/BUILD.bazel | 2 +- .../mock/change_provider_store_mock.go | 71 ------ .../mock/change_store_mock.go | 0 .../extension/storage/mock/storage_mock.go | 14 +- .../extension/storage/mysql/BUILD.bazel | 2 +- .../storage/mysql/change_provider_store.go | 110 ---------- .../mysql/change_store.go | 4 +- .../extension/storage/mysql/schema/README.md | 7 + .../mysql/schema/change.sql | 2 +- .../storage/mysql/schema/change_provider.sql | 7 - .../extension/storage/mysql/storage.go | 10 +- submitqueue/extension/storage/storage.go | 4 +- .../orchestrator/controller/start/BUILD.bazel | 2 - .../orchestrator/controller/start/start.go | 6 +- .../controller/start/start_test.go | 15 +- .../controller/validate/BUILD.bazel | 2 - .../controller/validate/validate.go | 6 +- .../controller/validate/validate_test.go | 17 +- test/e2e/submitqueue/BUILD.bazel | 1 - test/e2e/submitqueue/suite_test.go | 1 - .../extension/changestore/mysql/BUILD.bazel | 25 --- .../changestore/mysql/changestore_test.go | 202 ------------------ .../changestore/mysql/docker-compose.yml | 19 -- .../submitqueue/extension/storage/suite.go | 141 ++++++++++++ 37 files changed, 191 insertions(+), 633 deletions(-) delete mode 100644 submitqueue/entity/change_provider.go delete mode 100644 submitqueue/extension/changestore/BUILD.bazel delete mode 100644 submitqueue/extension/changestore/README.md delete mode 100644 submitqueue/extension/changestore/mock/BUILD.bazel delete mode 100644 submitqueue/extension/changestore/mysql/BUILD.bazel delete mode 100644 submitqueue/extension/changestore/mysql/schema/BUILD.bazel delete mode 100644 submitqueue/extension/storage/change_provider_store.go rename submitqueue/extension/{changestore => storage}/change_store.go (99%) delete mode 100644 submitqueue/extension/storage/mock/change_provider_store_mock.go rename submitqueue/extension/{changestore => storage}/mock/change_store_mock.go (100%) delete mode 100644 submitqueue/extension/storage/mysql/change_provider_store.go rename submitqueue/extension/{changestore => storage}/mysql/change_store.go (95%) rename submitqueue/extension/{changestore => storage}/mysql/schema/change.sql (88%) delete mode 100644 submitqueue/extension/storage/mysql/schema/change_provider.sql delete mode 100644 test/integration/submitqueue/extension/changestore/mysql/BUILD.bazel delete mode 100644 test/integration/submitqueue/extension/changestore/mysql/changestore_test.go delete mode 100644 test/integration/submitqueue/extension/changestore/mysql/docker-compose.yml diff --git a/Makefile b/Makefile index 0230b3e9..4a8f7cf5 100644 --- a/Makefile +++ b/Makefile @@ -336,7 +336,7 @@ local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./submitqueue/extension/changestore/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index 93c13e67..5190e02e 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -24,8 +24,6 @@ go_library( "//submitqueue/extension/buildrunner/noop", "//submitqueue/extension/changeprovider", "//submitqueue/extension/changeprovider/github", - "//submitqueue/extension/changestore", - "//submitqueue/extension/changestore/mysql", "//submitqueue/extension/conflict", "//submitqueue/extension/conflict/all", "//submitqueue/extension/mergechecker", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 6fc80ace..11a03d20 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -43,8 +43,6 @@ import ( buildnoop "github.com/uber/submitqueue/submitqueue/extension/buildrunner/noop" "github.com/uber/submitqueue/submitqueue/extension/changeprovider" githubprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/github" - "github.com/uber/submitqueue/submitqueue/extension/changestore" - mysqlchangestore "github.com/uber/submitqueue/submitqueue/extension/changestore/mysql" "github.com/uber/submitqueue/submitqueue/extension/conflict" "github.com/uber/submitqueue/submitqueue/extension/conflict/all" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" @@ -164,8 +162,6 @@ func run() error { return fmt.Errorf("failed to create storage: %w", err) } - changeStore := mysqlchangestore.NewChangeStore(appDB, scope.SubScope("changestore")) - // Open queue database connection // Docker Compose healthchecks ensure MySQL is ready before service starts queueDSN := os.Getenv("QUEUE_MYSQL_DSN") @@ -234,7 +230,7 @@ func run() error { br := buildnoop.New() // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store, changeStore); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store); err != nil { return err } @@ -460,12 +456,11 @@ type conflictFactory struct{ impl conflict.Analyzer } func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil } -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage) error { requestController := start.NewController( logger, scope, store, - changeStore, registry, consumer.TopicKeyStart, "orchestrator-start", @@ -490,7 +485,6 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, - changeStore, registry, mergeCheckerFactory{impl: mc}, changeProviderFactory{impl: cp}, diff --git a/submitqueue/entity/BUILD.bazel b/submitqueue/entity/BUILD.bazel index d99f079b..39a077dc 100644 --- a/submitqueue/entity/BUILD.bazel +++ b/submitqueue/entity/BUILD.bazel @@ -7,7 +7,6 @@ go_library( "batch_dependent.go", "build.go", "cancel_request.go", - "change_provider.go", "change_record.go", "land_request.go", "queue_config.go", diff --git a/submitqueue/entity/change_provider.go b/submitqueue/entity/change_provider.go deleted file mode 100644 index fcfa104f..00000000 --- a/submitqueue/entity/change_provider.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package entity - -// ChangeProvider represents a code change from an external provider (e.g., a GitHub pull request or Gerrit changelist) -// along with its associated metadata. The object is immutable after creation. -type ChangeProvider struct { - // RequestID is the globally unique identifier for the land request. Format: "/". - RequestID string - // ChangeProviderSrc defines the source of the change. For e.g. - Github, Gitlab etc. - ChangeProviderSrc string - // ChangeProviderID is the identifier specified by the change provider source. For e.g. - Github PR ID etc. - ChangeProviderID string - // Metadata is the interesting data from the change provider that we want to store. - // This is a freeform JSON object. - Metadata map[string]string -} diff --git a/submitqueue/extension/changestore/BUILD.bazel b/submitqueue/extension/changestore/BUILD.bazel deleted file mode 100644 index d1a19a33..00000000 --- a/submitqueue/extension/changestore/BUILD.bazel +++ /dev/null @@ -1,9 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "changestore", - srcs = ["change_store.go"], - importpath = "github.com/uber/submitqueue/submitqueue/extension/changestore", - visibility = ["//visibility:public"], - deps = ["//submitqueue/entity"], -) diff --git a/submitqueue/extension/changestore/README.md b/submitqueue/extension/changestore/README.md deleted file mode 100644 index 0d4d2aab..00000000 --- a/submitqueue/extension/changestore/README.md +++ /dev/null @@ -1,24 +0,0 @@ -# ChangeStore - -Vendor-agnostic interface for tracking per-URI claims by in-flight land requests. - -Each record asserts that a specific URI (e.g., a GitHub PR) was claimed by a specific request, scoped to a queue. The store is read by the orchestrator's `validate` controller to detect duplicate requests — submissions whose URIs overlap with another in-flight request's URIs in the same queue. - -The interface is intentionally per-record / per-URI so any backend (SQL, DynamoDB, Bigtable, …) can implement it without needing batch atomicity or multi-key query support. Callers that have multiple URIs to claim or check loop over them; the typical request has a small number of URIs (a single PR or a short stack), so the loop overhead is negligible. - -## Semantics - -- **Identity is immutable.** A record is keyed by `(Queue, URI, RequestID)`; once written, that triple is never mutated. -- **Queue leads the key.** Backends should make `Queue` the leading column of the primary key (or partition key, in shardable stores). All reads are queue-scoped, so this turns lookups into PK-prefix scans and keeps the table shardable. -- **`RequestID` in the key is intentional.** Concurrent claims by different requests on the same URI coexist as distinct rows. Same-request retries collide on the PK and are absorbed idempotently; cross-request collisions show up as additional rows that callers detect via `GetByURI`. -- **Metadata is required and mutable.** The `Metadata` field is JSON. The store treats `'{}'` as the canonical "no metadata yet" value — callers that pass an empty Go string get `'{}'` written. Downstream enrichment can update it; `UpdatedAt` reflects the last update. -- **Per-record writes, idempotent.** `Create` writes a single record. A primary-key conflict is silently ignored, which makes queue-redelivery of the same request a safe no-op. There is no batch atomicity in the contract — callers with multiple URIs loop and rely on idempotency to converge under partial failure / retry. -- **Per-URI reads, no filtering.** `GetByURI` returns every record for a given `(queue, uri)`. The store does not filter by `request_id` or by the owning request's state. Callers that want to skip self filter by `RequestID`; callers that want only live owners consult `RequestStore` for liveness. -- **Versioned for safe metadata updates.** Each record carries a `Version` integer (starts at 1). Future `UpdateMetadata` operations follow the same caller-owned-arithmetic + conditional-write pattern as `RequestStore.UpdateState` — the caller passes `oldVersion` and `newVersion`, and the store performs a pure conditional write. -- **Append-only by design.** Records are not deleted when the owning request reaches a terminal state; the historical claim is preserved for audit. Duplicate detection filters terminals out at query time via the controller-side liveness check. - -## Implementing a Backend - -1. Create `extension/changestore/{backend}/` directory. -2. Implement the `ChangeStore` interface. -3. Add schema files under `extension/changestore/{backend}/schema/` if the backend requires them. diff --git a/submitqueue/extension/changestore/mock/BUILD.bazel b/submitqueue/extension/changestore/mock/BUILD.bazel deleted file mode 100644 index 57b5c3b0..00000000 --- a/submitqueue/extension/changestore/mock/BUILD.bazel +++ /dev/null @@ -1,12 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "mock", - srcs = ["change_store_mock.go"], - importpath = "github.com/uber/submitqueue/submitqueue/extension/changestore/mock", - visibility = ["//visibility:public"], - deps = [ - "//submitqueue/entity", - "@org_uber_go_mock//gomock", - ], -) diff --git a/submitqueue/extension/changestore/mysql/BUILD.bazel b/submitqueue/extension/changestore/mysql/BUILD.bazel deleted file mode 100644 index 5c181e8a..00000000 --- a/submitqueue/extension/changestore/mysql/BUILD.bazel +++ /dev/null @@ -1,14 +0,0 @@ -load("@rules_go//go:def.bzl", "go_library") - -go_library( - name = "mysql", - srcs = ["change_store.go"], - importpath = "github.com/uber/submitqueue/submitqueue/extension/changestore/mysql", - visibility = ["//visibility:public"], - deps = [ - "//core/metrics", - "//submitqueue/entity", - "//submitqueue/extension/changestore", - "@com_github_uber_go_tally_v4//:tally", - ], -) diff --git a/submitqueue/extension/changestore/mysql/schema/BUILD.bazel b/submitqueue/extension/changestore/mysql/schema/BUILD.bazel deleted file mode 100644 index 3412d773..00000000 --- a/submitqueue/extension/changestore/mysql/schema/BUILD.bazel +++ /dev/null @@ -1,5 +0,0 @@ -filegroup( - name = "schema", - srcs = glob(["*.sql"]), - visibility = ["//visibility:public"], -) diff --git a/submitqueue/extension/storage/BUILD.bazel b/submitqueue/extension/storage/BUILD.bazel index fe889fbc..867eef0f 100644 --- a/submitqueue/extension/storage/BUILD.bazel +++ b/submitqueue/extension/storage/BUILD.bazel @@ -6,7 +6,7 @@ go_library( "batch_dependent_store.go", "batch_store.go", "build_store.go", - "change_provider_store.go", + "change_store.go", "request_log_store.go", "request_store.go", "speculation_tree_store.go", diff --git a/submitqueue/extension/storage/change_provider_store.go b/submitqueue/extension/storage/change_provider_store.go deleted file mode 100644 index 326b4485..00000000 --- a/submitqueue/extension/storage/change_provider_store.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -//go:generate mockgen -source=change_provider_store.go -destination=mock/change_provider_store_mock.go -package=mock - -import ( - "context" - - "github.com/uber/submitqueue/submitqueue/entity" -) - -// ChangeProviderStore is an interface that defines methods for managing change provider information in the database. -type ChangeProviderStore interface { - // Get retrieves information about a change by ID. - // Returns ErrNotFound if the change provider is not found. - // - // Note: The order of ChangeProvider entities here is not guaranteed to - // be the same as the request to which it belongs. The caller is repsonsible - // for inspecting and mapping the result of this function to the - // order of changes within the original request. - // - Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) - - // Create creates a new change provider. - Create(ctx context.Context, changeProvider entity.ChangeProvider) error - - // There is no update function since once created, data is only ever read from this - // store. -} diff --git a/submitqueue/extension/changestore/change_store.go b/submitqueue/extension/storage/change_store.go similarity index 99% rename from submitqueue/extension/changestore/change_store.go rename to submitqueue/extension/storage/change_store.go index 62e00877..a995f33c 100644 --- a/submitqueue/extension/changestore/change_store.go +++ b/submitqueue/extension/storage/change_store.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package changestore +package storage //go:generate mockgen -source=change_store.go -destination=mock/change_store_mock.go -package=mock diff --git a/submitqueue/extension/storage/mock/BUILD.bazel b/submitqueue/extension/storage/mock/BUILD.bazel index a8d2585a..55c5d808 100644 --- a/submitqueue/extension/storage/mock/BUILD.bazel +++ b/submitqueue/extension/storage/mock/BUILD.bazel @@ -6,7 +6,7 @@ go_library( "batch_dependent_store_mock.go", "batch_store_mock.go", "build_store_mock.go", - "change_provider_store_mock.go", + "change_store_mock.go", "request_log_store_mock.go", "request_store_mock.go", "speculation_tree_store_mock.go", diff --git a/submitqueue/extension/storage/mock/change_provider_store_mock.go b/submitqueue/extension/storage/mock/change_provider_store_mock.go deleted file mode 100644 index df1a7aa7..00000000 --- a/submitqueue/extension/storage/mock/change_provider_store_mock.go +++ /dev/null @@ -1,71 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: change_provider_store.go -// -// Generated by this command: -// -// mockgen -source=change_provider_store.go -destination=mock/change_provider_store_mock.go -package=mock -// - -// Package mock is a generated GoMock package. -package mock - -import ( - context "context" - reflect "reflect" - - entity "github.com/uber/submitqueue/submitqueue/entity" - gomock "go.uber.org/mock/gomock" -) - -// MockChangeProviderStore is a mock of ChangeProviderStore interface. -type MockChangeProviderStore struct { - ctrl *gomock.Controller - recorder *MockChangeProviderStoreMockRecorder - isgomock struct{} -} - -// MockChangeProviderStoreMockRecorder is the mock recorder for MockChangeProviderStore. -type MockChangeProviderStoreMockRecorder struct { - mock *MockChangeProviderStore -} - -// NewMockChangeProviderStore creates a new mock instance. -func NewMockChangeProviderStore(ctrl *gomock.Controller) *MockChangeProviderStore { - mock := &MockChangeProviderStore{ctrl: ctrl} - mock.recorder = &MockChangeProviderStoreMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockChangeProviderStore) EXPECT() *MockChangeProviderStoreMockRecorder { - return m.recorder -} - -// Create mocks base method. -func (m *MockChangeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Create", ctx, changeProvider) - ret0, _ := ret[0].(error) - return ret0 -} - -// Create indicates an expected call of Create. -func (mr *MockChangeProviderStoreMockRecorder) Create(ctx, changeProvider any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockChangeProviderStore)(nil).Create), ctx, changeProvider) -} - -// Get mocks base method. -func (m *MockChangeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Get", ctx, requestID) - ret0, _ := ret[0].([]entity.ChangeProvider) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// Get indicates an expected call of Get. -func (mr *MockChangeProviderStoreMockRecorder) Get(ctx, requestID any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockChangeProviderStore)(nil).Get), ctx, requestID) -} diff --git a/submitqueue/extension/changestore/mock/change_store_mock.go b/submitqueue/extension/storage/mock/change_store_mock.go similarity index 100% rename from submitqueue/extension/changestore/mock/change_store_mock.go rename to submitqueue/extension/storage/mock/change_store_mock.go diff --git a/submitqueue/extension/storage/mock/storage_mock.go b/submitqueue/extension/storage/mock/storage_mock.go index ddf0574a..4133bc2a 100644 --- a/submitqueue/extension/storage/mock/storage_mock.go +++ b/submitqueue/extension/storage/mock/storage_mock.go @@ -96,18 +96,18 @@ func (mr *MockStorageMockRecorder) GetBuildStore() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBuildStore", reflect.TypeOf((*MockStorage)(nil).GetBuildStore)) } -// GetChangeProviderStore mocks base method. -func (m *MockStorage) GetChangeProviderStore() storage.ChangeProviderStore { +// GetChangeStore mocks base method. +func (m *MockStorage) GetChangeStore() storage.ChangeStore { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetChangeProviderStore") - ret0, _ := ret[0].(storage.ChangeProviderStore) + ret := m.ctrl.Call(m, "GetChangeStore") + ret0, _ := ret[0].(storage.ChangeStore) return ret0 } -// GetChangeProviderStore indicates an expected call of GetChangeProviderStore. -func (mr *MockStorageMockRecorder) GetChangeProviderStore() *gomock.Call { +// GetChangeStore indicates an expected call of GetChangeStore. +func (mr *MockStorageMockRecorder) GetChangeStore() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChangeProviderStore", reflect.TypeOf((*MockStorage)(nil).GetChangeProviderStore)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetChangeStore", reflect.TypeOf((*MockStorage)(nil).GetChangeStore)) } // GetRequestLogStore mocks base method. diff --git a/submitqueue/extension/storage/mysql/BUILD.bazel b/submitqueue/extension/storage/mysql/BUILD.bazel index 525df156..671f83a6 100644 --- a/submitqueue/extension/storage/mysql/BUILD.bazel +++ b/submitqueue/extension/storage/mysql/BUILD.bazel @@ -6,7 +6,7 @@ go_library( "batch_dependent_store.go", "batch_store.go", "build_store.go", - "change_provider_store.go", + "change_store.go", "request_log_store.go", "request_store.go", "speculation_tree_store.go", diff --git a/submitqueue/extension/storage/mysql/change_provider_store.go b/submitqueue/extension/storage/mysql/change_provider_store.go deleted file mode 100644 index 1ee52f34..00000000 --- a/submitqueue/extension/storage/mysql/change_provider_store.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mysql - -import ( - "context" - "database/sql" - "encoding/json" - "errors" - "fmt" - - "github.com/go-sql-driver/mysql" - "github.com/uber-go/tally/v4" - - "github.com/uber/submitqueue/core/metrics" - "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/storage" -) - -type changeProviderStore struct { - db *sql.DB - scope tally.Scope -} - -// NewChangeProviderStore creates a new MySQL-backed ChangeProviderStore. -func NewChangeProviderStore(db *sql.DB, scope tally.Scope) storage.ChangeProviderStore { - return &changeProviderStore{db: db, scope: scope} -} - -// Get retrieves change provider(s) by request ID. Returns ErrNotFound if the change provider is not found. -// -// Note: The order of ChangeProvider entities returned here is not guaranteed -// to be the same as the request to which it belongs. The caller is repsonsible -// for inspecting and mapping the result of this function to the -// order of changes within the original request. -func (s *changeProviderStore) Get(ctx context.Context, requestID string) (ret []entity.ChangeProvider, retErr error) { - op := metrics.Begin(s.scope, "get") - defer func() { op.Complete(retErr) }() - - rows, err := s.db.QueryContext(ctx, - "SELECT request_id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE request_id = ?", - requestID, - ) - if err != nil { - return nil, fmt.Errorf("failed to get change provider entities requestID=%s from the database: %w", requestID, err) - } - defer rows.Close() - - var results []entity.ChangeProvider - for rows.Next() { - var cp entity.ChangeProvider - var metadataJSON []byte - - if err := rows.Scan(&cp.RequestID, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON); err != nil { - return nil, fmt.Errorf("failed to scan change provider entity requestID=%s from the database: %w", requestID, err) - } - - if err := json.Unmarshal(metadataJSON, &cp.Metadata); err != nil { - return nil, fmt.Errorf("failed to unmarshal metadata for change provider entity requestID=%s from the database: %w", requestID, err) - } - - results = append(results, cp) - } - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("failed to iterate change provider entities requestID=%s from the database: %w", requestID, err) - } - - if len(results) == 0 { - return nil, storage.WrapNotFound(fmt.Errorf("change provider entity requestID=%s", requestID)) - } - - return results, nil -} - -// Create creates a new change provider. Returns ErrAlreadyExists if the entry already exists. -func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) (retErr error) { - op := metrics.Begin(s.scope, "create") - defer func() { op.Complete(retErr) }() - - metadataJSON, err := json.Marshal(changeProvider.Metadata) - if err != nil { - return fmt.Errorf("failed to marshal metadata id=%s for Create change provider entity: %w", changeProvider.RequestID, err) - } - - _, err = s.db.ExecContext(ctx, - "INSERT INTO change_provider (request_id, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?)", - changeProvider.RequestID, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, - ) - if err != nil { - var mysqlErr *mysql.MySQLError - if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { - return fmt.Errorf("change provider entity id=%s: %w", changeProvider.RequestID, storage.ErrAlreadyExists) - } - return fmt.Errorf("failed to insert change provider entity id=%s: %w", changeProvider.RequestID, err) - } - - return nil -} diff --git a/submitqueue/extension/changestore/mysql/change_store.go b/submitqueue/extension/storage/mysql/change_store.go similarity index 95% rename from submitqueue/extension/changestore/mysql/change_store.go rename to submitqueue/extension/storage/mysql/change_store.go index 0b8f1bfb..c83ae92b 100644 --- a/submitqueue/extension/changestore/mysql/change_store.go +++ b/submitqueue/extension/storage/mysql/change_store.go @@ -23,7 +23,7 @@ import ( "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/changestore" + "github.com/uber/submitqueue/submitqueue/extension/storage" ) type changeStore struct { @@ -32,7 +32,7 @@ type changeStore struct { } // NewChangeStore creates a new MySQL-backed ChangeStore. -func NewChangeStore(db *sql.DB, scope tally.Scope) changestore.ChangeStore { +func NewChangeStore(db *sql.DB, scope tally.Scope) storage.ChangeStore { return &changeStore{db: db, scope: scope} } diff --git a/submitqueue/extension/storage/mysql/schema/README.md b/submitqueue/extension/storage/mysql/schema/README.md index 26e24b37..9e8c36bf 100644 --- a/submitqueue/extension/storage/mysql/schema/README.md +++ b/submitqueue/extension/storage/mysql/schema/README.md @@ -15,3 +15,10 @@ The `batch` table has a composite secondary index on `(queue, state)`. This inde #### Future: Prune job As the `batch` table grows, the secondary index will grow with it, increasing storage costs and degrading write performance. To mitigate this, a prune job should be introduced to periodically delete batches in terminal states (`succeeded`, `failed`, `cancelled`) that are older than a configurable retention period. This keeps the table and its indexes bounded in size, ensuring consistent query and write performance over time. + +## change table + +### Composite primary key: `(queue, uri, request_id)` + +The `change` table records per-URI claims by in-flight requests. `request_id` is part of the primary key so that concurrent claims on the same URI by different requests coexist as distinct rows — a same-request retry collides on the PK and is a no-op (`INSERT IGNORE`), while a different-request claim is a new row that `GetByURI` surfaces for overlap detection. `queue` leads the key so queue-scoped lookups are primary-key-prefix scans and the table is shardable by queue. + diff --git a/submitqueue/extension/changestore/mysql/schema/change.sql b/submitqueue/extension/storage/mysql/schema/change.sql similarity index 88% rename from submitqueue/extension/changestore/mysql/schema/change.sql rename to submitqueue/extension/storage/mysql/schema/change.sql index 9492402f..d5397754 100644 --- a/submitqueue/extension/changestore/mysql/schema/change.sql +++ b/submitqueue/extension/storage/mysql/schema/change.sql @@ -1,7 +1,7 @@ -- request_id is part of the PK so concurrent claims by different requests on the -- same URI coexist as distinct rows. Same-request retry → PK conflict (no-op via -- INSERT IGNORE). Different-request collision → distinct row, surfaced by --- FindOverlapping. Queue leads the PK so queue-scoped lookups are PK-prefix scans +-- GetByURI. Queue leads the PK so queue-scoped lookups are PK-prefix scans -- and the table is shardable by queue. CREATE TABLE IF NOT EXISTS `change` ( uri VARCHAR(255) NOT NULL, diff --git a/submitqueue/extension/storage/mysql/schema/change_provider.sql b/submitqueue/extension/storage/mysql/schema/change_provider.sql deleted file mode 100644 index f714d33f..00000000 --- a/submitqueue/extension/storage/mysql/schema/change_provider.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE TABLE IF NOT EXISTS change_provider ( - request_id VARCHAR(255) NOT NULL, - change_provider_src VARCHAR(255) NOT NULL, - change_provider_id VARCHAR(255) NOT NULL, - metadata JSON NOT NUll, - PRIMARY KEY (request_id,change_provider_src,change_provider_id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/submitqueue/extension/storage/mysql/storage.go b/submitqueue/extension/storage/mysql/storage.go index 48d02031..661fac9c 100644 --- a/submitqueue/extension/storage/mysql/storage.go +++ b/submitqueue/extension/storage/mysql/storage.go @@ -26,7 +26,7 @@ import ( type mysqlStorage struct { db *sql.DB requestStore storage.RequestStore - changeProviderStore storage.ChangeProviderStore + changeStore storage.ChangeStore batchStore storage.BatchStore batchDependentStore storage.BatchDependentStore buildStore storage.BuildStore @@ -39,7 +39,7 @@ func NewStorage(db *sql.DB, scope tally.Scope) (storage.Storage, error) { return &mysqlStorage{ db: db, requestStore: NewRequestStore(db, scope.SubScope("request_store")), - changeProviderStore: NewChangeProviderStore(db, scope.SubScope("change_provider_store")), + changeStore: NewChangeStore(db, scope.SubScope("change_store")), batchStore: NewBatchStore(db, scope.SubScope("batch_store")), batchDependentStore: NewBatchDependentStore(db, scope.SubScope("batch_dependent_store")), buildStore: NewBuildStore(db, scope.SubScope("build_store")), @@ -53,9 +53,9 @@ func (f *mysqlStorage) GetRequestStore() storage.RequestStore { return f.requestStore } -// GetChangeProviderStore returns the MySQL-backed ChangeProviderStore. -func (f *mysqlStorage) GetChangeProviderStore() storage.ChangeProviderStore { - return f.changeProviderStore +// GetChangeStore returns the MySQL-backed ChangeStore. +func (f *mysqlStorage) GetChangeStore() storage.ChangeStore { + return f.changeStore } // GetBatchStore returns the MySQL-backed BatchStore. diff --git a/submitqueue/extension/storage/storage.go b/submitqueue/extension/storage/storage.go index 1b84c4e5..a02bef73 100644 --- a/submitqueue/extension/storage/storage.go +++ b/submitqueue/extension/storage/storage.go @@ -47,8 +47,8 @@ type Storage interface { // GetRequestStore returns the RequestStore instance. GetRequestStore() RequestStore - // GetChangeProviderStore returns the ChangeProviderStore instance. - GetChangeProviderStore() ChangeProviderStore + // GetChangeStore returns the ChangeStore instance. + GetChangeStore() ChangeStore // GetBatchStore returns the BatchStore instance. GetBatchStore() BatchStore diff --git a/submitqueue/orchestrator/controller/start/BUILD.bazel b/submitqueue/orchestrator/controller/start/BUILD.bazel index b82b9990..f9b1adb1 100644 --- a/submitqueue/orchestrator/controller/start/BUILD.bazel +++ b/submitqueue/orchestrator/controller/start/BUILD.bazel @@ -11,7 +11,6 @@ go_library( "//submitqueue/core/consumer", "//submitqueue/core/request", "//submitqueue/entity", - "//submitqueue/extension/changestore", "//submitqueue/extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -28,7 +27,6 @@ go_test( "//extension/messagequeue/mock", "//submitqueue/core/consumer", "//submitqueue/entity", - "//submitqueue/extension/changestore/mock", "//submitqueue/extension/storage", "//submitqueue/extension/storage/mock", "@com_github_stretchr_testify//assert", diff --git a/submitqueue/orchestrator/controller/start/start.go b/submitqueue/orchestrator/controller/start/start.go index 5349835f..f389d139 100644 --- a/submitqueue/orchestrator/controller/start/start.go +++ b/submitqueue/orchestrator/controller/start/start.go @@ -26,7 +26,6 @@ import ( "github.com/uber/submitqueue/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/submitqueue/core/request" "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/changestore" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" ) @@ -41,7 +40,6 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage - changeStore changestore.ChangeStore registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string @@ -55,7 +53,6 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, - changeStore changestore.ChangeStore, registry consumer.TopicRegistry, topicKey consumer.TopicKey, consumerGroup string, @@ -64,7 +61,6 @@ func NewController( logger: logger.Named("start_controller"), metricsScope: scope.SubScope("start_controller"), store: store, - changeStore: changeStore, registry: registry, topicKey: topicKey, consumerGroup: consumerGroup, @@ -160,7 +156,7 @@ func (c *Controller) claimURIs(ctx context.Context, request entity.Request) erro UpdatedAt: now, Version: 1, } - if err := c.changeStore.Create(ctx, record); err != nil { + if err := c.store.GetChangeStore().Create(ctx, record); err != nil { return fmt.Errorf("failed to claim uri=%s for request %s: %w", uri, request.ID, err) } } diff --git a/submitqueue/orchestrator/controller/start/start_test.go b/submitqueue/orchestrator/controller/start/start_test.go index 05995542..04d90286 100644 --- a/submitqueue/orchestrator/controller/start/start_test.go +++ b/submitqueue/orchestrator/controller/start/start_test.go @@ -27,7 +27,6 @@ import ( queuemock "github.com/uber/submitqueue/extension/messagequeue/mock" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" - changemock "github.com/uber/submitqueue/submitqueue/extension/changestore/mock" "github.com/uber/submitqueue/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" @@ -39,12 +38,14 @@ func newTestController( t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, - cs *changemock.MockChangeStore, + cs *storagemock.MockChangeStore, publishErr error, ) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope + store.EXPECT().GetChangeStore().Return(cs).AnyTimes() + mockPub := queuemock.NewMockPublisher(ctrl) mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, topic string, msg entityqueue.Message) error { @@ -63,7 +64,7 @@ func newTestController( ) require.NoError(t, err) - return NewController(logger, scope, store, cs, registry, consumer.TopicKeyStart, "orchestrator-start") + return NewController(logger, scope, store, registry, consumer.TopicKeyStart, "orchestrator-start") } // newMockStorage creates a MockStorage with a MockRequestStore that succeeds on Create. @@ -77,8 +78,8 @@ func newMockStorage(ctrl *gomock.Controller) *storagemock.MockStorage { } // newMockChangeStore returns a MockChangeStore that accepts any Create call. -func newMockChangeStore(ctrl *gomock.Controller) *changemock.MockChangeStore { - cs := changemock.NewMockChangeStore(ctrl) +func newMockChangeStore(ctrl *gomock.Controller) *storagemock.MockChangeStore { + cs := storagemock.NewMockChangeStore(ctrl) cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() return cs } @@ -197,7 +198,7 @@ func TestController_Process_AllStrategies(t *testing.T) { func TestController_Process_MultipleChanges(t *testing.T) { ctrl := gomock.NewController(t) - cs := changemock.NewMockChangeStore(ctrl) + cs := storagemock.NewMockChangeStore(ctrl) var captured []entity.ChangeRecord cs.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, record entity.ChangeRecord) error { @@ -292,7 +293,7 @@ func TestController_Process_AlreadyExistsSucceeds(t *testing.T) { func TestController_Process_ChangeStoreFailure(t *testing.T) { ctrl := gomock.NewController(t) - cs := changemock.NewMockChangeStore(ctrl) + cs := storagemock.NewMockChangeStore(ctrl) cs.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("change store down")) controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil) diff --git a/submitqueue/orchestrator/controller/validate/BUILD.bazel b/submitqueue/orchestrator/controller/validate/BUILD.bazel index 19b7cf21..5880202e 100644 --- a/submitqueue/orchestrator/controller/validate/BUILD.bazel +++ b/submitqueue/orchestrator/controller/validate/BUILD.bazel @@ -12,7 +12,6 @@ go_library( "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/changeprovider", - "//submitqueue/extension/changestore", "//submitqueue/extension/mergechecker", "//submitqueue/extension/storage", "@com_github_uber_go_tally_v4//:tally", @@ -32,7 +31,6 @@ go_test( "//submitqueue/entity", "//submitqueue/extension/changeprovider", "//submitqueue/extension/changeprovider/mock", - "//submitqueue/extension/changestore/mock", "//submitqueue/extension/mergechecker", "//submitqueue/extension/mergechecker/mock", "//submitqueue/extension/storage", diff --git a/submitqueue/orchestrator/controller/validate/validate.go b/submitqueue/orchestrator/controller/validate/validate.go index 71615eb9..4aa738e9 100644 --- a/submitqueue/orchestrator/controller/validate/validate.go +++ b/submitqueue/orchestrator/controller/validate/validate.go @@ -26,7 +26,6 @@ import ( "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/changeprovider" - "github.com/uber/submitqueue/submitqueue/extension/changestore" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" "github.com/uber/submitqueue/submitqueue/extension/storage" "go.uber.org/zap" @@ -40,7 +39,6 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage - changeStore changestore.ChangeStore registry consumer.TopicRegistry mergeCheckers mergechecker.Factory changeProviders changeprovider.Factory @@ -56,7 +54,6 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, - changeStore changestore.ChangeStore, registry consumer.TopicRegistry, mergeCheckers mergechecker.Factory, changeProviders changeprovider.Factory, @@ -67,7 +64,6 @@ func NewController( logger: logger.Named("validate_controller"), metricsScope: scope.SubScope("validate_controller"), store: store, - changeStore: changeStore, registry: registry, mergeCheckers: mergeCheckers, changeProviders: changeProviders, @@ -201,7 +197,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r func (c *Controller) checkDuplicate(ctx context.Context, request entity.Request) (string, error) { seenOwners := make(map[string]struct{}) for _, uri := range request.Change.URIs { - records, err := c.changeStore.GetByURI(ctx, request.Queue, uri) + records, err := c.store.GetChangeStore().GetByURI(ctx, request.Queue, uri) if err != nil { coremetrics.NamedCounter(c.metricsScope, "process", "change_store_query_errors", 1) return "", fmt.Errorf("failed to query change store for request %s uri=%s: %w", request.ID, uri, err) diff --git a/submitqueue/orchestrator/controller/validate/validate_test.go b/submitqueue/orchestrator/controller/validate/validate_test.go index c8e8850a..4426e9a8 100644 --- a/submitqueue/orchestrator/controller/validate/validate_test.go +++ b/submitqueue/orchestrator/controller/validate/validate_test.go @@ -29,7 +29,6 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/changeprovider" changeprovidermock "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock" - changemock "github.com/uber/submitqueue/submitqueue/extension/changestore/mock" "github.com/uber/submitqueue/submitqueue/extension/mergechecker" mergecheckermock "github.com/uber/submitqueue/submitqueue/extension/mergechecker/mock" "github.com/uber/submitqueue/submitqueue/extension/storage" @@ -84,8 +83,8 @@ func newMockStorage(ctrl *gomock.Controller, request entity.Request) (*storagemo // newMockChangeStore creates a MockChangeStore with default no-overlap behavior. // Tests that need to simulate overlap can override GetByURI with their own EXPECT. // Validate is read-only against the change store — it never calls Create. -func newMockChangeStore(ctrl *gomock.Controller) *changemock.MockChangeStore { - cs := changemock.NewMockChangeStore(ctrl) +func newMockChangeStore(ctrl *gomock.Controller) *storagemock.MockChangeStore { + cs := storagemock.NewMockChangeStore(ctrl) cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() return cs } @@ -95,13 +94,15 @@ func newTestController( t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, - cs *changemock.MockChangeStore, + cs *storagemock.MockChangeStore, mc mergechecker.MergeChecker, publishErr error, ) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope + store.EXPECT().GetChangeStore().Return(cs).AnyTimes() + mockPub := queuemock.NewMockPublisher(ctrl) mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, topic string, msg entityqueue.Message) error { @@ -124,7 +125,7 @@ func newTestController( cpFactory := changeprovidermock.NewMockFactory(ctrl) cpFactory.EXPECT().For(gomock.Any()).Return(cp, nil).AnyTimes() - return NewController(logger, scope, store, cs, registry, mcFactory, cpFactory, consumer.TopicKeyValidate, "orchestrator-validate") + return NewController(logger, scope, store, registry, mcFactory, cpFactory, consumer.TopicKeyValidate, "orchestrator-validate") } func TestNewController(t *testing.T) { @@ -421,7 +422,7 @@ func TestController_Process_DuplicateDetection(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() - cs := changemock.NewMockChangeStore(ctrl) + cs := storagemock.NewMockChangeStore(ctrl) // One GetByURI per URI on the request, in order. Controller short-circuits on first // live duplicate, so .AnyTimes() lets unmatched URIs go un-queried. for _, u := range uris { @@ -465,7 +466,7 @@ func TestController_Process_ChangeStoreQueryFailure(t *testing.T) { } store, _ := newMockStorage(ctrl, request) - cs := changemock.NewMockChangeStore(ctrl) + cs := storagemock.NewMockChangeStore(ctrl) cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("change store down")) controller := newTestController(t, ctrl, store, cs, mc, nil) @@ -504,7 +505,7 @@ func TestController_Process_TerminalShortCircuit(t *testing.T) { // No EXPECTs on merge checker or change store: gomock will fail if either is called. mc := mergecheckermock.NewMockMergeChecker(ctrl) - cs := changemock.NewMockChangeStore(ctrl) + cs := storagemock.NewMockChangeStore(ctrl) // Sentinel publish error: if Process publishes, it returns a non-nil err, // which the require.NoError below will catch. diff --git a/test/e2e/submitqueue/BUILD.bazel b/test/e2e/submitqueue/BUILD.bazel index e6c24d41..b0db796e 100644 --- a/test/e2e/submitqueue/BUILD.bazel +++ b/test/e2e/submitqueue/BUILD.bazel @@ -9,7 +9,6 @@ go_test( "//example/submitqueue:docker-compose.yml", "//extension/counter/mysql/schema", "//extension/messagequeue/mysql/schema", - "//submitqueue/extension/changestore/mysql/schema", "//submitqueue/extension/storage/mysql/schema", ], tags = [ diff --git a/test/e2e/submitqueue/suite_test.go b/test/e2e/submitqueue/suite_test.go index 1387da4b..19a82787 100644 --- a/test/e2e/submitqueue/suite_test.go +++ b/test/e2e/submitqueue/suite_test.go @@ -87,7 +87,6 @@ func (s *E2EIntegrationSuite) SetupSuite() { // Apply schemas programmatically to application database testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("submitqueue/extension/storage/mysql/schema")) testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("extension/counter/mysql/schema")) - testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("submitqueue/extension/changestore/mysql/schema")) // Apply schemas programmatically to queue database testutil.ApplySchema(t, s.log, s.queueDB, testutil.SchemaDir("extension/messagequeue/mysql/schema")) diff --git a/test/integration/submitqueue/extension/changestore/mysql/BUILD.bazel b/test/integration/submitqueue/extension/changestore/mysql/BUILD.bazel deleted file mode 100644 index ffc829c8..00000000 --- a/test/integration/submitqueue/extension/changestore/mysql/BUILD.bazel +++ /dev/null @@ -1,25 +0,0 @@ -load("@rules_go//go:def.bzl", "go_test") - -go_test( - name = "mysql_test", - srcs = ["changestore_test.go"], - data = [ - "docker-compose.yml", - "//submitqueue/extension/changestore/mysql/schema", - ], - tags = [ - "external", - "integration", - ], - deps = [ - "//submitqueue/entity", - "//submitqueue/extension/changestore", - "//submitqueue/extension/changestore/mysql", - "//test/testutil", - "@com_github_go_sql_driver_mysql//:mysql", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", - "@com_github_stretchr_testify//suite", - "@com_github_uber_go_tally_v4//:tally", - ], -) diff --git a/test/integration/submitqueue/extension/changestore/mysql/changestore_test.go b/test/integration/submitqueue/extension/changestore/mysql/changestore_test.go deleted file mode 100644 index 248dbf81..00000000 --- a/test/integration/submitqueue/extension/changestore/mysql/changestore_test.go +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright (c) 2025 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mysql - -import ( - "context" - "database/sql" - "sort" - "testing" - - _ "github.com/go-sql-driver/mysql" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "github.com/uber-go/tally/v4" - - "github.com/uber/submitqueue/submitqueue/entity" - "github.com/uber/submitqueue/submitqueue/extension/changestore" - mysqlchangestore "github.com/uber/submitqueue/submitqueue/extension/changestore/mysql" - "github.com/uber/submitqueue/test/testutil" -) - -// MySQLChangeStoreIntegrationSuite tests the MySQL ChangeStore implementation -// against a real MySQL instance launched via docker-compose. -type MySQLChangeStoreIntegrationSuite struct { - suite.Suite - stack *testutil.ComposeStack - db *sql.DB - store changestore.ChangeStore - log *testutil.TestLogger - ctx context.Context -} - -func TestMySQLChangeStoreIntegration(t *testing.T) { - suite.Run(t, new(MySQLChangeStoreIntegrationSuite)) -} - -func (s *MySQLChangeStoreIntegrationSuite) SetupSuite() { - t := s.T() - s.ctx = context.Background() - s.log = testutil.NewTestLogger(t) - - s.log.Logf("Starting MySQL ChangeStore integration test suite using docker-compose") - - s.stack = testutil.NewComposeStack( - t, - s.log, - s.ctx, - "docker-compose.yml", - "ext-submitqueue-changestore-mysql", - ) - - require.NoError(t, s.stack.Up(), "failed to start compose stack") - s.log.Logf("Compose stack started successfully") - - var err error - s.db, err = s.stack.ConnectMySQLService("mysql") - require.NoError(t, err, "failed to connect to MySQL") - - testutil.ApplySchema(t, s.log, s.db, testutil.SchemaDir("submitqueue/extension/changestore/mysql/schema")) - s.log.Logf("Schemas applied successfully") - - s.store = mysqlchangestore.NewChangeStore(s.db, tally.NoopScope) - - t.Cleanup(func() { - if s.db != nil { - s.log.Logf("Closing MySQL connection") - s.db.Close() - } - }) -} - -// SetupTest truncates the change table between tests so cases stay isolated. -func (s *MySQLChangeStoreIntegrationSuite) SetupTest() { - _, err := s.db.ExecContext(s.ctx, "TRUNCATE TABLE `change`") - require.NoError(s.T(), err) -} - -func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_NoMatch() { - t := s.T() - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, - })) - - got, err := s.store.GetByURI(s.ctx, "q", "github://uber/x/pull/2/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") - require.NoError(t, err) - assert.Empty(t, got) -} - -func (s *MySQLChangeStoreIntegrationSuite) TestCreateAndGet_Match() { - t := s.T() - uri := "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, - })) - - got, err := s.store.GetByURI(s.ctx, "q", uri) - require.NoError(t, err) - require.Len(t, got, 1) - assert.Equal(t, "q/1", got[0].RequestID) - assert.Equal(t, uri, got[0].URI) - assert.Equal(t, "q", got[0].Queue) - assert.Equal(t, int32(1), got[0].Version) -} - -func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_DoesNotExcludeSelf() { - // The store does not filter by request_id; callers filter self if they wish. - t := s.T() - uri := "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, - })) - - got, err := s.store.GetByURI(s.ctx, "q", uri) - require.NoError(t, err) - require.Len(t, got, 1, "store returns the row even when caller might consider it self") - assert.Equal(t, "q/1", got[0].RequestID) -} - -func (s *MySQLChangeStoreIntegrationSuite) TestGetByURI_QueueScoped() { - t := s.T() - uri := "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "qA/1", Queue: "qA", CreatedAt: 1, UpdatedAt: 1, Version: 1, - })) - - got, err := s.store.GetByURI(s.ctx, "qB", uri) - require.NoError(t, err) - assert.Empty(t, got, "GetByURI must not return rows from a different queue") -} - -func (s *MySQLChangeStoreIntegrationSuite) TestCreate_Idempotent() { - t := s.T() - rec := entity.ChangeRecord{URI: "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1} - - require.NoError(t, s.store.Create(s.ctx, rec)) - require.NoError(t, s.store.Create(s.ctx, rec), "second insert with same PK must succeed (INSERT IGNORE)") - - var count int - require.NoError(t, s.db.QueryRowContext(s.ctx, "SELECT COUNT(*) FROM `change`").Scan(&count)) - assert.Equal(t, 1, count, "INSERT IGNORE must not duplicate rows") -} - -func (s *MySQLChangeStoreIntegrationSuite) TestCreate_DifferentRequestSameURI() { - t := s.T() - uri := "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, - })) - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/2", Queue: "q", CreatedAt: 2, UpdatedAt: 2, Version: 1, - })) - - got, err := s.store.GetByURI(s.ctx, "q", uri) - require.NoError(t, err) - require.Len(t, got, 2) - - ids := []string{got[0].RequestID, got[1].RequestID} - sort.Strings(ids) - assert.Equal(t, []string{"q/1", "q/2"}, ids) -} - -func (s *MySQLChangeStoreIntegrationSuite) TestCreate_PreservesMetadata() { - t := s.T() - const meta = `{"title":"add new feature"}` - uri := "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", Metadata: meta, CreatedAt: 1, UpdatedAt: 1, Version: 1, - })) - - got, err := s.store.GetByURI(s.ctx, "q", uri) - require.NoError(t, err) - require.Len(t, got, 1) - assert.JSONEq(t, meta, got[0].Metadata) -} - -func (s *MySQLChangeStoreIntegrationSuite) TestCreate_EmptyMetadataStoredAsObject() { - // metadata is NOT NULL in the schema. The impl substitutes '{}' for an empty - // Metadata field so callers don't need to know about the column constraint. - t := s.T() - uri := "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - require.NoError(t, s.store.Create(s.ctx, entity.ChangeRecord{ - URI: uri, RequestID: "q/1", Queue: "q", CreatedAt: 1, UpdatedAt: 1, Version: 1, - })) - - got, err := s.store.GetByURI(s.ctx, "q", uri) - require.NoError(t, err) - require.Len(t, got, 1) - assert.JSONEq(t, "{}", got[0].Metadata) -} diff --git a/test/integration/submitqueue/extension/changestore/mysql/docker-compose.yml b/test/integration/submitqueue/extension/changestore/mysql/docker-compose.yml deleted file mode 100644 index 74241c77..00000000 --- a/test/integration/submitqueue/extension/changestore/mysql/docker-compose.yml +++ /dev/null @@ -1,19 +0,0 @@ -# Docker Compose for MySQL ChangeStore Library Tests -# Tests the changestore library's MySQL implementation in isolation. - -services: - mysql: - image: mysql:8.0 - environment: - MYSQL_ROOT_PASSWORD: root - MYSQL_DATABASE: submitqueue - ports: - - "3306" # Random ephemeral port to avoid conflicts - healthcheck: - # Use 127.0.0.1 (TCP) instead of localhost (Unix socket). MySQL treats - # "localhost" as a socket connection, which can be ready before the TCP - # listener — causing dependent services that connect over TCP to fail. - test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-proot"] - interval: 5s - timeout: 5s - retries: 10 diff --git a/test/integration/submitqueue/extension/storage/suite.go b/test/integration/submitqueue/extension/storage/suite.go index dec135cd..32bf66ce 100644 --- a/test/integration/submitqueue/extension/storage/suite.go +++ b/test/integration/submitqueue/extension/storage/suite.go @@ -16,6 +16,7 @@ package storage import ( "context" + "sort" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -227,3 +228,143 @@ func (s *StorageContractSuite) TestStorage_CreateDuplicate() { s.log.Logf("CreateDuplicate test passed: prevented duplicate creation") } + +// changeURI is a representative change URI reused across change-store contract tests. +const changeURI = "github://uber/x/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + +// Change-store contract tests scope each case to a distinct queue so they stay +// isolated without truncation (GetByURI is scoped by (queue, uri)). + +// TestStorage_ChangeCreateAndGet_NoMatch verifies GetByURI returns empty for an unclaimed URI. +func (s *StorageContractSuite) TestStorage_ChangeCreateAndGet_NoMatch() { + t := s.T() + ctx := s.ctx + const queue = "cq-nomatch" + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: queue + "/1", Queue: queue, CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, "github://uber/x/pull/2/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + require.NoError(t, err) + assert.Empty(t, got) +} + +// TestStorage_ChangeCreateAndGet_Match verifies a created record is returned by GetByURI. +func (s *StorageContractSuite) TestStorage_ChangeCreateAndGet_Match() { + t := s.T() + ctx := s.ctx + const queue = "cq-match" + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: queue + "/1", Queue: queue, CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) + require.NoError(t, err) + require.Len(t, got, 1) + assert.Equal(t, queue+"/1", got[0].RequestID) + assert.Equal(t, changeURI, got[0].URI) + assert.Equal(t, queue, got[0].Queue) + assert.Equal(t, int32(1), got[0].Version) +} + +// TestStorage_ChangeGetByURI_DoesNotExcludeSelf verifies the store does not filter by request_id. +func (s *StorageContractSuite) TestStorage_ChangeGetByURI_DoesNotExcludeSelf() { + t := s.T() + ctx := s.ctx + const queue = "cq-self" + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: queue + "/1", Queue: queue, CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) + require.NoError(t, err) + require.Len(t, got, 1, "store returns the row even when caller might consider it self") + assert.Equal(t, queue+"/1", got[0].RequestID) +} + +// TestStorage_ChangeGetByURI_QueueScoped verifies GetByURI never returns rows from another queue. +func (s *StorageContractSuite) TestStorage_ChangeGetByURI_QueueScoped() { + t := s.T() + ctx := s.ctx + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: "cq-scoped-A/1", Queue: "cq-scoped-A", CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.storage.GetChangeStore().GetByURI(ctx, "cq-scoped-B", changeURI) + require.NoError(t, err) + assert.Empty(t, got, "GetByURI must not return rows from a different queue") +} + +// TestStorage_ChangeCreate_Idempotent verifies a repeated Create of the same PK is a no-op. +func (s *StorageContractSuite) TestStorage_ChangeCreate_Idempotent() { + t := s.T() + ctx := s.ctx + const queue = "cq-idem" + rec := entity.ChangeRecord{URI: changeURI, RequestID: queue + "/1", Queue: queue, CreatedAt: 1, UpdatedAt: 1, Version: 1} + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, rec)) + require.NoError(t, s.storage.GetChangeStore().Create(ctx, rec), "second insert with same PK must succeed (INSERT IGNORE)") + + got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) + require.NoError(t, err) + assert.Len(t, got, 1, "idempotent create must not duplicate rows") +} + +// TestStorage_ChangeCreate_DifferentRequestSameURI verifies distinct requests on one URI coexist. +func (s *StorageContractSuite) TestStorage_ChangeCreate_DifferentRequestSameURI() { + t := s.T() + ctx := s.ctx + const queue = "cq-multi" + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: queue + "/1", Queue: queue, CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: queue + "/2", Queue: queue, CreatedAt: 2, UpdatedAt: 2, Version: 1, + })) + + got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) + require.NoError(t, err) + require.Len(t, got, 2) + + ids := []string{got[0].RequestID, got[1].RequestID} + sort.Strings(ids) + assert.Equal(t, []string{queue + "/1", queue + "/2"}, ids) +} + +// TestStorage_ChangeCreate_PreservesMetadata verifies metadata round-trips through the store. +func (s *StorageContractSuite) TestStorage_ChangeCreate_PreservesMetadata() { + t := s.T() + ctx := s.ctx + const queue = "cq-meta" + const meta = `{"title":"add new feature"}` + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: queue + "/1", Queue: queue, Metadata: meta, CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) + require.NoError(t, err) + require.Len(t, got, 1) + assert.JSONEq(t, meta, got[0].Metadata) +} + +// TestStorage_ChangeCreate_EmptyMetadataStoredAsObject verifies empty metadata is stored as "{}". +func (s *StorageContractSuite) TestStorage_ChangeCreate_EmptyMetadataStoredAsObject() { + t := s.T() + ctx := s.ctx + const queue = "cq-emptymeta" + + require.NoError(t, s.storage.GetChangeStore().Create(ctx, entity.ChangeRecord{ + URI: changeURI, RequestID: queue + "/1", Queue: queue, CreatedAt: 1, UpdatedAt: 1, Version: 1, + })) + + got, err := s.storage.GetChangeStore().GetByURI(ctx, queue, changeURI) + require.NoError(t, err) + require.Len(t, got, 1) + assert.JSONEq(t, "{}", got[0].Metadata) +}