diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index 48eec7ba..09f46c53 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -102,7 +102,7 @@ func run() error { defer appDB.Close() // Initialize counter from shared app database connection - cnt := mysqlcounter.NewCounter(appDB) + cnt := mysqlcounter.NewCounter(appDB, scope.SubScope("counter")) // Open queue database connection queueDSN := os.Getenv("QUEUE_MYSQL_DSN") diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 31158867..f59b9eb9 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -112,9 +112,9 @@ func run() error { } defer appDB.Close() - cnt := mysqlcounter.NewCounter(appDB) + cnt := mysqlcounter.NewCounter(appDB, scope.SubScope("counter")) - store, err := mysqlstorage.NewStorage(appDB) + store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage")) if err != nil { return fmt.Errorf("failed to create storage: %w", err) } diff --git a/extension/counter/mysql/BUILD.bazel b/extension/counter/mysql/BUILD.bazel index 64b4a0ce..f123a089 100644 --- a/extension/counter/mysql/BUILD.bazel +++ b/extension/counter/mysql/BUILD.bazel @@ -6,6 +6,8 @@ go_library( importpath = "github.com/uber/submitqueue/extension/counter/mysql", visibility = ["//visibility:public"], deps = [ + "//core/metrics", "//extension/counter", + "@com_github_uber_go_tally_v4//:tally", ], ) diff --git a/extension/counter/mysql/counter.go b/extension/counter/mysql/counter.go index ba72639b..5a97470e 100644 --- a/extension/counter/mysql/counter.go +++ b/extension/counter/mysql/counter.go @@ -5,21 +5,26 @@ import ( "database/sql" "fmt" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/extension/counter" ) type mysqlCounter struct { - db *sql.DB + db *sql.DB + scope tally.Scope } // NewCounter creates a new MySQL-backed Counter. -func NewCounter(db *sql.DB) counter.Counter { - return &mysqlCounter{db: db} +func NewCounter(db *sql.DB, scope tally.Scope) counter.Counter { + return &mysqlCounter{db: db, scope: scope} } // Next atomically increments the counter for the given domain and returns the new value. // Uses MySQL's LAST_INSERT_ID() to set the value atomically and read the incremented value. -func (c *mysqlCounter) Next(ctx context.Context, domain string) (int64, error) { +func (c *mysqlCounter) Next(ctx context.Context, domain string) (ret int64, retErr error) { + op := metrics.Begin(c.scope, "next") + defer func() { op.Complete(retErr) }() result, err := c.db.ExecContext(ctx, "INSERT INTO counter (domain, value) VALUES (?, LAST_INSERT_ID(1)) ON DUPLICATE KEY UPDATE value = LAST_INSERT_ID(value + 1)", domain, diff --git a/extension/scorer/composite/BUILD.bazel b/extension/scorer/composite/BUILD.bazel index 3016a608..6dfef9b5 100644 --- a/extension/scorer/composite/BUILD.bazel +++ b/extension/scorer/composite/BUILD.bazel @@ -6,8 +6,10 @@ go_library( importpath = "github.com/uber/submitqueue/extension/scorer/composite", visibility = ["//visibility:public"], deps = [ + "//core/metrics", "//entity", "//extension/scorer", + "@com_github_uber_go_tally_v4//:tally", ], ) @@ -20,5 +22,6 @@ go_test( "//extension/scorer", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", ], ) diff --git a/extension/scorer/composite/scorer.go b/extension/scorer/composite/scorer.go index e1c7eba9..b557c9ae 100644 --- a/extension/scorer/composite/scorer.go +++ b/extension/scorer/composite/scorer.go @@ -3,6 +3,8 @@ package composite import ( "context" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/scorer" ) @@ -51,12 +53,14 @@ type compositeScorer struct { scorers map[string]scorer.Scorer // reduce combines named scores into a single value. reduce ReduceFunc + // scope is the tally scope for emitting metrics. + scope tally.Scope } // New creates a composite Scorer that evaluates all named child scorers and combines // their results using the given reduce function. // Panics if scorers is empty or reduce is nil. -func New(scorers map[string]scorer.Scorer, reduce ReduceFunc) scorer.Scorer { +func New(scorers map[string]scorer.Scorer, reduce ReduceFunc, scope tally.Scope) scorer.Scorer { if len(scorers) == 0 { panic("composite.New: scorers must not be empty") } @@ -66,12 +70,16 @@ func New(scorers map[string]scorer.Scorer, reduce ReduceFunc) scorer.Scorer { return &compositeScorer{ scorers: scorers, reduce: reduce, + scope: scope, } } // Score evaluates all child scorers and combines their results using the reduce function. // If any child scorer returns an error, that error is returned immediately. -func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (float64, error) { +func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) { + op := metrics.Begin(c.scope, "score") + defer func() { op.Complete(retErr) }() + scores := make(map[string]float64, len(c.scorers)) for name, s := range c.scorers { score, err := s.Score(ctx, change) diff --git a/extension/scorer/composite/scorer_test.go b/extension/scorer/composite/scorer_test.go index 6ee2e690..65802664 100644 --- a/extension/scorer/composite/scorer_test.go +++ b/extension/scorer/composite/scorer_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/scorer" ) @@ -83,7 +84,7 @@ func TestScorer_Score(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := New(tt.scorers, tt.reduce) + s := New(tt.scorers, tt.reduce, tally.NoopScope) got, err := s.Score(context.Background(), entity.Change{}) require.NoError(t, err) assert.InDelta(t, tt.want, got, 1e-9) @@ -95,20 +96,20 @@ func TestScorer_Score_ChildError(t *testing.T) { s := New(map[string]scorer.Scorer{ "error": &errorScorer{}, "files": &fixedScorer{0.9}, - }, Min) + }, Min, tally.NoopScope) _, err := s.Score(context.Background(), entity.Change{}) require.Error(t, err) } func TestNew_EmptyScorers(t *testing.T) { assert.Panics(t, func() { - New(map[string]scorer.Scorer{}, Min) + New(map[string]scorer.Scorer{}, Min, tally.NoopScope) }) } func TestNew_NilReduce(t *testing.T) { assert.Panics(t, func() { - New(map[string]scorer.Scorer{"files": &fixedScorer{0.9}}, nil) + New(map[string]scorer.Scorer{"files": &fixedScorer{0.9}}, nil, tally.NoopScope) }) } @@ -124,7 +125,7 @@ func TestReduceFunc_ReceivesNames(t *testing.T) { s := New(map[string]scorer.Scorer{ "files": &fixedScorer{0.9}, "deps": &fixedScorer{0.95}, - }, custom) + }, custom, tally.NoopScope) got, err := s.Score(context.Background(), entity.Change{}) require.NoError(t, err) assert.Equal(t, 0.9, got) diff --git a/extension/scorer/heuristic/BUILD.bazel b/extension/scorer/heuristic/BUILD.bazel index f75806d6..f185c3d1 100644 --- a/extension/scorer/heuristic/BUILD.bazel +++ b/extension/scorer/heuristic/BUILD.bazel @@ -6,8 +6,10 @@ go_library( importpath = "github.com/uber/submitqueue/extension/scorer/heuristic", visibility = ["//visibility:public"], deps = [ + "//core/metrics", "//entity", "//extension/scorer", + "@com_github_uber_go_tally_v4//:tally", ], ) @@ -19,5 +21,6 @@ go_test( "//entity", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", ], ) diff --git a/extension/scorer/heuristic/scorer.go b/extension/scorer/heuristic/scorer.go index 06144f4b..2958345c 100644 --- a/extension/scorer/heuristic/scorer.go +++ b/extension/scorer/heuristic/scorer.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/scorer" ) @@ -28,23 +30,28 @@ type heuristicScorer struct { buckets []Bucket // valueFunc extracts the numeric value from a Change. valueFunc ValueFunc + // scope is the tally scope for emitting metrics. + scope tally.Scope } // New creates a new heuristic Scorer with the given buckets and value function. // Panics if valueFunc is nil. -func New(buckets []Bucket, valueFunc ValueFunc) scorer.Scorer { +func New(buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer { if valueFunc == nil { panic("heuristic.New: valueFunc must not be nil") } return &heuristicScorer{ buckets: buckets, valueFunc: valueFunc, + scope: scope, } } // Score extracts the value from the change, then returns the probability score for the first // bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches. -func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (float64, error) { +func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) { + op := metrics.Begin(s.scope, "score") + defer func() { op.Complete(retErr) }() value, err := s.valueFunc(ctx, change) if err != nil { return 0, err diff --git a/extension/scorer/heuristic/scorer_test.go b/extension/scorer/heuristic/scorer_test.go index 6f371082..6878baf4 100644 --- a/extension/scorer/heuristic/scorer_test.go +++ b/extension/scorer/heuristic/scorer_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/entity" ) @@ -91,7 +92,7 @@ func TestScorer_Score(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := New(tt.buckets, tt.valueFunc) + s := New(tt.buckets, tt.valueFunc, tally.NoopScope) got, err := s.Score(context.Background(), entity.Change{}) if tt.wantErr { require.Error(t, err) @@ -107,13 +108,13 @@ func TestScorer_Score_ValueFuncError(t *testing.T) { failing := func(_ context.Context, _ entity.Change) (int, error) { return 0, assert.AnError } - s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing) + s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope) _, err := s.Score(context.Background(), entity.Change{}) require.Error(t, err) } func TestNew_NilValueFunc(t *testing.T) { assert.Panics(t, func() { - New([]Bucket{{Min: 0, Max: 10, Score: 0.85}}, nil) + New([]Bucket{{Min: 0, Max: 10, Score: 0.85}}, nil, tally.NoopScope) }) } diff --git a/extension/storage/mysql/BUILD.bazel b/extension/storage/mysql/BUILD.bazel index e81dce3e..113086fe 100644 --- a/extension/storage/mysql/BUILD.bazel +++ b/extension/storage/mysql/BUILD.bazel @@ -14,8 +14,10 @@ go_library( importpath = "github.com/uber/submitqueue/extension/storage/mysql", visibility = ["//visibility:public"], deps = [ + "//core/metrics", "//entity", "//extension/storage", "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_uber_go_tally_v4//:tally", ], ) diff --git a/extension/storage/mysql/batch_dependent_store.go b/extension/storage/mysql/batch_dependent_store.go index a4872f2b..0ef5f369 100644 --- a/extension/storage/mysql/batch_dependent_store.go +++ b/extension/storage/mysql/batch_dependent_store.go @@ -8,22 +8,28 @@ import ( "fmt" "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" ) type batchDependentStore struct { - db *sql.DB + db *sql.DB + scope tally.Scope } // NewBatchDependentStore creates a new MySQL-backed BatchDependentStore. -func NewBatchDependentStore(db *sql.DB) storage.BatchDependentStore { - return &batchDependentStore{db: db} +func NewBatchDependentStore(db *sql.DB, scope tally.Scope) storage.BatchDependentStore { + return &batchDependentStore{db: db, scope: scope} } // Get retrieves the batch dependent by batch ID. Returns ErrNotFound if the batch dependent is not found. -func (s *batchDependentStore) Get(ctx context.Context, batchID string) (entity.BatchDependent, error) { +func (s *batchDependentStore) Get(ctx context.Context, batchID string) (ret entity.BatchDependent, retErr error) { + op := metrics.Begin(s.scope, "get") + defer func() { op.Complete(retErr) }() + var bd entity.BatchDependent var dependentsJSON []byte @@ -47,7 +53,10 @@ func (s *batchDependentStore) Get(ctx context.Context, batchID string) (entity.B } // Create creates a new batch dependent. Returns ErrAlreadyExists if the entry already exists. -func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) error { +func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) (retErr error) { + op := metrics.Begin(s.scope, "create") + defer func() { op.Complete(retErr) }() + dependentsJSON, err := json.Marshal(batchDependent.Dependents) if err != nil { return fmt.Errorf("failed to marshal dependents batchID=%s for Create batch dependent entity: %w", batchDependent.BatchID, err) @@ -71,7 +80,10 @@ func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity. // UpdateDependents updates the dependents of a batch dependent if the current version matches the expected version. // If versions do not match, returns ErrVersionMismatch. // The implementation increments the version by 1 atomically with the dependents update. -func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) error { +func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) (retErr error) { + op := metrics.Begin(s.scope, "update_dependents") + defer func() { op.Complete(retErr) }() + dependentsJSON, err := json.Marshal(dependents) if err != nil { return fmt.Errorf("failed to marshal dependents batchID=%s for UpdateDependents batch dependent entity: %w", batchID, err) diff --git a/extension/storage/mysql/batch_store.go b/extension/storage/mysql/batch_store.go index 62f96a3e..8ee3d9ea 100644 --- a/extension/storage/mysql/batch_store.go +++ b/extension/storage/mysql/batch_store.go @@ -9,22 +9,28 @@ import ( "strings" "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" ) type batchStore struct { - db *sql.DB + db *sql.DB + scope tally.Scope } // NewBatchStore creates a new MySQL-backed BatchStore. -func NewBatchStore(db *sql.DB) storage.BatchStore { - return &batchStore{db: db} +func NewBatchStore(db *sql.DB, scope tally.Scope) storage.BatchStore { + return &batchStore{db: db, scope: scope} } // Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found. -func (s *batchStore) Get(ctx context.Context, id string) (entity.Batch, error) { +func (s *batchStore) Get(ctx context.Context, id string) (ret entity.Batch, retErr error) { + op := metrics.Begin(s.scope, "get") + defer func() { op.Complete(retErr) }() + var batch entity.Batch var containsJSON []byte var dependenciesJSON []byte @@ -53,7 +59,10 @@ func (s *batchStore) Get(ctx context.Context, id string) (entity.Batch, error) { } // Create creates a new batch. The batch must have a unique ID already assigned. Returns ErrAlreadyExists if the batch ID already exists. -func (s *batchStore) Create(ctx context.Context, batch entity.Batch) error { +func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr error) { + op := metrics.Begin(s.scope, "create") + defer func() { op.Complete(retErr) }() + containsJSON, err := json.Marshal(batch.Contains) if err != nil { return fmt.Errorf("failed to marshal contains=%v id=%s for Create batch entity: %w", batch.Contains, batch.ID, err) @@ -81,7 +90,10 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) error { // UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. // The implementation increments the version by 1 atomically with the state update. -func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error { +func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) (retErr error) { + op := metrics.Begin(s.scope, "update_state") + defer func() { op.Complete(retErr) }() + result, err := s.db.ExecContext(ctx, "UPDATE batch SET state = ?, version = version + 1 WHERE id = ? AND version = ?", newState, id, version, @@ -112,7 +124,10 @@ func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, } // GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states. -func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error) { +func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) (ret []entity.Batch, retErr error) { + op := metrics.Begin(s.scope, "get_by_queue_and_states") + defer func() { op.Complete(retErr) }() + if len(states) == 0 { return nil, nil } diff --git a/extension/storage/mysql/build_store.go b/extension/storage/mysql/build_store.go index 1db9e06f..a0820aa2 100644 --- a/extension/storage/mysql/build_store.go +++ b/extension/storage/mysql/build_store.go @@ -8,22 +8,28 @@ import ( "fmt" "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" ) type buildStore struct { - db *sql.DB + db *sql.DB + scope tally.Scope } // NewBuildStore creates a new MySQL-backed BuildStore. -func NewBuildStore(db *sql.DB) storage.BuildStore { - return &buildStore{db: db} +func NewBuildStore(db *sql.DB, scope tally.Scope) storage.BuildStore { + return &buildStore{db: db, scope: scope} } // Get retrieves a build by ID. Returns ErrNotFound if the build is not found. -func (s *buildStore) Get(ctx context.Context, id string) (entity.Build, error) { +func (s *buildStore) Get(ctx context.Context, id string) (ret entity.Build, retErr error) { + op := metrics.Begin(s.scope, "get") + defer func() { op.Complete(retErr) }() + var build entity.Build var speculationPathJSON []byte @@ -47,7 +53,10 @@ func (s *buildStore) Get(ctx context.Context, id string) (entity.Build, error) { } // Create creates a new build. The build must have a unique ID already assigned. Returns ErrAlreadyExists if the build ID already exists. -func (s *buildStore) Create(ctx context.Context, build entity.Build) error { +func (s *buildStore) Create(ctx context.Context, build entity.Build) (retErr error) { + op := metrics.Begin(s.scope, "create") + defer func() { op.Complete(retErr) }() + speculationPathJSON, err := json.Marshal(build.SpeculationPath) if err != nil { return fmt.Errorf("failed to marshal speculation_path id=%s for Create build entity: %w", build.ID, err) @@ -69,7 +78,10 @@ func (s *buildStore) Create(ctx context.Context, build entity.Build) error { } // UpdateStatus updates the status of a build. Returns ErrNotFound if the build is not found. -func (s *buildStore) UpdateStatus(ctx context.Context, id string, newStatus entity.BuildStatus) error { +func (s *buildStore) UpdateStatus(ctx context.Context, id string, newStatus entity.BuildStatus) (retErr error) { + op := metrics.Begin(s.scope, "update_status") + defer func() { op.Complete(retErr) }() + result, err := s.db.ExecContext(ctx, "UPDATE build SET status = ? WHERE id = ?", newStatus, id, diff --git a/extension/storage/mysql/change_provider_store.go b/extension/storage/mysql/change_provider_store.go index 44c9cdf4..ef6acae0 100644 --- a/extension/storage/mysql/change_provider_store.go +++ b/extension/storage/mysql/change_provider_store.go @@ -8,18 +8,21 @@ import ( "fmt" "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" ) type changeProviderStore struct { - db *sql.DB + db *sql.DB + scope tally.Scope } // NewChangeProviderStore creates a new MySQL-backed ChangeProviderStore. -func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore { - return &changeProviderStore{db: db} +func NewChangeProviderStore(db *sql.DB, scope tally.Scope) storage.ChangeProviderStore { + return &changeProviderStore{db: db, scope: scope} } // Get retrieves change provider(s) by request ID. Returns ErrNotFound if the change provider is not found. @@ -29,7 +32,10 @@ func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore { // for inspecting and mapping the result of this function to the // order of changes within the original request. // -func (s *changeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) { +func (s *changeProviderStore) Get(ctx context.Context, requestID string) (ret []entity.ChangeProvider, retErr error) { + op := metrics.Begin(s.scope, "get") + defer func() { op.Complete(retErr) }() + rows, err := s.db.QueryContext(ctx, "SELECT request_id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE request_id = ?", requestID, @@ -66,7 +72,10 @@ func (s *changeProviderStore) Get(ctx context.Context, requestID string) ([]enti } // Create creates a new change provider. Returns ErrAlreadyExists if the entry already exists. -func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error { +func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) (retErr error) { + op := metrics.Begin(s.scope, "create") + defer func() { op.Complete(retErr) }() + metadataJSON, err := json.Marshal(changeProvider.Metadata) if err != nil { return fmt.Errorf("failed to marshal metadata id=%s for Create change provider entity: %w", changeProvider.RequestID, err) diff --git a/extension/storage/mysql/request_store.go b/extension/storage/mysql/request_store.go index 4937066f..1045a9ef 100644 --- a/extension/storage/mysql/request_store.go +++ b/extension/storage/mysql/request_store.go @@ -8,22 +8,28 @@ import ( "fmt" "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" ) type requestStore struct { - db *sql.DB + db *sql.DB + scope tally.Scope } // NewRequestStore creates a new MySQL-backed RequestStore. -func NewRequestStore(db *sql.DB) storage.RequestStore { - return &requestStore{db: db} +func NewRequestStore(db *sql.DB, scope tally.Scope) storage.RequestStore { + return &requestStore{db: db, scope: scope} } // Get retrieves a land request by ID. Returns ErrNotFound if the request is not found. -func (r *requestStore) Get(ctx context.Context, id string) (entity.Request, error) { +func (r *requestStore) Get(ctx context.Context, id string) (ret entity.Request, retErr error) { + op := metrics.Begin(r.scope, "get") + defer func() { op.Complete(retErr) }() + var req entity.Request var changeURIsJSON []byte @@ -48,7 +54,10 @@ func (r *requestStore) Get(ctx context.Context, id string) (entity.Request, erro } // Create creates a new land request. The request must have a unique ID already assigned. Returns ErrAlreadyExists if the request ID already exists. -func (r *requestStore) Create(ctx context.Context, request entity.Request) error { +func (r *requestStore) Create(ctx context.Context, request entity.Request) (retErr error) { + op := metrics.Begin(r.scope, "create") + defer func() { op.Complete(retErr) }() + // Marshal the change URIs to JSON changeURIsJSON, err := json.Marshal(request.Change.URIs) if err != nil { @@ -74,7 +83,10 @@ func (r *requestStore) Create(ctx context.Context, request entity.Request) error // UpdateState updates the state of a land request if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. // The implementation increments the version by 1 atomically with the state update. -func (r *requestStore) UpdateState(ctx context.Context, id string, version int32, newState entity.RequestState) error { +func (r *requestStore) UpdateState(ctx context.Context, id string, version int32, newState entity.RequestState) (retErr error) { + op := metrics.Begin(r.scope, "update_state") + defer func() { op.Complete(retErr) }() + result, err := r.db.ExecContext(ctx, "UPDATE request SET state = ?, version = version + 1 WHERE id = ? AND version = ?", newState, id, version, diff --git a/extension/storage/mysql/speculation_tree_store.go b/extension/storage/mysql/speculation_tree_store.go index 54e83c97..2be22844 100644 --- a/extension/storage/mysql/speculation_tree_store.go +++ b/extension/storage/mysql/speculation_tree_store.go @@ -8,22 +8,28 @@ import ( "fmt" "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/extension/storage" ) type speculationTreeStore struct { - db *sql.DB + db *sql.DB + scope tally.Scope } // NewSpeculationTreeStore creates a new MySQL-backed SpeculationTreeStore. -func NewSpeculationTreeStore(db *sql.DB) storage.SpeculationTreeStore { - return &speculationTreeStore{db: db} +func NewSpeculationTreeStore(db *sql.DB, scope tally.Scope) storage.SpeculationTreeStore { + return &speculationTreeStore{db: db, scope: scope} } // Get retrieves the speculation tree by batch ID. Returns ErrNotFound if the speculation tree is not found. -func (s *speculationTreeStore) Get(ctx context.Context, batchID string) (entity.SpeculationTree, error) { +func (s *speculationTreeStore) Get(ctx context.Context, batchID string) (ret entity.SpeculationTree, retErr error) { + op := metrics.Begin(s.scope, "get") + defer func() { op.Complete(retErr) }() + var st entity.SpeculationTree var speculationsJSON []byte @@ -47,7 +53,10 @@ func (s *speculationTreeStore) Get(ctx context.Context, batchID string) (entity. } // Create creates a new speculation tree. Returns ErrAlreadyExists if the entry already exists. -func (s *speculationTreeStore) Create(ctx context.Context, speculationTree entity.SpeculationTree) error { +func (s *speculationTreeStore) Create(ctx context.Context, speculationTree entity.SpeculationTree) (retErr error) { + op := metrics.Begin(s.scope, "create") + defer func() { op.Complete(retErr) }() + speculationsJSON, err := json.Marshal(speculationTree.Speculations) if err != nil { return fmt.Errorf("failed to marshal speculations batchID=%s for Create speculation tree entity: %w", speculationTree.BatchID, err) @@ -69,7 +78,10 @@ func (s *speculationTreeStore) Create(ctx context.Context, speculationTree entit } // UpdateSpeculations updates the speculations of a speculation tree. Returns ErrNotFound if the speculation tree is not found. -func (s *speculationTreeStore) UpdateSpeculations(ctx context.Context, batchID string, speculations []entity.SpeculationInfo) error { +func (s *speculationTreeStore) UpdateSpeculations(ctx context.Context, batchID string, speculations []entity.SpeculationInfo) (retErr error) { + op := metrics.Begin(s.scope, "update_speculations") + defer func() { op.Complete(retErr) }() + speculationsJSON, err := json.Marshal(speculations) if err != nil { return fmt.Errorf("failed to marshal speculations batchID=%s for UpdateSpeculations: %w", batchID, err) diff --git a/extension/storage/mysql/storage.go b/extension/storage/mysql/storage.go index cef6e353..4fee579a 100644 --- a/extension/storage/mysql/storage.go +++ b/extension/storage/mysql/storage.go @@ -4,6 +4,7 @@ import ( "database/sql" _ "github.com/go-sql-driver/mysql" + "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/extension/storage" ) @@ -19,15 +20,15 @@ type mysqlStorage struct { } // NewStorage creates a new MySQL storage. -func NewStorage(db *sql.DB) (storage.Storage, error) { +func NewStorage(db *sql.DB, scope tally.Scope) (storage.Storage, error) { return &mysqlStorage{ db: db, - requestStore: NewRequestStore(db), - changeProviderStore: NewChangeProviderStore(db), - batchStore: NewBatchStore(db), - batchDependentStore: NewBatchDependentStore(db), - buildStore: NewBuildStore(db), - speculationTreeStore: NewSpeculationTreeStore(db), + requestStore: NewRequestStore(db, scope.SubScope("request_store")), + changeProviderStore: NewChangeProviderStore(db, scope.SubScope("change_provider_store")), + batchStore: NewBatchStore(db, scope.SubScope("batch_store")), + batchDependentStore: NewBatchDependentStore(db, scope.SubScope("batch_dependent_store")), + buildStore: NewBuildStore(db, scope.SubScope("build_store")), + speculationTreeStore: NewSpeculationTreeStore(db, scope.SubScope("speculation_tree_store")), }, nil } diff --git a/test/integration/extension/counter/mysql/BUILD.bazel b/test/integration/extension/counter/mysql/BUILD.bazel index fb475cc1..c1b09259 100644 --- a/test/integration/extension/counter/mysql/BUILD.bazel +++ b/test/integration/extension/counter/mysql/BUILD.bazel @@ -15,5 +15,6 @@ go_test( "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", ], ) diff --git a/test/integration/extension/counter/mysql/counter_test.go b/test/integration/extension/counter/mysql/counter_test.go index bdb6532b..0d0e75c3 100644 --- a/test/integration/extension/counter/mysql/counter_test.go +++ b/test/integration/extension/counter/mysql/counter_test.go @@ -8,6 +8,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" countersuite "github.com/uber/submitqueue/test/integration/extension/counter" "github.com/uber/submitqueue/test/testutil" @@ -59,7 +60,7 @@ func (s *MySQLCounterIntegrationSuite) SetupSuite() { s.log.Logf("Schemas applied successfully") // Create counter instance - cnt := mysqlcounter.NewCounter(s.db) + cnt := mysqlcounter.NewCounter(s.db, tally.NoopScope) // Provide the counter instance to the contract suite s.SetContext(ctx) diff --git a/test/integration/extension/storage/mysql/BUILD.bazel b/test/integration/extension/storage/mysql/BUILD.bazel index 00f704d0..cb7e9aa7 100644 --- a/test/integration/extension/storage/mysql/BUILD.bazel +++ b/test/integration/extension/storage/mysql/BUILD.bazel @@ -15,5 +15,6 @@ go_test( "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", + "@com_github_uber_go_tally_v4//:tally", ], ) diff --git a/test/integration/extension/storage/mysql/storage_test.go b/test/integration/extension/storage/mysql/storage_test.go index 4f4c9d41..7c1bda5a 100644 --- a/test/integration/extension/storage/mysql/storage_test.go +++ b/test/integration/extension/storage/mysql/storage_test.go @@ -8,6 +8,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally/v4" mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql" storagesuite "github.com/uber/submitqueue/test/integration/extension/storage" "github.com/uber/submitqueue/test/testutil" @@ -59,7 +60,7 @@ func (s *MySQLStorageIntegrationSuite) SetupSuite() { s.log.Logf("Schemas applied successfully") // Create storage instance using the existing database connection - store, err := mysqlstorage.NewStorage(s.db) + store, err := mysqlstorage.NewStorage(s.db, tally.NoopScope) require.NoError(t, err, "failed to create storage") // Provide the storage instance to the contract suite