Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ const (
TopicRequest Topic = "request"
// TopicToBatch is where validated requests are published for batching.
TopicToBatch Topic = "to-batch"
Comment thread
behinddwalls marked this conversation as resolved.
// TopicBatched is where batched requests are published for speculation.
TopicBatched Topic = "batched"
Comment thread
behinddwalls marked this conversation as resolved.
// TopicBuild is where requests are published for builds.
TopicBuild Topic = "build"
// TopicBuildSignal is where build signals are published for processing.
TopicBuildSignal Topic = "build-signal"
// TopicToMerge is where requests are published for merging.
TopicToMerge Topic = "to-merge"
// TopicMergeSignal is where merge signals are published for processing.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the description, not clear the difference from the previous one

TopicMergeSignal Topic = "merge-signal"
// TopicFinalize is where requests are published for finalization.
TopicFinalize Topic = "finalize"
)

// String returns the topic name as a string.
Expand Down
7 changes: 7 additions & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ go_library(
"//extension/queue",
"//extension/queue/mysql",
"//orchestrator/controller",
"//orchestrator/controller/batch",
"//orchestrator/controller/build",
"//orchestrator/controller/buildsignal",
"//orchestrator/controller/finalize",
"//orchestrator/controller/merge",
"//orchestrator/controller/mergesignal",
"//orchestrator/controller/request",
"//orchestrator/controller/speculate",
"//orchestrator/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
192 changes: 167 additions & 25 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ import (
extqueue "github.com/uber/submitqueue/extension/queue"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
"github.com/uber/submitqueue/orchestrator/controller"
"github.com/uber/submitqueue/orchestrator/controller/batch"
"github.com/uber/submitqueue/orchestrator/controller/build"
"github.com/uber/submitqueue/orchestrator/controller/buildsignal"
"github.com/uber/submitqueue/orchestrator/controller/finalize"
"github.com/uber/submitqueue/orchestrator/controller/merge"
"github.com/uber/submitqueue/orchestrator/controller/mergesignal"
"github.com/uber/submitqueue/orchestrator/controller/request"
"github.com/uber/submitqueue/orchestrator/controller/speculate"
pb "github.com/uber/submitqueue/orchestrator/protopb"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -116,37 +123,17 @@ func run() error {
subscriberName = fmt.Sprintf("orchestrator-%d", time.Now().Unix())
}

registry := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Topic: consumer.TopicRequest, Queue: mysqlQueue},
{Topic: consumer.TopicToBatch, Queue: mysqlQueue},
},
[]extqueue.SubscriptionConfig{
extqueue.DefaultSubscriptionConfig(
consumer.TopicRequest.String(),
subscriberName,
"orchestrator-request",
),
},
)
registry := newTopicRegistry(mysqlQueue, subscriberName)

// Create consumer
c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry)

