From 416d7f5d1776d6501751fd5a237f122ebb05ff0e Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 24 Feb 2026 18:37:31 +0000 Subject: [PATCH 1/2] feat(entities) Adds speculation tree store interface and mysql implementation --- extension/storage/BUILD.bazel | 1 + extension/storage/mysql/BUILD.bazel | 1 + .../storage/mysql/speculation_tree_store.go | 96 +++++++++++++++++++ extension/storage/mysql/storage.go | 7 ++ extension/storage/speculation_tree_store.go | 22 +++++ extension/storage/storage.go | 3 + gateway/controller/land_test.go | 32 +++++++ 7 files changed, 162 insertions(+) create mode 100644 extension/storage/mysql/speculation_tree_store.go create mode 100644 extension/storage/speculation_tree_store.go diff --git a/extension/storage/BUILD.bazel b/extension/storage/BUILD.bazel index 0b3e3d06..fa34c18d 100644 --- a/extension/storage/BUILD.bazel +++ b/extension/storage/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "build_store.go", "change_provider_store.go", "request_store.go", + "speculation_tree_store.go", "storage.go", ], importpath = "github.com/uber/submitqueue/extension/storage", diff --git a/extension/storage/mysql/BUILD.bazel b/extension/storage/mysql/BUILD.bazel index e14b8769..e81dce3e 100644 --- a/extension/storage/mysql/BUILD.bazel +++ b/extension/storage/mysql/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "build_store.go", "change_provider_store.go", "request_store.go", + "speculation_tree_store.go", "storage.go", ], importpath = "github.com/uber/submitqueue/extension/storage/mysql", diff --git a/extension/storage/mysql/speculation_tree_store.go b/extension/storage/mysql/speculation_tree_store.go new file mode 100644 index 00000000..4038658e --- /dev/null +++ b/extension/storage/mysql/speculation_tree_store.go @@ -0,0 +1,96 @@ +package mysql + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/go-sql-driver/mysql" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" +) + +type speculationTreeStore struct { + db *sql.DB +} + +// NewSpeculationTreeStore creates a new MySQL-backed SpeculationTreeStore. +func NewSpeculationTreeStore(db *sql.DB) storage.SpeculationTreeStore { + return &speculationTreeStore{db: db} +} + +// Get retrieves the speculation tree by batch ID. Returns ErrNotFound if the speculation tree is not found. +func (s *speculationTreeStore) Get(ctx context.Context, batchID string) (entity.SpeculationTree, error) { + var st entity.SpeculationTree + var speculationsJSON []byte + + err := s.db.QueryRowContext(ctx, + "SELECT batch_id, queue, speculations FROM speculation_tree WHERE batch_id = ?", + batchID, + ).Scan(&st.BatchID, &st.Queue, &speculationsJSON) + + if errors.Is(err, sql.ErrNoRows) { + return entity.SpeculationTree{}, storage.WrapNotFound(err) + } + if err != nil { + return entity.SpeculationTree{}, fmt.Errorf("failed to get speculation tree entity batchID=%s from the database: %w", batchID, err) + } + + if err := json.Unmarshal(speculationsJSON, &st.Speculations); err != nil { + return entity.SpeculationTree{}, fmt.Errorf("failed to unmarshal speculations for speculation tree entity batchID=%s from the database: %w", batchID, err) + } + + return st, nil +} + +// Create creates a new speculation tree. Returns ErrAlreadyExists if the entry already exists. +func (s *speculationTreeStore) Create(ctx context.Context, speculationTree entity.SpeculationTree) error { + speculationsJSON, err := json.Marshal(speculationTree.Speculations) + if err != nil { + return fmt.Errorf("failed to marshal speculations batchID=%s for Create speculation tree entity: %w", speculationTree.BatchID, err) + } + + _, err = s.db.ExecContext(ctx, + "INSERT INTO speculation_tree (batch_id, queue, speculations) VALUES (?, ?, ?)", + speculationTree.BatchID, speculationTree.Queue, speculationsJSON, + ) + if err != nil { + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { + return fmt.Errorf("speculation tree entity batchID=%s: %w", speculationTree.BatchID, storage.ErrAlreadyExists) + } + return fmt.Errorf("failed to insert speculation tree entity batchID=%s: %w", speculationTree.BatchID, err) + } + + return nil +} + +// UpdateSpeculations updates the speculations of a speculation tree. Returns ErrNotFound if the speculation tree is not found. +func (s *speculationTreeStore) UpdateSpeculations(ctx context.Context, batchID string, speculations []map[string]string) error { + speculationsJSON, err := json.Marshal(speculations) + if err != nil { + return fmt.Errorf("failed to marshal speculations batchID=%s for UpdateSpeculations: %w", batchID, err) + } + + result, err := s.db.ExecContext(ctx, + "UPDATE speculation_tree SET speculations = ? WHERE batch_id = ?", + speculationsJSON, batchID, + ) + if err != nil { + return fmt.Errorf("failed to update speculations for batchID=%q: %w", batchID, err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected from update for batchID=%q: %w", batchID, err) + } + + if rowsAffected != 1 { + return storage.WrapNotFound(fmt.Errorf("speculation tree entity batchID=%s", batchID)) + } + + return nil +} diff --git a/extension/storage/mysql/storage.go b/extension/storage/mysql/storage.go index 8b42ddb1..cef6e353 100644 --- a/extension/storage/mysql/storage.go +++ b/extension/storage/mysql/storage.go @@ -15,6 +15,7 @@ type mysqlStorage struct { batchStore storage.BatchStore batchDependentStore storage.BatchDependentStore buildStore storage.BuildStore + speculationTreeStore storage.SpeculationTreeStore } // NewStorage creates a new MySQL storage. @@ -26,6 +27,7 @@ func NewStorage(db *sql.DB) (storage.Storage, error) { batchStore: NewBatchStore(db), batchDependentStore: NewBatchDependentStore(db), buildStore: NewBuildStore(db), + speculationTreeStore: NewSpeculationTreeStore(db), }, nil } @@ -54,6 +56,11 @@ func (f *mysqlStorage) GetBuildStore() storage.BuildStore { return f.buildStore } +// GetSpeculationTreeStore returns the MySQL-backed SpeculationTreeStore. +func (f *mysqlStorage) GetSpeculationTreeStore() storage.SpeculationTreeStore { + return f.speculationTreeStore +} + // Close closes the underlying database connection. func (f *mysqlStorage) Close() error { return f.db.Close() diff --git a/extension/storage/speculation_tree_store.go b/extension/storage/speculation_tree_store.go new file mode 100644 index 00000000..7c4bf806 --- /dev/null +++ b/extension/storage/speculation_tree_store.go @@ -0,0 +1,22 @@ +package storage + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// SpeculationTreeStore is an interface that defines methods for managing speculation trees in the database. +type SpeculationTreeStore interface { + // Get retrieves the speculation tree by batch ID. + // Returns ErrNotFound if the speculation tree is not found. + Get(ctx context.Context, batchID string) (entity.SpeculationTree, error) + + // Create creates a new speculation tree. + // Returns ErrAlreadyExists if the entry already exists. + Create(ctx context.Context, speculationTree entity.SpeculationTree) error + + // UpdateSpeculations updates the speculations of a speculation tree. + // Returns ErrNotFound if the speculation tree is not found. + UpdateSpeculations(ctx context.Context, batchID string, speculations []map[string]string) error +} diff --git a/extension/storage/storage.go b/extension/storage/storage.go index 3f42c461..e87d69c5 100644 --- a/extension/storage/storage.go +++ b/extension/storage/storage.go @@ -41,6 +41,9 @@ type Storage interface { // GetBuildStore returns the BuildStore instance. GetBuildStore() BuildStore + // GetSpeculationTreeStore returns the SpeculationTreeStore instance. + GetSpeculationTreeStore() SpeculationTreeStore + // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index 1bfa40e6..9154e160 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -124,12 +124,40 @@ func (m *mockBuildStore) UpdateStatus(ctx context.Context, id string, newStatus return nil } +type mockSpeculationTreeStore struct { + createFunc func(ctx context.Context, speculationTree entity.SpeculationTree) error + getFunc func(ctx context.Context, batchID string) (entity.SpeculationTree, error) + updateSpeculationsFunc func(ctx context.Context, batchID string, speculations []map[string]string) error +} + +func (m *mockSpeculationTreeStore) Get(ctx context.Context, batchID string) (entity.SpeculationTree, error) { + if m.getFunc != nil { + return m.getFunc(ctx, batchID) + } + return entity.SpeculationTree{}, nil +} + +func (m *mockSpeculationTreeStore) Create(ctx context.Context, speculationTree entity.SpeculationTree) error { + if m.createFunc != nil { + return m.createFunc(ctx, speculationTree) + } + return nil +} + +func (m *mockSpeculationTreeStore) UpdateSpeculations(ctx context.Context, batchID string, speculations []map[string]string) error { + if m.updateSpeculationsFunc != nil { + return m.updateSpeculationsFunc(ctx, batchID, speculations) + } + return nil +} + type mockStorage struct { requestStore storage.RequestStore changeProviderStore storage.ChangeProviderStore batchStore storage.BatchStore batchDependentStore storage.BatchDependentStore buildStore storage.BuildStore + speculationTreeStore storage.SpeculationTreeStore } func (m *mockStorage) GetRequestStore() storage.RequestStore { @@ -152,6 +180,10 @@ func (m *mockStorage) GetBuildStore() storage.BuildStore { return m.buildStore } +func (m *mockStorage) GetSpeculationTreeStore() storage.SpeculationTreeStore { + return m.speculationTreeStore +} + func (m *mockStorage) Close() error { return nil } From bad173578d48536a1514c50f6aa60f25dc583fbb Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 24 Feb 2026 19:51:48 +0000 Subject: [PATCH 2/2] update store based on new speculation tree entity --- extension/storage/mysql/speculation_tree_store.go | 10 +++++----- extension/storage/speculation_tree_store.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/extension/storage/mysql/speculation_tree_store.go b/extension/storage/mysql/speculation_tree_store.go index 4038658e..54e83c97 100644 --- a/extension/storage/mysql/speculation_tree_store.go +++ b/extension/storage/mysql/speculation_tree_store.go @@ -28,9 +28,9 @@ func (s *speculationTreeStore) Get(ctx context.Context, batchID string) (entity. var speculationsJSON []byte err := s.db.QueryRowContext(ctx, - "SELECT batch_id, queue, speculations FROM speculation_tree WHERE batch_id = ?", + "SELECT batch_id, speculations FROM speculation_tree WHERE batch_id = ?", batchID, - ).Scan(&st.BatchID, &st.Queue, &speculationsJSON) + ).Scan(&st.BatchID, &speculationsJSON) if errors.Is(err, sql.ErrNoRows) { return entity.SpeculationTree{}, storage.WrapNotFound(err) @@ -54,8 +54,8 @@ func (s *speculationTreeStore) Create(ctx context.Context, speculationTree entit } _, err = s.db.ExecContext(ctx, - "INSERT INTO speculation_tree (batch_id, queue, speculations) VALUES (?, ?, ?)", - speculationTree.BatchID, speculationTree.Queue, speculationsJSON, + "INSERT INTO speculation_tree (batch_id, speculations) VALUES (?, ?)", + speculationTree.BatchID, speculationsJSON, ) if err != nil { var mysqlErr *mysql.MySQLError @@ -69,7 +69,7 @@ func (s *speculationTreeStore) Create(ctx context.Context, speculationTree entit } // UpdateSpeculations updates the speculations of a speculation tree. Returns ErrNotFound if the speculation tree is not found. -func (s *speculationTreeStore) UpdateSpeculations(ctx context.Context, batchID string, speculations []map[string]string) error { +func (s *speculationTreeStore) UpdateSpeculations(ctx context.Context, batchID string, speculations []entity.SpeculationInfo) error { speculationsJSON, err := json.Marshal(speculations) if err != nil { return fmt.Errorf("failed to marshal speculations batchID=%s for UpdateSpeculations: %w", batchID, err) diff --git a/extension/storage/speculation_tree_store.go b/extension/storage/speculation_tree_store.go index 7c4bf806..cf285f05 100644 --- a/extension/storage/speculation_tree_store.go +++ b/extension/storage/speculation_tree_store.go @@ -18,5 +18,5 @@ type SpeculationTreeStore interface { // UpdateSpeculations updates the speculations of a speculation tree. // Returns ErrNotFound if the speculation tree is not found. - UpdateSpeculations(ctx context.Context, batchID string, speculations []map[string]string) error + UpdateSpeculations(ctx context.Context, batchID string, speculations []entity.SpeculationInfo) error }