Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,16 +515,17 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
store,
// TODO: replace with a real scorer
// Heuristic scorer: bucket the batch by total lines changed across all of
// its changes — larger batches are likelier to fail to land.
scorerFactory{impl: heuristic.New(
[]heuristic.Bucket{
{Min: 0, Max: 1, Score: 0.95},
{Min: 2, Max: 5, Score: 0.80},
{Min: 6, Max: 20, Score: 0.60},
{Min: 21, Max: 1<<31 - 1, Score: 0.40},
{Min: 0, Max: 50, Score: 0.95},
{Min: 51, Max: 250, Score: 0.80},
{Min: 251, Max: 1000, Score: 0.60},
{Min: 1001, Max: 1<<31 - 1, Score: 0.40},
},
func(_ context.Context, change entity.Change) (int, error) {
return len(change.URIs), nil
func(_ context.Context, changes entity.BatchChanges) (int, error) {
return changes.TotalLinesChanged(), nil
},
scope.SubScope("scorer"),
)},
Expand Down
1 change: 1 addition & 0 deletions submitqueue/entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "entity",
srcs = [
"batch.go",
"batch_changes.go",
"batch_dependent.go",
"build.go",
"cancel_request.go",
Expand Down
48 changes: 48 additions & 0 deletions submitqueue/entity/batch_changes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

// BatchChanges is the normalized, batch-level view of all changes in a batch,
// assembled by the score controller and handed to a Scorer. A Batch references
// only request IDs, so the controller resolves each request's change records and
// flattens their details into Changes — giving the scorer the whole batch's
// change facts in one value without coupling it to storage.
type BatchChanges struct {
// BatchID is the batch being scored. Format: "<queue>/batch/<counter_value>".
BatchID string
// Queue is the queue the batch belongs to.
Queue string
// Changes is every change (URI + provider-supplied details) across all requests
// in the batch. Order is unspecified.
Changes []ChangeInfo
}

// TotalLinesChanged returns the total number of lines touched across every change in the batch.
func (b BatchChanges) TotalLinesChanged() int {
total := 0
for _, c := range b.Changes {
total += c.Details.TotalLinesChanged()
}
return total
}

// TotalFiles returns the total number of files touched across every change in the batch.
func (b BatchChanges) TotalFiles() int {
total := 0
for _, c := range b.Changes {
total += c.Details.FileCount()
}
return total
}
4 changes: 2 additions & 2 deletions submitqueue/extension/scorer/composite/scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func New(scorers map[string]scorer.Scorer, reduce ReduceFunc, scope tally.Scope)

// Score evaluates all child scorers and combines their results using the reduce function.
// If any child scorer returns an error, that error is returned immediately.
func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) {
func (c *compositeScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) {
op := metrics.Begin(c.scope, "score")
defer func() { op.Complete(retErr) }()

scores := make(map[string]float64, len(c.scorers))
for name, s := range c.scorers {
score, err := s.Score(ctx, change)
score, err := s.Score(ctx, changes)
if err != nil {
return 0, err
}
Expand Down
10 changes: 5 additions & 5 deletions submitqueue/extension/scorer/composite/scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ type fixedScorer struct {
score float64
}

func (f *fixedScorer) Score(_ context.Context, _ entity.Change) (float64, error) {
func (f *fixedScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) {
return f.score, nil
}

// errorScorer always returns an error.
type errorScorer struct{}

func (e *errorScorer) Score(_ context.Context, _ entity.Change) (float64, error) {
func (e *errorScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) {
return 0, fmt.Errorf("scorer failed")
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestScorer_Score(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := New(tt.scorers, tt.reduce, tally.NoopScope)
got, err := s.Score(context.Background(), entity.Change{})
got, err := s.Score(context.Background(), entity.BatchChanges{})
require.NoError(t, err)
assert.InDelta(t, tt.want, got, 1e-9)
})
Expand All @@ -111,7 +111,7 @@ func TestScorer_Score_ChildError(t *testing.T) {
"error": &errorScorer{},
"files": &fixedScorer{0.9},
}, Min, tally.NoopScope)
_, err := s.Score(context.Background(), entity.Change{})
_, err := s.Score(context.Background(), entity.BatchChanges{})
require.Error(t, err)
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func TestReduceFunc_ReceivesNames(t *testing.T) {
"files": &fixedScorer{0.9},
"deps": &fixedScorer{0.95},
}, custom, tally.NoopScope)
got, err := s.Score(context.Background(), entity.Change{})
got, err := s.Score(context.Background(), entity.BatchChanges{})
require.NoError(t, err)
assert.Equal(t, 0.9, got)
assert.ElementsMatch(t, []string{"files", "deps"}, receivedNames)
Expand Down
16 changes: 8 additions & 8 deletions submitqueue/extension/scorer/heuristic/scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/uber/submitqueue/submitqueue/extension/scorer"
)

// ValueFunc extracts a single numeric value from a Change for bucketing.
type ValueFunc func(context.Context, entity.Change) (int, error)
// ValueFunc extracts a single numeric value from a batch of changes for bucketing.
type ValueFunc func(context.Context, entity.BatchChanges) (int, error)

// Bucket defines a range [Min, Max] mapped to a probability Score.
type Bucket struct {
Expand All @@ -37,12 +37,12 @@ type Bucket struct {
Score float64
}

// heuristicScorer computes a success probability by bucketing a metric extracted from a Change.
// heuristicScorer computes a success probability by bucketing a metric extracted from a batch of changes.
// It follows the Java HeuristicsBasedSuccessPredictor pattern.
type heuristicScorer struct {
// buckets is the list of ranges to match against.
buckets []Bucket
// valueFunc extracts the numeric value from a Change.
// valueFunc extracts the numeric value from a batch of changes.
valueFunc ValueFunc
// scope is the tally scope for emitting metrics.
scope tally.Scope
Expand All @@ -61,12 +61,12 @@ func New(buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer
}
}

// Score extracts the value from the change, then returns the probability score for the first
// bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches.
func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) {
// Score extracts the value from the batch of changes, then returns the probability score for the
// first bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches.
func (s *heuristicScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) {
op := metrics.Begin(s.scope, "score")
defer func() { op.Complete(retErr) }()
value, err := s.valueFunc(ctx, change)
value, err := s.valueFunc(ctx, changes)
if err != nil {
return 0, err
}
Expand Down
8 changes: 4 additions & 4 deletions submitqueue/extension/scorer/heuristic/scorer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

// staticValue returns a ValueFunc that always returns the given value.
func staticValue(value int) ValueFunc {
return func(_ context.Context, _ entity.Change) (int, error) {
return func(_ context.Context, _ entity.BatchChanges) (int, error) {
return value, nil
}
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestScorer_Score(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := New(tt.buckets, tt.valueFunc, tally.NoopScope)
got, err := s.Score(context.Background(), entity.Change{})
got, err := s.Score(context.Background(), entity.BatchChanges{})
if tt.wantErr {
require.Error(t, err)
return
Expand All @@ -119,11 +119,11 @@ func TestScorer_Score(t *testing.T) {
}

func TestScorer_Score_ValueFuncError(t *testing.T) {
failing := func(_ context.Context, _ entity.Change) (int, error) {
failing := func(_ context.Context, _ entity.BatchChanges) (int, error) {
return 0, assert.AnError
}
s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope)
_, err := s.Score(context.Background(), entity.Change{})
_, err := s.Score(context.Background(), entity.BatchChanges{})
require.Error(t, err)
}

Expand Down
8 changes: 4 additions & 4 deletions submitqueue/extension/scorer/mock/scorer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions submitqueue/extension/scorer/scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"github.com/uber/submitqueue/submitqueue/entity"
)

// Scorer computes a success probability score for a change based on its characteristics.
// Scorer computes a success probability score for a batch of changes based on their characteristics.
type Scorer interface {
// Score returns a probability between 0.0 and 1.0 indicating the likelihood
// of a successful land for the given change.
Score(ctx context.Context, change entity.Change) (float64, error)
// of a successful land for the given batch of changes.
Score(ctx context.Context, changes entity.BatchChanges) (float64, error)
}

// Config carries the per-queue identity handed to a Factory. The system knows
Expand Down
47 changes: 37 additions & 10 deletions submitqueue/orchestrator/controller/score/score.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,27 +173,54 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
return nil // Success - message will be acked
}

// scoreBatch scores each request's change in the batch and returns the combined probability.
// Uses multiplicative probability: if any single request fails, the entire batch fails,
// so the batch score is the product of individual request scores.
// scoreBatch normalizes the batch's changes and scores them as a whole. It resolves
// each request in the batch, reads that request's change records (one per URI), and
// flattens their provider-supplied details into a single entity.BatchChanges, which
// the scorer turns into one probability for the batch.
func (c *Controller) scoreBatch(ctx context.Context, batch entity.Batch) (float64, error) {
sc, err := c.scorers.For(scorer.Config{QueueName: batch.Queue})
if err != nil {
return 0, fmt.Errorf("failed to build scorer for batch %s: %w", batch.ID, err)
}
score := 1.0

changes, err := c.collectBatchChanges(ctx, batch)
if err != nil {
return 0, err
}

score, err := sc.Score(ctx, changes)
if err != nil {
return 0, fmt.Errorf("failed to score batch %s: %w", batch.ID, err)
}
return score, nil
}

// collectBatchChanges assembles the normalized entity.BatchChanges for a batch by
// resolving each request and reading its change records per URI. For each URI it
// selects the record owned by the request (GetByURI returns rows for all requests
// that ever claimed the URI) and appends its URI + details.
func (c *Controller) collectBatchChanges(ctx context.Context, batch entity.Batch) (entity.BatchChanges, error) {
changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue}
for _, requestID := range batch.Contains {
request, err := c.store.GetRequestStore().Get(ctx, requestID)
if err != nil {
return 0, fmt.Errorf("failed to get request %s: %w", requestID, err)
return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err)
}
s, err := sc.Score(ctx, request.Change)
if err != nil {
return 0, fmt.Errorf("failed to score request %s: %w", requestID, err)
for _, uri := range request.Change.URIs {
records, err := c.store.GetChangeStore().GetByURI(ctx, batch.Queue, uri)
if err != nil {
return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err)
}
for _, rec := range records {
if rec.RequestID != requestID {
continue
}
changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details})
break
}
}
score *= s
}
return score, nil
return changes, nil
}

// publish publishes a batch ID to the specified topic key.
Expand Down
Loading
Loading