Skip to content

Commit 2aae626

Browse files
committed
feat(controller) Add batch ID generation via counter to batch controller
1 parent 8e4e657 commit 2aae626

5 files changed

Lines changed: 115 additions & 12 deletions

File tree

example/server/orchestrator/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ go_library(
1212
visibility = ["//visibility:private"],
1313
deps = [
1414
"//core/consumer",
15+
"//extension/counter",
16+
"//extension/counter/mysql",
1517
"//extension/queue",
1618
"//extension/queue/mysql",
1719
"//orchestrator/controller",

example/server/orchestrator/main.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
_ "github.com/go-sql-driver/mysql"
1515
"github.com/uber-go/tally/v4"
1616
"github.com/uber/submitqueue/core/consumer"
17+
"github.com/uber/submitqueue/extension/counter"
18+
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
1719
extqueue "github.com/uber/submitqueue/extension/queue"
1820
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
1921
"github.com/uber/submitqueue/orchestrator/controller"
@@ -92,6 +94,20 @@ func run() error {
9294
metricsWgDone.Wait()
9395
}()
9496

97+
// Open app database connection for counter
98+
// Docker Compose healthchecks ensure MySQL is ready before service starts
99+
appDSN := os.Getenv("MYSQL_DSN")
100+
if appDSN == "" {
101+
return fmt.Errorf("MYSQL_DSN environment variable is required")
102+
}
103+
appDB, err := sql.Open("mysql", appDSN)
104+
if err != nil {
105+
return fmt.Errorf("failed to open app database: %w", err)
106+
}
107+
defer appDB.Close()
108+
109+
cnt := mysqlcounter.NewCounter(appDB)
110+
95111
// Open queue database connection
96112
// Docker Compose healthchecks ensure MySQL is ready before service starts
97113
queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
@@ -129,7 +145,7 @@ func run() error {
129145
c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry)
130146

131147
// Register controllers
132-
if err := registerControllers(c, logger.Sugar(), scope, registry); err != nil {
148+
if err := registerControllers(c, logger.Sugar(), scope, registry, cnt); err != nil {
133149
return err
134150
}
135151

@@ -253,7 +269,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) consumer.TopicReg
253269
//
254270
// → merge → merge-signal
255271
// finalize (terminal)
256-
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry) error {
272+
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, cnt counter.Counter) error {
257273
requestController := request.NewController(
258274
logger,
259275
scope,
@@ -269,6 +285,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
269285
logger,
270286
scope,
271287
registry,
288+
cnt,
272289
consumer.TopicToBatch,
273290
"orchestrator-batch",
274291
)

orchestrator/controller/batch/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"//core/consumer",
1010
"//entity",
1111
"//entity/queue",
12+
"//extension/counter",
1213
"@com_github_uber_go_tally_v4//:tally",
1314
"@org_uber_go_zap//:zap",
1415
],

orchestrator/controller/batch/batch.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/uber/submitqueue/core/consumer"
99
"github.com/uber/submitqueue/entity"
1010
entityqueue "github.com/uber/submitqueue/entity/queue"
11+
"github.com/uber/submitqueue/extension/counter"
1112
"go.uber.org/zap"
1213
)
1314

@@ -18,6 +19,7 @@ type Controller struct {
1819
logger *zap.SugaredLogger
1920
metricsScope tally.Scope
2021
registry consumer.TopicRegistry
22+
counter counter.Counter
2123
topic consumer.Topic
2224
consumerGroup string
2325
}
@@ -30,13 +32,15 @@ func NewController(
3032
logger *zap.SugaredLogger,
3133
scope tally.Scope,
3234
registry consumer.TopicRegistry,
35+
counter counter.Counter,
3336
topic consumer.Topic,
3437
consumerGroup string,
3538
) *Controller {
3639
return &Controller{
3740
logger: logger.Named("batch_controller"),
3841
metricsScope: scope.SubScope("batch_controller"),
3942
registry: registry,
43+
counter: counter,
4044
topic: topic,
4145
consumerGroup: consumerGroup,
4246
}
@@ -73,9 +77,37 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
7377
"partition_key", msg.PartitionKey,
7478
)
7579

76-
// TODO: Add batching logic
77-
// - Group requests into batches
78-
// - Create batch entity with dependencies
80+
// Generate a globally unique batch ID.
81+
seq, err := c.counter.Next(ctx, "batch/"+request.Queue)
82+
if err != nil {
83+
c.logger.Errorw("failed to generate batch ID",
84+
"request_id", request.ID,
85+
"queue", request.Queue,
86+
"error", err,
87+
)
88+
c.metricsScope.Counter("counter_errors").Inc(1)
89+
return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)
90+
}
91+
92+
batch := entity.Batch{
93+
ID: fmt.Sprintf("%s/batch/%d", request.Queue, seq),
94+
Queue: request.Queue,
95+
Contains: []string{request.ID},
96+
// TODO Dependencies
97+
State: entity.BatchStateCreated,
98+
Version: 1,
99+
}
100+
101+
c.logger.Infow("batch created",
102+
"batch_id", batch.ID,
103+
"request_id", request.ID,
104+
"queue", request.Queue,
105+
)
106+
107+
// TODO:
108+
// - Add batch to DB
109+
// - Create batch dependent entity
110+
// - Add to batch dependent DB
79111

