Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//core/consumer",
"//extension/counter",
"//extension/counter/mysql",
"//extension/mergechecker",
"//extension/mergechecker/github",
"//extension/queue",
Expand Down
22 changes: 20 additions & 2 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/extension/counter"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
"github.com/uber/submitqueue/extension/mergechecker"
githubchecker "github.com/uber/submitqueue/extension/mergechecker/github"
extqueue "github.com/uber/submitqueue/extension/queue"
Expand Down Expand Up @@ -95,6 +97,20 @@ func run() error {
metricsWgDone.Wait()
}()

// Open app database connection for counter
// Docker Compose healthchecks ensure MySQL is ready before service starts
appDSN := os.Getenv("MYSQL_DSN")
if appDSN == "" {
return fmt.Errorf("MYSQL_DSN environment variable is required")
}
appDB, err := sql.Open("mysql", appDSN)
if err != nil {
return fmt.Errorf("failed to open app database: %w", err)
}
defer appDB.Close()

cnt := mysqlcounter.NewCounter(appDB)

// Open queue database connection
// Docker Compose healthchecks ensure MySQL is ready before service starts
queueDSN := os.Getenv("QUEUE_MYSQL_DSN")
Expand Down Expand Up @@ -135,7 +151,7 @@ func run() error {
mc := newMergeChecker(logger, scope)

// Register controllers
if err := registerControllers(c, logger.Sugar(), scope, registry, mc); err != nil {
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cnt); err != nil {
return err
}

Expand Down Expand Up @@ -259,7 +275,8 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) consumer.TopicReg
//
// → merge → merge-signal
// finalize (terminal)
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker) error {

func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cnt counter.Counter) error {
requestController := request.NewController(
logger,
scope,
Expand All @@ -276,6 +293,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
registry,
cnt,
consumer.TopicToBatch,
"orchestrator-batch",
)
Expand Down
1 change: 1 addition & 0 deletions orchestrator/controller/batch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"//core/consumer",
"//entity",
"//entity/queue",
"//extension/counter",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
],
Expand Down
38 changes: 35 additions & 3 deletions orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/counter"
"go.uber.org/zap"
)

Expand All @@ -18,6 +19,7 @@ type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
registry consumer.TopicRegistry
counter counter.Counter
topic consumer.Topic
consumerGroup string
}
Expand All @@ -30,13 +32,15 @@ func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
registry consumer.TopicRegistry,
counter counter.Counter,
topic consumer.Topic,
consumerGroup string,
) *Controller {
return &Controller{
logger: logger.Named("batch_controller"),
metricsScope: scope.SubScope("batch_controller"),
registry: registry,
counter: counter,
topic: topic,
consumerGroup: consumerGroup,
}
Expand Down Expand Up @@ -73,9 +77,37 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"partition_key", msg.PartitionKey,
)

// TODO: Add batching logic
// - Group requests into batches
// - Create batch entity with dependencies
// Generate a globally unique batch ID.
seq, err := c.counter.Next(ctx, "batch/"+request.Queue)
if err != nil {
c.logger.Errorw("failed to generate batch ID",
"request_id", request.ID,
"queue", request.Queue,
"error", err,
)
c.metricsScope.Counter("counter_errors").Inc(1)
return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)
}

batch := entity.Batch{
ID: fmt.Sprintf("%s/batch/%d", request.Queue, seq),
Queue: request.Queue,
Contains: []string{request.ID},
// TODO Dependencies
State: entity.BatchStateCreated,
Version: 1,
}

c.logger.Infow("batch created",
"batch_id", batch.ID,
"request_id", request.ID,
"queue", request.Queue,
)

// TODO:
// - Add batch to DB
// - Create batch dependent entity
// - Add to batch dependent DB

