Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion internal/actions/checksync/checksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
actionsdb "github.com/tenseleyFlow/shithub/internal/actions/sqlc"
"github.com/tenseleyFlow/shithub/internal/checks"
checksdb "github.com/tenseleyFlow/shithub/internal/checks/sqlc"
"github.com/tenseleyFlow/shithub/internal/pulls/mergeenqueue"
)

// Deps wires check synchronization to postgres and logging.
Expand Down Expand Up @@ -91,7 +92,16 @@ func Job(ctx context.Context, deps Deps, job actionsdb.WorkflowJob) error {
return nil
}
_, err = checks.Update(ctx, checks.Deps{Pool: deps.Pool, Logger: deps.Logger}, params)
return err
if err != nil {
return err
}
// S64: when the Actions runner reports a terminal job state, fan out
// pr:mergeability ticks to every open PR sharing the head SHA so a
// required-checks gate can flip blocked → clean.
if params.Status == "completed" {
mergeenqueue.ForHeadSHA(ctx, deps.Pool, deps.Logger, run.RepoID, run.HeadSha)
}
return nil
}

func timeFromPg(ts pgtype.Timestamptz) time.Time {
Expand Down
63 changes: 63 additions & 0 deletions internal/pulls/mergeenqueue/mergeenqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

// Package mergeenqueue centralizes pr:mergeability job enqueueing so
// triggers across the codebase (PR create, head sync, check completion,
// review submit) share a single, side-effect-only helper. Lives outside
// the pulls orchestrator package to break the
// pulls → actions/trigger → actions/checksync import cycle.
//
// Best-effort: every helper here logs on failure and returns nothing.
// pr:mergeability is idempotent — a missed enqueue gets picked up by
// the next trigger.
package mergeenqueue

import (
"context"
"log/slog"

"github.com/jackc/pgx/v5/pgxpool"

pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
"github.com/tenseleyFlow/shithub/internal/worker"
)

// ForPR enqueues a pr:mergeability job for prID. trigger is a short
// label used for logs / future metrics ("pr_create", "head_sync",
// "check_complete", "review_submit").
func ForPR(ctx context.Context, pool *pgxpool.Pool, logger *slog.Logger, prID int64, trigger string) {
if pool == nil || prID == 0 {
return
}
if _, err := worker.Enqueue(ctx, pool, worker.KindPRMergeability,
map[string]any{"pr_id": prID}, worker.EnqueueOptions{}); err != nil {
if logger != nil {
logger.WarnContext(ctx, "mergeenqueue: enqueue",
"error", err, "pr_id", prID, "trigger", trigger)
}
return
}
_ = worker.Notify(ctx, pool)
}

// ForHeadSHA fans out ForPR to every open PR whose head_oid matches
// headSHA in the given repo. Use after a check run for that SHA
// transitions to "completed".
func ForHeadSHA(ctx context.Context, pool *pgxpool.Pool, logger *slog.Logger, repoID int64, headSHA string) {
if pool == nil || headSHA == "" {
return
}
prIDs, err := pullsdb.New().ListOpenPRsForHeadSHA(ctx, pool, pullsdb.ListOpenPRsForHeadSHAParams{
HeadRepoID: repoID,
HeadOid: headSHA,
})
if err != nil {
if logger != nil {
logger.WarnContext(ctx, "mergeenqueue: list open PRs",
"error", err, "repo_id", repoID, "head_sha", headSHA)
}
return
}
for _, prID := range prIDs {
ForPR(ctx, pool, logger, prID, "check_complete")
}
}
131 changes: 131 additions & 0 deletions internal/pulls/mergeenqueue/mergeenqueue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

package mergeenqueue_test

import (
"context"
"io"
"log/slog"
"testing"

"github.com/jackc/pgx/v5/pgtype"

issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
"github.com/tenseleyFlow/shithub/internal/pulls/mergeenqueue"
pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
"github.com/tenseleyFlow/shithub/internal/testing/dbtest"
usersdb "github.com/tenseleyFlow/shithub/internal/users/sqlc"
)

const fixtureHash = "$argon2id$v=19$m=16384,t=1,p=1$" +
"AAAAAAAAAAAAAAAA$" +
"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"

// S64: ForHeadSHA fans out one pr:mergeability job per open PR whose
// head_oid matches the given SHA. Used by the check-completion trigger.
// Merged or closed PRs are filtered out.
func TestForHeadSHA_EnqueuesPerOpenPR(t *testing.T) {
ctx := context.Background()
pool := dbtest.NewTestDB(t)
logger := slog.New(slog.NewTextHandler(io.Discard, nil))

uq := usersdb.New()
user, err := uq.CreateUser(ctx, pool, usersdb.CreateUserParams{
Username: "alice", DisplayName: "Alice", PasswordHash: fixtureHash,
})
if err != nil {
t.Fatalf("CreateUser: %v", err)
}
repo, err := reposdb.New().CreateRepo(ctx, pool, reposdb.CreateRepoParams{
OwnerUserID: pgtype.Int8{Int64: user.ID, Valid: true},
Name: "demo",
DefaultBranch: "trunk",
Visibility: reposdb.RepoVisibilityPublic,
})
if err != nil {
t.Fatalf("CreateRepo: %v", err)
}
iq := issuesdb.New()
if err := iq.EnsureRepoIssueCounter(ctx, pool, repo.ID); err != nil {
t.Fatalf("EnsureRepoIssueCounter: %v", err)
}

const sharedSHA = "deadbeefcafef00ddeadbeefcafef00ddeadbeef"
const otherSHA = "0000000000000000000000000000000000000000"

// Two PRs at sharedSHA, one PR at otherSHA — only the first two
// should be enqueued.
var nextNum int64
openPR := func(headRef, headOID string) int64 {
nextNum++
issue, err := iq.CreateIssue(ctx, pool, issuesdb.CreateIssueParams{
RepoID: repo.ID,
Number: nextNum,
Kind: issuesdb.IssueKindPr,
Title: "x",
Body: "",
AuthorUserID: pgtype.Int8{Int64: user.ID, Valid: true},
})
if err != nil {
t.Fatalf("CreateIssue: %v", err)
}
if _, err := pullsdb.New().CreatePullRequest(ctx, pool, pullsdb.CreatePullRequestParams{
IssueID: issue.ID, BaseRef: "trunk", HeadRef: headRef, HeadRepoID: repo.ID,
BaseOid: sharedSHA, HeadOid: headOID, Draft: false,
}); err != nil {
t.Fatalf("CreatePullRequest: %v", err)
}
return issue.ID
}
pr1 := openPR("feature-a", sharedSHA)
pr2 := openPR("feature-b", sharedSHA)
_ = openPR("feature-c", otherSHA)

mergeenqueue.ForHeadSHA(ctx, pool, logger, repo.ID, sharedSHA)

var n1, n2 int
if err := pool.QueryRow(ctx,
`SELECT count(*) FROM jobs WHERE kind = 'pr:mergeability'
AND (payload->>'pr_id')::bigint = $1`, pr1).Scan(&n1); err != nil {
t.Fatalf("count pr1: %v", err)
}
if err := pool.QueryRow(ctx,
`SELECT count(*) FROM jobs WHERE kind = 'pr:mergeability'
AND (payload->>'pr_id')::bigint = $1`, pr2).Scan(&n2); err != nil {
t.Fatalf("count pr2: %v", err)
}
if n1 == 0 || n2 == 0 {
t.Errorf("expected jobs for pr1=%d and pr2=%d; got %d and %d", pr1, pr2, n1, n2)
}

// Verify the other-SHA PR was NOT enqueued.
var total int
if err := pool.QueryRow(ctx,
`SELECT count(*) FROM jobs WHERE kind = 'pr:mergeability'`,
).Scan(&total); err != nil {
t.Fatalf("count total: %v", err)
}
if total > n1+n2 {
t.Errorf("expected only PRs at sharedSHA to enqueue; total=%d (pr1+pr2=%d)", total, n1+n2)
}
}

// Empty SHA is a no-op — guards against accidental fan-out on rows that
// haven't been snapshotted yet.
func TestForHeadSHA_EmptyHeadSHA(t *testing.T) {
pool := dbtest.NewTestDB(t)
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
// No PRs needed: just confirms the helper doesn't panic + writes
// nothing.
mergeenqueue.ForHeadSHA(context.Background(), pool, logger, 1, "")
var n int
if err := pool.QueryRow(context.Background(),
`SELECT count(*) FROM jobs WHERE kind = 'pr:mergeability'`,
).Scan(&n); err != nil {
t.Fatalf("count: %v", err)
}
if n != 0 {
t.Errorf("empty SHA should not enqueue; got %d jobs", n)
}
}
6 changes: 6 additions & 0 deletions internal/pulls/pulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tenseleyFlow/shithub/internal/issues"
issuesdb "github.com/tenseleyFlow/shithub/internal/issues/sqlc"
mdrender "github.com/tenseleyFlow/shithub/internal/markdown"
"github.com/tenseleyFlow/shithub/internal/pulls/mergeenqueue"
"github.com/tenseleyFlow/shithub/internal/pulls/review"
pullsdb "github.com/tenseleyFlow/shithub/internal/pulls/sqlc"
repogit "github.com/tenseleyFlow/shithub/internal/repos/git"
Expand Down Expand Up @@ -163,6 +164,11 @@ func Create(ctx context.Context, deps Deps, p CreateParams) (CreateResult, error
}
}

