diff --git a/backend/internal/cli/review.go b/backend/internal/cli/review.go index fa9046f9..20207978 100644 --- a/backend/internal/cli/review.go +++ b/backend/internal/cli/review.go @@ -1,6 +1,7 @@ package cli import ( + "encoding/json" "errors" "fmt" "io" @@ -15,29 +16,42 @@ import ( // reviewRun mirrors the daemon's domain.ReviewRun for the CLI client. type reviewRun struct { - ID string `json:"id"` - SessionID string `json:"sessionId"` - Harness string `json:"harness"` - PRURL string `json:"prUrl"` - TargetSHA string `json:"targetSha"` - Status string `json:"status"` - Verdict string `json:"verdict"` - Body string `json:"body"` - CreatedAt time.Time `json:"createdAt"` + ID string `json:"id"` + SessionID string `json:"sessionId"` + BatchID string `json:"batchId"` + Harness string `json:"harness"` + PRURL string `json:"prUrl"` + TargetSHA string `json:"targetSha"` + Status string `json:"status"` + Verdict string `json:"verdict"` + Body string `json:"body"` + GithubReviewID string `json:"githubReviewId"` + CreatedAt time.Time `json:"createdAt"` + DeliveredAt *time.Time `json:"deliveredAt,omitempty"` } // reviewRunResponse mirrors controllers.ReviewRunResponse. type reviewRunResponse struct { - Review reviewRun `json:"review"` - ReviewerHandleID string `json:"reviewerHandleId"` + Review reviewRun `json:"review"` + Reviews []reviewRun `json:"reviews"` + ReviewerHandleID string `json:"reviewerHandleId"` } -// submitReviewRequest mirrors controllers.SubmitReviewInput. -type submitReviewRequest struct { +// submitReviewItem mirrors controllers.SubmitReviewItem. +type submitReviewItem struct { RunID string `json:"runId"` Verdict string `json:"verdict"` - Body string `json:"body"` - GithubReviewID string `json:"githubReviewId"` + Body string `json:"body,omitempty"` + GithubReviewID string `json:"githubReviewId,omitempty"` +} + +// submitReviewRequest mirrors controllers.SubmitReviewInput. +type submitReviewRequest struct { + RunID string `json:"runId,omitempty"` + Verdict string `json:"verdict,omitempty"` + Body string `json:"body,omitempty"` + GithubReviewID string `json:"githubReviewId,omitempty"` + Reviews []submitReviewItem `json:"reviews,omitempty"` } type reviewSubmitOptions struct { @@ -46,6 +60,7 @@ type reviewSubmitOptions struct { verdict string body string reviewID string + reviews string } func newReviewCommand(ctx *commandContext) *cobra.Command { @@ -77,6 +92,7 @@ func newReviewSubmitCommand(ctx *commandContext) *cobra.Command { cmd.Flags().StringVar(&opts.verdict, "verdict", "", "Review verdict: approved or changes_requested (required)") cmd.Flags().StringVar(&opts.body, "body", "", "Review body: a path to a Markdown file, or - to read from stdin (so nothing is written into the worktree)") cmd.Flags().StringVar(&opts.reviewID, "review-id", "", "Id of the GitHub PR review just posted (the .id from the gh api POST that created the review)") + cmd.Flags().StringVar(&opts.reviews, "reviews", "", "JSON review results array or object: a path, or - to read from stdin") return cmd } @@ -88,6 +104,9 @@ func (c *commandContext) submitReview(cmd *cobra.Command, args []string, opts re if session == "" { return usageError{errors.New("usage: worker session id is required (positional or --session)")} } + if strings.TrimSpace(opts.reviews) != "" { + return c.submitReviewBatch(cmd, session, opts) + } runID := strings.TrimSpace(opts.runID) if runID == "" { return usageError{errors.New("usage: --run is required")} @@ -121,3 +140,49 @@ func (c *commandContext) submitReview(cmd *cobra.Command, args []string, opts re _, err := fmt.Fprintf(cmd.OutOrStdout(), "recorded %s review for %s\n", res.Review.Verdict, session) return err } + +func (c *commandContext) submitReviewBatch(cmd *cobra.Command, session string, opts reviewSubmitOptions) error { + if strings.TrimSpace(opts.runID) != "" || strings.TrimSpace(opts.verdict) != "" || strings.TrimSpace(opts.body) != "" || strings.TrimSpace(opts.reviewID) != "" { + return usageError{errors.New("usage: --reviews cannot be combined with --run, --verdict, --body, or --review-id")} + } + reviews, err := readReviewItems(cmd, strings.TrimSpace(opts.reviews)) + if err != nil { + return err + } + path := "sessions/" + url.PathEscape(session) + "/reviews/submit" + var res reviewRunResponse + if err := c.postJSON(cmd.Context(), path, submitReviewRequest{Reviews: reviews}, &res); err != nil { + return err + } + count := len(res.Reviews) + if count == 0 { + count = len(reviews) + } + _, err = fmt.Fprintf(cmd.OutOrStdout(), "recorded %d review(s) for %s\n", count, session) + return err +} + +func readReviewItems(cmd *cobra.Command, path string) ([]submitReviewItem, error) { + var raw []byte + var err error + if path == "-" { + raw, err = io.ReadAll(cmd.InOrStdin()) + } else { + raw, err = os.ReadFile(path) + } + if err != nil { + return nil, usageError{fmt.Errorf("read review results: %w", err)} + } + var req submitReviewRequest + if err := json.Unmarshal(raw, &req); err == nil && len(req.Reviews) > 0 { + return req.Reviews, nil + } + var reviews []submitReviewItem + if err := json.Unmarshal(raw, &reviews); err != nil { + return nil, usageError{fmt.Errorf("decode review results JSON: %w", err)} + } + if len(reviews) == 0 { + return nil, usageError{errors.New("usage: --reviews requires at least one review result")} + } + return reviews, nil +} diff --git a/backend/internal/cli/review_test.go b/backend/internal/cli/review_test.go index b3033de1..4c3a1481 100644 --- a/backend/internal/cli/review_test.go +++ b/backend/internal/cli/review_test.go @@ -104,6 +104,32 @@ func TestReviewSubmitAcceptsUnderscoreFlags(t *testing.T) { } } +func TestReviewSubmitBatchReadsReviewsFromStdin(t *testing.T) { + cfg := setConfigEnv(t) + srv, capture := reviewServer(t, http.StatusOK, `{"reviews":[{"id":"run-1","verdict":"changes_requested"},{"id":"run-2","verdict":"approved"}]}`) + writeRunFileFor(t, cfg, srv) + + deps := aliveDeps() + deps.In = strings.NewReader(`{"reviews":[{"runId":"run-1","verdict":"changes_requested","body":"fix auth","githubReviewId":"101"},{"runId":"run-2","verdict":"approved","body":"looks good"}]}`) + out, errOut, err := executeCLI(t, deps, "review", "submit", "mer-1", "--reviews", "-") + if err != nil { + t.Fatalf("unexpected error: %v\nstderr=%s", err, errOut) + } + if !strings.Contains(out, "recorded 2 review(s) for mer-1") { + t.Fatalf("stdout = %q", out) + } + var req submitReviewRequest + if err := json.Unmarshal([]byte(capture.body), &req); err != nil { + t.Fatalf("decode body: %v", err) + } + if len(req.Reviews) != 2 || req.Reviews[0].RunID != "run-1" || req.Reviews[0].GithubReviewID != "101" || req.Reviews[1].Verdict != "approved" { + t.Fatalf("request = %+v", req) + } + if req.RunID != "" || req.Verdict != "" { + t.Fatalf("batch request should not also set legacy fields: %+v", req) + } +} + func TestReviewSubmitUsesSessionFlag(t *testing.T) { cfg := setConfigEnv(t) srv, capture := reviewServer(t, http.StatusOK, `{"review":{"verdict":"approved"}}`) diff --git a/backend/internal/domain/review.go b/backend/internal/domain/review.go index 6750a16d..749cb6c2 100644 --- a/backend/internal/domain/review.go +++ b/backend/internal/domain/review.go @@ -29,11 +29,15 @@ type Review struct { // ReviewRun is one review pass against a worker's PR. type ReviewRun struct { - ID string `json:"id"` - ReviewID string `json:"reviewId"` - SessionID SessionID `json:"sessionId"` - Harness ReviewerHarness `json:"harness"` - PRURL string `json:"prUrl"` + ID string `json:"id"` + ReviewID string `json:"reviewId"` + SessionID SessionID `json:"sessionId"` + // BatchID groups review runs created by one trigger so worker feedback can + // be delivered once after the whole trigger batch is terminal. Empty marks + // legacy/single-run delivery. + BatchID string `json:"batchId"` + Harness ReviewerHarness `json:"harness"` + PRURL string `json:"prUrl"` // TargetSHA is the PR head commit this pass reviewed. TargetSHA string `json:"targetSha"` Status ReviewRunStatus `json:"status"` diff --git a/backend/internal/httpd/apispec/openapi.yaml b/backend/internal/httpd/apispec/openapi.yaml index a66191e5..5dcf1891 100644 --- a/backend/internal/httpd/apispec/openapi.yaml +++ b/backend/internal/httpd/apispec/openapi.yaml @@ -1243,13 +1243,13 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/ReviewRunResponse' + $ref: '#/components/schemas/TriggerReviewResponse' description: OK "201": content: application/json: schema: - $ref: '#/components/schemas/ReviewRunResponse' + $ref: '#/components/schemas/TriggerReviewResponse' description: Created "404": content: @@ -1636,7 +1636,7 @@ components: type: string reviews: items: - $ref: '#/components/schemas/ReviewRun' + $ref: '#/components/schemas/PRReviewState' type: array required: - reviewerHandleId @@ -1772,6 +1772,33 @@ components: - id - projectId type: object + PRReviewState: + properties: + latestRun: + $ref: '#/components/schemas/ReviewRun' + prNumber: + type: integer + prUrl: + type: string + status: + enum: + - needs_review + - running + - up_to_date + - changes_requested + - ineligible + type: string + targetSha: + type: string + title: + type: string + required: + - prUrl + - prNumber + - title + - targetSha + - status + type: object Project: properties: agent: @@ -1933,6 +1960,8 @@ components: type: object ReviewRun: properties: + batchId: + type: string body: type: string createdAt: @@ -1965,6 +1994,7 @@ components: - id - reviewId - sessionId + - batchId - harness - prUrl - targetSha @@ -1980,8 +2010,13 @@ components: $ref: '#/components/schemas/ReviewRun' reviewerHandleId: type: string + reviews: + items: + $ref: '#/components/schemas/ReviewRun' + type: array required: - review + - reviews - reviewerHandleId type: object RoleOverride: @@ -2384,6 +2419,26 @@ components: - projectId type: object SubmitReviewInput: + properties: + body: + description: Review body recorded by AO. Required for changes_requested. + type: string + githubReviewId: + description: Id of the GitHub PR review the reviewer posted, if any. + type: string + reviews: + description: Batched review results recorded by one reviewer CLI command. + items: + $ref: '#/components/schemas/SubmitReviewItem' + type: array + runId: + description: Review run id being completed. + type: string + verdict: + description: 'Review verdict: approved or changes_requested.' + type: string + type: object + SubmitReviewItem: properties: body: description: Review body recorded by AO. Required for changes_requested. @@ -2400,8 +2455,18 @@ components: required: - runId - verdict - - body - - githubReviewId + type: object + TriggerReviewResponse: + properties: + reviewerHandleId: + type: string + reviews: + items: + $ref: '#/components/schemas/PRReviewState' + type: array + required: + - reviewerHandleId + - reviews type: object WorkspaceRepo: properties: diff --git a/backend/internal/httpd/apispec/specgen/build.go b/backend/internal/httpd/apispec/specgen/build.go index a2611d61..c4f60e25 100644 --- a/backend/internal/httpd/apispec/specgen/build.go +++ b/backend/internal/httpd/apispec/specgen/build.go @@ -181,11 +181,14 @@ var schemaNames = map[string]string{ "ControllersResolveCommentsRequest": "ResolveCommentsRequest", "ControllersResolveCommentsResponse": "ResolveCommentsResponse", // httpd/controllers — review wire envelopes - "ControllersListReviewsResponse": "ListReviewsResponse", - "ControllersReviewRunResponse": "ReviewRunResponse", - "ControllersSubmitReviewInput": "SubmitReviewInput", + "ControllersListReviewsResponse": "ListReviewsResponse", + "ControllersReviewRunResponse": "ReviewRunResponse", + "ControllersTriggerReviewResponse": "TriggerReviewResponse", + "ControllersSubmitReviewItem": "SubmitReviewItem", + "ControllersSubmitReviewInput": "SubmitReviewInput", // domain review entities - "DomainReviewRun": "ReviewRun", + "DomainReviewRun": "ReviewRun", + "ReviewPRReviewState": "PRReviewState", // service/project entities + DTOs "ProjectProject": "Project", "ProjectSummary": "ProjectSummary", @@ -345,8 +348,8 @@ func reviewOperations() []operation { summary: "Trigger a code review of a worker's PR", pathParams: []any{controllers.SessionIDParam{}}, resps: []respUnit{ - {http.StatusOK, controllers.ReviewRunResponse{}}, - {http.StatusCreated, controllers.ReviewRunResponse{}}, + {http.StatusOK, controllers.TriggerReviewResponse{}}, + {http.StatusCreated, controllers.TriggerReviewResponse{}}, {http.StatusUnprocessableEntity, envelope.APIError{}}, {http.StatusNotFound, envelope.APIError{}}, {http.StatusNotImplemented, envelope.APIError{}}, diff --git a/backend/internal/httpd/controllers/reviews.go b/backend/internal/httpd/controllers/reviews.go index e629670f..1790e3c1 100644 --- a/backend/internal/httpd/controllers/reviews.go +++ b/backend/internal/httpd/controllers/reviews.go @@ -10,6 +10,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/domain" "github.com/aoagents/agent-orchestrator/backend/internal/httpd/apispec" "github.com/aoagents/agent-orchestrator/backend/internal/httpd/envelope" + reviewcore "github.com/aoagents/agent-orchestrator/backend/internal/review" reviewsvc "github.com/aoagents/agent-orchestrator/backend/internal/service/review" ) @@ -17,23 +18,40 @@ import ( // reviewerHandleId is the live reviewer pane's runtime handle, for the UI to // attach its terminal over /mux (empty when no reviewer has run). type ListReviewsResponse struct { - ReviewerHandleID string `json:"reviewerHandleId"` - Reviews []domain.ReviewRun `json:"reviews"` + ReviewerHandleID string `json:"reviewerHandleId"` + Reviews []reviewcore.PRReviewState `json:"reviews"` } -// ReviewRunResponse is the body of trigger (200/201) and submit (200). It -// carries the run plus the reviewer pane handle so the UI can attach a terminal. +// ReviewRunResponse is the body of submit (200). It carries the run plus the +// reviewer pane handle so the UI can attach a terminal. type ReviewRunResponse struct { - Review domain.ReviewRun `json:"review"` - ReviewerHandleID string `json:"reviewerHandleId"` + Review domain.ReviewRun `json:"review"` + Reviews []domain.ReviewRun `json:"reviews"` + ReviewerHandleID string `json:"reviewerHandleId"` } -// SubmitReviewInput is the body of POST /api/v1/sessions/{sessionId}/reviews/submit. -type SubmitReviewInput struct { +// TriggerReviewResponse is the body of trigger (200/201). reviews carries the +// PR-scoped review state after the trigger. +type TriggerReviewResponse struct { + ReviewerHandleID string `json:"reviewerHandleId"` + Reviews []reviewcore.PRReviewState `json:"reviews"` +} + +// SubmitReviewItem is one review result in a batched submit request. +type SubmitReviewItem struct { RunID string `json:"runId" description:"Review run id being completed."` Verdict string `json:"verdict" description:"Review verdict: approved or changes_requested."` - Body string `json:"body" description:"Review body recorded by AO. Required for changes_requested."` - GithubReviewID string `json:"githubReviewId" description:"Id of the GitHub PR review the reviewer posted, if any."` + Body string `json:"body,omitempty" description:"Review body recorded by AO. Required for changes_requested."` + GithubReviewID string `json:"githubReviewId,omitempty" description:"Id of the GitHub PR review the reviewer posted, if any."` +} + +// SubmitReviewInput is the body of POST /api/v1/sessions/{sessionId}/reviews/submit. +type SubmitReviewInput struct { + RunID string `json:"runId,omitempty" description:"Review run id being completed."` + Verdict string `json:"verdict,omitempty" description:"Review verdict: approved or changes_requested."` + Body string `json:"body,omitempty" description:"Review body recorded by AO. Required for changes_requested."` + GithubReviewID string `json:"githubReviewId,omitempty" description:"Id of the GitHub PR review the reviewer posted, if any."` + Reviews []SubmitReviewItem `json:"reviews,omitempty" description:"Batched review results recorded by one reviewer CLI command."` } // ReviewsController owns the session-scoped /reviews routes. A nil Svc returns 501. @@ -58,11 +76,11 @@ func (c *ReviewsController) list(w http.ResponseWriter, r *http.Request) { writeReviewError(w, r, err) return } - runs := res.Runs - if runs == nil { - runs = []domain.ReviewRun{} + reviews := res.Reviews + if reviews == nil { + reviews = []reviewcore.PRReviewState{} } - envelope.WriteJSON(w, http.StatusOK, ListReviewsResponse{ReviewerHandleID: res.ReviewerHandleID, Reviews: runs}) + envelope.WriteJSON(w, http.StatusOK, ListReviewsResponse{ReviewerHandleID: res.ReviewerHandleID, Reviews: reviews}) } func (c *ReviewsController) trigger(w http.ResponseWriter, r *http.Request) { @@ -81,7 +99,14 @@ func (c *ReviewsController) trigger(w http.ResponseWriter, r *http.Request) { if res.Created { status = http.StatusCreated } - envelope.WriteJSON(w, status, ReviewRunResponse{Review: res.Run, ReviewerHandleID: res.ReviewerHandleID}) + reviews := res.Reviews + if reviews == nil { + reviews = []reviewcore.PRReviewState{} + } + envelope.WriteJSON(w, status, TriggerReviewResponse{ + ReviewerHandleID: res.ReviewerHandleID, + Reviews: reviews, + }) } func (c *ReviewsController) submit(w http.ResponseWriter, r *http.Request) { @@ -94,12 +119,34 @@ func (c *ReviewsController) submit(w http.ResponseWriter, r *http.Request) { envelope.WriteAPIError(w, r, http.StatusBadRequest, "bad_request", "INVALID_BODY", "Invalid request body", nil) return } - run, err := c.Svc.Submit(r.Context(), sessionID(r), in.RunID, domain.ReviewVerdict(in.Verdict), in.Body, in.GithubReviewID) + reviews := make([]reviewsvc.SubmittedReview, 0, len(in.Reviews)) + if len(in.Reviews) > 0 { + for _, item := range in.Reviews { + reviews = append(reviews, reviewsvc.SubmittedReview{ + RunID: item.RunID, + Verdict: domain.ReviewVerdict(item.Verdict), + Body: item.Body, + GithubReviewID: item.GithubReviewID, + }) + } + } else { + reviews = append(reviews, reviewsvc.SubmittedReview{ + RunID: in.RunID, + Verdict: domain.ReviewVerdict(in.Verdict), + Body: in.Body, + GithubReviewID: in.GithubReviewID, + }) + } + runs, err := c.Svc.SubmitMany(r.Context(), sessionID(r), reviews) if err != nil { writeReviewError(w, r, err) return } - envelope.WriteJSON(w, http.StatusOK, ReviewRunResponse{Review: run}) + first := domain.ReviewRun{} + if len(runs) > 0 { + first = runs[0] + } + envelope.WriteJSON(w, http.StatusOK, ReviewRunResponse{Review: first, Reviews: runs}) } func writeReviewError(w http.ResponseWriter, r *http.Request, err error) { diff --git a/backend/internal/httpd/controllers/reviews_test.go b/backend/internal/httpd/controllers/reviews_test.go index 674105eb..123c4542 100644 --- a/backend/internal/httpd/controllers/reviews_test.go +++ b/backend/internal/httpd/controllers/reviews_test.go @@ -20,12 +20,18 @@ import ( type fakeReviewService struct { triggerErr error + trigger reviewcore.TriggerResult + list reviewcore.SessionReviews + submitted []reviewsvc.SubmittedReview } func (f *fakeReviewService) Trigger(context.Context, domain.SessionID) (reviewcore.TriggerResult, error) { if f.triggerErr != nil { return reviewcore.TriggerResult{}, f.triggerErr } + if f.trigger.ReviewerHandleID != "" || f.trigger.Run.ID != "" || f.trigger.Reviews != nil || f.trigger.CreatedRuns != nil { + return f.trigger, nil + } return reviewcore.TriggerResult{Run: domain.ReviewRun{ID: "run-1"}, Created: true}, nil } @@ -33,8 +39,17 @@ func (f *fakeReviewService) Submit(context.Context, domain.SessionID, string, do return domain.ReviewRun{}, nil } +func (f *fakeReviewService) SubmitMany(_ context.Context, _ domain.SessionID, reviews []reviewsvc.SubmittedReview) ([]domain.ReviewRun, error) { + f.submitted = append([]reviewsvc.SubmittedReview(nil), reviews...) + runs := make([]domain.ReviewRun, 0, len(reviews)) + for _, review := range reviews { + runs = append(runs, domain.ReviewRun{ID: review.RunID, Verdict: review.Verdict, Body: review.Body, GithubReviewID: review.GithubReviewID}) + } + return runs, nil +} + func (f *fakeReviewService) List(context.Context, domain.SessionID) (reviewcore.SessionReviews, error) { - return reviewcore.SessionReviews{}, nil + return f.list, nil } func newReviewTestServer(t *testing.T, svc reviewsvc.Manager) *httptest.Server { @@ -59,3 +74,73 @@ func TestReviewsTrigger_MissingReviewerBinaryReturns422WithCause(t *testing.T) { t.Fatalf("message = %q, want reviewer binary cause", got.Message) } } + +func TestReviewsListIncludesReviewStates(t *testing.T) { + srv := newReviewTestServer(t, &fakeReviewService{list: reviewcore.SessionReviews{ + ReviewerHandleID: "review-mer-1", + Runs: []domain.ReviewRun{{ID: "run-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1"}}, + Reviews: []reviewcore.PRReviewState{{PRURL: "https://github.com/o/r/pull/1", PRNumber: 1, TargetSHA: "sha1", Status: reviewcore.ReviewStateUpToDate}}, + }}) + + body, status, headers := doRequest(t, srv, "GET", "/api/v1/sessions/mer-1/reviews", "") + assertJSON(t, headers) + if status != http.StatusOK { + t.Fatalf("status = %d body=%s", status, body) + } + if !strings.Contains(string(body), `"reviews"`) || !strings.Contains(string(body), `"up_to_date"`) || !strings.Contains(string(body), `"reviewerHandleId":"review-mer-1"`) { + t.Fatalf("body missing review states/handle: %s", body) + } + if strings.Contains(string(body), `"items"`) || strings.Contains(string(body), `"reviewItems"`) || strings.Contains(string(body), `"reviewRuns"`) { + t.Fatalf("body contains deprecated review item aliases: %s", body) + } +} + +func TestReviewsTriggerIncludesBatchFields(t *testing.T) { + run1 := domain.ReviewRun{ID: "run-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1"} + run2 := domain.ReviewRun{ID: "run-2", PRURL: "https://github.com/o/r/pull/2", TargetSHA: "sha2"} + srv := newReviewTestServer(t, &fakeReviewService{trigger: reviewcore.TriggerResult{ + Run: run1, + ReviewerHandleID: "review-mer-1", + Created: true, + CreatedRuns: []domain.ReviewRun{run1, run2}, + Reviews: []reviewcore.PRReviewState{ + {PRURL: run1.PRURL, PRNumber: 1, TargetSHA: run1.TargetSHA, Status: reviewcore.ReviewStateRunning, LatestRun: &run1}, + {PRURL: run2.PRURL, PRNumber: 2, TargetSHA: run2.TargetSHA, Status: reviewcore.ReviewStateRunning, LatestRun: &run2}, + }, + }}) + + body, status, headers := doRequest(t, srv, "POST", "/api/v1/sessions/mer-1/reviews/trigger", "") + assertJSON(t, headers) + if status != http.StatusCreated { + t.Fatalf("status = %d body=%s", status, body) + } + for _, want := range []string{`"reviews"`, `"running"`, `"run-1"`, `"run-2"`, `"reviewerHandleId":"review-mer-1"`} { + if !strings.Contains(string(body), want) { + t.Fatalf("body missing %s: %s", want, body) + } + } + for _, unwanted := range []string{`"reviewItems"`, `"items"`, `"createdReviews"`, `"createdRuns"`, `"reviewRuns"`, `"review":`} { + if strings.Contains(string(body), unwanted) { + t.Fatalf("body contains deprecated field %s: %s", unwanted, body) + } + } +} + +func TestReviewsSubmitAcceptsBatchedReviews(t *testing.T) { + svc := &fakeReviewService{} + srv := newReviewTestServer(t, svc) + + body, status, headers := doRequest(t, srv, "POST", "/api/v1/sessions/mer-1/reviews/submit", `{"reviews":[{"runId":"run-1","verdict":"changes_requested","body":"fix auth","githubReviewId":"101"},{"runId":"run-2","verdict":"approved"}]}`) + assertJSON(t, headers) + if status != http.StatusOK { + t.Fatalf("status = %d body=%s", status, body) + } + if len(svc.submitted) != 2 || svc.submitted[0].RunID != "run-1" || svc.submitted[1].Verdict != domain.VerdictApproved { + t.Fatalf("submitted = %+v", svc.submitted) + } + for _, want := range []string{`"reviews"`, `"run-1"`, `"run-2"`} { + if !strings.Contains(string(body), want) { + t.Fatalf("body missing %s: %s", want, body) + } + } +} diff --git a/backend/internal/lifecycle/manager_test.go b/backend/internal/lifecycle/manager_test.go index 4ccbe21d..c8526c46 100644 --- a/backend/internal/lifecycle/manager_test.go +++ b/backend/internal/lifecycle/manager_test.go @@ -543,8 +543,10 @@ func TestApplyReviewResultSendsAndDedupsThroughPRSignature(t *testing.T) { t.Fatalf("outcome/messages = %q/%v, want sent once", outcome, msg.msgs) } got := msg.msgs[0] - if !strings.Contains(got, "[AO reviewer]") || !strings.Contains(got, "fix the bug") || !strings.Contains(got, "98[2J765") { - t.Fatalf("AO review nudge missing label/body/review id: %q", got) + for _, want := range []string{"[AO reviewer]", "PR: " + result.PRURL, "Verdict: changes_requested", "Review body:\nfix the bug", "GitHub review: 98[2J765"} { + if !strings.Contains(got, want) { + t.Fatalf("AO review nudge missing %q: %q", want, got) + } } if strings.Contains(got, "\x1b") { t.Fatalf("AO review nudge should sanitize control bytes: %q", got) @@ -572,6 +574,65 @@ func TestApplyReviewResultSendsAndDedupsThroughPRSignature(t *testing.T) { } } +func TestApplyReviewBatchSendsCombinedAndDedups(t *testing.T) { + st := newFakeStore() + st.sessions["mer-1"] = working("mer-1") + msg := &fakeMessenger{} + m := New(st, msg) + results := []ReviewResult{ + {RunID: "run-2", BatchID: "batch-1", WorkerID: "mer-1", PRURL: "https://github.com/o/r/pull/2", TargetSHA: "sha2", Verdict: domain.VerdictChangesRequested, Body: "fix tests", GithubReviewID: "102"}, + {RunID: "run-1", BatchID: "batch-1", WorkerID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", Verdict: domain.VerdictChangesRequested, Body: "fix auth", GithubReviewID: "101"}, + } + + outcome, err := m.ApplyReviewBatch(ctx, "mer-1", "batch-1", results) + if err != nil { + t.Fatalf("ApplyReviewBatch: %v", err) + } + if outcome != ReviewDeliverySent || len(msg.msgs) != 1 { + t.Fatalf("outcome/messages = %q/%v, want sent once", outcome, msg.msgs) + } + got := msg.msgs[0] + for _, want := range []string{ + "submitted 2 review(s) requesting changes", + "PR: https://github.com/o/r/pull/1", + "GitHub review: 101", + "Review body:\nfix auth", + "PR: https://github.com/o/r/pull/2", + "GitHub review: 102", + "Review body:\nfix tests", + } { + if !strings.Contains(got, want) { + t.Fatalf("batch nudge missing %q: %q", want, got) + } + } + if st.signatures["https://github.com/o/r/pull/1"] == "" { + t.Fatal("batch nudge did not persist signature on anchor PR") + } + + outcome, err = m.ApplyReviewBatch(ctx, "mer-1", "batch-1", results) + if err != nil { + t.Fatalf("repeat ApplyReviewBatch: %v", err) + } + if outcome != ReviewDeliverySent || len(msg.msgs) != 1 { + t.Fatalf("repeat should suppress duplicate send, outcome=%q msgs=%v", outcome, msg.msgs) + } +} + +func TestApplyReviewBatchNoopsWithoutDeliverableResults(t *testing.T) { + st := newFakeStore() + st.sessions["mer-1"] = working("mer-1") + msg := &fakeMessenger{} + m := New(st, msg) + + outcome, err := m.ApplyReviewBatch(ctx, "mer-1", "batch-1", nil) + if err != nil { + t.Fatalf("ApplyReviewBatch: %v", err) + } + if outcome != ReviewDeliveryNoop || len(msg.msgs) != 0 || st.signatureWrites != 0 { + t.Fatalf("empty batch should no-op, outcome=%q msgs=%v signatureWrites=%d", outcome, msg.msgs, st.signatureWrites) + } +} + func TestApplyReviewResultNoopsWhenIrrelevant(t *testing.T) { deliveredAt := time.Unix(100, 0).UTC() tests := []struct { diff --git a/backend/internal/lifecycle/reactions.go b/backend/internal/lifecycle/reactions.go index 52611956..3b115afe 100644 --- a/backend/internal/lifecycle/reactions.go +++ b/backend/internal/lifecycle/reactions.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" + "sort" "strings" "sync" "time" @@ -33,6 +34,7 @@ const ( // review_run row. type ReviewResult struct { RunID string + BatchID string WorkerID domain.SessionID PRURL string TargetSHA string @@ -42,6 +44,56 @@ type ReviewResult struct { DeliveredAt *time.Time } +// ApplyReviewBatch reacts to one reviewer CLI submission after the review +// service has decided which current-head changes-requested results are +// deliverable. +func (m *Manager) ApplyReviewBatch(ctx context.Context, workerID domain.SessionID, batchID string, results []ReviewResult) (ReviewDeliveryOutcome, error) { + if batchID == "" || len(results) == 0 { + return ReviewDeliveryNoop, nil + } + rec, ok, err := m.store.GetSession(ctx, workerID) + if err != nil || !ok { + return ReviewDeliveryNoop, err + } + if rec.IsTerminated || rec.Activity.State == domain.ActivityWaitingInput { + return ReviewDeliveryNoop, nil + } + if m.messenger == nil { + return ReviewDeliveryNoop, nil + } + sort.Slice(results, func(i, j int) bool { + if results[i].PRURL != results[j].PRURL { + return results[i].PRURL < results[j].PRURL + } + return results[i].RunID < results[j].RunID + }) + var msg strings.Builder + fmt.Fprintf(&msg, "[AO reviewer] AO's internal code reviewer submitted %d review(s) requesting changes.\n", len(results)) + var sigParts []string + for i, r := range results { + fmt.Fprintf(&msg, "\nReview %d\nPR: %s\nVerdict: %s", i+1, domain.SanitizeControlChars(r.PRURL), domain.SanitizeControlChars(string(r.Verdict))) + if r.TargetSHA != "" { + fmt.Fprintf(&msg, "\nHead commit: %s", domain.SanitizeControlChars(r.TargetSHA)) + } + if r.GithubReviewID != "" { + safeReviewID := domain.SanitizeControlChars(r.GithubReviewID) + fmt.Fprintf(&msg, "\nGitHub review: %s", safeReviewID) + fmt.Fprintf(&msg, "\nOnce you have addressed it, reply on GitHub review %s with how you addressed it, then resolve the review comment threads you addressed.", safeReviewID) + } + if r.Body != "" { + fmt.Fprintf(&msg, "\n\nReview body:\n%s\n", domain.SanitizeControlChars(r.Body)) + } + sigParts = append(sigParts, strings.Join([]string{r.RunID, r.PRURL, r.TargetSHA, r.GithubReviewID, r.Body}, "\x00")) + } + anchorPR := results[0].PRURL + key := "review-batch:" + anchorPR + ":" + batchID + sig := strings.Join(sigParts, "\x01") + if err := m.sendOnce(ctx, workerID, anchorPR, key, sig, msg.String(), reviewMaxNudge); err != nil { + return ReviewDeliveryNoop, err + } + return ReviewDeliverySent, nil +} + type reactionState struct { mu sync.Mutex seen map[string]string @@ -154,13 +206,14 @@ func (m *Manager) ApplyReviewResult(ctx context.Context, workerID domain.Session if m.messenger == nil { return ReviewDeliveryNoop, nil } - msg := "[AO reviewer] AO's internal code reviewer requested changes on your PR. Review the feedback below and address it." + msg := fmt.Sprintf("[AO reviewer] AO's internal code reviewer submitted a review.\n\nPR: %s\nVerdict: %s", domain.SanitizeControlChars(r.PRURL), domain.SanitizeControlChars(string(r.Verdict))) if r.GithubReviewID != "" { safeReviewID := domain.SanitizeControlChars(r.GithubReviewID) - msg += fmt.Sprintf(" This feedback is GitHub review %s. Once you have addressed it, reply on that review referencing id %s with how you addressed it, then resolve the review comment threads you addressed.", safeReviewID, safeReviewID) + msg += fmt.Sprintf("\nGitHub review: %s", safeReviewID) + msg += fmt.Sprintf("\n\nOnce you have addressed it, reply on GitHub review %s with how you addressed it, then resolve the review comment threads you addressed.", safeReviewID) } if r.Body != "" { - msg += "\n\n" + domain.SanitizeControlChars(r.Body) + msg += "\n\nReview body:\n" + domain.SanitizeControlChars(r.Body) } key := "review:" + r.PRURL + ":ao:" + r.RunID sig := strings.Join([]string{r.TargetSHA, r.RunID, r.GithubReviewID, r.Body}, "\x00") diff --git a/backend/internal/ports/reviewer.go b/backend/internal/ports/reviewer.go index a42df4d7..21bb4ffe 100644 --- a/backend/internal/ports/reviewer.go +++ b/backend/internal/ports/reviewer.go @@ -37,6 +37,11 @@ type ReviewInvocation struct { PRURL string // TargetSHA is the PR head commit under review. TargetSHA string + // ReviewQueue lists all review tasks created by the same trigger so a shared + // reviewer pane can review multiple PRs and submit the results together. + ReviewQueue []ReviewTask + // ReviewIndex is this invocation's zero-based position in ReviewQueue. + ReviewIndex int // WorkspacePath is the worker's checkout the reviewer reads. WorkspacePath string // Prompt and SystemPrompt are the review instructions AO authored centrally, @@ -48,6 +53,13 @@ type ReviewInvocation struct { SystemPrompt string } +// ReviewTask is one PR/run in a multi-PR review trigger queue. +type ReviewTask struct { + RunID string + PRURL string + TargetSHA string +} + // ReviewCommandSpec is how to launch a reviewer: the argv and any extra env the // adapter needs. AO supplies the workspace and review-tracking env around it. type ReviewCommandSpec struct { diff --git a/backend/internal/review/launcher.go b/backend/internal/review/launcher.go index fef9d0b3..b03dbc3a 100644 --- a/backend/internal/review/launcher.go +++ b/backend/internal/review/launcher.go @@ -31,6 +31,8 @@ type LaunchSpec struct { WorkspacePath string PRURL string TargetSHA string + ReviewQueue []ports.ReviewTask + ReviewIndex int } // reviewerRuntime is the runtime surface the launcher needs: create a pane, @@ -73,6 +75,8 @@ func (l *agentLauncher) invocation(spec LaunchSpec) ports.ReviewInvocation { WorkerSessionID: spec.WorkerID, PRURL: spec.PRURL, TargetSHA: spec.TargetSHA, + ReviewQueue: spec.ReviewQueue, + ReviewIndex: spec.ReviewIndex, WorkspacePath: spec.WorkspacePath, Prompt: prompt, SystemPrompt: systemPrompt, diff --git a/backend/internal/review/planner.go b/backend/internal/review/planner.go new file mode 100644 index 00000000..a2cbdd6f --- /dev/null +++ b/backend/internal/review/planner.go @@ -0,0 +1,95 @@ +package review + +import ( + "sort" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +// StateStatus is the per-PR review planning state. +type StateStatus string + +const ( + // ReviewStateNeedsReview means an eligible PR has no current AO approval or running pass. + ReviewStateNeedsReview StateStatus = "needs_review" + // ReviewStateRunning means a review run is already active for the PR's current head. + ReviewStateRunning StateStatus = "running" + // ReviewStateUpToDate means AO approved the PR's current head. + ReviewStateUpToDate StateStatus = "up_to_date" + // ReviewStateChangesRequested means AO requested changes on the PR's current head. + ReviewStateChangesRequested StateStatus = "changes_requested" + // ReviewStateIneligible means the PR is draft, closed, merged, or missing required facts. + ReviewStateIneligible StateStatus = "ineligible" +) + +// PRReviewState is one PR-scoped review decision for a worker session. +type PRReviewState struct { + PRURL string `json:"prUrl"` + PRNumber int `json:"prNumber"` + Title string `json:"title"` + TargetSHA string `json:"targetSha"` + Status StateStatus `json:"status" enum:"needs_review,running,up_to_date,changes_requested,ineligible"` + LatestRun *domain.ReviewRun `json:"latestRun,omitempty"` +} + +// Plan computes per-PR review work from the currently observed PRs and existing +// review runs. It is pure so the trigger path and API list path share exactly +// the same eligibility/status rules. +func Plan(prs []domain.PullRequest, runs []domain.ReviewRun) []PRReviewState { + latest := latestRunsByPRAndSHA(runs) + reviews := make([]PRReviewState, 0, len(prs)) + for _, pr := range prs { + review := PRReviewState{ + PRURL: pr.URL, + PRNumber: pr.Number, + Title: pr.Title, + TargetSHA: pr.HeadSHA, + Status: ReviewStateNeedsReview, + } + if pr.URL == "" || pr.HeadSHA == "" || pr.Draft || pr.Merged || pr.Closed { + review.Status = ReviewStateIneligible + if run, ok := latest[review.PRURL+"\x00"+review.TargetSHA]; ok { + review.LatestRun = &run + } + reviews = append(reviews, review) + continue + } + if run, ok := latest[review.PRURL+"\x00"+review.TargetSHA]; ok { + review.LatestRun = &run + switch { + case run.Status == domain.ReviewRunRunning: + review.Status = ReviewStateRunning + case run.Verdict == domain.VerdictApproved: + review.Status = ReviewStateUpToDate + case run.Verdict == domain.VerdictChangesRequested: + review.Status = ReviewStateChangesRequested + case run.Status == domain.ReviewRunFailed: + review.Status = ReviewStateNeedsReview + default: + review.Status = ReviewStateNeedsReview + } + } + reviews = append(reviews, review) + } + sort.SliceStable(reviews, func(i, j int) bool { + if reviews[i].PRNumber != reviews[j].PRNumber { + return reviews[i].PRNumber < reviews[j].PRNumber + } + return reviews[i].PRURL < reviews[j].PRURL + }) + return reviews +} + +func latestRunsByPRAndSHA(runs []domain.ReviewRun) map[string]domain.ReviewRun { + latest := make(map[string]domain.ReviewRun) + for _, run := range runs { + if run.PRURL == "" || run.TargetSHA == "" { + continue + } + key := run.PRURL + "\x00" + run.TargetSHA + if existing, ok := latest[key]; !ok || run.CreatedAt.After(existing.CreatedAt) { + latest[key] = run + } + } + return latest +} diff --git a/backend/internal/review/planner_test.go b/backend/internal/review/planner_test.go new file mode 100644 index 00000000..9b655f2a --- /dev/null +++ b/backend/internal/review/planner_test.go @@ -0,0 +1,68 @@ +package review + +import ( + "testing" + "time" + + "github.com/aoagents/agent-orchestrator/backend/internal/domain" +) + +func TestPlanStatuses(t *testing.T) { + now := time.Unix(100, 0).UTC() + tests := []struct { + name string + pr domain.PullRequest + runs []domain.ReviewRun + want StateStatus + }{ + {name: "open needs review", pr: planPR("pr1", 1, "sha1"), want: ReviewStateNeedsReview}, + {name: "draft ineligible", pr: withDraft(planPR("pr1", 1, "sha1")), want: ReviewStateIneligible}, + {name: "merged ineligible", pr: withMerged(planPR("pr1", 1, "sha1")), want: ReviewStateIneligible}, + {name: "closed ineligible", pr: withClosed(planPR("pr1", 1, "sha1")), want: ReviewStateIneligible}, + {name: "approved current sha up to date", pr: planPR("pr1", 1, "sha1"), runs: []domain.ReviewRun{ + {ID: "run-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved, CreatedAt: now}, + }, want: ReviewStateUpToDate}, + {name: "changes requested current sha", pr: planPR("pr1", 1, "sha1"), runs: []domain.ReviewRun{ + {ID: "run-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunComplete, Verdict: domain.VerdictChangesRequested, CreatedAt: now}, + }, want: ReviewStateChangesRequested}, + {name: "running current sha", pr: planPR("pr1", 1, "sha1"), runs: []domain.ReviewRun{ + {ID: "run-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning, CreatedAt: now}, + }, want: ReviewStateRunning}, + {name: "different sha needs review", pr: planPR("pr1", 1, "sha2"), runs: []domain.ReviewRun{ + {ID: "run-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved, CreatedAt: now}, + }, want: ReviewStateNeedsReview}, + {name: "failed current sha retryable", pr: planPR("pr1", 1, "sha1"), runs: []domain.ReviewRun{ + {ID: "run-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunFailed, CreatedAt: now}, + }, want: ReviewStateNeedsReview}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := Plan([]domain.PullRequest{tt.pr}, tt.runs) + if len(got) != 1 { + t.Fatalf("review states = %d, want 1", len(got)) + } + if got[0].Status != tt.want { + t.Fatalf("status = %s, want %s; item=%+v", got[0].Status, tt.want, got[0]) + } + }) + } +} + +func planPR(url string, n int, sha string) domain.PullRequest { + return domain.PullRequest{URL: url, Number: n, HeadSHA: sha} +} + +func withDraft(pr domain.PullRequest) domain.PullRequest { + pr.Draft = true + return pr +} + +func withMerged(pr domain.PullRequest) domain.PullRequest { + pr.Merged = true + return pr +} + +func withClosed(pr domain.PullRequest) domain.PullRequest { + pr.Closed = true + return pr +} diff --git a/backend/internal/review/prompt.go b/backend/internal/review/prompt.go index 09e51307..11f7fab4 100644 --- a/backend/internal/review/prompt.go +++ b/backend/internal/review/prompt.go @@ -1,6 +1,9 @@ package review -import "fmt" +import ( + "fmt" + "strings" +) // reviewTexts returns the user-facing prompt and the system prompt to deliver to // a reviewer, authored in one place — the reviewer analogue of @@ -14,14 +17,18 @@ import "fmt" func reviewTexts(spec LaunchSpec) (prompt, systemPrompt string) { systemPrompt = `## Code reviewer role -You are an AO code reviewer. You review a single pull request's changes in the current checkout — do not start unrelated work. Inspect what the PR changed by diffing the checkout against the PR's base branch, and review for correctness bugs, missing error handling, security issues, test coverage, and clear deviations from the surrounding code's conventions. Prefer a few high-confidence findings over nitpicks. +You are an AO code reviewer. You review the requested pull request changes in the current checkout — do not start unrelated work. Inspect what each PR changed by diffing the checkout against the PR's base branch, and review for correctness bugs, missing error handling, security issues, test coverage, and clear deviations from the surrounding code's conventions. Prefer a few high-confidence findings over nitpicks. Post your review as a comment on the pull request, stating clearly whether it needs changes or is ready, with inline comments for specific findings. Do not push commits, edit files, or modify the branch — review only.` - prompt = fmt.Sprintf(`Review pull request %s (head commit %s). + queueText := reviewQueueText(spec) + prompt = fmt.Sprintf(`Review the requested pull request(s) for worker session %s. +%s + +Complete every review task in the queue autonomously. Do not ask the user whether to continue to the next PR, and do not stop after the first PR unless the provider or checkout is genuinely unusable for every queued task. Do these steps in order: -1. Post your review on the pull request and capture its id in one call. Post with `+"`gh api`"+` rather than `+"`gh pr review`"+`: it is the only way to attach inline comments, and its response carries the created review's id, so AO can tell the worker exactly which review to address. Send the review as a JSON body so the inline comments form a proper array of objects: +1. For each PR below, post a separate review on that pull request and capture its id in one call. Post with `+"`gh api`"+` rather than `+"`gh pr review`"+`: it is the only way to attach inline comments, and its response carries the created review's id, so AO can tell the worker exactly which review to address. Send the review as a JSON body so the inline comments form a proper array of objects: gh api --method POST repos/{owner}/{repo}/pulls/{number}/reviews --input - --jq '.id' <<'JSON' { "event": "COMMENT", "body": "", @@ -31,13 +38,29 @@ Do these steps in order: - Substitute the PR's owner/repo/number. Add one object to "comments" per inline finding; omit the field for a review with no inline comments. - Always use "event": "COMMENT": reviews are posted from the PR author's own account, and GitHub rejects both APPROVE and REQUEST_CHANGES on your own PR. State in the body whether you are requesting changes or approving; the machine-readable verdict goes to AO in step 2. - The printed number is the review id. If the call fails on the provider, leave the id empty. -2. Record the result with AO, passing your full review on stdin with --body - so nothing is ever written into the worktree (a file there could be committed onto the worker's branch): +2. Record all results with AO using one command. Pass JSON on stdin so nothing is ever written into the worktree (a file there could be committed onto the worker's branch). Include one object per PR/run from the queue: - ao review submit --session %s --run %s --verdict --review-id --body - <<'MD' - - MD + ao review submit --session %s --reviews - <<'JSON' + { + "reviews": [ + { "runId": "", "verdict": "", "githubReviewId": "", "body": "" } + ] + } + JSON -Only if step 1 genuinely fails on the provider, still run step 2 (without --review-id) so the result is recorded.`, - spec.PRURL, spec.TargetSHA, spec.WorkerID, spec.RunID) +Only if step 1 genuinely fails on the provider for a PR, still include that run in step 2 with an empty githubReviewId so the result is recorded.`, + spec.WorkerID, queueText, spec.WorkerID) return prompt, systemPrompt } + +func reviewQueueText(spec LaunchSpec) string { + if len(spec.ReviewQueue) <= 1 { + return fmt.Sprintf("\nReview task queue:\n* 1. %s (head commit %s, run %s)\n", spec.PRURL, spec.TargetSHA, spec.RunID) + } + var b strings.Builder + fmt.Fprintf(&b, "\nAO created %d review tasks for this worker session. Review every queued PR, then submit all results together.\n\nReview task queue:\n", len(spec.ReviewQueue)) + for i, task := range spec.ReviewQueue { + fmt.Fprintf(&b, "* %d. %s (head commit %s, run %s)\n", i+1, task.PRURL, task.TargetSHA, task.RunID) + } + return b.String() +} diff --git a/backend/internal/review/prompt_test.go b/backend/internal/review/prompt_test.go new file mode 100644 index 00000000..49260179 --- /dev/null +++ b/backend/internal/review/prompt_test.go @@ -0,0 +1,36 @@ +package review + +import ( + "strings" + "testing" + + "github.com/aoagents/agent-orchestrator/backend/internal/ports" +) + +func TestReviewTextsIncludesMultiPRQueue(t *testing.T) { + spec := launchSpec() + spec.RunID = "run-2" + spec.PRURL = "https://github.com/o/r/pull/2" + spec.TargetSHA = "sha2" + spec.ReviewIndex = 1 + spec.ReviewQueue = []ports.ReviewTask{ + {RunID: "run-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1"}, + {RunID: "run-2", PRURL: "https://github.com/o/r/pull/2", TargetSHA: "sha2"}, + } + + prompt, _ := reviewTexts(spec) + for _, want := range []string{ + "AO created 2 review tasks", + "Review every queued PR, then submit all results together", + "Complete every review task in the queue autonomously", + "Do not ask the user whether to continue to the next PR", + "* 1. https://github.com/o/r/pull/1 (head commit sha1, run run-1)", + "* 2. https://github.com/o/r/pull/2 (head commit sha2, run run-2)", + "ao review submit --session mer-1 --reviews -", + `"reviews": [`, + } { + if !strings.Contains(prompt, want) { + t.Fatalf("prompt missing %q:\n%s", want, prompt) + } + } +} diff --git a/backend/internal/review/review.go b/backend/internal/review/review.go index 1b981640..46d47b67 100644 --- a/backend/internal/review/review.go +++ b/backend/internal/review/review.go @@ -18,6 +18,7 @@ import ( "github.com/google/uuid" "github.com/aoagents/agent-orchestrator/backend/internal/domain" + "github.com/aoagents/agent-orchestrator/backend/internal/ports" ) // ErrInvalid and ErrNotFound let the transport layer map failures to 422/404. @@ -34,9 +35,9 @@ type Store interface { InsertReviewRun(ctx stdctx.Context, r domain.ReviewRun) error UpdateReviewRunResult(ctx stdctx.Context, id string, status domain.ReviewRunStatus, verdict domain.ReviewVerdict, body, githubReviewID string) (bool, error) SupersedeReviewRun(ctx stdctx.Context, id, body string) (bool, error) - SupersedeStaleRunningReviewRuns(ctx stdctx.Context, sessionID domain.SessionID, targetSHA, body string) (int64, error) + SupersedeStaleRunningReviewRuns(ctx stdctx.Context, sessionID domain.SessionID, prURL, targetSHA, body string) (int64, error) GetReviewRun(ctx stdctx.Context, id string) (domain.ReviewRun, bool, error) - GetReviewRunBySessionAndSHA(ctx stdctx.Context, id domain.SessionID, targetSHA string) (domain.ReviewRun, bool, error) + GetReviewRunBySessionPRAndSHA(ctx stdctx.Context, id domain.SessionID, prURL, targetSHA string) (domain.ReviewRun, bool, error) ListReviewRunsBySession(ctx stdctx.Context, id domain.SessionID) ([]domain.ReviewRun, error) } @@ -134,6 +135,8 @@ type TriggerResult struct { Run domain.ReviewRun ReviewerHandleID string Created bool + Reviews []PRReviewState + CreatedRuns []domain.ReviewRun } // SessionReviews is a worker's review state: the live reviewer handle plus its @@ -141,14 +144,12 @@ type TriggerResult struct { type SessionReviews struct { ReviewerHandleID string Runs []domain.ReviewRun + Reviews []PRReviewState } -// Trigger starts (or reuses) a review of a worker's PR at its current head: -// - if a non-failed run already exists for this commit, it is returned unchanged; -// - otherwise, if a live reviewer pane exists, it is messaged to review the -// new commit; if not, a fresh reviewer is spawned; -// - the run is recorded before launch so startup failures leave a visible -// failed pass instead of an empty gap. +// Trigger starts reviews for every PR on the worker session that needs review. +// It reuses running/up-to-date runs, retries failed/current changes-requested +// heads, and uses one reviewer pane for every new run in the batch. func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (TriggerResult, error) { if workerID == "" { return TriggerResult{}, fmt.Errorf("%w: worker session id is required", ErrInvalid) @@ -175,39 +176,21 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger return TriggerResult{}, fmt.Errorf("%w: worker session %q has no workspace to review", ErrInvalid, workerID) } - pr, err := e.workerPR(ctx, workerID) + prs, err := e.prs.ListPRsBySession(ctx, workerID) if err != nil { return TriggerResult{}, err } - targetSHA := pr.HeadSHA - - review, hasReview, err := e.store.GetReviewBySession(ctx, workerID) + if len(prs) == 0 { + return TriggerResult{}, fmt.Errorf("%w: worker %q has no PR to review", ErrInvalid, workerID) + } + runs, err := e.store.ListReviewRunsBySession(ctx, workerID) if err != nil { return TriggerResult{}, err } + reviews := Plan(prs, runs) - // Idempotency: a pass for this commit is reusable while it is still running - // or once it carries a verdict. The fallback branch below is defensive for - // any non-running, non-failed row that somehow lacks a verdict; normal - // Submit paths complete a run only with a valid verdict (#342). - if existing, ok, err := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); err != nil { - return TriggerResult{}, err - } else if ok && (existing.Status == domain.ReviewRunRunning || existing.Verdict != domain.VerdictNone) { - return TriggerResult{Run: existing, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil - } else if ok && existing.Status != domain.ReviewRunFailed { - superseded, err := e.store.SupersedeReviewRun(ctx, existing.ID, "superseded by a new review trigger") - if err != nil { - return TriggerResult{}, err - } - if !superseded { - if latest, ok, err := e.store.GetReviewRun(ctx, existing.ID); err != nil { - return TriggerResult{}, err - } else if ok { - return TriggerResult{Run: latest, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil - } - } - } - if _, err := e.store.SupersedeStaleRunningReviewRuns(ctx, workerID, targetSHA, "superseded by a review trigger for a newer commit"); err != nil { + reviewRow, hasReview, err := e.store.GetReviewBySession(ctx, workerID) + if err != nil { return TriggerResult{}, err } @@ -217,78 +200,156 @@ func (e *Engine) Trigger(ctx stdctx.Context, workerID domain.SessionID) (Trigger } now := e.clock() - runID := e.newID() - spec := LaunchSpec{ - RunID: runID, - WorkerID: workerID, - Harness: harness, - WorkspacePath: worker.Metadata.WorkspacePath, - PRURL: pr.URL, - TargetSHA: targetSHA, - } - - review, err = e.upsertReview(ctx, worker, harness, pr.URL, review.ReviewerHandleID, now) + reviewRow, err = e.upsertReview(ctx, worker, harness, reviewRow.ReviewerHandleID, now) if err != nil { return TriggerResult{}, err } - run := domain.ReviewRun{ - ID: runID, - ReviewID: review.ID, - SessionID: workerID, - Harness: harness, - PRURL: pr.URL, - TargetSHA: targetSHA, - Status: domain.ReviewRunRunning, - Verdict: domain.VerdictNone, - CreatedAt: now, - } - if err := e.store.InsertReviewRun(ctx, run); err != nil { - if errors.Is(err, domain.ErrDuplicateReviewRun) { - if existing, ok, getErr := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); getErr != nil { - return TriggerResult{}, getErr - } else if ok { - return TriggerResult{Run: existing, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil + + var created []domain.ReviewRun + batchID := "" + for _, reviewState := range reviews { + if reviewState.Status != ReviewStateNeedsReview && reviewState.Status != ReviewStateChangesRequested { + continue + } + if reviewState.LatestRun != nil && reviewState.LatestRun.Status != domain.ReviewRunFailed && reviewState.LatestRun.Status != domain.ReviewRunRunning && reviewState.LatestRun.Verdict == domain.VerdictNone { + superseded, err := e.store.SupersedeReviewRun(ctx, reviewState.LatestRun.ID, "superseded by a new review trigger") + if err != nil { + return TriggerResult{}, err + } + if !superseded { + if latest, ok, err := e.store.GetReviewRun(ctx, reviewState.LatestRun.ID); err != nil { + return TriggerResult{}, err + } else if ok { + reviews = replaceReviewLatestRun(reviews, reviewState.PRURL, reviewState.TargetSHA, latest) + continue + } } } - return TriggerResult{}, err + if _, err := e.store.SupersedeStaleRunningReviewRuns(ctx, workerID, reviewState.PRURL, reviewState.TargetSHA, "superseded by a review trigger for a newer commit"); err != nil { + return TriggerResult{}, err + } + if batchID == "" { + batchID = e.newID() + } + run := domain.ReviewRun{ + ID: e.newID(), + ReviewID: reviewRow.ID, + SessionID: workerID, + BatchID: batchID, + Harness: harness, + PRURL: reviewState.PRURL, + TargetSHA: reviewState.TargetSHA, + Status: domain.ReviewRunRunning, + Verdict: domain.VerdictNone, + CreatedAt: now, + } + if err := e.store.InsertReviewRun(ctx, run); err != nil { + if errors.Is(err, domain.ErrDuplicateReviewRun) { + if existing, ok, getErr := e.store.GetReviewRunBySessionPRAndSHA(ctx, workerID, reviewState.PRURL, reviewState.TargetSHA); getErr != nil { + return TriggerResult{}, getErr + } else if ok { + reviews = replaceReviewLatestRun(reviews, reviewState.PRURL, reviewState.TargetSHA, existing) + continue + } + } + return TriggerResult{}, err + } + created = append(created, run) + reviews = replaceReviewLatestRun(reviews, reviewState.PRURL, reviewState.TargetSHA, run) + } + if len(created) == 0 { + return TriggerResult{Run: firstReusableRun(reviews), ReviewerHandleID: reviewRow.ReviewerHandleID, Created: false, Reviews: reviews}, nil } - failRun := func(err error) error { - if _, updateErr := e.store.UpdateReviewRunResult(ctx, run.ID, domain.ReviewRunFailed, domain.VerdictNone, err.Error(), ""); updateErr != nil { - return updateErr + failRuns := func(start int, err error) error { + for _, run := range created[start:] { + if _, updateErr := e.store.UpdateReviewRunResult(ctx, run.ID, domain.ReviewRunFailed, domain.VerdictNone, err.Error(), ""); updateErr != nil { + return updateErr + } } return err } - // Reuse a live reviewer pane if there is one; otherwise spawn a fresh one. handleID := "" - if hasReview && review.ReviewerHandleID != "" { - alive, err := e.launcher.Alive(ctx, review.ReviewerHandleID) + queue := reviewQueue(created) + if hasReview && reviewRow.ReviewerHandleID != "" { + alive, err := e.launcher.Alive(ctx, reviewRow.ReviewerHandleID) if err != nil { - return TriggerResult{}, failRun(err) + return TriggerResult{}, failRuns(0, err) } if alive { - if err := e.launcher.Notify(ctx, review.ReviewerHandleID, spec); err != nil { - return TriggerResult{}, failRun(fmt.Errorf("notify reviewer: %w", err)) - } - handleID = review.ReviewerHandleID + handleID = reviewRow.ReviewerHandleID } } if handleID == "" { - h, err := e.launcher.Spawn(ctx, spec) + h, err := e.launcher.Spawn(ctx, reviewLaunchSpec(worker, harness, created[0], queue, 0)) if err != nil { - return TriggerResult{}, failRun(fmt.Errorf("launch reviewer: %w", err)) + return TriggerResult{}, failRuns(0, fmt.Errorf("launch reviewer: %w", err)) } handleID = h + } else { + if err := e.launcher.Notify(ctx, handleID, reviewLaunchSpec(worker, harness, created[0], queue, 0)); err != nil { + return TriggerResult{}, failRuns(0, fmt.Errorf("notify reviewer: %w", err)) + } } - - // The reviewer is running; now record the pass. - review, err = e.upsertReview(ctx, worker, harness, pr.URL, handleID, now) + reviewRow, err = e.upsertReview(ctx, worker, harness, handleID, now) if err != nil { return TriggerResult{}, err } - run.ReviewID = review.ID - return TriggerResult{Run: run, ReviewerHandleID: handleID, Created: true}, nil + for i := range created { + created[i].ReviewID = reviewRow.ID + } + return TriggerResult{Run: created[0], ReviewerHandleID: handleID, Created: true, Reviews: reviews, CreatedRuns: created}, nil +} + +func reviewLaunchSpec(worker domain.SessionRecord, harness domain.ReviewerHarness, run domain.ReviewRun, queue []ports.ReviewTask, index int) LaunchSpec { + return LaunchSpec{ + RunID: run.ID, + WorkerID: worker.ID, + Harness: harness, + WorkspacePath: worker.Metadata.WorkspacePath, + PRURL: run.PRURL, + TargetSHA: run.TargetSHA, + ReviewQueue: queue, + ReviewIndex: index, + } +} + +func reviewQueue(runs []domain.ReviewRun) []ports.ReviewTask { + queue := make([]ports.ReviewTask, 0, len(runs)) + for _, run := range runs { + queue = append(queue, ports.ReviewTask{ + RunID: run.ID, + PRURL: run.PRURL, + TargetSHA: run.TargetSHA, + }) + } + return queue +} + +func replaceReviewLatestRun(reviews []PRReviewState, prURL, targetSHA string, run domain.ReviewRun) []PRReviewState { + for i := range reviews { + if reviews[i].PRURL == prURL && reviews[i].TargetSHA == targetSHA { + reviews[i].LatestRun = &run + if run.Status == domain.ReviewRunRunning { + reviews[i].Status = ReviewStateRunning + } + break + } + } + return reviews +} + +func firstReusableRun(reviews []PRReviewState) domain.ReviewRun { + // Legacy compatibility only: in the multi-PR model the authoritative state + // is Reviews. When no run is created, this field is just a best-effort + // non-empty run for older clients. + for _, review := range reviews { + if review.LatestRun != nil { + return *review.LatestRun + } + } + return domain.ReviewRun{} } // List returns a worker's review state: the live reviewer handle and its passes. @@ -306,18 +367,11 @@ func (e *Engine) List(ctx stdctx.Context, workerID domain.SessionID) (SessionRev } else if ok { handle = review.ReviewerHandleID } - return SessionReviews{ReviewerHandleID: handle, Runs: runs}, nil -} - -func (e *Engine) workerPR(ctx stdctx.Context, workerID domain.SessionID) (domain.PullRequest, error) { prs, err := e.prs.ListPRsBySession(ctx, workerID) if err != nil { - return domain.PullRequest{}, err - } - if len(prs) == 0 { - return domain.PullRequest{}, fmt.Errorf("%w: worker %q has no PR to review", ErrInvalid, workerID) + return SessionReviews{}, err } - return prs[0], nil + return SessionReviews{ReviewerHandleID: handle, Runs: runs, Reviews: Plan(prs, runs)}, nil } // reviewerHarness resolves which harness reviews the worker's PR: a configured @@ -335,7 +389,7 @@ func (e *Engine) reviewerHarness(ctx stdctx.Context, worker domain.SessionRecord return cfg.ResolveReviewerHarness(worker.Harness), nil } -func (e *Engine) upsertReview(ctx stdctx.Context, worker domain.SessionRecord, harness domain.ReviewerHarness, prURL, handleID string, now time.Time) (domain.Review, error) { +func (e *Engine) upsertReview(ctx stdctx.Context, worker domain.SessionRecord, harness domain.ReviewerHarness, handleID string, now time.Time) (domain.Review, error) { existing, ok, err := e.store.GetReviewBySession(ctx, worker.ID) if err != nil { return domain.Review{}, err @@ -345,7 +399,7 @@ func (e *Engine) upsertReview(ctx stdctx.Context, worker domain.SessionRecord, h SessionID: worker.ID, ProjectID: worker.ProjectID, Harness: harness, - PRURL: prURL, + PRURL: "", ReviewerHandleID: handleID, CreatedAt: now, UpdatedAt: now, diff --git a/backend/internal/review/review_test.go b/backend/internal/review/review_test.go index 003b19b5..fb57edd8 100644 --- a/backend/internal/review/review_test.go +++ b/backend/internal/review/review_test.go @@ -74,10 +74,10 @@ func (f *fakeStore) SupersedeReviewRun(_ context.Context, id, body string) (bool } return false, nil } -func (f *fakeStore) SupersedeStaleRunningReviewRuns(_ context.Context, sessionID domain.SessionID, targetSHA, body string) (int64, error) { +func (f *fakeStore) SupersedeStaleRunningReviewRuns(_ context.Context, sessionID domain.SessionID, prURL, targetSHA, body string) (int64, error) { var n int64 for i := range f.runs { - if f.runs[i].SessionID == sessionID && f.runs[i].TargetSHA != targetSHA && f.runs[i].Status == domain.ReviewRunRunning && f.runs[i].Verdict == domain.VerdictNone { + if f.runs[i].SessionID == sessionID && f.runs[i].PRURL == prURL && f.runs[i].TargetSHA != targetSHA && f.runs[i].Status == domain.ReviewRunRunning && f.runs[i].Verdict == domain.VerdictNone { f.runs[i].Status = domain.ReviewRunFailed f.runs[i].Body = body n++ @@ -93,9 +93,9 @@ func (f *fakeStore) GetReviewRun(_ context.Context, id string) (domain.ReviewRun } return domain.ReviewRun{}, false, nil } -func (f *fakeStore) GetReviewRunBySessionAndSHA(_ context.Context, _ domain.SessionID, sha string) (domain.ReviewRun, bool, error) { +func (f *fakeStore) GetReviewRunBySessionPRAndSHA(_ context.Context, _ domain.SessionID, prURL, sha string) (domain.ReviewRun, bool, error) { for i := len(f.runs) - 1; i >= 0; i-- { - if f.runs[i].TargetSHA == sha { + if f.runs[i].PRURL == prURL && f.runs[i].TargetSHA == sha { return f.runs[i], true, nil } } @@ -136,12 +136,15 @@ type fakeLauncher struct { notified bool gotSpec LaunchSpec gotHandle string + specs []LaunchSpec + handles []string } func (f *fakeLauncher) Spawn(_ context.Context, spec LaunchSpec) (string, error) { f.spawned = true f.spawnCount++ f.gotSpec = spec + f.specs = append(f.specs, spec) if f.spawnErr != nil { return "", f.spawnErr } @@ -151,6 +154,8 @@ func (f *fakeLauncher) Notify(_ context.Context, handleID string, spec LaunchSpe f.notified = true f.gotHandle = handleID f.gotSpec = spec + f.handles = append(f.handles, handleID) + f.specs = append(f.specs, spec) return f.notifyErr } func (f *fakeLauncher) Alive(_ context.Context, _ string) (bool, error) { @@ -176,7 +181,7 @@ func newEngineForTest(store Store, sessions Sessions, prs PRs, projects Projects } func prAt(sha string) fakePRs { - return fakePRs{prs: []domain.PullRequest{{URL: "https://github.com/o/r/pull/1", HeadSHA: sha}}} + return fakePRs{prs: []domain.PullRequest{{URL: "https://github.com/o/r/pull/1", Number: 1, HeadSHA: sha}}} } // --- tests --- @@ -259,7 +264,7 @@ func TestTriggerFallsBackToExistingRunOnUniqueConflict(t *testing.T) { if res.Created { t.Fatalf("expected Created=false on unique conflict: %+v", res) } - if res.Run.TargetSHA != "sha1" || res.Run.ID != "winner-id-1" { + if res.Run.TargetSHA != "sha1" || !strings.HasPrefix(res.Run.ID, "winner-") { t.Fatalf("expected the recorded winner run, got %+v", res.Run) } if launcher.spawnCount != 0 { @@ -271,7 +276,7 @@ func TestTriggerIsIdempotentForSameCommit(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, runs: []domain.ReviewRun{{ - ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", + ID: "run-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved, }}, } @@ -296,7 +301,7 @@ func TestTriggerIsIdempotentForSameCommit(t *testing.T) { func TestTriggerReusesRunningRowWithNoVerdict(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}, + runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}, } launcher := &fakeLauncher{alive: false, handle: "review-mer-2"} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) @@ -319,7 +324,7 @@ func TestTriggerReusesRunningRowWithNoVerdict(t *testing.T) { func TestTriggerSupersedesNonRunningRowWithNoVerdict(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunComplete}}, + runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", Status: domain.ReviewRunComplete}}, } launcher := &fakeLauncher{alive: true, handle: "review-mer-1"} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) @@ -342,7 +347,7 @@ func TestTriggerSupersedesNonRunningRowWithNoVerdict(t *testing.T) { func TestTriggerNotifiesLiveReviewerOnNewCommit(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-0", SessionID: "mer-1", TargetSHA: "sha0", Status: domain.ReviewRunComplete}}, + runs: []domain.ReviewRun{{ID: "run-0", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha0", Status: domain.ReviewRunComplete}}, } launcher := &fakeLauncher{alive: true} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) @@ -365,7 +370,7 @@ func TestTriggerNotifiesLiveReviewerOnNewCommit(t *testing.T) { func TestTriggerSupersedesOlderRunningRunOnNewCommit(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-old", SessionID: "mer-1", TargetSHA: "sha0", Status: domain.ReviewRunRunning}}, + runs: []domain.ReviewRun{{ID: "run-old", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha0", Status: domain.ReviewRunRunning}}, } launcher := &fakeLauncher{alive: true, handle: "review-mer-1"} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) @@ -388,7 +393,7 @@ func TestTriggerSupersedesOlderRunningRunOnNewCommit(t *testing.T) { func TestTriggerSpawnsWhenReviewerDead(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-0", SessionID: "mer-1", TargetSHA: "sha0", Status: domain.ReviewRunComplete}}, + runs: []domain.ReviewRun{{ID: "run-0", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha0", Status: domain.ReviewRunComplete}}, } launcher := &fakeLauncher{alive: false, handle: "review-mer-1"} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) @@ -424,7 +429,7 @@ func TestTriggerLaunchFailureRecordsFailedRun(t *testing.T) { func TestTriggerRetriesAfterFailedRunForSameCommit(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-failed", ReviewID: "rev-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunFailed}}, + runs: []domain.ReviewRun{{ID: "run-failed", ReviewID: "rev-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", Status: domain.ReviewRunFailed}}, } launcher := &fakeLauncher{handle: "review-mer-1"} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) @@ -441,6 +446,111 @@ func TestTriggerRetriesAfterFailedRunForSameCommit(t *testing.T) { } } +func TestTriggerCreatesRunsForMultipleEligiblePRsWithOneReviewer(t *testing.T) { + store := &fakeStore{} + launcher := &fakeLauncher{handle: "review-mer-1"} + prs := fakePRs{prs: []domain.PullRequest{ + {URL: "https://github.com/o/r/pull/1", Number: 1, HeadSHA: "sha1"}, + {URL: "https://github.com/o/r/pull/2", Number: 2, HeadSHA: "sha2"}, + }} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prs, fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if !res.Created || len(res.CreatedRuns) != 2 || len(store.runs) != 2 { + t.Fatalf("created batch = %+v runs=%+v", res, store.runs) + } + if res.CreatedRuns[0].BatchID == "" || res.CreatedRuns[0].BatchID != res.CreatedRuns[1].BatchID { + t.Fatalf("created runs should share one batch id: %+v", res.CreatedRuns) + } + if launcher.spawnCount != 1 || len(launcher.handles) != 0 { + t.Fatalf("expected one spawn and no extra notify, launcher=%+v", launcher) + } + if len(launcher.specs) != 1 { + t.Fatalf("launch specs = %d, want 1: %+v", len(launcher.specs), launcher.specs) + } + spec := launcher.specs[0] + if spec.ReviewIndex != 0 || len(spec.ReviewQueue) != 2 { + t.Fatalf("spec queue context = index %d queue %+v", spec.ReviewIndex, spec.ReviewQueue) + } + if spec.ReviewQueue[0].PRURL != "https://github.com/o/r/pull/1" || spec.ReviewQueue[1].PRURL != "https://github.com/o/r/pull/2" { + t.Fatalf("spec queue URLs = %+v", spec.ReviewQueue) + } + if store.review == nil || store.review.ReviewerHandleID != "review-mer-1" || store.review.PRURL != "" { + t.Fatalf("review row = %+v, want shared handle and no behavioral pr_url", store.review) + } +} + +func TestTriggerAllowsTwoPRsWithSameHeadSHA(t *testing.T) { + store := &fakeStore{} + launcher := &fakeLauncher{handle: "review-mer-1"} + prs := fakePRs{prs: []domain.PullRequest{ + {URL: "https://github.com/o/r/pull/1", Number: 1, HeadSHA: "same"}, + {URL: "https://github.com/o/r/pull/2", Number: 2, HeadSHA: "same"}, + }} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prs, fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if len(res.CreatedRuns) != 2 { + t.Fatalf("created runs = %d, want 2: %+v", len(res.CreatedRuns), res.CreatedRuns) + } + if store.runs[0].PRURL == store.runs[1].PRURL || store.runs[0].TargetSHA != store.runs[1].TargetSHA { + t.Fatalf("runs should differ by PR only: %+v", store.runs) + } +} + +func TestTriggerSkipsApprovedAndRunningCurrentHead(t *testing.T) { + store := &fakeStore{ + review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, + runs: []domain.ReviewRun{ + {ID: "approved", ReviewID: "rev-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved, CreatedAt: time.Unix(1, 0)}, + {ID: "running", ReviewID: "rev-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/2", TargetSHA: "sha2", Status: domain.ReviewRunRunning, CreatedAt: time.Unix(2, 0)}, + }, + } + launcher := &fakeLauncher{alive: true} + prs := fakePRs{prs: []domain.PullRequest{ + {URL: "https://github.com/o/r/pull/1", Number: 1, HeadSHA: "sha1"}, + {URL: "https://github.com/o/r/pull/2", Number: 2, HeadSHA: "sha2"}, + }} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prs, fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if res.Created || len(res.CreatedRuns) != 0 || launcher.spawned || launcher.notified { + t.Fatalf("expected no new work: res=%+v launcher=%+v", res, launcher) + } + if len(res.Reviews) != 2 || res.Reviews[0].Status != ReviewStateUpToDate || res.Reviews[1].Status != ReviewStateRunning { + t.Fatalf("review states = %+v", res.Reviews) + } +} + +func TestTriggerCreatesRunForChangesRequestedCurrentHead(t *testing.T) { + store := &fakeStore{ + review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, + runs: []domain.ReviewRun{{ + ID: "changes", ReviewID: "rev-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1", + Status: domain.ReviewRunComplete, Verdict: domain.VerdictChangesRequested, CreatedAt: time.Unix(1, 0), + }}, + } + launcher := &fakeLauncher{alive: true, handle: "review-mer-1"} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if !res.Created || len(res.CreatedRuns) != 1 || !launcher.notified || launcher.spawned { + t.Fatalf("expected rerun on changes_requested current head: res=%+v launcher=%+v", res, launcher) + } +} + func TestTriggerUsesConfiguredReviewerHarness(t *testing.T) { store := &fakeStore{} projects := fakeProjects{cfg: domain.ProjectConfig{Reviewers: []domain.ReviewerConfig{{Harness: domain.ReviewerHarness("greptile")}}}} @@ -474,7 +584,7 @@ func TestTriggerRejectsBadWorkerState(t *testing.T) { func TestListReturnsHandleAndRuns(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1"}}, + runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", PRURL: "https://github.com/o/r/pull/1", TargetSHA: "sha1"}}, } eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, &fakeLauncher{}) got, err := eng.List(context.Background(), "mer-1") diff --git a/backend/internal/service/review/review.go b/backend/internal/service/review/review.go index be23d3a8..61f09e3b 100644 --- a/backend/internal/service/review/review.go +++ b/backend/internal/service/review/review.go @@ -27,6 +27,7 @@ var ( type Manager interface { Trigger(ctx context.Context, workerID domain.SessionID) (reviewcore.TriggerResult, error) Submit(ctx context.Context, workerID domain.SessionID, runID string, verdict domain.ReviewVerdict, body, githubReviewID string) (domain.ReviewRun, error) + SubmitMany(ctx context.Context, workerID domain.SessionID, reviews []SubmittedReview) ([]domain.ReviewRun, error) List(ctx context.Context, workerID domain.SessionID) (reviewcore.SessionReviews, error) } @@ -45,12 +46,14 @@ type Store interface { GetReviewRun(ctx context.Context, id string) (domain.ReviewRun, bool, error) UpdateReviewRunResult(ctx context.Context, id string, status domain.ReviewRunStatus, verdict domain.ReviewVerdict, body, githubReviewID string) (bool, error) MarkReviewRunDelivered(ctx context.Context, id string, deliveredAt time.Time) (bool, error) + ListPRsBySession(ctx context.Context, id domain.SessionID) ([]domain.PullRequest, error) } // Reducer is the lifecycle reaction boundary used after a review result has // been persisted. type Reducer interface { ApplyReviewResult(ctx context.Context, workerID domain.SessionID, result lifecycle.ReviewResult) (lifecycle.ReviewDeliveryOutcome, error) + ApplyReviewBatch(ctx context.Context, workerID domain.SessionID, batchID string, results []lifecycle.ReviewResult) (lifecycle.ReviewDeliveryOutcome, error) } // Option customizes the review service. @@ -68,7 +71,11 @@ func WithClock(clock func() time.Time) Option { // New wraps a core review engine as the API-facing service. func New(engine *reviewcore.Engine, store Store, opts ...Option) *Service { - s := &Service{engine: engine, store: store, clock: func() time.Time { return time.Now().UTC() }} + s := &Service{ + engine: engine, + store: store, + clock: func() time.Time { return time.Now().UTC() }, + } for _, opt := range opts { opt(s) } @@ -80,11 +87,76 @@ func (s *Service) Trigger(ctx context.Context, workerID domain.SessionID) (revie return s.engine.Trigger(ctx, workerID) } +// SubmittedReview is one review result supplied by the reviewer CLI. +type SubmittedReview struct { + RunID string + Verdict domain.ReviewVerdict + Body string + GithubReviewID string +} + // Submit records a reviewer's result for a specific worker review pass. func (s *Service) Submit(ctx context.Context, workerID domain.SessionID, runID string, verdict domain.ReviewVerdict, body, githubReviewID string) (domain.ReviewRun, error) { + runs, err := s.SubmitMany(ctx, workerID, []SubmittedReview{{ + RunID: runID, + Verdict: verdict, + Body: body, + GithubReviewID: githubReviewID, + }}) + if err != nil { + return domain.ReviewRun{}, err + } + if len(runs) == 0 { + return domain.ReviewRun{}, fmt.Errorf("%w: no review result submitted", ErrInvalid) + } + return runs[0], nil +} + +// SubmitMany records one reviewer CLI submission containing results for one or +// more PR-scoped runs. Delivery is scoped to the runs in this submission, so a +// missing/stuck result for another PR in the same trigger cannot block feedback. +func (s *Service) SubmitMany(ctx context.Context, workerID domain.SessionID, reviews []SubmittedReview) ([]domain.ReviewRun, error) { if workerID == "" { - return domain.ReviewRun{}, fmt.Errorf("%w: worker session id is required", ErrInvalid) + return nil, fmt.Errorf("%w: worker session id is required", ErrInvalid) + } + if len(reviews) == 0 { + return nil, fmt.Errorf("%w: at least one review result is required", ErrInvalid) + } + if s.store == nil { + return nil, fmt.Errorf("review service store is not configured") + } + runs := make([]domain.ReviewRun, 0, len(reviews)) + for _, review := range reviews { + run, err := s.submitOne(ctx, workerID, review) + if err != nil { + return nil, err + } + runs = append(runs, run) + } + if s.lifecycle == nil { + return runs, nil + } + delivered, err := s.deliverSubmitted(ctx, workerID, runs) + if err != nil { + return nil, err + } + byID := make(map[string]domain.ReviewRun, len(delivered)) + for _, run := range delivered { + byID[run.ID] = run } + for i, run := range runs { + if deliveredRun, ok := byID[run.ID]; ok { + runs[i] = deliveredRun + } + } + return runs, nil +} + +func (s *Service) submitOne(ctx context.Context, workerID domain.SessionID, review SubmittedReview) (domain.ReviewRun, error) { + runID := review.RunID + verdict := review.Verdict + body := review.Body + githubReviewID := review.GithubReviewID if runID == "" { return domain.ReviewRun{}, fmt.Errorf("%w: review run id is required", ErrInvalid) } @@ -94,9 +166,6 @@ func (s *Service) Submit(ctx context.Context, workerID domain.SessionID, runID s if verdict == domain.VerdictChangesRequested && body == "" { return domain.ReviewRun{}, fmt.Errorf("%w: a changes_requested review requires a body", ErrInvalid) } - if s.store == nil { - return domain.ReviewRun{}, fmt.Errorf("review service store is not configured") - } run, ok, err := s.store.GetReviewRun(ctx, runID) if err != nil { return domain.ReviewRun{}, err @@ -136,35 +205,92 @@ func (s *Service) Submit(ctx context.Context, workerID domain.SessionID, runID s default: return domain.ReviewRun{}, fmt.Errorf("%w: review run %q is not running", ErrInvalid, runID) } + return run, nil +} - if s.lifecycle == nil { - return run, nil +func (s *Service) deliverSubmitted(ctx context.Context, workerID domain.SessionID, runs []domain.ReviewRun) ([]domain.ReviewRun, error) { + deliverable, err := s.deliverableRuns(ctx, workerID, runs) + if err != nil { + return nil, err + } + if len(deliverable) == 0 { + return nil, nil + } + results := reviewResults(workerID, deliverable) + var outcome lifecycle.ReviewDeliveryOutcome + if len(results) == 1 && results[0].BatchID == "" { + outcome, err = s.lifecycle.ApplyReviewResult(ctx, workerID, results[0]) + } else { + outcome, err = s.lifecycle.ApplyReviewBatch(ctx, workerID, results[0].BatchID, results) } - outcome, err := s.lifecycle.ApplyReviewResult(ctx, workerID, lifecycle.ReviewResult{ - RunID: run.ID, - WorkerID: workerID, - PRURL: run.PRURL, - TargetSHA: run.TargetSHA, - Verdict: run.Verdict, - Body: run.Body, - GithubReviewID: run.GithubReviewID, - DeliveredAt: run.DeliveredAt, - }) if err != nil { - return domain.ReviewRun{}, err + return nil, err + } + if outcome != lifecycle.ReviewDeliverySent { + return nil, nil } - if outcome == lifecycle.ReviewDeliverySent { - deliveredAt := s.clock() + deliveredAt := s.clock() + delivered := make([]domain.ReviewRun, 0, len(deliverable)) + for _, run := range deliverable { updated, err := s.store.MarkReviewRunDelivered(ctx, run.ID, deliveredAt) if err != nil { - return domain.ReviewRun{}, err + return nil, err } if updated { run.Status = domain.ReviewRunDelivered run.DeliveredAt = &deliveredAt + delivered = append(delivered, run) } } - return run, nil + return delivered, nil +} + +func (s *Service) deliverableRuns(ctx context.Context, workerID domain.SessionID, runs []domain.ReviewRun) ([]domain.ReviewRun, error) { + currentHeads, err := s.currentHeadsByPR(ctx, workerID) + if err != nil { + return nil, err + } + deliverable := make([]domain.ReviewRun, 0, len(runs)) + for _, run := range runs { + if run.Status != domain.ReviewRunComplete || run.Verdict != domain.VerdictChangesRequested || run.DeliveredAt != nil { + continue + } + if run.BatchID != "" && currentHeads[run.PRURL] != run.TargetSHA { + continue + } + deliverable = append(deliverable, run) + } + return deliverable, nil +} + +func reviewResults(workerID domain.SessionID, runs []domain.ReviewRun) []lifecycle.ReviewResult { + results := make([]lifecycle.ReviewResult, 0, len(runs)) + for _, run := range runs { + results = append(results, lifecycle.ReviewResult{ + RunID: run.ID, + BatchID: run.BatchID, + WorkerID: workerID, + PRURL: run.PRURL, + TargetSHA: run.TargetSHA, + Verdict: run.Verdict, + Body: run.Body, + GithubReviewID: run.GithubReviewID, + DeliveredAt: run.DeliveredAt, + }) + } + return results +} + +func (s *Service) currentHeadsByPR(ctx context.Context, workerID domain.SessionID) (map[string]string, error) { + prs, err := s.store.ListPRsBySession(ctx, workerID) + if err != nil { + return nil, err + } + current := make(map[string]string, len(prs)) + for _, pr := range prs { + current[pr.URL] = pr.HeadSHA + } + return current, nil } // List returns a worker's review state. diff --git a/backend/internal/service/review/review_test.go b/backend/internal/service/review/review_test.go index c9e4c455..259c47f5 100644 --- a/backend/internal/service/review/review_test.go +++ b/backend/internal/service/review/review_test.go @@ -11,18 +11,45 @@ import ( ) type fakeStore struct { - run domain.ReviewRun - ok bool + run domain.ReviewRun + ok bool + batchRuns []domain.ReviewRun + prs []domain.PullRequest updateCalls int markCalls int + markedIDs []string } -func (f *fakeStore) GetReviewRun(context.Context, string) (domain.ReviewRun, bool, error) { - return f.run, f.ok, nil +func (f *fakeStore) GetReviewRun(_ context.Context, id string) (domain.ReviewRun, bool, error) { + for _, run := range f.batchRuns { + if run.ID == id { + return run, true, nil + } + } + if f.ok && f.run.ID == id { + return f.run, true, nil + } + return domain.ReviewRun{}, false, nil } -func (f *fakeStore) UpdateReviewRunResult(_ context.Context, _ string, status domain.ReviewRunStatus, verdict domain.ReviewVerdict, body, githubReviewID string) (bool, error) { +func (f *fakeStore) UpdateReviewRunResult(_ context.Context, id string, status domain.ReviewRunStatus, verdict domain.ReviewVerdict, body, githubReviewID string) (bool, error) { + for i := range f.batchRuns { + if f.batchRuns[i].ID == id { + if f.batchRuns[i].Status != domain.ReviewRunRunning { + return false, nil + } + f.updateCalls++ + f.batchRuns[i].Status = status + f.batchRuns[i].Verdict = verdict + f.batchRuns[i].Body = body + f.batchRuns[i].GithubReviewID = githubReviewID + if f.run.ID == id { + f.run = f.batchRuns[i] + } + return true, nil + } + } if f.run.Status != domain.ReviewRunRunning { return false, nil } @@ -34,21 +61,44 @@ func (f *fakeStore) UpdateReviewRunResult(_ context.Context, _ string, status do return true, nil } -func (f *fakeStore) MarkReviewRunDelivered(_ context.Context, _ string, deliveredAt time.Time) (bool, error) { +func (f *fakeStore) MarkReviewRunDelivered(_ context.Context, id string, deliveredAt time.Time) (bool, error) { f.markCalls++ - if f.run.Status != domain.ReviewRunComplete || f.run.DeliveredAt != nil { + f.markedIDs = append(f.markedIDs, id) + if f.run.ID == id && f.run.Status == domain.ReviewRunComplete && f.run.DeliveredAt == nil { + f.run.Status = domain.ReviewRunDelivered + f.run.DeliveredAt = &deliveredAt + } + for i := range f.batchRuns { + if f.batchRuns[i].ID == id && f.batchRuns[i].Status == domain.ReviewRunComplete && f.batchRuns[i].DeliveredAt == nil { + f.batchRuns[i].Status = domain.ReviewRunDelivered + f.batchRuns[i].DeliveredAt = &deliveredAt + return true, nil + } + } + if f.run.ID != id || f.run.Status != domain.ReviewRunDelivered { return false, nil } - f.run.Status = domain.ReviewRunDelivered - f.run.DeliveredAt = &deliveredAt return true, nil } +func (f *fakeStore) ListReviewRunsByBatch(context.Context, domain.SessionID, string) ([]domain.ReviewRun, error) { + out := append([]domain.ReviewRun(nil), f.batchRuns...) + return out, nil +} + +func (f *fakeStore) ListPRsBySession(context.Context, domain.SessionID) ([]domain.PullRequest, error) { + out := append([]domain.PullRequest(nil), f.prs...) + return out, nil +} + type fakeReducer struct { - outcome lifecycle.ReviewDeliveryOutcome - err error - calls int - got lifecycle.ReviewResult + outcome lifecycle.ReviewDeliveryOutcome + err error + calls int + batchCalls int + got lifecycle.ReviewResult + gotBatchID string + gotBatch []lifecycle.ReviewResult } func (f *fakeReducer) ApplyReviewResult(_ context.Context, _ domain.SessionID, result lifecycle.ReviewResult) (lifecycle.ReviewDeliveryOutcome, error) { @@ -57,6 +107,13 @@ func (f *fakeReducer) ApplyReviewResult(_ context.Context, _ domain.SessionID, r return f.outcome, f.err } +func (f *fakeReducer) ApplyReviewBatch(_ context.Context, _ domain.SessionID, batchID string, results []lifecycle.ReviewResult) (lifecycle.ReviewDeliveryOutcome, error) { + f.batchCalls++ + f.gotBatchID = batchID + f.gotBatch = append([]lifecycle.ReviewResult(nil), results...) + return f.outcome, f.err +} + func TestSubmitPersistsThenAppliesThenStampsDelivered(t *testing.T) { now := time.Unix(100, 0).UTC() st := &fakeStore{ok: true, run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}} @@ -78,6 +135,98 @@ func TestSubmitPersistsThenAppliesThenStampsDelivered(t *testing.T) { } } +func TestSubmitBatchRunDoesNotWaitForOtherRunningRuns(t *testing.T) { + now := time.Unix(100, 0).UTC() + st := &fakeStore{ + ok: true, + run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}, + batchRuns: []domain.ReviewRun{ + {ID: "run-1", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}, + {ID: "run-2", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr2", TargetSHA: "sha2", Status: domain.ReviewRunRunning}, + }, + prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "sha1"}, {URL: "pr2", HeadSHA: "sha2"}}, + } + reducer := &fakeReducer{outcome: lifecycle.ReviewDeliverySent} + svc := New(nil, st, WithLifecycleReducer(reducer), WithClock(func() time.Time { return now })) + + run, err := svc.Submit(context.Background(), "mer-1", "run-1", domain.VerdictChangesRequested, "fix pr1", "101") + if err != nil { + t.Fatalf("Submit: %v", err) + } + if run.Status != domain.ReviewRunDelivered || run.DeliveredAt == nil || !run.DeliveredAt.Equal(now) { + t.Fatalf("first submit status = %+v, want delivered", run) + } + if reducer.batchCalls != 1 || len(reducer.gotBatch) != 1 || reducer.gotBatch[0].RunID != "run-1" || st.markCalls != 1 { + t.Fatalf("submitted run should deliver independently: batchCalls=%d got=%+v markCalls=%d", reducer.batchCalls, reducer.gotBatch, st.markCalls) + } +} + +func TestSubmitManySendsCombinedChangesRequested(t *testing.T) { + now := time.Unix(100, 0).UTC() + st := &fakeStore{ + ok: true, + batchRuns: []domain.ReviewRun{ + {ID: "run-1", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}, + {ID: "run-2", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr2", TargetSHA: "sha2", Status: domain.ReviewRunRunning}, + {ID: "run-3", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr3", TargetSHA: "sha3", Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved}, + {ID: "run-4", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr4", TargetSHA: "old", Status: domain.ReviewRunComplete, Verdict: domain.VerdictChangesRequested, Body: "stale"}, + {ID: "run-5", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr5", TargetSHA: "sha5", Status: domain.ReviewRunFailed}, + }, + prs: []domain.PullRequest{ + {URL: "pr1", HeadSHA: "sha1"}, + {URL: "pr2", HeadSHA: "sha2"}, + {URL: "pr3", HeadSHA: "sha3"}, + {URL: "pr4", HeadSHA: "new"}, + {URL: "pr5", HeadSHA: "sha5"}, + }, + } + reducer := &fakeReducer{outcome: lifecycle.ReviewDeliverySent} + svc := New(nil, st, WithLifecycleReducer(reducer), WithClock(func() time.Time { return now })) + + runs, err := svc.SubmitMany(context.Background(), "mer-1", []SubmittedReview{ + {RunID: "run-1", Verdict: domain.VerdictChangesRequested, Body: "fix pr1", GithubReviewID: "101"}, + {RunID: "run-2", Verdict: domain.VerdictChangesRequested, Body: "fix pr2", GithubReviewID: "102"}, + {RunID: "run-3", Verdict: domain.VerdictApproved}, + }) + if err != nil { + t.Fatalf("SubmitMany: %v", err) + } + if reducer.batchCalls != 1 || reducer.gotBatchID != "batch-1" { + t.Fatalf("batch delivery calls/id = %d/%q", reducer.batchCalls, reducer.gotBatchID) + } + if len(reducer.gotBatch) != 2 || reducer.gotBatch[0].RunID != "run-1" || reducer.gotBatch[1].RunID != "run-2" { + t.Fatalf("delivered batch = %+v, want run-1 and run-2 only", reducer.gotBatch) + } + if st.markCalls != 2 { + t.Fatalf("markCalls = %d, want 2", st.markCalls) + } + if runs[0].Status != domain.ReviewRunDelivered || runs[0].DeliveredAt == nil || !runs[0].DeliveredAt.Equal(now) || + runs[1].Status != domain.ReviewRunDelivered || runs[1].DeliveredAt == nil || !runs[1].DeliveredAt.Equal(now) { + t.Fatalf("submitted runs not stamped delivered: %+v", runs) + } +} + +func TestSubmitBatchApprovedOnlySendsNothing(t *testing.T) { + st := &fakeStore{ + ok: true, + run: domain.ReviewRun{ID: "run-2", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr2", TargetSHA: "sha2", Status: domain.ReviewRunRunning}, + batchRuns: []domain.ReviewRun{ + {ID: "run-1", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved}, + {ID: "run-2", SessionID: "mer-1", BatchID: "batch-1", PRURL: "pr2", TargetSHA: "sha2", Status: domain.ReviewRunRunning}, + }, + prs: []domain.PullRequest{{URL: "pr1", HeadSHA: "sha1"}, {URL: "pr2", HeadSHA: "sha2"}}, + } + reducer := &fakeReducer{outcome: lifecycle.ReviewDeliverySent} + svc := New(nil, st, WithLifecycleReducer(reducer)) + + if _, err := svc.Submit(context.Background(), "mer-1", "run-2", domain.VerdictApproved, "", "102"); err != nil { + t.Fatalf("Submit: %v", err) + } + if reducer.batchCalls != 0 || st.markCalls != 0 { + t.Fatalf("approved-only batch should not deliver: batchCalls=%d markCalls=%d", reducer.batchCalls, st.markCalls) + } +} + func TestSubmitDeliveryFailureLeavesCompletedUndeliveredForRetry(t *testing.T) { sendErr := errors.New("dead pane") st := &fakeStore{ok: true, run: domain.ReviewRun{ID: "run-1", SessionID: "mer-1", PRURL: "pr1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}} diff --git a/backend/internal/storage/sqlite/gen/models.go b/backend/internal/storage/sqlite/gen/models.go index f3ca3865..ccc4bf74 100644 --- a/backend/internal/storage/sqlite/gen/models.go +++ b/backend/internal/storage/sqlite/gen/models.go @@ -147,6 +147,7 @@ type ReviewRun struct { CreatedAt time.Time GithubReviewID string DeliveredAt sql.NullTime + BatchID string } type Session struct { diff --git a/backend/internal/storage/sqlite/gen/review.sql.go b/backend/internal/storage/sqlite/gen/review.sql.go index f9d08f34..16a939ae 100644 --- a/backend/internal/storage/sqlite/gen/review.sql.go +++ b/backend/internal/storage/sqlite/gen/review.sql.go @@ -35,7 +35,7 @@ func (q *Queries) GetReviewBySession(ctx context.Context, sessionID domain.Sessi } const getReviewRun = `-- name: GetReviewRun :one -SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id FROM review_run WHERE id = ? ` @@ -55,22 +55,24 @@ func (q *Queries) GetReviewRun(ctx context.Context, id string) (ReviewRun, error &i.CreatedAt, &i.GithubReviewID, &i.DeliveredAt, + &i.BatchID, ) return i, err } -const getReviewRunBySessionAndSHA = `-- name: GetReviewRunBySessionAndSHA :one -SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at -FROM review_run WHERE session_id = ? AND target_sha = ? ORDER BY created_at DESC LIMIT 1 +const getReviewRunBySessionPRAndSHA = `-- name: GetReviewRunBySessionPRAndSHA :one +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id +FROM review_run WHERE session_id = ? AND pr_url = ? AND target_sha = ? ORDER BY created_at DESC LIMIT 1 ` -type GetReviewRunBySessionAndSHAParams struct { +type GetReviewRunBySessionPRAndSHAParams struct { SessionID domain.SessionID + PRURL string TargetSha string } -func (q *Queries) GetReviewRunBySessionAndSHA(ctx context.Context, arg GetReviewRunBySessionAndSHAParams) (ReviewRun, error) { - row := q.db.QueryRowContext(ctx, getReviewRunBySessionAndSHA, arg.SessionID, arg.TargetSha) +func (q *Queries) GetReviewRunBySessionPRAndSHA(ctx context.Context, arg GetReviewRunBySessionPRAndSHAParams) (ReviewRun, error) { + row := q.db.QueryRowContext(ctx, getReviewRunBySessionPRAndSHA, arg.SessionID, arg.PRURL, arg.TargetSha) var i ReviewRun err := row.Scan( &i.ID, @@ -85,19 +87,21 @@ func (q *Queries) GetReviewRunBySessionAndSHA(ctx context.Context, arg GetReview &i.CreatedAt, &i.GithubReviewID, &i.DeliveredAt, + &i.BatchID, ) return i, err } const insertReviewRun = `-- name: InsertReviewRun :exec -INSERT INTO review_run (id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, github_review_id, created_at) -VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +INSERT INTO review_run (id, review_id, session_id, batch_id, harness, pr_url, target_sha, status, verdict, body, github_review_id, created_at) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` type InsertReviewRunParams struct { ID string ReviewID string SessionID domain.SessionID + BatchID string Harness domain.ReviewerHarness PRURL string TargetSha string @@ -113,6 +117,7 @@ func (q *Queries) InsertReviewRun(ctx context.Context, arg InsertReviewRunParams arg.ID, arg.ReviewID, arg.SessionID, + arg.BatchID, arg.Harness, arg.PRURL, arg.TargetSha, @@ -125,8 +130,55 @@ func (q *Queries) InsertReviewRun(ctx context.Context, arg InsertReviewRunParams return err } +const listReviewRunsByBatch = `-- name: ListReviewRunsByBatch :many +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id +FROM review_run WHERE session_id = ? AND batch_id = ? ORDER BY created_at ASC, id ASC +` + +type ListReviewRunsByBatchParams struct { + SessionID domain.SessionID + BatchID string +} + +func (q *Queries) ListReviewRunsByBatch(ctx context.Context, arg ListReviewRunsByBatchParams) ([]ReviewRun, error) { + rows, err := q.db.QueryContext(ctx, listReviewRunsByBatch, arg.SessionID, arg.BatchID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []ReviewRun{} + for rows.Next() { + var i ReviewRun + if err := rows.Scan( + &i.ID, + &i.ReviewID, + &i.SessionID, + &i.Harness, + &i.PRURL, + &i.TargetSha, + &i.Status, + &i.Verdict, + &i.Body, + &i.CreatedAt, + &i.GithubReviewID, + &i.DeliveredAt, + &i.BatchID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listReviewRunsBySession = `-- name: ListReviewRunsBySession :many -SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id FROM review_run WHERE session_id = ? ORDER BY created_at DESC ` @@ -152,6 +204,7 @@ func (q *Queries) ListReviewRunsBySession(ctx context.Context, sessionID domain. &i.CreatedAt, &i.GithubReviewID, &i.DeliveredAt, + &i.BatchID, ); err != nil { return nil, err } @@ -201,17 +254,23 @@ func (q *Queries) SupersedeReviewRun(ctx context.Context, arg SupersedeReviewRun } const supersedeStaleRunningReviewRuns = `-- name: SupersedeStaleRunningReviewRuns :execrows -UPDATE review_run SET status = 'failed', body = ? WHERE session_id = ? AND target_sha != ? AND status = 'running' AND verdict = '' +UPDATE review_run SET status = 'failed', body = ? WHERE session_id = ? AND pr_url = ? AND target_sha != ? AND status = 'running' AND verdict = '' ` type SupersedeStaleRunningReviewRunsParams struct { Body string SessionID domain.SessionID + PRURL string TargetSha string } func (q *Queries) SupersedeStaleRunningReviewRuns(ctx context.Context, arg SupersedeStaleRunningReviewRunsParams) (int64, error) { - result, err := q.db.ExecContext(ctx, supersedeStaleRunningReviewRuns, arg.Body, arg.SessionID, arg.TargetSha) + result, err := q.db.ExecContext(ctx, supersedeStaleRunningReviewRuns, + arg.Body, + arg.SessionID, + arg.PRURL, + arg.TargetSha, + ) if err != nil { return 0, err } diff --git a/backend/internal/storage/sqlite/migrations/0020_review_run_unique_pr_sha.sql b/backend/internal/storage/sqlite/migrations/0020_review_run_unique_pr_sha.sql new file mode 100644 index 00000000..59c03134 --- /dev/null +++ b/backend/internal/storage/sqlite/migrations/0020_review_run_unique_pr_sha.sql @@ -0,0 +1,85 @@ +-- Review runs are PR-scoped. Two PRs in one worker session can legitimately +-- share a head SHA, so the idempotency index must include pr_url. Runs created +-- by one trigger also share batch_id so one reviewer CLI submission can notify +-- the worker about multiple PRs at once. + +-- +goose Up +-- +goose StatementBegin +ALTER TABLE review_run ADD COLUMN batch_id TEXT NOT NULL DEFAULT ''; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP INDEX idx_review_run_session_sha; +-- +goose StatementEnd + +-- +goose StatementBegin +DELETE FROM review_run +WHERE target_sha != '' + AND pr_url != '' + AND status != 'failed' + AND rowid NOT IN ( + SELECT rowid FROM ( + SELECT rowid, + ROW_NUMBER() OVER ( + PARTITION BY session_id, pr_url, target_sha + ORDER BY CASE status WHEN 'complete' THEN 0 WHEN 'delivered' THEN 0 WHEN 'running' THEN 1 ELSE 2 END, + created_at DESC, + rowid DESC + ) AS rn + FROM review_run + WHERE target_sha != '' AND pr_url != '' AND status != 'failed' + ) + WHERE rn = 1 + ); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE UNIQUE INDEX idx_review_run_session_pr_sha + ON review_run (session_id, pr_url, target_sha) + WHERE target_sha != '' AND status != 'failed'; +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE INDEX idx_review_run_session_batch + ON review_run (session_id, batch_id, created_at) + WHERE batch_id != ''; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +DROP INDEX idx_review_run_session_batch; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP INDEX idx_review_run_session_pr_sha; +-- +goose StatementEnd + +-- +goose StatementBegin +DELETE FROM review_run +WHERE target_sha != '' + AND status != 'failed' + AND rowid NOT IN ( + SELECT rowid FROM ( + SELECT rowid, + ROW_NUMBER() OVER ( + PARTITION BY session_id, target_sha + ORDER BY CASE status WHEN 'complete' THEN 0 WHEN 'delivered' THEN 0 WHEN 'running' THEN 1 ELSE 2 END, + created_at DESC, + rowid DESC + ) AS rn + FROM review_run + WHERE target_sha != '' AND status != 'failed' + ) + WHERE rn = 1 + ); +-- +goose StatementEnd + +-- +goose StatementBegin +CREATE UNIQUE INDEX idx_review_run_session_sha + ON review_run (session_id, target_sha) + WHERE target_sha != '' AND status != 'failed'; +-- +goose StatementEnd + +-- +goose StatementBegin +ALTER TABLE review_run DROP COLUMN batch_id; +-- +goose StatementEnd diff --git a/backend/internal/storage/sqlite/queries/review.sql b/backend/internal/storage/sqlite/queries/review.sql index 2dbc3a08..335f87cd 100644 --- a/backend/internal/storage/sqlite/queries/review.sql +++ b/backend/internal/storage/sqlite/queries/review.sql @@ -12,8 +12,8 @@ SELECT id, session_id, project_id, harness, pr_url, reviewer_handle_id, created_ FROM review WHERE session_id = ?; -- name: InsertReviewRun :exec -INSERT INTO review_run (id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, github_review_id, created_at) -VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); +INSERT INTO review_run (id, review_id, session_id, batch_id, harness, pr_url, target_sha, status, verdict, body, github_review_id, created_at) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); -- name: UpdateReviewRunResult :execrows UPDATE review_run SET status = ?, verdict = ?, body = ?, github_review_id = ? WHERE id = ? AND status = 'running'; @@ -22,19 +22,23 @@ UPDATE review_run SET status = ?, verdict = ?, body = ?, github_review_id = ? WH UPDATE review_run SET status = 'failed', body = ? WHERE id = ? AND verdict = '' AND status != 'failed'; -- name: SupersedeStaleRunningReviewRuns :execrows -UPDATE review_run SET status = 'failed', body = ? WHERE session_id = ? AND target_sha != ? AND status = 'running' AND verdict = ''; +UPDATE review_run SET status = 'failed', body = ? WHERE session_id = ? AND pr_url = ? AND target_sha != ? AND status = 'running' AND verdict = ''; -- name: MarkReviewRunDelivered :execrows UPDATE review_run SET status = 'delivered', delivered_at = ? WHERE id = ? AND status = 'complete' AND delivered_at IS NULL; -- name: GetReviewRun :one -SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id FROM review_run WHERE id = ?; --- name: GetReviewRunBySessionAndSHA :one -SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at -FROM review_run WHERE session_id = ? AND target_sha = ? ORDER BY created_at DESC LIMIT 1; +-- name: GetReviewRunBySessionPRAndSHA :one +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id +FROM review_run WHERE session_id = ? AND pr_url = ? AND target_sha = ? ORDER BY created_at DESC LIMIT 1; -- name: ListReviewRunsBySession :many -SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id FROM review_run WHERE session_id = ? ORDER BY created_at DESC; + +-- name: ListReviewRunsByBatch :many +SELECT id, review_id, session_id, harness, pr_url, target_sha, status, verdict, body, created_at, github_review_id, delivered_at, batch_id +FROM review_run WHERE session_id = ? AND batch_id = ? ORDER BY created_at ASC, id ASC; diff --git a/backend/internal/storage/sqlite/store/review_store.go b/backend/internal/storage/sqlite/store/review_store.go index cf5fd646..9a62925f 100644 --- a/backend/internal/storage/sqlite/store/review_store.go +++ b/backend/internal/storage/sqlite/store/review_store.go @@ -41,7 +41,7 @@ func (s *Store) GetReviewBySession(ctx context.Context, id domain.SessionID) (do } // InsertReviewRun records a new review pass. A unique-constraint hit on the -// (session_id, target_sha) index (migration 0013) is surfaced as the sentinel +// (session_id, pr_url, target_sha) index (migration 0020) is surfaced as the sentinel // domain.ErrDuplicateReviewRun so the engine can fall back to the existing run. func (s *Store) InsertReviewRun(ctx context.Context, r domain.ReviewRun) error { s.writeMu.Lock() @@ -50,6 +50,7 @@ func (s *Store) InsertReviewRun(ctx context.Context, r domain.ReviewRun) error { ID: r.ID, ReviewID: r.ReviewID, SessionID: r.SessionID, + BatchID: r.BatchID, Harness: r.Harness, PRURL: r.PRURL, TargetSha: r.TargetSHA, @@ -60,7 +61,7 @@ func (s *Store) InsertReviewRun(ctx context.Context, r domain.ReviewRun) error { CreatedAt: r.CreatedAt, }) if isSQLiteUnique(err) { - return fmt.Errorf("insert review run for session %s sha %s: %w", r.SessionID, r.TargetSHA, domain.ErrDuplicateReviewRun) + return fmt.Errorf("insert review run for session %s pr %s sha %s: %w", r.SessionID, r.PRURL, r.TargetSHA, domain.ErrDuplicateReviewRun) } return err } @@ -100,12 +101,13 @@ func (s *Store) SupersedeReviewRun(ctx context.Context, id, body string) (bool, // SupersedeStaleRunningReviewRuns marks older running unverdicted passes for a // worker failed before starting a review for a newer commit. -func (s *Store) SupersedeStaleRunningReviewRuns(ctx context.Context, sessionID domain.SessionID, targetSHA, body string) (int64, error) { +func (s *Store) SupersedeStaleRunningReviewRuns(ctx context.Context, sessionID domain.SessionID, prURL, targetSHA, body string) (int64, error) { s.writeMu.Lock() defer s.writeMu.Unlock() return s.qw.SupersedeStaleRunningReviewRuns(ctx, gen.SupersedeStaleRunningReviewRunsParams{ Body: body, SessionID: sessionID, + PRURL: prURL, TargetSha: targetSHA, }) } @@ -137,16 +139,17 @@ func (s *Store) GetReviewRun(ctx context.Context, id string) (domain.ReviewRun, return reviewRunFromRow(row), true, nil } -// GetReviewRunBySessionAndSHA returns the most recent review pass for a worker -// session at a specific commit, ok=false if none. It lets a repeat trigger for -// the same PR head short-circuit to the existing run. -func (s *Store) GetReviewRunBySessionAndSHA(ctx context.Context, id domain.SessionID, targetSHA string) (domain.ReviewRun, bool, error) { - row, err := s.qr.GetReviewRunBySessionAndSHA(ctx, gen.GetReviewRunBySessionAndSHAParams{SessionID: id, TargetSha: targetSHA}) +// GetReviewRunBySessionPRAndSHA returns the most recent review pass for a +// worker session, PR, and commit, ok=false if none. It lets a repeat trigger for +// the same PR head short-circuit to the existing run without colliding with +// another PR that happens to share the same head commit. +func (s *Store) GetReviewRunBySessionPRAndSHA(ctx context.Context, id domain.SessionID, prURL, targetSHA string) (domain.ReviewRun, bool, error) { + row, err := s.qr.GetReviewRunBySessionPRAndSHA(ctx, gen.GetReviewRunBySessionPRAndSHAParams{SessionID: id, PRURL: prURL, TargetSha: targetSHA}) if errors.Is(err, sql.ErrNoRows) { return domain.ReviewRun{}, false, nil } if err != nil { - return domain.ReviewRun{}, false, fmt.Errorf("get review run for session %s sha %s: %w", id, targetSHA, err) + return domain.ReviewRun{}, false, fmt.Errorf("get review run for session %s pr %s sha %s: %w", id, prURL, targetSHA, err) } return reviewRunFromRow(row), true, nil } @@ -164,6 +167,19 @@ func (s *Store) ListReviewRunsBySession(ctx context.Context, id domain.SessionID return out, nil } +// ListReviewRunsByBatch returns all passes in one trigger-created batch, oldest first. +func (s *Store) ListReviewRunsByBatch(ctx context.Context, id domain.SessionID, batchID string) ([]domain.ReviewRun, error) { + rows, err := s.qr.ListReviewRunsByBatch(ctx, gen.ListReviewRunsByBatchParams{SessionID: id, BatchID: batchID}) + if err != nil { + return nil, fmt.Errorf("list review runs for session %s batch %s: %w", id, batchID, err) + } + out := make([]domain.ReviewRun, 0, len(rows)) + for _, row := range rows { + out = append(out, reviewRunFromRow(row)) + } + return out, nil +} + func reviewFromRow(r gen.Review) domain.Review { return domain.Review{ ID: r.ID, @@ -187,6 +203,7 @@ func reviewRunFromRow(r gen.ReviewRun) domain.ReviewRun { ID: r.ID, ReviewID: r.ReviewID, SessionID: r.SessionID, + BatchID: r.BatchID, Harness: r.Harness, PRURL: r.PRURL, TargetSHA: r.TargetSha, diff --git a/backend/internal/storage/sqlite/store/review_store_test.go b/backend/internal/storage/sqlite/store/review_store_test.go index 39943826..e8980bec 100644 --- a/backend/internal/storage/sqlite/store/review_store_test.go +++ b/backend/internal/storage/sqlite/store/review_store_test.go @@ -9,7 +9,7 @@ import ( "github.com/aoagents/agent-orchestrator/backend/internal/domain" ) -func TestInsertReviewRunDuplicateSHAMapsToSentinel(t *testing.T) { +func TestInsertReviewRunDuplicatePRSHAMapsToSentinel(t *testing.T) { s := newTestStore(t) ctx := context.Background() seedProject(t, s, "mer") @@ -26,21 +26,28 @@ func TestInsertReviewRunDuplicateSHAMapsToSentinel(t *testing.T) { } run := domain.ReviewRun{ ID: "run-1", ReviewID: "rev-1", SessionID: rec.ID, Harness: domain.ReviewerClaudeCode, - TargetSHA: "sha1", Status: domain.ReviewRunRunning, Verdict: domain.VerdictNone, CreatedAt: now, + PRURL: "https://example/pr/1", TargetSHA: "sha1", Status: domain.ReviewRunRunning, Verdict: domain.VerdictNone, CreatedAt: now, } if err := s.InsertReviewRun(ctx, run); err != nil { t.Fatalf("first insert: %v", err) } - // A second run for the same (session_id, target_sha) hits the partial unique - // index (migration 0013) and must surface as the sentinel so the engine can - // fall back to the existing run. + // A second run for the same (session_id, pr_url, target_sha) hits the + // partial unique index (migration 0020) and must surface as the sentinel so + // the engine can fall back to the existing run. dup := run dup.ID = "run-2" if err := s.InsertReviewRun(ctx, dup); !errors.Is(err, domain.ErrDuplicateReviewRun) { t.Fatalf("duplicate insert err = %v, want ErrDuplicateReviewRun", err) } + otherPR := run + otherPR.ID = "run-other-pr" + otherPR.PRURL = "https://example/pr/2" + if err := s.InsertReviewRun(ctx, otherPR); err != nil { + t.Fatalf("same sha on different PR should insert: %v", err) + } + if ok, err := s.UpdateReviewRunResult(ctx, "run-1", domain.ReviewRunFailed, domain.VerdictNone, "claude: not found", ""); err != nil { t.Fatalf("mark failed: %v", err) } else if !ok { @@ -102,7 +109,7 @@ func TestReviewUpsertReusesRowAndRunRoundTrip(t *testing.T) { // A run inserts running and updates to complete/changes_requested. if err := s.InsertReviewRun(ctx, domain.ReviewRun{ - ID: "run-1", ReviewID: got.ID, SessionID: rec.ID, Harness: domain.ReviewerHarness("greptile"), + ID: "run-1", ReviewID: got.ID, SessionID: rec.ID, BatchID: "batch-1", Harness: domain.ReviewerHarness("greptile"), PRURL: got.PRURL, TargetSHA: "sha1", Status: domain.ReviewRunRunning, Verdict: domain.VerdictNone, CreatedAt: now, }); err != nil { @@ -118,18 +125,18 @@ func TestReviewUpsertReusesRowAndRunRoundTrip(t *testing.T) { if err != nil || !ok { t.Fatalf("get run: ok=%v err=%v", ok, err) } - if gotRun.ID != "run-1" || gotRun.SessionID != rec.ID || gotRun.TargetSHA != "sha1" { + if gotRun.ID != "run-1" || gotRun.SessionID != rec.ID || gotRun.BatchID != "batch-1" || gotRun.TargetSHA != "sha1" { t.Fatalf("get run = %+v", gotRun) } - bySHA, ok, err := s.GetReviewRunBySessionAndSHA(ctx, rec.ID, "sha1") + bySHA, ok, err := s.GetReviewRunBySessionPRAndSHA(ctx, rec.ID, got.PRURL, "sha1") if err != nil || !ok { t.Fatalf("by sha: ok=%v err=%v", ok, err) } if bySHA.Status != domain.ReviewRunComplete || bySHA.Verdict != domain.VerdictChangesRequested || bySHA.Body != "please fix" || bySHA.GithubReviewID != "rev-987" { t.Fatalf("run result not persisted: %+v", bySHA) } - if _, ok, _ := s.GetReviewRunBySessionAndSHA(ctx, rec.ID, "other"); ok { + if _, ok, _ := s.GetReviewRunBySessionPRAndSHA(ctx, rec.ID, got.PRURL, "other"); ok { t.Fatal("unexpected run for a different sha") } @@ -140,6 +147,13 @@ func TestReviewUpsertReusesRowAndRunRoundTrip(t *testing.T) { if len(runs) != 1 || runs[0].ID != "run-1" { t.Fatalf("list runs = %+v", runs) } + batchRuns, err := s.ListReviewRunsByBatch(ctx, rec.ID, "batch-1") + if err != nil { + t.Fatalf("list batch runs: %v", err) + } + if len(batchRuns) != 1 || batchRuns[0].ID != "run-1" || batchRuns[0].BatchID != "batch-1" { + t.Fatalf("batch runs = %+v", batchRuns) + } if ok, err := s.UpdateReviewRunResult(ctx, "run-1", domain.ReviewRunComplete, domain.VerdictApproved, "again", ""); err != nil { t.Fatalf("second update: %v", err) @@ -154,7 +168,7 @@ func TestReviewGettersMissing(t *testing.T) { if _, ok, err := s.GetReviewBySession(ctx, "mer-1"); err != nil || ok { t.Fatalf("missing review: ok=%v err=%v", ok, err) } - if _, ok, err := s.GetReviewRunBySessionAndSHA(ctx, "mer-1", "sha1"); err != nil || ok { + if _, ok, err := s.GetReviewRunBySessionPRAndSHA(ctx, "mer-1", "pr1", "sha1"); err != nil || ok { t.Fatalf("missing run: ok=%v err=%v", ok, err) } if _, ok, err := s.GetReviewRun(ctx, "run-missing"); err != nil || ok { diff --git a/frontend/src/api/schema.ts b/frontend/src/api/schema.ts index a7f68302..6f4c7ea8 100644 --- a/frontend/src/api/schema.ts +++ b/frontend/src/api/schema.ts @@ -564,7 +564,7 @@ export interface components { }; ListReviewsResponse: { reviewerHandleId: string; - reviews: components["schemas"]["ReviewRun"][]; + reviews: components["schemas"]["PRReviewState"][]; }; ListSessionPRsResponse: { prs: components["schemas"]["SessionPRSummary"][]; @@ -617,6 +617,15 @@ export interface components { projectId: string; projectName?: string; }; + PRReviewState: { + latestRun?: components["schemas"]["ReviewRun"]; + prNumber: number; + prUrl: string; + /** @enum {string} */ + status: "needs_review" | "running" | "up_to_date" | "changes_requested" | "ineligible"; + targetSha: string; + title: string; + }; Project: { agent?: string; config?: components["schemas"]["ProjectConfig"]; @@ -680,6 +689,7 @@ export interface components { sessionId: string; }; ReviewRun: { + batchId: string; body: string; /** Format: date-time */ createdAt: string; @@ -698,6 +708,7 @@ export interface components { ReviewRunResponse: { review: components["schemas"]["ReviewRun"]; reviewerHandleId: string; + reviews: components["schemas"]["ReviewRun"][]; }; RoleOverride: { agent?: string; @@ -846,14 +857,30 @@ export interface components { }; SubmitReviewInput: { /** @description Review body recorded by AO. Required for changes_requested. */ - body: string; + body?: string; /** @description Id of the GitHub PR review the reviewer posted, if any. */ - githubReviewId: string; + githubReviewId?: string; + /** @description Batched review results recorded by one reviewer CLI command. */ + reviews?: components["schemas"]["SubmitReviewItem"][]; + /** @description Review run id being completed. */ + runId?: string; + /** @description Review verdict: approved or changes_requested. */ + verdict?: string; + }; + SubmitReviewItem: { + /** @description Review body recorded by AO. Required for changes_requested. */ + body?: string; + /** @description Id of the GitHub PR review the reviewer posted, if any. */ + githubReviewId?: string; /** @description Review run id being completed. */ runId: string; /** @description Review verdict: approved or changes_requested. */ verdict: string; }; + TriggerReviewResponse: { + reviewerHandleId: string; + reviews: components["schemas"]["PRReviewState"][]; + }; WorkspaceRepo: { name: string; relativePath: string; @@ -2425,7 +2452,7 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": components["schemas"]["ReviewRunResponse"]; + "application/json": components["schemas"]["TriggerReviewResponse"]; }; }; /** @description Created */ @@ -2434,7 +2461,7 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": components["schemas"]["ReviewRunResponse"]; + "application/json": components["schemas"]["TriggerReviewResponse"]; }; }; /** @description Not Found */ diff --git a/frontend/src/renderer/components/SessionInspector.test.tsx b/frontend/src/renderer/components/SessionInspector.test.tsx index 41365e2a..244d9c4e 100644 --- a/frontend/src/renderer/components/SessionInspector.test.tsx +++ b/frontend/src/renderer/components/SessionInspector.test.tsx @@ -85,6 +85,7 @@ const approvedReview = { id: "run-1", reviewId: "review-1", sessionId: "sess-1", + batchId: "batch-1", harness: "codex", status: "complete", verdict: "approved", @@ -94,6 +95,14 @@ const approvedReview = { createdAt: "2026-06-16T10:06:00Z", }; +const reviewState = (latestRun = approvedReview) => ({ + prUrl: latestRun.prUrl, + prNumber: 3, + targetSha: latestRun.targetSha, + status: latestRun.status === "running" ? "running" : "up_to_date", + latestRun, +}); + beforeEach(() => { getMock.mockReset(); postMock.mockReset(); @@ -160,7 +169,7 @@ describe("SessionInspector reviews tab", () => { response: { status: 201 }, data: { reviewerHandleId: "reviewer-pane", - review: { ...approvedReview, status: "running", verdict: "", body: "" }, + reviews: [reviewState({ ...approvedReview, status: "running", verdict: "", body: "" })], }, }); const onOpenReviewerTerminal = vi.fn(); @@ -181,10 +190,10 @@ describe("SessionInspector reviews tab", () => { }); it("shows an up-to-date notice instead of opening the terminal when the backend reuses a run", async () => { - mockCommonGets([approvedReview], "reviewer-pane"); + mockCommonGets([reviewState()], "reviewer-pane"); postMock.mockResolvedValue({ response: { status: 200 }, - data: { reviewerHandleId: "reviewer-pane", review: approvedReview }, + data: { reviewerHandleId: "reviewer-pane", reviews: [reviewState()] }, }); const onOpenReviewerTerminal = vi.fn(); @@ -200,7 +209,7 @@ describe("SessionInspector reviews tab", () => { }); it("shows an approved review and opens its terminal", async () => { - mockCommonGets([approvedReview], "reviewer-pane"); + mockCommonGets([reviewState()], "reviewer-pane"); const onOpenReviewerTerminal = vi.fn(); renderWithQuery( diff --git a/frontend/src/renderer/components/SessionInspector.tsx b/frontend/src/renderer/components/SessionInspector.tsx index 83464c43..ef6c496f 100644 --- a/frontend/src/renderer/components/SessionInspector.tsx +++ b/frontend/src/renderer/components/SessionInspector.tsx @@ -27,6 +27,7 @@ import { PRAttentionPanel, PRSummaryMeta } from "./PRSummaryDisplay"; type ProjectConfig = components["schemas"]["ProjectConfig"]; type ReviewRun = components["schemas"]["ReviewRun"]; +type PRReviewState = components["schemas"]["PRReviewState"]; type ReviewsResponse = components["schemas"]["ListReviewsResponse"]; type OpenReviewerTerminal = (target: { handleId: string; harness: string }) => void; @@ -427,7 +428,8 @@ function ReviewsView({ return; } if (data?.reviewerHandleId) { - onOpenReviewerTerminal?.({ handleId: data.reviewerHandleId, harness: data.review.harness || "reviewer" }); + const harness = latestReview(data.reviews)?.harness || "reviewer"; + onOpenReviewerTerminal?.({ handleId: data.reviewerHandleId, harness }); } }, }); @@ -472,7 +474,7 @@ function ReviewPanel({ }: { session: WorkspaceSession; config?: ProjectConfig; - reviews: ReviewRun[]; + reviews: PRReviewState[]; reviewerHandleId: string; isLoading: boolean; isTriggering: boolean; @@ -507,8 +509,11 @@ function ReviewPanel({ ); } -function latestReview(reviews: ReviewRun[]): ReviewRun | undefined { - return [...reviews].sort((a, b) => Date.parse(b.createdAt) - Date.parse(a.createdAt))[0]; +function latestReview(reviews: PRReviewState[]): ReviewRun | undefined { + return reviews + .map((review) => review.latestRun) + .filter((review): review is ReviewRun => Boolean(review)) + .sort((a, b) => Date.parse(b.createdAt) - Date.parse(a.createdAt))[0]; } function ReviewerCard({