Skip to content

Commit 61dd1ae

Browse files
authored
refactor(extensions): per-queue Factory; drop queue from method signatures (#188)
## Summary ### Why? Behavioral extensions were global singletons frozen at process start, and two leaked the queue into vendor-agnostic method signatures (`mergechecker.Check`, `buildrunner.Trigger`). Each extension should instead be produced by a Factory the controller asks for using the queue, identified only by name, so an implementation can be selected/configured per queue. ### What? - Reduce `entity.QueueConfig` to `{Name}` (the registry of valid queues); all behavioral/VCS config moves into the factory implementations. - Each extension (changeprovider, pusher, scorer, conflict, mergechecker, buildrunner) ships a narrow `Config{QueueName}` and a `Factory` interface (`For(cfg) (T, error)`); concrete factories are written by integrators in the example wiring and tests. Generated mocks updated. - Drop the queue argument from `mergechecker.Check(ctx, change)` and `buildrunner.Trigger(ctx, base, head, metadata)`; delete `mergechecker.MultiChecker`. `Status`/`Cancel` stay buildID-keyed. - Add `storage.Factory` (default mysql) and thread it into every controller; the gateway `land` controller resolves `For(req.Queue)` — the one place per-queue storage routing is actionable. Orchestrator controllers carry a `TODO(queue-aware)` to derive the queue from the loaded entity for logging/metrics/storage. - Controllers resolve each extension from its Factory using the loaded entity's queue; `buildsignal` fetches the batch before polling `Status`. Example servers wire concrete static factory structs. Stacked on #187 (now merged); rebased onto main. ## Test Plan ✅ `make build`, `make test` (40 pass) ✅ `make fmt` / `lint` / `check-gazelle` / `check-tidy` / `check-mocks` ✅ Integration + e2e (Docker) verified locally: counter, gateway, orchestrator, e2e suites pass
1 parent 7032251 commit 61dd1ae

65 files changed

Lines changed: 936 additions & 424 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ local-stovepipe-gateway-start: build-stovepipe-gateway-linux ## Start Stovepipe
334334

335335
mocks: ## Generate mock files using mockgen
336336
@echo "Generating mocks..."
337-
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changestore/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/...
337+
@$(BAZEL) run @rules_go//go -- generate ./submitqueue/extension/storage/... ./submitqueue/extension/buildrunner/... ./submitqueue/extension/changeprovider/... ./submitqueue/extension/changestore/... ./extension/counter/... ./extension/messagequeue/... ./submitqueue/extension/queueconfig/... ./submitqueue/extension/mergechecker/... ./submitqueue/extension/pusher/... ./submitqueue/extension/scorer/... ./submitqueue/extension/conflict/... ./submitqueue/core/consumer/...
338338
@echo "Mocks generated successfully!"
339339

340340
proto: ## Generate protobuf files from .proto definitions

example/submitqueue/gateway/server/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"//extension/messagequeue/mysql",
1616
"//submitqueue/core/consumer",
1717
"//submitqueue/extension/queueconfig/yaml",
18+
"//submitqueue/extension/storage",
1819
"//submitqueue/extension/storage/mysql",
1920
"//submitqueue/gateway/controller",
2021
"//submitqueue/gateway/protopb",

example/submitqueue/gateway/server/main.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
3333
"github.com/uber/submitqueue/submitqueue/core/consumer"
3434
yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml"
35+
"github.com/uber/submitqueue/submitqueue/extension/storage"
3536
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
3637
"github.com/uber/submitqueue/submitqueue/gateway/controller"
3738
pb "github.com/uber/submitqueue/submitqueue/gateway/protopb"
@@ -199,8 +200,15 @@ func run() error {
199200
},
200201
))
201202

202-
// Initialize request log store from shared app database connection
203-
requestLogStore := mysqlstorage.NewRequestLogStore(appDB, scope.SubScope("request_log_store"))
203+
// Initialize storage from the shared app database connection. The land
204+
// controller takes a storage.Factory (static: every queue → this store);
205+
// cancel/status use the request log store directly.
206+
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
207+
if err != nil {
208+
return fmt.Errorf("failed to create storage: %w", err)
209+
}
210+
stores := storage.NewStaticFactory(store)
211+
requestLogStore := store.GetRequestLogStore()
204212