// S64: kick off the initial mergeability tick so the PR's
// mergeable_state moves off `unknown` without waiting for a human to
// open the HTML review screen (the only previous trigger).
mergeenqueue.ForPR(ctx, deps.Pool, deps.Logger, prRow.IssueID, "pr_create")

return CreateResult{Issue: issueRow, PullRequest: prRow}, nil
}

Expand Down
35 changes: 35 additions & 0 deletions internal/pulls/pulls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,41 @@ func TestCreate_OpensPRWithIssueRow(t *testing.T) {
}
}

// S64: Create enqueues a pr:mergeability job so the new PR's
// mergeable_state moves off `unknown` without waiting for a human to
// open the HTML review screen. The earlier "review-handler-only" wiring
// left CLI-driven PRs stuck on `unknown` forever — A17/A20 audit.
func TestCreate_EnqueuesMergeabilityJob(t *testing.T) {
f := setup(t)
commitOnBranch(t, f.gitDir, "trunk", "init", "README.md", "hi\n")
commitOnBranch(t, f.gitDir, "feature", "add foo", "foo.txt", "foo\n")

res, err := pulls.Create(context.Background(), f.deps, pulls.CreateParams{
RepoID: f.repoID,
AuthorUserID: f.userID,
Title: "Add foo",
BaseRef: "trunk",
HeadRef: "feature",
GitDir: f.gitDir,
})
if err != nil {
t.Fatalf("Create: %v", err)
}

var n int
if err := f.pool.QueryRow(context.Background(),
`SELECT count(*) FROM jobs
WHERE kind = 'pr:mergeability'
AND (payload->>'pr_id')::bigint = $1`,
res.PullRequest.IssueID,
).Scan(&n); err != nil {
t.Fatalf("count jobs: %v", err)
}
if n < 1 {
t.Errorf("expected at least one pr:mergeability job for PR %d, got %d", res.PullRequest.IssueID, n)
}
}