80112
// Publish to speculate topic
81113
if err := c.publish(ctx, consumer.TopicBatched, request); err != nil {

orchestrator/controller/batch/batch_test.go

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package batch
33
import (
44
"context"
55
"fmt"
6+
"sync/atomic"
67
"testing"
78

89
"github.com/stretchr/testify/assert"
@@ -16,8 +17,27 @@ import (
1617
"go.uber.org/zap/zaptest"
1718
)
1819

20+
// mockCounter implements counter.Counter for testing.
21+
type mockCounter struct {
22+
nextFunc func(ctx context.Context, domain string) (int64, error)
23+
}
24+
25+
func (m *mockCounter) Next(ctx context.Context, domain string) (int64, error) {
26+
return m.nextFunc(ctx, domain)
27+
}
28+
29+
// newSequentialCounter returns a mock counter that returns incrementing values starting at 1.
30+
func newSequentialCounter() *mockCounter {
31+
var seq int64
32+
return &mockCounter{
33+
nextFunc: func(ctx context.Context, domain string) (int64, error) {
34+
return atomic.AddInt64(&seq, 1), nil
35+
},
36+
}
37+
}
38+
1939
// newTestController creates a controller with test dependencies.
20-
func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller {
40+
func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *mockCounter, publishErr error) *Controller {
2141
logger := zaptest.NewLogger(t).Sugar()
2242
scope := tally.NoopScope
2343

@@ -36,12 +56,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error)
3656
nil,
3757
)
3858

39-
return NewController(logger, scope, registry, consumer.TopicToBatch, "orchestrator-batch")
59+
return NewController(logger, scope, registry, cnt, consumer.TopicToBatch, "orchestrator-batch")
4060
}
4161

4262
func TestNewController(t *testing.T) {
4363
ctrl := gomock.NewController(t)
44-
controller := newTestController(t, ctrl, nil)
64+
controller := newTestController(t, ctrl, newSequentialCounter(), nil)
4565

4666
require.NotNil(t, controller)
4767
assert.Equal(t, consumer.TopicToBatch, controller.Topic())
@@ -52,7 +72,7 @@ func TestNewController(t *testing.T) {
5272
func TestController_Process_Success(t *testing.T) {
5373
ctrl := gomock.NewController(t)
5474

55-
controller := newTestController(t, ctrl, nil)
75+
controller := newTestController(t, ctrl, newSequentialCounter(), nil)
5676

5777
request := entity.Request{
5878
ID: "test-queue/123",
@@ -78,7 +98,7 @@ func TestController_Process_Success(t *testing.T) {
7898
func TestController_Process_InvalidJSON(t *testing.T) {
7999
ctrl := gomock.NewController(t)
80100

81-
controller := newTestController(t, ctrl, nil)
101+
controller := newTestController(t, ctrl, newSequentialCounter(), nil)
82102

83103
invalidPayload := []byte(`{"invalid": json"}`)
84104
msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil)
@@ -95,7 +115,7 @@ func TestController_Process_InvalidJSON(t *testing.T) {
95115
func TestController_Process_PublishFailure(t *testing.T) {
96116
ctrl := gomock.NewController(t)
97117

98-
controller := newTestController(t, ctrl, fmt.Errorf("publish failed"))
118+
controller := newTestController(t, ctrl, newSequentialCounter(), fmt.Errorf("publish failed"))
99119

100120
request := entity.Request{
101121
ID: "test-queue/123",
@@ -118,9 +138,40 @@ func TestController_Process_PublishFailure(t *testing.T) {
118138
assert.Error(t, err)
119139
}
120140

141+
func TestController_Process_CounterFailure(t *testing.T) {
142+
ctrl := gomock.NewController(t)
143+
144+
cnt := &mockCounter{
145+
nextFunc: func(ctx context.Context, domain string) (int64, error) {
146+
return 0, fmt.Errorf("counter unavailable")
147+
},
148+
}
149+
controller := newTestController(t, ctrl, cnt, nil)
150+
151+
request := entity.Request{
152+
ID: "test-queue/123",
153+
Queue: "test-queue",
154+
Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}},
155+
LandStrategy: entity.RequestLandStrategyRebase,
156+
State: entity.RequestStateNew,
157+
Version: 1,
158+
}
159+
160+
payload, err := request.ToBytes()
161+
require.NoError(t, err)
162+
163+
msg := queue.NewMessage(request.ID, payload, request.Queue, nil)
164+
delivery := queuemock.NewMockDelivery(ctrl)
165+
delivery.EXPECT().Message().Return(msg).AnyTimes()
166+
delivery.EXPECT().Attempt().Return(1).AnyTimes()
167+
168+
err = controller.Process(context.Background(), delivery)
169+
assert.Error(t, err)
170+
}
171+
121172
func TestController_InterfaceImplementation(t *testing.T) {
122173
ctrl := gomock.NewController(t)
123-
controller := newTestController(t, ctrl, nil)
174+
controller := newTestController(t, ctrl, newSequentialCounter(), nil)
124175

125176
var _ consumer.Controller = controller
126177
}

0 commit comments

Comments
 (0)