Commit 54f4c7b

refactor(request-log): gateway is sole writer of request log
## Summary

### Why?

The request log had two persistence paths: the gateway wrote some entries directly, while the orchestrator ran the `log`-topic consumer that wrote all downstream entries to storage. Having the orchestrator persist the request log blurs ownership: the orchestrator is a pipeline that should only emit events, not own the request-log table. Concentrating all writes in the gateway gives a single owner for the request log and keeps the orchestrator free of request-log storage writes.

### What?

The request-log persistence consumer moves from the orchestrator to the gateway:

- Move `submitqueue/orchestrator/controller/log/` → `submitqueue/gateway/controller/log/` (importpath, doc comment, and default consumer group `orchestrator-log` → `gateway-log`). Logic is unchanged.
- Orchestrator: `TopicKeyLog` becomes publish-only (subscription dropped), the log controller registration and import are removed, controller count 11 → 10. It still publishes via `submitqueue/core/request.PublishLog`.
- Gateway: builds a consumer (generic + mysql classifiers), registers the moved log controller on `TopicKeyLog` with a subscription (group `gateway-log`), starts it, and drains it with `Stop(30000)` on shutdown, preserving the 128+SIGTERM graceful-exit contract.
- Add `HOSTNAME=gateway-dev` to both gateway compose files for a stable subscriber name; update the workflow RFC and gateway README.
- Tests: add a gateway integration test that publishes to the log topic (as the orchestrator does) and asserts the gateway consumer persists it, and an e2e test that lands a request and asserts Status advances to `started`, exercising the publish→consume→persist path across both services.

The gateway keeps its two synchronous direct writes (`accepted` on Land, `cancelling` on Cancel) for read-your-write visibility at RPC return. Both are gateway writes, so the invariant holds: only the gateway persists the request log; the orchestrator only publishes. This works because gateway and orchestrator already share the same queue and app databases.

## Test Plan

- ✅ `bazel build` of both servers + the moved package
- ✅ `make test`: unit tests pass (incl. the moved log_test)
- ✅ `make check-gazelle`, `make check-tidy`, `make lint` (fmt + license)
- ✅ `make integration-test-submitqueue-gateway`: new `TestRequestLogConsumer` verifies the gateway consumer persists a log entry published to the log topic
- ✅ `make e2e-test`: new `TestLandRequest_PersistsStartedLogViaGatewayConsumer` verifies an orchestrator-published `started` log is persisted by the gateway and readable via Status; both services still exit 128+SIGTERM on shutdown

1 parent 35e2613 commit 54f4c7b

15 files changed

Lines changed: 208 additions & 30 deletions

doc/rfc/submitqueue/workflow.md

Lines changed: 4 additions & 4 deletions
@@ -2,7 +2,7 @@
 
 The orchestrator processes land requests through a queue-driven pipeline of small, single-purpose controllers. The gateway accepts a request over RPC and hands it off asynchronously; from there each controller consumes one topic, advances the request or batch, and publishes to the next topic. Most hops carry only an ID — the controller fetches the entity from storage — while a few entry points (`start`, `buildsignal`, `log`) carry the full payload because there is no row to fetch yet.
 
-The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`.
+The pipeline has two cycles: `speculate → build → buildsignal → speculate` (CI feedback loop) and `merge → speculate` (advance the next batch). `conclude` is the only stage that transitions a request to a terminal state; `log` is an append-only sink that any controller can publish to via `submitqueue/core/request.PublishLog`. The orchestrator only *publishes* request log events — the **gateway** is the sole consumer of the `log` topic and the only service that persists the request log to storage.
 
 ## Diagram
 
@@ -14,8 +14,8 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`
 │ LandRequest
 
 ┌──────────────────────┐ ┌──────────────────────────────────┐
-│ log (terminal sink) │◄───│ start │
-Append RequestLog │ │ Persist Request, emit Started │
+│ log (gateway sink) │◄───│ start │
+Persist RequestLog │ │ Persist Request, emit Started │
 └──────────────────────┘ └────────────────┬─────────────────┘
 ▲ │ RequestID
 │ ▼
@@ -79,4 +79,4 @@ The pipeline has two cycles: `speculate → build → buildsignal → speculate`
 | **buildsignal** | Build | speculate | Feed CI result back into speculation |
 | **merge** | BatchID | conclude, speculate | Merge the batch and advance the queue |
 | **conclude** | BatchID || Map terminal batch state to request state |
-| **log** | RequestLog || Append-only sink for request log events |
+| **log** | RequestLog || Gateway-owned sink: persists request log events to storage |

example/submitqueue/docker-compose.yml

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,8 @@ services:
       - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
       # Path to YAML queue configuration baked into the image
       - QUEUE_CONFIG_PATH=/root/queues.yaml
+      # Stable subscriber name for the request-log consumer
+      - HOSTNAME=gateway-dev
     depends_on:
       mysql-app:
         condition: service_healthy

example/submitqueue/gateway/server/BUILD.bazel

Lines changed: 4 additions & 0 deletions
@@ -11,12 +11,16 @@ go_library(
     importpath = "github.com/uber/submitqueue/example/submitqueue/gateway/server",
     visibility = ["//visibility:private"],
     deps = [
+        "//core/errs/generic",
+        "//core/errs/mysql",
         "//extension/counter/mysql",
+        "//extension/messagequeue",
         "//extension/messagequeue/mysql",
         "//submitqueue/core/consumer",
         "//submitqueue/extension/queueconfig/yaml",
         "//submitqueue/extension/storage/mysql",
         "//submitqueue/gateway/controller",
+        "//submitqueue/gateway/controller/log",
         "//submitqueue/gateway/protopb",
         "@com_github_go_sql_driver_mysql//:mysql",
         "@com_github_uber_go_tally_v4//:tally",

example/submitqueue/gateway/server/docker-compose.yml

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,8 @@ services:
       - QUEUE_MYSQL_DSN=root:root@tcp(mysql-queue:3306)/submitqueue?parseTime=true
       # Path to YAML queue configuration baked into the image
       - QUEUE_CONFIG_PATH=/root/queues.yaml
+      # Stable subscriber name for the request-log consumer
+      - HOSTNAME=gateway-dev
     depends_on:
       mysql-app:
         condition: service_healthy

example/submitqueue/gateway/server/main.go

Lines changed: 67 additions & 5 deletions
@@ -28,12 +28,16 @@ import (
 
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/uber-go/tally/v4"
+	genericerrs "github.com/uber/submitqueue/core/errs/generic"
+	mysqlerrs "github.com/uber/submitqueue/core/errs/mysql"
 	mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
+	extqueue "github.com/uber/submitqueue/extension/messagequeue"
 	queueMySQL "github.com/uber/submitqueue/extension/messagequeue/mysql"
 	"github.com/uber/submitqueue/submitqueue/core/consumer"
 	yamlqueueconfig "github.com/uber/submitqueue/submitqueue/extension/queueconfig/yaml"
 	mysqlstorage "github.com/uber/submitqueue/submitqueue/extension/storage/mysql"
 	"github.com/uber/submitqueue/submitqueue/gateway/controller"
+	logctrl "github.com/uber/submitqueue/submitqueue/gateway/controller/log"
 	pb "github.com/uber/submitqueue/submitqueue/gateway/protopb"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -174,12 +178,29 @@ func run() error {
 		zap.String("queue_dsn", queueDSN),
 	)
 
-	// Build a publish-only topic registry: gateway only feeds the start of the
-	// orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel).
-	// No subscription is configured because the gateway never consumes from the queue.
+	// Stable subscriber name for the log-topic consumer. Falls back to a
+	// time-seeded name when HOSTNAME is unset (e.g. local runs).
+	subscriberName := os.Getenv("HOSTNAME")
+	if subscriberName == "" {
+		subscriberName = fmt.Sprintf("gateway-%d", time.Now().Unix())
+	}
+
+	// Build the topic registry. The gateway publishes to the start of the
+	// orchestrator pipeline (TopicKeyStart) and the cancel topic (TopicKeyCancel) —
+	// both publish-only. It additionally consumes the log topic (TopicKeyLog):
+	// the gateway is the sole writer of the request log, persisting entries that
+	// the orchestrator publishes there.
 	registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
 		{Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue},
 		{Key: consumer.TopicKeyCancel, Name: "cancel", Queue: mysqlQueue},
+		{
+			Key: consumer.TopicKeyLog,
+			Name: "log",
+			Queue: mysqlQueue,
+			Subscription: extqueue.DefaultSubscriptionConfig(
+				subscriberName, "gateway-log",
+			),
+		},
 	})
 	if err != nil {
 		return fmt.Errorf("failed to create topic registry: %w", err)
@@ -201,7 +222,8 @@ func run() error {
 
 	// Initialize storage from the shared app database connection. The land
 	// controller writes to this store directly; cancel/status use the request
-	// log store directly.
+	// log store directly. The log consumer (registered below) is the sole
+	// persister of request log entries published by the orchestrator.
 	store, err := mysqlstorage.NewStorage(appDB, scope.SubScope("storage"))
 	if err != nil {
 		return fmt.Errorf("failed to create storage: %w", err)
@@ -236,6 +258,29 @@ func run() error {
 	// Register reflection service for debugging with grpcurl
 	reflection.Register(grpcServer)
 
+	// Create the queue consumer and register the log controller. The gateway is
+	// the sole persister of the request log: the orchestrator publishes entries
+	// to the log topic and this consumer writes them to storage.
+	logConsumer := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry,
+		// Storage (extension/storage/mysql) and queue (extension/messagequeue/mysql)
+		// both run on the same MySQL driver, so a single classifier covers
+		// errors surfaced from either backend.
+		genericerrs.Classifier,
+		mysqlerrs.Classifier,
+	)
+
+	logController := logctrl.NewController(logger.Sugar(), scope, store, consumer.TopicKeyLog, "gateway-log")
+	if err := logConsumer.Register(logController); err != nil {
+		return fmt.Errorf("failed to register log controller: %w", err)
+	}
+
+	if err := logConsumer.Start(ctx); err != nil {
+		// The error can also be a result of a context cancellation due to SIGINT or SIGTERM.
+		// This is expected, just propagate it.
+		return fmt.Errorf("failed to start log consumer: %w", err)
+	}
+	logger.Info("log consumer started")
+
 	// Listen on configurable port
 	port := os.Getenv("PORT")
 	if port == "" {
@@ -257,6 +302,8 @@ func run() error {
 
 	// Wait for interrupt signal or server critical error
 	// If interruption is signaled, gracefully stop the server
+	// If the server exits with an error, cancel the context to signal the consumer
+	// After this, stop the consumer
 	// If an error happens during shutdown, return the actual error, not the context cancellation error
 	var serverErr error
 	select {
@@ -273,10 +320,25 @@ func run() error {
 		serverErr = <-serverErrCh
 	case serverErr = <-serverErrCh:
 		fmt.Println("Shutting down gateway server due to critical GRPC server error...")
+
+		// Cancel the context to signal cancellation to the queue consumer
+		cancel()
 	}
 
 	if serverErr != nil {
-		err = fmt.Errorf("GRPC server exited with error: %w", serverErr)
+		serverErr = fmt.Errorf("GRPC server exited with error: %w", serverErr)
+	}
+
+	// Stop the consumer with a 30s timeout; by this time the context should be
+	// cancelled and the processing threads may already be exiting; recollect them.
+	errStop := logConsumer.Stop(30000)
+	if errStop != nil {
+		errStop = fmt.Errorf("failed to stop consumer: %w", errStop)
+	}
+
+	if errStop != nil || serverErr != nil {
+		// Override context cancellation error with the shutdown error
+		err = errors.Join(errStop, serverErr)
 	}
 
 	return err

example/submitqueue/orchestrator/server/BUILD.bazel

Lines changed: 0 additions & 1 deletion
@@ -42,7 +42,6 @@ go_library(
         "//submitqueue/orchestrator/controller/buildsignal",
         "//submitqueue/orchestrator/controller/cancel",
         "//submitqueue/orchestrator/controller/conclude",
-        "//submitqueue/orchestrator/controller/log",
         "//submitqueue/orchestrator/controller/merge",
         "//submitqueue/orchestrator/controller/score",
         "//submitqueue/orchestrator/controller/speculate",

example/submitqueue/orchestrator/server/main.go

Lines changed: 4 additions & 16 deletions
@@ -61,7 +61,6 @@ import (
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/buildsignal"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/cancel"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/conclude"
-	logctrl "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/merge"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/score"
 	"github.com/uber/submitqueue/submitqueue/orchestrator/controller/speculate"
@@ -238,7 +237,7 @@ func run() error {
 		return err
 	}
 
-	logger.Info("controllers registered", zap.Int("count", 11))
+	logger.Info("controllers registered", zap.Int("count", 10))
 
 	// Start consumers
 	if err := c.Start(ctx); err != nil {
@@ -408,12 +407,12 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
 			),
 		},
 		{
+			// Publish-only: the orchestrator emits request log entries to the
+			// log topic but never persists them. The gateway is the sole
+			// consumer that writes the request log to storage.
 			Key: consumer.TopicKeyLog,
 			Name: "log",
 			Queue: q,
-			Subscription: extqueue.DefaultSubscriptionConfig(
-				subscriberName, "orchestrator-log",
-			),
 		},
 	})
 }
@@ -605,17 +604,6 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
 		return fmt.Errorf("failed to register conclude controller: %w", err)
 	}
 
-	logController := logctrl.NewController(
-		logger,
-		scope,
-		store,
-		consumer.TopicKeyLog,
-		"orchestrator-log",
-	)
-	if err := c.Register(logController); err != nil {
-		return fmt.Errorf("failed to register log controller: %w", err)
-	}
-
 	return nil
 }
 
621609

submitqueue/gateway/README.md

Lines changed: 20 additions & 1 deletion
@@ -1 +1,20 @@
-SubmitQueue Gateway
+# SubmitQueue Gateway
+
+The gateway is the RPC entry point to SubmitQueue. It accepts `Land`, `Cancel`,
+`Status`, and `Ping` calls, validates them at the edge, and hands work off to the
+orchestrator pipeline asynchronously via the message queue.
+
+## Request log ownership
+
+The gateway is the **sole writer of the request log**. No other service persists
+request log entries:
+
+- For statuses it produces synchronously (`accepted` on `Land`, `cancelling` on
+  `Cancel`), the gateway writes directly to storage so the entry is visible the
+  moment the RPC returns.
+- For statuses produced downstream, the orchestrator only *publishes* entries to
+  the `log` topic via `submitqueue/core/request.PublishLog`. The gateway runs a
+  consumer that drains the `log` topic and persists each entry to storage.
+
+This keeps a single service responsible for the request log while letting the
+orchestrator remain free of storage writes for it.
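
The ownership rule in the README can be illustrated with a small in-memory model. This is only a sketch: `LogEntry`, `Store`, `Gateway`, `Orchestrator`, and `runDemo` are hypothetical stand-ins, and a buffered channel takes the place of the MySQL-backed `log` topic. None of these names come from the repository.

```go
package main

import "fmt"

// LogEntry is a hypothetical stand-in for a request log event.
type LogEntry struct {
	RequestID string
	Status    string
}

// Store is a hypothetical in-memory request log. Only the gateway holds it.
type Store struct{ entries []LogEntry }

func (s *Store) Append(e LogEntry) { s.entries = append(s.entries, e) }

// Gateway owns the store and is the only type that writes to it.
type Gateway struct{ store *Store }

// Land writes "accepted" directly so it is visible when the call returns.
func (g *Gateway) Land(id string) {
	g.store.Append(LogEntry{RequestID: id, Status: "accepted"})
}

// ConsumeLog drains the topic until it is closed, persisting each entry.
func (g *Gateway) ConsumeLog(topic <-chan LogEntry) {
	for e := range topic {
		g.store.Append(e)
	}
}

// Orchestrator has no store; it can only publish to the log topic.
type Orchestrator struct{ logTopic chan<- LogEntry }

func (o *Orchestrator) Start(id string) {
	o.logTopic <- LogEntry{RequestID: id, Status: "started"}
}

// runDemo lands one request and returns what the gateway persisted.
func runDemo() []LogEntry {
	topic := make(chan LogEntry, 8) // stands in for the queue
	gw := &Gateway{store: &Store{}}
	orch := &Orchestrator{logTopic: topic}

	gw.Land("req-1")     // synchronous direct write
	orch.Start("req-1")  // publish only, no storage access
	close(topic)         // no more events in this demo
	gw.ConsumeLog(topic) // gateway persists the published entry
	return gw.store.entries
}

func main() {
	for _, e := range runDemo() {
		fmt.Println(e.RequestID, e.Status)
	}
}
```

Because `Orchestrator` holds only the send side of the channel and no `Store`, the model makes the invariant structural: every persisted entry passes through a `Gateway` method.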

submitqueue/orchestrator/controller/log/BUILD.bazel renamed to submitqueue/gateway/controller/log/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "log",
     srcs = ["log.go"],
-    importpath = "github.com/uber/submitqueue/submitqueue/orchestrator/controller/log",
+    importpath = "github.com/uber/submitqueue/submitqueue/gateway/controller/log",
     visibility = ["//visibility:public"],
     deps = [
         "//core/metrics",

submitqueue/orchestrator/controller/log/log.go renamed to submitqueue/gateway/controller/log/log.go

Lines changed: 5 additions & 1 deletion
@@ -29,6 +29,10 @@ import (
 // Controller handles log queue messages.
 // It consumes request log entries and persists them to storage.
 // Implements consumer.Controller interface for integration with the consumer.
+//
+// The request log is written exclusively by the gateway: other services
+// (e.g. the orchestrator) only publish log entries to the log topic, and this
+// controller is the single consumer that persists them to storage.
 type Controller struct {
 	logger *zap.SugaredLogger
 	metricsScope tally.Scope
@@ -40,7 +44,7 @@ type Controller struct {
 // Verify Controller implements consumer.Controller interface at compile time.
 var _ consumer.Controller = (*Controller)(nil)
 
-// NewController creates a new log controller for the orchestrator.
+// NewController creates a new log controller for the gateway.
 func NewController(
 	logger *zap.SugaredLogger,
 	scope tally.Scope,