Skip to content

Commit 3e09ea0

Browse files
committed
refactor(conflict): take a batch of changes as analyzer input
## Summary ### Why? The conflict analyzer received entity.Batch values (request IDs only), so a real analyzer — eventually backed by uber/tango ChangedTargetsAndEdges — could not see the actual changes without storage lookups, which extensions must not do. The scorer already solved this by moving to entity.BatchChanges; mirror it here. ### What? - conflict.Analyzer.Analyze input changes from entity.Batch to entity.BatchChanges for both candidate and in-flight; the output ([]Conflict) is unchanged. No new entity types — a Tango-backed analyzer derives targets from the change URIs/files itself, with the target branch injected per queue at construction. - New submitqueue/core/batchchanges.Collect: shared assembler that resolves a batch's request IDs -> change records -> entity.BatchChanges. The score and batch controllers both use it (extracted from the score controller). - batch controller assembles candidate + in-flight BatchChanges before Analyze. - all/none updated to the new input; conflict/fake switched to the sq-fake=conflict-error URI marker (drops FailOn/FailAlways); mock regenerated. - example: conflictfake.New(all/none) without a predicate; removed the dedicated e2e-conflict-error-queue (the marker now reaches the analyzer on any queue). ## Test Plan - ✅ make test (45/45 unit), make e2e-test (1/1), make integration-test (7/7) - ✅ bazel build //...; check-gazelle / check-tidy / check-mocks clean once committed
1 parent 903c210 commit 3e09ea0

19 files changed

Lines changed: 327 additions & 153 deletions

File tree

example/submitqueue/gateway/server/queues.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,3 @@ queues:
77
- name: test-queue
88
- name: e2e-test-queue
99
- name: e2e-cancel-queue
10-
# Routes to an analyzer that always errors (conflictfake.FailAlways) so e2e can
11-
# exercise the conflict-analysis error path. See newQueueRegistry in the
12-
# orchestrator example server.
13-
- name: e2e-conflict-error-queue

example/submitqueue/orchestrator/server/main.go

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -779,10 +779,9 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
779779
//
780780
// The scorer is wrapped by scorerfake so a change URI carrying
781781
// "sq-fake=score-error" forces a scoring error end-to-end; it is a pure
782-
// passthrough otherwise. The analyzer is wrapped by conflictfake with a nil
783-
// predicate (passthrough) — swap the predicate (e.g. conflictfake.FailAlways)
784-
// on a queue to exercise the analyzer error path, as e2e-conflict-error-queue
785-
// below does.
782+
// passthrough otherwise. The analyzer is wrapped by conflictfake so a change
783+
// URI carrying "sq-fake=conflict-error" forces a conflict-analysis error;
784+
// passthrough otherwise.
786785
base := queueExtensions{
787786
mergeChecker: mc,
788787
changeProvider: cp,
@@ -794,7 +793,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
794793
)),
795794
// TODO: replace the delegate with a real analyzer (e.g. Tango target
796795
// analysis). "all" serializes the queue conservatively.
797-
analyzer: conflictfake.New(all.New(), nil),
796+
analyzer: conflictfake.New(all.New()),
798797
}
799798

800799
// test-queue: bucketed heuristic scorer; conservative (serialized) conflicts
@@ -812,7 +811,7 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
812811

