Skip to content
5 changes: 4 additions & 1 deletion entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "entity",
srcs = ["request.go"],
srcs = [
"change_provider.go",
"request.go",
],
importpath = "github.com/uber/submitqueue/entity",
visibility = ["//visibility:public"],
)
Expand Down
15 changes: 15 additions & 0 deletions entity/change_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package entity

// ChangeProvider represents a code change from an external provider (e.g., a GitHub pull request or Gerrit changelist)
// along with its associated metadata. The object is immutable after creation.
type ChangeProvider struct {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz comment the struct itself

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// RequestID is the globally unique identifier for the land request. Format: "<queue>/<counter_value>".
RequestID string
// ChangeProviderSrc defines the source of the change. For e.g. - Github, Gitlab etc.
ChangeProviderSrc string
// ChangeProviderID is the identifier specified by the change provider source. For e.g. - Github PR ID etc.
ChangeProviderID string
// Metadata is the interesting data from the change provider that we want to store.
// This is a freeform JSON object.
Metadata map[string]string
}
1 change: 1 addition & 0 deletions extension/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "storage",
srcs = [
"change_provider_store.go",
"request_store.go",
"storage.go",
],
Expand Down
26 changes: 26 additions & 0 deletions extension/storage/change_provider_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package storage

import (
"context"

"github.com/uber/submitqueue/entity"
)

// ChangeProviderStore is an interface that defines methods for managing change provider information in the database.
type ChangeProviderStore interface {
// Get retrieves information about a change by ID.
// Returns ErrNotFound if the change provider is not found.
//
// Note: The order of ChangeProvider entities here is not guaranteed to
// be the same as the request to which it belongs. The caller is repsonsible
// for inspecting and mapping the result of this function to the
// order of changes within the original request.
//
Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error)

// Create creates a new change provider.
Create(ctx context.Context, changeProvider entity.ChangeProvider) error

// There is no update function since once created, data is only ever read from this
// store.
}
1 change: 1 addition & 0 deletions extension/storage/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library")
go_library(
name = "mysql",
srcs = [
"change_provider_store.go",
"request_store.go",
"storage.go",
],
Expand Down
88 changes: 88 additions & 0 deletions extension/storage/mysql/change_provider_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package mysql

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"

"github.com/go-sql-driver/mysql"

"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
)

type changeProviderStore struct {
db *sql.DB
}

// NewChangeProviderStore creates a new MySQL-backed ChangeProviderStore.
func NewChangeProviderStore(db *sql.DB) storage.ChangeProviderStore {
return &changeProviderStore{db: db}
}

// Get retrieves change provider(s) by request ID. Returns ErrNotFound if the change provider is not found.
Comment thread
behinddwalls marked this conversation as resolved.
//
// Note: The order of ChangeProvider entities returned here is not guaranteed
// to be the same as the request to which it belongs. The caller is repsonsible
// for inspecting and mapping the result of this function to the
// order of changes within the original request.
//
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: empty line

func (s *changeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) {
rows, err := s.db.QueryContext(ctx,
"SELECT request_id, change_provider_src, change_provider_id, metadata FROM change_provider WHERE request_id = ?",
requestID,
)
if err != nil {
return nil, fmt.Errorf("failed to get change provider entities requestID=%s from the database: %w", requestID, err)
}
defer rows.Close()

var results []entity.ChangeProvider
for rows.Next() {
var cp entity.ChangeProvider
var metadataJSON []byte

if err := rows.Scan(&cp.RequestID, &cp.ChangeProviderSrc, &cp.ChangeProviderID, &metadataJSON); err != nil {
return nil, fmt.Errorf("failed to scan change provider entity requestID=%s from the database: %w", requestID, err)
}

if err := json.Unmarshal(metadataJSON, &cp.Metadata); err != nil {
return nil, fmt.Errorf("failed to unmarshal metadata for change provider entity requestID=%s from the database: %w", requestID, err)
}

results = append(results, cp)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate change provider entities requestID=%s from the database: %w", requestID, err)
}

if len(results) == 0 {
return nil, storage.WrapNotFound(fmt.Errorf("change provider entity requestID=%s", requestID))
}

return results, nil
}