// Publish to speculate topic
if err := c.publish(ctx, consumer.TopicBatched, request); err != nil {
Expand Down
65 changes: 58 additions & 7 deletions orchestrator/controller/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package batch
import (
"context"
"fmt"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -16,8 +17,27 @@ import (
"go.uber.org/zap/zaptest"
)

// mockCounter implements counter.Counter for testing.
type mockCounter struct {
Comment thread
manjari25 marked this conversation as resolved.
nextFunc func(ctx context.Context, domain string) (int64, error)
}

func (m *mockCounter) Next(ctx context.Context, domain string) (int64, error) {
return m.nextFunc(ctx, domain)
}

// newSequentialCounter returns a mock counter that returns incrementing values starting at 1.
func newSequentialCounter() *mockCounter {
Comment thread
manjari25 marked this conversation as resolved.
var seq int64
return &mockCounter{
nextFunc: func(ctx context.Context, domain string) (int64, error) {
return atomic.AddInt64(&seq, 1), nil
},
}
}

// newTestController creates a controller with test dependencies.
func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller {
func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *mockCounter, publishErr error) *Controller {
logger := zaptest.NewLogger(t).Sugar()
scope := tally.NoopScope

Expand All @@ -36,12 +56,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error)
nil,
)

return NewController(logger, scope, registry, consumer.TopicToBatch, "orchestrator-batch")
return NewController(logger, scope, registry, cnt, consumer.TopicToBatch, "orchestrator-batch")
}

func TestNewController(t *testing.T) {
ctrl := gomock.NewController(t)
controller := newTestController(t, ctrl, nil)
controller := newTestController(t, ctrl, newSequentialCounter(), nil)

require.NotNil(t, controller)
assert.Equal(t, consumer.TopicToBatch, controller.Topic())
Expand All @@ -52,7 +72,7 @@ func TestNewController(t *testing.T) {
func TestController_Process_Success(t *testing.T) {
ctrl := gomock.NewController(t)

controller := newTestController(t, ctrl, nil)
controller := newTestController(t, ctrl, newSequentialCounter(), nil)

request := entity.Request{
ID: "test-queue/123",
Expand All @@ -78,7 +98,7 @@ func TestController_Process_Success(t *testing.T) {
func TestController_Process_InvalidJSON(t *testing.T) {
ctrl := gomock.NewController(t)

controller := newTestController(t, ctrl, nil)
controller := newTestController(t, ctrl, newSequentialCounter(), nil)

invalidPayload := []byte(`{"invalid": json"}`)
msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil)
Expand All @@ -95,7 +115,7 @@ func TestController_Process_InvalidJSON(t *testing.T) {
func TestController_Process_PublishFailure(t *testing.T) {
ctrl := gomock.NewController(t)

controller := newTestController(t, ctrl, fmt.Errorf("publish failed"))
controller := newTestController(t, ctrl, newSequentialCounter(), fmt.Errorf("publish failed"))

request := entity.Request{
ID: "test-queue/123",
Expand All @@ -118,9 +138,40 @@ func TestController_Process_PublishFailure(t *testing.T) {
assert.Error(t, err)
}

func TestController_Process_CounterFailure(t *testing.T) {
ctrl := gomock.NewController(t)

cnt := &mockCounter{
nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 0, fmt.Errorf("counter unavailable")
},
}
controller := newTestController(t, ctrl, cnt, nil)

request := entity.Request{
ID: "test-queue/123",
Queue: "test-queue",
Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}},
LandStrategy: entity.RequestLandStrategyRebase,
State: entity.RequestStateNew,
Version: 1,
}

payload, err := request.ToBytes()
require.NoError(t, err)

msg := queue.NewMessage(request.ID, payload, request.Queue, nil)
delivery := queuemock.NewMockDelivery(ctrl)
delivery.EXPECT().Message().Return(msg).AnyTimes()
delivery.EXPECT().Attempt().Return(1).AnyTimes()

err = controller.Process(context.Background(), delivery)
assert.Error(t, err)
}

func TestController_InterfaceImplementation(t *testing.T) {
ctrl := gomock.NewController(t)
controller := newTestController(t, ctrl, nil)
controller := newTestController(t, ctrl, newSequentialCounter(), nil)

var _ consumer.Controller = controller
}