Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions apps/web/src/lib/code-reviews/client/code-review-worker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'server-only';

import type { CodeReviewPayload } from '../triggers/prepare-review-payload';
import { CODE_REVIEW_WORKER_AUTH_TOKEN } from '@/lib/config.server';
import * as z from 'zod';

// Fetch timeout in milliseconds
const FETCH_TIMEOUT_MS = 10000;
Expand Down Expand Up @@ -62,6 +63,23 @@ export type CancelReviewResponse = {
reviewId: string;
};

const ReviewStatusResponseSchema = z.object({
reviewId: z.string(),
status: z.enum(['queued', 'running', 'completed', 'failed', 'cancelled']),
sessionId: z.string().optional(),
cliSessionId: z.string().optional(),
startedAt: z.string().optional(),
completedAt: z.string().optional(),
model: z.string().optional(),
totalTokensIn: z.number().optional(),
totalTokensOut: z.number().optional(),
totalCost: z.number().optional(),
errorMessage: z.string().optional(),
terminalReason: z.string().optional(),
});

export type ReviewStatusResponse = z.infer<typeof ReviewStatusResponseSchema>;

/**
* Code Review Worker API Client
* Handles all communication with the Cloudflare Worker for code reviews
Expand Down Expand Up @@ -147,6 +165,23 @@ class CodeReviewWorkerClient {

return response.json() as Promise<CancelReviewResponse>;
}

async getReviewStatus(reviewId: string): Promise<ReviewStatusResponse | null> {
const response = await fetchWithTimeout(`${this.baseUrl}/reviews/${reviewId}/status`, {
headers: this.getHeaders(),
});

if (response.status === 404) {
return null;
}

if (!response.ok) {
const errorText = await response.text();
throw new Error(`Failed to fetch review status: ${response.status} ${errorText}`);
}

return ReviewStatusResponseSchema.parse(await response.json());
}
}

// Export a singleton instance
Expand Down
97 changes: 96 additions & 1 deletion apps/web/src/lib/code-reviews/db/code-reviews.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
microdollar_usage,
microdollar_usage_metadata,
} from '@kilocode/db/schema';
import { eq, and, desc, count, ne, inArray, sql, sum, gte } from 'drizzle-orm';
import { eq, and, desc, count, ne, inArray, sql, sum, gte, isNull } from 'drizzle-orm';
import { captureException } from '@sentry/nextjs';
import type { CreateReviewParams, CodeReviewStatus, ListReviewsParams, Owner } from '../core';
import type { CloudAgentCodeReview } from '@kilocode/db/schema';
Expand Down Expand Up @@ -163,6 +163,101 @@ export async function updateCodeReviewStatus(
}
}

export async function updateCodeReviewStatusIfNonTerminal(
reviewId: string,
status: CodeReviewStatus,
updates: {
sessionId?: string;
cliSessionId?: string;
errorMessage?: string;
terminalReason?: CodeReviewTerminalReason;
startedAt?: Date;
completedAt?: Date;
agentVersion?: string;
model?: string;
totalTokensIn?: number;
totalTokensOut?: number;
totalCostMusd?: number;
} = {}
): Promise<boolean> {
try {
const updateData: Partial<typeof cloud_agent_code_reviews.$inferInsert> = {
status,
updated_at: new Date().toISOString(),
};

if (updates.sessionId !== undefined) updateData.session_id = updates.sessionId;
if (updates.cliSessionId !== undefined) updateData.cli_session_id = updates.cliSessionId;
if (updates.errorMessage !== undefined) updateData.error_message = updates.errorMessage;
if (updates.terminalReason !== undefined) updateData.terminal_reason = updates.terminalReason;
if (updates.startedAt !== undefined) updateData.started_at = updates.startedAt.toISOString();
if (updates.completedAt !== undefined) {
updateData.completed_at = updates.completedAt.toISOString();
}
if (updates.agentVersion !== undefined) updateData.agent_version = updates.agentVersion;
if (updates.model !== undefined) updateData.model = updates.model;
if (updates.totalTokensIn !== undefined) updateData.total_tokens_in = updates.totalTokensIn;
if (updates.totalTokensOut !== undefined) updateData.total_tokens_out = updates.totalTokensOut;
if (updates.totalCostMusd !== undefined) updateData.total_cost_musd = updates.totalCostMusd;

if (status === 'running' && !updates.startedAt) {
updateData.started_at = new Date().toISOString();
}
if (
(status === 'completed' || status === 'failed' || status === 'cancelled') &&
!updates.completedAt
) {
updateData.completed_at = new Date().toISOString();
}

const updated = await db
.update(cloud_agent_code_reviews)
.set(updateData)
.where(
and(
eq(cloud_agent_code_reviews.id, reviewId),
inArray(cloud_agent_code_reviews.status, ['pending', 'queued', 'running'])
)
)
.returning({ id: cloud_agent_code_reviews.id });

return updated.length > 0;
} catch (error) {
captureException(error, {
tags: { operation: 'updateCodeReviewStatusIfNonTerminal' },
extra: { reviewId, status, updates },
});
throw error;
}
}

export async function releaseQueuedReviewClaim(reviewId: string): Promise<boolean> {
try {
const released = await db
.update(cloud_agent_code_reviews)
.set({
status: 'pending',
updated_at: new Date().toISOString(),
})
.where(
and(
eq(cloud_agent_code_reviews.id, reviewId),
eq(cloud_agent_code_reviews.status, 'queued'),
isNull(cloud_agent_code_reviews.session_id)
)
)
.returning({ id: cloud_agent_code_reviews.id });

return released.length > 0;
} catch (error) {
captureException(error, {
tags: { operation: 'releaseQueuedReviewClaim' },
extra: { reviewId },
});
throw error;
}
}

/**
* Updates only usage-related columns on a code review, without touching status or timestamps.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
const mockDispatchReview = jest.fn();
const mockGetReviewStatus = jest.fn();
const mockGetAgentConfigForOwner = jest.fn();
const mockPrepareReviewPayload = jest.fn();

jest.mock('@/lib/code-reviews/client/code-review-worker-client', () => ({
codeReviewWorkerClient: {
dispatchReview: (...args: unknown[]) => mockDispatchReview(...args),
getReviewStatus: (...args: unknown[]) => mockGetReviewStatus(...args),
},
}));

Expand Down Expand Up @@ -58,6 +60,7 @@ describe('tryDispatchPendingReviews', () => {

beforeEach(() => {
mockDispatchReview.mockResolvedValue(undefined);
mockGetReviewStatus.mockResolvedValue(null);
mockGetAgentConfigForOwner.mockResolvedValue({ id: 'test-agent-config', config: {} });
mockPrepareReviewPayload.mockImplementation((params: { reviewId: string }) => ({
reviewId: params.reviewId,
Expand All @@ -69,6 +72,7 @@ describe('tryDispatchPendingReviews', () => {
.delete(cloud_agent_code_reviews)
.where(eq(cloud_agent_code_reviews.repo_full_name, REPO));
mockDispatchReview.mockReset();
mockGetReviewStatus.mockReset();
mockGetAgentConfigForOwner.mockReset();
mockPrepareReviewPayload.mockReset();
});
Expand Down Expand Up @@ -462,4 +466,130 @@ describe('tryDispatchPendingReviews', () => {
expect.objectContaining({ reviewId: staleQueuedReview.id })
);
});

it('keeps a dispatch timeout claimed when the Worker status probe finds queued DO state', async () => {
const recentTimestamp = minutesAgo(1);
const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner;
await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS);
mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms'));
mockGetReviewStatus.mockResolvedValue({ reviewId: 'unused', status: 'queued' });

const [review] = await db
.insert(cloud_agent_code_reviews)
.values(
reviewValues({
owner,
status: 'pending',
createdAt: recentTimestamp,
updatedAt: recentTimestamp,
})
)
.returning({ id: cloud_agent_code_reviews.id });

if (!review) {
throw new Error('Expected review to be inserted');
}

const result = await tryDispatchPendingReviews({
type: 'user',
id: testUser.id,
userId: testUser.id,
});

const storedReview = await db.query.cloud_agent_code_reviews.findFirst({
where: eq(cloud_agent_code_reviews.id, review.id),
});

expect(result).toEqual({
dispatched: 1,
pending: 0,
activeCount: 1,
});
expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id);
expect(storedReview?.status).toBe('queued');
});

it('releases a dispatch timeout claim when the Worker status probe finds no DO state', async () => {
const recentTimestamp = minutesAgo(1);
const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner;
await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS);
mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms'));
mockGetReviewStatus.mockResolvedValue(null);

const [review] = await db
.insert(cloud_agent_code_reviews)
.values(
reviewValues({
owner,
status: 'pending',
createdAt: recentTimestamp,
updatedAt: recentTimestamp,
})
)
.returning({ id: cloud_agent_code_reviews.id });

if (!review) {
throw new Error('Expected review to be inserted');
}

const result = await tryDispatchPendingReviews({
type: 'user',
id: testUser.id,
userId: testUser.id,
});

const storedReview = await db.query.cloud_agent_code_reviews.findFirst({
where: eq(cloud_agent_code_reviews.id, review.id),
});

expect(result).toEqual({
dispatched: 0,
pending: 1,
activeCount: 0,
});
expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id);
expect(storedReview?.status).toBe('pending');
});

it('keeps a dispatch timeout claim when the Worker status probe also fails', async () => {
const recentTimestamp = minutesAgo(1);
const owner = { type: 'user', id: testUser.id } satisfies ReviewOwner;
await setTestUserBalance(DEFAULT_TIER_BALANCE_MICRODOLLARS);
mockDispatchReview.mockRejectedValue(new Error('Request timeout after 10000ms'));
mockGetReviewStatus.mockRejectedValue(new Error('status probe timeout'));

const [review] = await db
.insert(cloud_agent_code_reviews)
.values(
reviewValues({
owner,
status: 'pending',
createdAt: recentTimestamp,
updatedAt: recentTimestamp,
})
)
.returning({ id: cloud_agent_code_reviews.id });

if (!review) {
throw new Error('Expected review to be inserted');
}

const result = await tryDispatchPendingReviews({
type: 'user',
id: testUser.id,
userId: testUser.id,
});

const storedReview = await db.query.cloud_agent_code_reviews.findFirst({
where: eq(cloud_agent_code_reviews.id, review.id),
});

expect(result).toEqual({
dispatched: 0,
pending: 1,
activeCount: 0,
});
expect(mockGetReviewStatus).toHaveBeenCalledWith(review.id);
expect(storedReview?.status).toBe('queued');
});
});
Loading