Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions entities/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "entities",
srcs = ["request.go"],
importpath = "github.com/uber/submitqueue/entities",
visibility = ["//visibility:public"],
)

go_test(
name = "entities_test",
srcs = ["request_test.go"],
embed = [":entities"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
33 changes: 2 additions & 31 deletions entities/request.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package entities

import (
"fmt"
"strconv"
"strings"
)

// RequestLandStrategy defines the possible source control integration methods.
type RequestLandStrategy string
Expand Down Expand Up @@ -52,10 +47,10 @@ type Request struct {
// Immutable fields, fixed at request entity creation
// ****************

// ID is the globally unique identifier for the land request. Format: "<queue>/<counter_value>".
ID string
// Queue is the name of the queue processing the land request. Queue name is defined in the configuration and should be unique within the system.
Queue string
// Seq is an autoincrementing integer identifier for the land request. It is unique within the queue.
Seq int64
// Change is a number of code changes (such as pull requests) to land into the target branch. Target branch is defined by the queue configuration.
Change Change
// LandStrategy is the source control integration strategy to use for this land operation.
Expand All @@ -71,27 +66,3 @@ type Request struct {
// Versioning starts at 1 and is incremented for each change to the object.
Version int32
}

// GetID returns the globally unique identifier for the land request.
func (r *Request) GetID() string {
return fmt.Sprintf("%s/%d", r.Queue, r.Seq)
}

// ParseRequestID parses the globally unique identifier for the land request and returns the queue name and sequence number.
func ParseRequestID(id string) (queue string, seq int64, err error) {
parts := strings.Split(id, "/")
if len(parts) != 2 {
return "", 0, fmt.Errorf("invalid format of the request ID: %s; expected format: <queue>/<seq>", id)
}

seq, err = strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return "", 0, fmt.Errorf("invalid sequence number in the request ID: %s; expected format: <queue>/<seq>; parsing error: %w", id, err)
}

if seq <= 0 {
return "", 0, fmt.Errorf("invalid sequence number in the request ID: %s; expected format: <queue>/<seq>; sequence number must be greater than 0 but got %d", id, seq)
}

return parts[0], seq, nil
}
112 changes: 0 additions & 112 deletions entities/request_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions examples/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ go_library(
importpath = "github.com/uber/submitqueue/examples/server/gateway",
visibility = ["//visibility:private"],
deps = [
"//extensions/counter/mysql",
"//extensions/storage/mysql",
"//gateway/controller",
"//gateway/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally_v4//:tally",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//reflection",
Expand Down
13 changes: 12 additions & 1 deletion examples/server/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"database/sql"
"fmt"
"net"
"os"
Expand All @@ -10,7 +11,9 @@ import (
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/uber-go/tally/v4"
mysqlcounter "github.com/uber/submitqueue/extensions/counter/mysql"
"github.com/uber/submitqueue/extensions/storage/mysql"
"github.com/uber/submitqueue/gateway/controller"
pb "github.com/uber/submitqueue/gateway/protopb"
Expand Down Expand Up @@ -98,12 +101,20 @@ func run() error {
}
defer storeFactory.Close()

// Initialize MySQL counter
counterDB, err := sql.Open("mysql", mysqlDSN)
if err != nil {
return fmt.Errorf("failed to open MySQL connection for counter: %w", err)
}
defer counterDB.Close()
cnt := mysqlcounter.NewCounter(counterDB)

// Create gRPC server
grpcServer := grpc.NewServer()

// Create controllers and wrap them for gRPC
pingController := controller.NewPingController(logger, scope)
landController := controller.NewLandController(logger, scope, storeFactory)
landController := controller.NewLandController(logger, scope, storeFactory, cnt)
gatewayServer := &GatewayServer{
pingController: pingController,
landController: landController,
Expand Down
8 changes: 8 additions & 0 deletions extensions/counter/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "counter",
srcs = ["counter.go"],
importpath = "github.com/uber/submitqueue/extensions/counter",
visibility = ["//visibility:public"],
)
35 changes: 35 additions & 0 deletions extensions/counter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Counter

Vendor-agnostic interface for atomic sequential number generation.

## Interface

### Counter

Generates unique, sequential values scoped to a domain string.

```go
type Counter interface {
Next(ctx context.Context, domain string) (int64, error)
}
```

- **domain**: A string key that scopes the counter (max 255 characters). Each domain maintains its own independent sequence.
- **Next**: Atomically increments and returns the next value. The first call for a new domain returns 1. Safe for concurrent use; values are unique but ordering is not guaranteed.

## Usage

```go
cnt := mysqlcounter.NewCounter(db)

// Generate sequential IDs for different domains
val, err := cnt.Next(ctx, "request/my-queue") // returns 1
val, err = cnt.Next(ctx, "request/my-queue") // returns 2
val, err = cnt.Next(ctx, "request/other") // returns 1
```

## Implementing a Backend

1. Create `extensions/counter/{backend}/` directory
2. Implement the `Counter` interface
3. Add a schema file under `extensions/counter/{backend}/schema/` if the backend requires it
14 changes: 14 additions & 0 deletions extensions/counter/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package counter

import "context"

// Counter provides atomic sequential number generation for a given domain.
// Each call to Next returns the next value in the sequence for the specified domain.
// The value is guaranteed to be unique within the domain throughout the system and persisted accordingly.
type Counter interface {
// Next atomically increments the counter for the given domain and returns the new value.
// The first call for a new domain returns 1.
// The implementation should support at least 255 length domains.
// The function is safe to be called concurrently and will give unique results, but the order of the values is not guaranteed.
Next(ctx context.Context, domain string) (int64, error)
}
11 changes: 11 additions & 0 deletions extensions/counter/mysql/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "mysql",
srcs = ["counter.go"],
importpath = "github.com/uber/submitqueue/extensions/counter/mysql",
visibility = ["//visibility:public"],
deps = [
"//extensions/counter",
],
)
37 changes: 37 additions & 0 deletions extensions/counter/mysql/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package mysql

import (
"context"
"database/sql"
"fmt"

"github.com/uber/submitqueue/extensions/counter"
)

type mysqlCounter struct {
db *sql.DB
}

// NewCounter creates a new MySQL-backed Counter.
func NewCounter(db *sql.DB) counter.Counter {
return &mysqlCounter{db: db}
}

// Next atomically increments the counter for the given domain and returns the new value.
// Uses MySQL's LAST_INSERT_ID() to set the value atomically and read the incremented value.
func (c *mysqlCounter) Next(ctx context.Context, domain string) (int64, error) {
result, err := c.db.ExecContext(ctx,
"INSERT INTO counter (domain, value) VALUES (?, LAST_INSERT_ID(1)) ON DUPLICATE KEY UPDATE value = LAST_INSERT_ID(value + 1)",
domain,
)
if err != nil {
return 0, fmt.Errorf("failed to increment counter for domain=%s: %w", domain, err)
}

value, err := result.LastInsertId()
if err != nil {
return 0, fmt.Errorf("failed to get counter value for domain=%s: %w", domain, err)
}

return value, nil
}
5 changes: 5 additions & 0 deletions extensions/counter/mysql/schema/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
filegroup(
name = "schema",
srcs = glob(["*.sql"]),
visibility = ["//visibility:public"],
)
5 changes: 5 additions & 0 deletions extensions/counter/mysql/schema/counter.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS counter (
domain VARCHAR(255) NOT NULL,
value BIGINT NOT NULL,
PRIMARY KEY (domain)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Loading