Merged
14 changes: 14 additions & 0 deletions CLAUDE.md
@@ -106,6 +106,20 @@ Vendor-agnostic, pluggable interfaces with implementations in subdirectories:
2. Implementations at `extension/{ext}/{impl}/`
3. Factory interface for dependency injection and lifecycle management

**Design interfaces for the technology *space*, not the implementation in front of you.** The interface is a contract every backend will have to satisfy — SQL, key-value (DynamoDB, Bigtable), document, message queue, search, RPC, in-memory, mocks. If the contract assumes a capability that some plausible backend can't provide cheaply, you've baked the current impl's strengths into the API.

Common over-constraints to avoid:
- **Batch atomicity** (multi-row inserts as one transaction) — many KV stores can't do this. Prefer single-record primitives + caller loops + idempotency-on-retry.
- **Multi-key queries** (`WHERE x IN (...)`) — fine in SQL, awkward elsewhere. Prefer per-key reads.
- **Server-side filters** (joins, sub-queries, complex predicates) — push filtering and aggregation to the caller; keep the store responsible only for "get/put by key" semantics.
- **Transactions across entities** — virtually no distributed store offers this. Use eventual consistency + idempotency.
- **Strict ordering / exactly-once** in messaging — most queues are at-least-once with best-effort ordering. Make consumers idempotent.
- **Synchronous, low-latency calls** for things that may run remotely — design for retry/backoff and timeouts, not assumed-fast.

The cost of "callers loop over a small batch" is usually negligible. The cost of forcing every future backend to fake a capability the API demanded is permanent.

When in doubt, ask: *"If the next implementation were DynamoDB / Kafka / Bigtable / a remote RPC service / an in-memory map, could it satisfy this signature without contortion?"* If the answer is no, simplify the contract.

### Import Paths

- RPC Controllers: `github.com/uber/submitqueue/{service}/controller`
2 changes: 1 addition & 1 deletion Makefile
@@ -274,7 +274,7 @@ local-stovepipe-stop: ## Stop Stovepipe service

mocks: ## Generate mock files using mockgen
@echo "Generating mocks..."
-	@$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/pusher/... ./extension/scorer/... ./core/consumer/...
+	@$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/changestore/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/...
@echo "Mocks generated successfully!"

proto: ## Generate protobuf files from .proto definitions
1 change: 1 addition & 0 deletions entity/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"batch_dependent.go",
"build.go",
"change_provider.go",
"change_record.go",
"land_request.go",
"queue_config.go",
"request.go",
57 changes: 57 additions & 0 deletions entity/change_record.go
@@ -0,0 +1,57 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

// ChangeRecord represents a single URI's claim by a request, persisted in the change store.
// The (Queue, URI, RequestID) triple is the identity and is immutable; Metadata may be
// updated over time as additional information about the change (e.g., PR title, author,
// mergeability) becomes available.
type ChangeRecord struct {
	// URI identifies the change (RFC 3986). Same scheme/format as entity.Change.URIs.
	// Example: "github://uber/submitqueue/pull/123/abc123def".
	URI string `json:"uri"`

	// RequestID is the owning land request that claimed this URI.
	// Format matches entity.Request.ID: "<queue>/<counter_value>".
	//
	// RequestID participates in the change-store primary key so that concurrent claims
	// by different requests on the same URI coexist as distinct rows. Same-request
	// retries collide on the PK and are absorbed idempotently; different-request
	// collisions surface as additional rows that callers detect via GetByURI.
	RequestID string `json:"request_id"`

	// Queue is the queue the owning request belongs to. It is the leading column of
	// the change-store primary key, so queue-scoped duplicate checks become PK-prefix
	// scans and the table is shardable by queue.
	Queue string `json:"queue"`

	// Metadata is a JSON-encoded blob of provider-specific information about the change
	// (e.g., PR title, author, mergeable state). Stored as `'{}'` when no metadata has
	// been populated yet; updated by downstream enrichment.
	Metadata string `json:"metadata,omitempty"`

	// CreatedAt is the Unix milliseconds timestamp when this record was first created.
	CreatedAt int64 `json:"created_at"`

	// UpdatedAt is the Unix milliseconds timestamp when this record's Metadata was last updated.
	// Equal to CreatedAt when the record has never been updated.
	UpdatedAt int64 `json:"updated_at"`

	// Version is the optimistic-locking counter for mutable fields (Metadata).
	// Starts at 1 on Create and is incremented by callers on every update.
	// Mirrors the request-store convention: callers compute newVersion = oldVersion + 1
	// and pass both to the update method; the store performs a pure conditional write.
	Version int32 `json:"version"`
}
9 changes: 9 additions & 0 deletions extension/changestore/BUILD.bazel
@@ -0,0 +1,9 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "changestore",
srcs = ["change_store.go"],
importpath = "github.com/uber/submitqueue/extension/changestore",
visibility = ["//visibility:public"],
deps = ["//entity"],
)
24 changes: 24 additions & 0 deletions extension/changestore/README.md
@@ -0,0 +1,24 @@
# ChangeStore

Vendor-agnostic interface for tracking per-URI claims by in-flight land requests.

Each record asserts that a specific URI (e.g., a GitHub PR) was claimed by a specific request, scoped to a queue. The store is read by the orchestrator's `validate` controller to detect duplicate requests — submissions whose URIs overlap with another in-flight request's URIs in the same queue.

The interface is intentionally per-record / per-URI so any backend (SQL, DynamoDB, Bigtable, …) can implement it without needing batch atomicity or multi-key query support. Callers that have multiple URIs to claim or check loop over them; the typical request has a small number of URIs (a single PR or a short stack), so the loop overhead is negligible.

## Semantics

- **Identity is immutable.** A record is keyed by `(Queue, URI, RequestID)`; once written, that triple is never mutated.
- **Queue leads the key.** Backends should make `Queue` the leading column of the primary key (or partition key, in shardable stores). All reads are queue-scoped, so this turns lookups into PK-prefix scans and keeps the table shardable.
- **`RequestID` in the key is intentional.** Concurrent claims by different requests on the same URI coexist as distinct rows. Same-request retries collide on the PK and are absorbed idempotently; cross-request collisions show up as additional rows that callers detect via `GetByURI`.
- **Metadata is required and mutable.** The `Metadata` field is JSON. The store treats `'{}'` as the canonical "no metadata yet" value — callers that pass an empty Go string get `'{}'` written. Downstream enrichment can update it; `UpdatedAt` reflects the last update.
- **Per-record writes, idempotent.** `Create` writes a single record. A primary-key conflict is silently ignored, which makes queue-redelivery of the same request a safe no-op. There is no batch atomicity in the contract — callers with multiple URIs loop and rely on idempotency to converge under partial failure / retry.
- **Per-URI reads, no filtering.** `GetByURI` returns every record for a given `(queue, uri)`. The store does not filter by `request_id` or by the owning request's state. Callers that want to skip self filter by `RequestID`; callers that want only live owners consult `RequestStore` for liveness.
- **Versioned for safe metadata updates.** Each record carries a `Version` integer (starts at 1). Future `UpdateMetadata` operations follow the same caller-owned-arithmetic + conditional-write pattern as `RequestStore.UpdateState` — the caller passes `oldVersion` and `newVersion`, and the store performs a pure conditional write.
- **Append-only by design.** Records are not deleted when the owning request reaches a terminal state; the historical claim is preserved for audit. Duplicate detection filters terminals out at query time via the controller-side liveness check.

## Implementing a Backend

1. Create `extension/changestore/{backend}/` directory.
2. Implement the `ChangeStore` interface.
3. Add schema files under `extension/changestore/{backend}/schema/` if the backend requires them.
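
As a rough illustration of the semantics above, here is a small in-memory backend together with the caller-side duplicate check. It is a sketch under assumptions: `ChangeRecord` is a cut-down local copy of `entity.ChangeRecord`, the methods omit `context.Context` and error returns for brevity, and `memChangeStore`, `conflictingOwners`, and the `isLive` callback (a stand-in for a `RequestStore` liveness lookup) are hypothetical names.

```go
package main

import "fmt"

// ChangeRecord mirrors only the identity fields of entity.ChangeRecord.
type ChangeRecord struct {
	Queue, URI, RequestID string
}

// memChangeStore keys rows by the (Queue, URI, RequestID) triple.
type memChangeStore struct {
	rows map[[3]string]ChangeRecord
}

// Create writes one record; a key conflict is silently ignored.
func (s *memChangeStore) Create(r ChangeRecord) {
	k := [3]string{r.Queue, r.URI, r.RequestID}
	if _, exists := s.rows[k]; !exists {
		s.rows[k] = r
	}
}

// GetByURI returns every record for (queue, uri) with no further filtering.
func (s *memChangeStore) GetByURI(queue, uri string) []ChangeRecord {
	var out []ChangeRecord
	for k, r := range s.rows {
		if k[0] == queue && k[1] == uri {
			out = append(out, r)
		}
	}
	return out
}

// conflictingOwners loops per URI, skips the caller's own claims, and keeps
// only owners that isLive reports as still in flight.
func conflictingOwners(s *memChangeStore, queue, self string, uris []string, isLive func(string) bool) []string {
	seen := map[string]bool{}
	var owners []string
	for _, uri := range uris {
		for _, r := range s.GetByURI(queue, uri) {
			if r.RequestID == self || seen[r.RequestID] || !isLive(r.RequestID) {
				continue
			}
			seen[r.RequestID] = true
			owners = append(owners, r.RequestID)
		}
	}
	return owners
}

func main() {
	s := &memChangeStore{rows: map[[3]string]ChangeRecord{}}
	uri := "github://uber/submitqueue/pull/123/abc123def"

	s.Create(ChangeRecord{"main", uri, "main/1"}) // owner already terminal
	s.Create(ChangeRecord{"main", uri, "main/2"}) // owner still in flight
	s.Create(ChangeRecord{"main", uri, "main/3"}) // the request being validated
	s.Create(ChangeRecord{"main", uri, "main/3"}) // redelivery, absorbed

	live := map[string]bool{"main/2": true, "main/3": true}
	owners := conflictingOwners(s, "main", "main/3", []string{uri},
		func(id string) bool { return live[id] })
	fmt.Println("rows:", len(s.rows), "conflicts:", owners)
}
```

The store stays at "get/put by key"; self-filtering and liveness live entirely in the caller, which is what lets a key-value backend implement the same contract.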
51 changes: 51 additions & 0 deletions extension/changestore/change_store.go
@@ -0,0 +1,51 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package changestore

//go:generate mockgen -source=change_store.go -destination=mock/change_store_mock.go -package=mock

import (
	"context"

	"github.com/uber/submitqueue/entity"
)

// ChangeStore manages per-URI claim records for in-flight land requests.
// Each row records that a specific URI was claimed by a specific request, scoped to a queue.
// The (Queue, URI, RequestID) triple is the immutable identity of a record. Metadata may
// evolve over time.
//
// The interface is intentionally per-record / per-URI so that any backend (SQL, DynamoDB,
// Bigtable, …) can implement it without needing batch-atomicity or multi-key query support.
// Callers loop when they have multiple URIs to claim or check; the typical request has a
// small number of URIs (a single PR or a short stack), so the loop overhead is negligible.
type ChangeStore interface {
	// Create persists a single ChangeRecord. A primary-key conflict on
	// (Queue, URI, RequestID) is silently ignored, which makes the call
	// idempotent under queue redeliveries of the same request. Records belonging
	// to different requests do not conflict on the PK; cross-request overlap
	// is detected by GetByURI, not by Create.
	Create(ctx context.Context, record entity.ChangeRecord) error

	// GetByURI returns every ChangeRecord for the given (queue, uri). Multiple
	// requests can have claimed the same URI over time, so the slice may have
	// any number of entries; an empty slice means no claim has ever been
	// recorded for this URI in this queue.
	//
	// The store does not filter by request_id or by the owning request's
	// state: callers that want to skip self filter by RequestID, and callers
	// that want only live owners consult RequestStore for liveness.
	GetByURI(ctx context.Context, queue string, uri string) ([]entity.ChangeRecord, error)
}
12 changes: 12 additions & 0 deletions extension/changestore/mock/BUILD.bazel
@@ -0,0 +1,12 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "mock",
srcs = ["change_store_mock.go"],
importpath = "github.com/uber/submitqueue/extension/changestore/mock",
visibility = ["//visibility:public"],
deps = [
"//entity",
"@org_uber_go_mock//gomock",
],
)
71 changes: 71 additions & 0 deletions extension/changestore/mock/change_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions extension/changestore/mysql/BUILD.bazel
@@ -0,0 +1,14 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "mysql",
srcs = ["change_store.go"],
importpath = "github.com/uber/submitqueue/extension/changestore/mysql",
visibility = ["//visibility:public"],
deps = [
"//core/metrics",
"//entity",
"//extension/changestore",
"@com_github_uber_go_tally_v4//:tally",
],
)
87 changes: 87 additions & 0 deletions extension/changestore/mysql/change_store.go
@@ -0,0 +1,87 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/uber-go/tally/v4"

	"github.com/uber/submitqueue/core/metrics"
	"github.com/uber/submitqueue/entity"
	"github.com/uber/submitqueue/extension/changestore"
)

type changeStore struct {
	db    *sql.DB
	scope tally.Scope
}

// NewChangeStore creates a new MySQL-backed ChangeStore.
func NewChangeStore(db *sql.DB, scope tally.Scope) changestore.ChangeStore {
	return &changeStore{db: db, scope: scope}
}

// Create inserts a single ChangeRecord. A primary-key conflict on
// (queue, uri, request_id) is silently ignored via INSERT IGNORE, so
// queue-redelivery of the same request is a no-op.
func (s *changeStore) Create(ctx context.Context, record entity.ChangeRecord) (retErr error) {
	op := metrics.Begin(s.scope, "create")
	defer func() { op.Complete(retErr) }()

	// Use the empty JSON object as the canonical "no metadata yet" value.
	// metadata is NOT NULL in the schema and the JSON column type rejects an empty string.
	metadata := record.Metadata
	if metadata == "" {
		metadata = "{}"
	}

	const query = "INSERT IGNORE INTO `change` (uri, request_id, queue, metadata, created_at, updated_at, version) VALUES (?, ?, ?, ?, ?, ?, ?)"
	if _, err := s.db.ExecContext(ctx, query,
		record.URI, record.RequestID, record.Queue, metadata, record.CreatedAt, record.UpdatedAt, record.Version,
	); err != nil {
		return fmt.Errorf("failed to insert change record uri=%s request_id=%s: %w", record.URI, record.RequestID, err)
	}
	return nil
}

// GetByURI returns every ChangeRecord for (queue, uri). queue leads the WHERE
// clause to align with the (queue, uri, request_id) PK so this is a PK-prefix scan.
func (s *changeStore) GetByURI(ctx context.Context, queue string, uri string) (ret []entity.ChangeRecord, retErr error) {
	op := metrics.Begin(s.scope, "get_by_uri")
	defer func() { op.Complete(retErr) }()

	const query = "SELECT uri, request_id, queue, metadata, created_at, updated_at, version FROM `change` WHERE queue = ? AND uri = ?"
	rows, err := s.db.QueryContext(ctx, query, queue, uri)
	if err != nil {
		return nil, fmt.Errorf("failed to query change records for queue=%s uri=%s: %w", queue, uri, err)
	}
	defer rows.Close()

	var results []entity.ChangeRecord
	for rows.Next() {
		var rec entity.ChangeRecord
		if err := rows.Scan(&rec.URI, &rec.RequestID, &rec.Queue, &rec.Metadata, &rec.CreatedAt, &rec.UpdatedAt, &rec.Version); err != nil {
			return nil, fmt.Errorf("failed to scan change record for queue=%s uri=%s: %w", queue, uri, err)
		}
		results = append(results, rec)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate change records for queue=%s uri=%s: %w", queue, uri, err)
	}
	return results, nil
}
5 changes: 5 additions & 0 deletions extension/changestore/mysql/schema/BUILD.bazel
@@ -0,0 +1,5 @@
filegroup(
name = "schema",
srcs = glob(["*.sql"]),
visibility = ["//visibility:public"],
)
15 changes: 15 additions & 0 deletions extension/changestore/mysql/schema/change.sql
@@ -0,0 +1,15 @@
-- request_id is part of the PK so concurrent claims by different requests on the
-- same URI coexist as distinct rows. Same-request retry → PK conflict (no-op via
-- INSERT IGNORE). Different-request collision → distinct row, surfaced by
-- GetByURI. Queue leads the PK so queue-scoped lookups are PK-prefix scans
-- and the table is shardable by queue.
CREATE TABLE IF NOT EXISTS `change` (
uri VARCHAR(255) NOT NULL,
request_id VARCHAR(255) NOT NULL,
queue VARCHAR(255) NOT NULL,
metadata JSON NOT NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
version INT NOT NULL,
PRIMARY KEY (queue, uri, request_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;