diff --git a/Makefile b/Makefile index 622364cd..2d6bedef 100644 --- a/Makefile +++ b/Makefile @@ -245,7 +245,7 @@ local-stop: ## Stop all services (keep data) mocks: ## Generate mock files using mockgen @echo "Generating mocks..." - @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/scorer/... ./core/consumer/... + @$(BAZEL) run @rules_go//go -- generate ./extension/storage/... ./extension/counter/... ./extension/queue/... ./extension/mergechecker/... ./extension/pusher/... ./extension/scorer/... ./core/consumer/... @echo "Mocks generated successfully!" proto: ## Generate protobuf files from .proto definitions diff --git a/example/server/orchestrator/BUILD.bazel b/example/server/orchestrator/BUILD.bazel index bbbb3a8d..084a1857 100644 --- a/example/server/orchestrator/BUILD.bazel +++ b/example/server/orchestrator/BUILD.bazel @@ -20,6 +20,8 @@ go_library( "//extension/counter/mysql", "//extension/mergechecker", "//extension/mergechecker/github", + "//extension/pusher", + "//extension/pusher/git", "//extension/queue", "//extension/queue/mysql", "//extension/scorer/heuristic", diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index e4311d35..cb5a61c0 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -39,6 +39,8 @@ import ( mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" "github.com/uber/submitqueue/extension/mergechecker" githubchecker "github.com/uber/submitqueue/extension/mergechecker/github" + "github.com/uber/submitqueue/extension/pusher" + gitpusher "github.com/uber/submitqueue/extension/pusher/git" extqueue "github.com/uber/submitqueue/extension/queue" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/extension/scorer/heuristic" @@ -203,8 +205,14 @@ func run() error { return fmt.Errorf("failed to create change provider: 
%w", err) } + // Create pusher + psh, err := newPusher(logger, scope) + if err != nil { + return fmt.Errorf("failed to create pusher: %w", err) + } + // Register controllers - if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, cnt, store); err != nil { + if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cp, psh, cnt, store); err != nil { return err } @@ -389,7 +397,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe // │ │ │ // └────────┴────────────────────────┘ -func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, cnt counter.Counter, store storage.Storage) error { +func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cp changeprovider.ChangeProvider, psh pusher.Pusher, cnt counter.Counter, store storage.Storage) error { requestController := start.NewController( logger, scope, @@ -495,6 +503,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scope, store, registry, + psh, consumer.TopicKeyMerge, "orchestrator-merge", ) @@ -595,3 +604,21 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch MetricsScope: scope.SubScope("changeprovider"), }), nil } + +// newPusher creates a git-backed Pusher bound to the configured checkout +// path, remote, and target branch. Configured via PUSHER_CHECKOUT_PATH +// (required), PUSHER_REMOTE (default "origin"), and PUSHER_TARGET (default +// "main"). 
+func newPusher(logger *zap.Logger, scope tally.Scope) (pusher.Pusher, error) { + checkout := os.Getenv("PUSHER_CHECKOUT_PATH") + if checkout == "" { + return nil, fmt.Errorf("PUSHER_CHECKOUT_PATH environment variable is required") + } + return gitpusher.NewPusher(gitpusher.Params{ + CheckoutPath: checkout, + Remote: getEnv("PUSHER_REMOTE", "origin"), + Target: getEnv("PUSHER_TARGET", "main"), + Logger: logger.Sugar(), + MetricsScope: scope.SubScope("pusher"), + }), nil +} diff --git a/extension/pusher/BUILD.bazel b/extension/pusher/BUILD.bazel new file mode 100644 index 00000000..a1aa1383 --- /dev/null +++ b/extension/pusher/BUILD.bazel @@ -0,0 +1,9 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "pusher", + srcs = ["pusher.go"], + importpath = "github.com/uber/submitqueue/extension/pusher", + visibility = ["//visibility:public"], + deps = ["//entity"], +) diff --git a/extension/pusher/README.md b/extension/pusher/README.md new file mode 100644 index 00000000..0703cfd0 --- /dev/null +++ b/extension/pusher/README.md @@ -0,0 +1,44 @@ +# Pusher + +Pluggable abstraction for landing a list of `entity.Change` values onto a +target branch and pushing the result to a source-control remote. + +## Interface + +`Pusher` exposes a single `Push` method that accepts a list of changes. +Implementations are bound to a specific `(checkout, remote, target)` tuple +at construction time, so the interface itself stays vendor- and +configuration-agnostic. + +The interface enforces an **all-or-nothing atomicity contract**: when `Push` +returns an error, no change has reached the remote — neither partially nor +fully. Callers can treat a non-nil error as "the remote is exactly as it was +before the call". The `ErrConflict` sentinel marks user-caused failures so +callers can route them to a non-retry path. + +A successful `Push` returns one `ChangeOutcome` per input change in input +order. 
Each outcome reports either: + +- `OutcomeStatusCommitted` with the list of `CommitSHAs` produced on the + target branch (one change can land as multiple commits, e.g. a stack of + PRs); or +- `OutcomeStatusAlreadyExisted` with no commits, when the change is already + present on the target branch (previously landed via another path, or + subsumed by an earlier change in the same push). Git surfaces this as + "rebased out" during a cherry-pick. + +## Implementations + +- [`git/`](git/) — applies changes against a local checkout via `git + cherry-pick`, then `git push`. Construction takes the path to the + checkout, the remote name, and the target branch; the implementation + owns that working tree and serializes concurrent invocations. + +## Adding a new backend + +1. Create `extension/pusher/{backend}/` with a `Pusher` implementation. +2. Bind the implementation to its checkout/remote/target at construction. +3. Map each `entity.Change` to the backend's commit/push primitives. +4. Honour the atomicity contract: never publish partial state. Return + `ErrConflict` (wrapped) for user-caused apply failures and a plain error + for transient infra failures. 
diff --git a/extension/pusher/git/BUILD.bazel b/extension/pusher/git/BUILD.bazel new file mode 100644 index 00000000..cfa7623d --- /dev/null +++ b/extension/pusher/git/BUILD.bazel @@ -0,0 +1,30 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "git", + srcs = ["git_pusher.go"], + importpath = "github.com/uber/submitqueue/extension/pusher/git", + visibility = ["//visibility:public"], + deps = [ + "//core/metrics", + "//entity", + "//entity/github", + "//extension/pusher", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "git_test", + srcs = ["git_pusher_test.go"], + embed = [":git"], + deps = [ + "//entity", + "//extension/pusher", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/extension/pusher/git/git_pusher.go b/extension/pusher/git/git_pusher.go new file mode 100644 index 00000000..2ff4b885 --- /dev/null +++ b/extension/pusher/git/git_pusher.go @@ -0,0 +1,435 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package git is a simple Pusher implementation backed by a local git +// checkout. +// +// On every Push call the implementation: +// +// 1. Fetches the configured remote. +// 2. Resets the checkout's HEAD to refs/remotes//. +// 3. 
Cherry-picks the head SHA of every URI of every Change, in order. +// A pick that produces no new content (because the change is already +// present on the target branch) is what git surfaces as "rebased out" +// and is recorded as OutcomeStatusAlreadyExisted. +// A pick that fails to apply cleanly is treated as a conflict. +// 4. Pushes HEAD to refs/heads/<target> on the remote. +// +// Atomicity: nothing is published to the remote until step 4 succeeds. If +// any cherry-pick fails the in-progress pick is aborted and Push returns an +// error without ever invoking step 4. +// +// Contention: if step 4 fails because the remote tip moved between step 2 +// and step 4 (typically a concurrent push from another writer), the whole +// fetch/reset/cherry-pick/push cycle is retried. Detection works by +// re-fetching the remote tip after a push failure and comparing it to the +// SHA we reset to in step 2: if it advanced, the failure is treated as +// contention. Other push failures (network, auth, hook reject without ref +// change) propagate immediately. Retries are capped at +// Params.MaxPushAttempts (default 10) to bound the worst case. +// +// Construction takes the path to an existing checkout, the remote name, and +// the target branch — the implementation owns the working tree at that path +// for the duration of any in-flight Push call and serializes concurrent +// invocations. +// +// Change URIs are parsed using the github-family URI format (see +// entity/github), so each URI's last segment is interpreted as the head +// commit SHA. The SHA must be reachable from a ref on the remote so that +// `git fetch` makes it available locally. 
+package git + +import ( + "bytes" + "context" + "fmt" + "os/exec" + "strings" + "sync" + + "github.com/uber-go/tally/v4" + "go.uber.org/zap" + + coremetrics "github.com/uber/submitqueue/core/metrics" + "github.com/uber/submitqueue/entity" + entitygithub "github.com/uber/submitqueue/entity/github" + "github.com/uber/submitqueue/extension/pusher" +) + +// defaultMaxPushAttempts caps the retry loop in Push when the remote tip +// keeps moving under us. Bounded to prevent an infinite loop against a +// pathologically busy remote. +const defaultMaxPushAttempts = 10 + +// Params holds the dependencies for the git Pusher. +type Params struct { + // CheckoutPath is the absolute path to an existing git checkout that the + // Pusher will operate against. The Pusher owns this working tree. + CheckoutPath string + // Remote is the name of the git remote to fetch from and push to + // (e.g. "origin"). + Remote string + // Target is the destination branch ref on the remote (e.g. "main"). + Target string + // Logger is the structured logger. + Logger *zap.SugaredLogger + // MetricsScope is the metrics scope for instrumentation. + MetricsScope tally.Scope + // MaxPushAttempts caps how many times Push retries the full + // fetch/reset/cherry-pick/push cycle when the remote tip moves under + // it. Defaults to defaultMaxPushAttempts when zero or negative. + MaxPushAttempts int +} + +// gitPusher implements pusher.Pusher by shelling out to the `git` CLI +// against a local checkout. +type gitPusher struct { + checkoutPath string + remote string + target string + logger *zap.SugaredLogger + metricsScope tally.Scope + maxPushAttempts int + + // mu serializes concurrent Push calls — the underlying checkout cannot + // be safely shared between operations. + mu sync.Mutex +} + +// Verify gitPusher implements pusher.Pusher at compile time. +var _ pusher.Pusher = (*gitPusher)(nil) + +// NewPusher constructs a new git-backed Pusher operating against the given +// checkout. 
The checkout must already exist and have the configured remote. +func NewPusher(params Params) pusher.Pusher { + maxAttempts := params.MaxPushAttempts + if maxAttempts <= 0 { + maxAttempts = defaultMaxPushAttempts + } + return &gitPusher{ + checkoutPath: params.CheckoutPath, + remote: params.Remote, + target: params.Target, + logger: params.Logger.Named("git_pusher"), + metricsScope: params.MetricsScope.SubScope("git_pusher"), + maxPushAttempts: maxAttempts, + } +} + +// Push fulfils the pusher.Pusher contract. +func (p *gitPusher) Push(ctx context.Context, changes []entity.Change) (ret pusher.Result, retErr error) { + op := coremetrics.Begin(p.metricsScope, "push") + defer func() { op.Complete(retErr) }() + + p.mu.Lock() + defer p.mu.Unlock() + + if len(changes) == 0 { + coremetrics.NamedCounter(p.metricsScope, "push", "empty_changes", 1) + return pusher.Result{}, fmt.Errorf("push called with no changes") + } + + p.logger.Debugw("starting push", + "target", p.target, + "remote", p.remote, + "change_count", len(changes), + ) + + var lastErr error + for attempt := 1; attempt <= p.maxPushAttempts; attempt++ { + baseSHA, outcomes, err := p.tryPush(ctx, changes) + if err == nil { + p.logger.Debugw("push complete", + "target", p.target, + "outcomes", outcomes, + ) + return pusher.Result{Outcomes: outcomes}, nil + } + + // Was the failure caused by the remote tip moving under us between + // reset and push (concurrent-push contention)? That's the only push + // failure worth retrying; everything else (network, auth, hook + // reject without ref change) is fatal. Detection is by re-fetching + // the remote tip and comparing to baseSHA — robust against varying + // git error message formats. baseSHA is empty when the failure + // happened before reset captured a base; treat those as fatal too. 
+ if baseSHA == "" { + return pusher.Result{}, err + } + currentSHA, refetchErr := p.refetchTipSHA(ctx) + if refetchErr != nil { + return pusher.Result{}, fmt.Errorf("refetch after push failure failed: %v (original push error: %w)", refetchErr, err) + } + if currentSHA == baseSHA { + return pusher.Result{}, err + } + + coremetrics.NamedCounter(p.metricsScope, "push", "stale_base_retries", 1) + p.logger.Warnw("remote tip moved during push, retrying", + "attempt", attempt, + "max_attempts", p.maxPushAttempts, + "base_sha", baseSHA, + "current_sha", currentSHA, + "err", err, + ) + lastErr = err + } + + coremetrics.NamedCounter(p.metricsScope, "push", "stale_base_giveup", 1) + return pusher.Result{}, fmt.Errorf("exceeded %d push attempts due to remote contention: %w", p.maxPushAttempts, lastErr) +} + +// tryPush runs one full reset+cherry-pick+push cycle. The returned baseSHA +// is the SHA the cycle was based on (set as soon as resetToRemote completes) +// so the caller can distinguish concurrent-push contention from other push +// failures. baseSHA is empty when the failure happened before reset +// produced a base. +func (p *gitPusher) tryPush(ctx context.Context, changes []entity.Change) (string, []pusher.ChangeOutcome, error) { + if err := p.resetToRemote(ctx); err != nil { + coremetrics.NamedCounter(p.metricsScope, "push", "reset_errors", 1) + return "", nil, err + } + baseSHA, err := p.headSHA(ctx) + if err != nil { + return "", nil, err + } + + outcomes, err := p.cherryPickAll(ctx, changes) + if err != nil { + // Best-effort cleanup so the next attempt starts from a known state. + // The next attempt starts with resetToRemote regardless, so we don't + // care if --abort itself fails (e.g., no pick is in progress). 
+ _, _ = p.run(ctx, nil, "cherry-pick", "--abort") + return baseSHA, nil, err + } + + if err := p.push(ctx); err != nil { + coremetrics.NamedCounter(p.metricsScope, "push", "git_push_errors", 1) + return baseSHA, nil, err + } + return baseSHA, outcomes, nil +} + +// headSHA returns the SHA at HEAD in the local checkout. +func (p *gitPusher) headSHA(ctx context.Context) (string, error) { + out, err := p.run(ctx, nil, "rev-parse", "HEAD") + if err != nil { + return "", fmt.Errorf("git rev-parse HEAD: %w", err) + } + return strings.TrimSpace(string(out)), nil +} + +// refetchTipSHA fetches the remote and returns the current SHA at +// refs/remotes//. Used after a push failure to detect +// whether the remote tip moved under us. +func (p *gitPusher) refetchTipSHA(ctx context.Context) (string, error) { + if _, err := p.run(ctx, nil, "fetch", p.remote); err != nil { + return "", fmt.Errorf("git fetch %s: %w", p.remote, err) + } + remoteRef := p.remote + "/" + p.target + out, err := p.run(ctx, nil, "rev-parse", remoteRef) + if err != nil { + return "", fmt.Errorf("git rev-parse %s: %w", remoteRef, err) + } + return strings.TrimSpace(string(out)), nil +} + +// resetToRemote fetches the configured remote and hard-resets the checkout's +// HEAD to refs/remotes//, producing a clean working tree +// pinned to the latest remote tip. +func (p *gitPusher) resetToRemote(ctx context.Context) error { + if _, err := p.run(ctx, nil, "fetch", p.remote); err != nil { + return fmt.Errorf("git fetch %s: %w", p.remote, err) + } + remoteRef := p.remote + "/" + p.target + if _, err := p.run(ctx, nil, "reset", "--hard", remoteRef); err != nil { + return fmt.Errorf("git reset --hard %s: %w", remoteRef, err) + } + if _, err := p.run(ctx, nil, "clean", "-fdx"); err != nil { + return fmt.Errorf("git clean: %w", err) + } + return nil +} + +// cherryPickAll walks the changes in order, cherry-picking every URI's head +// SHA, and returns one ChangeOutcome per Change in the same order. 
+func (p *gitPusher) cherryPickAll(ctx context.Context, changes []entity.Change) ([]pusher.ChangeOutcome, error) { + outcomes := make([]pusher.ChangeOutcome, 0, len(changes)) + for _, change := range changes { + commits, err := p.cherryPickChange(ctx, change) + if err != nil { + return nil, err + } + status := pusher.OutcomeStatusCommitted + if len(commits) == 0 { + status = pusher.OutcomeStatusAlreadyExisted + } + outcomes = append(outcomes, pusher.ChangeOutcome{ + Change: change, + Status: status, + CommitSHAs: commits, + }) + } + return outcomes, nil +} + +// cherryPickChange parses each URI in the change, fetches the referenced +// SHA, and cherry-picks it. It returns the list of new commit SHAs +// produced for this change (empty if every pick was a no-op because the +// content was already on the target branch). +func (p *gitPusher) cherryPickChange(ctx context.Context, change entity.Change) ([]string, error) { + var commits []string + for _, uri := range change.URIs { + cid, err := entitygithub.ParseChangeID(uri) + if err != nil { + return nil, fmt.Errorf("invalid change URI %q: %w", uri, err) + } + + sha, picked, err := p.cherryPickSHA(ctx, cid.HeadCommitSHA) + if err != nil { + return nil, fmt.Errorf("cherry-pick %s (uri %q): %w", cid.HeadCommitSHA, uri, err) + } + if picked { + commits = append(commits, sha) + } + } + return commits, nil +} + +// cherryPickSHA cherry-picks a single SHA. It returns: +// - the new commit SHA and picked=true on a successful pick that produced +// a non-empty commit; +// - "" and picked=false when the pick is a no-op (the change is already +// on the target branch); the empty commit is rolled back so it doesn't +// get pushed; +// - an error: pusher.ErrConflict when the pick fails to apply because of a conflict (generally a user-caused failure), +// or any other error when the pick fails for any other reason (potentially recoverable). 
+//
+// `--allow-empty` covers the case where the source commit itself was
+// originally empty. For redundant picks (the change is already on target,
+// so applying it produces no new diff) git refuses with "previous
+// cherry-pick is now empty"; we recover by running `cherry-pick --skip`
+// and reporting the change as already-existed (what git would call
+// "rebased out").
+//
+// Error classification: pusher.ErrConflict is only returned when git
+// actually ran and exited non-zero. When ctx is cancelled or past its
+// deadline, or when the git process could not be run at all, the failure
+// says nothing about the change itself, so a plain (non-conflict) error is
+// returned and the conflict counter is left untouched.
+func (p *gitPusher) cherryPickSHA(ctx context.Context, sha string) (string, bool, error) {
+	out, err := p.runCombined(ctx, nil, "cherry-pick", "--allow-empty", sha)
+	if err != nil {
+		// A cancelled or expired context kills git mid-pick. That is an
+		// infrastructure failure, not a user-caused conflict, so it must
+		// not be wrapped in ErrConflict.
+		if ctxErr := ctx.Err(); ctxErr != nil {
+			return "", false, fmt.Errorf("git cherry-pick %s interrupted: %w", sha, ctxErr)
+		}
+		// Anything other than *exec.ExitError means git never produced an
+		// exit status (binary missing, bad working directory, I/O failure).
+		// runCombined returns the os/exec error unwrapped, so a direct
+		// type assertion is sufficient here.
+		if _, exited := err.(*exec.ExitError); !exited {
+			return "", false, fmt.Errorf("git cherry-pick %s: %w", sha, err)
+		}
+		if isRedundantCherryPick(out) {
+			if _, skipErr := p.run(ctx, nil, "cherry-pick", "--skip"); skipErr != nil {
+				return "", false, fmt.Errorf("git cherry-pick --skip after redundant pick: %w", skipErr)
+			}
+			return "", false, nil
+		}
+		coremetrics.NamedCounter(p.metricsScope, "push", "cherry_pick_conflicts", 1)
+		return "", false, fmt.Errorf("%w: git cherry-pick %s: %s", pusher.ErrConflict, sha, strings.TrimSpace(string(out)))
+	}
+
+	// `--allow-empty` lets a genuinely empty source commit through as an
+	// empty commit on target. Detect and roll that back so it doesn't get
+	// pushed, and report it as already-existed.
+	empty, err := p.isEmptyHEADCommit(ctx)
+	if err != nil {
+		return "", false, err
+	}
+	if empty {
+		if _, err := p.run(ctx, nil, "reset", "--hard", "HEAD^"); err != nil {
+			return "", false, fmt.Errorf("git reset --hard HEAD^ after empty pick: %w", err)
+		}
+		return "", false, nil
+	}
+
+	headOut, err := p.run(ctx, nil, "rev-parse", "HEAD")
+	if err != nil {
+		return "", false, fmt.Errorf("git rev-parse HEAD: %w", err)
+	}
+	return strings.TrimSpace(string(headOut)), true, nil
+}
+
+// isEmptyHEADCommit returns true when HEAD's tree matches HEAD^'s tree —
+// i.e. the most recent commit introduces no changes.
+func (p *gitPusher) isEmptyHEADCommit(ctx context.Context) (bool, error) { + headTree, err := p.commitTreeSHA(ctx, "HEAD") + if err != nil { + return false, err + } + parentTree, err := p.commitTreeSHA(ctx, "HEAD^") + if err != nil { + return false, err + } + return headTree == parentTree, nil +} + +// commitTreeSHA returns the tree SHA recorded in the commit object at ref. +// It reads the raw commit via `git cat-file commit` and parses the leading +// `tree ` line — the same data `rev-parse ^{tree}` would peel to, +// but without depending on revision-syntax magic. +func (p *gitPusher) commitTreeSHA(ctx context.Context, ref string) (string, error) { + out, err := p.run(ctx, nil, "cat-file", "commit", ref) + if err != nil { + return "", fmt.Errorf("git cat-file commit %s: %w", ref, err) + } + firstLine, _, _ := strings.Cut(string(out), "\n") + const prefix = "tree " + if !strings.HasPrefix(firstLine, prefix) { + return "", fmt.Errorf("git cat-file commit %s: unexpected first line %q", ref, firstLine) + } + return strings.TrimSpace(firstLine[len(prefix):]), nil +} + +// push pushes the current HEAD to refs/heads/ on the remote. +func (p *gitPusher) push(ctx context.Context) error { + refspec := "HEAD:refs/heads/" + p.target + if _, err := p.run(ctx, nil, "push", p.remote, refspec); err != nil { + return fmt.Errorf("git push %s %s: %w", p.remote, refspec, err) + } + return nil +} + +// run executes a `git` command in the checkout. Returns captured stdout and +// an error that includes captured stderr for diagnostics. +func (p *gitPusher) run(ctx context.Context, stdin []byte, args ...string) ([]byte, error) { + cmd := exec.CommandContext(ctx, "git", args...) 
+ cmd.Dir = p.checkoutPath + if stdin != nil { + cmd.Stdin = bytes.NewReader(stdin) + } + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("%w: %s", err, strings.TrimSpace(stderr.String())) + } + return stdout.Bytes(), nil +} + +// runCombined is like run but returns combined stdout+stderr both on success +// and failure. Used when the caller needs to inspect git's diagnostic +// output (e.g., to detect "previous cherry-pick is now empty"). +func (p *gitPusher) runCombined(ctx context.Context, stdin []byte, args ...string) ([]byte, error) { + cmd := exec.CommandContext(ctx, "git", args...) + cmd.Dir = p.checkoutPath + if stdin != nil { + cmd.Stdin = bytes.NewReader(stdin) + } + return cmd.CombinedOutput() +} + +// isRedundantCherryPick reports whether git's cherry-pick output indicates +// the pick was rejected because the change is already present on target +// (i.e. applying it would produce no diff). +func isRedundantCherryPick(out []byte) bool { + s := string(out) + return strings.Contains(s, "previous cherry-pick is now empty") || + strings.Contains(s, "nothing to commit") +} diff --git a/extension/pusher/git/git_pusher_test.go b/extension/pusher/git/git_pusher_test.go new file mode 100644 index 00000000..e79221c0 --- /dev/null +++ b/extension/pusher/git/git_pusher_test.go @@ -0,0 +1,507 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package git + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "go.uber.org/zap/zaptest" + + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/pusher" +) + +// gitFixture provides a bare "remote" repository plus a working checkout +// that pushes to it. Tests run real `git` commands so we exercise the same +// code path as production. +// +// The fixture also exposes helpers for pushing additional commits to side +// branches on the remote, so each test can build the SHAs it needs the +// Pusher to cherry-pick. +type gitFixture struct { + root string + remoteDir string + checkoutDir string + authorDir string // a separate working clone used to author "PR" commits +} + +func setupGitFixture(t *testing.T) gitFixture { + t.Helper() + + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not available on PATH") + } + + root := t.TempDir() + remoteDir := filepath.Join(root, "remote.git") + checkoutDir := filepath.Join(root, "checkout") + authorDir := filepath.Join(root, "author") + + mustGit(t, root, "init", "--bare", "-b", "main", remoteDir) + + // Seed main with one initial commit so the Pusher's reset/fetch flow has + // something to land on. 
+ mustGit(t, root, "init", "-b", "main", authorDir) + configRepo(t, authorDir, "author", "author@example.com") + require.NoError(t, writeFile(filepath.Join(authorDir, "hello.txt"), "hello\nworld\n")) + mustGit(t, authorDir, "add", ".") + mustGit(t, authorDir, "commit", "-m", "seed") + mustGit(t, authorDir, "remote", "add", "origin", remoteDir) + mustGit(t, authorDir, "push", "origin", "main") + + mustGit(t, root, "clone", remoteDir, checkoutDir) + configRepo(t, checkoutDir, "checkout", "checkout@example.com") + + return gitFixture{ + root: root, + remoteDir: remoteDir, + checkoutDir: checkoutDir, + authorDir: authorDir, + } +} + +// configRepo applies the test-only config that lets git commit work in a +// sandbox without GPG signing, system identity, or system git hooks. The +// devpod environment installs hooks via the system git config (core.hooksPath +// = /etc/git-hooks) that interfere with test commits — point each test repo +// at an empty hooks dir to disarm them without resorting to --no-verify. +func configRepo(t *testing.T, dir, name, email string) { + mustGit(t, dir, "config", "user.name", name) + mustGit(t, dir, "config", "user.email", email) + mustGit(t, dir, "config", "commit.gpgsign", "false") + mustGit(t, dir, "config", "tag.gpgsign", "false") + + hooksDir := filepath.Join(dir, ".no-hooks") + require.NoError(t, os.MkdirAll(hooksDir, 0o755)) + mustGit(t, dir, "config", "core.hooksPath", hooksDir) +} + +// pushPRCommit creates a single commit on a feature branch on the remote +// branched off the *current* origin/main, returning the resulting SHA. +func (f gitFixture) pushPRCommit(t *testing.T, branch, path, contents, message string) string { + t.Helper() + mustGit(t, f.authorDir, "fetch", "origin") + return f.pushPRCommitFrom(t, "origin/main", branch, path, contents, message) +} + +// pushPRCommitFrom creates a single commit on `branch` based on the given +// base ref, returning the resulting SHA. 
Use this to create branches that +// diverge from a specific ancestor (so they conflict when cherry-picked +// onto a newer main). +func (f gitFixture) pushPRCommitFrom(t *testing.T, base, branch, path, contents, message string) string { + t.Helper() + mustGit(t, f.authorDir, "checkout", "-B", branch, base) + require.NoError(t, writeFile(filepath.Join(f.authorDir, path), contents)) + mustGit(t, f.authorDir, "add", ".") + mustGit(t, f.authorDir, "commit", "-m", message) + mustGit(t, f.authorDir, "push", "-f", "origin", branch) + out := mustGitOutput(t, f.authorDir, "rev-parse", "HEAD") + return strings.TrimSpace(string(out)) +} + +// remoteSHA returns the SHA at refs/heads/ on the bare remote. +func (f gitFixture) remoteSHA(t *testing.T, branch string) string { + t.Helper() + out := mustGitOutput(t, f.remoteDir, "rev-parse", branch) + return strings.TrimSpace(string(out)) +} + +// landOnMain cherry-picks an arbitrary SHA directly onto main on the +// remote, simulating "this content is already on the target branch" for +// rebased-out tests. +func (f gitFixture) landOnMain(t *testing.T, sha string) { + t.Helper() + mustGit(t, f.authorDir, "fetch", "origin") + mustGit(t, f.authorDir, "checkout", "-B", "land", "origin/main") + mustGit(t, f.authorDir, "cherry-pick", sha) + mustGit(t, f.authorDir, "push", "origin", "land:main") +} + +// uri builds a github-format URI ending in `sha` so the Pusher's parser +// resolves it to that SHA. +func uri(sha string) string { + return fmt.Sprintf("github://uber/submitqueue/1/%s", sha) +} + +func (f gitFixture) newPusher(t *testing.T) pusher.Pusher { + return NewPusher(Params{ + CheckoutPath: f.checkoutDir, + Remote: "origin", + Target: "main", + Logger: zaptest.NewLogger(t).Sugar(), + MetricsScope: tally.NoopScope, + }) +} + +// installRaceHook writes a pre-receive hook on the bare remote that +// simulates concurrent pushes. 
On its Nth invocation it reads the Nth line +// of race-shas, points refs/heads/main at that SHA via update-ref, and +// exits 1 (rejecting the current push). Once race-shas is exhausted it +// exits 0 and the push goes through. +// +// Combined with the gitPusher's contention-detection (refetch + compare +// base SHA), this lets a test drive the full retry loop using only real +// git mechanics — the second-attempt's reset picks up the moved tip and +// proceeds from there. +func (f gitFixture) installRaceHook(t *testing.T, raceSHAs []string) { + t.Helper() + hookDir := filepath.Join(f.remoteDir, "hooks") + require.NoError(t, os.MkdirAll(hookDir, 0o755)) + // Override the system-wide core.hooksPath so the hook we just wrote + // actually fires on the bare remote (the devpod sets a global + // /etc/git-hooks directory that would otherwise win). + mustGit(t, f.remoteDir, "config", "core.hooksPath", hookDir) + require.NoError(t, writeFile( + filepath.Join(hookDir, "race-shas"), + strings.Join(raceSHAs, "\n")+"\n", + )) + const script = `#!/bin/sh +counter_file="$GIT_DIR/hooks/race-counter" +race_sha_file="$GIT_DIR/hooks/race-shas" +count=$(cat "$counter_file" 2>/dev/null || echo 0) +count=$((count + 1)) +echo "$count" > "$counter_file" +next_sha=$(sed -n "${count}p" "$race_sha_file") +if [ -z "$next_sha" ]; then + exit 0 +fi +# Pre-receive runs in git's quarantine env; unset its markers so update-ref +# is allowed to mutate the live ref store. +unset GIT_QUARANTINE_PATH GIT_OBJECT_DIRECTORY GIT_ALTERNATE_OBJECT_DIRECTORIES +git update-ref refs/heads/main "$next_sha" +echo "race hook moved main to $next_sha and rejected push" >&2 +exit 1 +` + hookPath := filepath.Join(hookDir, "pre-receive") + require.NoError(t, os.WriteFile(hookPath, []byte(script), 0o755)) +} + +// hookInvocations returns the number of times the pre-receive race hook +// has fired. Used by retry tests to verify the loop ran the expected +// number of attempts. 
+func (f gitFixture) hookInvocations(t *testing.T) int { + t.Helper() + data, err := os.ReadFile(filepath.Join(f.remoteDir, "hooks", "race-counter")) + if errors.Is(err, os.ErrNotExist) { + return 0 + } + require.NoError(t, err) + n, err := strconv.Atoi(strings.TrimSpace(string(data))) + require.NoError(t, err) + return n +} + +// remoteHEAD returns the SHA that origin/main currently points to. +func (f gitFixture) remoteHEAD(t *testing.T) string { + out := mustGitOutput(t, f.remoteDir, "rev-parse", "main") + return strings.TrimSpace(string(out)) +} + +// remoteFile returns the contents of `path` at origin/main on the bare remote. +func (f gitFixture) remoteFile(t *testing.T, path string) string { + out := mustGitOutput(t, f.remoteDir, "show", "main:"+path) + return string(out) +} + +// remoteCommitsSinceSeed returns commit SHAs on origin/main newer than the +// first (seed) commit, in chronological order. +func (f gitFixture) remoteCommitsSinceSeed(t *testing.T) []string { + out := mustGitOutput(t, f.remoteDir, "log", "--reverse", "--format=%H", "main") + all := strings.Split(strings.TrimSpace(string(out)), "\n") + if len(all) <= 1 { + return nil + } + return all[1:] +} + +func TestPusher_Push_SingleChangeSingleURIProducesOneCommit(t *testing.T) { + f := setupGitFixture(t) + sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + p := f.newPusher(t) + + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + + out := res.Outcomes[0] + assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) + require.Len(t, out.CommitSHAs, 1) + assert.Equal(t, []string{out.CommitSHAs[0]}, f.remoteCommitsSinceSeed(t)) + assert.Equal(t, "hello\nearth\n", f.remoteFile(t, "hello.txt")) +} + +func TestPusher_Push_StackedURIsProduceMultipleCommitsForOneChange(t *testing.T) { + f := setupGitFixture(t) + // Build a stack on a single branch so the second SHA's 
parent is the first SHA. + mustGit(t, f.authorDir, "fetch", "origin") + mustGit(t, f.authorDir, "checkout", "-B", "feature/stack", "origin/main") + + require.NoError(t, writeFile(filepath.Join(f.authorDir, "hello.txt"), "hello\nearth\n")) + mustGit(t, f.authorDir, "add", ".") + mustGit(t, f.authorDir, "commit", "-m", "step 1") + sha1 := strings.TrimSpace(string(mustGitOutput(t, f.authorDir, "rev-parse", "HEAD"))) + + require.NoError(t, writeFile(filepath.Join(f.authorDir, "hello.txt"), "hello\nearth\ngoodbye\n")) + mustGit(t, f.authorDir, "add", ".") + mustGit(t, f.authorDir, "commit", "-m", "step 2") + sha2 := strings.TrimSpace(string(mustGitOutput(t, f.authorDir, "rev-parse", "HEAD"))) + + mustGit(t, f.authorDir, "push", "-f", "origin", "feature/stack") + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha1), uri(sha2)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + + out := res.Outcomes[0] + assert.Equal(t, pusher.OutcomeStatusCommitted, out.Status) + require.Len(t, out.CommitSHAs, 2) + assert.Equal(t, out.CommitSHAs, f.remoteCommitsSinceSeed(t)) + assert.Equal(t, "hello\nearth\ngoodbye\n", f.remoteFile(t, "hello.txt")) +} + +func TestPusher_Push_AlreadyLandedChangeIsRebasedOut(t *testing.T) { + f := setupGitFixture(t) + sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + + // Land the same content on main outside the Pusher so the cherry-pick + // finds nothing new to add. 
+ f.landOnMain(t, sha) + mainBeforePush := f.remoteHEAD(t) + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + + out := res.Outcomes[0] + assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, out.Status) + assert.Empty(t, out.CommitSHAs) + assert.Equal(t, mainBeforePush, f.remoteHEAD(t), + "rebased-out push should not advance the remote tip") +} + +func TestPusher_Push_MixedChangesPartiallyRebasedOut(t *testing.T) { + f := setupGitFixture(t) + subsumedSHA := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + f.landOnMain(t, subsumedSHA) + + freshSHA := f.pushPRCommit(t, "feature/b", "extra.txt", "extra\n", "add extra") + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(subsumedSHA)}}, + {URIs: []string{uri(freshSHA)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 2) + + assert.Equal(t, pusher.OutcomeStatusAlreadyExisted, res.Outcomes[0].Status) + assert.Empty(t, res.Outcomes[0].CommitSHAs) + + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[1].Status) + require.Len(t, res.Outcomes[1].CommitSHAs, 1) + + assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) +} + +func TestPusher_Push_ConflictReturnsErrConflictAndDoesNotPush(t *testing.T) { + f := setupGitFixture(t) + seedSHA := f.remoteSHA(t, "main") + + // Both branches start from the same seed and change the same line in + // different ways, then "earth" lands first directly on main. The + // Pusher's attempt to land "mars" must conflict. 
+ earthSHA := f.pushPRCommitFrom(t, seedSHA, "feature/a", "hello.txt", "hello\nearth\n", "earth") + f.landOnMain(t, earthSHA) + mainBefore := f.remoteHEAD(t) + + conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") + + p := f.newPusher(t) + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(conflictingSHA)}}, + }) + require.Error(t, err) + assert.True(t, errors.Is(err, pusher.ErrConflict)) + + assert.Equal(t, mainBefore, f.remoteHEAD(t), + "on conflict the remote tip must not move") +} + +func TestPusher_Push_ResetsBetweenCalls(t *testing.T) { + f := setupGitFixture(t) + sha := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + p := f.newPusher(t) + + // Dirty the checkout so that, without a reset, subsequent operations + // would fail or include unrelated changes. + require.NoError(t, writeFile(filepath.Join(f.checkoutDir, "stray.txt"), "leftover\n")) + + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(sha)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes[0].CommitSHAs, 1) + + out := mustGitOutput(t, f.remoteDir, "ls-tree", "--name-only", res.Outcomes[0].CommitSHAs[0]) + assert.NotContains(t, string(out), "stray.txt", "unrelated file should not have landed") + assert.Contains(t, string(out), "hello.txt") +} + +func TestPusher_Push_RecoversAfterPriorConflict(t *testing.T) { + f := setupGitFixture(t) + seedSHA := f.remoteSHA(t, "main") + + first := f.pushPRCommitFrom(t, seedSHA, "feature/a", "hello.txt", "hello\nearth\n", "earth") + f.landOnMain(t, first) + conflictingSHA := f.pushPRCommitFrom(t, seedSHA, "feature/b", "hello.txt", "hello\nmars\n", "mars") + + p := f.newPusher(t) + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(conflictingSHA)}}, + }) + require.Error(t, err) + + // A subsequent, clean push must succeed even though the prior call left + // a cherry-pick in progress before 
its rollback. + freshSHA := f.pushPRCommit(t, "feature/c", "extra.txt", "extra\n", "add extra") + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(freshSHA)}}, + }) + require.NoError(t, err) + assert.Equal(t, pusher.OutcomeStatusCommitted, res.Outcomes[0].Status) + assert.Equal(t, "extra\n", f.remoteFile(t, "extra.txt")) +} + +func TestPusher_Push_RejectsEmptyChanges(t *testing.T) { + f := setupGitFixture(t) + p := f.newPusher(t) + + _, err := p.Push(context.Background(), nil) + require.Error(t, err) + assert.False(t, errors.Is(err, pusher.ErrConflict)) +} + +func TestPusher_Push_InvalidURIErrors(t *testing.T) { + f := setupGitFixture(t) + p := f.newPusher(t) + + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{"not a uri"}}, + }) + require.Error(t, err) +} + +func TestPusher_Push_RetriesWhenRemoteMovesUnderUs(t *testing.T) { + f := setupGitFixture(t) + // Pre-stage one race commit, install the hook, then build the feature + // commit. Order matters: pushPRCommit also goes through the hook, so + // race + feature must be on the remote before the hook is armed. 
+ raceSHA := f.pushPRCommit(t, "race", "race.txt", "race\n", "race commit") + featureSHA := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + f.installRaceHook(t, []string{raceSHA}) + + p := f.newPusher(t) + res, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(featureSHA)}}, + }) + require.NoError(t, err) + require.Len(t, res.Outcomes, 1) + require.Len(t, res.Outcomes[0].CommitSHAs, 1) + + assert.Equal(t, 2, f.hookInvocations(t), + "first attempt rejected by hook, second attempt allowed through") + + commits := f.remoteCommitsSinceSeed(t) + require.Len(t, commits, 2) + assert.Equal(t, raceSHA, commits[0], + "race commit landed first via the hook") + assert.Equal(t, res.Outcomes[0].CommitSHAs[0], commits[1], + "our cherry-pick landed on top after the retry") + assert.Equal(t, "hello\nearth\n", f.remoteFile(t, "hello.txt")) +} + +func TestPusher_Push_GivesUpAfterMaxAttempts(t *testing.T) { + f := setupGitFixture(t) + raceSHAs := []string{ + f.pushPRCommit(t, "race1", "r1.txt", "1\n", "r1"), + f.pushPRCommit(t, "race2", "r2.txt", "2\n", "r2"), + } + featureSHA := f.pushPRCommit(t, "feature/a", "hello.txt", "hello\nearth\n", "tweak hello") + f.installRaceHook(t, raceSHAs) + + p := NewPusher(Params{ + CheckoutPath: f.checkoutDir, + Remote: "origin", + Target: "main", + Logger: zaptest.NewLogger(t).Sugar(), + MetricsScope: tally.NoopScope, + MaxPushAttempts: 2, + }) + _, err := p.Push(context.Background(), []entity.Change{ + {URIs: []string{uri(featureSHA)}}, + }) + require.Error(t, err) + assert.Equal(t, 2, f.hookInvocations(t), + "both attempts hit the hook before the cap kicked in") + // The remote ended up at race2 (the last hook injection), and our + // feature commit never landed. + assert.Equal(t, raceSHAs[1], f.remoteHEAD(t)) +} + +// --- helpers --- + +func mustGit(t *testing.T, dir string, args ...string) { + t.Helper() + cmd := exec.Command("git", args...) 
+ cmd.Dir = dir + var stderr bytes.Buffer + cmd.Stderr = &stderr + require.NoError(t, cmd.Run(), "git %s: %s", strings.Join(args, " "), stderr.String()) +} + +func mustGitOutput(t *testing.T, dir string, args ...string) []byte { + t.Helper() + cmd := exec.Command("git", args...) + cmd.Dir = dir + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + require.NoError(t, cmd.Run(), "git %s: %s", strings.Join(args, " "), stderr.String()) + return stdout.Bytes() +} + +func writeFile(path, contents string) error { + return os.WriteFile(path, []byte(contents), 0o644) +} diff --git a/extension/pusher/mock/BUILD.bazel b/extension/pusher/mock/BUILD.bazel new file mode 100644 index 00000000..428c8724 --- /dev/null +++ b/extension/pusher/mock/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["pusher_mock.go"], + importpath = "github.com/uber/submitqueue/extension/pusher/mock", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "//extension/pusher", + "@org_uber_go_mock//gomock", + ], +) diff --git a/extension/pusher/mock/pusher_mock.go b/extension/pusher/mock/pusher_mock.go new file mode 100644 index 00000000..6b0e6699 --- /dev/null +++ b/extension/pusher/mock/pusher_mock.go @@ -0,0 +1,58 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pusher.go +// +// Generated by this command: +// +// mockgen -source=pusher.go -destination=mock/pusher_mock.go -package=mock +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + entity "github.com/uber/submitqueue/entity" + pusher "github.com/uber/submitqueue/extension/pusher" + gomock "go.uber.org/mock/gomock" +) + +// MockPusher is a mock of Pusher interface. +type MockPusher struct { + ctrl *gomock.Controller + recorder *MockPusherMockRecorder + isgomock struct{} +} + +// MockPusherMockRecorder is the mock recorder for MockPusher. 
+type MockPusherMockRecorder struct { + mock *MockPusher +} + +// NewMockPusher creates a new mock instance. +func NewMockPusher(ctrl *gomock.Controller) *MockPusher { + mock := &MockPusher{ctrl: ctrl} + mock.recorder = &MockPusherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPusher) EXPECT() *MockPusherMockRecorder { + return m.recorder +} + +// Push mocks base method. +func (m *MockPusher) Push(ctx context.Context, changes []entity.Change) (pusher.Result, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Push", ctx, changes) + ret0, _ := ret[0].(pusher.Result) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Push indicates an expected call of Push. +func (mr *MockPusherMockRecorder) Push(ctx, changes any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockPusher)(nil).Push), ctx, changes) +} diff --git a/extension/pusher/pusher.go b/extension/pusher/pusher.go new file mode 100644 index 00000000..09bc9afc --- /dev/null +++ b/extension/pusher/pusher.go @@ -0,0 +1,88 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package pusher + +//go:generate mockgen -source=pusher.go -destination=mock/pusher_mock.go -package=mock + +import ( + "context" + "errors" + + "github.com/uber/submitqueue/entity" +) + +// ErrConflict is returned by a Pusher when one of the changes fails to apply +// cleanly on top of the current tip of the target branch. Callers should +// treat conflicts as user-caused and non-retryable. +var ErrConflict = errors.New("change conflict") + +// OutcomeStatus describes what happened to a single Change during a Push. +type OutcomeStatus string + +const ( + // OutcomeStatusUnknown is the unreachable zero value, set by default + // when the structure is initialized. It should never be seen in the system. + OutcomeStatusUnknown OutcomeStatus = "" + // OutcomeStatusCommitted means the change produced one or more commits + // on the target branch. CommitSHAs lists those commits in apply order. + OutcomeStatusCommitted OutcomeStatus = "committed" + // OutcomeStatusAlreadyExisted means the change produced no commits + // because every part of it is already present in the target branch + // (e.g. it previously landed via another path, or a prior change in + // the same push subsumed it). CommitSHAs is empty for this status. + // In git terms this is what a `cherry-pick` surfaces as "rebased out". + OutcomeStatusAlreadyExisted OutcomeStatus = "already_existed" +) + +// ChangeOutcome describes what happened to a single Change inside a Push. +type ChangeOutcome struct { + // Change is the input change this outcome corresponds to. + Change entity.Change + // Status describes whether the change produced commits or was already + // present on the target branch. + Status OutcomeStatus + // CommitSHAs lists the commits this change produced on the target + // branch, in apply order. A single Change may produce multiple commits + // (e.g. a stack of PRs). Empty when Status is OutcomeStatusAlreadyExisted. 
+ CommitSHAs []string +} + +// Result is the outcome of a successful Push call. +type Result struct { + // Outcomes is one entry per input change, in the same order as the + // changes passed to Push. The slice length equals the input length. + Outcomes []ChangeOutcome +} + +// Pusher applies a list of Changes on top of a target branch and pushes the +// result to the source-control remote. Each implementation is bound to a +// specific (checkout, remote, target) at construction time. +// +// Atomicity contract: when Push returns a non-nil error, NO change has been +// pushed to the remote — neither partially nor fully. Implementations must +// either roll back any local state or arrange for the push to never happen +// when any change fails to apply. Callers can treat a non-nil error as +// "the remote is exactly as it was before the call". +// +// On success, len(Result.Outcomes) == len(changes) and Outcomes[i] describes +// what happened to changes[i]. A change can produce multiple commits +// (OutcomeStatusCommitted, CommitSHAs populated in apply order) or none at +// all (OutcomeStatusAlreadyExisted, CommitSHAs empty) — the latter happens +// when the change's content is already present on the target branch. +type Pusher interface { + // Push applies changes onto the target branch and pushes the resulting + // commits. See the type-level docs for the atomicity contract. 
+ Push(ctx context.Context, changes []entity.Change) (Result, error) +} diff --git a/orchestrator/controller/merge/BUILD.bazel b/orchestrator/controller/merge/BUILD.bazel index 93333685..657b9ae6 100644 --- a/orchestrator/controller/merge/BUILD.bazel +++ b/orchestrator/controller/merge/BUILD.bazel @@ -7,8 +7,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//core/consumer", + "//core/metrics", "//entity", "//entity/queue", + "//extension/pusher", "//extension/storage", "@com_github_uber_go_tally_v4//:tally", "@org_uber_go_zap//:zap", @@ -24,6 +26,8 @@ go_test( "//core/errs", "//entity", "//entity/queue", + "//extension/pusher", + "//extension/pusher/mock", "//extension/queue/mock", "//extension/storage/mock", "@com_github_stretchr_testify//assert", diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 395e4c4c..d09c12db 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -16,24 +16,35 @@ package merge import ( "context" + "errors" "fmt" "github.com/uber-go/tally/v4" + "go.uber.org/zap" + "github.com/uber/submitqueue/core/consumer" + coremetrics "github.com/uber/submitqueue/core/metrics" "github.com/uber/submitqueue/entity" entityqueue "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/pusher" "github.com/uber/submitqueue/extension/storage" - "go.uber.org/zap" ) -// Controller handles merge queue messages. -// It consumes batches, performs merges, and publishes to both conclude and speculate stages. -// Implements consumer.Controller interface for integration with the consumer. +// Controller handles merge queue messages. It loads every request in a batch, +// hands the resulting list of Changes to the configured Pusher, and +// transitions the batch to a terminal state based on the Pusher's outcome. 
+// After updating state it forwards the batch to conclude (so requests pick +// up the outcome) and to speculate (so downstream batches can re-plan). +// +// Conflicts are user-caused: the batch goes to BatchStateFailed and the +// queue message is acked. Any other Pusher error is treated as transient +// infra: the batch is left in place and the message is nacked. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope store storage.Storage registry consumer.TopicRegistry + pusher pusher.Pusher topicKey consumer.TopicKey consumerGroup string } @@ -47,6 +58,7 @@ func NewController( scope tally.Scope, store storage.Storage, registry consumer.TopicRegistry, + pusherImpl pusher.Pusher, topicKey consumer.TopicKey, consumerGroup string, ) *Controller { @@ -55,30 +67,29 @@ func NewController( metricsScope: scope.SubScope("merge_controller"), store: store, registry: registry, + pusher: pusherImpl, topicKey: topicKey, consumerGroup: consumerGroup, } } -// Process processes a merge delivery from the queue. -// Deserializes the batch, performs the merge, and publishes to both conclude and speculate topics. +// Process performs the merge for a batch and forwards it to conclude/speculate. // Returns nil to ack (success), or error to nack (retry). 
-func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { - c.metricsScope.Counter("received").Inc(1) +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + op := coremetrics.Begin(c.metricsScope, "process") + defer func() { op.Complete(retErr) }() msg := delivery.Message() - // Deserialize batch ID from payload bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { - c.metricsScope.Counter("deserialize_errors").Inc(1) + coremetrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1) return fmt.Errorf("failed to deserialize batch ID: %w", err) } - // Fetch batch from storage batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) if err != nil { - c.metricsScope.Counter("storage_errors").Inc(1) + coremetrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1) return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } @@ -91,35 +102,81 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) - // TODO: Add merge logic - // - Perform source control merge operation - // - Handle merge conflicts + // Idempotency: if the batch is already in a terminal state, a previous + // attempt has already merged (or failed) — just re-fan-out the events + // in case downstream stages missed them. 
+ if batch.State.IsTerminal() { + coremetrics.NamedCounter(c.metricsScope, "process", "skipped_terminal", 1) + return c.fanout(ctx, batch.ID, batch.Queue) + } - // Publish to conclude topic - if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) - return fmt.Errorf("failed to publish to conclude: %w", err) + changes, err := c.collectChanges(ctx, batch) + if err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "request_load_errors", 1) + return fmt.Errorf("failed to collect changes for batch %s: %w", batch.ID, err) } - c.logger.Infow("published batch to conclude", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyConclude, - ) + pushRes, pushErr := c.pusher.Push(ctx, changes) + + var newState entity.BatchState + switch { + case pushErr == nil: + newState = entity.BatchStateSucceeded + c.logger.Infow("merged batch", + "batch_id", batch.ID, + "outcomes", pushRes.Outcomes, + ) + case errors.Is(pushErr, pusher.ErrConflict): + coremetrics.NamedCounter(c.metricsScope, "process", "push_conflicts", 1) + newState = entity.BatchStateFailed + c.logger.Warnw("batch merge failed", + "batch_id", batch.ID, + "state", string(newState), + "error", pushErr, + ) + default: + coremetrics.NamedCounter(c.metricsScope, "process", "push_errors", 1) + return fmt.Errorf("push failed for batch %s: %w", batch.ID, pushErr) + } - // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeySpeculate, batch.ID, batch.Queue); err != nil { - c.metricsScope.Counter("publish_errors").Inc(1) - return fmt.Errorf("failed to publish to speculate: %w", err) + newVersion := batch.Version + 1 + if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, newState); err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "state_update_errors", 1) + return fmt.Errorf("failed to transition batch %s to %s: %w", batch.ID, newState, err) } + batch.Version = 
newVersion + batch.State = newState - c.logger.Infow("published batch to speculate", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeySpeculate, - ) + return c.fanout(ctx, batch.ID, batch.Queue) +} - c.metricsScope.Counter("processed").Inc(1) +// collectChanges loads each request in batch.Contains and returns its +// Change. The result preserves batch.Contains order so the Pusher applies +// the changes in the same order the requests were batched. +func (c *Controller) collectChanges(ctx context.Context, batch entity.Batch) ([]entity.Change, error) { + changes := make([]entity.Change, 0, len(batch.Contains)) + for _, requestID := range batch.Contains { + request, err := c.store.GetRequestStore().Get(ctx, requestID) + if err != nil { + return nil, fmt.Errorf("failed to get request %s: %w", requestID, err) + } + changes = append(changes, request.Change) + } + return changes, nil +} - return nil // Success - message will be acked +// fanout publishes the batch ID to conclude (so requests are updated) and +// to speculate (so dependents can re-evaluate now that this batch is done). +func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { + if err := c.publish(ctx, consumer.TopicKeyConclude, batchID, partitionKey); err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "publish_conclude_errors", 1) + return fmt.Errorf("failed to publish to conclude: %w", err) + } + if err := c.publish(ctx, consumer.TopicKeySpeculate, batchID, partitionKey); err != nil { + coremetrics.NamedCounter(c.metricsScope, "process", "publish_speculate_errors", 1) + return fmt.Errorf("failed to publish to speculate: %w", err) + } + return nil } // publish publishes a batch ID to the specified topic key. 
diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index aa9f0738..351f4b42 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -22,14 +22,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" + "github.com/uber/submitqueue/extension/pusher" + pushermock "github.com/uber/submitqueue/extension/pusher/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" storagemock "github.com/uber/submitqueue/extension/storage/mock" - "go.uber.org/mock/gomock" - "go.uber.org/zap/zaptest" ) // batchIDPayload serializes a BatchID to JSON bytes for test message payloads. @@ -39,121 +42,411 @@ func batchIDPayload(t *testing.T, id string) []byte { return payload } -// testBatch returns a standard test batch for merge tests. -func testBatch() entity.Batch { - return entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, - } +func newDelivery(t *testing.T, ctrl *gomock.Controller, batchID, partitionKey string) *queuemock.MockDelivery { + msg := queue.NewMessage(batchID, batchIDPayload(t, batchID), partitionKey, nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery } -// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. 
-func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { - mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() +// newRegistry returns a registry where conclude and speculate accept any publish. +func newRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) consumer.TopicRegistry { + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, _ queue.Message) error { return publishErr }, + ).AnyTimes() + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - return store + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + pushermock.NewMockPusher(ctrl), + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + require.NotNil(t, c) + assert.Equal(t, consumer.TopicKeyMerge, c.TopicKey()) + assert.Equal(t, "orchestrator-merge", c.ConsumerGroup()) + assert.Equal(t, "merge", c.Name()) + var _ consumer.Controller = c } -// newTestController creates a controller with test dependencies. 
-func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { - logger := zaptest.NewLogger(t).Sugar() - scope := tally.NoopScope +func TestController_Process_SuccessfulMerge(t *testing.T) { + ctrl := gomock.NewController(t) - mockPub := queuemock.NewMockPublisher(ctrl) - mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, topic string, msg queue.Message) error { - return publishErr + const reqID = "test-queue/1" + const batchID = "test-queue/batch/1" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 4, + } + change := entity.Change{URIs: []string{"github://o/r/1/sha"}} + request := entity.Request{ + ID: reqID, + Queue: "test-queue", + Change: change, + State: entity.RequestStateProcessing, + Version: 2, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(4), int32(5), entity.BatchStateSucceeded).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(request, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, changes []entity.Change) (pusher.Result, error) { + require.Len(t, changes, 1) + assert.Equal(t, change, changes[0]) + return pusher.Result{Outcomes: []pusher.ChangeOutcome{{ + Change: change, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{"deadbeef"}, + }}}, nil }, - ).AnyTimes() + ) - mockQ := queuemock.NewMockQueue(ctrl) - 
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.NoError(t, err) +} + +func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { + ctrl := gomock.NewController(t) + + const batchID = "test-queue/batch/multi" + requestIDs := []string{"test-queue/1", "test-queue/2", "test-queue/3"} + changes := []entity.Change{ + {URIs: []string{"github://o/r/1/sha1"}}, + {URIs: []string{"github://o/r/2/sha2"}}, + {URIs: []string{"github://o/r/3/sha3"}}, + } + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: requestIDs, + State: entity.BatchStateFinalizing, + Version: 1, + } - registry, err := consumer.NewTopicRegistry( - []consumer.TopicConfig{ - {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, - {Key: consumer.TopicKeySpeculate, Name: "speculate", Queue: mockQ}, + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(1), int32(2), entity.BatchStateSucceeded).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + for i, rid := range requestIDs { + requestStore.EXPECT().Get(gomock.Any(), rid).Return(entity.Request{ + ID: rid, Change: changes[i], + }, nil) + } + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, got []entity.Change) (pusher.Result, error) { + assert.Equal(t, changes, got, "changes must be in batch.Contains order") + outcomes := 
make([]pusher.ChangeOutcome, len(got)) + for i, ch := range got { + outcomes[i] = pusher.ChangeOutcome{ + Change: ch, + Status: pusher.OutcomeStatusCommitted, + CommitSHAs: []string{fmt.Sprintf("sha-%d", i)}, + } + } + return pusher.Result{Outcomes: outcomes}, nil }, ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) require.NoError(t, err) +} + +func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { + ctrl := gomock.NewController(t) + + const reqID = "test-queue/2" + const batchID = "test-queue/batch/2" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 3, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(3), int32(4), entity.BatchStateFailed).Return(nil) - return NewController(logger, scope, store, registry, consumer.TopicKeyMerge, "orchestrator-merge") + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ + ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/2/sha"}}, + }, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + pusher.Result{}, + fmt.Errorf("apply: %w", pusher.ErrConflict), + ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := 
c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.NoError(t, err, "conflict ack-s the message; failure is recorded on the batch") } -func TestNewController(t *testing.T) { +func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) - - require.NotNil(t, controller) - assert.Equal(t, consumer.TopicKeyMerge, controller.TopicKey()) - assert.Equal(t, "orchestrator-merge", controller.ConsumerGroup()) - assert.Equal(t, "merge", controller.Name()) + + const reqID = "test-queue/3" + const batchID = "test-queue/batch/3" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 1, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ + ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/3/sha"}}, + }, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + pusher.Result{}, + fmt.Errorf("ssh: connection refused"), + ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.Error(t, err) } -func TestController_Process_Success(t *testing.T) { +func TestController_Process_TerminalBatchSkipsPushButFansOut(t *testing.T) { ctrl := 
gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) + const batchID = "test-queue/batch/4" - msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + State: entity.BatchStateSucceeded, + Version: 7, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + // Push must NOT be called for an already-terminal batch. + mockPusher := pushermock.NewMockPusher(ctrl) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + mockPusher, + consumer.TopicKeyMerge, + "orchestrator-merge", + ) - err := controller.Process(context.Background(), delivery) + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) require.NoError(t, err) } -func TestController_Process_StorageFailure(t *testing.T) { +func TestController_Process_BatchStoreGetFailureNotRetryable(t *testing.T) { ctrl := gomock.NewController(t) - mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + const batchID = "test-queue/batch/5" - controller := newTestController(t, ctrl, store, nil) + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(entity.Batch{}, fmt.Errorf("db connection lost")) - msg := queue.NewMessage("test-queue/batch/1", 
batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + pushermock.NewMockPusher(ctrl), + consumer.TopicKeyMerge, + "orchestrator-merge", + ) - err := controller.Process(context.Background(), delivery) + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, "test-queue")) require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } -func TestController_Process_PublishFailure(t *testing.T) { +func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) + const reqID = "test-queue/6" + const batchID = "test-queue/batch/6" - msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 1, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{}, fmt.Errorf("db connection lost")) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() - err := 
controller.Process(context.Background(), delivery) - assert.Error(t, err) + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, nil), + pushermock.NewMockPusher(ctrl), + consumer.TopicKeyMerge, + "orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.Error(t, err) } -func TestController_InterfaceImplementation(t *testing.T) { +func TestController_Process_PublishFailureSurfaces(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, nil) - var _ consumer.Controller = controller + const reqID = "test-queue/7" + const batchID = "test-queue/batch/7" + + batch := entity.Batch{ + ID: batchID, + Queue: "test-queue", + Contains: []string{reqID}, + State: entity.BatchStateFinalizing, + Version: 2, + } + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batchID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batchID, int32(2), int32(3), entity.BatchStateSucceeded).Return(nil) + + requestStore := storagemock.NewMockRequestStore(ctrl) + requestStore.EXPECT().Get(gomock.Any(), reqID).Return(entity.Request{ + ID: reqID, Change: entity.Change{URIs: []string{"github://o/r/7/sha"}}, + }, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + store.EXPECT().GetRequestStore().Return(requestStore).AnyTimes() + + mockPusher := pushermock.NewMockPusher(ctrl) + mockPusher.EXPECT().Push(gomock.Any(), gomock.Any()).Return( + pusher.Result{Outcomes: []pusher.ChangeOutcome{{ + Status: pusher.OutcomeStatusCommitted, CommitSHAs: []string{"abc"}, + }}}, nil, + ) + + c := NewController( + zaptest.NewLogger(t).Sugar(), + tally.NoopScope, + store, + newRegistry(t, ctrl, fmt.Errorf("queue down")), + mockPusher, + consumer.TopicKeyMerge, + 
"orchestrator-merge", + ) + + err := c.Process(context.Background(), newDelivery(t, ctrl, batchID, batch.Queue)) + require.Error(t, err) }