Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func run() error {
defer appDB.Close()

// Initialize counter from shared app database connection
cnt := mysqlcounter.NewCounter(appDB)
cnt := mysqlcounter.NewCounter(appDB, scope.SubScope("counter"))

// Open queue database connection
queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
Expand Down
4 changes: 2 additions & 2 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func run() error {
}
defer appDB.Close()

cnt := mysqlcounter.NewCounter(appDB)
cnt := mysqlcounter.NewCounter(appDB, scope.SubScope("counter"))

store, err := mysqlstorage.NewStorage(appDB)
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions extension/counter/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ go_library(
importpath = "github.com/uber/submitqueue/extension/counter/mysql",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//extension/counter",
"@com_github_uber_go_tally_v4//:tally",
],
)
13 changes: 9 additions & 4 deletions extension/counter/mysql/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@ import (
"database/sql"
"fmt"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/extension/counter"
)

type mysqlCounter struct {
db *sql.DB
db *sql.DB
scope tally.Scope
}

// NewCounter creates a new MySQL-backed Counter.
func NewCounter(db *sql.DB) counter.Counter {
return &mysqlCounter{db: db}
func NewCounter(db *sql.DB, scope tally.Scope) counter.Counter {
return &mysqlCounter{db: db, scope: scope}
}

// Next atomically increments the counter for the given domain and returns the new value.
// Uses MySQL's LAST_INSERT_ID() to set the value atomically and read the incremented value.
func (c *mysqlCounter) Next(ctx context.Context, domain string) (int64, error) {
func (c *mysqlCounter) Next(ctx context.Context, domain string) (ret int64, retErr error) {
op := metrics.Begin(c.scope, "next")
defer func() { op.Complete(retErr) }()
Comment thread
behinddwalls marked this conversation as resolved.
result, err := c.db.ExecContext(ctx,
"INSERT INTO counter (domain, value) VALUES (?, LAST_INSERT_ID(1)) ON DUPLICATE KEY UPDATE value = LAST_INSERT_ID(value + 1)",
domain,
Expand Down
3 changes: 3 additions & 0 deletions extension/scorer/composite/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ go_library(
importpath = "github.com/uber/submitqueue/extension/scorer/composite",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//entity",
"//extension/scorer",
"@com_github_uber_go_tally_v4//:tally",
],
)

Expand All @@ -20,5 +22,6 @@ go_test(
"//extension/scorer",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
],
)
12 changes: 10 additions & 2 deletions extension/scorer/composite/scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package composite
import (
"context"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/scorer"
)
Expand Down Expand Up @@ -51,12 +53,14 @@ type compositeScorer struct {
scorers map[string]scorer.Scorer
// reduce combines named scores into a single value.
reduce ReduceFunc
// scope is the tally scope for emitting metrics.
scope tally.Scope
}

// New creates a composite Scorer that evaluates all named child scorers and combines
// their results using the given reduce function.
// Panics if scorers is empty or reduce is nil.
func New(scorers map[string]scorer.Scorer, reduce ReduceFunc) scorer.Scorer {
func New(scorers map[string]scorer.Scorer, reduce ReduceFunc, scope tally.Scope) scorer.Scorer {
if len(scorers) == 0 {
panic("composite.New: scorers must not be empty")
}
Expand All @@ -66,12 +70,16 @@ func New(scorers map[string]scorer.Scorer, reduce ReduceFunc) scorer.Scorer {
return &compositeScorer{
scorers: scorers,
reduce: reduce,
scope: scope,
}
}

// Score evaluates all child scorers and combines their results using the reduce function.
// If any child scorer returns an error, that error is returned immediately.
func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (float64, error) {
func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) {
op := metrics.Begin(c.scope, "score")
defer func() { op.Complete(retErr) }()

scores := make(map[string]float64, len(c.scorers))
for name, s := range c.scorers {
score, err := s.Score(ctx, change)
Expand Down
11 changes: 6 additions & 5 deletions extension/scorer/composite/scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/scorer"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func TestScorer_Score(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := New(tt.scorers, tt.reduce)
s := New(tt.scorers, tt.reduce, tally.NoopScope)
got, err := s.Score(context.Background(), entity.Change{})
require.NoError(t, err)
assert.InDelta(t, tt.want, got, 1e-9)
Expand All @@ -95,20 +96,20 @@ func TestScorer_Score_ChildError(t *testing.T) {
s := New(map[string]scorer.Scorer{
"error": &errorScorer{},
"files": &fixedScorer{0.9},
}, Min)
}, Min, tally.NoopScope)
_, err := s.Score(context.Background(), entity.Change{})
require.Error(t, err)
}

func TestNew_EmptyScorers(t *testing.T) {
assert.Panics(t, func() {
New(map[string]scorer.Scorer{}, Min)
New(map[string]scorer.Scorer{}, Min, tally.NoopScope)
})
}

func TestNew_NilReduce(t *testing.T) {
assert.Panics(t, func() {
New(map[string]scorer.Scorer{"files": &fixedScorer{0.9}}, nil)
New(map[string]scorer.Scorer{"files": &fixedScorer{0.9}}, nil, tally.NoopScope)
})
}

Expand All @@ -124,7 +125,7 @@ func TestReduceFunc_ReceivesNames(t *testing.T) {
s := New(map[string]scorer.Scorer{
"files": &fixedScorer{0.9},
"deps": &fixedScorer{0.95},
}, custom)
}, custom, tally.NoopScope)
got, err := s.Score(context.Background(), entity.Change{})
require.NoError(t, err)
assert.Equal(t, 0.9, got)
Expand Down
3 changes: 3 additions & 0 deletions extension/scorer/heuristic/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ go_library(
importpath = "github.com/uber/submitqueue/extension/scorer/heuristic",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//entity",
"//extension/scorer",
"@com_github_uber_go_tally_v4//:tally",
],
)

Expand All @@ -19,5 +21,6 @@ go_test(
"//entity",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
],
)
11 changes: 9 additions & 2 deletions extension/scorer/heuristic/scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/scorer"
)
Expand All @@ -28,23 +30,28 @@ type heuristicScorer struct {
buckets []Bucket
// valueFunc extracts the numeric value from a Change.
valueFunc ValueFunc
// scope is the tally scope for emitting metrics.
scope tally.Scope
}

