Skip to content

Commit f83707e

Browse files
committed
feat(score): Add stab scorer and implement scoring controller
1 parent 8e3e4af commit f83707e

17 files changed

Lines changed: 509 additions & 58 deletions

File tree

CLAUDE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ deps = [
240240
### Testing
241241

242242
- **Table-driven tests** — prefer table-driven tests with `t.Run` subtests over individual test functions.
243-
- **Avoid asserting on error messages** — assert on error type or generic error.
243+
- **Avoid asserting on error messages** — assert on error type or check the error with `require.Error`, do not `assert.Contains(t, err.Error(), message)`
244244
- **No change detector tests** — don't assert on default values, internal structure, or implementation details that can change without affecting behavior. Test what the code *does*, not how it's constructed.
245245
- **No `time.Sleep` for synchronization** — use channels, callbacks, condition variables.
246246
- **Use testify**`assert`/`require` instead of `t.Fatal()`.

core/request/BUILD.bazel

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,32 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "request",
5-
srcs = ["request.go"],
5+
srcs = [
6+
"log.go",
7+
"request.go",
8+
],
69
importpath = "github.com/uber/submitqueue/core/request",
710
visibility = ["//visibility:public"],
811
deps = [
12+
"//core/consumer",
913
"//entity",
14+
"//entity/queue",
1015
"//extension/storage",
1116
],
1217
)
1318

1419
go_test(
1520
name = "request_test",
16-
srcs = ["request_test.go"],
21+
srcs = [
22+
"log_test.go",
23+
"request_test.go",
24+
],
1725
embed = [":request"],
1826
deps = [
27+
"//core/consumer",
1928
"//entity",
29+
"//entity/queue",
30+
"//extension/queue/mock",
2031
"//extension/storage",
2132
"//extension/storage/mock",
2233
"@com_github_stretchr_testify//assert",

core/request/log.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package request
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/uber/submitqueue/core/consumer"
22+
"github.com/uber/submitqueue/entity"
23+
entityqueue "github.com/uber/submitqueue/entity/queue"
24+
)
25+
26+
// PublishLog publishes a single request log entry to the log topic for async persistence.
27+
// The partitionKey ensures ordering of log entries for the same request; typically set to the request ID.
28+
func PublishLog(ctx context.Context, registry consumer.TopicRegistry, logEntry entity.RequestLog, partitionKey string) error {
29+
payload, err := logEntry.ToBytes()
30+
if err != nil {
31+
return fmt.Errorf("failed to serialize request log: %w", err)
32+
}
33+
34+
msg := entityqueue.NewMessage(logEntry.RequestID, payload, partitionKey, nil)
35+
36+
q, ok := registry.Queue(consumer.TopicKeyLog)
37+
if !ok {
38+
return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyLog)
39+
}
40+
41+
topicName, ok := registry.TopicName(consumer.TopicKeyLog)
42+
if !ok {
43+
return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyLog)
44+
}
45+
46+
if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
47+
return fmt.Errorf("failed to publish message: %w", err)
48+
}
49+
50+
return nil
51+
}
52+
53+
// PublishBatchLogs publishes a request log entry for each request ID in the batch to the log topic.
54+
// Each entry uses the request ID as the partition key to ensure per-request ordering.
55+
func PublishBatchLogs(ctx context.Context, registry consumer.TopicRegistry, requestIDs []string, status entity.RequestStatus, metadata map[string]string) error {
56+
for _, requestID := range requestIDs {
57+
logEntry := entity.NewRequestLog(requestID, status, 0, "", metadata)
58+
if err := PublishLog(ctx, registry, logEntry, requestID); err != nil {
59+
return fmt.Errorf("failed to publish request log for request %s: %w", requestID, err)
60+
}
61+
}
62+
return nil
63+
}

core/request/log_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package request
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"testing"
21+
22+
"github.com/stretchr/testify/require"
23+
"github.com/uber/submitqueue/core/consumer"
24+
"github.com/uber/submitqueue/entity"
25+
"github.com/uber/submitqueue/entity/queue"
26+
queuemock "github.com/uber/submitqueue/extension/queue/mock"
27+
"go.uber.org/mock/gomock"
28+
)
29+
30+
func newTestRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) consumer.TopicRegistry {
31+
mockPub := queuemock.NewMockPublisher(ctrl)
32+
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
33+
func(ctx context.Context, topic string, msg queue.Message) error {
34+
return publishErr
35+
},
36+
).AnyTimes()
37+
38+
mockQ := queuemock.NewMockQueue(ctrl)
39+
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()
40+
41+
registry, err := consumer.NewTopicRegistry(
42+
[]consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}},
43+
)
44+
require.NoError(t, err)
45+
return registry
46+
}
47+
48+
func TestPublishLog_Success(t *testing.T) {
49+
ctrl := gomock.NewController(t)
50+
registry := newTestRegistry(t, ctrl, nil)
51+
52+
logEntry := entity.NewRequestLog("req/1", entity.RequestStatusStarted, 1, "", nil)
53+
err := PublishLog(context.Background(), registry, logEntry, "req/1")
54+
require.NoError(t, err)
55+
}
56+
57+
func TestPublishLog_PublishFailure(t *testing.T) {
58+
ctrl := gomock.NewController(t)
59+
registry := newTestRegistry(t, ctrl, fmt.Errorf("connection refused"))
60+
61+
logEntry := entity.NewRequestLog("req/1", entity.RequestStatusStarted, 1, "", nil)
62+
err := PublishLog(context.Background(), registry, logEntry, "req/1")
63+
require.Error(t, err)
64+
}
65+
66+
func TestPublishBatchLogs_Success(t *testing.T) {
67+
ctrl := gomock.NewController(t)
68+
registry := newTestRegistry(t, ctrl, nil)
69+
70+
err := PublishBatchLogs(context.Background(), registry,
71+
[]string{"req/1", "req/2", "req/3"},
72+
entity.RequestStatusScored,
73+
map[string]string{"batch_id": "b/1"},
74+
)
75+
require.NoError(t, err)
76+
}
77+
78+
func TestPublishBatchLogs_PartialFailure(t *testing.T) {
79+
ctrl := gomock.NewController(t)
80+
81+
callCount := 0
82+
mockPub := queuemock.NewMockPublisher(ctrl)
83+
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
84+
func(ctx context.Context, topic string, msg queue.Message) error {
85+
callCount++
86+
if callCount == 2 {
87+
return fmt.Errorf("publish failed")
88+
}
89+
return nil
90+
},
91+
).AnyTimes()
92+
93+
mockQ := queuemock.NewMockQueue(ctrl)
94+
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()
95+
96+
registry, err := consumer.NewTopicRegistry(
97+
[]consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}},
98+
)
99+
require.NoError(t, err)
100+
101+
err = PublishBatchLogs(context.Background(), registry,
102+
[]string{"req/1", "req/2", "req/3"},
103+
entity.RequestStatusScored,
104+
map[string]string{"batch_id": "b/1"},
105+
)
106+
require.Error(t, err)
107+
}
108+
109+
func TestPublishBatchLogs_Empty(t *testing.T) {
110+
ctrl := gomock.NewController(t)
111+
registry := newTestRegistry(t, ctrl, nil)
112+
113+
err := PublishBatchLogs(context.Background(), registry, nil, entity.RequestStatusScored, nil)
114+
require.NoError(t, err)
115+
}