813812
// e2e-test-queue: composite scorer; no conflicts (maximum parallelism).
814813
e2eQueue := base
815-
e2eQueue.analyzer = conflictfake.New(none.New(), nil)
814+
e2eQueue.analyzer = conflictfake.New(none.New())
816815
e2eQueue.scorer = scorerfake.New(composite.New(
817816
map[string]scorer.Scorer{
818817
"size": heuristic.New([]heuristic.Bucket{{Min: 0, Max: 1<<31 - 1, Score: 0.8}}, batchLines, scope),
@@ -821,17 +820,11 @@ func newQueueRegistry(logger *zap.Logger, scope tally.Scope) (queueRegistry, err
821820
composite.Avg, scope.SubScope("scorer.e2e-test-queue"),
822821
))
823822

824-
// e2e-conflict-error-queue: every conflict analysis fails, exercising the
825-
// analyzer error path. Scorer/edge integrations inherit the baseline.
826-
conflictErrQueue := base
827-
conflictErrQueue.analyzer = conflictfake.New(all.New(), conflictfake.FailAlways)
828-
829823
return queueRegistry{
830824
def: base,
831825
byQueue: map[string]queueExtensions{
832-
"test-queue": testQueue,
833-
"e2e-test-queue": e2eQueue,
834-
"e2e-conflict-error-queue": conflictErrQueue,
826+
"test-queue": testQueue,
827+
"e2e-test-queue": e2eQueue,
835828
},
836829
}, nil
837830
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "batchchanges",
5+
srcs = ["batchchanges.go"],
6+
importpath = "github.com/uber/submitqueue/submitqueue/core/batchchanges",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//submitqueue/entity",
10+
"//submitqueue/extension/storage",
11+
],
12+
)
13+
14+
go_test(
15+
name = "batchchanges_test",
16+
srcs = ["batchchanges_test.go"],
17+
embed = [":batchchanges"],
18+
deps = [
19+
"//submitqueue/entity",
20+
"//submitqueue/extension/storage/mock",
21+
"@com_github_stretchr_testify//assert",
22+
"@com_github_stretchr_testify//require",
23+
"@org_uber_go_mock//gomock",
24+
],
25+
)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package batchchanges assembles the normalized, batch-level view of a batch's
16+
// changes (entity.BatchChanges) from storage. A Batch references only request
17+
// IDs, so resolving the actual change facts requires reading each request and
18+
// its per-URI change records. Centralizing this here keeps that storage
19+
// traversal out of the extensions (scorer, conflict analyzer), which consume
20+
// entity.BatchChanges and must not touch storage themselves.
21+
package batchchanges
22+
23+
import (
24+
"context"
25+
"fmt"
26+
27+
"github.com/uber/submitqueue/submitqueue/entity"
28+
"github.com/uber/submitqueue/submitqueue/extension/storage"
29+
)
30+
31+
// Collect assembles the normalized entity.BatchChanges for a batch by resolving
32+
// each request and reading its change records per URI. For each URI it selects
33+
// the record owned by the request (GetByURI returns rows for every request that
34+
// ever claimed the URI) and appends its URI + provider-supplied details.
35+
func Collect(ctx context.Context, store storage.Storage, batch entity.Batch) (entity.BatchChanges, error) {
36+
changes := entity.BatchChanges{BatchID: batch.ID, Queue: batch.Queue}
37+
for _, requestID := range batch.Contains {
38+
request, err := store.GetRequestStore().Get(ctx, requestID)
39+
if err != nil {
40+
return entity.BatchChanges{}, fmt.Errorf("failed to get request %s: %w", requestID, err)
41+
}
42+
for _, uri := range request.Change.URIs {
43+
records, err := store.GetChangeStore().GetByURI(ctx, batch.Queue, uri)
44+
if err != nil {
45+
return entity.BatchChanges{}, fmt.Errorf("failed to read change record for request %s uri=%s: %w", requestID, uri, err)
46+
}
47+
for _, rec := range records {
48+
if rec.RequestID != requestID {
49+
continue
50+
}
51+
changes.Changes = append(changes.Changes, entity.ChangeInfo{URI: rec.URI, Details: rec.Details})
52+
break
53+
}
54+
}
55+
}
56+
return changes, nil
57+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package batchchanges
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
"github.com/uber/submitqueue/submitqueue/entity"
24+
storagemock "github.com/uber/submitqueue/submitqueue/extension/storage/mock"
25+
"go.uber.org/mock/gomock"
26+
)
27+
28+
func TestCollect(t *testing.T) {
29+
ctrl := gomock.NewController(t)
30+
31+
batch := entity.Batch{ID: "q/batch/1", Queue: "q", Contains: []string{"q/1", "q/2"}}
32+
req1 := entity.Request{ID: "q/1", Change: entity.Change{URIs: []string{"github://o/r/pull/1/a"}}}
33+
req2 := entity.Request{ID: "q/2", Change: entity.Change{URIs: []string{"github://o/r/pull/2/b"}}}
34+
rec1 := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/1/a", RequestID: "q/1",
35+
Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f1", LinesAdded: 3}}}}
36+
// A stray record for the same URI owned by a different request must be skipped.
37+
recOther := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/1/a", RequestID: "q/999"}
38+
rec2 := entity.ChangeRecord{Queue: "q", URI: "github://o/r/pull/2/b", RequestID: "q/2",
39+
Details: entity.ChangeDetails{ChangedFiles: []entity.ChangedFile{{Path: "f2", LinesAdded: 5}}}}
40+
41+
reqStore := storagemock.NewMockRequestStore(ctrl)
42+
reqStore.EXPECT().Get(gomock.Any(), "q/1").Return(req1, nil)
43+
reqStore.EXPECT().Get(gomock.Any(), "q/2").Return(req2, nil)
44+
45+
changeStore := storagemock.NewMockChangeStore(ctrl)
46+
changeStore.EXPECT().GetByURI(gomock.Any(), "q", "github://o/r/pull/1/a").Return([]entity.ChangeRecord{recOther, rec1}, nil)
47+
changeStore.EXPECT().GetByURI(gomock.Any(), "q", "github://o/r/pull/2/b").Return([]entity.ChangeRecord{rec2}, nil)
48+
49+
store := storagemock.NewMockStorage(ctrl)
50+
store.EXPECT().GetRequestStore().Return(reqStore).AnyTimes()
51+
store.EXPECT().GetChangeStore().Return(changeStore).AnyTimes()
52+
53+
got, err := Collect(context.Background(), store, batch)
54+
require.NoError(t, err)
55+
assert.Equal(t, "q/batch/1", got.BatchID)
56+
assert.Equal(t, "q", got.Queue)
57+
require.Len(t, got.Changes, 2)
58+
assert.Equal(t, "github://o/r/pull/1/a", got.Changes[0].URI)
59+
assert.Equal(t, "github://o/r/pull/2/b", got.Changes[1].URI)
60+
assert.Equal(t, 8, got.TotalLinesChanged())
61+
}