// Register request controller
// Pipeline: request → batch → speculation → build → merge
requestController := request.NewController(
logger.Sugar(),
scope,
registry,
consumer.TopicRequest,
"orchestrator-request",
)
if err := c.Register(requestController); err != nil {
return fmt.Errorf("failed to register request controller: %w", err)
// Register controllers
if err := registerControllers(c, logger.Sugar(), scope, registry); err != nil {
return err
}

logger.Info("controllers registered", zap.Int("count", 1))
logger.Info("controllers registered", zap.Int("count", 8))

// Start consumers
if err := c.Start(ctx); err != nil {
Expand Down Expand Up @@ -202,3 +189,158 @@ func run() error {

return err
}

// newTopicRegistry builds the TopicRegistry with all topic and subscription configs.
func newTopicRegistry(q extqueue.Queue, subscriberName string) consumer.TopicRegistry {
return consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Topic: consumer.TopicRequest, Queue: q},
{Topic: consumer.TopicToBatch, Queue: q},
{Topic: consumer.TopicBatched, Queue: q},
{Topic: consumer.TopicBuild, Queue: q},
{Topic: consumer.TopicBuildSignal, Queue: q},
{Topic: consumer.TopicToMerge, Queue: q},
{Topic: consumer.TopicMergeSignal, Queue: q},
{Topic: consumer.TopicFinalize, Queue: q},
},
[]extqueue.SubscriptionConfig{
extqueue.DefaultSubscriptionConfig(
consumer.TopicRequest.String(),
subscriberName,
"orchestrator-request",
),
extqueue.DefaultSubscriptionConfig(
consumer.TopicToBatch.String(),
subscriberName,
"orchestrator-batch",
),
extqueue.DefaultSubscriptionConfig(
consumer.TopicBatched.String(),
subscriberName,
"orchestrator-speculate",
),
extqueue.DefaultSubscriptionConfig(
consumer.TopicBuild.String(),
subscriberName,
"orchestrator-build",
),
extqueue.DefaultSubscriptionConfig(
consumer.TopicBuildSignal.String(),
subscriberName,
"orchestrator-buildsignal",
),
extqueue.DefaultSubscriptionConfig(
consumer.TopicToMerge.String(),
subscriberName,
"orchestrator-merge",
),
extqueue.DefaultSubscriptionConfig(
consumer.TopicMergeSignal.String(),
subscriberName,
"orchestrator-mergesignal",
),
extqueue.DefaultSubscriptionConfig(
consumer.TopicFinalize.String(),
subscriberName,
"orchestrator-finalize",
),
},
)
}

// registerControllers creates all pipeline controllers and registers them with the consumer.
// Pipeline: request → batch → speculate → build → build-signal
//
// → merge → merge-signal
// finalize (terminal)
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry) error {
requestController := request.NewController(
logger,
scope,
registry,
consumer.TopicRequest,
"orchestrator-request",
)
if err := c.Register(requestController); err != nil {
return fmt.Errorf("failed to register request controller: %w", err)
}

batchController := batch.NewController(
logger,
scope,
registry,
consumer.TopicToBatch,
"orchestrator-batch",
)
if err := c.Register(batchController); err != nil {
return fmt.Errorf("failed to register batch controller: %w", err)
}

speculateController := speculate.NewController(
logger,
scope,
registry,
consumer.TopicBatched,
"orchestrator-speculate",
)
if err := c.Register(speculateController); err != nil {
return fmt.Errorf("failed to register speculate controller: %w", err)
}

buildController := build.NewController(
logger,
scope,
registry,
consumer.TopicBuild,
"orchestrator-build",
)
if err := c.Register(buildController); err != nil {
return fmt.Errorf("failed to register build controller: %w", err)
}

buildSignalController := buildsignal.NewController(
logger,
scope,
registry,
consumer.TopicBuildSignal,
"orchestrator-buildsignal",
)
if err := c.Register(buildSignalController); err != nil {
return fmt.Errorf("failed to register buildsignal controller: %w", err)
}

mergeController := merge.NewController(
logger,
scope,
registry,
consumer.TopicToMerge,
"orchestrator-merge",
)
if err := c.Register(mergeController); err != nil {
return fmt.Errorf("failed to register merge controller: %w", err)
}

mergeSignalController := mergesignal.NewController(
logger,
scope,
registry,
consumer.TopicMergeSignal,
"orchestrator-mergesignal",
)
if err := c.Register(mergeSignalController); err != nil {
return fmt.Errorf("failed to register mergesignal controller: %w", err)
}

finalizeController := finalize.NewController(
logger,
scope,
registry,
consumer.TopicFinalize,
"orchestrator-finalize",
)
if err := c.Register(finalizeController); err != nil {
return fmt.Errorf("failed to register finalize controller: %w", err)
}

return nil
}
32 changes: 32 additions & 0 deletions orchestrator/controller/batch/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "batch",
srcs = ["batch.go"],
importpath = "github.com/uber/submitqueue/orchestrator/controller/batch",
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//entity",
"//entity/queue",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "batch_test",
srcs = ["batch_test.go"],
embed = [":batch"],
deps = [
"//core/consumer",
"//entity",
"//entity/queue",
"//extension/queue/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//zaptest",
],
)
Loading