diff --git a/example/submitqueue/gateway/server/queues.yaml b/example/submitqueue/gateway/server/queues.yaml index df7571fe..5caa3219 100644 --- a/example/submitqueue/gateway/server/queues.yaml +++ b/example/submitqueue/gateway/server/queues.yaml @@ -7,7 +7,3 @@ queues: - name: test-queue - name: e2e-test-queue - name: e2e-cancel-queue - # Routes to an analyzer that always errors (conflictfake.FailAlways) so e2e can - # exercise the conflict-analysis error path. See newQueueRegistry in the - # orchestrator example server. - - name: e2e-conflict-error-queue diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index a4dbf09d..a42169ff 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -779,10 +779,9 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err // // The scorer is wrapped by scorerfake so a change URI carrying // "sq-fake=score-error" forces a scoring error end-to-end; it is a pure - // passthrough otherwise. The analyzer is wrapped by conflictfake with a nil - // predicate (passthrough) — swap the predicate (e.g. conflictfake.FailAlways) - // on a queue to exercise the analyzer error path, as e2e-conflict-error-queue - // below does. + // passthrough otherwise. The analyzer is wrapped by conflictfake so a change + // URI carrying "sq-fake=conflict-error" forces a conflict-analysis error; + // passthrough otherwise. base := queueExtensions{ mergeChecker: mc, changeProvider: cp, @@ -794,7 +793,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err )), // TODO: replace the delegate with a real analyzer (e.g. Tango target // analysis). "all" serializes the queue conservatively. - analyzer: conflictfake.New(all.New(), nil), + analyzer: conflictfake.New(all.New()), } // test-queue: bucketed heuristic scorer; conservative (serialized) conflicts @@ -812,7 +811,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err // e2e-test-queue: composite scorer; no conflicts (maximum parallelism). e2eQueue := base - e2eQueue.analyzer = conflictfake.New(none.New(), nil) + e2eQueue.analyzer = conflictfake.New(none.New()) e2eQueue.scorer = scorerfake.New(composite.New( map[string]scorer.Scorer{ "size": heuristic.New([]heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.8}}, batchLines, scope), @@ -821,17 +820,11 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err composite.Avg, scope.SubScope("scorer.e2e-test-queue"), )) - // e2e-conflict-error-queue: every conflict analysis fails, exercising the - // analyzer error path. Scorer/edge integrations inherit the baseline. - conflictErrQueue := base - conflictErrQueue.analyzer = conflictfake.New(all.New(), conflictfake.FailAlways) - return queueRegistry{ def: base, byQueue: map[string]queueExtensions{ - "test-queue": testQueue, - "e2e-test-queue": e2eQueue, - "e2e-conflict-error-queue": conflictErrQueue, + "test-queue": testQueue, + "e2e-test-queue": e2eQueue, }, }, nil } diff --git a/submitqueue/core/batchchanges/BUILD.bazel b/submitqueue/core/batchchanges/BUILD.bazel new file mode 100644 index 00000000..7bad3139 --- /dev/null +++ b/submitqueue/core/batchchanges/BUILD.bazel @@ -0,0 +1,25 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "batchchanges", + srcs = ["batchchanges.go"], + importpath = "github.com/uber/submitqueue/submitqueue/core/batchchanges", + visibility = ["//visibility:public"], + deps = [ + "//submitqueue/entity", + "//submitqueue/extension/storage", + ], +) + +go_test( + name = "batchchanges_test", + srcs = ["batchchanges_test.go"], + embed = [":batchchanges"], + deps = [ + "//submitqueue/entity", + "//submitqueue/extension/storage/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_uber_go_mock//gomock", + ], +) diff --git a/submitqueue/core/batchchanges/batchchanges.go b/submitqueue/core/batchchanges/batchchanges.go new file mode 100644 index 00000000..0ed1b429 --- /dev/null +++ b/submitqueue/core/batchchanges/batchchanges.go @@ -0,0 +1,57 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package batchchanges assembles the normalized, batch-level view of a batch's +// changes (entity.BatchChanges) from storage. A Batch references only request +// IDs, so resolving the actual change facts requires reading each request and +// its per-URI change records. Centralizing this here keeps that storage +// traversal out of the extensions (scorer, conflict analyzer), which consume +// entity.BatchChanges and must not touch storage themselves. +package batchchanges + +import ( + "context" + "fmt" + + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/storage" +) + +// Collect assembles the normalized entity.BatchChanges for a batch by resolving +// each request and reading its change records per URI. For each URI it selects +// the record owned by the request (GetByURI returns rows for every request that +// ever claimed the URI) and appends its URI + provider-supplied details. +func Collect(ctx context.Context, store storage.Storage, batch entity.Batch) (entity.BatchChanges, error) { + changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue} + for _, requestID := range batch.Contains { + request, err := store.GetRequestStore().Get(ctx, requestID) + if err != nil { + return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err) + } + for _, uri := range request.Change.URIs { + records, err := store.GetChangeStore().GetByURI(ctx, batch.Queue, uri) + if err != nil { + return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err) + } + for _, rec := range records { + if rec.RequestID != requestID { + continue + } + changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details}) + break + } + } + } + return changes, nil +} diff --git a/submitqueue/core/batchchanges/batchchanges_test.go b/submitqueue/core/batchchanges/batchchanges_test.go new file mode 100644 index 00000000..c0fe6ba4 --- /dev/null +++ b/submitqueue/core/batchchanges/batchchanges_test.go @@ -0,0 +1,61 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batchchanges + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/submitqueue/entity" + storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock" + "go.uber.org/mock/gomock" +) + +func TestCollect(t *testing.T) { + ctrl := gomock.NewController(t) + + batch := entity.Batch{ID: "q/batch/1", Queue: "q", Contains: []string{"q/1", "q/2"}} + req1 := entity.Request{ID: "q/1", Change: entity.Change{URIs: []string{"github://o/r/pull/1/a"}}} + req2 := entity.Request{ID: "q/2", Change: entity.Change{URIs: []string{"github://o/r/pull/2/b"}}} + rec1 := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/1/a", RequestID: "q/1", + Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f1", LinesAdded: 3}}}} + // A stray record for the same URI owned by a different request must be skipped. + recOther := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/1/a", RequestID: "q/999"} + rec2 := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/2/b", RequestID: "q/2", + Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f2", LinesAdded: 5}}}} + + reqStore := storagemock.NewMockRequestStore(ctrl) + reqStore.EXPECT().Get(gomock.Any(), "q/1").Return(req1, nil) + reqStore.EXPECT().Get(gomock.Any(), "q/2").Return(req2, nil) + + changeStore := storagemock.NewMockChangeStore(ctrl) + changeStore.EXPECT().GetByURI(gomock.Any(), "q", "github://o/r/pull/1/a").Return([]entity.ChangeRecord{recOther, rec1}, nil) + changeStore.EXPECT().GetByURI(gomock.Any(), "q", "github://o/r/pull/2/b").Return([]entity.ChangeRecord{rec2}, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes() + store.EXPECT().GetChangeStore().Return(changeStore).AnyTimes() + + got, err := Collect(context.Background(), store, batch) + require.NoError(t, err) + assert.Equal(t, "q/batch/1", got.BatchID) + assert.Equal(t, "q", got.Queue) + require.Len(t, got.Changes, 2) + assert.Equal(t, "github://o/r/pull/1/a", got.Changes[0].URI) + assert.Equal(t, "github://o/r/pull/2/b", got.Changes[1].URI) + assert.Equal(t, 8, got.TotalLinesChanged()) +} diff --git a/submitqueue/extension/conflict/README.md b/submitqueue/extension/conflict/README.md index 4813f641..daeb373e 100644 --- a/submitqueue/extension/conflict/README.md +++ b/submitqueue/extension/conflict/README.md @@ -5,23 +5,29 @@ and the batches already in flight. ## Interface -`Analyzer` exposes a single `Analyze` method that takes the candidate batch -and the list of in-flight batches it might conflict with. It returns the -subset of in-flight batches that conflict with the candidate, each paired -with a `ConflictType` describing the kind of conflict. An empty result means +`Analyzer` exposes a single `Analyze` method that takes the candidate batch's +changes and the list of in-flight batches' changes it might conflict with. +Inputs are `entity.BatchChanges` — each batch's flattened change facts (URIs + +provider details), assembled by the caller — so the analyzer sees the actual +changes rather than just batch IDs, and can reason about changed targets/files +without reading storage itself. It returns the subset of in-flight batches that +conflict with the candidate, each paired with a `ConflictType` describing the +kind of conflict (referenced by `BatchChanges.BatchID`). An empty result means the candidate is free to advance independently. -Callers are responsible for filtering out the candidate itself and any -terminal batches from the in-flight list before invoking the analyzer. The -analyzer itself stays free of lifecycle knowledge. A non-nil error reports -an infrastructure failure of the analysis and should be treated as -retryable by the caller. +Callers are responsible for assembling `entity.BatchChanges` (see +`submitqueue/core/batchchanges`), and for filtering out the candidate itself and +any terminal batches from the in-flight list before invoking the analyzer. The +analyzer itself stays free of lifecycle knowledge. A non-nil error reports an +infrastructure failure of the analysis and should be treated as retryable by the +caller. -The analyzer is intentionally pure with respect to batch state: it does not -mutate inputs, does not read storage, and may be called concurrently. Real -implementations are expected to resolve the batch contents (e.g. changed -build targets, modified files) via whichever upstream system they depend -on, and to return as much classification detail as that system supports. +The analyzer is intentionally pure with respect to storage: it does not mutate +inputs, does not read storage, and may be called concurrently. A real +implementation (e.g. one backed by uber/tango) derives changed build +targets/edges from the change set — `Changes[].URI` carries the repo + head +commit SHA, and the target branch is injected per queue at construction — and +returns as much classification detail as that system supports. ## Implementations @@ -35,8 +41,10 @@ on, and to return as much classification detail as that system supports. ## Adding a new backend 1. Create `extension/conflict/{backend}/` with an `Analyzer` implementation. -2. Resolve each `entity.Batch` into whatever signal the backend needs - (e.g. changed build targets, files touched, dependency graphs). +2. Derive whatever signal the backend needs from each `entity.BatchChanges` + (e.g. changed build targets, files touched, dependency graphs) — the change + URIs and provider details are in hand; resolve the rest via your upstream + system. 3. Emit one `Conflict` per (in-flight batch, detected conflict type). Pick the most specific `ConflictType` your backend can determine; use `ConflictTypeConservative` only when the backend cannot prove the absence diff --git a/submitqueue/extension/conflict/all/all.go b/submitqueue/extension/conflict/all/all.go index d9e91743..95dc57c6 100644 --- a/submitqueue/extension/conflict/all/all.go +++ b/submitqueue/extension/conflict/all/all.go @@ -36,14 +36,14 @@ func New() conflict.Analyzer { // Analyze returns one ConflictTypeConservative Conflict per in-flight batch, // preserving the input order. Returns an empty slice when inFlight is empty. -func (analyzer) Analyze(_ context.Context, _ entity.Batch, inFlight []entity.Batch) ([]conflict.Conflict, error) { +func (analyzer) Analyze(_ context.Context, _ entity.BatchChanges, inFlight []entity.BatchChanges) ([]conflict.Conflict, error) { if len(inFlight) == 0 { return nil, nil } conflicts := make([]conflict.Conflict, len(inFlight)) for i, b := range inFlight { conflicts[i] = conflict.Conflict{ - BatchID: b.ID, + BatchID: b.BatchID, Type: conflict.ConflictTypeConservative, } } diff --git a/submitqueue/extension/conflict/all/all_test.go b/submitqueue/extension/conflict/all/all_test.go index df32419c..97148405 100644 --- a/submitqueue/extension/conflict/all/all_test.go +++ b/submitqueue/extension/conflict/all/all_test.go @@ -25,11 +25,11 @@ import ( ) func TestAnalyze(t *testing.T) { - batch := entity.Batch{ID: "queueA/batch/10"} + candidate := entity.BatchChanges{BatchID: "queueA/batch/10"} tests := []struct { name string - inFlight []entity.Batch + inFlight []entity.BatchChanges want []conflict.Conflict }{ { @@ -39,15 +39,15 @@ func TestAnalyze(t *testing.T) { }, { name: "empty in-flight slice yields no conflicts", - inFlight: []entity.Batch{}, + inFlight: []entity.BatchChanges{}, want: nil, }, { name: "every in-flight batch is reported in input order", - inFlight: []entity.Batch{ - {ID: "queueA/batch/1"}, - {ID: "queueA/batch/2"}, - {ID: "queueA/batch/3"}, + inFlight: []entity.BatchChanges{ + {BatchID: "queueA/batch/1"}, + {BatchID: "queueA/batch/2"}, + {BatchID: "queueA/batch/3"}, }, want: []conflict.Conflict{ {BatchID: "queueA/batch/1", Type: conflict.ConflictTypeConservative}, @@ -60,7 +60,7 @@ func TestAnalyze(t *testing.T) { a := New() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := a.Analyze(context.Background(), batch, tt.inFlight) + got, err := a.Analyze(context.Background(), candidate, tt.inFlight) require.NoError(t, err) assert.Equal(t, tt.want, got) }) diff --git a/submitqueue/extension/conflict/conflict.go b/submitqueue/extension/conflict/conflict.go index 3c1d1e5c..d06d8cc9 100644 --- a/submitqueue/extension/conflict/conflict.go +++ b/submitqueue/extension/conflict/conflict.go @@ -56,15 +56,21 @@ type Conflict struct { // already in flight, so the speculation layer can decide which batches can // safely advance in parallel. type Analyzer interface { - // Analyze returns the subset of inFlight batches that conflict with - // batch, each paired with the type of conflict detected. An empty - // result means batch does not conflict with any in-flight batch. + // Analyze returns the subset of inFlight batches that conflict with the + // candidate, each paired with the type of conflict detected. An empty + // result means the candidate does not conflict with any in-flight batch. // - // Callers should not include batch itself in inFlight; terminal batches - // should be filtered out before calling. A non-nil error indicates the - // analysis itself failed (infrastructure issue) and should be treated as - // retryable by the caller. - Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]Conflict, error) + // Inputs are entity.BatchChanges — each batch's flattened change facts + // (URIs + provider details), assembled by the caller — so the analyzer + // sees the actual changes rather than just batch IDs, and can reason about + // changed targets/files without touching storage. A conflict references the + // in-flight batch by its BatchChanges.BatchID. + // + // Callers should not include the candidate itself in inFlight; terminal + // batches should be filtered out before calling. A non-nil error indicates + // the analysis itself failed (infrastructure issue) and should be treated + // as retryable by the caller. + Analyze(ctx context.Context, candidate entity.BatchChanges, inFlight []entity.BatchChanges) ([]Conflict, error) } // Config carries the per-queue identity handed to a Factory. The system knows diff --git a/submitqueue/extension/conflict/fake/BUILD.bazel b/submitqueue/extension/conflict/fake/BUILD.bazel index 34919e4a..c8c02c6d 100644 --- a/submitqueue/extension/conflict/fake/BUILD.bazel +++ b/submitqueue/extension/conflict/fake/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/uber/submitqueue/submitqueue/extension/conflict/fake", visibility = ["//visibility:public"], deps = [ + "//submitqueue/core/fakemarker", "//submitqueue/entity", "//submitqueue/extension/conflict", ], diff --git a/submitqueue/extension/conflict/fake/fake.go b/submitqueue/extension/conflict/fake/fake.go index 0f5c0398..c9e5f38b 100644 --- a/submitqueue/extension/conflict/fake/fake.go +++ b/submitqueue/extension/conflict/fake/fake.go @@ -14,12 +14,14 @@ // Package fake provides a conflict.Analyzer that decorates an existing analyzer: // it delegates to the wrapped implementation for the happy path, but injects an -// error when a caller-supplied predicate matches. +// error when a candidate change URI carries a failure marker of the form +// "sq-fake=": // -// Unlike the change-facing fakes, Analyze operates on batches — it never sees -// change URIs — so error injection is predicate-driven rather than marker-driven. -// To exercise the analyzer's error path in e2e, route a queue to an analyzer -// built with a failing predicate (e.g. FailAlways) via the queue wiring. It is +// sq-fake=conflict-error -> non-nil error (the delegate is not called) +// +// Because the analyzer now receives the candidate's changes (entity.BatchChanges +// with URIs), the same URI-marker convention used by the other fakes works here — +// a land request can drive the conflict-analysis error path end-to-end. It is // intended for examples and tests only, never production. package fake @@ -27,36 +29,43 @@ import ( "context" "fmt" + "github.com/uber/submitqueue/submitqueue/core/fakemarker" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/conflict" ) -// FailOn decides whether Analyze should inject an error for the given inputs. -type FailOn func(batch entity.Batch, inFlight []entity.Batch) bool - -// FailAlways is a FailOn that injects an error on every Analyze call. -func FailAlways(entity.Batch, []entity.Batch) bool { return true } +// Recognized marker token. See the package doc for the convention. +const tokenError = "conflict-error" -// analyzerFake decorates a delegate Analyzer, injecting an error when failOn -// reports true. +// analyzerFake decorates a delegate Analyzer, injecting an error when a +// candidate change URI carries the failure marker. type analyzerFake struct { delegate conflict.Analyzer - failOn FailOn } // New returns a conflict.Analyzer that delegates to the given analyzer but -// returns an error when failOn reports true for the call's inputs. The delegate -// is the existing analyzer implementation to wrap (e.g. all or none). A nil -// failOn never injects an error (pure passthrough). -func New(delegate conflict.Analyzer, failOn FailOn) conflict.Analyzer { - return analyzerFake{delegate: delegate, failOn: failOn} +// returns an error when a candidate change URI carries the +// "sq-fake=conflict-error" marker. The delegate is the existing analyzer +// implementation to wrap (e.g. all or none). +func New(delegate conflict.Analyzer) conflict.Analyzer { + return analyzerFake{delegate: delegate} +} + +// Analyze returns an error when a candidate change URI carries the failure +// marker; otherwise it delegates to the wrapped analyzer. +func (a analyzerFake) Analyze(ctx context.Context, candidate entity.BatchChanges, inFlight []entity.BatchChanges) ([]conflict.Conflict, error) { + if markerToken(candidate) == tokenError { + return nil, fmt.Errorf("fake: marked conflict-analysis error for batch %q", candidate.BatchID) + } + return a.delegate.Analyze(ctx, candidate, inFlight) } -// Analyze returns an error when failOn reports true; otherwise it delegates to -// the wrapped analyzer. -func (a analyzerFake) Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]conflict.Conflict, error) { - if a.failOn != nil && a.failOn(batch, inFlight) { - return nil, fmt.Errorf("fake: injected analyze error for batch %q", batch.ID) +// markerToken returns the marker token embedded in the first candidate change +// URI that carries one, or "" if none do. +func markerToken(changes entity.BatchChanges) string { + uris := make([]string, 0, len(changes.Changes)) + for _, c := range changes.Changes { + uris = append(uris, c.URI) } - return a.delegate.Analyze(ctx, batch, inFlight) + return fakemarker.Token(uris) } diff --git a/submitqueue/extension/conflict/fake/fake_test.go b/submitqueue/extension/conflict/fake/fake_test.go index 9004766a..cef4a8e1 100644 --- a/submitqueue/extension/conflict/fake/fake_test.go +++ b/submitqueue/extension/conflict/fake/fake_test.go @@ -27,42 +27,39 @@ import ( ) func TestNew_ImplementsInterface(t *testing.T) { - var _ conflict.Analyzer = New(none.New(), nil) + var _ conflict.Analyzer = New(none.New()) } -func TestAnalyze_DelegatesWhenNoFailOn(t *testing.T) { - // Delegate to "all": one conflict per in-flight batch. nil failOn -> passthrough. - a := New(all.New(), nil) - got, err := a.Analyze(context.Background(), - entity.Batch{ID: "q/batch/1"}, - []entity.Batch{{ID: "q/batch/2"}, {ID: "q/batch/3"}}) - require.NoError(t, err) - assert.Len(t, got, 2) +func batch(id string, uris ...string) entity.BatchChanges { + changes := make([]entity.ChangeInfo, 0, len(uris)) + for _, u := range uris { + changes = append(changes, entity.ChangeInfo{URI: u}) + } + return entity.BatchChanges{BatchID: id, Queue: "q", Changes: changes} } -func TestAnalyze_DelegatesWhenFailOnFalse(t *testing.T) { - a := New(none.New(), func(entity.Batch, []entity.Batch) bool { return false }) - got, err := a.Analyze(context.Background(), entity.Batch{ID: "q/batch/1"}, nil) +func TestAnalyze_DelegatesWhenUnmarked(t *testing.T) { + // Delegate to "all": one conflict per in-flight batch. + a := New(all.New()) + got, err := a.Analyze(context.Background(), + batch("q/batch/1", "github://o/r/pull/1/a"), + []entity.BatchChanges{batch("q/batch/2"), batch("q/batch/3")}) require.NoError(t, err) - assert.Empty(t, got) + assert.Len(t, got, 2) } -func TestAnalyze_FailAlways(t *testing.T) { - a := New(none.New(), FailAlways) - _, err := a.Analyze(context.Background(), entity.Batch{ID: "q/batch/1"}, nil) +func TestAnalyze_ErrorMarker(t *testing.T) { + a := New(all.New()) + _, err := a.Analyze(context.Background(), + batch("q/batch/1", "github://o/r/pull/1/a?sq-fake=conflict-error"), + []entity.BatchChanges{batch("q/batch/2")}) require.Error(t, err) } -func TestAnalyze_FailOnPredicate(t *testing.T) { - // Inject an error only for a specific batch ID. - a := New(none.New(), func(b entity.Batch, _ []entity.Batch) bool { - return b.ID == "q/batch/bad" - }) - - _, err := a.Analyze(context.Background(), entity.Batch{ID: "q/batch/bad"}, nil) +func TestAnalyze_MarkerOnSecondURI(t *testing.T) { + a := New(none.New()) + _, err := a.Analyze(context.Background(), + batch("q/batch/1", "github://o/r/pull/1/a", "github://o/r/pull/2/b?sq-fake=conflict-error"), + nil) require.Error(t, err) - - got, err := a.Analyze(context.Background(), entity.Batch{ID: "q/batch/ok"}, nil) - require.NoError(t, err) - assert.Empty(t, got) } diff --git a/submitqueue/extension/conflict/mock/conflict_mock.go b/submitqueue/extension/conflict/mock/conflict_mock.go index e0a795ca..c492a8f6 100644 --- a/submitqueue/extension/conflict/mock/conflict_mock.go +++ b/submitqueue/extension/conflict/mock/conflict_mock.go @@ -43,18 +43,18 @@ func (m *MockAnalyzer) EXPECT() *MockAnalyzerMockRecorder { } // Analyze mocks base method. -func (m *MockAnalyzer) Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]conflict.Conflict, error) { +func (m *MockAnalyzer) Analyze(ctx context.Context, candidate entity.BatchChanges, inFlight []entity.BatchChanges) ([]conflict.Conflict, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Analyze", ctx, batch, inFlight) + ret := m.ctrl.Call(m, "Analyze", ctx, candidate, inFlight) ret0, _ := ret[0].([]conflict.Conflict) ret1, _ := ret[1].(error) return ret0, ret1 } // Analyze indicates an expected call of Analyze. -func (mr *MockAnalyzerMockRecorder) Analyze(ctx, batch, inFlight any) *gomock.Call { +func (mr *MockAnalyzerMockRecorder) Analyze(ctx, candidate, inFlight any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Analyze", reflect.TypeOf((*MockAnalyzer)(nil).Analyze), ctx, batch, inFlight) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Analyze", reflect.TypeOf((*MockAnalyzer)(nil).Analyze), ctx, candidate, inFlight) } // MockFactory is a mock of Factory interface. diff --git a/submitqueue/extension/conflict/none/none.go b/submitqueue/extension/conflict/none/none.go index ec48d362..dd7f14b2 100644 --- a/submitqueue/extension/conflict/none/none.go +++ b/submitqueue/extension/conflict/none/none.go @@ -33,6 +33,6 @@ func New() conflict.Analyzer { } // Analyze always returns a nil conflict slice, regardless of inputs. -func (analyzer) Analyze(_ context.Context, _ entity.Batch, _ []entity.Batch) ([]conflict.Conflict, error) { +func (analyzer) Analyze(_ context.Context, _ entity.BatchChanges, _ []entity.BatchChanges) ([]conflict.Conflict, error) { return nil, nil } diff --git a/submitqueue/extension/conflict/none/none_test.go b/submitqueue/extension/conflict/none/none_test.go index fa45c30c..553bbe8f 100644 --- a/submitqueue/extension/conflict/none/none_test.go +++ b/submitqueue/extension/conflict/none/none_test.go @@ -24,20 +24,20 @@ import ( ) func TestAnalyze(t *testing.T) { - batch := entity.Batch{ID: "queueA/batch/10"} + candidate := entity.BatchChanges{BatchID: "queueA/batch/10"} tests := []struct { name string - inFlight []entity.Batch + inFlight []entity.BatchChanges }{ {name: "no in-flight batches", inFlight: nil}, - {name: "empty in-flight slice", inFlight: []entity.Batch{}}, + {name: "empty in-flight slice", inFlight: []entity.BatchChanges{}}, { name: "many in-flight batches", - inFlight: []entity.Batch{ - {ID: "queueA/batch/1"}, - {ID: "queueA/batch/2"}, - {ID: "queueA/batch/3"}, + inFlight: []entity.BatchChanges{ + {BatchID: "queueA/batch/1"}, + {BatchID: "queueA/batch/2"}, + {BatchID: "queueA/batch/3"}, }, }, } @@ -45,7 +45,7 @@ func TestAnalyze(t *testing.T) { a := New() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := a.Analyze(context.Background(), batch, tt.inFlight) + got, err := a.Analyze(context.Background(), candidate, tt.inFlight) require.NoError(t, err) assert.Empty(t, got) }) diff --git a/submitqueue/orchestrator/controller/batch/BUILD.bazel b/submitqueue/orchestrator/controller/batch/BUILD.bazel index 68839362..8aa87717 100644 --- a/submitqueue/orchestrator/controller/batch/BUILD.bazel +++ b/submitqueue/orchestrator/controller/batch/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/metrics", "//entity/messagequeue", "//extension/counter", + "//submitqueue/core/batchchanges", "//submitqueue/core/consumer", "//submitqueue/entity", "//submitqueue/extension/conflict", diff --git a/submitqueue/orchestrator/controller/batch/batch.go b/submitqueue/orchestrator/controller/batch/batch.go index e111d754..40b26eac 100644 --- a/submitqueue/orchestrator/controller/batch/batch.go +++ b/submitqueue/orchestrator/controller/batch/batch.go @@ -23,6 +23,7 @@ import ( "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" "github.com/uber/submitqueue/extension/counter" + "github.com/uber/submitqueue/submitqueue/core/batchchanges" "github.com/uber/submitqueue/submitqueue/core/consumer" "github.com/uber/submitqueue/submitqueue/entity" "github.com/uber/submitqueue/submitqueue/extension/conflict" @@ -154,7 +155,25 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r metrics.NamedCounter(c.metricsScope, opName, "conflict_analyzer_errors", 1) return fmt.Errorf("failed to build conflict analyzer for queue=%s: %w", batch.Queue, err) } - conflicts, err := analyzer.Analyze(ctx, batch, activeBatches) + // Resolve the candidate's and each in-flight batch's changes so the analyzer + // sees the actual change set (URIs + provider details), not just batch IDs. + // This keeps the storage traversal in the controller, out of the extension. + candidateChanges, err := batchchanges.Collect(ctx, c.store, batch) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) + return fmt.Errorf("failed to collect changes for batchID=%s: %w", batch.ID, err) + } + inFlightChanges := make([]entity.BatchChanges, 0, len(activeBatches)) + for _, ab := range activeBatches { + abChanges, err := batchchanges.Collect(ctx, c.store, ab) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1) + return fmt.Errorf("failed to collect changes for in-flight batchID=%s: %w", ab.ID, err) + } + inFlightChanges = append(inFlightChanges, abChanges) + } + + conflicts, err := analyzer.Analyze(ctx, candidateChanges, inFlightChanges) if err != nil { metrics.NamedCounter(c.metricsScope, opName, "conflict_analyzer_errors", 1) return fmt.Errorf("failed to analyze conflicts for batchID=%s: %w", batch.ID, err) diff --git a/submitqueue/orchestrator/controller/batch/batch_test.go b/submitqueue/orchestrator/controller/batch/batch_test.go index d881de9f..2b8e0729 100644 --- a/submitqueue/orchestrator/controller/batch/batch_test.go +++ b/submitqueue/orchestrator/controller/batch/batch_test.go @@ -57,6 +57,14 @@ func newSequentialCounter(ctrl *gomock.Controller) *countermock.MockCounter { return cnt } +// anyChangeStore returns a change store that answers any GetByURI with no +// records — enough for batchchanges.Collect to assemble empty change facts. +func anyChangeStore(ctrl *gomock.Controller) *storagemock.MockChangeStore { + cs := storagemock.NewMockChangeStore(ctrl) + cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + return cs +} + // testRequest returns a standard test request for batch tests. func testRequest() entity.Request { return entity.Request{ @@ -86,6 +94,11 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M mockReqStore.EXPECT().Get(gomock.Any(), req.ID).Return(req, nil).AnyTimes() mockReqStore.EXPECT().UpdateState(gomock.Any(), req.ID, req.Version, req.Version+1, entity.RequestStateBatched).Return(nil).AnyTimes() + // The batch controller assembles the candidate's changes via + // batchchanges.Collect, which reads the change store per URI. + mockChangeStore := storagemock.NewMockChangeStore(ctrl) + mockChangeStore.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -93,6 +106,7 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + mockStorage.EXPECT().GetChangeStore().Return(mockChangeStore).AnyTimes() } if analyzer == nil { @@ -237,13 +251,14 @@ func TestController_Process_WithDependencies(t *testing.T) { mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() mockReqStore.EXPECT().UpdateState(gomock.Any(), request.ID, request.Version, request.Version+1, entity.RequestStateBatched).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + mockStorage.EXPECT().GetChangeStore().Return(anyChangeStore(ctrl)).AnyTimes() controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil, nil) @@ -281,13 +296,14 @@ func TestController_Process_AnalyzerSelectsSubset(t *testing.T) { mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() mockReqStore.EXPECT().UpdateState(gomock.Any(), request.ID, request.Version, request.Version+1, entity.RequestStateBatched).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + mockStorage.EXPECT().GetChangeStore().Return(anyChangeStore(ctrl)).AnyTimes() // Analyzer returns duplicate Conflict entries for the same batch (different // conflict types) to prove the controller dedupes by BatchID. @@ -317,11 +333,12 @@ func TestController_Process_AnalyzerFailure(t *testing.T) { mockBatchStore.EXPECT().GetByQueueAndStates(gomock.Any(), "test-queue", gomock.Any()).Return(nil, nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + mockStorage.EXPECT().GetChangeStore().Return(anyChangeStore(ctrl)).AnyTimes() analyzer := conflictmock.NewMockAnalyzer(ctrl) analyzer.EXPECT().Analyze(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("analyzer down")) @@ -372,7 +389,7 @@ func TestController_Process_HaltedShortCircuit(t *testing.T) { // Batch store with no EXPECTs — must not be queried. mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() // No UpdateState expected — gomock fails if called. mockStorage := storagemock.NewMockStorage(ctrl) @@ -419,7 +436,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() mockReqStore.EXPECT().UpdateState( gomock.Any(), request.ID, request.Version, request.Version+1, entity.RequestStateBatched, ).Return(fmt.Errorf("cas: %w", storage.ErrVersionMismatch)) @@ -428,6 +445,7 @@ func TestController_Process_CASLostToCancel(t *testing.T) { mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + mockStorage.EXPECT().GetChangeStore().Return(anyChangeStore(ctrl)).AnyTimes() // Publisher with no EXPECTs — must not be called. mockPub := queuemock.NewMockPublisher(ctrl) @@ -471,7 +489,7 @@ func TestController_Process_CASUnexpectedErrorPropagates(t *testing.T) { casErr := fmt.Errorf("db connection lost") mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() mockReqStore.EXPECT().UpdateState( gomock.Any(), request.ID, request.Version, request.Version+1, entity.RequestStateBatched, ).Return(casErr) @@ -480,6 +498,7 @@ func TestController_Process_CASUnexpectedErrorPropagates(t *testing.T) { mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + mockStorage.EXPECT().GetChangeStore().Return(anyChangeStore(ctrl)).AnyTimes() controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil, nil) @@ -516,7 +535,7 @@ func TestController_Process_RecoveryAfterPriorCAS(t *testing.T) { mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl) - mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) + mockReqStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() mockReqStore.EXPECT().UpdateState( gomock.Any(), request.ID, request.Version, request.Version+1, entity.RequestStateBatched, ).Return(nil) @@ -525,6 +544,7 @@ func TestController_Process_RecoveryAfterPriorCAS(t *testing.T) { mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() + mockStorage.EXPECT().GetChangeStore().Return(anyChangeStore(ctrl)).AnyTimes() controller := newTestController(t, ctrl, newSequentialCounter(ctrl), mockStorage, nil, nil) diff --git a/submitqueue/orchestrator/controller/score/BUILD.bazel b/submitqueue/orchestrator/controller/score/BUILD.bazel index e1f9aa49..130e6afd 100644 --- a/submitqueue/orchestrator/controller/score/BUILD.bazel +++ b/submitqueue/orchestrator/controller/score/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//core/metrics", "//entity/messagequeue", + "//submitqueue/core/batchchanges", "//submitqueue/core/consumer", "//submitqueue/core/request", "//submitqueue/entity", diff --git a/submitqueue/orchestrator/controller/score/score.go b/submitqueue/orchestrator/controller/score/score.go index 6df27ee5..99d18ce3 100644 --- a/submitqueue/orchestrator/controller/score/score.go +++ b/submitqueue/orchestrator/controller/score/score.go @@ -21,6 +21,7 @@ import ( "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/metrics" entityqueue "github.com/uber/submitqueue/entity/messagequeue" + "github.com/uber/submitqueue/submitqueue/core/batchchanges" "github.com/uber/submitqueue/submitqueue/core/consumer" corerequest "github.com/uber/submitqueue/submitqueue/core/request" "github.com/uber/submitqueue/submitqueue/entity" @@ -183,7 +184,7 @@ func (c *Controller) scoreBatch(ctx context.Context, batch entity.Batch) (float6 return 0, fmt.Errorf("failed to build scorer for batch %s: %w", batch.ID, err) } - changes, err := c.collectBatchChanges(ctx, batch) + changes, err := batchchanges.Collect(ctx, c.store, batch) if err != nil { return 0, err } @@ -195,34 +196,6 @@ func (c *Controller) scoreBatch(ctx context.Context, batch entity.Batch) (float6 return score, nil } -// collectBatchChanges assembles the normalized entity.BatchChanges for a batch by -// resolving each request and reading its change records per URI. For each URI it -// selects the record owned by the request (GetByURI returns rows for all requests -// that ever claimed the URI) and appends its URI + details. -func (c *Controller) collectBatchChanges(ctx context.Context, batch entity.Batch) (entity.BatchChanges, error) { - changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue} - for _, requestID := range batch.Contains { - request, err := c.store.GetRequestStore().Get(ctx, requestID) - if err != nil { - return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err) - } - for _, uri := range request.Change.URIs { - records, err := c.store.GetChangeStore().GetByURI(ctx, batch.Queue, uri) - if err != nil { - return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err) - } - for _, rec := range records { - if rec.RequestID != requestID { - continue - } - changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details}) - break - } - } - } - return changes, nil -} - // publish publishes a batch ID to the specified topic key. func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error { bid := entity.BatchID{ID: batchID}