Skip to content

Commit 365fd12

Browse files
committed
refactor(storage): revert per-queue Factory; storage is global
## Summary ### Why? PR #188 added `storage.Factory` (`For(queue) (Storage, error)`) and threaded it into every controller, leaving a `TODO(queue-aware)` in all 11 orchestrator controllers: each fell back to `stores.For("")` because storage is needed *before* the by-ID entity load, so the queue isn't known yet. That chicken-and-egg was the sole reason for the TODO. Behavioral extensions (mergechecker, buildrunner, scorer, conflict, changeprovider, pusher) are already correctly per-queue — resolved *after* the load from the loaded entity's queue. Storage, by contrast, does not vary by queue: every queue shares the same entity schema and tables. Per-queue storage, if ever needed, is better achieved by splitting the environment and wiring different stores per deployment, not an in-process Factory. ### What? - Remove the `storage.Factory` interface and delete the `staticFactory` / `NewStaticFactory` implementation (+ its test). Regenerate the storage mock (drops `MockFactory`). - Switch the 11 orchestrator controllers and the gateway `land` controller from `storage.Factory` to a single injected `storage.Storage`; drop the `stores.For("")` hack and the `TODO(queue-aware)` comments. Queue for logging and behavioral-extension resolution continues to come from the loaded entity. - Update example wiring (gateway + orchestrator `main.go`) to pass the store directly instead of `NewStaticFactory(store)`. - Behavioral-extension Factories are unchanged. No behavior change under today's single-store deployment — `For("")` already returned this exact store. ## Test Plan ✅ `make build`, `make test` (39 pass) ✅ `make gazelle` / `make mocks` / `make tidy` / `make fmt` (all in sync)
1 parent 61dd1ae commit 365fd12

36 files changed

Lines changed: 75 additions & 297 deletions

File tree

example/submitqueue/gateway/server/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ go_library(
1515
"//extension/messagequeue/mysql",
1616
"//submitqueue/core/consumer",
1717
"//submitqueue/extension/queueconfig/yaml",
18-
"//submitqueue/extension/storage",
1918
"//submitqueue/extension/storage/mysql",
2019
"//submitqueue/gateway/controller",
2120
"//submitqueue/gateway/protopb",

example/submitqueue/gateway/server/main.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
3333
"github.com/uber/submitqueue/submitqueue/core/consumer"
3434
yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml"
35-
"github.com/uber/submitqueue/submitqueue/extension/storage"
3635
mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
3736
"github.com/uber/submitqueue/submitqueue/gateway/controller"
3837
pb "github.com/uber/submitqueue/submitqueue/gateway/protopb"
@@ -201,13 +200,12 @@ func run() error {
201200
))
202201

203202
// Initialize storage from the shared app database connection. The land
204-
// controller takes a storage.Factory (static: every queue → this store);
205-
// cancel/status use the request log store directly.
203+
// controller writes to this store directly; cancel/status use the request
204+
// log store directly.
206205
store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
207206
if err != nil {
208207
return fmt.Errorf("failed to create storage: %w", err)
209208
}
210-
stores := storage.NewStaticFactory(store)
211209
requestLogStore := store.GetRequestLogStore()
212210

