diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 11a03d20..5595cc89 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -515,16 +515,17 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, store, - // TODO: replace with a real scorer + // Heuristic scorer: bucket the batch by total lines changed across all of + // its changes — larger batches are likelier to fail to land. scorerFactory{impl: heuristic.New( []heuristic.Bucket{ - {Min: 0, Max: 1, Score: 0.95}, - {Min: 2, Max: 5, Score: 0.80}, - {Min: 6, Max: 20, Score: 0.60}, - {Min: 21, Max: 1<<31 - 1, Score: 0.40}, + {Min: 0, Max: 50, Score: 0.95}, + {Min: 51, Max: 250, Score: 0.80}, + {Min: 251, Max: 1000, Score: 0.60}, + {Min: 1001, Max: 1<<31 - 1, Score: 0.40}, }, - func(_ context.Context, change entity.Change) (int, error) { - return len(change.URIs), nil + func(_ context.Context, changes entity.BatchChanges) (int, error) { + return changes.TotalLinesChanged(), nil }, scope.SubScope("scorer"), )}, diff --git a/submitqueue/entity/BUILD.bazel b/submitqueue/entity/BUILD.bazel index d99f079b..21a57602 100644 --- a/submitqueue/entity/BUILD.bazel +++ b/submitqueue/entity/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "entity", srcs = [ "batch.go", + "batch_changes.go", "batch_dependent.go", "build.go", "cancel_request.go", diff --git a/submitqueue/entity/batch_changes.go b/submitqueue/entity/batch_changes.go new file mode 100644 index 00000000..d97e8a9c --- /dev/null +++ b/submitqueue/entity/batch_changes.go @@ -0,0 +1,48 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package entity + +// BatchChanges is the normalized, batch-level view of all changes in a batch, +// assembled by the score controller and handed to a Scorer. A Batch references +// only request IDs, so the controller resolves each request's change records and +// flattens their details into Changes — giving the scorer the whole batch's +// change facts in one value without coupling it to storage. +type BatchChanges struct { + // BatchID is the batch being scored. Format: "/batch/". + BatchID string + // Queue is the queue the batch belongs to. + Queue string + // Changes is every change (URI + provider-supplied details) across all requests + // in the batch. Order is unspecified. + Changes []ChangeInfo +} + +// TotalLinesChanged returns the total number of lines touched across every change in the batch. +func (b BatchChanges) TotalLinesChanged() int { + total := 0 + for _, c := range b.Changes { + total += c.Details.TotalLinesChanged() + } + return total +} + +// TotalFiles returns the total number of files touched across every change in the batch. +func (b BatchChanges) TotalFiles() int { + total := 0 + for _, c := range b.Changes { + total += c.Details.FileCount() + } + return total +} diff --git a/submitqueue/extension/scorer/composite/scorer.go b/submitqueue/extension/scorer/composite/scorer.go index aa46e8e7..82f84596 100644 --- a/submitqueue/extension/scorer/composite/scorer.go +++ b/submitqueue/extension/scorer/composite/scorer.go @@ -90,13 +90,13 @@ func New(scorers map[string]scorer.Scorer, reduce ReduceFunc, scope tally.Scope) // Score evaluates all child scorers and combines their results using the reduce function. // If any child scorer returns an error, that error is returned immediately. -func (c *compositeScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) { +func (c *compositeScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) { op := metrics.Begin(c.scope, "score") defer func() { op.Complete(retErr) }() scores := make(map[string]float64, len(c.scorers)) for name, s := range c.scorers { - score, err := s.Score(ctx, change) + score, err := s.Score(ctx, changes) if err != nil { return 0, err } diff --git a/submitqueue/extension/scorer/composite/scorer_test.go b/submitqueue/extension/scorer/composite/scorer_test.go index 2abec0a1..a6b95878 100644 --- a/submitqueue/extension/scorer/composite/scorer_test.go +++ b/submitqueue/extension/scorer/composite/scorer_test.go @@ -31,14 +31,14 @@ type fixedScorer struct { score float64 } -func (f *fixedScorer) Score(_ context.Context, _ entity.Change) (float64, error) { +func (f *fixedScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) { return f.score, nil } // errorScorer always returns an error. type errorScorer struct{} -func (e *errorScorer) Score(_ context.Context, _ entity.Change) (float64, error) { +func (e *errorScorer) Score(_ context.Context, _ entity.BatchChanges) (float64, error) { return 0, fmt.Errorf("scorer failed") } @@ -99,7 +99,7 @@ func TestScorer_Score(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := New(tt.scorers, tt.reduce, tally.NoopScope) - got, err := s.Score(context.Background(), entity.Change{}) + got, err := s.Score(context.Background(), entity.BatchChanges{}) require.NoError(t, err) assert.InDelta(t, tt.want, got, 1e-9) }) @@ -111,7 +111,7 @@ func TestScorer_Score_ChildError(t *testing.T) { "error": &errorScorer{}, "files": &fixedScorer{0.9}, }, Min, tally.NoopScope) - _, err := s.Score(context.Background(), entity.Change{}) + _, err := s.Score(context.Background(), entity.BatchChanges{}) require.Error(t, err) } @@ -140,7 +140,7 @@ func TestReduceFunc_ReceivesNames(t *testing.T) { "files": &fixedScorer{0.9}, "deps": &fixedScorer{0.95}, }, custom, tally.NoopScope) - got, err := s.Score(context.Background(), entity.Change{}) + got, err := s.Score(context.Background(), entity.BatchChanges{}) require.NoError(t, err) assert.Equal(t, 0.9, got) assert.ElementsMatch(t, []string{"files", "deps"}, receivedNames) diff --git a/submitqueue/extension/scorer/heuristic/scorer.go b/submitqueue/extension/scorer/heuristic/scorer.go index 94e81482..7203d0e6 100644 --- a/submitqueue/extension/scorer/heuristic/scorer.go +++ b/submitqueue/extension/scorer/heuristic/scorer.go @@ -24,8 +24,8 @@ import ( "github.com/uber/submitqueue/submitqueue/extension/scorer" ) -// ValueFunc extracts a single numeric value from a Change for bucketing. -type ValueFunc func(context.Context, entity.Change) (int, error) +// ValueFunc extracts a single numeric value from a batch of changes for bucketing. +type ValueFunc func(context.Context, entity.BatchChanges) (int, error) // Bucket defines a range [Min, Max] mapped to a probability Score. type Bucket struct { @@ -37,12 +37,12 @@ type Bucket struct { Score float64 } -// heuristicScorer computes a success probability by bucketing a metric extracted from a Change. +// heuristicScorer computes a success probability by bucketing a metric extracted from a batch of changes. // It follows the Java HeuristicsBasedSuccessPredictor pattern. type heuristicScorer struct { // buckets is the list of ranges to match against. buckets []Bucket - // valueFunc extracts the numeric value from a Change. + // valueFunc extracts the numeric value from a batch of changes. valueFunc ValueFunc // scope is the tally scope for emitting metrics. scope tally.Scope @@ -61,12 +61,12 @@ func New(buckets []Bucket, valueFunc ValueFunc, scope tally.Scope) scorer.Scorer } } -// Score extracts the value from the change, then returns the probability score for the first -// bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches. -func (s *heuristicScorer) Score(ctx context.Context, change entity.Change) (ret float64, retErr error) { +// Score extracts the value from the batch of changes, then returns the probability score for the +// first bucket whose range [Min, Max] contains the value. Returns an error if no bucket matches. +func (s *heuristicScorer) Score(ctx context.Context, changes entity.BatchChanges) (ret float64, retErr error) { op := metrics.Begin(s.scope, "score") defer func() { op.Complete(retErr) }() - value, err := s.valueFunc(ctx, change) + value, err := s.valueFunc(ctx, changes) if err != nil { return 0, err } diff --git a/submitqueue/extension/scorer/heuristic/scorer_test.go b/submitqueue/extension/scorer/heuristic/scorer_test.go index 61c8cbf7..98d0bcce 100644 --- a/submitqueue/extension/scorer/heuristic/scorer_test.go +++ b/submitqueue/extension/scorer/heuristic/scorer_test.go @@ -26,7 +26,7 @@ import ( // staticValue returns a ValueFunc that always returns the given value. func staticValue(value int) ValueFunc { - return func(_ context.Context, _ entity.Change) (int, error) { + return func(_ context.Context, _ entity.BatchChanges) (int, error) { return value, nil } } @@ -107,7 +107,7 @@ func TestScorer_Score(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := New(tt.buckets, tt.valueFunc, tally.NoopScope) - got, err := s.Score(context.Background(), entity.Change{}) + got, err := s.Score(context.Background(), entity.BatchChanges{}) if tt.wantErr { require.Error(t, err) return @@ -119,11 +119,11 @@ func TestScorer_Score(t *testing.T) { } func TestScorer_Score_ValueFuncError(t *testing.T) { - failing := func(_ context.Context, _ entity.Change) (int, error) { + failing := func(_ context.Context, _ entity.BatchChanges) (int, error) { return 0, assert.AnError } s := New([]Bucket{{Min: 0, Max: 10, Score: 0.9}}, failing, tally.NoopScope) - _, err := s.Score(context.Background(), entity.Change{}) + _, err := s.Score(context.Background(), entity.BatchChanges{}) require.Error(t, err) } diff --git a/submitqueue/extension/scorer/mock/scorer_mock.go b/submitqueue/extension/scorer/mock/scorer_mock.go index 9cfc706f..72edc280 100644 --- a/submitqueue/extension/scorer/mock/scorer_mock.go +++ b/submitqueue/extension/scorer/mock/scorer_mock.go @@ -43,18 +43,18 @@ func (m *MockScorer) EXPECT() *MockScorerMockRecorder { } // Score mocks base method. -func (m *MockScorer) Score(ctx context.Context, change entity.Change) (float64, error) { +func (m *MockScorer) Score(ctx context.Context, changes entity.BatchChanges) (float64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Score", ctx, change) + ret := m.ctrl.Call(m, "Score", ctx, changes) ret0, _ := ret[0].(float64) ret1, _ := ret[1].(error) return ret0, ret1 } // Score indicates an expected call of Score. -func (mr *MockScorerMockRecorder) Score(ctx, change any) *gomock.Call { +func (mr *MockScorerMockRecorder) Score(ctx, changes any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Score", reflect.TypeOf((*MockScorer)(nil).Score), ctx, change) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Score", reflect.TypeOf((*MockScorer)(nil).Score), ctx, changes) } // MockFactory is a mock of Factory interface. diff --git a/submitqueue/extension/scorer/scorer.go b/submitqueue/extension/scorer/scorer.go index 8b0141ad..6837448e 100644 --- a/submitqueue/extension/scorer/scorer.go +++ b/submitqueue/extension/scorer/scorer.go @@ -22,11 +22,11 @@ import ( "github.com/uber/submitqueue/submitqueue/entity" ) -// Scorer computes a success probability score for a change based on its characteristics. +// Scorer computes a success probability score for a batch of changes based on their characteristics. type Scorer interface { // Score returns a probability between 0.0 and 1.0 indicating the likelihood - // of a successful land for the given change. - Score(ctx context.Context, change entity.Change) (float64, error) + // of a successful land for the given batch of changes. + Score(ctx context.Context, changes entity.BatchChanges) (float64, error) } // Config carries the per-queue identity handed to a Factory. The system knows diff --git a/submitqueue/orchestrator/controller/score/score.go b/submitqueue/orchestrator/controller/score/score.go index 2b1c3e61..6df27ee5 100644 --- a/submitqueue/orchestrator/controller/score/score.go +++ b/submitqueue/orchestrator/controller/score/score.go @@ -173,27 +173,54 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return nil // Success - message will be acked } -// scoreBatch scores each request's change in the batch and returns the combined probability. -// Uses multiplicative probability: if any single request fails, the entire batch fails, -// so the batch score is the product of individual request scores. +// scoreBatch normalizes the batch's changes and scores them as a whole. It resolves +// each request in the batch, reads that request's change records (one per URI), and +// flattens their provider-supplied details into a single entity.BatchChanges, which +// the scorer turns into one probability for the batch. func (c *Controller) scoreBatch(ctx context.Context, batch entity.Batch) (float64, error) { sc, err := c.scorers.For(scorer.Config{QueueName: batch.Queue}) if err != nil { return 0, fmt.Errorf("failed to build scorer for batch %s: %w", batch.ID, err) } - score := 1.0 + + changes, err := c.collectBatchChanges(ctx, batch) + if err != nil { + return 0, err + } + + score, err := sc.Score(ctx, changes) + if err != nil { + return 0, fmt.Errorf("failed to score batch %s: %w", batch.ID, err) + } + return score, nil +} + +// collectBatchChanges assembles the normalized entity.BatchChanges for a batch by +// resolving each request and reading its change records per URI. For each URI it +// selects the record owned by the request (GetByURI returns rows for all requests +// that ever claimed the URI) and appends its URI + details. +func (c *Controller) collectBatchChanges(ctx context.Context, batch entity.Batch) (entity.BatchChanges, error) { + changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue} for _, requestID := range batch.Contains { request, err := c.store.GetRequestStore().Get(ctx, requestID) if err != nil { - return 0, fmt.Errorf("failed to get request %s: %w", requestID, err) + return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err) } - s, err := sc.Score(ctx, request.Change) - if err != nil { - return 0, fmt.Errorf("failed to score request %s: %w", requestID, err) + for _, uri := range request.Change.URIs { + records, err := c.store.GetChangeStore().GetByURI(ctx, batch.Queue, uri) + if err != nil { + return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err) + } + for _, rec := range records { + if rec.RequestID != requestID { + continue + } + changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details}) + break + } } - score *= s } - return score, nil + return changes, nil } // publish publishes a batch ID to the specified topic key. diff --git a/submitqueue/orchestrator/controller/score/score_test.go b/submitqueue/orchestrator/controller/score/score_test.go index aec0c974..f57a9bb1 100644 --- a/submitqueue/orchestrator/controller/score/score_test.go +++ b/submitqueue/orchestrator/controller/score/score_test.go @@ -64,7 +64,28 @@ func testRequest() entity.Request { } } -// newMockStorage creates a MockStorage with a MockBatchStore and MockRequestStore. +// mockChangeStore returns a MockChangeStore that serves one self-owned ChangeRecord +// per URI for each request, mirroring what start.claimURIs + validate enrichment +// would have persisted. The score controller reads these via GetByURI to assemble +// the batch's changes. +func mockChangeStore(ctrl *gomock.Controller, requests ...entity.Request) *storagemock.MockChangeStore { + cs := storagemock.NewMockChangeStore(ctrl) + for _, req := range requests { + for _, uri := range req.Change.URIs { + rec := entity.ChangeRecord{ + URI: uri, + RequestID: req.ID, + Queue: req.Queue, + Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f.go", LinesAdded: 5}}}, + Version: 1, + } + cs.EXPECT().GetByURI(gomock.Any(), req.Queue, uri).Return([]entity.ChangeRecord{rec}, nil).AnyTimes() + } + } + return cs +} + +// newMockStorage creates a MockStorage with a MockBatchStore, MockRequestStore, and MockChangeStore. func newMockStorage(ctrl *gomock.Controller, batch entity.Batch, request entity.Request) *storagemock.MockStorage { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() @@ -76,6 +97,7 @@ func newMockStorage(ctrl *gomock.Controller, batch entity.Batch, request entity. store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() return store } @@ -131,7 +153,7 @@ func TestController_Process_Success(t *testing.T) { store := newMockStorage(ctrl, batch, request) mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.85, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.85, nil) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -144,7 +166,9 @@ func TestController_Process_Success(t *testing.T) { require.NoError(t, err) } -func TestController_Process_MultipleRequests_MinScore(t *testing.T) { +// TestController_Process_BatchLevelScore verifies the controller assembles all of the +// batch's changes into one BatchChanges and persists the single score the scorer returns. +func TestController_Process_BatchLevelScore(t *testing.T) { ctrl := gomock.NewController(t) batch := entity.Batch{ @@ -172,8 +196,8 @@ func TestController_Process_MultipleRequests_MinScore(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) - // Expect the multiplicative score (0.9 * 0.6 = 0.54) to be persisted - mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, batch.Version+1, 0.54, entity.BatchStateScored).Return(nil) + // The single batch-level score is persisted. + mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, batch.Version+1, 0.7, entity.BatchStateScored).Return(nil) mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(request1, nil) @@ -182,10 +206,17 @@ func TestController_Process_MultipleRequests_MinScore(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request1, request2)).AnyTimes() + // Capture the BatchChanges to assert both requests' changes were gathered. mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request1.Change).Return(0.9, nil) - mockScorer.EXPECT().Score(gomock.Any(), request2.Change).Return(0.6, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, changes entity.BatchChanges) (float64, error) { + assert.Equal(t, batch.ID, changes.BatchID) + assert.Len(t, changes.Changes, 2) + return 0.7, nil + }, + ) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -234,9 +265,10 @@ func TestController_Process_ScorerFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.0, fmt.Errorf("no bucket matches value 99")) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.0, fmt.Errorf("no bucket matches value 99")) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -265,9 +297,10 @@ func TestController_Process_UpdateScoreFailure(t *testing.T) { store := storagemock.NewMockStorage(ctrl) store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() store.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(mockChangeStore(ctrl, request)).AnyTimes() mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.85, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.85, nil) controller := newTestController(t, ctrl, store, mockScorer, nil) @@ -288,7 +321,7 @@ func TestController_Process_PublishFailure(t *testing.T) { store := newMockStorage(ctrl, batch, request) mockScorer := scorermock.NewMockScorer(ctrl) - mockScorer.EXPECT().Score(gomock.Any(), request.Change).Return(0.85, nil) + mockScorer.EXPECT().Score(gomock.Any(), gomock.Any()).Return(0.85, nil) controller := newTestController(t, ctrl, store, mockScorer, fmt.Errorf("publish failed"))