From 0a7ec95e753728a7c5aa31f55288496bea689140 Mon Sep 17 00:00:00 2001 From: manjari Date: Tue, 24 Feb 2026 03:09:26 +0000 Subject: [PATCH] feat(entities) Adds batch store interface and mysql implementation --- extension/storage/BUILD.bazel | 1 + extension/storage/batch_store.go | 21 +++++ extension/storage/mysql/BUILD.bazel | 1 + extension/storage/mysql/batch_store.go | 111 +++++++++++++++++++++++++ extension/storage/mysql/storage.go | 7 ++ extension/storage/storage.go | 3 + gateway/controller/land_test.go | 32 +++++++ 7 files changed, 176 insertions(+) create mode 100644 extension/storage/batch_store.go create mode 100644 extension/storage/mysql/batch_store.go diff --git a/extension/storage/BUILD.bazel b/extension/storage/BUILD.bazel index 34f17052..8e2c0693 100644 --- a/extension/storage/BUILD.bazel +++ b/extension/storage/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "storage", srcs = [ + "batch_store.go", "change_provider_store.go", "request_store.go", "storage.go", diff --git a/extension/storage/batch_store.go b/extension/storage/batch_store.go new file mode 100644 index 00000000..7bc5052b --- /dev/null +++ b/extension/storage/batch_store.go @@ -0,0 +1,21 @@ +package storage + +import ( + "context" + + "github.com/uber/submitqueue/entity" +) + +// BatchStore is an interface that defines methods for managing batches in the database. +type BatchStore interface { + // Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found. + Get(ctx context.Context, id string) (entity.Batch, error) + + // Create creates a new batch. The batch must have a unique ID already assigned. + // Returns ErrAlreadyExists if a batch with the same ID already exists. + Create(ctx context.Context, batch entity.Batch) error + + // UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. + // The implementation should increment the version by 1 atomically with the state update. + UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error +} diff --git a/extension/storage/mysql/BUILD.bazel b/extension/storage/mysql/BUILD.bazel index 4628ddd6..3304b7f2 100644 --- a/extension/storage/mysql/BUILD.bazel +++ b/extension/storage/mysql/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "mysql", srcs = [ + "batch_store.go", "change_provider_store.go", "request_store.go", "storage.go", diff --git a/extension/storage/mysql/batch_store.go b/extension/storage/mysql/batch_store.go new file mode 100644 index 00000000..f96a5b62 --- /dev/null +++ b/extension/storage/mysql/batch_store.go @@ -0,0 +1,111 @@ +package mysql + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/go-sql-driver/mysql" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/storage" +) + +type batchStore struct { + db *sql.DB +} + +// NewBatchStore creates a new MySQL-backed BatchStore. +func NewBatchStore(db *sql.DB) storage.BatchStore { + return &batchStore{db: db} +} + +// Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found. +func (s *batchStore) Get(ctx context.Context, id string) (entity.Batch, error) { + var batch entity.Batch + var containsJSON []byte + var dependenciesJSON []byte + + err := s.db.QueryRowContext(ctx, + "SELECT id, queue, contains, dependencies, state, version FROM batch WHERE id = ?", + id, + ).Scan(&batch.ID, &batch.Queue, &containsJSON, &dependenciesJSON, &batch.State, &batch.Version) + + if errors.Is(err, sql.ErrNoRows) { + return entity.Batch{}, storage.WrapNotFound(err) + } + if err != nil { + return entity.Batch{}, fmt.Errorf("failed to get batch entity id=%s from the database: %w", id, err) + } + + if err := json.Unmarshal(containsJSON, &batch.Contains); err != nil { + return entity.Batch{}, fmt.Errorf("failed to unmarshal contains for batch entity id=%s from the database: %w", id, err) + } + + if err := json.Unmarshal(dependenciesJSON, &batch.Dependencies); err != nil { + return entity.Batch{}, fmt.Errorf("failed to unmarshal dependencies for batch entity id=%s from the database: %w", id, err) + } + + return batch, nil +} + +// Create creates a new batch. The batch must have a unique ID already assigned. Returns ErrAlreadyExists if the batch ID already exists. +func (s *batchStore) Create(ctx context.Context, batch entity.Batch) error { + containsJSON, err := json.Marshal(batch.Contains) + if err != nil { + return fmt.Errorf("failed to marshal contains=%v id=%s for Create batch entity: %w", batch.Contains, batch.ID, err) + } + + dependenciesJSON, err := json.Marshal(batch.Dependencies) + if err != nil { + return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err) + } + + _, err = s.db.ExecContext(ctx, + "INSERT INTO batch (id, queue, contains, dependencies, state, version) VALUES (?, ?, ?, ?, ?, ?)", + batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.State, batch.Version, + ) + if err != nil { + var mysqlErr *mysql.MySQLError + if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 { + return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists) + } + return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err) + } + + return nil +} + +// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. +// The implementation increments the version by 1 atomically with the state update. +func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error { + result, err := s.db.ExecContext(ctx, + "UPDATE batch SET state = ?, version = version + 1 WHERE id = ? AND version = ?", + newState, id, version, + ) + if err != nil { + return fmt.Errorf( + "failed to update batch state for id=%q version=%d newState=%v: %w", + id, version, newState, err, + ) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf( + "failed to get rows affected from update for id=%q version=%d newState=%v: %w", + id, version, newState, err, + ) + } + + if rowsAffected != 1 { + return fmt.Errorf( + "version mismatch for batch update: id=%q expected_version=%d newState=%v: %w", + id, version, newState, storage.ErrVersionMismatch, + ) + } + + return nil +} diff --git a/extension/storage/mysql/storage.go b/extension/storage/mysql/storage.go index 12421e87..e876c59a 100644 --- a/extension/storage/mysql/storage.go +++ b/extension/storage/mysql/storage.go @@ -12,6 +12,7 @@ type mysqlStorage struct { db *sql.DB requestStore storage.RequestStore changeProviderStore storage.ChangeProviderStore + batchStore storage.BatchStore } // NewStorage creates a new MySQL storage. @@ -20,6 +21,7 @@ func NewStorage(db *sql.DB) (storage.Storage, error) { db: db, requestStore: NewRequestStore(db), changeProviderStore: NewChangeProviderStore(db), + batchStore: NewBatchStore(db), }, nil } @@ -33,6 +35,11 @@ func (f *mysqlStorage) GetChangeProviderStore() storage.ChangeProviderStore { return f.changeProviderStore } +// GetBatchStore returns the MySQL-backed BatchStore. +func (f *mysqlStorage) GetBatchStore() storage.BatchStore { + return f.batchStore +} + // Close closes the underlying database connection. func (f *mysqlStorage) Close() error { return f.db.Close() diff --git a/extension/storage/storage.go b/extension/storage/storage.go index 40f84599..72c94872 100644 --- a/extension/storage/storage.go +++ b/extension/storage/storage.go @@ -32,6 +32,9 @@ type Storage interface { // GetChangeProviderStore returns the ChangeProviderStore instance. GetChangeProviderStore() ChangeProviderStore + // GetBatchStore returns the BatchStore instance. + GetBatchStore() BatchStore + // Close closes the storage and all underlying connections. Should only be called once at the end of the program. Close() error } diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index c724a5a1..01532a9c 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -51,9 +51,37 @@ func (m *mockChangeProviderStore) Create(ctx context.Context, changeProvider ent return m.createFunc(ctx, changeProvider) } +type mockBatchStore struct { + createFunc func(ctx context.Context, batch entity.Batch) error + getFunc func(ctx context.Context, id string) (entity.Batch, error) + updateStateFunc func(ctx context.Context, id string, version int32, newState entity.BatchState) error +} + +func (m *mockBatchStore) Get(ctx context.Context, id string) (entity.Batch, error) { + if m.getFunc != nil { + return m.getFunc(ctx, id) + } + return entity.Batch{}, nil +} + +func (m *mockBatchStore) Create(ctx context.Context, batch entity.Batch) error { + if m.createFunc != nil { + return m.createFunc(ctx, batch) + } + return nil +} + +func (m *mockBatchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error { + if m.updateStateFunc != nil { + return m.updateStateFunc(ctx, id, version, newState) + } + return nil +} + type mockStorage struct { requestStore storage.RequestStore changeProviderStore storage.ChangeProviderStore + batchStore storage.BatchStore } func (m *mockStorage) GetRequestStore() storage.RequestStore { @@ -64,6 +92,10 @@ func (m *mockStorage) GetChangeProviderStore() storage.ChangeProviderStore { return m.changeProviderStore } +func (m *mockStorage) GetBatchStore() storage.BatchStore { + return m.batchStore +} + func (m *mockStorage) Close() error { return nil }