From 6b063a260f420fe0ed9bd9ca50bcfc7a9226548e Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 21:40:58 +0000 Subject: [PATCH 01/10] (entities) Adds change provider entity and mysql backed store for it --- entity/BUILD.bazel | 5 +- entity/change_provider.go | 19 +++++ extension/storage/BUILD.bazel | 1 + extension/storage/change_provider_store.go | 20 +++++ extension/storage/mysql/BUILD.bazel | 1 + .../storage/mysql/change_provider_store.go | 76 +++++++++++++++++++ .../storage/mysql/schema/change_provider.sql | 9 +++ extension/storage/mysql/storage.go | 15 +++- extension/storage/storage.go | 3 + 9 files changed, 144 insertions(+), 5 deletions(-) create mode 100644 entity/change_provider.go create mode 100644 extension/storage/change_provider_store.go create mode 100644 extension/storage/mysql/change_provider_store.go create mode 100644 extension/storage/mysql/schema/change_provider.sql diff --git a/entity/BUILD.bazel b/entity/BUILD.bazel index b9002664..aee3f86d 100644 --- a/entity/BUILD.bazel +++ b/entity/BUILD.bazel @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "entity", - srcs = ["request.go"], + srcs = [ + "change_provider.go", + "request.go", + ], importpath = "github.com/uber/submitqueue/entity", visibility = ["//visibility:public"], ) diff --git a/entity/change_provider.go b/entity/change_provider.go new file mode 100644 index 00000000..25738001 --- /dev/null +++ b/entity/change_provider.go @@ -0,0 +1,19 @@ +package entity + +type ChangeProvider struct { + // ID is the globally unique identifier for the land request. Format: "/". + ID string + // Queue is the name of the queue processing the land request. + // This is defined in the configuration and should be unique within the system. + Queue string + // ChangeProviderSrc defines the source of the change. For e.g. - Github, Gitlab etc. + ChangeProviderSrc string + // ChangeProviderID is the identifier specified by the change provider source. For e.g. - Github PR ID etc. + ChangeProviderID string + // Metadata is the interesting data from the change provider that we want to store. + // This is a freeform JSON object. + Metadata map[string]any + // Version is the version of the object. It is used for optimistic locking. + // Versioning starts at 1 and is incremented for each change to the object. + Version int32 +} diff --git a/extension/storage/BUILD.bazel b/extension/storage/BUILD.bazel index 8d6b95f0..34f17052 100644 --- a/extension/storage/BUILD.bazel +++ b/extension/storage/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "storage", srcs = [ + "change_provider_store.go", "request_store.go", "storage.go", ], diff --git a/extension/storage/change_provider_store.go b/extension/storage/change_provider_store.go new file mode 100644 index 00000000..97bbe78f --- /dev/null +++ b/extension/storage/change_provider_store.go @@ -0,0 +1,20 @@ +package storage + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// ChangeProviderStore is an interface that defines methods for managing change provider information in the database. +type ChangeProviderStore interface { + // Get retrieves information about a change by ID. Returns ErrNotFound if the change provider is + // not found. + Get(ctx context.Context, id string) (entity.ChangeProvider, error) + + // Create creates a new change provider. + Create(ctx context.Context, request entity.Request) error + + // There is no update function since once created, data is only ever read from this + // store. +} diff --git a/extension/storage/mysql/BUILD.bazel b/extension/storage/mysql/BUILD.bazel index b3efb345..4628ddd6 100644 --- a/extension/storage/mysql/BUILD.bazel +++ b/extension/storage/mysql/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "mysql", srcs = [ + "change_provider_store.go", "request_store.go", "storage.go", ], diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go new file mode 100644 index 00000000..c805c8f6 --- /dev/null +++ b/extension/storage/mysql/change_provider_store.go @@ -0,0 +1,76 @@ +package mysql + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/go-sql-driver/mysql" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" +) + +type changeProviderStore struct { + db *sql.DB +} + +// NewChangeProviderStore creates a new MySQL-backed ChangeProviderStore. +func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore { + return &changeProviderStore{db: db} +} + +// Get retrieves a change provider by ID. Returns ErrNotFound if the change provider is not found. +func (s *changeProviderStore) Get(ctx context.Context, id string) (entity.ChangeProvider, error) { + var cp entity.ChangeProvider + var changeProviderIDJSON []byte + var metadataJSON []byte + + err := s.db.QueryRowContext(ctx, + "SELECT id, queue, change_provider_src, change_provider_id, metadata, version FROM change_provider WHERE id = ?", + id, + ).Scan(&cp.ID, &cp.Queue, &cp.ChangeProviderSrc, &changeProviderIDJSON, &metadataJSON, &cp.Version) + + if errors.Is(err, sql.ErrNoRows) { + return entity.ChangeProvider{}, storage.WrapNotFound(err) + } + if err != nil { + return entity.ChangeProvider{}, fmt.Errorf("failed to get change provider entity id=%s from the database: %w", id, err) + } + + if err := json.Unmarshal(changeProviderIDJSON, &cp.ChangeProviderID); err != nil { + return entity.ChangeProvider{}, fmt.Errorf("failed to unmarshal change provider ID for change provider entity id=%s from the database: %w", id, err) + } + + if err := json.Unmarshal(metadataJSON, &cp.Metadata); err != nil { + return entity.ChangeProvider{}, fmt.Errorf("failed to unmarshal metadata for change provider entity id=%s from the database: %w", id, err) + } + + return cp, nil +} + +// Create creates a new change provider from a request. Returns ErrAlreadyExists if the entry already exists. +func (s *changeProviderStore) Create(ctx context.Context, request entity.Request) error { + changeProviderIDJSON, err := json.Marshal(request.Change.IDs) + if err != nil { + return fmt.Errorf("failed to marshal change IDs=%v id=%s for Create change provider entity: %w", request.Change.IDs, request.ID, err) + } + + metadataJSON := []byte("{}") + + _, err = s.db.ExecContext(ctx, + "INSERT INTO change_provider (id, queue, change_provider_src, change_provider_id, metadata, version) VALUES (?, ?, ?, ?, ?, ?)", + request.ID, request.Queue, request.Change.Source, changeProviderIDJSON, metadataJSON, request.Version, + ) + if err != nil { + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { + return fmt.Errorf("change provider entity id=%s: %w", request.ID, storage.ErrAlreadyExists) + } + return fmt.Errorf("failed to insert change provider entity id=%s: %w", request.ID, err) + } + + return nil +} diff --git a/extension/storage/mysql/schema/change_provider.sql b/extension/storage/mysql/schema/change_provider.sql new file mode 100644 index 00000000..77305da2 --- /dev/null +++ b/extension/storage/mysql/schema/change_provider.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS change_provider ( + id VARCHAR(255) NOT NULL, + queue VARCHAR(255) NOT NULL, + change_provider_src VARCHAR(255) NOT NULL, + change_provider_id JSON NOT NULL, + metadata JSON NOT NUll, + version INT NOT NULL, + PRIMARY KEY (id,queue,change_provider_src,change_provider_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/extension/storage/mysql/storage.go b/extension/storage/mysql/storage.go index b2b8e8ca..618a7486 100644 --- a/extension/storage/mysql/storage.go +++ b/extension/storage/mysql/storage.go @@ -28,8 +28,9 @@ type MySQLParameters struct { } type mysqlStorage struct { - db *sql.DB - requestStore storage.RequestStore + db *sql.DB + requestStore storage.RequestStore + changeProviderStore storage.ChangeProviderStore } // NewStorage creates a new MySQL storage. @@ -50,8 +51,9 @@ func NewStorage(p MySQLParameters) (storage.Storage, error) { } return &mysqlStorage{ - db: db, - requestStore: NewRequestStore(db), + db: db, + requestStore: NewRequestStore(db), + changeProviderStore: NewChangeProviderStore(db), }, nil } @@ -60,6 +62,11 @@ func (f *mysqlStorage) GetRequestStore() storage.RequestStore { return f.requestStore } +// GetChangeProviderStore returns the MySQL-backed ChangeProviderStore. +func (f *mysqlStorage) GetChangeProviderStore() storage.ChangeProviderStore { + return f.changeProviderStore +} + // Close closes the underlying database connection. func (f *mysqlStorage) Close() error { return f.db.Close() diff --git a/extension/storage/storage.go b/extension/storage/storage.go index ab57d004..40f84599 100644 --- a/extension/storage/storage.go +++ b/extension/storage/storage.go @@ -29,6 +29,9 @@ type Storage interface { // GetRequestStore returns the RequestStore instance. GetRequestStore() RequestStore + // GetChangeProviderStore returns the ChangeProviderStore instance. + GetChangeProviderStore() ChangeProviderStore + // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } From 076451af8ff33c7949a3907a15c7e20173add901 Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 22:17:51 +0000 Subject: [PATCH 02/10] fix --- extension/storage/change_provider_store.go | 2 +- .../storage/mysql/change_provider_store.go | 23 +++++++------------ .../storage/mysql/schema/change_provider.sql | 2 +- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/extension/storage/change_provider_store.go b/extension/storage/change_provider_store.go index 97bbe78f..c0016015 100644 --- a/extension/storage/change_provider_store.go +++ b/extension/storage/change_provider_store.go @@ -13,7 +13,7 @@ type ChangeProviderStore interface { Get(ctx context.Context, id string) (entity.ChangeProvider, error) // Create creates a new change provider. - Create(ctx context.Context, request entity.Request) error + Create(ctx context.Context, changeProvider entity.ChangeProvider) error // There is no update function since once created, data is only ever read from this // store. diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go index c805c8f6..5763d73f 100644 --- a/extension/storage/mysql/change_provider_store.go +++ b/extension/storage/mysql/change_provider_store.go @@ -25,13 +25,12 @@ func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore { // Get retrieves a change provider by ID. Returns ErrNotFound if the change provider is not found. func (s *changeProviderStore) Get(ctx context.Context, id string) (entity.ChangeProvider, error) { var cp entity.ChangeProvider - var changeProviderIDJSON []byte var metadataJSON []byte err := s.db.QueryRowContext(ctx, "SELECT id, queue, change_provider_src, change_provider_id, metadata, version FROM change_provider WHERE id = ?", id, - ).Scan(&cp.ID, &cp.Queue, &cp.ChangeProviderSrc, &changeProviderIDJSON, &metadataJSON, &cp.Version) + ).Scan(&cp.ID, &cp.Queue, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON, &cp.Version) if errors.Is(err, sql.ErrNoRows) { return entity.ChangeProvider{}, storage.WrapNotFound(err) @@ -40,10 +39,6 @@ func (s *changeProviderStore) Get(ctx context.Context, id string) (entity.Change return entity.ChangeProvider{}, fmt.Errorf("failed to get change provider entity id=%s from the database: %w", id, err) } - if err := json.Unmarshal(changeProviderIDJSON, &cp.ChangeProviderID); err != nil { - return entity.ChangeProvider{}, fmt.Errorf("failed to unmarshal change provider ID for change provider entity id=%s from the database: %w", id, err) - } - if err := json.Unmarshal(metadataJSON, &cp.Metadata); err != nil { return entity.ChangeProvider{}, fmt.Errorf("failed to unmarshal metadata for change provider entity id=%s from the database: %w", id, err) } @@ -51,25 +46,23 @@ func (s *changeProviderStore) Get(ctx context.Context, id string) (entity.Change return cp, nil } -// Create creates a new change provider from a request. Returns ErrAlreadyExists if the entry already exists. -func (s *changeProviderStore) Create(ctx context.Context, request entity.Request) error { - changeProviderIDJSON, err := json.Marshal(request.Change.IDs) +// Create creates a new change provider. Returns ErrAlreadyExists if the entry already exists. +func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error { + metadataJSON, err := json.Marshal(changeProvider.Metadata) if err != nil { - return fmt.Errorf("failed to marshal change IDs=%v id=%s for Create change provider entity: %w", request.Change.IDs, request.ID, err) + return fmt.Errorf("failed to marshal metadata id=%s for Create change provider entity: %w", changeProvider.ID, err) } - metadataJSON := []byte("{}") - _, err = s.db.ExecContext(ctx, "INSERT INTO change_provider (id, queue, change_provider_src, change_provider_id, metadata, version) VALUES (?, ?, ?, ?, ?, ?)", - request.ID, request.Queue, request.Change.Source, changeProviderIDJSON, metadataJSON, request.Version, + changeProvider.ID, changeProvider.Queue, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, changeProvider.Version, ) if err != nil { var mysqlErr *mysql.MySQLError if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { - return fmt.Errorf("change provider entity id=%s: %w", request.ID, storage.ErrAlreadyExists) + return fmt.Errorf("change provider entity id=%s: %w", changeProvider.ID, storage.ErrAlreadyExists) } - return fmt.Errorf("failed to insert change provider entity id=%s: %w", request.ID, err) + return fmt.Errorf("failed to insert change provider entity id=%s: %w", changeProvider.ID, err) } return nil diff --git a/extension/storage/mysql/schema/change_provider.sql b/extension/storage/mysql/schema/change_provider.sql index 77305da2..a1630b8e 100644 --- a/extension/storage/mysql/schema/change_provider.sql +++ b/extension/storage/mysql/schema/change_provider.sql @@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS change_provider ( id VARCHAR(255) NOT NULL, queue VARCHAR(255) NOT NULL, change_provider_src VARCHAR(255) NOT NULL, - change_provider_id JSON NOT NULL, + change_provider_id VARCHAR(255) NOT NULL, metadata JSON NOT NUll, version INT NOT NULL, PRIMARY KEY (id,queue,change_provider_src,change_provider_id) From 49ab9443f2e15acbc44b3247480a6e6f648988cd Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 22:22:57 +0000 Subject: [PATCH 03/10] fix land test to add new change provider store --- gateway/controller/land_test.go | 111 ++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 26 deletions(-) diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index 46b78a53..a1f5d891 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -38,24 +38,48 @@ func (m *mockRequestStore) UpdateState(ctx context.Context, id string, version i return nil } +type mockChangeProviderStore struct { + createFunc func(ctx context.Context, changeProvider entity.ChangeProvider) error +} + +func (m *mockChangeProviderStore) Get(ctx context.Context, id string) (entity.ChangeProvider, error) { + return entity.ChangeProvider{}, nil +} + +func (m *mockChangeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error { + return m.createFunc(ctx, changeProvider) +} + type mockStorage struct { - requestStore storage.RequestStore + requestStore storage.RequestStore + changeProviderStore storage.ChangeProviderStore } func (m *mockStorage) GetRequestStore() storage.RequestStore { return m.requestStore } +func (m *mockStorage) GetChangeProviderStore() storage.ChangeProviderStore { + return m.changeProviderStore +} + func (m *mockStorage) Close() error { return nil } func TestNewLandController(t *testing.T) { - store := &mockStorage{requestStore: &mockRequestStore{ - createFunc: func(ctx context.Context, request entity.Request) error { - return nil + store := &mockStorage{ + requestStore: &mockRequestStore{ + createFunc: func(ctx context.Context, request entity.Request) error { + return nil + }, }, - }} + changeProviderStore: &mockChangeProviderStore{ + createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error { + return nil + }, + }, + } cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} @@ -64,11 +88,18 @@ func TestNewLandController(t *testing.T) { } func TestLand_ReturnsSqid(t *testing.T) { - store := &mockStorage{requestStore: &mockRequestStore{ - createFunc: func(ctx context.Context, request entity.Request) error { - return nil + store := &mockStorage{ + requestStore: &mockRequestStore{ + createFunc: func(ctx context.Context, request entity.Request) error { + return nil + }, }, - }} + changeProviderStore: &mockChangeProviderStore{ + createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error { + return nil + }, + }, + } cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} @@ -88,12 +119,19 @@ func TestLand_ReturnsSqid(t *testing.T) { func TestLand_PassesCorrectParametersToStore(t *testing.T) { var capturedRequest entity.Request - store := &mockStorage{requestStore: &mockRequestStore{ - createFunc: func(ctx context.Context, request entity.Request) error { - capturedRequest = request - return nil + store := &mockStorage{ + requestStore: &mockRequestStore{ + createFunc: func(ctx context.Context, request entity.Request) error { + capturedRequest = request + return nil + }, }, - }} + changeProviderStore: &mockChangeProviderStore{ + createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error { + return nil + }, + }, + } cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 42, nil }} @@ -119,11 +157,18 @@ func TestLand_PassesCorrectParametersToStore(t *testing.T) { } func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) { - store := &mockStorage{requestStore: &mockRequestStore{ - createFunc: func(ctx context.Context, request entity.Request) error { - return fmt.Errorf("database connection failed") + store := &mockStorage{ + requestStore: &mockRequestStore{ + createFunc: func(ctx context.Context, request entity.Request) error { + return fmt.Errorf("database connection failed") + }, }, - }} + changeProviderStore: &mockChangeProviderStore{ + createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error { + return nil + }, + }, + } cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 1, nil }} @@ -140,11 +185,18 @@ func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) { } func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { - store := &mockStorage{requestStore: &mockRequestStore{ - createFunc: func(ctx context.Context, request entity.Request) error { - return nil + store := &mockStorage{ + requestStore: &mockRequestStore{ + createFunc: func(ctx context.Context, request entity.Request) error { + return nil + }, }, - }} + changeProviderStore: &mockChangeProviderStore{ + createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error { + return nil + }, + }, + } cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { return 0, fmt.Errorf("counter unavailable") }} @@ -163,11 +215,18 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { func TestLand_CounterDomainIncludesQueue(t *testing.T) { var capturedDomain string - store := &mockStorage{requestStore: &mockRequestStore{ - createFunc: func(ctx context.Context, request entity.Request) error { - return nil + store := &mockStorage{ + requestStore: &mockRequestStore{ + createFunc: func(ctx context.Context, request entity.Request) error { + return nil + }, }, - }} + changeProviderStore: &mockChangeProviderStore{ + createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error { + return nil + }, + }, + } cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) { capturedDomain = domain return 1, nil From f78e09ea82b27305738c313679e2ba19d70f9444 Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 22:30:13 +0000 Subject: [PATCH 04/10] version is not needed for the change provider --- entity/change_provider.go | 3 --- extension/storage/mysql/change_provider_store.go | 8 ++++---- extension/storage/mysql/schema/change_provider.sql | 1 - 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/entity/change_provider.go b/entity/change_provider.go index 25738001..8f79fb1b 100644 --- a/entity/change_provider.go +++ b/entity/change_provider.go @@ -13,7 +13,4 @@ type ChangeProvider struct { // Metadata is the interesting data from the change provider that we want to store. // This is a freeform JSON object. Metadata map[string]any - // Version is the version of the object. It is used for optimistic locking. - // Versioning starts at 1 and is incremented for each change to the object. - Version int32 } diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go index 5763d73f..0e40067e 100644 --- a/extension/storage/mysql/change_provider_store.go +++ b/extension/storage/mysql/change_provider_store.go @@ -28,9 +28,9 @@ func (s *changeProviderStore) Get(ctx context.Context, id string) (entity.Change var metadataJSON []byte err := s.db.QueryRowContext(ctx, - "SELECT id, queue, change_provider_src, change_provider_id, metadata, version FROM change_provider WHERE id = ?", + "SELECT id, queue, change_provider_src, change_provider_id, metadata FROM change_provider WHERE id = ?", id, - ).Scan(&cp.ID, &cp.Queue, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON, &cp.Version) + ).Scan(&cp.ID, &cp.Queue, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON) if errors.Is(err, sql.ErrNoRows) { return entity.ChangeProvider{}, storage.WrapNotFound(err) @@ -54,8 +54,8 @@ func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity. } _, err = s.db.ExecContext(ctx, - "INSERT INTO change_provider (id, queue, change_provider_src, change_provider_id, metadata, version) VALUES (?, ?, ?, ?, ?, ?)", - changeProvider.ID, changeProvider.Queue, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, changeProvider.Version, + "INSERT INTO change_provider (id, queue, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?, ?)", + changeProvider.ID, changeProvider.Queue, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, ) if err != nil { var mysqlErr *mysql.MySQLError diff --git a/extension/storage/mysql/schema/change_provider.sql b/extension/storage/mysql/schema/change_provider.sql index a1630b8e..b55ba0e8 100644 --- a/extension/storage/mysql/schema/change_provider.sql +++ b/extension/storage/mysql/schema/change_provider.sql @@ -4,6 +4,5 @@ CREATE TABLE IF NOT EXISTS change_provider ( change_provider_src VARCHAR(255) NOT NULL, change_provider_id VARCHAR(255) NOT NULL, metadata JSON NOT NUll, - version INT NOT NULL, PRIMARY KEY (id,queue,change_provider_src,change_provider_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; From a184bfa083bfe64b60db282e11a766d9767c804d Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 22:37:45 +0000 Subject: [PATCH 05/10] add description for ChangeProvider entity --- entity/change_provider.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/entity/change_provider.go b/entity/change_provider.go index 8f79fb1b..feff2a02 100644 --- a/entity/change_provider.go +++ b/entity/change_provider.go @@ -1,5 +1,7 @@ package entity +// ChangeProvider represents a code change from an external provider (e.g., a GitHub pull request or Gerrit changelist) +// along with its associated metadata. The object is immutable after creation. type ChangeProvider struct { // ID is the globally unique identifier for the land request. Format: "/". ID string From c3406ad1f38427cb2d7924c1d1c822ff761634fb Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 22:38:38 +0000 Subject: [PATCH 06/10] remove queue from fields for ChangeProvider --- entity/change_provider.go | 3 --- extension/storage/mysql/change_provider_store.go | 8 ++++---- extension/storage/mysql/schema/change_provider.sql | 3 +-- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/entity/change_provider.go b/entity/change_provider.go index feff2a02..e1753d66 100644 --- a/entity/change_provider.go +++ b/entity/change_provider.go @@ -5,9 +5,6 @@ package entity type ChangeProvider struct { // ID is the globally unique identifier for the land request. Format: "/". ID string - // Queue is the name of the queue processing the land request. - // This is defined in the configuration and should be unique within the system. - Queue string // ChangeProviderSrc defines the source of the change. For e.g. - Github, Gitlab etc. ChangeProviderSrc string // ChangeProviderID is the identifier specified by the change provider source. For e.g. - Github PR ID etc. diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go index 0e40067e..2560c379 100644 --- a/extension/storage/mysql/change_provider_store.go +++ b/extension/storage/mysql/change_provider_store.go @@ -28,9 +28,9 @@ func (s *changeProviderStore) Get(ctx context.Context, id string) (entity.Change var metadataJSON []byte err := s.db.QueryRowContext(ctx, - "SELECT id, queue, change_provider_src, change_provider_id, metadata FROM change_provider WHERE id = ?", + "SELECT id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE id = ?", id, - ).Scan(&cp.ID, &cp.Queue, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON) + ).Scan(&cp.ID, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON) if errors.Is(err, sql.ErrNoRows) { return entity.ChangeProvider{}, storage.WrapNotFound(err) @@ -54,8 +54,8 @@ func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity. } _, err = s.db.ExecContext(ctx, - "INSERT INTO change_provider (id, queue, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?, ?)", - changeProvider.ID, changeProvider.Queue, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, + "INSERT INTO change_provider (id, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?)", + changeProvider.ID, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, ) if err != nil { var mysqlErr *mysql.MySQLError diff --git a/extension/storage/mysql/schema/change_provider.sql b/extension/storage/mysql/schema/change_provider.sql index b55ba0e8..2af5a66d 100644 --- a/extension/storage/mysql/schema/change_provider.sql +++ b/extension/storage/mysql/schema/change_provider.sql @@ -1,8 +1,7 @@ CREATE TABLE IF NOT EXISTS change_provider ( id VARCHAR(255) NOT NULL, - queue VARCHAR(255) NOT NULL, change_provider_src VARCHAR(255) NOT NULL, change_provider_id VARCHAR(255) NOT NULL, metadata JSON NOT NUll, - PRIMARY KEY (id,queue,change_provider_src,change_provider_id) + PRIMARY KEY (id,change_provider_src,change_provider_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; From 5312f79a055cc6e57914a35278e3e432d7d113a0 Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 22:40:21 +0000 Subject: [PATCH 07/10] update type of metadata --- entity/change_provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/entity/change_provider.go b/entity/change_provider.go index e1753d66..6c0760f5 100644 --- a/entity/change_provider.go +++ b/entity/change_provider.go @@ -11,5 +11,5 @@ type ChangeProvider struct { ChangeProviderID string // Metadata is the interesting data from the change provider that we want to store. // This is a freeform JSON object. - Metadata map[string]any + Metadata map[string]string } From 5704c39971a2f7788d63c47cd0f5ba5746420b19 Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 23:57:07 +0000 Subject: [PATCH 08/10] address comments --- entity/change_provider.go | 4 +- extension/storage/change_provider_store.go | 12 +++-- .../storage/mysql/change_provider_store.go | 51 ++++++++++++------- gateway/controller/land_test.go | 4 +- 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/entity/change_provider.go b/entity/change_provider.go index 6c0760f5..771c2a33 100644 --- a/entity/change_provider.go +++ b/entity/change_provider.go @@ -3,8 +3,8 @@ package entity // ChangeProvider represents a code change from an external provider (e.g., a GitHub pull request or Gerrit changelist) // along with its associated metadata. The object is immutable after creation. type ChangeProvider struct { - // ID is the globally unique identifier for the land request. Format: "/". - ID string + // RequestID is the globally unique identifier for the land request. Format: "/". + RequestID string // ChangeProviderSrc defines the source of the change. For e.g. - Github, Gitlab etc. ChangeProviderSrc string // ChangeProviderID is the identifier specified by the change provider source. For e.g. - Github PR ID etc. diff --git a/extension/storage/change_provider_store.go b/extension/storage/change_provider_store.go index c0016015..ae913378 100644 --- a/extension/storage/change_provider_store.go +++ b/extension/storage/change_provider_store.go @@ -8,9 +8,15 @@ import ( // ChangeProviderStore is an interface that defines methods for managing change provider information in the database. type ChangeProviderStore interface { - // Get retrieves information about a change by ID. Returns ErrNotFound if the change provider is - // not found. - Get(ctx context.Context, id string) (entity.ChangeProvider, error) + // Get retrieves information about a change by ID. + // Returns ErrNotFound if the change provider is not found. + // + // Note: The order of ChangeProvider entities here is not guaranteed to + // be the same as the request to which it belongs. The caller is repsonsible + // for inspecting and mapping the result of this function to the + // order of changes within the original request. + // + Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) // Create creates a new change provider. Create(ctx context.Context, changeProvider entity.ChangeProvider) error diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go index 2560c379..1f4e58ea 100644 --- a/extension/storage/mysql/change_provider_store.go +++ b/extension/storage/mysql/change_provider_store.go @@ -22,47 +22,60 @@ func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore { return &changeProviderStore{db: db} } -// Get retrieves a change provider by ID. Returns ErrNotFound if the change provider is not found. -func (s *changeProviderStore) Get(ctx context.Context, id string) (entity.ChangeProvider, error) { - var cp entity.ChangeProvider - var metadataJSON []byte - - err := s.db.QueryRowContext(ctx, +// Get retrieves change provider(s) by request ID. Returns ErrNotFound if the change provider is not found. +func (s *changeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) { + rows, err := s.db.QueryContext(ctx, "SELECT id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE id = ?", - id, - ).Scan(&cp.ID, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON) + requestID, + ) + if err != nil { + return nil, fmt.Errorf("failed to get change provider entities requestID=%s from the database: %w", requestID, err) + } + defer rows.Close() + + var results []entity.ChangeProvider + for rows.Next() { + var cp entity.ChangeProvider + var metadataJSON []byte - if errors.Is(err, sql.ErrNoRows) { - return entity.ChangeProvider{}, storage.WrapNotFound(err) + if err := rows.Scan(&cp.RequestID, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON); err != nil { + return nil, fmt.Errorf("failed to scan change provider entity requestID=%s from the database: %w", requestID, err) + } + + if err := json.Unmarshal(metadataJSON, &cp.Metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata for change provider entity requestID=%s from the database: %w", requestID, err) + } + + results = append(results, cp) } - if err != nil { - return entity.ChangeProvider{}, fmt.Errorf("failed to get change provider entity id=%s from the database: %w", id, err) + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate change provider entities requestID=%s from the database: %w", requestID, err) } - if err := json.Unmarshal(metadataJSON, &cp.Metadata); err != nil { - return entity.ChangeProvider{}, fmt.Errorf("failed to unmarshal metadata for change provider entity id=%s from the database: %w", id, err) + if len(results) == 0 { + return nil, storage.WrapNotFound(fmt.Errorf("change provider entity requestID=%s", requestID)) } - return cp, nil + return results, nil } // Create creates a new change provider. Returns ErrAlreadyExists if the entry already exists. func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error { metadataJSON, err := json.Marshal(changeProvider.Metadata) if err != nil { - return fmt.Errorf("failed to marshal metadata id=%s for Create change provider entity: %w", changeProvider.ID, err) + return fmt.Errorf("failed to marshal metadata id=%s for Create change provider entity: %w", changeProvider.RequestID, err) } _, err = s.db.ExecContext(ctx, "INSERT INTO change_provider (id, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?)", - changeProvider.ID, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, + changeProvider.RequestID, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, ) if err != nil { var mysqlErr *mysql.MySQLError if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { - return fmt.Errorf("change provider entity id=%s: %w", changeProvider.ID, storage.ErrAlreadyExists) + return fmt.Errorf("change provider entity id=%s: %w", changeProvider.RequestID, storage.ErrAlreadyExists) } - return fmt.Errorf("failed to insert change provider entity id=%s: %w", changeProvider.ID, err) + return fmt.Errorf("failed to insert change provider entity id=%s: %w", changeProvider.RequestID, err) } return nil diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index a1f5d891..8f8535d4 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -42,8 +42,8 @@ type mockChangeProviderStore struct { createFunc func(ctx context.Context, changeProvider entity.ChangeProvider) error } -func (m *mockChangeProviderStore) Get(ctx context.Context, id string) (entity.ChangeProvider, error) { - return entity.ChangeProvider{}, nil +func (m *mockChangeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) { + return nil, nil } func (m *mockChangeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error { From 9fdcaf4b26a8387a38e72dbec4cf094d5baae089 Mon Sep 17 00:00:00 2001 From: manjari Date: Mon, 23 Feb 2026 23:58:52 +0000 Subject: [PATCH 09/10] fix --- extension/storage/mysql/change_provider_store.go | 4 ++-- extension/storage/mysql/schema/change_provider.sql | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go index 1f4e58ea..ec3cc6be 100644 --- a/extension/storage/mysql/change_provider_store.go +++ b/extension/storage/mysql/change_provider_store.go @@ -25,7 +25,7 @@ func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore { // Get retrieves change provider(s) by request ID. Returns ErrNotFound if the change provider is not found. func (s *changeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) { rows, err := s.db.QueryContext(ctx, - "SELECT id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE id = ?", + "SELECT request_id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE request_id = ?", requestID, ) if err != nil { @@ -67,7 +67,7 @@ func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity. } _, err = s.db.ExecContext(ctx, - "INSERT INTO change_provider (id, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?)", + "INSERT INTO change_provider (request_id, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?)", changeProvider.RequestID, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON, ) if err != nil { diff --git a/extension/storage/mysql/schema/change_provider.sql b/extension/storage/mysql/schema/change_provider.sql index 2af5a66d..f714d33f 100644 --- a/extension/storage/mysql/schema/change_provider.sql +++ b/extension/storage/mysql/schema/change_provider.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS change_provider ( - id VARCHAR(255) NOT NULL, + request_id VARCHAR(255) NOT NULL, change_provider_src VARCHAR(255) NOT NULL, change_provider_id VARCHAR(255) NOT NULL, metadata JSON NOT NUll, - PRIMARY KEY (id,change_provider_src,change_provider_id) + PRIMARY KEY (request_id,change_provider_src,change_provider_id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; From 5c64579f8c299ac4bfdd240a46211f1cd7b0553c Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 24 Feb 2026 00:33:08 +0000 Subject: [PATCH 10/10] comment --- extension/storage/mysql/change_provider_store.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go index ec3cc6be..44c9cdf4 100644 --- a/extension/storage/mysql/change_provider_store.go +++ b/extension/storage/mysql/change_provider_store.go @@ -23,6 +23,12 @@ func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore { } // Get retrieves change provider(s) by request ID. Returns ErrNotFound if the change provider is not found. +// +// Note: The order of ChangeProvider entities returned here is not guaranteed +// to be the same as the request to which it belongs. The caller is repsonsible +// for inspecting and mapping the result of this function to the +// order of changes within the original request. +// func (s *changeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) { rows, err := s.db.QueryContext(ctx, "SELECT request_id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE request_id = ?",