205213
// Load queue configurations from YAML. Path is required so the gateway
206214
// can reject requests for unknown queues at the edge.
@@ -215,7 +223,7 @@ func run() error {
215223

216224
// Create controllers and wrap them for gRPC
217225
pingController := controller.NewPingController(logger, scope)
218-
landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, queueConfigs, registry)
226+
landController := controller.NewLandController(logger.Sugar(), scope, cnt, stores, queueConfigs, registry)
219227
cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry)
220228
statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore)
221229
gatewayServer := &GatewayServer{
Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,9 @@
11
# Example queue configurations consumed by the gateway YAML store.
2-
# The gateway loads this file at startup; QUEUE_CONFIG_PATH must point
3-
# at it. Each entry maps a queue name to the VCS repository + target
4-
# branch and selects the extension implementations used downstream.
2+
# The gateway loads this file at startup; QUEUE_CONFIG_PATH must point at it.
3+
# QueueConfig is just the registry of valid queue names — all behavioral and
4+
# VCS configuration lives in the extension factory implementations wired in the
5+
# server, not here.
56
queues:
67
- name: test-queue
7-
vcs_type: git
8-
vcs_address: git@github.com:uber/submitqueue.git
9-
target: main
10-
build_runner: buildkite.com/uber/submitqueue-ci
11-
change_provider: github
12-
merge_checker: github
13-
land_provider: github
14-
158
- name: e2e-test-queue
16-
vcs_type: git
17-
vcs_address: git@github.com:uber/submitqueue.git
18-
target: main
19-
build_runner: buildkite.com/uber/submitqueue-ci
20-
change_provider: github
21-
merge_checker: github
22-
land_provider: github
23-
249
- name: e2e-cancel-queue
25-
vcs_type: git
26-
vcs_address: git@github.com:uber/submitqueue.git
27-
target: main
28-
build_runner: buildkite.com/uber/submitqueue-ci
29-
change_provider: github
30-
merge_checker: github
31-
land_provider: github

example/submitqueue/orchestrator/server/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ go_library(
2626
"//submitqueue/extension/changeprovider/github",
2727
"//submitqueue/extension/changestore",
2828
"//submitqueue/extension/changestore/mysql",
29+
"//submitqueue/extension/conflict",
2930
"//submitqueue/extension/conflict/all",
3031
"//submitqueue/extension/mergechecker",
3132
"//submitqueue/extension/mergechecker/github",
3233
"//submitqueue/extension/pusher",
3334
"//submitqueue/extension/pusher/git",
35+
"//submitqueue/extension/scorer",
3436
"//submitqueue/extension/scorer/heuristic",
3537
"//submitqueue/extension/storage",
3638
"//submitqueue/extension/storage/mysql",

example/submitqueue/orchestrator/server/main.go

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@ import (
4545
githubprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/github"
4646
"github.com/uber/submitqueue/submitqueue/extension/changestore"
4747
mysqlchangestore "github.com/uber/submitqueue/submitqueue/extension/changestore/mysql"
48+
"github.com/uber/submitqueue/submitqueue/extension/conflict"
4849
"github.com/uber/submitqueue/submitqueue/extension/conflict/all"
4950
"github.com/uber/submitqueue/submitqueue/extension/mergechecker"
5051
githubchecker "github.com/uber/submitqueue/submitqueue/extension/mergechecker/github"
5152
"github.com/uber/submitqueue/submitqueue/extension/pusher"
5253
gitpusher "github.com/uber/submitqueue/submitqueue/extension/pusher/git"
54+
"github.com/uber/submitqueue/submitqueue/extension/scorer"
5355
"github.com/uber/submitqueue/submitqueue/extension/scorer/heuristic"
5456
"github.com/uber/submitqueue/submitqueue/extension/storage"
5557
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
@@ -425,11 +427,48 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
425427
// │ │ │
426428
// └────────┴───────────────────────┘
427429

430+
// Static per-extension factories for the example server: every queue resolves
431+
// to the single configured implementation. A real deployment would vary the
432+
// returned implementation by cfg.QueueName (and inject per-queue config).
433+
type changeProviderFactory struct{ impl changeprovider.ChangeProvider }
434+
435+
func (f changeProviderFactory) For(changeprovider.Config) (changeprovider.ChangeProvider, error) {
436+
return f.impl, nil
437+
}
438+
439+
type mergeCheckerFactory struct{ impl mergechecker.MergeChecker }
440+
441+
func (f mergeCheckerFactory) For(mergechecker.Config) (mergechecker.MergeChecker, error) {
442+
return f.impl, nil
443+
}
444+
445+
type pusherFactory struct{ impl pusher.Pusher }
446+
447+
func (f pusherFactory) For(pusher.Config) (pusher.Pusher, error) { return f.impl, nil }
448+
449+
type buildRunnerFactory struct{ impl buildrunner.BuildRunner }
450+
451+
func (f buildRunnerFactory) For(buildrunner.Config) (buildrunner.BuildRunner, error) {
452+
return f.impl, nil
453+
}
454+
455+
type scorerFactory struct{ impl scorer.Scorer }
456+
457+
func (f scorerFactory) For(scorer.Config) (scorer.Scorer, error) { return f.impl, nil }
458+
459+
type conflictFactory struct{ impl conflict.Analyzer }
460+
461+
func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil }
462+
428463
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
464+
// Static storage factory: every queue resolves to the one configured store.
465+
// The factory is the injection point for future per-queue backends.
466+
stores := storage.NewStaticFactory(store)
467+
429468
requestController := start.NewController(
430469
logger,
431470
scope,
432-
store,
471+
stores,
433472
changeStore,
434473
registry,
435474
consumer.TopicKeyStart,
@@ -442,7 +481,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
442481
cancelController := cancel.NewController(
443482
logger,
444483
scope,
445-
store,
484+
stores,
446485
registry,
447486
consumer.TopicKeyCancel,
448487
"orchestrator-cancel",
@@ -454,11 +493,11 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
454493
validateController := validate.NewController(
455494
logger,
456495
scope,
457-
store,
496+
stores,
458497
changeStore,
459498
registry,
460-
mc,
461-
cp,
499+
mergeCheckerFactory{impl: mc},
500+
changeProviderFactory{impl: cp},
462501
consumer.TopicKeyValidate,
463502
"orchestrator-validate",
464503
)
@@ -471,10 +510,10 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
471510
scope,
472511
registry,
473512
cnt,
474-
store,
513+
stores,
475514
// TODO: replace with a real conflict analyzer (e.g. one backed by
476515
// Tango target analysis). The "all" stub serializes the queue.
477-
all.New(),
516+
conflictFactory{impl: all.New()},
478517
consumer.TopicKeyBatch,
479518
"orchestrator-batch",
480519
)
@@ -485,9 +524,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
485524
scoreController := score.NewController(
486525
logger,
487526
scope,
488-
store,
527+
stores,
489528
// TODO: replace with a real scorer
490-
heuristic.New(
529+
scorerFactory{impl: heuristic.New(
491530
[]heuristic.Bucket{
492531
{Min: 0, Max: 1, Score: 0.95},
493532
{Min: 2, Max: 5, Score: 0.80},
@@ -498,7 +537,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
498537
return len(change.URIs), nil
499538
},
500539
scope.SubScope("scorer"),
501-
),
540+
)},
502541
registry,
503542
consumer.TopicKeyScore,
504543
"orchestrator-score",
@@ -510,7 +549,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
510549
speculateController := speculate.NewController(
511550
logger,
512551
scope,
513-
store,
552+
stores,
514553
registry,
515554
consumer.TopicKeySpeculate,
516555
"orchestrator-speculate",
@@ -522,8 +561,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
522561
buildController := build.NewController(
523562
logger,
524563
scope,
525-
store,
526-
br,
564+
stores,
565+
buildRunnerFactory{impl: br},
527566
registry,
528567
consumer.TopicKeyBuild,
529568
"orchestrator-build",
@@ -535,8 +574,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
535574
buildsignalController := buildsignal.NewController(
536575
logger,
537576
scope,
538-
store,
539-
br,
577+
stores,
578+
buildRunnerFactory{impl: br},
540579
registry,
541580
consumer.TopicKeyBuildSignal,
542581
"orchestrator-buildsignal",
@@ -548,9 +587,9 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
548587
mergeController := merge.NewController(
549588
logger,
550589
scope,
551-
store,
590+
stores,
552591
registry,
553-
psh,
592+
pusherFactory{impl: psh},
554593
consumer.TopicKeyMerge,
555594
"orchestrator-merge",
556595
)
@@ -561,7 +600,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
561600
concludeController := conclude.NewController(
562601
logger,
563602
scope,
564-
store,
603+
stores,
565604
registry,
566605
consumer.TopicKeyConclude,
567606
"orchestrator-conclude",
@@ -573,7 +612,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
573612
logController := logctrl.NewController(
574613
logger,
575614
scope,
576-
store,
615+
stores,
577616
consumer.TopicKeyLog,
578617
"orchestrator-log",
579618
)
@@ -619,14 +658,10 @@ func newMergeChecker(logger *zap.Logger, scope tally.Scope) (mergechecker.MergeC
619658

620659
client.Timeout = parseTimeout(os.Getenv("GITHUB_TIMEOUT"), 30*time.Second)
621660

622-
github := githubchecker.NewMergeChecker(githubchecker.Params{
661+
return githubchecker.NewMergeChecker(githubchecker.Params{
623662
HTTPClient: client,
624663
Logger: logger.Sugar(),
625664
MetricsScope: scope.SubScope("mergechecker"),
626-
})
627-
628-
return mergechecker.NewMultiChecker(map[string]mergechecker.MergeChecker{
629-
"github": github,
630665
}), nil
631666
}
632667

submitqueue/entity/queue_config.go

Lines changed: 5 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,52 +14,13 @@
1414

1515
package entity
1616

17-
// QueueConfig holds the configuration for a single submit queue.
18-
// Each queue maps a VCS repository + target to a processing pipeline.
19-
// A repository can have multiple queues, but each queue has exactly one target.
20-
// Immutable after creation.
17+
// QueueConfig identifies a single submit queue. It is the registry of valid
18+
// queue names; the gateway validates that a land request targets a known queue.
19+
// All behavioral and VCS configuration lives in the extension factory
20+
// implementations, which are constructed per integrator deployment — the system
21+
// hands a factory only the queue name. Immutable after creation.
2122
type QueueConfig struct {
2223
// Name uniquely identifies this queue within the system.
2324
// Referenced by Request.Queue.
2425
Name string `json:"name" yaml:"name"`
25-
26-
// VCSType identifies the version control system (e.g., "git", "svn", "perforce").
27-
// A queue operates on exactly one VCS.
28-
VCSType string `json:"vcs_type" yaml:"vcs_type"`
29-
30-
// VCSAddress identifies the repository in the version control system.
31-
// The format is VCS-specific:
32-
// - Git: remote URL (e.g., "git@github.com:uber/submitqueue.git")
33-
// - Perforce: depot path (e.g., "//depot/project")
34-
// - SVN: repository URL (e.g., "https://svn.example.com/repos/project")
35-
VCSAddress string `json:"vcs_address" yaml:"vcs_address"`
36-
37-
// Target is the landing target where changes are merged.
38-
// The format is VCS-specific:
39-
// - Git: branch ref (e.g., "main", "release/v2")
40-
// - Perforce: stream or depot path (e.g., "//depot/main/...")
41-
// - SVN: repository path (e.g., "trunk/")
42-
Target string `json:"target" yaml:"target"`
43-
44-
// BuildRunner identifies the CI pipeline or job that runs builds for this queue.
45-
// Opaque to the system; meaningful only to the build runner extension implementation.
46-
// Examples:
47-
// - Buildkite: "buildkite.com/uber/submitqueue-ci"
48-
// - Jenkins: "jenkins.example.com/job/submitqueue-verify"
49-
BuildRunner string `json:"build_runner" yaml:"build_runner"`
50-
51-
// ChangeProvider identifies the change provider implementation for this queue.
52-
// Opaque to the system; meaningful only to the change provider extension implementation.
53-
// Examples: "github", "gitlab", "phabricator"
54-
ChangeProvider string `json:"change_provider" yaml:"change_provider"`
55-
56-
// MergeChecker identifies the merge checker implementation for this queue.
57-
// Opaque to the system; meaningful only to the merge checker extension implementation.
58-
// Examples: "github", "gitlab"
59-
MergeChecker string `json:"merge_checker" yaml:"merge_checker"`
60-
61-
// LandProvider identifies the land provider implementation for this queue.
62-
// Opaque to the system; meaningful only to the land provider extension implementation.
63-
// Examples: "github", "gitlab"
64-
LandProvider string `json:"land_provider" yaml:"land_provider"`
6526
}

submitqueue/extension/buildrunner/build_runner.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,10 @@ type BuildRunner interface {
5555
// asynchronously. Callers learn the build's progress via Status, not
5656
// via Trigger.
5757
//
58-
// queueName selects the runner-specific job configuration.
59-
// Returns an error if the request is invalid.
58+
// The runner is already bound to its queue's job configuration by the
59+
// Factory that built it. Returns an error if the request is invalid.
6060
Trigger(
6161
ctx context.Context,
62-
queueName string,
6362
base []entity.Change,
6463
head []entity.Change,
6564
metadata entity.BuildMetadata,
@@ -80,3 +79,18 @@ type BuildRunner interface {
8079
// on already-terminal builds. Returns an error if the build does not exist.
8180
Cancel(ctx context.Context, buildID entity.BuildID) error
8281
}
82+
83+
// Config carries the per-queue identity handed to a Factory. The system knows
84+
// only the queue name; everything an implementation needs (endpoint, pipeline,
85+
// credentials) is injected at construction by the integrator.
86+
type Config struct {
87+
// QueueName identifies the queue this BuildRunner serves.
88+
QueueName string
89+
}
90+
91+
// Factory builds the BuildRunner for a queue. Implementations are provided by
92+
// integrators (and tests) and inject whatever they need at construction.
93+
type Factory interface {
94+
// For returns the BuildRunner for the given queue.
95+
For(cfg Config) (BuildRunner, error)
96+
}

submitqueue/extension/buildrunner/mock/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//submitqueue/entity",
10+
"//submitqueue/extension/buildrunner",
1011
"@org_uber_go_mock//gomock",
1112
],
1213
)

0 commit comments

Comments
 (0)