// New creates a new heuristic Scorer with the given buckets and value function.
// Panics if valueFunc is nil.
func New(buckets []Bucket, valueFunc ValueFunc) scorer.Scorer {
func New(buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer {
if valueFunc == nil {
panic("heuristic.New: valueFunc must not be nil")
}
return &heuristicScorer{
buckets: buckets,
valueFunc: valueFunc,
scope: scope,
}
}

// Score extracts the value from the change, then returns the probability score for the first
// bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches.
func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (float64, error) {
func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) {
op := metrics.Begin(s.scope, "score")
defer func() { op.Complete(retErr) }()
value, err := s.valueFunc(ctx, change)
if err != nil {
return 0, err
Expand Down
7 changes: 4 additions & 3 deletions extension/scorer/heuristic/scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/entity"
)

Expand Down Expand Up @@ -91,7 +92,7 @@ func TestScorer_Score(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := New(tt.buckets, tt.valueFunc)
s := New(tt.buckets, tt.valueFunc, tally.NoopScope)
got, err := s.Score(context.Background(), entity.Change{})
if tt.wantErr {
require.Error(t, err)
Expand All @@ -107,13 +108,13 @@ func TestScorer_Score_ValueFuncError(t *testing.T) {
failing := func(_ context.Context, _ entity.Change) (int, error) {
return 0, assert.AnError
}
s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing)
s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope)
_, err := s.Score(context.Background(), entity.Change{})
require.Error(t, err)
}

func TestNew_NilValueFunc(t *testing.T) {
assert.Panics(t, func() {
New([]Bucket{{Min: 0, Max: 10, Score: 0.85}}, nil)
New([]Bucket{{Min: 0, Max: 10, Score: 0.85}}, nil, tally.NoopScope)
})
}
2 changes: 2 additions & 0 deletions extension/storage/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ go_library(
importpath = "github.com/uber/submitqueue/extension/storage/mysql",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//entity",
"//extension/storage",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally_v4//:tally",
],
)
24 changes: 18 additions & 6 deletions extension/storage/mysql/batch_dependent_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,28 @@ import (
"fmt"

"github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"

"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
)

type batchDependentStore struct {
db *sql.DB
db *sql.DB
scope tally.Scope
}

// NewBatchDependentStore creates a new MySQL-backed BatchDependentStore.
func NewBatchDependentStore(db *sql.DB) storage.BatchDependentStore {
return &batchDependentStore{db: db}
func NewBatchDependentStore(db *sql.DB, scope tally.Scope) storage.BatchDependentStore {
return &batchDependentStore{db: db, scope: scope}
}

// Get retrieves the batch dependent by batch ID. Returns ErrNotFound if the batch dependent is not found.
func (s *batchDependentStore) Get(ctx context.Context, batchID string) (entity.BatchDependent, error) {
func (s *batchDependentStore) Get(ctx context.Context, batchID string) (ret entity.BatchDependent, retErr error) {
op := metrics.Begin(s.scope, "get")
defer func() { op.Complete(retErr) }()

var bd entity.BatchDependent
var dependentsJSON []byte

Expand All @@ -47,7 +53,10 @@ func (s *batchDependentStore) Get(ctx context.Context, batchID string) (entity.B
}

// Create creates a new batch dependent. Returns ErrAlreadyExists if the entry already exists.
func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) error {
func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.BatchDependent) (retErr error) {
op := metrics.Begin(s.scope, "create")
defer func() { op.Complete(retErr) }()

dependentsJSON, err := json.Marshal(batchDependent.Dependents)
if err != nil {
return fmt.Errorf("failed to marshal dependents batchID=%s for Create batch dependent entity: %w", batchDependent.BatchID, err)
Expand All @@ -71,7 +80,10 @@ func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.
// UpdateDependents updates the dependents of a batch dependent if the current version matches the expected version.
// If versions do not match, returns ErrVersionMismatch.
// The implementation increments the version by 1 atomically with the dependents update.
func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) error {
func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) (retErr error) {
op := metrics.Begin(s.scope, "update_dependents")
defer func() { op.Complete(retErr) }()

dependentsJSON, err := json.Marshal(dependents)
if err != nil {
return fmt.Errorf("failed to marshal dependents batchID=%s for UpdateDependents batch dependent entity: %w", batchID, err)
Expand Down
29 changes: 22 additions & 7 deletions extension/storage/mysql/batch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,28 @@ import (
"strings"

"github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"

"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
)

type batchStore struct {
db *sql.DB
db *sql.DB
scope tally.Scope
}

// NewBatchStore creates a new MySQL-backed BatchStore.
func NewBatchStore(db *sql.DB) storage.BatchStore {
return &batchStore{db: db}
func NewBatchStore(db *sql.DB, scope tally.Scope) storage.BatchStore {
return &batchStore{db: db, scope: scope}
}

// Get retrieves a batch by ID. Returns ErrNotFound if the batch is not found.
func (s *batchStore) Get(ctx context.Context, id string) (entity.Batch, error) {
func (s *batchStore) Get(ctx context.Context, id string) (ret entity.Batch, retErr error) {
op := metrics.Begin(s.scope, "get")
defer func() { op.Complete(retErr) }()

var batch entity.Batch
var containsJSON []byte
var dependenciesJSON []byte
Expand Down Expand Up @@ -53,7 +59,10 @@ func (s *batchStore) Get(ctx context.Context, id string) (entity.Batch, error) {
}

// Create creates a new batch. The batch must have a unique ID already assigned. Returns ErrAlreadyExists if the batch ID already exists.
func (s *batchStore) Create(ctx context.Context, batch entity.Batch) error {
func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr error) {
op := metrics.Begin(s.scope, "create")
defer func() { op.Complete(retErr) }()

containsJSON, err := json.Marshal(batch.Contains)
if err != nil {
return fmt.Errorf("failed to marshal contains=%v id=%s for Create batch entity: %w", batch.Contains, batch.ID, err)
Expand Down Expand Up @@ -81,7 +90,10 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) error {

// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
// The implementation increments the version by 1 atomically with the state update.
func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error {
func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) (retErr error) {
op := metrics.Begin(s.scope, "update_state")
defer func() { op.Complete(retErr) }()

result, err := s.db.ExecContext(ctx,
"UPDATE batch SET state = ?, version = version + 1 WHERE id = ? AND version = ?",
newState, id, version,
Expand Down Expand Up @@ -112,7 +124,10 @@ func (s *batchStore) UpdateState(ctx context.Context, id string, version int32,
}

// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error) {
func (s *batchStore) GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) (ret []entity.Batch, retErr error) {
op := metrics.Begin(s.scope, "get_by_queue_and_states")
defer func() { op.Complete(retErr) }()

if len(states) == 0 {
return nil, nil
}
Expand Down
Loading