Skip to content

Commit dae55ea

Browse files
committed
feat(orchestrator): add pipeline stage controllers for batch, speculate, build, merge, and finalize
1 parent 0e2d740 commit dae55ea

24 files changed

Lines changed: 2009 additions & 25 deletions

File tree

core/consumer/registry.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,18 @@ const (
1010
TopicRequest Topic = "request"
1111
// TopicToBatch is where validated requests are published for batching.
1212
TopicToBatch Topic = "to-batch"
13+
// TopicBatched is where batched requests are published for speculation.
14+
TopicBatched Topic = "batched"
15+
// TopicBuild is where requests are published for builds.
16+
TopicBuild Topic = "build"
17+
// TopicBuildSignal is where build signals are published for processing.
18+
TopicBuildSignal Topic = "build-signal"
19+
// TopicToMerge is where requests are published for merging.
20+
TopicToMerge Topic = "to-merge"
21+
// TopicMergeSignal is where merge signals are published for processing.
22+
TopicMergeSignal Topic = "merge-signal"
23+
// TopicFinalize is where requests are published for finalization.
24+
TopicFinalize Topic = "finalize"
1325
)
1426

1527
// String returns the topic name as a string.

example/server/orchestrator/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,14 @@ go_library(
1515
"//extension/queue",
1616
"//extension/queue/mysql",
1717
"//orchestrator/controller",
18+
"//orchestrator/controller/batch",
19+
"//orchestrator/controller/build",
20+
"//orchestrator/controller/buildsignal",
21+
"//orchestrator/controller/finalize",
22+
"//orchestrator/controller/merge",
23+
"//orchestrator/controller/mergesignal",
1824
"//orchestrator/controller/request",
25+
"//orchestrator/controller/speculate",
1926
"//orchestrator/protopb",
2027
"@com_github_go_sql_driver_mysql//:mysql",
2128
"@com_github_uber_go_tally_v4//:tally",

example/server/orchestrator/main.go

Lines changed: 167 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,14 @@ import (
1717
extqueue "github.com/uber/submitqueue/extension/queue"
1818
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
1919
"github.com/uber/submitqueue/orchestrator/controller"
20+
"github.com/uber/submitqueue/orchestrator/controller/batch"
21+
"github.com/uber/submitqueue/orchestrator/controller/build"
22+
"github.com/uber/submitqueue/orchestrator/controller/buildsignal"
23+
"github.com/uber/submitqueue/orchestrator/controller/finalize"
24+
"github.com/uber/submitqueue/orchestrator/controller/merge"
25+
"github.com/uber/submitqueue/orchestrator/controller/mergesignal"
2026
"github.com/uber/submitqueue/orchestrator/controller/request"
27+
"github.com/uber/submitqueue/orchestrator/controller/speculate"
2128
pb "github.com/uber/submitqueue/orchestrator/protopb"
2229
"go.uber.org/zap"
2330
"google.golang.org/grpc"
@@ -116,37 +123,17 @@ func run() error {
116123
subscriberName = fmt.Sprintf("orchestrator-%d", time.Now().Unix())
117124
}
118125

119-
registry := consumer.NewTopicRegistry(
120-
[]consumer.TopicConfig{
121-
{Topic: consumer.TopicRequest, Queue: mysqlQueue},
122-
{Topic: consumer.TopicToBatch, Queue: mysqlQueue},
123-
},
124-
[]extqueue.SubscriptionConfig{
125-
extqueue.DefaultSubscriptionConfig(
126-
consumer.TopicRequest.String(),
127-
subscriberName,
128-
"orchestrator-request",
129-
),
130-
},
131-
)
126+
registry := newTopicRegistry(mysqlQueue, subscriberName)
132127

133128
// Create consumer
134129
c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry)
135130

136-
// Register request controller
137-
// Pipeline: request → batch → speculation → build → merge
138-
requestController := request.NewController(
139-
logger.Sugar(),
140-
scope,
141-
registry,
142-
consumer.TopicRequest,
143-
"orchestrator-request",
144-
)
145-
if err := c.Register(requestController); err != nil {
146-
return fmt.Errorf("failed to register request controller: %w", err)
131+
// Register controllers
132+
if err := registerControllers(c, logger.Sugar(), scope, registry); err != nil {
133+
return err
147134
}
148135

149-
logger.Info("controllers registered", zap.Int("count", 1))
136+
logger.Info("controllers registered", zap.Int("count", 8))
150137

151138
// Start consumers
152139
if err := c.Start(ctx); err != nil {
@@ -202,3 +189,158 @@ func run() error {
202189

203190
return err
204191
}
192+
193+
// newTopicRegistry builds the TopicRegistry with all topic and subscription configs.
194+
func newTopicRegistry(q extqueue.Queue, subscriberName string) consumer.TopicRegistry {
195+
return consumer.NewTopicRegistry(
196+
[]consumer.TopicConfig{
197+
{Topic: consumer.TopicRequest, Queue: q},
198+
{Topic: consumer.TopicToBatch, Queue: q},
199+
{Topic: consumer.TopicBatched, Queue: q},
200+
{Topic: consumer.TopicBuild, Queue: q},
201+
{Topic: consumer.TopicBuildSignal, Queue: q},
202+
{Topic: consumer.TopicToMerge, Queue: q},
203+
{Topic: consumer.TopicMergeSignal, Queue: q},
204+
{Topic: consumer.TopicFinalize, Queue: q},
205+
},
206+
[]extqueue.SubscriptionConfig{
207+
extqueue.DefaultSubscriptionConfig(
208+
consumer.TopicRequest.String(),
209+
subscriberName,
210+
"orchestrator-request",
211+
),
212+
extqueue.DefaultSubscriptionConfig(
213+
consumer.TopicToBatch.String(),
214+
subscriberName,
215+
"orchestrator-batch",
216+
),
217+
extqueue.DefaultSubscriptionConfig(
218+
consumer.TopicBatched.String(),
219+
subscriberName,
220+
"orchestrator-speculate",
221+
),
222+
extqueue.DefaultSubscriptionConfig(
223+
consumer.TopicBuild.String(),
224+
subscriberName,
225+
"orchestrator-build",
226+
),
227+
extqueue.DefaultSubscriptionConfig(
228+
consumer.TopicBuildSignal.String(),
229+
subscriberName,
230+
"orchestrator-buildsignal",
231+
),
232+
extqueue.DefaultSubscriptionConfig(
233+
consumer.TopicToMerge.String(),
234+
subscriberName,
235+
"orchestrator-merge",
236+
),
237+
extqueue.DefaultSubscriptionConfig(
238+
consumer.TopicMergeSignal.String(),
239+
subscriberName,
240+
"orchestrator-mergesignal",
241+
),
242+
extqueue.DefaultSubscriptionConfig(
243+
consumer.TopicFinalize.String(),
244+
subscriberName,
245+
"orchestrator-finalize",
246+
),
247+
},
248+
)
249+
}
250+
251+
// registerControllers creates all pipeline controllers and registers them with the consumer.
252+
// Pipeline: request → batch → speculate → build → build-signal
253+
//
254+
// → merge → merge-signal
255+
// finalize (terminal)
256+
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry) error {
257+
requestController := request.NewController(
258+
logger,
259+
scope,
260+
registry,
261+
consumer.TopicRequest,
262+
"orchestrator-request",
263+
)
264+
if err := c.Register(requestController); err != nil {
265+
return fmt.Errorf("failed to register request controller: %w", err)
266+
}
267+
268+
batchController := batch.NewController(
269+
logger,
270+
scope,
271+
registry,
272+
consumer.TopicToBatch,
273+
"orchestrator-batch",
274+
)
275+
if err := c.Register(batchController); err != nil {
276+
return fmt.Errorf("failed to register batch controller: %w", err)
277+
}
278+
279+
speculateController := speculate.NewController(
280+
logger,
281+
scope,
282+
registry,
283+
consumer.TopicBatched,
284+
"orchestrator-speculate",
285+
)
286+
if err := c.Register(speculateController); err != nil {
287+
return fmt.Errorf("failed to register speculate controller: %w", err)
288+
}
289+
290+
buildController := build.NewController(
291+
logger,
292+
scope,
293+
registry,
294+
consumer.TopicBuild,
295+
"orchestrator-build",
296+
)
297+
if err := c.Register(buildController); err != nil {
298+
return fmt.Errorf("failed to register build controller: %w", err)
299+
}
300+
301+
buildSignalController := buildsignal.NewController(
302+
logger,
303+
scope,
304+
registry,
305+
consumer.TopicBuildSignal,
306+
"orchestrator-buildsignal",
307+
)
308+
if err := c.Register(buildSignalController); err != nil {
309+
return fmt.Errorf("failed to register buildsignal controller: %w", err)
310+
}
311+
312+
mergeController := merge.NewController(
313+
logger,
314+
scope,
315+
registry,
316+
consumer.TopicToMerge,
317+
"orchestrator-merge",
318+
)
319+
if err := c.Register(mergeController); err != nil {
320+
return fmt.Errorf("failed to register merge controller: %w", err)
321+
}
322+
323+
mergeSignalController := mergesignal.NewController(
324+
logger,
325+
scope,
326+
registry,
327+
consumer.TopicMergeSignal,
328+
"orchestrator-mergesignal",
329+
)
330+
if err := c.Register(mergeSignalController); err != nil {
331+
return fmt.Errorf("failed to register mergesignal controller: %w", err)
332+
}
333+
334+
finalizeController := finalize.NewController(
335+
logger,
336+
scope,
337+
registry,
338+
consumer.TopicFinalize,
339+
"orchestrator-finalize",
340+
)
341+
if err := c.Register(finalizeController); err != nil {
342+
return fmt.Errorf("failed to register finalize controller: %w", err)
343+
}
344+
345+
return nil
346+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
load("@rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "batch",
5+
srcs = ["batch.go"],
6+
importpath = "github.com/uber/submitqueue/orchestrator/controller/batch",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//core/consumer",
10+
"//entity",
11+
"//entity/queue",
12+
"@com_github_uber_go_tally_v4//:tally",
13+
"@org_uber_go_zap//:zap",
14+
],
15+
)
16+
17+
go_test(
18+
name = "batch_test",
19+
srcs = ["batch_test.go"],
20+
embed = [":batch"],
21+
deps = [
22+
"//core/consumer",
23+
"//entity",
24+
"//entity/queue",
25+
"//extension/queue/mock",
26+
"@com_github_stretchr_testify//assert",
27+
"@com_github_stretchr_testify//require",
28+
"@com_github_uber_go_tally_v4//:tally",
29+
"@org_uber_go_mock//gomock",
30+
"@org_uber_go_zap//zaptest",
31+
],
32+
)

0 commit comments

Comments
 (0)