213211
// Load queue configurations from YAML. Path is required so the gateway
@@ -223,7 +221,7 @@ func run() error {
223221

224222
// Create controllers and wrap them for gRPC
225223
pingController := controller.NewPingController(logger, scope)
226-
landController := controller.NewLandController(logger.Sugar(), scope, cnt, stores, queueConfigs, registry)
224+
landController := controller.NewLandController(logger.Sugar(), scope, cnt, store, queueConfigs, registry)
227225
cancelController := controller.NewCancelController(logger.Sugar(), scope, requestLogStore, registry)
228226
statusController := controller.NewStatusController(logger.Sugar(), scope, requestLogStore)
229227
gatewayServer := &GatewayServer{

example/submitqueue/orchestrator/server/main.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -461,14 +461,10 @@ type conflictFactory struct{ impl conflict.Analyzer }
461461
func (f conflictFactory) For(conflict.Config) (conflict.Analyzer, error) { return f.impl, nil }
462462

463463
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, br buildrunner.BuildRunner, cnt counter.Counter, store storage.Storage, changeStore changestore.ChangeStore) error {
464-
// Static storage factory: every queue resolves to the one configured store.
465-
// The factory is the injection point for future per-queue backends.
466-
stores := storage.NewStaticFactory(store)
467-
468464
requestController := start.NewController(
469465
logger,
470466
scope,
471-
stores,
467+
store,
472468
changeStore,
473469
registry,
474470
consumer.TopicKeyStart,
@@ -481,7 +477,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
481477
cancelController := cancel.NewController(
482478
logger,
483479
scope,
484-
stores,
480+
store,
485481
registry,
486482
consumer.TopicKeyCancel,
487483
"orchestrator-cancel",
@@ -493,7 +489,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
493489
validateController := validate.NewController(
494490
logger,
495491
scope,
496-
stores,
492+
store,
497493
changeStore,
498494
registry,
499495
mergeCheckerFactory{impl: mc},
@@ -510,7 +506,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
510506
scope,
511507
registry,
512508
cnt,
513-
stores,
509+
store,
514510
// TODO: replace with a real conflict analyzer (e.g. one backed by
515511
// Tango target analysis). The "all" stub serializes the queue.
516512
conflictFactory{impl: all.New()},
@@ -524,7 +520,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
524520
scoreController := score.NewController(
525521
logger,
526522
scope,
527-
stores,
523+
store,
528524
// TODO: replace with a real scorer
529525
scorerFactory{impl: heuristic.New(
530526
[]heuristic.Bucket{
@@ -549,7 +545,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
549545
speculateController := speculate.NewController(
550546
logger,
551547
scope,
552-
stores,
548+
store,
553549
registry,
554550
consumer.TopicKeySpeculate,
555551
"orchestrator-speculate",
@@ -561,7 +557,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
561557
buildController := build.NewController(
562558
logger,
563559
scope,
564-
stores,
560+
store,
565561
buildRunnerFactory{impl: br},
566562
registry,
567563
consumer.TopicKeyBuild,
@@ -574,7 +570,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
574570
buildsignalController := buildsignal.NewController(
575571
logger,
576572
scope,
577-
stores,
573+
store,
578574
buildRunnerFactory{impl: br},
579575
registry,
580576
consumer.TopicKeyBuildSignal,
@@ -587,7 +583,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
587583
mergeController := merge.NewController(
588584
logger,
589585
scope,
590-
stores,
586+
store,
591587
registry,
592588
pusherFactory{impl: psh},
593589
consumer.TopicKeyMerge,
@@ -600,7 +596,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
600596
concludeController := conclude.NewController(
601597
logger,
602598
scope,
603-
stores,
599+
store,
604600
registry,
605601
consumer.TopicKeyConclude,
606602
"orchestrator-conclude",
@@ -612,7 +608,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
612608
logController := logctrl.NewController(
613609
logger,
614610
scope,
615-
stores,
611+
store,
616612
consumer.TopicKeyLog,
617613
"orchestrator-log",
618614
)
Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@rules_go//go:def.bzl", "go_library", "go_test")
1+
load("@rules_go//go:def.bzl", "go_library")
22

33
go_library(
44
name = "storage",
@@ -7,7 +7,6 @@ go_library(
77
"batch_store.go",
88
"build_store.go",
99
"change_provider_store.go",
10-
"factory.go",
1110
"request_log_store.go",
1211
"request_store.go",
1312
"speculation_tree_store.go",
@@ -17,13 +16,3 @@ go_library(
1716
visibility = ["//visibility:public"],
1817
deps = ["//submitqueue/entity"],
1918
)
20-
21-
go_test(
22-
name = "storage_test",
23-
srcs = ["factory_test.go"],
24-
embed = [":storage"],
25-
deps = [
26-
"@com_github_stretchr_testify//assert",
27-
"@com_github_stretchr_testify//require",
28-
],
29-
)

submitqueue/extension/storage/factory.go

Lines changed: 0 additions & 34 deletions
This file was deleted.

submitqueue/extension/storage/factory_test.go

Lines changed: 0 additions & 48 deletions
This file was deleted.

submitqueue/extension/storage/mock/storage_mock.go

Lines changed: 0 additions & 39 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

submitqueue/extension/storage/storage.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,3 @@ type Storage interface {
6868
// Close closes the storage and all underlying connections. Should only be called once at the end of the program.
6969
Close() error
7070
}
71-
72-
// Factory returns the Storage backing a named queue. It exists so a single
73-
// queue can be migrated to a different backend without affecting others; the
74-
// default implementation (NewStaticFactory) returns the same Storage for
75-
// every queue. Callers resolve the Storage per message from the queue name
76-
// carried in the message envelope, before any entity lookup.
77-
type Factory interface {
78-
// For returns the Storage for the named queue. An empty name selects the
79-
// default backend. It returns an error if no backend is configured for the
80-
// queue.
81-
For(name string) (Storage, error)
82-
}

submitqueue/gateway/controller/land.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,20 @@ type LandController struct {
6464
logger *zap.SugaredLogger
6565
metricsScope tally.Scope
6666
counter counter.Counter
67-
stores storage.Factory
67+
store storage.Storage
6868
queueConfigs queueconfig.Store
6969
registry consumer.TopicRegistry
7070
}
7171

7272
// NewLandController creates a new instance of the gateway land controller.
7373
// The controller publishes land requests to the topic registered under
7474
// consumer.TopicKeyStart in the registry.
75-
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, stores storage.Factory, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController {
75+
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, store storage.Storage, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController {
7676
return &LandController{
7777
logger: logger,
7878
metricsScope: scope.SubScope("land_controller"),
7979
counter: counter,
80-
stores: stores,
80+
store: store,
8181
queueConfigs: queueConfigs,
8282
registry: registry,
8383
}
@@ -110,14 +110,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p
110110
return nil, fmt.Errorf("LandController failed to look up queue %q: %w", queue, err)
111111
}
112112

113-
// Resolve the storage backend for this entityqueue. The queue is known up front
114-
// here (from the request), so this is the one place per-queue storage
115-
// routing is actionable.
116-
store, err := c.stores.For(queue)
117-
if err != nil {
118-
return nil, fmt.Errorf("LandController failed to resolve storage for queue=%s: %w", queue, err)
119-
}
120-
121113
// TODO: pass default queue land strategy to resolver function to process a default.
122114
strategy, err := resolveRequestLandStrategy(req.Strategy)
123115
if err != nil {
@@ -141,7 +133,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *p
141133
// It is important to record the status before publishing to the queue for processing. It is important to publish straight to the database and not via a entityqueue.
142134
// Gateway has to stay consistent with the request log.
143135
logEntry := entity.NewRequestLog(landRequest.ID, entity.RequestStatusAccepted, 0, "", nil)
144-
if err := store.GetRequestLogStore().Insert(ctx, logEntry); err != nil {
136+
if err := c.store.GetRequestLogStore().Insert(ctx, logEntry); err != nil {
145137
return nil, fmt.Errorf("LandController failed to insert request log for sqid=%s: %w", landRequest.ID, err)
146138
}
147139

0 commit comments

Comments
 (0)