diff --git a/apps/web/src/lib/code-reviews/client/code-review-worker-client.ts b/apps/web/src/lib/code-reviews/client/code-review-worker-client.ts index 81c98194fe..eaef0ccadd 100644 --- a/apps/web/src/lib/code-reviews/client/code-review-worker-client.ts +++ b/apps/web/src/lib/code-reviews/client/code-review-worker-client.ts @@ -2,6 +2,7 @@ import 'server-only'; import type { CodeReviewPayload } from '../triggers/prepare-review-payload'; import { CODE_REVIEW_WORKER_AUTH_TOKEN } from '@/lib/config.server'; +import * as z from 'zod'; // Fetch timeout in milliseconds const FETCH_TIMEOUT_MS = 10000; @@ -62,6 +63,23 @@ export type CancelReviewResponse = { reviewId: string; }; +const ReviewStatusResponseSchema = z.object({ + reviewId: z.string(), + status: z.enum(['queued', 'running', 'completed', 'failed', 'cancelled']), + sessionId: z.string().optional(), + cliSessionId: z.string().optional(), + startedAt: z.string().optional(), + completedAt: z.string().optional(), + model: z.string().optional(), + totalTokensIn: z.number().optional(), + totalTokensOut: z.number().optional(), + totalCost: z.number().optional(), + errorMessage: z.string().optional(), + terminalReason: z.string().optional(), +}); + +export type ReviewStatusResponse = z.infer; + /** * Code Review Worker API Client * Handles all communication with the Cloudflare Worker for code reviews @@ -147,6 +165,23 @@ class CodeReviewWorkerClient { return response.json() as Promise; } + + async getReviewStatus(reviewId: string): Promise { + const response = await fetchWithTimeout(`${this.baseUrl}/reviews/${reviewId}/status`, { + headers: this.getHeaders(), + }); + + if (response.status === 404) { + return null; + } + + if (!response.ok) { + const errorText = await response.text(); + throw new Error(`Failed to fetch review status: ${response.status} ${errorText}`); + } + + return ReviewStatusResponseSchema.parse(await response.json()); + } } // Export a singleton instance diff --git a/apps/web/src/lib/code-reviews/db/code-reviews.ts b/apps/web/src/lib/code-reviews/db/code-reviews.ts index e347e27bfa..154b79d782 100644 --- a/apps/web/src/lib/code-reviews/db/code-reviews.ts +++ b/apps/web/src/lib/code-reviews/db/code-reviews.ts @@ -11,7 +11,7 @@ import { microdollar_usage, microdollar_usage_metadata, } from '@kilocode/db/schema'; -import { eq, and, desc, count, ne, inArray, sql, sum, gte } from 'drizzle-orm'; +import { eq, and, desc, count, ne, inArray, sql, sum, gte, isNull } from 'drizzle-orm'; import { captureException } from '@sentry/nextjs'; import type { CreateReviewParams, CodeReviewStatus, ListReviewsParams, Owner } from '../core'; import type { CloudAgentCodeReview } from '@kilocode/db/schema'; @@ -163,6 +163,101 @@ export async function updateCodeReviewStatus( } } +export async function updateCodeReviewStatusIfNonTerminal( + reviewId: string, + status: CodeReviewStatus, + updates: { + sessionId?: string; + cliSessionId?: string; + errorMessage?: string; + terminalReason?: CodeReviewTerminalReason; + startedAt?: Date; + completedAt?: Date; + agentVersion?: string; + model?: string; + totalTokensIn?: number; + totalTokensOut?: number; + totalCostMusd?: number; + } = {} +): Promise { + try { + const updateData: Partial = { + status, + updated_at: new Date().toISOString(), + }; + + if (updates.sessionId !== undefined) updateData.session_id = updates.sessionId; + if (updates.cliSessionId !== undefined) updateData.cli_session_id = updates.cliSessionId; + if (updates.errorMessage !== undefined) updateData.error_message = updates.errorMessage; + if (updates.terminalReason !== undefined) updateData.terminal_reason = updates.terminalReason; + if (updates.startedAt !== undefined) updateData.started_at = updates.startedAt.toISOString(); + if (updates.completedAt !== undefined) { + updateData.completed_at = updates.completedAt.toISOString(); + } + if (updates.agentVersion !== undefined) updateData.agent_version = updates.agentVersion; + if (updates.model !== undefined) updateData.model = updates.model; + if (updates.totalTokensIn !== undefined) updateData.total_tokens_in = updates.totalTokensIn; + if (updates.totalTokensOut !== undefined) updateData.total_tokens_out = updates.totalTokensOut; + if (updates.totalCostMusd !== undefined) updateData.total_cost_musd = updates.totalCostMusd; + + if (status === 'running' && !updates.startedAt) { + updateData.started_at = new Date().toISOString(); + } + if ( + (status === 'completed' || status === 'failed' || status === 'cancelled') && + !updates.completedAt + ) { + updateData.completed_at = new Date().toISOString(); + } + + const updated = await db + .update(cloud_agent_code_reviews) + .set(updateData) + .where( + and( + eq(cloud_agent_code_reviews.id, reviewId), + inArray(cloud_agent_code_reviews.status, ['pending', 'queued', 'running']) + ) + ) + .returning({ id: cloud_agent_code_reviews.id }); + + return updated.length > 0; + } catch (error) { + captureException(error, { + tags: { operation: 'updateCodeReviewStatusIfNonTerminal' }, + extra: { reviewId, status, updates }, + }); + throw error; + } +} + +export async function releaseQueuedReviewClaim(reviewId: string): Promise { + try { + const released = await db + .update(cloud_agent_code_reviews) + .set({ + status: 'pending', + updated_at: new Date().toISOString(), + }) + .where( + and( + eq(cloud_agent_code_reviews.id, reviewId), + eq(cloud_agent_code_reviews.status, 'queued'), + isNull(cloud_agent_code_reviews.session_id) + ) + ) + .returning({ id: cloud_agent_code_reviews.id }); + + return released.length > 0; + } catch (error) { + captureException(error, { + tags: { operation: 'releaseQueuedReviewClaim' }, + extra: { reviewId }, + }); + throw error; + } +} + /** * Updates only usage-related columns on a code review, without touching status or timestamps. */ diff --git a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts index cf01fe365d..216d69a50c 100644 --- a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts +++ b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.test.ts @@ -1,10 +1,12 @@ const mockDispatchReview = jest.fn(); +const mockGetReviewStatus = jest.fn(); const mockGetAgentConfigForOwner = jest.fn(); const mockPrepareReviewPayload = jest.fn(); jest.mock('@/lib/code-reviews/client/code-review-worker-client', () => ({ codeReviewWorkerClient: { dispatchReview: (...args: unknown[]) => mockDispatchReview(...args), + getReviewStatus: (...args: unknown[]) => mockGetReviewStatus(...args), }, })); @@ -58,6 +60,7 @@ describe('tryDispatchPendingReviews', () => { beforeEach(() => { mockDispatchReview.mockResolvedValue(undefined); + mockGetReviewStatus.mockResolvedValue(null); mockGetAgentConfigForOwner.mockResolvedValue({ id: 'test-agent-config', config: {} }); mockPrepareReviewPayload.mockImplementation((params: { reviewId: string }) => ({ reviewId: params.reviewId, @@ -69,6 +72,7 @@ describe('tryDispatchPendingReviews', () => { .delete(cloud_agent_code_reviews) .where(eq(cloud_agent_code_reviews.repo_full_name, REPO)); mockDispatchReview.mockReset(); + mockGetReviewStatus.mockReset(); mockGetAgentConfigForOwner.mockReset(); mockPrepareReviewPayload.mockReset(); }); @@ -462,4 +466,130 @@ describe('tryDispatchPendingReviews', () => { expect.objectContaining({ reviewId: staleQueuedReview.id }) ); }); + + it('keeps a dispatch timeout claimed when the Worker status probe finds queued DO state', async () => { + const recentTimestamp = minutesAgo(1); + const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner; + await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS); + mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms')); + mockGetReviewStatus.mockResolvedValue({ reviewId: 'unused', status: 'queued' }); + + const [review] = await db + .insert(cloud_agent_code_reviews) + .values( + reviewValues({ + owner, + status: 'pending', + createdAt: recentTimestamp, + updatedAt: recentTimestamp, + }) + ) + .returning({ id: cloud_agent_code_reviews.id }); + + if (!review) { + throw new Error('Expected review to be inserted'); + } + + const result = await tryDispatchPendingReviews({ + type: 'user', + id: testUser.id, + userId: testUser.id, + }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result).toEqual({ + dispatched: 1, + pending: 0, + activeCount: 1, + }); + expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id); + expect(storedReview?.status).toBe('queued'); + }); + + it('releases a dispatch timeout claim when the Worker status probe finds no DO state', async () => { + const recentTimestamp = minutesAgo(1); + const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner; + await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS); + mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms')); + mockGetReviewStatus.mockResolvedValue(null); + + const [review] = await db + .insert(cloud_agent_code_reviews) + .values( + reviewValues({ + owner, + status: 'pending', + createdAt: recentTimestamp, + updatedAt: recentTimestamp, + }) + ) + .returning({ id: cloud_agent_code_reviews.id }); + + if (!review) { + throw new Error('Expected review to be inserted'); + } + + const result = await tryDispatchPendingReviews({ + type: 'user', + id: testUser.id, + userId: testUser.id, + }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result).toEqual({ + dispatched: 0, + pending: 1, + activeCount: 0, + }); + expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id); + expect(storedReview?.status).toBe('pending'); + }); + + it('keeps a dispatch timeout claim when the Worker status probe also fails', async () => { + const recentTimestamp = minutesAgo(1); + const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner; + await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS); + mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms')); + mockGetReviewStatus.mockRejectedValue(new Error('status probe timeout')); + + const [review] = await db + .insert(cloud_agent_code_reviews) + .values( + reviewValues({ + owner, + status: 'pending', + createdAt: recentTimestamp, + updatedAt: recentTimestamp, + }) + ) + .returning({ id: cloud_agent_code_reviews.id }); + + if (!review) { + throw new Error('Expected review to be inserted'); + } + + const result = await tryDispatchPendingReviews({ + type: 'user', + id: testUser.id, + userId: testUser.id, + }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result).toEqual({ + dispatched: 0, + pending: 1, + activeCount: 0, + }); + expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id); + expect(storedReview?.status).toBe('queued'); + }); }); diff --git a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts index c10886c983..24d87c7b08 100644 --- a/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts +++ b/apps/web/src/lib/code-reviews/dispatch/dispatch-pending-reviews.ts @@ -19,7 +19,11 @@ import { eq, and, or, count, gte, lt, sql } from 'drizzle-orm'; import type { Owner } from '../core'; import { prepareReviewPayload } from '../triggers/prepare-review-payload'; import { getAgentConfigForOwner } from '@/lib/agent-config/db/agent-configs'; -import { updateCodeReviewStatus } from '../db/code-reviews'; +import { + releaseQueuedReviewClaim, + updateCodeReviewStatus, + updateCodeReviewStatusIfNonTerminal, +} from '../db/code-reviews'; import { captureException } from '@sentry/nextjs'; import { errorExceptInTest, logExceptInTest } from '@/lib/utils.server'; import { codeReviewWorkerClient } from '../client/code-review-worker-client'; @@ -282,9 +286,7 @@ async function dispatchReview( } // 4. Dispatch to Cloudflare Worker to create CodeReviewOrchestrator DO. - // If this fails, keep the claim in `queued` and rely on stale-claim - // recovery. A transport failure is ambiguous: the worker may have - // created the DO even if this request did not observe the response. + // If this fails, probe DO state before deciding whether to release the claim. const agentVersion = 'v2'; try { await codeReviewWorkerClient.dispatchReview({ @@ -301,7 +303,7 @@ async function dispatchReview( tags: { operation: 'dispatch-review-worker-call' }, extra: { reviewId: review.id, owner }, }); - return false; + return handleAmbiguousDispatchFailure(review, owner); } // 5. Record which agent version was dispatched without rewriting status. @@ -329,3 +331,53 @@ async function dispatchReview( return true; } + +async function handleAmbiguousDispatchFailure( + review: CloudAgentCodeReview, + owner: Owner +): Promise { + try { + const workerStatus = await codeReviewWorkerClient.getReviewStatus(review.id); + + if (!workerStatus) { + const released = await releaseQueuedReviewClaim(review.id); + logExceptInTest('[dispatchReview] Worker has no DO state after dispatch failure', { + reviewId: review.id, + released, + }); + return false; + } + + if (workerStatus.status === 'queued' || workerStatus.status === 'running') { + logExceptInTest('[dispatchReview] Worker accepted review despite dispatch failure', { + reviewId: review.id, + status: workerStatus.status, + }); + return true; + } + + const mirrored = await updateCodeReviewStatusIfNonTerminal(review.id, workerStatus.status, { + sessionId: workerStatus.sessionId, + cliSessionId: workerStatus.cliSessionId, + errorMessage: workerStatus.errorMessage, + completedAt: workerStatus.completedAt ? new Date(workerStatus.completedAt) : undefined, + }); + + logExceptInTest('[dispatchReview] Mirrored terminal Worker status after dispatch failure', { + reviewId: review.id, + status: workerStatus.status, + mirrored, + }); + return true; + } catch (statusError) { + errorExceptInTest('[dispatchReview] Worker status probe failed, leaving review queued', { + reviewId: review.id, + error: statusError, + }); + captureException(statusError, { + tags: { operation: 'dispatch-review-worker-status-probe' }, + extra: { reviewId: review.id, owner }, + }); + return false; + } +} diff --git a/apps/web/src/routers/code-reviews-router.test.ts b/apps/web/src/routers/code-reviews-router.test.ts new file mode 100644 index 0000000000..31fcf05b41 --- /dev/null +++ b/apps/web/src/routers/code-reviews-router.test.ts @@ -0,0 +1,190 @@ +const mockCancelReview = jest.fn(); + +jest.mock('@/lib/code-reviews/client/code-review-worker-client', () => ({ + codeReviewWorkerClient: { + cancelReview: (...args: unknown[]) => mockCancelReview(...args), + }, +})); + +jest.mock('@/lib/integrations/platforms/github/adapter', () => ({ + createCheckRun: jest.fn(), + updateCheckRun: jest.fn(), +})); + +jest.mock('@/lib/integrations/platforms/gitlab/adapter', () => ({ + setCommitStatus: jest.fn(), +})); + +import { db } from '@/lib/drizzle'; +import { createCallerForUser } from '@/routers/test-utils'; +import { insertTestUser } from '@/tests/helpers/user.helper'; +import { cloud_agent_code_reviews, kilocode_users, type User } from '@kilocode/db/schema'; +import { eq } from 'drizzle-orm'; + +const REPO = `test-org/code-reviews-cancel-${Date.now()}`; +type ReviewStatus = 'pending' | 'queued' | 'running'; +type CodeReviewInsert = typeof cloud_agent_code_reviews.$inferInsert; + +function reviewValues( + userId: string, + status: ReviewStatus, + overrides: Partial = {} +) { + const idSuffix = crypto.randomUUID(); + return { + owned_by_user_id: userId, + owned_by_organization_id: null, + platform_integration_id: null, + check_run_id: null, + repo_full_name: REPO, + pr_number: 1, + pr_url: `https://github.com/${REPO}/pull/1`, + pr_title: 'Test PR', + pr_author: 'octocat', + base_ref: 'main', + head_ref: `feature/${idSuffix}`, + head_sha: `sha-${idSuffix}`, + status, + agent_version: 'v2', + ...overrides, + } satisfies CodeReviewInsert; +} + +describe('codeReviewRouter.cancel', () => { + let testUser: User; + + beforeAll(async () => { + testUser = await insertTestUser(); + }); + + beforeEach(() => { + mockCancelReview.mockResolvedValue({ success: true, reviewId: 'unused' }); + }); + + afterEach(async () => { + await db + .delete(cloud_agent_code_reviews) + .where(eq(cloud_agent_code_reviews.repo_full_name, REPO)); + mockCancelReview.mockReset(); + }); + + afterAll(async () => { + await db.delete(kilocode_users).where(eq(kilocode_users.id, testUser.id)); + }); + + it('locally cancels a queued review without a session when the Worker returns false', async () => { + const [review] = await db + .insert(cloud_agent_code_reviews) + .values(reviewValues(testUser.id, 'queued')) + .returning({ id: cloud_agent_code_reviews.id }); + mockCancelReview.mockResolvedValue({ success: false, reviewId: review.id }); + + const caller = await createCallerForUser(testUser.id); + const result = await caller.codeReviews.cancel({ reviewId: review.id }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result.success).toBe(true); + expect(mockCancelReview).toHaveBeenCalledWith(review.id, 'Cancelled by user'); + expect(storedReview?.status).toBe('cancelled'); + expect(storedReview?.completed_at).toBeTruthy(); + }); + + it('cancels pending reviews locally without calling the Worker', async () => { + const [review] = await db + .insert(cloud_agent_code_reviews) + .values(reviewValues(testUser.id, 'pending')) + .returning({ id: cloud_agent_code_reviews.id }); + + const caller = await createCallerForUser(testUser.id); + const result = await caller.codeReviews.cancel({ reviewId: review.id }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result.success).toBe(true); + expect(mockCancelReview).not.toHaveBeenCalled(); + expect(storedReview?.status).toBe('cancelled'); + expect(storedReview?.completed_at).toBeTruthy(); + }); + + it('locally cancels a queued review without a session when the Worker throws', async () => { + const [review] = await db + .insert(cloud_agent_code_reviews) + .values(reviewValues(testUser.id, 'queued')) + .returning({ id: cloud_agent_code_reviews.id }); + mockCancelReview.mockRejectedValue(new Error('Request timeout after 10000ms')); + + const caller = await createCallerForUser(testUser.id); + const result = await caller.codeReviews.cancel({ reviewId: review.id }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result.success).toBe(true); + expect(mockCancelReview).toHaveBeenCalledWith(review.id, 'Cancelled by user'); + expect(storedReview?.status).toBe('cancelled'); + expect(storedReview?.completed_at).toBeTruthy(); + }); + + it('does not claim success for queued reviews with a session when the Worker returns false', async () => { + const [review] = await db + .insert(cloud_agent_code_reviews) + .values(reviewValues(testUser.id, 'queued', { session_id: 'agent-session-1' })) + .returning({ id: cloud_agent_code_reviews.id }); + mockCancelReview.mockResolvedValue({ success: false, reviewId: review.id }); + + const caller = await createCallerForUser(testUser.id); + const result = await caller.codeReviews.cancel({ reviewId: review.id }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result).toEqual({ success: false, error: 'Worker could not cancel code review' }); + expect(storedReview?.status).toBe('queued'); + expect(storedReview?.completed_at).toBeNull(); + }); + + it('does not locally cancel queued reviews with a session when the Worker throws', async () => { + const [review] = await db + .insert(cloud_agent_code_reviews) + .values(reviewValues(testUser.id, 'queued', { session_id: 'agent-session-1' })) + .returning({ id: cloud_agent_code_reviews.id }); + mockCancelReview.mockRejectedValue(new Error('Request timeout after 10000ms')); + + const caller = await createCallerForUser(testUser.id); + const result = await caller.codeReviews.cancel({ reviewId: review.id }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result).toEqual({ success: false, error: 'Worker could not cancel code review' }); + expect(storedReview?.status).toBe('queued'); + expect(storedReview?.completed_at).toBeNull(); + }); + + it('does not locally cancel running reviews when the Worker throws', async () => { + const [review] = await db + .insert(cloud_agent_code_reviews) + .values(reviewValues(testUser.id, 'running', { session_id: 'agent-session-1' })) + .returning({ id: cloud_agent_code_reviews.id }); + mockCancelReview.mockRejectedValue(new Error('Request timeout after 10000ms')); + + const caller = await createCallerForUser(testUser.id); + const result = await caller.codeReviews.cancel({ reviewId: review.id }); + + const storedReview = await db.query.cloud_agent_code_reviews.findFirst({ + where: eq(cloud_agent_code_reviews.id, review.id), + }); + + expect(result).toEqual({ success: false, error: 'Worker could not cancel code review' }); + expect(storedReview?.status).toBe('running'); + expect(storedReview?.completed_at).toBeNull(); + }); +}); diff --git a/apps/web/src/routers/code-reviews/code-reviews-router.ts b/apps/web/src/routers/code-reviews/code-reviews-router.ts index 41b9bacdc2..ab2a69bbd3 100644 --- a/apps/web/src/routers/code-reviews/code-reviews-router.ts +++ b/apps/web/src/routers/code-reviews/code-reviews-router.ts @@ -376,19 +376,44 @@ export const codeReviewRouter = createTRPCRouter({ // This will: stop stream processing, update DB, and interrupt cloud agent session (kill processes) if (['running', 'queued'].includes(review.status)) { try { - await codeReviewWorkerClient.cancelReview(input.reviewId, 'Cancelled by user'); - // Worker updates DB status and interrupts cloud agent session + const cancelResult = await codeReviewWorkerClient.cancelReview( + input.reviewId, + 'Cancelled by user' + ); + if (!cancelResult.success && review.status === 'queued' && !review.session_id) { + logExceptInTest( + '[cancel] Worker cancel returned false, cancelling queued review locally', + { + reviewId: input.reviewId, + status: review.status, + } + ); + await cancelCodeReview(input.reviewId); + try { + await cancelPRGateCheck(review); + } catch (gateError) { + logExceptInTest('[cancel] Failed to finalize PR gate check:', gateError); + } + return successResult({ message: 'Code review cancelled successfully' }); + } + if (!cancelResult.success) { + return failureResult('Worker could not cancel code review'); + } + // Worker updates DB status and interrupts cloud agent session when cancellation succeeds. return successResult({ message: 'Code review cancelled successfully' }); } catch (workerError) { - // If worker call fails, still update DB status as fallback - console.error('Worker cancel failed, updating DB directly:', workerError); - await cancelCodeReview(input.reviewId); - try { - await cancelPRGateCheck(review); - } catch (gateError) { - logExceptInTest('[cancel] Failed to finalize PR gate check:', gateError); + if (review.status === 'queued' && !review.session_id) { + console.error('Worker cancel failed, updating DB directly:', workerError); + await cancelCodeReview(input.reviewId); + try { + await cancelPRGateCheck(review); + } catch (gateError) { + logExceptInTest('[cancel] Failed to finalize PR gate check:', gateError); + } + return successResult({ message: 'Code review cancelled (worker unreachable)' }); } - return successResult({ message: 'Code review cancelled (worker unreachable)' }); + console.error('Worker cancel failed:', workerError); + return failureResult('Worker could not cancel code review'); } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 70c3fa8311..cc2bb566cf 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1404,7 +1404,13 @@ importers: hono: specifier: ^4.12.7 version: 4.12.8 + zod: + specifier: 'catalog:' + version: 4.3.6 devDependencies: + '@cloudflare/vitest-pool-workers': + specifier: ^0.12.21 + version: 0.12.21(@cloudflare/workers-types@4.20260430.1)(@vitest/runner@4.1.0)(@vitest/snapshot@4.1.0)(bufferutil@4.1.0)(utf-8-validate@6.0.6)(vitest@3.2.4) '@cloudflare/workers-types': specifier: 'catalog:' version: 4.20260430.1 @@ -1414,6 +1420,9 @@ importers: typescript: specifier: 'catalog:' version: 5.9.3 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/debug@4.1.12)(@types/node@25.5.0)(@vitest/ui@3.2.4)(esbuild@0.27.4)(jiti@2.6.1)(terser@5.46.0)(tsx@4.21.0)(yaml@2.8.2) wrangler: specifier: 'catalog:' version: 4.87.0(@cloudflare/workers-types@4.20260430.1)(bufferutil@4.1.0)(utf-8-validate@6.0.6) diff --git a/services/code-review-infra/package.json b/services/code-review-infra/package.json index 59f920b17f..112e2ff5b1 100644 --- a/services/code-review-infra/package.json +++ b/services/code-review-infra/package.json @@ -8,16 +8,20 @@ "dev": "wrangler dev", "tail": "wrangler tail", "typecheck": "tsgo --noEmit", - "lint": "pnpm -w exec oxlint --config .oxlintrc.json services/code-review-infra/src" + "lint": "pnpm -w exec oxlint --config .oxlintrc.json services/code-review-infra/src", + "test": "vitest run --config vitest.workers.config.ts" }, "devDependencies": { + "@cloudflare/vitest-pool-workers": "^0.12.21", "@cloudflare/workers-types": "catalog:", "@typescript/native-preview": "catalog:", "typescript": "catalog:", + "vitest": "^3.2.4", "wrangler": "catalog:" }, "dependencies": { "@kilocode/worker-utils": "workspace:*", - "hono": "catalog:" + "hono": "catalog:", + "zod": "catalog:" } } diff --git a/services/code-review-infra/src/code-review-orchestrator.ts b/services/code-review-infra/src/code-review-orchestrator.ts index 9fd843fd8c..0aa69ee3c6 100644 --- a/services/code-review-infra/src/code-review-orchestrator.ts +++ b/services/code-review-infra/src/code-review-orchestrator.ts @@ -18,9 +18,13 @@ import type { CodeReview, CodeReviewStatus, CodeReviewStatusResponse, + CodeReviewStatusResult, CodeReviewEvent, SessionInput, } from './types'; +import { InternalStatusResponseSchema } from './types'; + +type UpdateStatusResult = 'updated' | 'db-terminal'; /** Shape of an SSE event parsed from the cloud agent stream */ type SseEventPayload = { @@ -79,6 +83,9 @@ export class CodeReviewOrchestrator extends DurableObject { /** Cleanup delay after review completion (7 days) */ private static readonly CLEANUP_DELAY_MS = 7 * 24 * 60 * 60 * 1000; + /** Fallback alarm for queued reviews accepted by the Worker but not run via waitUntil. */ + private static readonly RUN_REVIEW_FALLBACK_DELAY_MS = 30_000; + /** Batch size for event persistence (save every N events to reduce CPU usage) */ private static readonly EVENT_BATCH_SIZE = 10; @@ -104,9 +111,7 @@ export class CodeReviewOrchestrator extends DurableObject { } /** - * Alarm handler for scheduled cleanup tasks. - * Only used for cleanup after review completion (7 days later). - * Review execution is handled via runReview() in HTTP context to avoid 15-min wall time limit. + * Alarm handler for review recovery and scheduled cleanup tasks. */ async alarm(): Promise { try { @@ -129,6 +134,15 @@ export class CodeReviewOrchestrator extends DurableObject { status: this.state.status, }); await this.ctx.storage.deleteAll(); + } else if (this.state.status === 'queued') { + console.log('[CodeReviewOrchestrator] Fallback alarm starting queued review', { + reviewId: this.state.reviewId, + }); + await this.runReview(); + } else if (this.state.status === 'running') { + console.log('[CodeReviewOrchestrator] Fallback alarm no-op for running review', { + reviewId: this.state.reviewId, + }); } else { // Unexpected state - log for debugging console.warn('[CodeReviewOrchestrator] Alarm fired for non-terminal state', { @@ -187,7 +201,7 @@ export class CodeReviewOrchestrator extends DurableObject { errorMessage?: string; terminalReason?: CloudAgentTerminalReason; } - ): Promise { + ): Promise { // Check if there are any actual changes to process const statusChanged = this.state.status !== status; const sessionIdChanged = @@ -213,7 +227,16 @@ export class CodeReviewOrchestrator extends DurableObject { !errorMessageChanged && !terminalReasonChanged ) { - return; + if (status !== 'running') { + return 'updated'; + } + + try { + return await this.updateDBStatus(status, options); + } catch (error) { + console.error('[CodeReviewOrchestrator] Failed to refresh DB running status:', error); + return 'updated'; + } } // Update status if it changed @@ -271,7 +294,10 @@ export class CodeReviewOrchestrator extends DurableObject { // Update Next.js DB via internal API try { - await this.updateDBStatus(status, options); + const dbUpdateResult = await this.updateDBStatus(status, options); + if (dbUpdateResult === 'db-terminal') { + return 'db-terminal'; + } } catch (error) { console.error('[CodeReviewOrchestrator] Failed to update DB status:', error); @@ -285,6 +311,23 @@ export class CodeReviewOrchestrator extends DurableObject { } // For non-terminal states (queued/running), continue - we've saved state locally } + + return 'updated'; + } + + private async setLocalTerminalStateFromDB( + status: Extract + ): Promise { + this.state.status = status; + this.state.completedAt = this.state.completedAt ?? new Date().toISOString(); + this.state.events = []; + this.state.updatedAt = new Date().toISOString(); + await this.ctx.storage.setAlarm(Date.now() + CodeReviewOrchestrator.CLEANUP_DELAY_MS); + await this.saveState(); + console.log('[CodeReviewOrchestrator] Local state synced to terminal DB status', { + reviewId: this.state.reviewId, + status, + }); } /** @@ -298,7 +341,7 @@ export class CodeReviewOrchestrator extends DurableObject { errorMessage?: string; terminalReason?: CloudAgentTerminalReason; } - ): Promise { + ): Promise { // Use path-based endpoint (same as callback endpoint for consistency) const url = `${this.env.API_URL}/api/internal/code-review-status/${this.state.reviewId}`; @@ -324,6 +367,14 @@ export class CodeReviewOrchestrator extends DurableObject { const errorText = await response.text(); throw new Error(`Failed to update DB status: ${response.status} ${errorText}`); } + + const body = InternalStatusResponseSchema.parse(await response.json()); + if (body.message === 'Review already in terminal state' && body.currentStatus) { + await this.setLocalTerminalStateFromDB(body.currentStatus); + return 'db-terminal'; + } + + return 'updated'; } private getTerminalReason(error: unknown): CloudAgentTerminalReason | undefined { @@ -451,6 +502,9 @@ export class CodeReviewOrchestrator extends DurableObject { previousCloudAgentSessionId: params.previousCloudAgentSessionId, }; await this.saveState(); + await this.ctx.storage.setAlarm( + Date.now() + CodeReviewOrchestrator.RUN_REVIEW_FALLBACK_DELAY_MS + ); console.log('[CodeReviewOrchestrator] Review created and queued', { reviewId: params.reviewId, @@ -458,8 +512,10 @@ export class CodeReviewOrchestrator extends DurableObject { agentVersion: params.agentVersion, }); - // Note: Review execution is triggered via runReview() from the worker - // Alarms are only used for cleanup after completion. + console.log('[CodeReviewOrchestrator] Scheduled queued review fallback alarm', { + reviewId: params.reviewId, + fallbackInMs: CodeReviewOrchestrator.RUN_REVIEW_FALLBACK_DELAY_MS, + }); return { status: this.state.status }; } @@ -468,12 +524,21 @@ export class CodeReviewOrchestrator extends DurableObject { * RPC method: Return current state. */ async status(): Promise { + const currentStatus = await this.getStatus(); + if (!currentStatus) { + throw new Error('Review not found'); + } + + return currentStatus; + } + + async getStatus(): Promise { if (!this.state) { await this.loadState(); } if (!this.state) { - throw new Error('Review not found'); + return null; } return { @@ -634,7 +699,8 @@ export class CodeReviewOrchestrator extends DurableObject { const client = this.getCloudAgentNextClient(); try { - await this.updateStatus('running'); + const statusUpdateResult = await this.updateStatus('running'); + if (statusUpdateResult === 'db-terminal') return; console.log('[CodeReviewOrchestrator] Starting review via cloud-agent-next', { reviewId: this.state.reviewId, @@ -769,7 +835,8 @@ export class CodeReviewOrchestrator extends DurableObject { }); try { - await this.updateStatus('running'); + const statusUpdateResult = await this.updateStatus('running'); + if (statusUpdateResult === 'db-terminal') return; // Build internal headers (internalApiProtectedProcedure — API key + Bearer token) const internalHeaders: Record = { @@ -892,7 +959,8 @@ export class CodeReviewOrchestrator extends DurableObject { const runStartTime = Date.now(); try { - await this.updateStatus('running'); + const statusUpdateResult = await this.updateStatus('running'); + if (statusUpdateResult === 'db-terminal') return; console.log('[CodeReviewOrchestrator] Starting review with async streaming', { reviewId: this.state.reviewId, diff --git a/services/code-review-infra/src/index.ts b/services/code-review-infra/src/index.ts index 086efd59fe..ac985a5e95 100644 --- a/services/code-review-infra/src/index.ts +++ b/services/code-review-infra/src/index.ts @@ -149,6 +149,31 @@ app.get('/reviews/:reviewId/events', async (c: Context) => { return c.json(result); }); +// Route: GET /reviews/:reviewId/status +app.get('/reviews/:reviewId/status', async (c: Context) => { + const reviewId = c.req.param('reviewId'); + + if (!reviewId) { + return c.json({ error: 'reviewId parameter required' }, 400); + } + + console.log('[GET /reviews/:reviewId/status] Fetching status', { reviewId }); + + const id = c.env.CODE_REVIEW_ORCHESTRATOR.idFromName(reviewId); + + const result = await withDORetry( + () => c.env.CODE_REVIEW_ORCHESTRATOR.get(id), + stub => stub.getStatus(), + 'getStatus' + ); + + if (!result) { + return c.json({ error: 'Review not found' }, 404); + } + + return c.json(result); +}); + // Route: POST /reviews/:reviewId/cancel app.post('/reviews/:reviewId/cancel', async (c: Context) => { const reviewId = c.req.param('reviewId'); diff --git a/services/code-review-infra/src/types.ts b/services/code-review-infra/src/types.ts index 535503d2a9..96934042d0 100644 --- a/services/code-review-infra/src/types.ts +++ b/services/code-review-infra/src/types.ts @@ -4,6 +4,7 @@ import type { CodeReviewOrchestrator } from './code-review-orchestrator'; import type { Owner, MCPServerConfig, CloudAgentTerminalReason } from '@kilocode/worker-utils'; +import * as z from 'zod'; export type { Owner, MCPServerConfig }; @@ -89,6 +90,17 @@ export interface CodeReviewStatusResponse { terminalReason?: CloudAgentTerminalReason; } +export type CodeReviewStatusResult = CodeReviewStatusResponse | null; + +export const InternalStatusResponseSchema = z.object({ + success: z.boolean().optional(), + message: z.string().optional(), + currentStatus: z.enum(['completed', 'failed', 'cancelled']).optional(), + error: z.string().optional(), +}); + +export type InternalStatusResponse = z.infer; + export interface CodeReviewRequest { reviewId: string; authToken: string; diff --git a/services/code-review-infra/test/integration/code-review-orchestrator.test.ts b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts new file mode 100644 index 0000000000..14587aa003 --- /dev/null +++ b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts @@ -0,0 +1,200 @@ +import { env, runDurableObjectAlarm, runInDurableObject, SELF } from 'cloudflare:test'; +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { CodeReviewOrchestrator } from '../../src/code-review-orchestrator'; +import type { CodeReview, SessionInput } from '../../src/types'; + +function getReviewStub(name = `review-${crypto.randomUUID()}`) { + const id = env.CODE_REVIEW_ORCHESTRATOR.idFromName(name); + return env.CODE_REVIEW_ORCHESTRATOR.get(id); +} + +function sessionInput(): SessionInput { + return { + gitUrl: 'https://example.test/repo.git', + prompt: 'Review this pull request', + mode: 'code', + model: 'test-model', + upstreamBranch: 'main', + }; +} + +function codeReview(overrides: Partial = {}): CodeReview { + return { + reviewId: `review-${crypto.randomUUID()}`, + authToken: 'test-auth-token', + sessionInput: sessionInput(), + owner: { + type: 'user', + id: 'user-id', + userId: 'user-id', + }, + status: 'queued', + updatedAt: new Date().toISOString(), + agentVersion: 'v2', + ...overrides, + }; +} + +function workerAuthHeaders(): HeadersInit { + return { Authorization: 'Bearer test-backend-token' }; +} + +describe('CodeReviewOrchestrator recovery', () => { + const originalFetch = globalThis.fetch; + + beforeEach(() => { + vi.restoreAllMocks(); + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it('start arms a fallback alarm for a queued review', async () => { + const stub = getReviewStub(); + + await stub.start({ + reviewId: crypto.randomUUID(), + authToken: 'test-auth-token', + sessionInput: sessionInput(), + owner: { type: 'user', id: 'user-id', userId: 'user-id' }, + agentVersion: 'v2', + }); + + const alarm = await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => + state.storage.getAlarm() + ); + + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThan(Date.now()); + }); + + it('status route returns DO status and 404s when no state exists', async () => { + const missingId = crypto.randomUUID(); + const missingResponse = await SELF.fetch(`https://worker.test/reviews/${missingId}/status`, { + headers: workerAuthHeaders(), + }); + expect(missingResponse.status).toBe(404); + + const reviewId = crypto.randomUUID(); + const stub = getReviewStub(reviewId); + await stub.start({ + reviewId, + authToken: 'test-auth-token', + sessionInput: sessionInput(), + owner: { type: 'user', id: 'user-id', userId: 'user-id' }, + agentVersion: 'v2', + }); + + const response = await SELF.fetch(`https://worker.test/reviews/${reviewId}/status`, { + headers: workerAuthHeaders(), + }); + + expect(response.status).toBe(200); + await expect(response.json()).resolves.toMatchObject({ + reviewId, + status: 'queued', + }); + }); + + it('queued review alarm retries runReview and transitions to running', async () => { + const stub = getReviewStub(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + if (url.includes('/trpc/prepareSession')) { + return Response.json({ + result: { + data: { + cloudAgentSessionId: 'agent-test-session', + kiloSessionId: 'ses_test_session', + }, + }, + }); + } + if (url.includes('/trpc/initiateFromKilocodeSessionV2')) { + return Response.json({ result: { data: { executionId: 'exec-test', status: 'running' } } }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put('state', codeReview()); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status).toMatchObject({ + status: 'running', + sessionId: 'agent-test-session', + cliSessionId: 'ses_test_session', + }); + expect(fetchMock).toHaveBeenCalledWith( + 'https://cloud-agent-next.example.test/trpc/prepareSession', + expect.any(Object) + ); + expect(fetchMock).toHaveBeenCalledWith( + 'https://cloud-agent-next.example.test/trpc/initiateFromKilocodeSessionV2', + expect.any(Object) + ); + }); + + it('aborts alarm recovery before cloud-agent calls when DB is already terminal', async () => { + const stub = getReviewStub(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ + success: true, + message: 'Review already in terminal state', + currentStatus: 'cancelled', + }); + } + return new Response('cloud-agent should not be called', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put('state', codeReview()); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const status = await stub.status(); + expect(status.status).toBe('cancelled'); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('terminal cleanup alarm still deletes storage', async () => { + const stub = getReviewStub(); + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + status: 'completed', + completedAt: new Date().toISOString(), + events: [{ timestamp: new Date().toISOString(), eventType: 'test', message: 'stored' }], + }) + ); + await state.storage.setAlarm(Date.now() + 60_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const stored = await runInDurableObject( + stub, + async (_instance: CodeReviewOrchestrator, state) => state.storage.get('state') + ); + expect(stored).toBeUndefined(); + }); +}); diff --git a/services/code-review-infra/vitest.workers.config.ts b/services/code-review-infra/vitest.workers.config.ts new file mode 100644 index 0000000000..e228ce7236 --- /dev/null +++ b/services/code-review-infra/vitest.workers.config.ts @@ -0,0 +1,17 @@ +import { defineWorkersProject } from '@cloudflare/vitest-pool-workers/config'; + +export default defineWorkersProject({ + test: { + name: 'integration', + globals: true, + include: ['test/integration/**/*.test.ts'], + poolOptions: { + workers: { + singleWorker: true, + wrangler: { + configPath: './wrangler.test.jsonc', + }, + }, + }, + }, +}); diff --git a/services/code-review-infra/wrangler.test.jsonc b/services/code-review-infra/wrangler.test.jsonc new file mode 100644 index 0000000000..9bdcdd6c6c --- /dev/null +++ b/services/code-review-infra/wrangler.test.jsonc @@ -0,0 +1,28 @@ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "kilo-code-review-worker-test", + "main": "src/index.ts", + "compatibility_date": "2025-01-20", + "compatibility_flags": ["nodejs_compat"], + "durable_objects": { + "bindings": [ + { + "name": "CODE_REVIEW_ORCHESTRATOR", + "class_name": "CodeReviewOrchestrator", + }, + ], + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": ["CodeReviewOrchestrator"], + }, + ], + "vars": { + "API_URL": "https://api.example.test", + "INTERNAL_API_SECRET": "test-internal-secret", + "CLOUD_AGENT_URL": "https://cloud-agent.example.test", + "CLOUD_AGENT_NEXT_URL": "https://cloud-agent-next.example.test", + "BACKEND_AUTH_TOKEN": "test-backend-token", + }, +}