submitqueue/extension/conflict/README.md

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,29 @@ and the batches already in flight.
55

66
## Interface
77

8-
`Analyzer` exposes a single `Analyze` method that takes the candidate batch
9-
and the list of in-flight batches it might conflict with. It returns the
10-
subset of in-flight batches that conflict with the candidate, each paired
11-
with a `ConflictType` describing the kind of conflict. An empty result means
8+
`Analyzer` exposes a single `Analyze` method that takes the candidate batch's
9+
changes and the list of in-flight batches' changes it might conflict with.
10+
Inputs are `entity.BatchChanges` — each batch's flattened change facts (URIs +
11+
provider details), assembled by the caller — so the analyzer sees the actual
12+
changes rather than just batch IDs, and can reason about changed targets/files
13+
without reading storage itself. It returns the subset of in-flight batches that
14+
conflict with the candidate, each paired with a `ConflictType` describing the
15+
kind of conflict (referenced by `BatchChanges.BatchID`). An empty result means
1216
the candidate is free to advance independently.
1317

14-
Callers are responsible for filtering out the candidate itself and any
15-
terminal batches from the in-flight list before invoking the analyzer. The
16-
analyzer itself stays free of lifecycle knowledge. A non-nil error reports
17-
an infrastructure failure of the analysis and should be treated as
18-
retryable by the caller.
18+
Callers are responsible for assembling `entity.BatchChanges` (see
19+
`submitqueue/core/batchchanges`), and for filtering out the candidate itself and
20+
any terminal batches from the in-flight list before invoking the analyzer. The
21+
analyzer itself stays free of lifecycle knowledge. A non-nil error reports an
22+
infrastructure failure of the analysis and should be treated as retryable by the
23+
caller.
1924

20-
The analyzer is intentionally pure with respect to batch state: it does not
21-
mutate inputs, does not read storage, and may be called concurrently. Real
22-
implementations are expected to resolve the batch contents (e.g. changed
23-
build targets, modified files) via whichever upstream system they depend
24-
on, and to return as much classification detail as that system supports.
25+
The analyzer is intentionally pure with respect to storage: it does not mutate
26+
inputs, does not read storage, and may be called concurrently. A real
27+
implementation (e.g. one backed by uber/tango) derives changed build
28+
targets/edges from the change set — `Changes[].URI` carries the repo + head
29+
commit SHA, and the target branch is injected per queue at construction — and
30+
returns as much classification detail as that system supports.
2531

2632
## Implementations
2733

@@ -35,8 +41,10 @@ on, and to return as much classification detail as that system supports.
3541
## Adding a new backend
3642

3743
1. Create `extension/conflict/{backend}/` with an `Analyzer` implementation.
38-
2. Resolve each `entity.Batch` into whatever signal the backend needs
39-
(e.g. changed build targets, files touched, dependency graphs).
44+
2. Derive whatever signal the backend needs from each `entity.BatchChanges`
45+
(e.g. changed build targets, files touched, dependency graphs) — the change
46+
URIs and provider details are in hand; resolve the rest via your upstream
47+
system.
4048
3. Emit one `Conflict` per (in-flight batch, detected conflict type). Pick
4149
the most specific `ConflictType` your backend can determine; use
4250
`ConflictTypeConservative` only when the backend cannot prove the absence

