Skip to content

Commit 28e9ec4

Browse files
committed
feat(entities) Adds batch store interface and mysql implementation
1 parent aa1b44b commit 28e9ec4

7 files changed

Lines changed: 176 additions & 0 deletions

File tree

extension/storage/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
33
go_library(
44
name = "storage",
55
srcs = [
6+
"batch_store.go",
67
"change_provider_store.go",
78
"request_store.go",
89
"storage.go",

extension/storage/batch_store.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package storage
2+
3+
import (
4+
"context"
5+
6+
"github.com/uber/submitqueue/entity"
7+
)
8+
9+
// BatchStore is an interface that defines methods for managing batches in the database.
10+
type BatchStore interface {
11+
// Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found.
12+
Get(ctx context.Context, id string) (entity.Batch, error)
13+
14+
// Create creates a new batch. The batch must have a unique ID already assigned.
15+
// Returns ErrAlreadyExists if a batch with the same ID already exists.
16+
Create(ctx context.Context, batch entity.Batch) error
17+
18+
// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
19+
// The implementation should increment the version by 1 atomically with the state update.
20+
UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error
21+
}

extension/storage/mysql/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
33
go_library(
44
name = "mysql",
55
srcs = [
6+
"batch_store.go",
67
"change_provider_store.go",
78
"request_store.go",
89
"storage.go",
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package mysql
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
10+
"github.com/go-sql-driver/mysql"
11+
12+
"github.com/uber/submitqueue/entity"
13+
"github.com/uber/submitqueue/extension/storage"
14+
)
15+
16+
type batchStore struct {
17+
db *sql.DB
18+
}
19+
20+
// NewBatchStore creates a new MySQL-backed BatchStore.
21+
func NewBatchStore(db *sql.DB) storage.BatchStore {
22+
return &batchStore{db: db}
23+
}
24+
25+
// Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found.
26+
func (s *batchStore) Get(ctx context.Context, id string) (entity.Batch, error) {
27+
var batch entity.Batch
28+
var containsJSON []byte
29+
var dependenciesJSON []byte
30+
31+
err := s.db.QueryRowContext(ctx,
32+
"SELECT id, queue, contains, dependencies, state, version FROM batch WHERE id = ?",
33+
id,
34+
).Scan(&batch.ID, &batch.Queue, &containsJSON, &dependenciesJSON, &batch.State, &batch.Version)
35+
36+
if errors.Is(err, sql.ErrNoRows) {
37+
return entity.Batch{}, storage.WrapNotFound(err)
38+
}
39+
if err != nil {
40+
return entity.Batch{}, fmt.Errorf("failed to get batch entity id=%s from the database: %w", id, err)
41+
}
42+
43+
if err := json.Unmarshal(containsJSON, &batch.Contains); err != nil {
44+
return entity.Batch{}, fmt.Errorf("failed to unmarshal contains for batch entity id=%s from the database: %w", id, err)
45+
}
46+
47+
if err := json.Unmarshal(dependenciesJSON, &batch.Dependencies); err != nil {
48+
return entity.Batch{}, fmt.Errorf("failed to unmarshal dependencies for batch entity id=%s from the database: %w", id, err)
49+
}
50+
51+
return batch, nil
52+
}
53+
54+
// Create creates a new batch. The batch must have a unique ID already assigned. Returns ErrAlreadyExists if the batch ID already exists.
55+
func (s *batchStore) Create(ctx context.Context, batch entity.Batch) error {
56+
containsJSON, err := json.Marshal(batch.Contains)
57+
if err != nil {
58+
return fmt.Errorf("failed to marshal contains=%v id=%s for Create batch entity: %w", batch.Contains, batch.ID, err)
59+
}
60+
61+
dependenciesJSON, err := json.Marshal(batch.Dependencies)
62+
if err != nil {
63+
return fmt.Errorf("failed to marshal dependencies=%v id=%s for Create batch entity: %w", batch.Dependencies, batch.ID, err)
64+
}
65+
66+
_, err = s.db.ExecContext(ctx,
67+
"INSERT INTO batch (id, queue, contains, dependencies, state, version) VALUES (?, ?, ?, ?, ?, ?)",
68+
batch.ID, batch.Queue, containsJSON, dependenciesJSON, batch.State, batch.Version,
69+
)
70+
if err != nil {
71+
var mysqlErr *mysql.MySQLError
72+
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
73+
return fmt.Errorf("batch entity id=%s: %w", batch.ID, storage.ErrAlreadyExists)
74+
}
75+
return fmt.Errorf("failed to insert batch entity id=%s: %w", batch.ID, err)
76+
}
77+
78+
return nil
79+
}
80+
81+
// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
82+
// The implementation increments the version by 1 atomically with the state update.
83+
func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error {
84+
result, err := s.db.ExecContext(ctx,
85+
"UPDATE batch SET state = ?, version = version + 1 WHERE id = ? AND version = ?",
86+
newState, id, version,
87+
)
88+
if err != nil {
89+
return fmt.Errorf(
90+
"failed to update batch state for id=%q version=%d newState=%v: %w",
91+
id, version, newState, err,
92+
)
93+
}
94+
95+
rowsAffected, err := result.RowsAffected()
96+
if err != nil {
97+
return fmt.Errorf(
98+
"failed to get rows affected from update for id=%q version=%d newState=%v: %w",
99+
id, version, newState, err,
100+
)
101+
}
102+
103+
if rowsAffected != 1 {
104+
return fmt.Errorf(
105+
"version mismatch for batch update: id=%q expected_version=%d newState=%v: %w",
106+
id, version, newState, storage.ErrVersionMismatch,
107+
)
108+
}
109+
110+
return nil
111+
}

extension/storage/mysql/storage.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type mysqlStorage struct {
3131
db *sql.DB
3232
requestStore storage.RequestStore
3333
changeProviderStore storage.ChangeProviderStore
34+
batchStore storage.BatchStore
3435
}
3536

3637
// NewStorage creates a new MySQL storage.
@@ -54,6 +55,7 @@ func NewStorage(p MySQLParameters) (storage.Storage, error) {
5455
db: db,
5556
requestStore: NewRequestStore(db),
5657
changeProviderStore: NewChangeProviderStore(db),
58+
batchStore: NewBatchStore(db),
5759
}, nil
5860
}
5961

@@ -67,6 +69,11 @@ func (f *mysqlStorage) GetChangeProviderStore() storage.ChangeProviderStore {
6769
return f.changeProviderStore
6870
}
6971

72+
// GetBatchStore returns the MySQL-backed BatchStore.
73+
func (f *mysqlStorage) GetBatchStore() storage.BatchStore {
74+
return f.batchStore
75+
}
76+
7077
// Close closes the underlying database connection.
7178
func (f *mysqlStorage) Close() error {
7279
return f.db.Close()

extension/storage/storage.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ type Storage interface {
3232
// GetChangeProviderStore returns the ChangeProviderStore instance.
3333
GetChangeProviderStore() ChangeProviderStore
3434

35+
// GetBatchStore returns the BatchStore instance.
36+
GetBatchStore() BatchStore
37+
3538
// Close closes the storage and all underlying connections. Should only be called once at the end of the program.
3639
Close() error
3740
}

gateway/controller/land_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,37 @@ func (m *mockChangeProviderStore) Create(ctx context.Context, changeProvider ent
5151
return m.createFunc(ctx, changeProvider)
5252
}
5353

54+
type mockBatchStore struct {
55+
createFunc func(ctx context.Context, batch entity.Batch) error
56+
getFunc func(ctx context.Context, id string) (entity.Batch, error)
57+
updateStateFunc func(ctx context.Context, id string, version int32, newState entity.BatchState) error
58+
}
59+
60+
func (m *mockBatchStore) Get(ctx context.Context, id string) (entity.Batch, error) {
61+
if m.getFunc != nil {
62+
return m.getFunc(ctx, id)
63+
}
64+
return entity.Batch{}, nil
65+
}
66+
67+
func (m *mockBatchStore) Create(ctx context.Context, batch entity.Batch) error {
68+
if m.createFunc != nil {
69+
return m.createFunc(ctx, batch)
70+
}
71+
return nil
72+
}
73+
74+
func (m *mockBatchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error {
75+
if m.updateStateFunc != nil {
76+
return m.updateStateFunc(ctx, id, version, newState)
77+
}
78+
return nil
79+
}
80+
5481
type mockStorage struct {
5582
requestStore storage.RequestStore
5683
changeProviderStore storage.ChangeProviderStore
84+
batchStore storage.BatchStore
5785
}
5886

5987
func (m *mockStorage) GetRequestStore() storage.RequestStore {
@@ -64,6 +92,10 @@ func (m *mockStorage) GetChangeProviderStore() storage.ChangeProviderStore {
6492
return m.changeProviderStore
6593
}
6694

95+
func (m *mockStorage) GetBatchStore() storage.BatchStore {
96+
return m.batchStore
97+
}
98+
6799
func (m *mockStorage) Close() error {
68100
return nil
69101
}

0 commit comments

Comments
 (0)