Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use_repo(
"com_github_go_sql_driver_mysql",
"com_github_gogo_protobuf",
"com_github_stretchr_testify",
"com_github_testcontainers_testcontainers_go_modules_mysql",
"com_github_uber_go_tally_v4",
"org_golang_google_grpc",
"org_golang_google_protobuf",
Expand Down
1 change: 1 addition & 0 deletions examples/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/uber/submitqueue/examples/server/gateway",
visibility = ["//visibility:private"],
deps = [
"//extensions/storage/mysql",
"//gateway/controller",
"//gateway/protopb",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
33 changes: 29 additions & 4 deletions examples/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/extensions/storage/mysql"
"github.com/uber/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
Expand All @@ -21,12 +22,18 @@ import (
// GatewayServer wraps the controller and implements the gRPC service interface
type GatewayServer struct {
pb.UnimplementedSubmitQueueGatewayServer
controller *controller.PingController
pingController *controller.PingController
landController *controller.LandController
}

// Ping delegates to the controller
func (s *GatewayServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
return s.controller.Ping(ctx, req)
return s.pingController.Ping(ctx, req)
}

// Land delegates to the controller
func (s *GatewayServer) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
return s.landController.Land(ctx, req)
}

func main() {
Expand Down Expand Up @@ -75,13 +82,31 @@ func run() error {
metricsWgDone.Wait()
}()

// Initialize MySQL storage factory
mysqlDSN := os.Getenv("MYSQL_DSN")
if mysqlDSN == "" {
mysqlDSN = "root:root@tcp(localhost:3306)/submitqueue?parseTime=true"
}
storeFactory, err := mysql.NewFactory(mysql.MySQLParameters{
DSN: mysqlDSN,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: 5 * time.Minute,
})
if err != nil {
return fmt.Errorf("failed to create MySQL storage factory: %w", err)
}
defer storeFactory.Close()

// Create gRPC server
grpcServer := grpc.NewServer()

// Create ping controller and wrap it for gRPC
// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger, scope, storeFactory)
gatewayServer := &GatewayServer{
controller: pingController,
pingController: pingController,
landController: landController,
}
pb.RegisterSubmitQueueGatewayServer(grpcServer, gatewayServer)

Expand Down
5 changes: 5 additions & 0 deletions extensions/storage/mysql/schema/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
filegroup(
name = "schema",
srcs = glob(["*.sql"]),
visibility = ["//visibility:public"],
)
4 changes: 2 additions & 2 deletions extensions/storage/mysql/schema/request.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ CREATE TABLE IF NOT EXISTS request (
seq BIGINT NOT NULL,
change_source VARCHAR(255) NOT NULL,
change_ids JSON NOT NULL,
land_strategy INT NOT NULL DEFAULT 0,
land_strategy INT NOT NULL,
state INT NOT NULL,
version INT NOT NULL DEFAULT 1,
version INT NOT NULL,
PRIMARY KEY (queue, seq)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
4 changes: 4 additions & 0 deletions gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ go_library(
importpath = "github.com/uber/submitqueue/gateway/controller",
visibility = ["//visibility:public"],
deps = [
"//entities",
"//extensions/storage",
"//gateway/protopb",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
Expand All @@ -23,6 +25,8 @@ go_test(
],
embed = [":controller"],
deps = [
"//entities",
"//extensions/storage",
"//gateway/protopb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
20 changes: 17 additions & 3 deletions gateway/controller/land.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/entities"
"github.com/uber/submitqueue/extensions/storage"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
)
Expand All @@ -14,13 +16,15 @@ import (
type LandController struct {
logger *zap.Logger
metricsScope tally.Scope
storeFactory storage.StoreFactory
}

// NewLandController creates a new instance of the gateway land controller
func NewLandController(logger *zap.Logger, scope tally.Scope) *LandController {
func NewLandController(logger *zap.Logger, scope tally.Scope, storeFactory storage.StoreFactory) *LandController {
return &LandController{
logger: logger,
metricsScope: scope,
storeFactory: storeFactory,
}
}

Expand All @@ -33,8 +37,18 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan

c.metricsScope.Counter("land_request_count").Inc(1)

// TODO: Implement proper SQID generation and send the request to the appropriate queue. So far unix time to make it sequential.
sqid := fmt.Sprintf("%d", time.Now().Unix())
change := entities.Change{
Source: req.Change.GetSource(),
IDs: req.Change.GetIds(),
}
strategy := entities.RequestLandStrategy(int(req.Strategy))

request, err := c.storeFactory.GetRequestStore().Create(ctx, req.Queue, change, strategy, entities.RequestStateNew)
if err != nil {
return nil, fmt.Errorf("LandController failed to create request for queue=%s: %w", req.Queue, err)
}

sqid := request.GetID()

c.logger.Debug("land request received",
zap.String("queue", req.Queue),
Expand Down
114 changes: 111 additions & 3 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,70 @@ package controller

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/entities"
"github.com/uber/submitqueue/extensions/storage"
pb "github.com/uber/submitqueue/gateway/protopb"
"go.uber.org/zap"
)

type mockRequestStore struct {
createFunc func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error)
}

func (m *mockRequestStore) Get(ctx context.Context, id string) (entities.Request, error) {
return entities.Request{}, nil
}

func (m *mockRequestStore) Create(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return m.createFunc(ctx, queue, change, strategy, state)
}

func (m *mockRequestStore) UpdateState(ctx context.Context, id string, version int32, newState entities.RequestState) error {
return nil
}

type mockStoreFactory struct {
requestStore storage.RequestStore
}

func (m *mockStoreFactory) GetRequestStore() storage.RequestStore {
return m.requestStore
}

func (m *mockStoreFactory) Close() error {
return nil
}

func TestNewLandController(t *testing.T) {
controller := NewLandController(zap.NewNop(), tally.NoopScope)
factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return entities.Request{}, nil
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
require.NotNil(t, controller)
}

func TestLand_ReturnsSqid(t *testing.T) {
controller := NewLandController(zap.NewNop(), tally.NoopScope)
factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return entities.Request{
Queue: queue,
Seq: 1,
Change: change,
LandStrategy: strategy,
State: state,
Version: 1,
}, nil
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
ctx := context.Background()

req := &pb.LandRequest{
Expand All @@ -26,5 +75,64 @@ func TestLand_ReturnsSqid(t *testing.T) {
resp, err := controller.Land(ctx, req)

require.NoError(t, err)
require.NotEmpty(t, resp.Sqid)
assert.Equal(t, "test-queue/1", resp.Sqid)
}

func TestLand_PassesCorrectParametersToStore(t *testing.T) {
var capturedQueue string
var capturedChange entities.Change
var capturedStrategy entities.RequestLandStrategy
var capturedState entities.RequestState

factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
capturedQueue = queue
capturedChange = change
capturedStrategy = strategy
capturedState = state
return entities.Request{
Queue: queue,
Seq: 42,
Change: change,
LandStrategy: strategy,
State: state,
Version: 1,
}, nil
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
ctx := context.Background()

req := &pb.LandRequest{
Queue: "my-queue",
Change: &pb.Change{Source: "github", Ids: []string{"pr-1", "pr-2"}},
Strategy: pb.Strategy_STRATEGY_REBASE,
}
resp, err := controller.Land(ctx, req)

require.NoError(t, err)
assert.Equal(t, "my-queue", capturedQueue)
assert.Equal(t, "github", capturedChange.Source)
assert.Equal(t, []string{"pr-1", "pr-2"}, capturedChange.IDs)
assert.Equal(t, entities.RequestLandStrategyRebase, capturedStrategy)
assert.Equal(t, entities.RequestStateNew, capturedState)
assert.Equal(t, "my-queue/42", resp.Sqid)
}

func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) {
factory := &mockStoreFactory{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, queue string, change entities.Change, strategy entities.RequestLandStrategy, state entities.RequestState) (entities.Request, error) {
return entities.Request{}, fmt.Errorf("database connection failed")
},
}}
controller := NewLandController(zap.NewNop(), tally.NoopScope, factory)
ctx := context.Background()

req := &pb.LandRequest{
Queue: "test-queue",
Change: &pb.Change{Source: "github", Ids: []string{"123"}},
}
_, err := controller.Land(ctx, req)

require.Error(t, err)
}
Loading
Loading