func TestCreate_RequestsCodeOwners(t *testing.T) {
f := setup(t)
ctx := context.Background()
Expand Down
15 changes: 15 additions & 0 deletions internal/pulls/queries/pulls.sql
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,18 @@ WHERE pr.head_repo_id = $1
AND pr.head_ref = $2
AND i.state = 'open'
AND pr.merged_at IS NULL;


-- name: ListOpenPRsForHeadSHA :many
-- Returns the issue_ids of every still-open PR whose head_repo_id +
-- head_oid match a given SHA. Used by the check-completion trigger
-- (S64) to fan-out pr:mergeability jobs once CI for a head SHA
-- finishes — the required-checks gate inside Mergeability needs a
-- recompute to flip blocked → clean.
SELECT pr.issue_id
FROM pull_requests pr
JOIN issues i ON i.id = pr.issue_id
WHERE pr.head_repo_id = $1
AND pr.head_oid = $2
AND i.state = 'open'
AND pr.merged_at IS NULL;
40 changes: 40 additions & 0 deletions internal/pulls/sqlc/pulls.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions internal/pulls/sqlc/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions internal/web/handlers/api/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/tenseleyFlow/shithub/internal/auth/policy"
"github.com/tenseleyFlow/shithub/internal/checks"
checksdb "github.com/tenseleyFlow/shithub/internal/checks/sqlc"
"github.com/tenseleyFlow/shithub/internal/pulls/mergeenqueue"
reposdb "github.com/tenseleyFlow/shithub/internal/repos/sqlc"
"github.com/tenseleyFlow/shithub/internal/web/middleware"
)
Expand Down Expand Up @@ -198,6 +199,12 @@ func (h *Handlers) checkRunUpdate(w http.ResponseWriter, r *http.Request) {
writeChecksError(w, err)
return
}
// S64: if this update flipped the run into "completed", fan out
// pr:mergeability ticks to every open PR sharing this head SHA so
// blocked-on-required-checks states recompute promptly.
if updated.Status == checksdb.CheckStatusCompleted {
mergeenqueue.ForHeadSHA(r.Context(), h.d.Pool, h.d.Logger, updated.RepoID, updated.HeadSha)
}
writeJSON(w, http.StatusOK, presentRun(updated))
}

Expand Down
Loading