@@ -17,7 +17,14 @@ import (
1717 extqueue "github.com/uber/submitqueue/extension/queue"
1818 queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
1919 "github.com/uber/submitqueue/orchestrator/controller"
20+ "github.com/uber/submitqueue/orchestrator/controller/batch"
21+ "github.com/uber/submitqueue/orchestrator/controller/build"
22+ "github.com/uber/submitqueue/orchestrator/controller/buildsignal"
23+ "github.com/uber/submitqueue/orchestrator/controller/finalize"
24+ "github.com/uber/submitqueue/orchestrator/controller/merge"
25+ "github.com/uber/submitqueue/orchestrator/controller/mergesignal"
2026 "github.com/uber/submitqueue/orchestrator/controller/request"
27+ "github.com/uber/submitqueue/orchestrator/controller/speculate"
2128 pb "github.com/uber/submitqueue/orchestrator/protopb"
2229 "go.uber.org/zap"
2330 "google.golang.org/grpc"
@@ -116,37 +123,17 @@ func run() error {
116123 subscriberName = fmt .Sprintf ("orchestrator-%d" , time .Now ().Unix ())
117124 }
118125
119- registry := consumer .NewTopicRegistry (
120- []consumer.TopicConfig {
121- {Topic : consumer .TopicRequest , Queue : mysqlQueue },
122- {Topic : consumer .TopicToBatch , Queue : mysqlQueue },
123- },
124- []extqueue.SubscriptionConfig {
125- extqueue .DefaultSubscriptionConfig (
126- consumer .TopicRequest .String (),
127- subscriberName ,
128- "orchestrator-request" ,
129- ),
130- },
131- )
126+ registry := newTopicRegistry (mysqlQueue , subscriberName )
132127
133128 // Create consumer
134129 c := consumer .New (logger .Sugar (), scope .SubScope ("consumer" ), registry )
135130
136- // Register request controller
137- // Pipeline: request → batch → speculation → build → merge
138- requestController := request .NewController (
139- logger .Sugar (),
140- scope ,
141- registry ,
142- consumer .TopicRequest ,
143- "orchestrator-request" ,
144- )
145- if err := c .Register (requestController ); err != nil {
146- return fmt .Errorf ("failed to register request controller: %w" , err )
131+ // Register controllers
132+ if err := registerControllers (c , logger .Sugar (), scope , registry ); err != nil {
133+ return err
147134 }
148135
149- logger .Info ("controllers registered" , zap .Int ("count" , 1 ))
136+ logger .Info ("controllers registered" , zap .Int ("count" , 8 ))
150137
151138 // Start consumers
152139 if err := c .Start (ctx ); err != nil {
@@ -202,3 +189,158 @@ func run() error {
202189
203190 return err
204191}
192+
193+ // newTopicRegistry builds the TopicRegistry with all topic and subscription configs.
194+ func newTopicRegistry (q extqueue.Queue , subscriberName string ) consumer.TopicRegistry {
195+ return consumer .NewTopicRegistry (
196+ []consumer.TopicConfig {
197+ {Topic : consumer .TopicRequest , Queue : q },
198+ {Topic : consumer .TopicToBatch , Queue : q },
199+ {Topic : consumer .TopicBatched , Queue : q },
200+ {Topic : consumer .TopicBuild , Queue : q },
201+ {Topic : consumer .TopicBuildSignal , Queue : q },
202+ {Topic : consumer .TopicToMerge , Queue : q },
203+ {Topic : consumer .TopicMergeSignal , Queue : q },
204+ {Topic : consumer .TopicFinalize , Queue : q },
205+ },
206+ []extqueue.SubscriptionConfig {
207+ extqueue .DefaultSubscriptionConfig (
208+ consumer .TopicRequest .String (),
209+ subscriberName ,
210+ "orchestrator-request" ,
211+ ),
212+ extqueue .DefaultSubscriptionConfig (
213+ consumer .TopicToBatch .String (),
214+ subscriberName ,
215+ "orchestrator-batch" ,
216+ ),
217+ extqueue .DefaultSubscriptionConfig (
218+ consumer .TopicBatched .String (),
219+ subscriberName ,
220+ "orchestrator-speculate" ,
221+ ),
222+ extqueue .DefaultSubscriptionConfig (
223+ consumer .TopicBuild .String (),
224+ subscriberName ,
225+ "orchestrator-build" ,
226+ ),
227+ extqueue .DefaultSubscriptionConfig (
228+ consumer .TopicBuildSignal .String (),
229+ subscriberName ,
230+ "orchestrator-buildsignal" ,
231+ ),
232+ extqueue .DefaultSubscriptionConfig (
233+ consumer .TopicToMerge .String (),
234+ subscriberName ,
235+ "orchestrator-merge" ,
236+ ),
237+ extqueue .DefaultSubscriptionConfig (
238+ consumer .TopicMergeSignal .String (),
239+ subscriberName ,
240+ "orchestrator-mergesignal" ,
241+ ),
242+ extqueue .DefaultSubscriptionConfig (
243+ consumer .TopicFinalize .String (),
244+ subscriberName ,
245+ "orchestrator-finalize" ,
246+ ),
247+ },
248+ )
249+ }
250+
251+ // registerControllers creates all pipeline controllers and registers them with the consumer.
252+ // Pipeline: request → batch → speculate → build → build-signal
253+ //
254+ // → merge → merge-signal
255+ // finalize (terminal)
256+ func registerControllers (c consumer.Consumer , logger * zap.SugaredLogger , scope tally.Scope , registry consumer.TopicRegistry ) error {
257+ requestController := request .NewController (
258+ logger ,
259+ scope ,
260+ registry ,
261+ consumer .TopicRequest ,
262+ "orchestrator-request" ,
263+ )
264+ if err := c .Register (requestController ); err != nil {
265+ return fmt .Errorf ("failed to register request controller: %w" , err )
266+ }
267+
268+ batchController := batch .NewController (
269+ logger ,
270+ scope ,
271+ registry ,
272+ consumer .TopicToBatch ,
273+ "orchestrator-batch" ,
274+ )
275+ if err := c .Register (batchController ); err != nil {
276+ return fmt .Errorf ("failed to register batch controller: %w" , err )
277+ }
278+
279+ speculateController := speculate .NewController (
280+ logger ,
281+ scope ,
282+ registry ,
283+ consumer .TopicBatched ,
284+ "orchestrator-speculate" ,
285+ )
286+ if err := c .Register (speculateController ); err != nil {
287+ return fmt .Errorf ("failed to register speculate controller: %w" , err )
288+ }
289+
290+ buildController := build .NewController (
291+ logger ,
292+ scope ,
293+ registry ,
294+ consumer .TopicBuild ,
295+ "orchestrator-build" ,
296+ )
297+ if err := c .Register (buildController ); err != nil {
298+ return fmt .Errorf ("failed to register build controller: %w" , err )
299+ }
300+
301+ buildSignalController := buildsignal .NewController (
302+ logger ,
303+ scope ,
304+ registry ,
305+ consumer .TopicBuildSignal ,
306+ "orchestrator-buildsignal" ,
307+ )
308+ if err := c .Register (buildSignalController ); err != nil {
309+ return fmt .Errorf ("failed to register buildsignal controller: %w" , err )
310+ }
311+
312+ mergeController := merge .NewController (
313+ logger ,
314+ scope ,
315+ registry ,
316+ consumer .TopicToMerge ,
317+ "orchestrator-merge" ,
318+ )
319+ if err := c .Register (mergeController ); err != nil {
320+ return fmt .Errorf ("failed to register merge controller: %w" , err )
321+ }
322+
323+ mergeSignalController := mergesignal .NewController (
324+ logger ,
325+ scope ,
326+ registry ,
327+ consumer .TopicMergeSignal ,
328+ "orchestrator-mergesignal" ,
329+ )
330+ if err := c .Register (mergeSignalController ); err != nil {
331+ return fmt .Errorf ("failed to register mergesignal controller: %w" , err )
332+ }
333+
334+ finalizeController := finalize .NewController (
335+ logger ,
336+ scope ,
337+ registry ,
338+ consumer .TopicFinalize ,
339+ "orchestrator-finalize" ,
340+ )
341+ if err := c .Register (finalizeController ); err != nil {
342+ return fmt .Errorf ("failed to register finalize controller: %w" , err )
343+ }
344+
345+ return nil
346+ }
0 commit comments