From 34b1ab29ac3222e3a35beb1231b457a02294971f Mon Sep 17 00:00:00 2001 From: sergeyb Date: Wed, 6 May 2026 22:00:26 +0000 Subject: [PATCH] feat(pusher): add Pusher extension and wire it into the merge controller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Introduces the `extension/pusher` interface that lands a list of `entity.Change` values onto a target branch with all-or-nothing atomicity, and a git-backed implementation that operates against a local checkout. The merge controller now calls `Pusher.Push` for each batch, transitions the batch to Succeeded on success or Failed on `pusher.ErrConflict`, and nacks any other push error so the queue can retry. ## Pusher interface (extension/pusher) - `Push([]Change) (Result, error)` with an explicit atomicity contract: on a non-nil error nothing has reached the remote. - Per-change `ChangeOutcome` reports either `OutcomeStatusCommitted` with the produced commit SHAs in apply order (one Change can land as multiple commits, e.g. a stack), or `OutcomeStatusAlreadyExisted` with no commits when the change is already present on the target. - `ErrConflict` sentinel marks user-caused apply failures so callers can route them to a non-retry path. ## git implementation (extension/pusher/git) - Per-Push cycle: fetch -> reset --hard origin/ -> cherry-pick every URI's head SHA -> push HEAD to refs/heads/. - Cherry-pick uses `--allow-empty` and recovers from "previous cherry-pick is now empty" via `--skip`; genuinely empty resulting commits are rolled back. Both surface as `AlreadyExisted`. - Empty-commit detection compares tree SHAs read via `git cat-file commit` rather than relying on `diff-tree --quiet`'s exit code 1, which has multiple meanings. - A mutex serializes concurrent invocations against the shared checkout. - Push is wrapped with `core/metrics` Begin/Complete so the operation emits the standard push.called / push.succeeded / push.failed / push.latency / push.latency_histogram with error-classification tags. Sub-event counters (push.empty_changes, push.reset_errors, push.cherry_pick_conflicts, push.git_push_errors, push.stale_base_retries, push.stale_base_giveup) live under the same op so dashboards filter on `push.*` to see the whole picture. ## Concurrent-push contention handling - After a push failure the implementation refetches the remote and compares the current tip to the SHA captured at reset time. If it advanced, the failure is treated as contention and the full fetch/reset/cherry-pick/push cycle is retried. Other failures propagate immediately. - Detection by ref-state comparison rather than git error message parsing — robust across git versions and rejection sources (NFF, hook reject, ref-store errors). - Retries are capped at `Params.MaxPushAttempts` (default 10) to bound the worst case on a pathologically busy remote. ## Merge controller (orchestrator/controller/merge) - Takes a `pusher.Pusher` dependency, loads each request in `batch.Contains` to collect changes in batch order, calls Push, and classifies the outcome inline with three explicit cases (success, conflict, generic error) — no helper, no error wrapping at this layer (retryability classification will be added by separate infra). - Version arithmetic stays in the controller per the optimistic locking contract: newVersion is computed before UpdateState and assigned to the in-memory entity only on success. ## Tests - Real-git fixture for the git Pusher: bare remote + checkout + author clone, with a `pre-receive` race hook that on its Nth call moves refs/heads/main to the Nth pre-staged SHA via update-ref (with GIT_QUARANTINE_PATH/GIT_OBJECT_DIRECTORY/ GIT_ALTERNATE_OBJECT_DIRECTORIES unset to bypass git's pre-receive quarantine) and exits 1, driving the retry loop with real git mechanics. Covers single/stacked URIs, already-existed, mixed-partial, conflict, recovery-after-conflict, reset-between-calls, retry-on-contention, and giveup-after-cap. - Merge controller tests rewritten with the new pusher mock and cover successful merge, multi-change ordering, conflict -> Failed, infra-error returns, terminal-batch idempotency, and store/publish failures. ## Wiring - `example/server/orchestrator/main.go`: `newPusher()` reads `PUSHER_CHECKOUT_PATH` (required), `PUSHER_REMOTE` (default "origin"), `PUSHER_TARGET` (default "main"). - Makefile `mocks` target adds `./extension/pusher/...`. Co-Authored-By: Claude Opus 4.7 --- Makefile | 2 +- example/server/orchestrator/BUILD.bazel | 2 + example/server/orchestrator/main.go | 31 +- extension/pusher/BUILD.bazel | 9 + extension/pusher/README.md | 44 ++ extension/pusher/git/BUILD.bazel | 30 ++ extension/pusher/git/git_pusher.go | 435 +++++++++++++++++ extension/pusher/git/git_pusher_test.go | 507 ++++++++++++++++++++ extension/pusher/mock/BUILD.bazel | 13 + extension/pusher/mock/pusher_mock.go | 58 +++ extension/pusher/pusher.go | 88 ++++ orchestrator/controller/merge/BUILD.bazel | 4 + orchestrator/controller/merge/merge.go | 123 +++-- orchestrator/controller/merge/merge_test.go | 445 ++++++++++++++--- 14 files changed, 1679 insertions(+), 112 deletions(-) create mode 100644 extension/pusher/BUILD.bazel create mode 100644 extension/pusher/README.md create mode 100644 extension/pusher/git/BUILD.bazel create mode 100644 extension/pusher/git/git_pusher.go create mode 100644 extension/pusher/git/git_pusher_test.go create mode 100644 extension/pusher/mock/BUILD.bazel create mode 100644 extension/pusher/mock/pusher_mock.go create mode 100644 extension/pusher/pusher.go diff --git a/Makefile b/Makefile index 622364cd..2d6bedef 100644 --- a/Makefile +++ b/Makefile @@ -245,7 +245,7 @@ local-stop: ## Stop all services (keep data) mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/pusher/... ./extension/scorer/... ./core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index bbbb3a8d..084a1857 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -20,6 +20,8 @@ go_library( "//extension/counter/mysql", "//extension/mergechecker", "//extension/mergechecker/github", + "//extension/pusher", + "//extension/pusher/git", "//extension/queue", "//extension/queue/mysql", "//extension/scorer/heuristic", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index e4311d35..cb5a61c0 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -39,6 +39,8 @@ import ( mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" "github.com/uber/submitqueue/extension/mergechecker" githubchecker "github.com/uber/submitqueue/extension/mergechecker/github" + "github.com/uber/submitqueue/extension/pusher" + gitpusher "github.com/uber/submitqueue/extension/pusher/git" extqueue "github.com/uber/submitqueue/extension/queue" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/extension/scorer/heuristic" @@ -203,8 +205,14 @@ func run() error { return fmt.Errorf("failed to create change provider: %w", err) } + // Create pusher + psh, err := newPusher(logger, scope) + if err != nil { + return fmt.Errorf("failed to create pusher: %w", err) + } + // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, cnt, store); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store); err != nil { return err } @@ -389,7 +397,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // │ │ │ // └────────┴────────────────────────┘ -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, cnt counter.Counter, store storage.Storage) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage) error { requestController := start.NewController( logger, scope, @@ -495,6 +503,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scope, store, registry, + psh, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -595,3 +604,21 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch MetricsScope: scope.SubScope("changeprovider"), }), nil } + +// newPusher creates a git-backed Pusher bound to the configured checkout +// path, remote, and target branch. Configured via PUSHER_CHECKOUT_PATH +// (required), PUSHER_REMOTE (default "origin"), and PUSHER_TARGET (default +// "main"). +func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) { + checkout := os.Getenv("PUSHER_CHECKOUT_PATH") + if checkout == "" { + return nil, fmt.Errorf("PUSHER_CHECKOUT_PATH environment variable is required") + } + return gitpusher.NewPusher(gitpusher.Params{ + CheckoutPath: checkout, + Remote: getEnv("PUSHER_REMOTE", "origin"), + Target: getEnv("PUSHER_TARGET", "main"), + Logger: logger.Sugar(), + MetricsScope: scope.SubScope("pusher"), + }), nil +} diff --git a/extension/pusher/BUILD.bazel b/extension/pusher/BUILD.bazel new file mode 100644 index 00000000..a1aa1383 --- /dev/null +++ b/extension/pusher/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "pusher", + srcs = ["pusher.go"], + importpath = "github.com/uber/submitqueue/extension/pusher", + visibility = ["//visibility:public"], + deps = ["//entity"], +) diff --git a/extension/pusher/README.md b/extension/pusher/README.md new file mode 100644 index 00000000..0703cfd0 --- /dev/null +++ b/extension/pusher/README.md @@ -0,0 +1,44 @@ +# Pusher + +Pluggable abstraction for landing a list of `entity.Change` values onto a +target branch and pushing the result to a source-control remote. + +## Interface + +`Pusher` exposes a single `Push` method that accepts a list of changes. +Implementations are bound to a specific `(checkout, remote, target)` tuple +at construction time, so the interface itself stays vendor- and +configuration-agnostic. + +The interface enforces an **all-or-nothing atomicity contract**: when `Push` +returns an error, no change has reached the remote — neither partially nor +fully. Callers can treat a non-nil error as "the remote is exactly as it was +before the call". The `ErrConflict` sentinel marks user-caused failures so +callers can route them to a non-retry path. + +A successful `Push` returns one `ChangeOutcome` per input change in input +order. Each outcome reports either: + +- `OutcomeStatusCommitted` with the list of `CommitSHAs` produced on the + target branch (one change can land as multiple commits, e.g. a stack of + PRs); or +- `OutcomeStatusAlreadyExisted` with no commits, when the change is already + present on the target branch (previously landed via another path, or + subsumed by an earlier change in the same push). Git surfaces this as + "rebased out" during a cherry-pick. + +## Implementations + +- [`git/`](git/) — applies changes against a local checkout via `git + cherry-pick`, then `git push`. Construction takes the path to the + checkout, the remote name, and the target branch; the implementation + owns that working tree and serializes concurrent invocations. + +## Adding a new backend + +1. Create `extension/pusher/{backend}/` with a `Pusher` implementation. +2. Bind the implementation to its checkout/remote/target at construction. +3. Map each `entity.Change` to the backend's commit/push primitives. +4. Honour the atomicity contract: never publish partial state. Return + `ErrConflict` (wrapped) for user-caused apply failures and a plain error + for transient infra failures. diff --git a/extension/pusher/git/BUILD.bazel b/extension/pusher/git/BUILD.bazel new file mode 100644 index 00000000..cfa7623d --- /dev/null +++ b/extension/pusher/git/BUILD.bazel @@ -0,0 +1,30 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "git", + srcs = ["git_pusher.go"], + importpath = "github.com/uber/submitqueue/extension/pusher/git", + visibility = ["//visibility:public"], + deps = [ + "//core/metrics", + "//entity", + "//entity/github", + "//extension/pusher", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "git_test", + srcs = ["git_pusher_test.go"], + embed = [":git"], + deps = [ + "//entity", + "//extension/pusher", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/extension/pusher/git/git_pusher.go b/extension/pusher/git/git_pusher.go new file mode 100644 index 00000000..2ff4b885 --- /dev/null +++ b/extension/pusher/git/git_pusher.go @@ -0,0 +1,435 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package git is a simple Pusher implementation backed by a local git +// checkout. +// +// On every Push call the implementation: +// +// 1. Fetches the configured remote. +// 2. Resets the checkout's HEAD to refs/remotes//. +// 3. Cherry-picks the head SHA of every URI of every Change, in order. +// A pick that produces no new content (because the change is already +// present on the target branch) is what git surfaces as "rebased out" +// and is recorded as OutcomeStatusAlreadyExisted. +// A pick that fails to apply cleanly is treated as a conflict. +// 4. Pushes HEAD to refs/heads/ on the remote. +// +// Atomicity: nothing is published to the remote until step 4 succeeds. If +// any cherry-pick fails the in-progress pick is aborted and Push returns an +// error without ever invoking step 4. +// +// Contention: if step 4 fails because the remote tip moved between step 2 +// and step 4 (typically a concurrent push from another writer), the whole +// fetch/reset/cherry-pick/push cycle is retried. Detection works by +// re-fetching the remote tip after a push failure and comparing it to the +// SHA we reset to in step 2: if it advanced, the failure is treated as +// contention. Other push failures (network, auth, hook reject without ref +// change) propagate immediately. Retries are capped at +// Params.MaxPushAttempts (default 10) to bound the worst case. +// +// Construction takes the path to an existing checkout, the remote name, and +// the target branch — the implementation owns the working tree at that path +// for the duration of any in-flight Push call and serializes concurrent +// invocations. +// +// Change URIs are parsed using the github-family URI format (see +// entity/github), so each URI's last segment is interpreted as the head +// commit SHA. The SHA must be reachable from a ref on the remote so that +// `git fetch` makes it available locally. +package git + +import ( + "bytes" + "context" + "fmt" + "os/exec" + "strings" + "sync" + + "github.com/uber-go/tally/v4" + "go.uber.org/zap" + + coremetrics "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/entity" + entitygithub "github.com/uber/submitqueue/entity/github" + "github.com/uber/submitqueue/extension/pusher" +) + +// defaultMaxPushAttempts caps the retry loop in Push when the remote tip +// keeps moving under us. Bounded to prevent an infinite loop against a +// pathologically busy remote. +const defaultMaxPushAttempts = 10 + +// Params holds the dependencies for the git Pusher. +type Params struct { + // CheckoutPath is the absolute path to an existing git checkout that the + // Pusher will operate against. The Pusher owns this working tree. + CheckoutPath string + // Remote is the name of the git remote to fetch from and push to + // (e.g. "origin"). + Remote string + // Target is the destination branch ref on the remote (e.g. "main"). + Target string + // Logger is the structured logger. + Logger *zap.SugaredLogger + // MetricsScope is the metrics scope for instrumentation. + MetricsScope tally.Scope + // MaxPushAttempts caps how many times Push retries the full + // fetch/reset/cherry-pick/push cycle when the remote tip moves under + // it. Defaults to defaultMaxPushAttempts when zero or negative. + MaxPushAttempts int +} + +// gitPusher implements pusher.Pusher by shelling out to the `git` CLI +// against a local checkout. +type gitPusher struct { + checkoutPath string + remote string + target string + logger *zap.SugaredLogger + metricsScope tally.Scope + maxPushAttempts int + + // mu serializes concurrent Push calls — the underlying checkout cannot + // be safely shared between operations. + mu sync.Mutex +} + +// Verify gitPusher implements pusher.Pusher at compile time. +var _ pusher.Pusher = (*gitPusher)(nil) + +// NewPusher constructs a new git-backed Pusher operating against the given +// checkout. The checkout must already exist and have the configured remote. +func NewPusher(params Params) pusher.Pusher { + maxAttempts := params.MaxPushAttempts + if maxAttempts <= 0 { + maxAttempts = defaultMaxPushAttempts + } + return &gitPusher{ + checkoutPath: params.CheckoutPath, + remote: params.Remote, + target: params.Target, + logger: params.Logger.Named("git_pusher"), + metricsScope: params.MetricsScope.SubScope("git_pusher"), + maxPushAttempts: maxAttempts, + } +} + +// Push fulfils the pusher.Pusher contract. +func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret pusher.Result, retErr error) { + op := coremetrics.Begin(p.metricsScope, "push") + defer func() { op.Complete(retErr) }() + + p.mu.Lock() + defer p.mu.Unlock() + + if len(changes) == 0 { + coremetrics.NamedCounter(p.metricsScope, "push", "empty_changes", 1) + return pusher.Result{}, fmt.Errorf("push called with no changes") + } + + p.logger.Debugw("starting push", + "target", p.target, + "remote", p.remote, + "change_count", len(changes), + ) + + var lastErr error + for attempt := 1; attempt <= p.maxPushAttempts; attempt++ { + baseSHA, outcomes, err := p.tryPush(ctx, changes) + if err == nil { + p.logger.Debugw("push complete", + "target", p.target, + "outcomes", outcomes, + ) + return pusher.Result{Outcomes: outcomes}, nil + } + + // Was the failure caused by the remote tip moving under us between + // reset and push (concurrent-push contention)? That's the only push + // failure worth retrying; everything else (network, auth, hook + // reject without ref change) is fatal. Detection is by re-fetching + // the remote tip and comparing to baseSHA — robust against varying + // git error message formats. baseSHA is empty when the failure + // happened before reset captured a base; treat those as fatal too. + if baseSHA == "" { + return pusher.Result{}, err + } + currentSHA, refetchErr := p.refetchTipSHA(ctx) + if refetchErr != nil { + return pusher.Result{}, fmt.Errorf("refetch after push failure failed: %v (original push error: %w)", refetchErr, err) + } + if currentSHA == baseSHA { + return pusher.Result{}, err + } + + coremetrics.NamedCounter(p.metricsScope, "push", "stale_base_retries", 1) + p.logger.Warnw("remote tip moved during push, retrying", + "attempt", attempt, + "max_attempts", p.maxPushAttempts, + "base_sha", baseSHA, + "current_sha", currentSHA, + "err", err, + ) + lastErr = err + } + + coremetrics.NamedCounter(p.metricsScope, "push", "stale_base_giveup", 1) + return pusher.Result{}, fmt.Errorf("exceeded %d push attempts due to remote contention: %w", p.maxPushAttempts, lastErr) +} + +// tryPush runs one full reset+cherry-pick+push cycle. The returned baseSHA +// is the SHA the cycle was based on (set as soon as resetToRemote completes) +// so the caller can distinguish concurrent-push contention from other push +// failures. baseSHA is empty when the failure happened before reset +// produced a base. +func (p *gitPusher) tryPush(ctx context.Context, changes []entity.Change) (string, []pusher.ChangeOutcome, error) { + if err := p.resetToRemote(ctx); err != nil { + coremetrics.NamedCounter(p.metricsScope, "push", "reset_errors", 1) + return "", nil, err + } + baseSHA, err := p.headSHA(ctx) + if err != nil { + return "", nil, err + } + + outcomes, err := p.cherryPickAll(ctx, changes) + if err != nil { + // Best-effort cleanup so the next attempt starts from a known state. + // The next attempt starts with resetToRemote regardless, so we don't + // care if --abort itself fails (e.g., no pick is in progress). + _, _ = p.run(ctx, nil, "cherry-pick", "--abort") + return baseSHA, nil, err + } + + if err := p.push(ctx); err != nil { + coremetrics.NamedCounter(p.metricsScope, "push", "git_push_errors", 1) + return baseSHA, nil, err + } + return baseSHA, outcomes, nil +} + +// headSHA returns the SHA at HEAD in the local checkout. +func (p *gitPusher) headSHA(ctx context.Context) (string, error) { + out, err := p.run(ctx, nil, "rev-parse", "HEAD") + if err != nil { + return "", fmt.Errorf("git rev-parse HEAD: %w", err) + } + return strings.TrimSpace(string(out)), nil +} + +// refetchTipSHA fetches the remote and returns the current SHA at +// refs/remotes//. Used after a push failure to detect +// whether the remote tip moved under us. +func (p *gitPusher) refetchTipSHA(ctx context.Context) (string, error) { + if _, err := p.run(ctx, nil, "fetch", p.remote); err != nil { + return "", fmt.Errorf("git fetch %s: %w", p.remote, err) + } + remoteRef := p.remote + "/" + p.target + out, err := p.run(ctx, nil, "rev-parse", remoteRef) + if err != nil { + return "", fmt.Errorf("git rev-parse %s: %w", remoteRef, err) + } + return strings.TrimSpace(string(out)), nil +} + +// resetToRemote fetches the configured remote and hard-resets the checkout's +// HEAD to refs/remotes//, producing a clean working tree +// pinned to the latest remote tip. +func (p *gitPusher) resetToRemote(ctx context.Context) error { + if _, err := p.run(ctx, nil, "fetch", p.remote); err != nil { + return fmt.Errorf("git fetch %s: %w", p.remote, err) + } + remoteRef := p.remote + "/" + p.target + if _, err := p.run(ctx, nil, "reset", "--hard", remoteRef); err != nil { + return fmt.Errorf("git reset --hard %s: %w", remoteRef, err) + } + if _, err := p.run(ctx, nil, "clean", "-fdx"); err != nil { + return fmt.Errorf("git clean: %w", err) + } + return nil +} + +// cherryPickAll walks the changes in order, cherry-picking every URI's head +// SHA, and returns one ChangeOutcome per Change in the same order. +func (p *gitPusher) cherryPickAll(ctx context.Context, changes []entity.Change) ([]pusher.ChangeOutcome, error) { + outcomes := make([]pusher.ChangeOutcome, 0, len(changes)) + for _, change := range changes { + commits, err := p.cherryPickChange(ctx, change) + if err != nil { + return nil, err + } + status := pusher.OutcomeStatusCommitted + if len(commits) == 0 { + status = pusher.OutcomeStatusAlreadyExisted + } + outcomes = append(outcomes, pusher.ChangeOutcome{ + Change: change, + Status: status, + CommitSHAs: commits, + }) + } + return outcomes, nil +} + +// cherryPickChange parses each URI in the change, fetches the referenced +// SHA, and cherry-picks it. It returns the list of new commit SHAs +// produced for this change (empty if every pick was a no-op because the +// content was already on the target branch). +func (p *gitPusher) cherryPickChange(ctx context.Context, change entity.Change) ([]string, error) { + var commits []string + for _, uri := range change.URIs { + cid, err := entitygithub.ParseChangeID(uri) + if err != nil { + return nil, fmt.Errorf("invalid change URI %q: %w", uri, err) + } + + sha, picked, err := p.cherryPickSHA(ctx, cid.HeadCommitSHA) + if err != nil { + return nil, fmt.Errorf("cherry-pick %s (uri %q): %w", cid.HeadCommitSHA, uri, err) + } + if picked { + commits = append(commits, sha) + } + } + return commits, nil +} + +// cherryPickSHA cherry-picks a single SHA. It returns: +// - the new commit SHA and picked=true on a successful pick that produced +// a non-empty commit; +// - "" and picked=false when the pick is a no-op (the change is already +// on the target branch); the empty commit is rolled back so it doesn't +// get pushed; +// - an error: pusher.ErrConflict when the pick fails to apply because of a conflict (generally a user-caused failure), +// or any other error when the pick fails for any other reason (potentially recoverable). +// +// `--allow-empty` covers the case where the source commit itself was +// originally empty. For redundant picks (the change is already on target, +// so applying it produces no new diff) git refuses with "previous +// cherry-pick is now empty"; we recover by running `cherry-pick --skip` +// and reporting the change as already-existed (what git would call +// "rebased out"). +func (p *gitPusher) cherryPickSHA(ctx context.Context, sha string) (string, bool, error) { + out, err := p.runCombined(ctx, nil, "cherry-pick", "--allow-empty", sha) + if err != nil { + if isRedundantCherryPick(out) { + if _, skipErr := p.run(ctx, nil, "cherry-pick", "--skip"); skipErr != nil { + return "", false, fmt.Errorf("git cherry-pick --skip after redundant pick: %w", skipErr) + } + return "", false, nil + } + coremetrics.NamedCounter(p.metricsScope, "push", "cherry_pick_conflicts", 1) + return "", false, fmt.Errorf("%w: git cherry-pick %s: %s", pusher.ErrConflict, sha, strings.TrimSpace(string(out))) + } + + // `--allow-empty` lets a genuinely empty source commit through as an + // empty commit on target. Detect and roll that back so it doesn't get + // pushed, and report it as already-existed. + empty, err := p.isEmptyHEADCommit(ctx) + if err != nil { + return "", false, err + } + if empty { + if _, err := p.run(ctx, nil, "reset", "--hard", "HEAD^"); err != nil { + return "", false, fmt.Errorf("git reset --hard HEAD^ after empty pick: %w", err) + } + return "", false, nil + } + + headOut, err := p.run(ctx, nil, "rev-parse", "HEAD") + if err != nil { + return "", false, fmt.Errorf("git rev-parse HEAD: %w", err) + } + return strings.TrimSpace(string(headOut)), true, nil +} + +// isEmptyHEADCommit returns true when HEAD's tree matches HEAD^'s tree — +// i.e. the most recent commit introduces no changes. +func (p *gitPusher) isEmptyHEADCommit(ctx context.Context) (bool, error) { + headTree, err := p.commitTreeSHA(ctx, "HEAD") + if err != nil { + return false, err + } + parentTree, err := p.commitTreeSHA(ctx, "HEAD^") + if err != nil { + return false, err + } + return headTree == parentTree, nil +} + +// commitTreeSHA returns the tree SHA recorded in the commit object at ref. +// It reads the raw commit via `git cat-file commit` and parses the leading +// `tree ` line — the same data `rev-parse ^{tree}` would peel to, +// but without depending on revision-syntax magic. +func (p *gitPusher) commitTreeSHA(ctx context.Context, ref string) (string, error) { + out, err := p.run(ctx, nil, "cat-file", "commit", ref) + if err != nil { + return "", fmt.Errorf("git cat-file commit %s: %w", ref, err) + } + firstLine, _, _ := strings.Cut(string(out), "\n") + const prefix = "tree " + if !strings.HasPrefix(firstLine, prefix) { + return "", fmt.Errorf("git cat-file commit %s: unexpected first line %q", ref, firstLine) + } + return strings.TrimSpace(firstLine[len(prefix):]), nil +} + +// push pushes the current HEAD to refs/heads/ on the remote. +func (p *gitPusher) push(ctx context.Context) error { + refspec := "HEAD:refs/heads/" + p.target + if _, err := p.run(ctx, nil, "push", p.remote, refspec); err != nil { + return fmt.Errorf("git push %s %s: %w", p.remote, refspec, err) + } + return nil +} + +// run executes a `git` command in the checkout. Returns captured stdout and +// an error that includes captured stderr for diagnostics. +func (p *gitPusher) run(ctx context.Context, stdin []byte, args ...string) ([]byte, error) { + cmd := exec.CommandContext(ctx, "git", args...) + cmd.Dir = p.checkoutPath + if stdin != nil { + cmd.Stdin = bytes.NewReader(stdin) + } + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("%w: %s", err, strings.TrimSpace(stderr.String())) + } + return stdout.Bytes(), nil +} + +// runCombined is like run but returns combined stdout+stderr both on success +// and failure. Used when the caller needs to inspect git's diagnostic +// output (e.g., to detect "previous cherry-pick is now empty"). +func (p *gitPusher) runCombined(ctx context.Context, stdin []byte, args ...string) ([]byte, error) { + cmd := exec.CommandContext(ctx, "git", args...) + cmd.Dir = p.checkoutPath + if stdin != nil { + cmd.Stdin = bytes.NewReader(stdin) + } + return cmd.CombinedOutput() +} + +// isRedundantCherryPick reports whether git's cherry-pick output indicates +// the pick was rejected because the change is already present on target +// (i.e. applying it would produce no diff). +func isRedundantCherryPick(out []byte) bool { + s := string(out) + return strings.Contains(s, "previous cherry-pick is now empty") || + strings.Contains(s, "nothing to commit") +} diff --git a/extension/pusher/git/git_pusher_test.go b/extension/pusher/git/git_pusher_test.go new file mode 100644 index 00000000..e79221c0 --- /dev/null +++ b/extension/pusher/git/git_pusher_test.go @@ -0,0 +1,507 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package git + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "go.uber.org/zap/zaptest" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/pusher" +) + +// gitFixture provides a bare "remote" repository plus a working checkout +// that pushes to it. Tests run real `git` commands so we exercise the same +// code path as production. +// +// The fixture also exposes helpers for pushing additional commits to side +// branches on the remote, so each test can build the SHAs it needs the +// Pusher to cherry-pick. +type gitFixture struct { + root string + remoteDir string + checkoutDir string + authorDir string // a separate working clone used to author "PR" commits +} + +func setupGitFixture(t *testing.T) gitFixture { + t.Helper() + + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not available on PATH") + } + + root := t.TempDir() + remoteDir := filepath.Join(root, "remote.git") + checkoutDir := filepath.Join(root, "checkout") + authorDir := filepath.Join(root, "author") + + mustGit(t, root, "init", "--bare", "-b", "main", remoteDir) + + // Seed main with one initial commit so the Pusher's reset/fetch flow has + // something to land on. + mustGit(t, root, "init", "-b", "main", authorDir) + configRepo(t, authorDir, "author", "author@example.com") + require.NoError(t, writeFile(filepath.Join(authorDir, "hello.txt"), "hello\nworld\n")) + mustGit(t, authorDir, "add", ".") + mustGit(t, authorDir, "commit", "-m", "seed") + mustGit(t, authorDir, "remote", "add", "origin", remoteDir) + mustGit(t, authorDir, "push", "origin", "main") + + mustGit(t, root, "clone", remoteDir, checkoutDir) + configRepo(t, checkoutDir, "checkout", "checkout@example.com") + + return gitFixture{ + root: root, + remoteDir: remoteDir, + checkoutDir: checkoutDir, + authorDir: authorDir, + } +} + +// configRepo applies the test-only config that lets git commit work in a +// sandbox without GPG signing, system identity, or system git hooks. The +// devpod environment installs hooks via the system git config (core.hooksPath +// = /etc/git-hooks) that interfere with test commits — point each test repo +// at an empty hooks dir to disarm them without resorting to --no-verify. +func configRepo(t *testing.T, dir, name, email string) { + mustGit(t, dir, "config", "user.name", name) + mustGit(t, dir, "config", "user.email", email) + mustGit(t, dir, "config", "commit.gpgsign", "false") + mustGit(t, dir, "config", "tag.gpgsign", "false") + + hooksDir := filepath.Join(dir, ".no-hooks") + require.NoError(t, os.MkdirAll(hooksDir, 0o755)) + mustGit(t, dir, "config", "core.hooksPath", hooksDir) +} + +// pushPRCommit creates a single commit on a feature branch on the remote +// branched off the *current* origin/main, returning the resulting SHA. +func (f gitFixture) pushPRCommit(t *testing.T, branch, path, contents, message string) string { + t.Helper() + mustGit(t, f.authorDir, "fetch", "origin") + return f.pushPRCommitFrom(t, "origin/main", branch, path, contents, message) +} + +// pushPRCommitFrom creates a single commit on `branch` based on the given +// base ref, returning the resulting SHA. Use this to create branches that +// diverge from a specific ancestor (so they conflict when cherry-picked +// onto a newer main). +func (f gitFixture) pushPRCommitFrom(t *testing.T, base, branch, path, contents, message string) string { + t.Helper() + mustGit(t, f.authorDir, "checkout", "-B", branch, base) + require.NoError(t, writeFile(filepath.Join(f.authorDir, path), contents)) + mustGit(t, f.authorDir, "add", ".") + mustGit(t, f.authorDir, "commit", "-m", message) + mustGit(t, f.authorDir, "push", "-f", "origin", branch) + out := mustGitOutput(t, f.authorDir, "rev-parse", "HEAD") + return strings.TrimSpace(string(out)) +} + +// remoteSHA returns the SHA at refs/heads/ on the bare remote. +func (f gitFixture) remoteSHA(t *testing.T, branch string) string { + t.Helper() + out := mustGitOutput(t, f.remoteDir, "rev-parse", branch) + return strings.TrimSpace(string(out)) +} + +// landOnMain cherry-picks an arbitrary SHA directly onto main on the +// remote, simulating "this content is already on the target branch" for +// rebased-out tests. +func (f gitFixture) landOnMain(t *testing.T, sha string) { + t.Helper() + mustGit(t, f.authorDir, "fetch", "origin") + mustGit(t, f.authorDir, "checkout", "-B", "land", "origin/main") + mustGit(t, f.authorDir, "cherry-pick", sha) + mustGit(t, f.authorDir, "push", "origin", "land:main") +} + +// uri builds a github-format URI ending in `sha` so the Pusher's parser +// resolves it to that SHA. +func uri(sha string) string { + return fmt.Sprintf("github://uber/submitqueue/1/%s", sha) +} + +func (f gitFixture) newPusher(t *testing.T) pusher.Pusher { + return NewPusher(Params{ + CheckoutPath: f.checkoutDir, + Remote: "origin", + Target: "main", + Logger: zaptest.NewLogger(t).Sugar(), + MetricsScope: tally.NoopScope, + }) +} + +// installRaceHook writes a pre-receive hook on the bare remote that +// simulates concurrent pushes. On its Nth invocation it reads the Nth line +// of race-shas, points refs/heads/main at that SHA via update-ref, and +// exits 1 (rejecting the current push). Once race-shas is exhausted it +// exits 0 and the push goes through. +// +// Combined with the gitPusher's contention-detection (refetch + compare +// base SHA), this lets a test drive the full retry loop using only real +// git mechanics — the second-attempt's reset picks up the moved tip and +// proceeds from there. +func (f gitFixture) installRaceHook(t *testing.T, raceSHAs []string) { + t.Helper() + hookDir := filepath.Join(f.remoteDir, "hooks") + require.NoError(t, os.MkdirAll(hookDir, 0o755)) + // Override the system-wide core.hooksPath so the hook we just wrote + // actually fires on the bare remote (the devpod sets a global + // /etc/git-hooks directory that would otherwise win). + mustGit(t, f.remoteDir, "config", "core.hooksPath", hookDir) + require.NoError(t, writeFile( + filepath.Join(hookDir, "race-shas"), + strings.Join(raceSHAs, "\n")+"\n", + )) + const script = `#!/bin/sh +counter_file="$GIT_DIR/hooks/race-counter" +race_sha_file="$GIT_DIR/hooks/race-shas" +count=$(cat "$counter_file" 2>/dev/null || echo 0) +count=$((count + 1)) +echo "$count" > "$counter_file" +next_sha=$(sed -n "${count}p" "$race_sha_file") +if [ -z "$next_sha" ]; then + exit 0 +fi +# Pre-receive runs in git's quarantine env; unset its markers so update-ref +# is allowed to mutate the live ref store. +unset GIT_QUARANTINE_PATH GIT_OBJECT_DIRECTORY GIT_ALTERNATE_OBJECT_DIRECTORIES +git update-ref refs/heads/main "$next_sha" +echo "race hook moved main to $next_sha and rejected push" >&2 +exit 1 +` + hookPath := filepath.Join(hookDir, "pre-receive") + require.NoError(t, os.WriteFile(hookPath, []byte(script), 0o755)) +} + +// hookInvocations returns the number of times the pre-receive race hook +// has fired. Used by retry tests to verify the loop ran the expected +// number of attempts. +func (f gitFixture) hookInvocations(t *testing.T) int { + t.Helper() + data, err := os.ReadFile(filepath.Join(f.remoteDir, "hooks", "race-counter")) + if errors.Is(err, os.ErrNotExist) { + return 0 + } + require.NoError(t, err) + n, err := strconv.Atoi(strings.TrimSpace(string(data))) + require.NoError(t, err) + return n +} + +// remoteHEAD returns the SHA that origin/main currently points to. +func (f gitFixture) remoteHEAD(t *testing.T) string { + out := mustGitOutput(t, f.remoteDir, "rev-parse", "main") + return strings.TrimSpace(string(out)) +} + +// remoteFile returns the contents of `path` at origin/main on the bare remote. +func (f gitFixture) remoteFile(t *testing.T, path string) string { + out := mustGitOutput(t, f.remoteDir, "show", "main:"+path) + return string(out) +} + +// remoteCommitsSinceSeed returns commit SHAs on origin/main newer than the +// first (seed) commit, in chronological order. +func (f gitFixture) remoteCommitsSinceSeed(t *testing.T) []string { + out := mustGitOutput(t, f.remoteDir, "log", "--reverse", "--format=%H", "main") + all := strings.Split(strings.TrimSpace(string(out)), "\n") + if len(all) <= 1 { + return nil + } + return all[1:] +} + +func TestPusher_Push_SingleChangeSingleURIProducesOneCommit(t *testing.T) { + f := setupGitFixture(t) + sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + p := f.newPusher(t) + + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + + out := res.Outcomes[0] + assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) + require.Len(t, out.CommitSHAs, 1) + assert.Equal(t, []string{out.CommitSHAs[0]}, f.remoteCommitsSinceSeed(t)) + assert.Equal(t, "hello\nearth\n", f.remoteFile(t, "hello.txt")) +} + +func TestPusher_Push_StackedURIsProduceMultipleCommitsForOneChange(t *testing.T) { + f := setupGitFixture(t) + // Build a stack on a single branch so the second SHA's parent is the first SHA. + mustGit(t, f.authorDir, "fetch", "origin") + mustGit(t, f.authorDir, "checkout", "-B", "feature/stack", "origin/main") + + require.NoError(t, writeFile(filepath.Join(f.authorDir, "hello.txt"), "hello\nearth\n")) + mustGit(t, f.authorDir, "add", ".") + mustGit(t, f.authorDir, "commit", "-m", "step 1") + sha1 := strings.TrimSpace(string(mustGitOutput(t, f.authorDir, "rev-parse", "HEAD"))) + + require.NoError(t, writeFile(filepath.Join(f.authorDir, "hello.txt"), "hello\nearth\ngoodbye\n")) + mustGit(t, f.authorDir, "add", ".") + mustGit(t, f.authorDir, "commit", "-m", "step 2") + sha2 := strings.TrimSpace(string(mustGitOutput(t, f.authorDir, "rev-parse", "HEAD"))) + + mustGit(t, f.authorDir, "push", "-f", "origin", "feature/stack") + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha1), uri(sha2)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + + out := res.Outcomes[0] + assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) + require.Len(t, out.CommitSHAs, 2) + assert.Equal(t, out.CommitSHAs, f.remoteCommitsSinceSeed(t)) + assert.Equal(t, "hello\nearth\ngoodbye\n", f.remoteFile(t, "hello.txt")) +} + +func TestPusher_Push_AlreadyLandedChangeIsRebasedOut(t *testing.T) { + f := setupGitFixture(t) + sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + + // Land the same content on main outside the Pusher so the cherry-pick + // finds nothing new to add. + f.landOnMain(t, sha) + mainBeforePush := f.remoteHEAD(t) + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + + out := res.Outcomes[0] + assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, out.Status) + assert.Empty(t, out.CommitSHAs) + assert.Equal(t, mainBeforePush, f.remoteHEAD(t), + "rebased-out push should not advance the remote tip") +} + +func TestPusher_Push_MixedChangesPartiallyRebasedOut(t *testing.T) { + f := setupGitFixture(t) + subsumedSHA := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + f.landOnMain(t, subsumedSHA) + + freshSHA := f.pushPRCommit(t, "feature/b", "extra.txt", "extra\n", "add extra") + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(subsumedSHA)}}, + {URIs: []string{uri(freshSHA)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 2) + + assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, res.Outcomes[0].Status) + assert.Empty(t, res.Outcomes[0].CommitSHAs) + + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[1].Status) + require.Len(t, res.Outcomes[1].CommitSHAs, 1) + + assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) +} + +func TestPusher_Push_ConflictReturnsErrConflictAndDoesNotPush(t *testing.T) { + f := setupGitFixture(t) + seedSHA := f.remoteSHA(t, "main") + + // Both branches start from the same seed and change the same line in + // different ways, then "earth" lands first directly on main. The + // Pusher's attempt to land "mars" must conflict. + earthSHA := f.pushPRCommitFrom(t, seedSHA, "feature/a", "hello.txt", "hello\nearth\n", "earth") + f.landOnMain(t, earthSHA) + mainBefore := f.remoteHEAD(t) + + conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") + + p := f.newPusher(t) + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(conflictingSHA)}}, + }) + require.Error(t, err) + assert.True(t, errors.Is(err, pusher.ErrConflict)) + + assert.Equal(t, mainBefore, f.remoteHEAD(t), + "on conflict the remote tip must not move") +} + +func TestPusher_Push_ResetsBetweenCalls(t *testing.T) { + f := setupGitFixture(t) + sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + p := f.newPusher(t) + + // Dirty the checkout so that, without a reset, subsequent operations + // would fail or include unrelated changes. + require.NoError(t, writeFile(filepath.Join(f.checkoutDir, "stray.txt"), "leftover\n")) + + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes[0].CommitSHAs, 1) + + out := mustGitOutput(t, f.remoteDir, "ls-tree", "--name-only", res.Outcomes[0].CommitSHAs[0]) + assert.NotContains(t, string(out), "stray.txt", "unrelated file should not have landed") + assert.Contains(t, string(out), "hello.txt") +} + +func TestPusher_Push_RecoversAfterPriorConflict(t *testing.T) { + f := setupGitFixture(t) + seedSHA := f.remoteSHA(t, "main") + + first := f.pushPRCommitFrom(t, seedSHA, "feature/a", "hello.txt", "hello\nearth\n", "earth") + f.landOnMain(t, first) + conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") + + p := f.newPusher(t) + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(conflictingSHA)}}, + }) + require.Error(t, err) + + // A subsequent, clean push must succeed even though the prior call left + // a cherry-pick in progress before its rollback. + freshSHA := f.pushPRCommit(t, "feature/c", "extra.txt", "extra\n", "add extra") + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(freshSHA)}}, + }) + require.NoError(t, err) + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[0].Status) + assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) +} + +func TestPusher_Push_RejectsEmptyChanges(t *testing.T) { + f := setupGitFixture(t) + p := f.newPusher(t) + + _, err := p.Push(context.Background(), nil) + require.Error(t, err) + assert.False(t, errors.Is(err, pusher.ErrConflict)) +} + +func TestPusher_Push_InvalidURIErrors(t *testing.T) { + f := setupGitFixture(t) + p := f.newPusher(t) + + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{"not a uri"}}, + }) + require.Error(t, err) +} + +func TestPusher_Push_RetriesWhenRemoteMovesUnderUs(t *testing.T) { + f := setupGitFixture(t) + // Pre-stage one race commit, install the hook, then build the feature + // commit. Order matters: pushPRCommit also goes through the hook, so + // race + feature must be on the remote before the hook is armed. + raceSHA := f.pushPRCommit(t, "race", "race.txt", "race\n", "race commit") + featureSHA := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + f.installRaceHook(t, []string{raceSHA}) + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(featureSHA)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + require.Len(t, res.Outcomes[0].CommitSHAs, 1) + + assert.Equal(t, 2, f.hookInvocations(t), + "first attempt rejected by hook, second attempt allowed through") + + commits := f.remoteCommitsSinceSeed(t) + require.Len(t, commits, 2) + assert.Equal(t, raceSHA, commits[0], + "race commit landed first via the hook") + assert.Equal(t, res.Outcomes[0].CommitSHAs[0], commits[1], + "our cherry-pick landed on top after the retry") + assert.Equal(t, "hello\nearth\n", f.remoteFile(t, "hello.txt")) +} + +func TestPusher_Push_GivesUpAfterMaxAttempts(t *testing.T) { + f := setupGitFixture(t) + raceSHAs := []string{ + f.pushPRCommit(t, "race1", "r1.txt", "1\n", "r1"), + f.pushPRCommit(t, "race2", "r2.txt", "2\n", "r2"), + } + featureSHA := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + f.installRaceHook(t, raceSHAs) + + p := NewPusher(Params{ + CheckoutPath: f.checkoutDir, + Remote: "origin", + Target: "main", + Logger: zaptest.NewLogger(t).Sugar(), + MetricsScope: tally.NoopScope, + MaxPushAttempts: 2, + }) + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(featureSHA)}}, + }) + require.Error(t, err) + assert.Equal(t, 2, f.hookInvocations(t), + "both attempts hit the hook before the cap kicked in") + // The remote ended up at race2 (the last hook injection), and our + // feature commit never landed. + assert.Equal(t, raceSHAs[1], f.remoteHEAD(t)) +} + +// --- helpers --- + +func mustGit(t *testing.T, dir string, args ...string) { + t.Helper() + cmd := exec.Command("git", args...) + cmd.Dir = dir + var stderr bytes.Buffer + cmd.Stderr = &stderr + require.NoError(t, cmd.Run(), "git %s: %s", strings.Join(args, " "), stderr.String()) +} + +func mustGitOutput(t *testing.T, dir string, args ...string) []byte { + t.Helper() + cmd := exec.Command("git", args...) + cmd.Dir = dir + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + require.NoError(t, cmd.Run(), "git %s: %s", strings.Join(args, " "), stderr.String()) + return stdout.Bytes() +} + +func writeFile(path, contents string) error { + return os.WriteFile(path, []byte(contents), 0o644) +} diff --git a/extension/pusher/mock/BUILD.bazel b/extension/pusher/mock/BUILD.bazel new file mode 100644 index 00000000..428c8724 --- /dev/null +++ b/extension/pusher/mock/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["pusher_mock.go"], + importpath = "github.com/uber/submitqueue/extension/pusher/mock", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "//extension/pusher", + "@org_uber_go_mock//gomock", + ], +) diff --git a/extension/pusher/mock/pusher_mock.go b/extension/pusher/mock/pusher_mock.go new file mode 100644 index 00000000..6b0e6699 --- /dev/null +++ b/extension/pusher/mock/pusher_mock.go @@ -0,0 +1,58 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pusher.go +// +// Generated by this command: +// +// mockgen -source=pusher.go -destination=mock/pusher_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/entity" + pusher "github.com/uber/submitqueue/extension/pusher" + gomock "go.uber.org/mock/gomock" +) + +// MockPusher is a mock of Pusher interface. +type MockPusher struct { + ctrl *gomock.Controller + recorder *MockPusherMockRecorder + isgomock struct{} +} + +// MockPusherMockRecorder is the mock recorder for MockPusher. +type MockPusherMockRecorder struct { + mock *MockPusher +} + +// NewMockPusher creates a new mock instance. +func NewMockPusher(ctrl *gomock.Controller) *MockPusher { + mock := &MockPusher{ctrl: ctrl} + mock.recorder = &MockPusherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPusher) EXPECT() *MockPusherMockRecorder { + return m.recorder +} + +// Push mocks base method. +func (m *MockPusher) Push(ctx context.Context, changes []entity.Change) (pusher.Result, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Push", ctx, changes) + ret0, _ := ret[0].(pusher.Result) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Push indicates an expected call of Push. +func (mr *MockPusherMockRecorder) Push(ctx, changes any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, changes) +} diff --git a/extension/pusher/pusher.go b/extension/pusher/pusher.go new file mode 100644 index 00000000..09bc9afc --- /dev/null +++ b/extension/pusher/pusher.go @@ -0,0 +1,88 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pusher + +//go:generate mockgen -source=pusher.go -destination=mock/pusher_mock.go -package=mock + +import ( + "context" + "errors" + + "github.com/uber/submitqueue/entity" +) + +// ErrConflict is returned by a Pusher when one of the changes fails to apply +// cleanly on top of the current tip of the target branch. Callers should +// treat conflicts as user-caused and non-retryable. +var ErrConflict = errors.New("change conflict") + +// OutcomeStatus describes what happened to a single Change during a Push. +type OutcomeStatus string + +const ( + // OutcomeStatusUnknown is the unreachable zero value, set by default + // when the structure is initialized. It should never be seen in the system. + OutcomeStatusUnknown OutcomeStatus = "" + // OutcomeStatusCommitted means the change produced one or more commits + // on the target branch. CommitSHAs lists those commits in apply order. + OutcomeStatusCommitted OutcomeStatus = "committed" + // OutcomeStatusAlreadyExisted means the change produced no commits + // because every part of it is already present in the target branch + // (e.g. it previously landed via another path, or a prior change in + // the same push subsumed it). CommitSHAs is empty for this status. + // In git terms this is what a `cherry-pick` surfaces as "rebased out". + OutcomeStatusAlreadyExisted OutcomeStatus = "already_existed" +) + +// ChangeOutcome describes what happened to a single Change inside a Push. +type ChangeOutcome struct { + // Change is the input change this outcome corresponds to. + Change entity.Change + // Status describes whether the change produced commits or was already + // present on the target branch. + Status OutcomeStatus + // CommitSHAs lists the commits this change produced on the target + // branch, in apply order. A single Change may produce multiple commits + // (e.g. a stack of PRs). Empty when Status is OutcomeStatusAlreadyExisted. + CommitSHAs []string +} + +// Result is the outcome of a successful Push call. +type Result struct { + // Outcomes is one entry per input change, in the same order as the + // changes passed to Push. The slice length equals the input length. + Outcomes []ChangeOutcome +} + +// Pusher applies a list of Changes on top of a target branch and pushes the +// result to the source-control remote. Each implementation is bound to a +// specific (checkout, remote, target) at construction time. +// +// Atomicity contract: when Push returns a non-nil error, NO change has been +// pushed to the remote — neither partially nor fully. Implementations must +// either roll back any local state or arrange for the push to never happen +// when any change fails to apply. Callers can treat a non-nil error as +// "the remote is exactly as it was before the call". +// +// On success, len(Result.Outcomes) == len(changes) and Outcomes[i] describes +// what happened to changes[i]. A change can produce multiple commits +// (OutcomeStatusCommitted, CommitSHAs populated in apply order) or none at +// all (OutcomeStatusAlreadyExisted, CommitSHAs empty) — the latter happens +// when the change's content is already present on the target branch. +type Pusher interface { + // Push applies changes onto the target branch and pushes the resulting + // commits. See the type-level docs for the atomicity contract. + Push(ctx context.Context, changes []entity.Change) (Result, error) +} diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index 93333685..657b9ae6 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -7,8 +7,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//entity", "//entity/queue", + "//extension/pusher", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -24,6 +26,8 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/pusher", + "//extension/pusher/mock", "//extension/queue/mock", "//extension/storage/mock", "@com_github_stretchr_testify//assert", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 395e4c4c..d09c12db 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -16,24 +16,35 @@ package merge import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" + "go.uber.org/zap" + "github.com/uber/submitqueue/core/consumer" + coremetrics "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/pusher" "github.com/uber/submitqueue/extension/storage" - "go.uber.org/zap" ) -// Controller handles merge queue messages. -// It consumes batches, performs merges, and publishes to both conclude and speculate stages. -// Implements consumer.Controller interface for integration with the consumer. +// Controller handles merge queue messages. It loads every request in a batch, +// hands the resulting list of Changes to the configured Pusher, and +// transitions the batch to a terminal state based on the Pusher's outcome. +// After updating state it forwards the batch to conclude (so requests pick +// up the outcome) and to speculate (so downstream batches can re-plan). +// +// Conflicts are user-caused: the batch goes to BatchStateFailed and the +// queue message is acked. Any other Pusher error is treated as transient +// infra: the batch is left in place and the message is nacked. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage registry consumer.TopicRegistry + pusher pusher.Pusher topicKey consumer.TopicKey consumerGroup string } @@ -47,6 +58,7 @@ func NewController( scope tally.Scope, store storage.Storage, registry consumer.TopicRegistry, + pusherImpl pusher.Pusher, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -55,30 +67,29 @@ func NewController( metricsScope: scope.SubScope("merge_controller"), store: store, registry: registry, + pusher: pusherImpl, topicKey: topicKey, consumerGroup: consumerGroup, } } -// Process processes a merge delivery from the queue. -// Deserializes the batch, performs the merge, and publishes to both conclude and speculate topics. +// Process performs the merge for a batch and forwards it to conclude/speculate. // Returns nil to ack (success), or error to nack (retry). -func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + op := coremetrics.Begin(c.metricsScope, "process") + defer func() { op.Complete(retErr) }() msg := delivery.Message() - // Deserialize batch ID from payload bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + coremetrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch ID: %w", err) } - // Fetch batch from storage batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) if err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + coremetrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1) return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } @@ -91,35 +102,81 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) - // TODO: Add merge logic - // - Perform source control merge operation - // - Handle merge conflicts + // Idempotency: if the batch is already in a terminal state, a previous + // attempt has already merged (or failed) — just re-fan-out the events + // in case downstream stages missed them. + if batch.State.IsTerminal() { + coremetrics.NamedCounter(c.metricsScope, "process", "skipped_terminal", 1) + return c.fanout(ctx, batch.ID, batch.Queue) + } - // Publish to conclude topic - if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) - return fmt.Errorf("failed to publish to conclude: %w", err) + changes, err := c.collectChanges(ctx, batch) + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "request_load_errors", 1) + return fmt.Errorf("failed to collect changes for batch %s: %w", batch.ID, err) } - c.logger.Infow("published batch to conclude", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyConclude, - ) + pushRes, pushErr := c.pusher.Push(ctx, changes) + + var newState entity.BatchState + switch { + case pushErr == nil: + newState = entity.BatchStateSucceeded + c.logger.Infow("merged batch", + "batch_id", batch.ID, + "outcomes", pushRes.Outcomes, + ) + case errors.Is(pushErr, pusher.ErrConflict): + coremetrics.NamedCounter(c.metricsScope, "process", "push_conflicts", 1) + newState = entity.BatchStateFailed + c.logger.Warnw("batch merge failed", + "batch_id", batch.ID, + "state", string(newState), + "error", pushErr, + ) + default: + coremetrics.NamedCounter(c.metricsScope, "process", "push_errors", 1) + return fmt.Errorf("push failed for batch %s: %w", batch.ID, pushErr) + } - // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) - return fmt.Errorf("failed to publish to speculate: %w", err) + newVersion := batch.Version + 1 + if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, newState); err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "state_update_errors", 1) + return fmt.Errorf("failed to transition batch %s to %s: %w", batch.ID, newState, err) } + batch.Version = newVersion + batch.State = newState - c.logger.Infow("published batch to speculate", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeySpeculate, - ) + return c.fanout(ctx, batch.ID, batch.Queue) +} - c.metricsScope.Counter("processed").Inc(1) +// collectChanges loads each request in batch.Contains and returns its +// Change. The result preserves batch.Contains order so the Pusher applies +// the changes in the same order the requests were batched. +func (c *Controller) collectChanges(ctx context.Context, batch entity.Batch) ([]entity.Change, error) { + changes := make([]entity.Change, 0, len(batch.Contains)) + for _, requestID := range batch.Contains { + request, err := c.store.GetRequestStore().Get(ctx, requestID) + if err != nil { + return nil, fmt.Errorf("failed to get request %s: %w", requestID, err) + } + changes = append(changes, request.Change) + } + return changes, nil +} - return nil // Success - message will be acked +// fanout publishes the batch ID to conclude (so requests are updated) and +// to speculate (so dependents can re-evaluate now that this batch is done). +func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { + if err := c.publish(ctx, consumer.TopicKeyConclude, batchID, partitionKey); err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "publish_conclude_errors", 1) + return fmt.Errorf("failed to publish to conclude: %w", err) + } + if err := c.publish(ctx, consumer.TopicKeySpeculate, batchID, partitionKey); err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "publish_speculate_errors", 1) + return fmt.Errorf("failed to publish to speculate: %w", err) + } + return nil } // publish publishes a batch ID to the specified topic key. diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index aa9f0738..351f4b42 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -22,14 +22,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/pusher" + pushermock "github.com/uber/submitqueue/extension/pusher/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" storagemock "github.com/uber/submitqueue/extension/storage/mock" - "go.uber.org/mock/gomock" - "go.uber.org/zap/zaptest" ) // batchIDPayload serializes a BatchID to JSON bytes for test message payloads. @@ -39,121 +42,411 @@ func batchIDPayload(t *testing.T, id string) []byte { return payload } -// testBatch returns a standard test batch for merge tests. -func testBatch() entity.Batch { - return entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } +func newDelivery(t *testing.T, ctrl *gomock.Controller, batchID, partitionKey string) *queuemock.MockDelivery { + msg := queue.NewMessage(batchID, batchIDPayload(t, batchID), partitionKey, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery } -// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. -func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { - mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() +// newRegistry returns a registry where conclude and speculate accept any publish. +func newRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) consumer.TopicRegistry { + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, _ queue.Message) error { return publishErr }, + ).AnyTimes() + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - return store + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + pushermock.NewMockPusher(ctrl), + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + require.NotNil(t, c) + assert.Equal(t, consumer.TopicKeyMerge, c.TopicKey()) + assert.Equal(t, "orchestrator-merge", c.ConsumerGroup()) + assert.Equal(t, "merge", c.Name()) + var _ consumer.Controller = c } -// newTestController creates a controller with test dependencies. -func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope +func TestController_Process_SuccessfulMerge(t *testing.T) { + ctrl := gomock.NewController(t) - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, topic string, msg queue.Message) error { - return publishErr + const reqID = "test-queue/1" + const batchID = "test-queue/batch/1" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 4, + } + change := entity.Change{URIs: []string{"github://o/r/1/sha"}} + request := entity.Request{ + ID: reqID, + Queue: "test-queue", + Change: change, + State: entity.RequestStateProcessing, + Version: 2, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(4), int32(5), entity.BatchStateSucceeded).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(request, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, changes []entity.Change) (pusher.Result, error) { + require.Len(t, changes, 1) + assert.Equal(t, change, changes[0]) + return pusher.Result{Outcomes: []pusher.ChangeOutcome{{ + Change: change, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{"deadbeef"}, + }}}, nil }, - ).AnyTimes() + ) - mockQ := queuemock.NewMockQueue(ctrl) - mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.NoError(t, err) +} + +func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { + ctrl := gomock.NewController(t) + + const batchID = "test-queue/batch/multi" + requestIDs := []string{"test-queue/1", "test-queue/2", "test-queue/3"} + changes := []entity.Change{ + {URIs: []string{"github://o/r/1/sha1"}}, + {URIs: []string{"github://o/r/2/sha2"}}, + {URIs: []string{"github://o/r/3/sha3"}}, + } + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: requestIDs, + State: entity.BatchStateFinalizing, + Version: 1, + } - registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(1), int32(2), entity.BatchStateSucceeded).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + for i, rid := range requestIDs { + requestStore.EXPECT().Get(gomock.Any(), rid).Return(entity.Request{ + ID: rid, Change: changes[i], + }, nil) + } + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, got []entity.Change) (pusher.Result, error) { + assert.Equal(t, changes, got, "changes must be in batch.Contains order") + outcomes := make([]pusher.ChangeOutcome, len(got)) + for i, ch := range got { + outcomes[i] = pusher.ChangeOutcome{ + Change: ch, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{fmt.Sprintf("sha-%d", i)}, + } + } + return pusher.Result{Outcomes: outcomes}, nil }, ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) require.NoError(t, err) +} + +func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { + ctrl := gomock.NewController(t) + + const reqID = "test-queue/2" + const batchID = "test-queue/batch/2" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 3, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(3), int32(4), entity.BatchStateFailed).Return(nil) - return NewController(logger, scope, store, registry, consumer.TopicKeyMerge, "orchestrator-merge") + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ + ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/2/sha"}}, + }, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + pusher.Result{}, + fmt.Errorf("apply: %w", pusher.ErrConflict), + ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.NoError(t, err, "conflict ack-s the message; failure is recorded on the batch") } -func TestNewController(t *testing.T) { +func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) - - require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyMerge, controller.TopicKey()) - assert.Equal(t, "orchestrator-merge", controller.ConsumerGroup()) - assert.Equal(t, "merge", controller.Name()) + + const reqID = "test-queue/3" + const batchID = "test-queue/batch/3" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 1, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ + ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/3/sha"}}, + }, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + pusher.Result{}, + fmt.Errorf("ssh: connection refused"), + ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.Error(t, err) } -func TestController_Process_Success(t *testing.T) { +func TestController_Process_TerminalBatchSkipsPushButFansOut(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + const batchID = "test-queue/batch/4" - msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + State: entity.BatchStateSucceeded, + Version: 7, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + // Push must NOT be called for an already-terminal batch. + mockPusher := pushermock.NewMockPusher(ctrl) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) - err := controller.Process(context.Background(), delivery) + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) require.NoError(t, err) } -func TestController_Process_StorageFailure(t *testing.T) { +func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { ctrl := gomock.NewController(t) - mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + const batchID = "test-queue/batch/5" - controller := newTestController(t, ctrl, store, nil) + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(entity.Batch{}, fmt.Errorf("db connection lost")) - msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + pushermock.NewMockPusher(ctrl), + consumer.TopicKeyMerge, + "orchestrator-merge", + ) - err := controller.Process(context.Background(), delivery) + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, "test-queue")) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } -func TestController_Process_PublishFailure(t *testing.T) { +func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) + const reqID = "test-queue/6" + const batchID = "test-queue/batch/6" - msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 1, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{}, fmt.Errorf("db connection lost")) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() - err := controller.Process(context.Background(), delivery) - assert.Error(t, err) + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + pushermock.NewMockPusher(ctrl), + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.Error(t, err) } -func TestController_InterfaceImplementation(t *testing.T) { +func TestController_Process_PublishFailureSurfaces(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) - var _ consumer.Controller = controller + const reqID = "test-queue/7" + const batchID = "test-queue/batch/7" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 2, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(2), int32(3), entity.BatchStateSucceeded).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ + ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/7/sha"}}, + }, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + pusher.Result{Outcomes: []pusher.ChangeOutcome{{ + Status: pusher.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, + }}}, nil, + ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, fmt.Errorf("queue down")), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.Error(t, err) }