Skip to content

Commit de8932c

Browse files
committed
feat(buildrunner): poll-driven buildsignal + pipeline wiring
## Summary ### Why? The orchestrator's build stage needs to drive the `BuildRunner` contract end-to-end: trigger the runner, persist the result, and poll `Status` until terminal so the batch state machine can react. Polling has to behave like the rest of the pipeline (queue-driven, partition-isolated, restart-safe) rather than running as an in-process timer loop. ### What? Stacks on top of the BuildRunner interface, noop, and `PublishAfter` PRs. Wires the runner into the orchestrator pipeline. The build poll loop runs as queue traffic inside the existing `buildsignal` consumer (no separate stage). On each delivery it loads the `Build` from storage, calls `BuildRunner.Status`, persists the result via `BuildStore.UpdateStatus`, publishes the batch ID to `speculate` so the state machine re-evaluates, and re-publishes itself via `Publisher.PublishAfter` until the build reaches a terminal state. A webhook-capable backend can publish into the same topic — the consumer cannot tell a poll-driven message from a push. Only the build **ID** travels on the queue (`entity.BuildID`); the consumer reloads the full `Build` from `BuildStore`, keeping the message small and storage the single source of truth — the same ID-on-the-queue, load-from-storage pattern the rest of the pipeline already uses for batches and requests. The controllers consume the runner's `entity.BuildID` signatures (`Trigger` returns one; `Status` takes one). Pieces: - `orchestrator/controller/build`: assembles `base` from `batch.Dependencies` and `head` from `batch.Contains`, calls `Trigger`, persists the initial `Build{Accepted}` via `BuildStore.Create` (`ErrAlreadyExists` is swallowed for redelivery), publishes the build ID to `buildsignal`. - `orchestrator/controller/buildsignal`: the polling consumer described above. It loads the `Build` by ID, then polls. `PollDelayAcceptedMs=5000`, `PollDelayRunningMs=2000` by default (vars so tests can override; a TODO notes these should move into the `queueconfig` extension). Error classification: only the `PublishAfter` re-schedule is wrapped retryable (`errs.NewRetryableError`) — it is the poll loop's heartbeat, so a transient enqueue blip nacks and replays (up to `MaxAttempts`) rather than rejecting the loop's only live message straight to DLQ. Deserialize, the `Build` load, `Status`, `UpdateStatus`, and the speculate publish stay non-retryable and reject to DLQ on first failure, where an operational republish is the recovery path. - `example/server/orchestrator/main.go`: passes the `BuildRunner` to both `build.NewController` and `buildsignal.NewController`; pipeline diagram updated. - root `BUILD.bazel`: adds `# gazelle:exclude .claude` so gazelle does not index nested worktrees as duplicate rule definitions and corrupt the canonical BUILD files. ## Test Plan - ✅ `bazel test //extension/buildrunner/... //orchestrator/controller/build/... //orchestrator/controller/buildsignal/... //extension/queue/...` — all pass. - ✅ `make fmt lint check-tidy check-gazelle check-mocks` — clean. - ✅ `make build` — all targets compile. - New coverage: build controller persist+publish path (with `ErrAlreadyExists` swallow), buildsignal poll loop (terminal forwards to speculate, non-terminal re-publishes via `PublishAfter` with per-status delay, retryable re-publish failure, non-retryable build-load / `Status` / `UpdateStatus` failures reject to DLQ).
1 parent 64d1a6e commit de8932c

10 files changed

Lines changed: 634 additions & 134 deletions

File tree

BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ load("@gazelle//:def.bzl", "gazelle")
22

33
# gazelle:prefix github.com/uber/submitqueue
44

5+
# Exclude nested worktrees (created under .claude/worktrees) so gazelle does not
6+
# index them as duplicate rule definitions and corrupt the canonical BUILD files.
7+
# gazelle:exclude .claude
8+
59
# Resolve protobuf import ambiguities - use the actual protopb packages, not the proto aliases
610
# gazelle:resolve go github.com/uber/submitqueue/gateway/protopb //gateway/protopb
711
# gazelle:resolve go github.com/uber/submitqueue/orchestrator/protopb //orchestrator/protopb

