diff --git a/extension/landprovider/github/BUILD.bazel b/extension/landprovider/github/BUILD.bazel new file mode 100644 index 00000000..e0923574 --- /dev/null +++ b/extension/landprovider/github/BUILD.bazel @@ -0,0 +1,37 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "github", + srcs = [ + "land.go", + "merge.go", + "validate.go", + ], + importpath = "github.com/uber/submitqueue/extension/landprovider/github", + visibility = ["//visibility:public"], + deps = [ + "//entity", + "//entity/github", + "//extension/landprovider", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "github_test", + srcs = [ + "land_test.go", + "merge_test.go", + "validate_test.go", + ], + embed = [":github"], + deps = [ + "//entity", + "//extension/landprovider", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/extension/landprovider/github/land.go b/extension/landprovider/github/land.go new file mode 100644 index 00000000..339bade4 --- /dev/null +++ b/extension/landprovider/github/land.go @@ -0,0 +1,94 @@ +package github + +import ( + "context" + "fmt" + "net/http" + + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/entity" + entitygithub "github.com/uber/submitqueue/entity/github" + "github.com/uber/submitqueue/extension/landprovider" + "go.uber.org/zap" +) + +// Params holds the dependencies for the GitHub LandProvider. +type Params struct { + // HTTPClient is a pre-configured HTTP client with auth (bearer token, GitHub App JWT, etc.). + // Auth is the caller's responsibility via HTTP transport/round-tripper. + HTTPClient *http.Client + // APIURL is the GitHub REST API base URL + // (e.g., "https://api.github.com" or "https://ghe.example.com/api/v3"). + APIURL string + // Logger is the structured logger. + Logger *zap.SugaredLogger + // MetricsScope is the metrics scope for instrumentation. + MetricsScope tally.Scope +} + +// landProvider implements the landprovider.LandProvider interface using the GitHub REST API. +// +// Limitation: this implementation supports landing exactly one PR per call. +// Landing multiple PRs in a single call is not idempotent — if PR N merges +// successfully but PR N+1 fails, a retry would fail on the already-merged PR. +// Callers needing multi-PR landing should use an implementation that provides +// atomic or idempotent batch semantics. +type landProvider struct { + httpClient *http.Client + apiURL string + logger *zap.SugaredLogger + metricsScope tally.Scope +} + +// Verify landProvider implements landprovider.LandProvider at compile time. +var _ landprovider.LandProvider = (*landProvider)(nil) + +// NewLandProvider creates a new GitHub LandProvider. +func NewLandProvider(params Params) landprovider.LandProvider { + return &landProvider{ + httpClient: params.HTTPClient, + apiURL: params.APIURL, + logger: params.Logger.Named("github_landprovider"), + metricsScope: params.MetricsScope.SubScope("github_landprovider"), + } +} + +// Land merges a single PR into the target branch using the GitHub merge PR REST API. +// Returns an error if entries contain more than one PR, since merging multiple PRs +// is not idempotent — a partial failure leaves already-merged PRs in a state that +// cannot be retried. +// +// For single-PR landing, idempotency is ensured by checking if the PR is already +// merged before attempting the merge. Returns ErrAlreadyLanded if so. +func (l *landProvider) Land(ctx context.Context, queue string, entries []entity.LandEntry) error { + l.metricsScope.Counter("land_started").Inc(1) + + if err := validateSinglePR(entries); err != nil { + l.metricsScope.Counter("validation_errors").Inc(1) + return err + } + + entry := entries[0] + uri := entry.Change.URIs[0] + + cid, err := entitygithub.ParseChangeID(uri) + if err != nil { + l.metricsScope.Counter("parse_errors").Inc(1) + return fmt.Errorf("failed to parse change ID %q: %w", uri, err) + } + + if err := l.mergePR(ctx, cid, entry.Strategy); err != nil { + switch { + case landprovider.IsAlreadyLanded(err): + l.metricsScope.Counter("already_landed").Inc(1) + case landprovider.IsLandRejected(err): + l.metricsScope.Counter("land_rejected").Inc(1) + default: + l.metricsScope.Counter("api_errors").Inc(1) + } + return err + } + + l.metricsScope.Counter("land_succeeded").Inc(1) + return nil +} diff --git a/extension/landprovider/github/land_test.go b/extension/landprovider/github/land_test.go new file mode 100644 index 00000000..af4dec50 --- /dev/null +++ b/extension/landprovider/github/land_test.go @@ -0,0 +1,162 @@ +package github + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/entity" + "github.com/uber/submitqueue/extension/landprovider" + "go.uber.org/zap/zaptest" +) + +func newTestLandProvider(t *testing.T, serverURL string) landprovider.LandProvider { + logger := zaptest.NewLogger(t).Sugar() + scope := tally.NoopScope + + return NewLandProvider(Params{ + HTTPClient: &http.Client{}, + APIURL: serverURL, + Logger: logger, + MetricsScope: scope, + }) +} + +func mergeHandler(t *testing.T, statusCode int, message string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + t.Helper() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(statusCode) + resp := mergeResponse{Message: message} + err := json.NewEncoder(w).Encode(resp) + require.NoError(t, err) + } +} + +// prMergedHandler returns a handler for GET /repos/{owner}/{repo}/pulls/{number}/merge. +// Returns 204 if merged, 404 if not (empty body, matching the GitHub API). +func prMergedHandler(merged bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if merged { + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusNotFound) + } + } +} + +// routingHandler routes PUT (merge) and GET (PR state) requests to separate handlers. +func routingHandler(putHandler, getHandler http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPut: + putHandler(w, r) + case http.MethodGet: + getHandler(w, r) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } + } +} + +func TestLandProvider_Land(t *testing.T) { + singleEntry := func(uri string) []entity.LandEntry { + return []entity.LandEntry{{ + Strategy: entity.RequestLandStrategyRebase, + Change: entity.Change{URIs: []string{uri}}, + }} + } + + tests := []struct { + name string + handler http.HandlerFunc + entries []entity.LandEntry + wantErr bool + rejected bool + alreadyLanded bool + }{ + { + // isPRMerged → not merged → mergePR → 200 OK + name: "success", + handler: routingHandler(mergeHandler(t, http.StatusOK, "Pull Request successfully merged"), prMergedHandler(false)), + entries: singleEntry("github://uber/repo/1/abc123"), + }, + { + // Land → ParseChangeID fails (no server call) + name: "invalid change ID", + handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Fail(t, "should not reach server") + }), + entries: singleEntry("invalid-id"), + wantErr: true, + }, + { + // isPRMerged → not merged → mergePR → 409 → WrapLandRejected + name: "land rejected", + handler: routingHandler(mergeHandler(t, http.StatusConflict, "Head branch was modified"), prMergedHandler(false)), + entries: singleEntry("github://uber/repo/1/abc123"), + wantErr: true, + rejected: true, + }, + { + // isPRMerged → already merged → ErrAlreadyLanded (no merge attempt) + name: "already merged - idempotent retry", + handler: routingHandler(mergeHandler(t, http.StatusOK, "should not be called"), prMergedHandler(true)), + entries: singleEntry("github://uber/repo/1/abc123"), + wantErr: true, + alreadyLanded: true, + }, + { + // isPRMerged → not merged → mergePR → 500 → plain error + name: "server error", + handler: routingHandler( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("internal server error")) + }), + prMergedHandler(false), + ), + entries: singleEntry("github://uber/repo/1/abc123"), + wantErr: true, + }, + { + // isPRMerged → unexpected status code (500) → error propagated + name: "merge status check error", + handler: routingHandler( + mergeHandler(t, http.StatusOK, "should not be called"), + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }), + ), + entries: singleEntry("github://uber/repo/1/abc123"), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + server := httptest.NewServer(tt.handler) + defer server.Close() + + lp := newTestLandProvider(t, server.URL) + err := lp.Land(context.Background(), "test-queue", tt.entries) + if tt.wantErr { + require.Error(t, err) + if tt.rejected { + assert.True(t, landprovider.IsLandRejected(err)) + } + if tt.alreadyLanded { + assert.True(t, landprovider.IsAlreadyLanded(err)) + } + return + } + require.NoError(t, err) + }) + } +} diff --git a/extension/landprovider/github/merge.go b/extension/landprovider/github/merge.go new file mode 100644 index 00000000..1cc982c4 --- /dev/null +++ b/extension/landprovider/github/merge.go @@ -0,0 +1,166 @@ +package github + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/uber/submitqueue/entity" + entitygithub "github.com/uber/submitqueue/entity/github" + "github.com/uber/submitqueue/extension/landprovider" +) + +// mergeMethod is the GitHub merge method for the REST API. +type mergeMethod string + +const ( + // mergeMethodMerge creates a merge commit. + mergeMethodMerge mergeMethod = "merge" + // mergeMethodSquash squashes all commits into one before merging. + mergeMethodSquash mergeMethod = "squash" + // mergeMethodRebase rebases commits onto the base branch. + mergeMethodRebase mergeMethod = "rebase" +) + +// mergeRequest is the request body for the GitHub merge PR REST API. +type mergeRequest struct { + // MergeMethod is the merge strategy to use. + MergeMethod mergeMethod `json:"merge_method"` + // SHA is the head SHA to verify before merging. + SHA string `json:"sha"` +} + +// mergeResponse is the response body from the GitHub merge PR REST API. +type mergeResponse struct { + // Message is a human-readable result message. + Message string `json:"message"` +} + +// mapStrategyToMergeMethod maps a RequestLandStrategy to a GitHub merge method. +func mapStrategyToMergeMethod(strategy entity.RequestLandStrategy) (mergeMethod, error) { + switch strategy { + case entity.RequestLandStrategyRebase: + return mergeMethodRebase, nil + case entity.RequestLandStrategySquashRebase: + return mergeMethodSquash, nil + case entity.RequestLandStrategyMerge: + return mergeMethodMerge, nil + default: + return "", fmt.Errorf("unsupported land strategy: %q", strategy) + } +} + +// mergePR calls the GitHub REST API to merge a single pull request. +// Checks if the PR is already merged before attempting, to ensure idempotency. +func (l *landProvider) mergePR(ctx context.Context, cid entitygithub.ChangeID, strategy entity.RequestLandStrategy) error { + // Check if already merged before attempting the merge. + merged, err := l.isPRMerged(ctx, cid) + if err != nil { + return fmt.Errorf("failed to check PR merge status: %w", err) + } + if merged { + l.logger.Infow("PR already merged", + "pr", cid.PRNumber, + "owner", cid.Org, + "repo", cid.Repo, + ) + return landprovider.ErrAlreadyLanded + } + + // Build the merge request + method, err := mapStrategyToMergeMethod(strategy) + if err != nil { + return err + } + + reqBody, err := json.Marshal(mergeRequest{ + MergeMethod: method, + SHA: cid.HeadCommitSHA, + }) + if err != nil { + return fmt.Errorf("failed to marshal merge request: %w", err) + } + + // PUT /repos/{owner}/{repo}/pulls/{number}/merge + url := fmt.Sprintf("%s/repos/%s/%s/pulls/%d/merge", l.apiURL, cid.Org, cid.Repo, cid.PRNumber) + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(reqBody)) + if err != nil { + return fmt.Errorf("failed to create http request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := l.httpClient.Do(httpReq) + if err != nil { + return fmt.Errorf("merge request failed: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read merge response: %w", err) + } + + // Classify the response: rejection statuses are terminal (no retry), + // 200 is success, anything else is an infra error (retryable). + switch resp.StatusCode { + case http.StatusMethodNotAllowed, http.StatusConflict, http.StatusUnprocessableEntity: + // 405: merge cannot be performed (conflicts, draft, closed). + // 409: head SHA does not match the sha parameter (stale change). + // 422: validation failed (required checks, disallowed merge method). + var mergeResp mergeResponse + if err := json.Unmarshal(body, &mergeResp); err != nil { + return landprovider.WrapLandRejected( + fmt.Errorf("PR #%d merge rejected (status %d): %s", cid.PRNumber, resp.StatusCode, string(body)), + ) + } + return landprovider.WrapLandRejected( + fmt.Errorf("PR #%d: %s", cid.PRNumber, mergeResp.Message), + ) + case http.StatusOK: + // Success — fall through to happy path. + default: + return fmt.Errorf("unexpected status %d merging PR #%d: %s", resp.StatusCode, cid.PRNumber, string(body)) + } + + l.logger.Infow("PR merged successfully", + "pr", cid.PRNumber, + "owner", cid.Org, + "repo", cid.Repo, + "method", method, + ) + + return nil +} + +// isPRMerged checks whether a pull request has already been merged. +// Uses the dedicated GitHub "check if merged" endpoint which returns +// 204 if merged, 404 if not merged (empty response body). +func (l *landProvider) isPRMerged(ctx context.Context, cid entitygithub.ChangeID) (bool, error) { + // GET /repos/{owner}/{repo}/pulls/{number}/merge + // Returns 204 (merged) or 404 (not merged) with an empty body. + url := fmt.Sprintf("%s/repos/%s/%s/pulls/%d/merge", l.apiURL, cid.Org, cid.Repo, cid.PRNumber) + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return false, fmt.Errorf("failed to create http request: %w", err) + } + + resp, err := l.httpClient.Do(httpReq) + if err != nil { + return false, fmt.Errorf("failed to check PR merge status: %w", err) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusNoContent: + return true, nil + case http.StatusNotFound: + return false, nil + default: + return false, fmt.Errorf("unexpected status %d checking merge status of PR #%d", resp.StatusCode, cid.PRNumber) + } +} diff --git a/extension/landprovider/github/merge_test.go b/extension/landprovider/github/merge_test.go new file mode 100644 index 00000000..14e811bc --- /dev/null +++ b/extension/landprovider/github/merge_test.go @@ -0,0 +1,56 @@ +package github + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/entity" +) + +func TestMapStrategyToMergeMethod(t *testing.T) { + tests := []struct { + name string + strategy entity.RequestLandStrategy + want mergeMethod + wantErr bool + }{ + { + name: "rebase strategy", + strategy: entity.RequestLandStrategyRebase, + want: mergeMethodRebase, + }, + { + name: "squash rebase strategy", + strategy: entity.RequestLandStrategySquashRebase, + want: mergeMethodSquash, + }, + { + name: "merge strategy", + strategy: entity.RequestLandStrategyMerge, + want: mergeMethodMerge, + }, + { + name: "unknown strategy", + strategy: entity.RequestLandStrategyUnknown, + wantErr: true, + }, + { + name: "invalid strategy", + strategy: entity.RequestLandStrategy("invalid"), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := mapStrategyToMergeMethod(tt.strategy) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/extension/landprovider/github/validate.go b/extension/landprovider/github/validate.go new file mode 100644 index 00000000..851bef8a --- /dev/null +++ b/extension/landprovider/github/validate.go @@ -0,0 +1,34 @@ +package github + +import ( + "fmt" + + "github.com/uber/submitqueue/entity" +) + +// validateSinglePR ensures entries contain exactly one PR. +// This implementation does not support batch landing because the GitHub merge API +// operates on individual PRs, making multi-PR landing non-idempotent on retry. +func validateSinglePR(entries []entity.LandEntry) error { + if len(entries) == 0 { + return fmt.Errorf("no entries to land") + } + + var totalURIs int + for _, entry := range entries { + totalURIs += len(entry.Change.URIs) + } + + if totalURIs == 0 { + return fmt.Errorf("no change URIs to land") + } + if totalURIs > 1 { + return fmt.Errorf("this implementation supports landing exactly one PR per call, got %d", totalURIs) + } + + if len(entries[0].Change.URIs) == 0 { + return fmt.Errorf("no change URIs in first entry") + } + + return nil +} diff --git a/extension/landprovider/github/validate_test.go b/extension/landprovider/github/validate_test.go new file mode 100644 index 00000000..104a4f17 --- /dev/null +++ b/extension/landprovider/github/validate_test.go @@ -0,0 +1,74 @@ +package github + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/entity" +) + +func TestValidateSinglePR(t *testing.T) { + validEntry := entity.LandEntry{ + Strategy: entity.RequestLandStrategyRebase, + Change: entity.Change{URIs: []string{"github://uber/repo/1/abc123"}}, + } + + tests := []struct { + name string + entries []entity.LandEntry + wantErr bool + }{ + { + name: "single entry single URI", + entries: []entity.LandEntry{validEntry}, + }, + { + // len(entries) == 0 + name: "no entries", + entries: []entity.LandEntry{}, + wantErr: true, + }, + { + // totalURIs == 0 + name: "entry with no URIs", + entries: []entity.LandEntry{{Strategy: entity.RequestLandStrategyRebase, Change: entity.Change{URIs: []string{}}}}, + wantErr: true, + }, + { + // totalURIs > 1 via multiple entries + name: "multiple entries", + entries: []entity.LandEntry{validEntry, validEntry}, + wantErr: true, + }, + { + // totalURIs > 1 via multiple URIs in one entry + name: "multiple URIs in single entry", + entries: []entity.LandEntry{{ + Strategy: entity.RequestLandStrategyRebase, + Change: entity.Change{URIs: []string{"github://uber/repo/1/sha1", "github://uber/repo/2/sha2"}}, + }}, + wantErr: true, + }, + { + // len(entries[0].Change.URIs) == 0 with URI in second entry + name: "first entry empty second has URI", + entries: []entity.LandEntry{ + {Strategy: entity.RequestLandStrategyRebase, Change: entity.Change{URIs: []string{}}}, + validEntry, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateSinglePR(tt.entries) + if tt.wantErr { + require.Error(t, err) + return + } + assert.NoError(t, err) + }) + } +}