diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index 9db3e45e..78cab3f7 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -12,6 +12,8 @@ go_library( visibility = ["//visibility:private"], deps = [ "//core/consumer", + "//extension/counter", + "//extension/counter/mysql", "//extension/mergechecker", "//extension/mergechecker/github", "//extension/queue", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 2e2f6afd..15f2950e 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -15,6 +15,8 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" "github.com/uber/submitqueue/core/consumer" + "github.com/uber/submitqueue/extension/counter" + mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" "github.com/uber/submitqueue/extension/mergechecker" githubchecker "github.com/uber/submitqueue/extension/mergechecker/github" extqueue "github.com/uber/submitqueue/extension/queue" @@ -95,6 +97,20 @@ func run() error { metricsWgDone.Wait() }() + // Open app database connection for counter + // Docker Compose healthchecks ensure MySQL is ready before service starts + appDSN := os.Getenv("MYSQL_DSN") + if appDSN == "" { + return fmt.Errorf("MYSQL_DSN environment variable is required") + } + appDB, err := sql.Open("mysql", appDSN) + if err != nil { + return fmt.Errorf("failed to open app database: %w", err) + } + defer appDB.Close() + + cnt := mysqlcounter.NewCounter(appDB) + // Open queue database connection // Docker Compose healthchecks ensure MySQL is ready before service starts queueDSN := os.Getenv("QUEUE_MYSQL_DSN") @@ -135,7 +151,7 @@ func run() error { mc := newMergeChecker(logger, scope) // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cnt); err != nil { return err } @@ -259,7 +275,8 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) consumer.TopicReg // // → merge → merge-signal // finalize (terminal) -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker) error { + +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cnt counter.Counter) error { requestController := request.NewController( logger, scope, @@ -276,6 +293,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, + cnt, consumer.TopicToBatch, "orchestrator-batch", ) diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index dba155a0..b66963e2 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/consumer", "//entity", "//entity/queue", + "//extension/counter", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", ], diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index cc3bc7eb..8705fa48 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -8,6 +8,7 @@ import ( "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/counter" "go.uber.org/zap" ) @@ -18,6 +19,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry + counter counter.Counter topic consumer.Topic consumerGroup string } @@ -30,6 +32,7 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, + counter counter.Counter, topic consumer.Topic, consumerGroup string, ) *Controller { @@ -37,6 +40,7 @@ func NewController( logger: logger.Named("batch_controller"), metricsScope: scope.SubScope("batch_controller"), registry: registry, + counter: counter, topic: topic, consumerGroup: consumerGroup, } @@ -73,9 +77,37 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) - // TODO: Add batching logic - // - Group requests into batches - // - Create batch entity with dependencies + // Generate a globally unique batch ID. + seq, err := c.counter.Next(ctx, "batch/"+request.Queue) + if err != nil { + c.logger.Errorw("failed to generate batch ID", + "request_id", request.ID, + "queue", request.Queue, + "error", err, + ) + c.metricsScope.Counter("counter_errors").Inc(1) + return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err) + } + + batch := entity.Batch{ + ID: fmt.Sprintf("%s/batch/%d", request.Queue, seq), + Queue: request.Queue, + Contains: []string{request.ID}, + // TODO Dependencies + State: entity.BatchStateCreated, + Version: 1, + } + + c.logger.Infow("batch created", + "batch_id", batch.ID, + "request_id", request.ID, + "queue", request.Queue, + ) + + // TODO: + // - Add batch to DB + // - Create batch dependent entity + // - Add to batch dependent DB // Publish to speculate topic if err := c.publish(ctx, consumer.TopicBatched, request); err != nil { diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 3b35167f..17d977e3 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -3,6 +3,7 @@ package batch import ( "context" "fmt" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -16,8 +17,27 @@ import ( "go.uber.org/zap/zaptest" ) +// mockCounter implements counter.Counter for testing. +type mockCounter struct { + nextFunc func(ctx context.Context, domain string) (int64, error) +} + +func (m *mockCounter) Next(ctx context.Context, domain string) (int64, error) { + return m.nextFunc(ctx, domain) +} + +// newSequentialCounter returns a mock counter that returns incrementing values starting at 1. +func newSequentialCounter() *mockCounter { + var seq int64 + return &mockCounter{ + nextFunc: func(ctx context.Context, domain string) (int64, error) { + return atomic.AddInt64(&seq, 1), nil + }, + } +} + // newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) *Controller { +func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *mockCounter, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -36,12 +56,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) nil, ) - return NewController(logger, scope, registry, consumer.TopicToBatch, "orchestrator-batch") + return NewController(logger, scope, registry, cnt, consumer.TopicToBatch, "orchestrator-batch") } func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newSequentialCounter(), nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicToBatch, controller.Topic()) @@ -52,7 +72,7 @@ func TestNewController(t *testing.T) { func TestController_Process_Success(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newSequentialCounter(), nil) request := entity.Request{ ID: "test-queue/123", @@ -78,7 +98,7 @@ func TestController_Process_Success(t *testing.T) { func TestController_Process_InvalidJSON(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newSequentialCounter(), nil) invalidPayload := []byte(`{"invalid": json"}`) msg := queue.NewMessage("invalid-msg", invalidPayload, "partition1", nil) @@ -95,7 +115,7 @@ func TestController_Process_InvalidJSON(t *testing.T) { func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, fmt.Errorf("publish failed")) + controller := newTestController(t, ctrl, newSequentialCounter(), fmt.Errorf("publish failed")) request := entity.Request{ ID: "test-queue/123", @@ -118,9 +138,40 @@ func TestController_Process_PublishFailure(t *testing.T) { assert.Error(t, err) } +func TestController_Process_CounterFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + cnt := &mockCounter{ + nextFunc: func(ctx context.Context, domain string) (int64, error) { + return 0, fmt.Errorf("counter unavailable") + }, + } + controller := newTestController(t, ctrl, cnt, nil) + + request := entity.Request{ + ID: "test-queue/123", + Queue: "test-queue", + Change: entity.Change{URIs: []string{"github://uber/service/pull/456/abc123def"}}, + LandStrategy: entity.RequestLandStrategyRebase, + State: entity.RequestStateNew, + Version: 1, + } + + payload, err := request.ToBytes() + require.NoError(t, err) + + msg := queue.NewMessage(request.ID, payload, request.Queue, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + err = controller.Process(context.Background(), delivery) + assert.Error(t, err) +} + func TestController_InterfaceImplementation(t *testing.T) { ctrl := gomock.NewController(t) - controller := newTestController(t, ctrl, nil) + controller := newTestController(t, ctrl, newSequentialCounter(), nil) var _ consumer.Controller = controller }