Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion example/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ go_library(
deps = [
"//extension/counter/mysql",
"//extension/queue/mysql",
"//extension/storage/mysql",
"//gateway/controller",
"//gateway/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
Expand Down
9 changes: 2 additions & 7 deletions example/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/uber-go/tally/v4"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
"github.com/uber/submitqueue/extension/storage/mysql"
"github.com/uber/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
Expand Down Expand Up @@ -102,11 +101,7 @@ func run() error {
}
defer appDB.Close()

// Initialize storage and counter from shared app database connection
store, err := mysql.NewStorage(appDB)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
// Initialize counter from shared app database connection
cnt := mysqlcounter.NewCounter(appDB)

// Open queue database connection
Expand Down Expand Up @@ -141,7 +136,7 @@ func run() error {

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, mysqlQueue.Publisher(), "request")
landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), "request")
gatewayServer := &GatewayServer{
pingController: pingController,
landController: landController,
Expand Down
1 change: 1 addition & 0 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
requestController := request.NewController(
logger,
scope,
store,
registry,
consumer.TopicKeyRequest,
"orchestrator-request",
Expand Down
2 changes: 0 additions & 2 deletions gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ go_library(
"//entity/queue",
"//extension/counter",
"//extension/queue",
"//extension/storage",
"//gateway/protopb",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
Expand All @@ -33,7 +32,6 @@ go_test(
"//entity/queue",
"//extension/counter/mock",
"//extension/queue/mock",
"//extension/storage/mock",
"//gateway/protopb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
12 changes: 4 additions & 8 deletions gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/counter"
extqueue "github.com/uber/submitqueue/extension/queue"
"github.com/uber/submitqueue/extension/storage"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
)
Expand All @@ -30,19 +29,17 @@ func IsInvalidRequest(err error) bool {
type LandController struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
store storage.Storage
counter counter.Counter
publisher extqueue.Publisher
topic string // Topic to publish requests to (e.g., "request", "land_request")
}

// NewLandController creates a new instance of the gateway land controller.
// topic: the queue topic to publish requests to (e.g., "request", "land_request")
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, store storage.Storage, counter counter.Counter, publisher extqueue.Publisher, topic string) *LandController {
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, publisher extqueue.Publisher, topic string) *LandController {
return &LandController{
logger: logger,
metricsScope: scope,
store: store,
counter: counter,
publisher: publisher,
topic: topic,
Expand Down Expand Up @@ -94,10 +91,6 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
Version: 1,
}

if err := c.store.GetRequestStore().Create(ctx, request); err != nil {
return nil, fmt.Errorf("LandController failed to create request for queue=%s: %w", req.Queue, err)
}

c.logger.Debugw("land request created",
"queue", req.Queue,
"sqid", request.ID,
Expand All @@ -106,6 +99,9 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
"strategy", string(strategy),
)

// TODO: Insert the created request to the
// event store

// Publish to queue for async processing
if err := c.publishToQueue(ctx, request); err != nil {
c.logger.Errorw("failed to publish request to queue",
Expand Down
97 changes: 9 additions & 88 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/uber/submitqueue/entity/queue"
countermock "github.com/uber/submitqueue/extension/counter/mock"
queuemock "github.com/uber/submitqueue/extension/queue/mock"
storagemock "github.com/uber/submitqueue/extension/storage/mock"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
Expand All @@ -27,23 +26,18 @@ func noopPublisher(ctrl *gomock.Controller) *queuemock.MockPublisher {

func TestNewLandController(t *testing.T) {
ctrl := gomock.NewController(t)
store := storagemock.NewMockStorage(ctrl)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request")
require.NotNil(t, controller)
}

func TestLand_ReturnsSqid(t *testing.T) {
ctrl := gomock.NewController(t)
mockReqStore := storagemock.NewMockRequestStore(ctrl)
mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -56,70 +50,12 @@ func TestLand_ReturnsSqid(t *testing.T) {
assert.Equal(t, "test-queue/1", resp.Sqid)
}

func TestLand_PassesCorrectParametersToStore(t *testing.T) {
var capturedRequest entity.Request

ctrl := gomock.NewController(t)
mockReqStore := storagemock.NewMockRequestStore(ctrl)
mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, request entity.Request) error {
capturedRequest = request
return nil
},
)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(42), nil)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Queue: "my-queue",
Change: &pb.Change{Uris: []string{"github://uber/myservice/pull/1/abc111", "github://uber/myservice/pull/2/def222"}},
Strategy: pb.Strategy_REBASE,
}
resp, err := controller.Land(ctx, req)

require.NoError(t, err)
assert.Equal(t, "my-queue/42", capturedRequest.ID)
assert.Equal(t, "my-queue", capturedRequest.Queue)
assert.Equal(t, []string{"github://uber/myservice/pull/1/abc111", "github://uber/myservice/pull/2/def222"}, capturedRequest.Change.URIs)
assert.Equal(t, entity.RequestLandStrategyRebase, capturedRequest.LandStrategy)
assert.Equal(t, entity.RequestStateNew, capturedRequest.State)
assert.Equal(t, int32(1), capturedRequest.Version)
assert.Equal(t, "my-queue/42", resp.Sqid)
}

func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) {
ctrl := gomock.NewController(t)
mockReqStore := storagemock.NewMockRequestStore(ctrl)
mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(fmt.Errorf("database connection failed"))
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Queue: "test-queue",
Change: &pb.Change{Uris: []string{"github://uber/test-repo/pull/123/abc123def"}},
}
_, err := controller.Land(ctx, req)

require.Error(t, err)
}

func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) {
ctrl := gomock.NewController(t)
store := storagemock.NewMockStorage(ctrl)

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable"))
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -135,10 +71,6 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) {
var capturedDomain string

ctrl := gomock.NewController(t)
mockReqStore := storagemock.NewMockRequestStore(ctrl)
mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).DoAndReturn(
Expand All @@ -147,7 +79,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) {
return 1, nil
},
)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -162,10 +94,9 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) {

func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) {
ctrl := gomock.NewController(t)
store := storagemock.NewMockStorage(ctrl)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -180,10 +111,9 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) {

func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) {
ctrl := gomock.NewController(t)
store := storagemock.NewMockStorage(ctrl)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -198,10 +128,9 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) {

func TestLand_ReturnsErrorOnNilChange(t *testing.T) {
ctrl := gomock.NewController(t)
store := storagemock.NewMockStorage(ctrl)

cnt := countermock.NewMockCounter(ctrl)
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, noopPublisher(ctrl), "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -219,10 +148,6 @@ func TestLand_PublishesToQueue(t *testing.T) {
var publishedMessage queue.Message

ctrl := gomock.NewController(t)
mockReqStore := storagemock.NewMockRequestStore(ctrl)
mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(123), nil)
Expand All @@ -235,7 +160,7 @@ func TestLand_PublishesToQueue(t *testing.T) {
},
)

controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher, "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand Down Expand Up @@ -266,17 +191,13 @@ func TestLand_PublishesToQueue(t *testing.T) {

func TestLand_ContinuesWhenPublishFails(t *testing.T) {
ctrl := gomock.NewController(t)
mockReqStore := storagemock.NewMockRequestStore(ctrl)
mockReqStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()

cnt := countermock.NewMockCounter(ctrl)
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(999), nil)
publisher := queuemock.NewMockPublisher(ctrl)
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable"))

controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, store, cnt, publisher, "request")
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, "request")
ctx := context.Background()

req := &pb.LandRequest{
Expand Down
3 changes: 3 additions & 0 deletions orchestrator/controller/request/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//core/errs",
"//entity",
"//entity/queue",
"//extension/storage",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -25,6 +26,8 @@ go_test(
"//entity",
"//entity/queue",
"//extension/queue/mock",
"//extension/storage",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
17 changes: 16 additions & 1 deletion orchestrator/controller/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ package request

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/storage"
"go.uber.org/zap"
)

// Controller handles request queue messages.
// It consumes requests and publishes to the validate stage.
// It consumes requests, persists them to storage, and publishes to the validate stage.
// Implements consumer.Controller interface for integration with the consumer.
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
store storage.Storage
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
Expand All @@ -30,13 +33,15 @@ var _ consumer.Controller = (*Controller)(nil)
func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
store storage.Storage,
registry consumer.TopicRegistry,
topicKey consumer.TopicKey,
consumerGroup string,
) *Controller {
return &Controller{
logger: logger.Named("request_controller"),
metricsScope: scope.SubScope("request_controller"),
store: store,
registry: registry,
topicKey: topicKey,
consumerGroup: consumerGroup,
Expand Down Expand Up @@ -77,6 +82,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"partition_key", msg.PartitionKey,
)

// Persist request to storage (idempotent — ErrAlreadyExists means a retry)
if err := c.store.GetRequestStore().Create(ctx, request); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
c.logger.Errorw("failed to create request in storage",
"request_id", request.ID,
"error", err,
)
c.metricsScope.Counter("storage_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to create request: %w", err))
}

// Publish to validate topic
if err := c.publish(ctx, consumer.TopicKeyValidate, request); err != nil {
c.logger.Errorw("failed to publish output",
Expand Down
Loading