core/consumer/registry.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ const (
4040
TopicKeySpeculate TopicKey = "speculate"
4141
// TopicKeyBuild is the pipeline stage where speculated batches are published for builds.
4242
TopicKeyBuild TopicKey = "build"
43-
// TopicKeyBuildSignal is the pipeline stage where builds are published for build signal processing.
43+
// TopicKeyBuildSignal is the polling stage for triggered builds. Each
44+
// message carries a Build; the consumer calls BuildRunner.Status,
45+
// persists the latest status, publishes the batch ID to TopicKeySpeculate
46+
// so the state machine re-evaluates, and re-publishes itself via
47+
// PublishAfter when the build has not yet reached a terminal state.
4448
TopicKeyBuildSignal TopicKey = "buildsignal"
4549
// TopicKeyMerge is the pipeline stage where speculated batches are published for merging.
4650
TopicKeyMerge TopicKey = "merge"

example/server/orchestrator/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ go_library(
1414
"//core/consumer",
1515
"//core/httpclient",
1616
"//entity",
17+
"//extension/buildrunner",
18+
"//extension/buildrunner/noop",
1719
"//extension/changeprovider",
1820
"//extension/changeprovider/github",
1921
"//extension/changestore",

example/server/orchestrator/main.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import (
3333
"github.com/uber/submitqueue/core/consumer"
3434
"github.com/uber/submitqueue/core/httpclient"
3535
"github.com/uber/submitqueue/entity"
36+
"github.com/uber/submitqueue/extension/buildrunner"
37+
buildnoop "github.com/uber/submitqueue/extension/buildrunner/noop"
3638
"github.com/uber/submitqueue/extension/changeprovider"
3739
githubprovider "github.com/uber/submitqueue/extension/changeprovider/github"
3840
"github.com/uber/submitqueue/extension/changestore"
@@ -216,8 +218,12 @@ func run() error {
216218
return fmt.Errorf("failed to create pusher: %w", err)
217219
}
218220

221+
// Create build runner. The noop runner is the pass-through default
222+
// (every build immediately succeeds) until a real backend is wired in.
223+
br := buildnoop.New()
224+
219225
// Register controllers
220-
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store, changeStore); err != nil {
226+
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, br, cnt, store, changeStore); err != nil {
221227
return err
222228
}
223229

@@ -397,12 +403,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
397403
// Pipeline:
398404
//
399405
// request → validate → batch → score → speculate → build → buildsignal ─┐
400-
// ↑ ↘
401-
// │ merge → conclude
402-
// │ │
403-
// └────────┴───────────────────────
406+
// ↑ ↘ ↻ poll
407+
// │ merge → conclude │
408+
// │ │ │
409+
// └────────┴───────────────────────┘
404410

405-
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
411+
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
406412
requestController := start.NewController(
407413
logger,
408414
scope,
@@ -488,6 +494,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
488494
logger,
489495
scope,
490496
store,
497+
br,
491498
registry,
492499
consumer.TopicKeyBuild,
493500
"orchestrator-build",
@@ -500,6 +507,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
500507
logger,
501508
scope,
502509
store,
510+
br,
503511
registry,
504512
consumer.TopicKeyBuildSignal,
505513
"orchestrator-buildsignal",

orchestrator/controller/build/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
"//core/metrics",
1111
"//entity",
1212
"//entity/queue",
13+
"//extension/buildrunner",
1314
"//extension/storage",
1415
"@com_github_uber_go_tally_v4//:tally",
1516
"@org_uber_go_zap//:zap",
@@ -25,7 +26,11 @@ go_test(
2526
"//core/errs",
2627
"//entity",
2728
"//entity/queue",
29+
"//extension/buildrunner",
30+
"//extension/buildrunner/mock",
31+
"//extension/buildrunner/noop",
2832
"//extension/queue/mock",
33+
"//extension/storage",
2934
"//extension/storage/mock",
3035
"@com_github_stretchr_testify//assert",
3136
"@com_github_stretchr_testify//require",

orchestrator/controller/build/build.go

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ package build
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021

2122
"github.com/uber-go/tally/v4"
2223
"github.com/uber/submitqueue/core/consumer"
2324
"github.com/uber/submitqueue/core/metrics"
2425
"github.com/uber/submitqueue/entity"
2526
entityqueue "github.com/uber/submitqueue/entity/queue"
27+
"github.com/uber/submitqueue/extension/buildrunner"
2628
"github.com/uber/submitqueue/extension/storage"
2729
"go.uber.org/zap"
2830
)
@@ -34,6 +36,7 @@ type Controller struct {
3436
logger *zap.SugaredLogger
3537
metricsScope tally.Scope
3638
store storage.Storage
39+
buildRunner buildrunner.BuildRunner
3740
registry consumer.TopicRegistry
3841
topicKey consumer.TopicKey
3942
consumerGroup string
@@ -47,6 +50,7 @@ func NewController(
4750
logger *zap.SugaredLogger,
4851
scope tally.Scope,
4952
store storage.Storage,
53+
buildRunner buildrunner.BuildRunner,
5054
registry consumer.TopicRegistry,
5155
topicKey consumer.TopicKey,
5256
consumerGroup string,
@@ -55,6 +59,7 @@ func NewController(
5559
logger: logger.Named("build_controller"),
5660
metricsScope: scope.SubScope("build_controller"),
5761
store: store,
62+
buildRunner: buildRunner,
5863
registry: registry,
5964
topicKey: topicKey,
6065
consumerGroup: consumerGroup,
@@ -95,17 +100,45 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
95100
"partition_key", msg.PartitionKey,
96101
)
97102

98-
// TODO: Add build logic
99-
// - Trigger CI build
100-
// - Track build status
103+
// Assemble base (dependency batches in order) and head (this batch).
104+
base, err := c.collectChanges(ctx, batch.Dependencies)
105+
if err != nil {
106+
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
107+
return fmt.Errorf("failed to assemble base changes for batch %s: %w", batch.ID, err)
108+
}
109+
head, err := c.collectChanges(ctx, []string{batch.ID})
110+
if err != nil {
111+
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
112+
return fmt.Errorf("failed to assemble head changes for batch %s: %w", batch.ID, err)
113+
}
114+
115+
// Trigger the build with the configured build manager. metadata is nil
116+
// until a caller-supplied source materializes (e.g. requester / ticket
117+
// pulled off the originating LandRequest).
118+
buildID, err := c.buildRunner.Trigger(ctx, batch.Queue, base, head, nil)
119+
if err != nil {
120+
metrics.NamedCounter(c.metricsScope, opName, "trigger_errors", 1)
121+
return fmt.Errorf("failed to trigger build for batch %s: %w", batch.ID, err)
122+
}
101123

102124
build := entity.Build{
103-
ID: batch.ID,
104-
BatchID: batch.ID,
105-
Status: entity.BuildStatusAccepted,
125+
ID: buildID.ID,
126+
BatchID: batch.ID,
127+
SpeculationPath: entity.SpeculationPathInfo{Base: append([]string{}, batch.Dependencies...)},
128+
Status: entity.BuildStatusAccepted,
129+
}
130+
131+
// Persist the initial Build snapshot so the buildsignal poll loop has a
132+
// row to UpdateStatus against. ErrAlreadyExists is benign — a redelivery
133+
// of this message after a previous successful Create.
134+
if err := c.store.GetBuildStore().Create(ctx, build); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
135+
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
136+
return fmt.Errorf("failed to persist build %s: %w", build.ID, err)
106137
}
107138

108-
// Publish build to build signal topic
139+
// Hand off to the buildsignal poll loop; it calls Status, updates the
140+
// persisted Build, publishes to speculate, and re-publishes itself via
141+
// PublishAfter until terminal.
109142
if err := c.publish(ctx, consumer.TopicKeyBuildSignal, build); err != nil {
110143
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
111144
return fmt.Errorf("failed to publish to buildsignal: %w", err)
@@ -114,17 +147,44 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
114147
c.logger.Infow("published build to buildsignal",
115148
"batch_id", batch.ID,
116149
"build_id", build.ID,
150+
"status", string(build.Status),
117151
"topic_key", consumer.TopicKeyBuildSignal,
118152
)
119153

120154
return nil // Success - message will be acked
121155
}
122156

123-
// publish publishes a build to the specified topic key.
157+
// collectChanges loads each batch by ID and concatenates the Change values
158+
// from its contained requests in batch order. Used to build the base
159+
// (dependency batches) and head (this batch) inputs to BuildRunner.Trigger.
160+
func (c *Controller) collectChanges(ctx context.Context, batchIDs []string) ([]entity.Change, error) {
161+
if len(batchIDs) == 0 {
162+
return nil, nil
163+
}
164+
var changes []entity.Change
165+
for _, bID := range batchIDs {
166+
b, err := c.store.GetBatchStore().Get(ctx, bID)
167+
if err != nil {
168+
return nil, fmt.Errorf("failed to get batch %s: %w", bID, err)
169+
}
170+
for _, reqID := range b.Contains {
171+
req, err := c.store.GetRequestStore().Get(ctx, reqID)
172+
if err != nil {
173+
return nil, fmt.Errorf("failed to get request %s for batch %s: %w", reqID, bID, err)
174+
}
175+
changes = append(changes, req.Change)
176+
}
177+
}
178+
return changes, nil
179+
}
180+
181+
// publish publishes a build's ID to the specified topic key. Only the
182+
// identifier travels on the queue; the consumer loads the full Build from
183+
// storage, keeping the message small and the store the single source of truth.
124184
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, build entity.Build) error {
125-
payload, err := build.ToBytes()
185+
payload, err := entity.BuildID{ID: build.ID}.ToBytes()
126186
if err != nil {
127-
return fmt.Errorf("failed to serialize build: %w", err)
187+
return fmt.Errorf("failed to serialize build ID: %w", err)
128188
}
129189

130190
msg := entityqueue.NewMessage(build.ID, payload, build.BatchID, nil)

0 commit comments

Comments
 (0)