entity/batch.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
BatchStateSucceeded BatchState = "succeeded"
3333
// BatchStateFailed is the terminal state of a batch that has failed.
3434
BatchStateFailed BatchState = "failed"
35+
// BatchStateScored is the state of a batch that has been scored for build success probability.
36+
BatchStateScored BatchState = "scored"
3537
// BatchStateCancelled is the terminal state of a batch that was cancelled before completion.
3638
BatchStateCancelled BatchState = "cancelled"
3739
)
@@ -81,6 +83,10 @@ type Batch struct {
8183
// - queueA/batch/3 will contain queueA/batch/1
8284
Dependencies []string
8385

86+
// Score is the predicted probability of build success for this batch, ranging from 0.0 to 1.0.
87+
// Set during the scoring phase. Zero value means the batch has not been scored yet.
88+
Score float64
89+
8490
// The state of the batch lifecycle this batch is in. Updateable field with Version for optimistic locking.
8591
State BatchState
8692

entity/request_log.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ const (
4949
// RequestStatusBatched indicates that the request has been included in a new batch and will be sent to speculation.
5050
RequestStatusBatched RequestStatus = "batched"
5151

52+
// RequestStatusScored indicates that the batch containing the request has been scored for build success probability.
53+
RequestStatusScored RequestStatus = "scored"
54+
5255
// RequestStatusSpeculating indicates that the request is currently being speculated (e.g., speculative merge/rebase, etc.).
5356
RequestStatusSpeculating RequestStatus = "speculating"
5457

example/server/orchestrator/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ go_library(
1212
visibility = ["//visibility:private"],
1313
deps = [
1414
"//core/consumer",
15+
"//entity",
1516
"//extension/counter",
1617
"//extension/counter/mysql",
1718
"//extension/mergechecker",
1819
"//extension/mergechecker/github",
1920
"//extension/queue",
2021
"//extension/queue/mysql",
22+
"//extension/scorer/heuristic",
2123
"//extension/storage",
2224
"//extension/storage/mysql",
2325
"//orchestrator/controller",

example/server/orchestrator/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ import (
3030
_ "github.com/go-sql-driver/mysql"
3131
"github.com/uber-go/tally/v4"
3232
"github.com/uber/submitqueue/core/consumer"
33+
"github.com/uber/submitqueue/entity"
3334
"github.com/uber/submitqueue/extension/counter"
3435
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
3536
"github.com/uber/submitqueue/extension/mergechecker"
3637
githubchecker "github.com/uber/submitqueue/extension/mergechecker/github"
3738
extqueue "github.com/uber/submitqueue/extension/queue"
3839
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
40+
"github.com/uber/submitqueue/extension/scorer/heuristic"
3941
"github.com/uber/submitqueue/extension/storage"
4042
mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql"
4143
"github.com/uber/submitqueue/orchestrator/controller"
@@ -417,6 +419,19 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
417419
logger,
418420
scope,
419421
store,
422+
// TODO: replace with a real scorer
423+
heuristic.New(
424+
[]heuristic.Bucket{
425+
{Min: 0, Max: 1, Score: 0.95},
426+
{Min: 2, Max: 5, Score: 0.80},
427+
{Min: 6, Max: 20, Score: 0.60},
428+
{Min: 21, Max: 1<<31 - 1, Score: 0.40},
429+
},
430+
func(_ context.Context, change entity.Change) (int, error) {
431+
return len(change.URIs), nil
432+
},
433+
scope.SubScope("scorer"),
434+
),
420435
registry,
421436
consumer.TopicKeyScore,
422437
"orchestrator-score",

extension/storage/batch_store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type BatchStore interface {
3535
// The implementation should increment the version by 1 atomically with the state update.
3636
UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error
3737

38+
// UpdateScoreAndState atomically updates the score and state of a batch if the current version matches the expected version.
39+
// If versions do not match, returns ErrVersionMismatch. The implementation should increment the version by 1 atomically.
40+
UpdateScoreAndState(ctx context.Context, id string, version int32, score float64, newState entity.BatchState) error
41+
3842
// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
3943
GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error)
4044
}

extension/storage/mock/batch_store_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)