// Create creates a new change provider. Returns ErrAlreadyExists if the entry already exists.
func (s *changeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error {
metadataJSON, err := json.Marshal(changeProvider.Metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata id=%s for Create change provider entity: %w", changeProvider.RequestID, err)
}

_, err = s.db.ExecContext(ctx,
"INSERT INTO change_provider (request_id, change_provider_src, change_provider_id, metadata) VALUES (?, ?, ?, ?)",
changeProvider.RequestID, changeProvider.ChangeProviderSrc, changeProvider.ChangeProviderID, metadataJSON,
)
if err != nil {
var mysqlErr *mysql.MySQLError
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
return fmt.Errorf("change provider entity id=%s: %w", changeProvider.RequestID, storage.ErrAlreadyExists)
}
return fmt.Errorf("failed to insert change provider entity id=%s: %w", changeProvider.RequestID, err)
}

return nil
}
7 changes: 7 additions & 0 deletions extension/storage/mysql/schema/change_provider.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS change_provider (
request_id VARCHAR(255) NOT NULL,
change_provider_src VARCHAR(255) NOT NULL,
change_provider_id VARCHAR(255) NOT NULL,
metadata JSON NOT NUll,
PRIMARY KEY (request_id,change_provider_src,change_provider_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
15 changes: 11 additions & 4 deletions extension/storage/mysql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type MySQLParameters struct {
}

type mysqlStorage struct {
db *sql.DB
requestStore storage.RequestStore
db *sql.DB
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
}

// NewStorage creates a new MySQL storage.
Expand All @@ -50,8 +51,9 @@ func NewStorage(p MySQLParameters) (storage.Storage, error) {
}

return &mysqlStorage{
db: db,
requestStore: NewRequestStore(db),
db: db,
requestStore: NewRequestStore(db),
changeProviderStore: NewChangeProviderStore(db),
}, nil
}

Expand All @@ -60,6 +62,11 @@ func (f *mysqlStorage) GetRequestStore() storage.RequestStore {
return f.requestStore
}

// GetChangeProviderStore returns the MySQL-backed ChangeProviderStore.
func (f *mysqlStorage) GetChangeProviderStore() storage.ChangeProviderStore {
return f.changeProviderStore
}

// Close closes the underlying database connection.
func (f *mysqlStorage) Close() error {
return f.db.Close()
Expand Down
3 changes: 3 additions & 0 deletions extension/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Storage interface {
// GetRequestStore returns the RequestStore instance.
GetRequestStore() RequestStore

// GetChangeProviderStore returns the ChangeProviderStore instance.
GetChangeProviderStore() ChangeProviderStore

// Close closes the storage and all underlying connections. Should only be called once at the end of the program.
Close() error
}
111 changes: 85 additions & 26 deletions gateway/controller/land_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,31 @@ func (m *mockRequestStore) UpdateState(ctx context.Context, id string, version i
return nil
}

type mockChangeProviderStore struct {
createFunc func(ctx context.Context, changeProvider entity.ChangeProvider) error
}

func (m *mockChangeProviderStore) Get(ctx context.Context, requestID string) ([]entity.ChangeProvider, error) {
return nil, nil
}

func (m *mockChangeProviderStore) Create(ctx context.Context, changeProvider entity.ChangeProvider) error {
return m.createFunc(ctx, changeProvider)
}

type mockStorage struct {
requestStore storage.RequestStore
requestStore storage.RequestStore
changeProviderStore storage.ChangeProviderStore
}

func (m *mockStorage) GetRequestStore() storage.RequestStore {
return m.requestStore
}

func (m *mockStorage) GetChangeProviderStore() storage.ChangeProviderStore {
return m.changeProviderStore
}

func (m *mockStorage) Close() error {
return nil
}
Expand All @@ -71,11 +88,18 @@ func noopPublisher() *mockPublisher {
}

func TestNewLandController(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 1, nil
}}
Expand All @@ -84,11 +108,18 @@ func TestNewLandController(t *testing.T) {
}

func TestLand_ReturnsSqid(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 1, nil
}}
Expand All @@ -108,12 +139,19 @@ func TestLand_ReturnsSqid(t *testing.T) {
func TestLand_PassesCorrectParametersToStore(t *testing.T) {
var capturedRequest entity.Request

store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
capturedRequest = request
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
capturedRequest = request
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 42, nil
}}
Expand All @@ -139,11 +177,18 @@ func TestLand_PassesCorrectParametersToStore(t *testing.T) {
}

func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return fmt.Errorf("database connection failed")
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return fmt.Errorf("database connection failed")
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 1, nil
}}
Expand All @@ -160,11 +205,18 @@ func TestLand_ReturnsErrorOnStorageFailure(t *testing.T) {
}

func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) {
store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
return 0, fmt.Errorf("counter unavailable")
}}
Expand All @@ -183,11 +235,18 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) {
func TestLand_CounterDomainIncludesQueue(t *testing.T) {
var capturedDomain string

store := &mockStorage{requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
store := &mockStorage{
requestStore: &mockRequestStore{
createFunc: func(ctx context.Context, request entity.Request) error {
return nil
},
},
}}
changeProviderStore: &mockChangeProviderStore{
createFunc: func(ctx context.Context, changeProvider entity.ChangeProvider) error {
return nil
},
},
}
cnt := &mockCounter{nextFunc: func(ctx context.Context, domain string) (int64, error) {
capturedDomain = domain
return 1, nil
Expand Down
Loading