submitqueue/extension/conflict/all/all.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ func New() conflict.Analyzer {
3636

3737
// Analyze returns one ConflictTypeConservative Conflict per in-flight batch,
3838
// preserving the input order. Returns an empty slice when inFlight is empty.
39-
func (analyzer) Analyze(_ context.Context, _ entity.Batch, inFlight []entity.Batch) ([]conflict.Conflict, error) {
39+
func (analyzer) Analyze(_ context.Context, _ entity.BatchChanges, inFlight []entity.BatchChanges) ([]conflict.Conflict, error) {
4040
if len(inFlight) == 0 {
4141
return nil, nil
4242
}
4343
conflicts := make([]conflict.Conflict, len(inFlight))
4444
for i, b := range inFlight {
4545
conflicts[i] = conflict.Conflict{
46-
BatchID: b.ID,
46+
BatchID: b.BatchID,
4747
Type: conflict.ConflictTypeConservative,
4848
}
4949
}

submitqueue/extension/conflict/all/all_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ import (
2525
)
2626

2727
func TestAnalyze(t *testing.T) {
28-
batch := entity.Batch{ID: "queueA/batch/10"}
28+
candidate := entity.BatchChanges{BatchID: "queueA/batch/10"}
2929

3030
tests := []struct {
3131
name string
32-
inFlight []entity.Batch
32+
inFlight []entity.BatchChanges
3333
want []conflict.Conflict
3434
}{
3535
{
@@ -39,15 +39,15 @@ func TestAnalyze(t *testing.T) {
3939
},
4040
{
4141
name: "empty in-flight slice yields no conflicts",
42-
inFlight: []entity.Batch{},
42+
inFlight: []entity.BatchChanges{},
4343
want: nil,
4444
},
4545
{
4646
name: "every in-flight batch is reported in input order",
47-
inFlight: []entity.Batch{
48-
{ID: "queueA/batch/1"},
49-
{ID: "queueA/batch/2"},
50-
{ID: "queueA/batch/3"},
47+
inFlight: []entity.BatchChanges{
48+
{BatchID: "queueA/batch/1"},
49+
{BatchID: "queueA/batch/2"},
50+
{BatchID: "queueA/batch/3"},
5151
},
5252
want: []conflict.Conflict{
5353
{BatchID: "queueA/batch/1", Type: conflict.ConflictTypeConservative},
@@ -60,7 +60,7 @@ func TestAnalyze(t *testing.T) {
6060
a := New()
6161
for _, tt := range tests {
6262
t.Run(tt.name, func(t *testing.T) {
63-
got, err := a.Analyze(context.Background(), batch, tt.inFlight)
63+
got, err := a.Analyze(context.Background(), candidate, tt.inFlight)
6464
require.NoError(t, err)
6565
assert.Equal(t, tt.want, got)
6666
})

submitqueue/extension/conflict/conflict.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,21 @@ type Conflict struct {
5656
// already in flight, so the speculation layer can decide which batches can
5757
// safely advance in parallel.
5858
type Analyzer interface {
59-
// Analyze returns the subset of inFlight batches that conflict with
60-
// batch, each paired with the type of conflict detected. An empty
61-
// result means batch does not conflict with any in-flight batch.
59+
// Analyze returns the subset of inFlight batches that conflict with the
60+
// candidate, each paired with the type of conflict detected. An empty
61+
// result means the candidate does not conflict with any in-flight batch.
6262
//
63-
// Callers should not include batch itself in inFlight; terminal batches
64-
// should be filtered out before calling. A non-nil error indicates the
65-
// analysis itself failed (infrastructure issue) and should be treated as
66-
// retryable by the caller.
67-
Analyze(ctx context.Context, batch entity.Batch, inFlight []entity.Batch) ([]Conflict, error)
63+
// Inputs are entity.BatchChanges — each batch's flattened change facts
64+
// (URIs + provider details), assembled by the caller — so the analyzer
65+
// sees the actual changes rather than just batch IDs, and can reason about
66+
// changed targets/files without touching storage. A conflict references the
67+
// in-flight batch by its BatchChanges.BatchID.
68+
//
69+
// Callers should not include the candidate itself in inFlight; terminal
70+
// batches should be filtered out before calling. A non-nil error indicates
71+
// the analysis itself failed (infrastructure issue) and should be treated
72+
// as retryable by the caller.
73+
Analyze(ctx context.Context, candidate entity.BatchChanges, inFlight []entity.BatchChanges) ([]Conflict, error)
6874
}
6975

7076
// Config carries the per-queue identity handed to a Factory. The system knows

0 commit comments

Comments
 (0)