diff --git a/app/api/tasks/[taskId]/chat/interrupt/route.ts b/app/api/tasks/[taskId]/chat/interrupt/route.ts index 5f24454..e0093bc 100644 --- a/app/api/tasks/[taskId]/chat/interrupt/route.ts +++ b/app/api/tasks/[taskId]/chat/interrupt/route.ts @@ -1,7 +1,7 @@ import { and, eq, isNull } from 'drizzle-orm' import { NextResponse } from 'next/server' import { CodexGatewayApiError, interruptCodexGatewayTurn } from '@/lib/codex-gateway/client' -import { hasActiveTurnCheckpoint } from '@/lib/codex-gateway/completion' +import { hasActiveTurnCheckpoint, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion' import { getTaskGatewayContext } from '@/lib/codex-gateway/task' import { db } from '@/lib/db/client' import { tasks } from '@/lib/db/schema' @@ -49,6 +49,9 @@ export async function POST(_request: Request, { params }: RouteParams) { } const result = await interruptCodexGatewayTurn(gatewayUrl, task.activeTurnSessionId, gatewayAuthToken) + await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => { + console.error('Failed to reconcile interrupted chat turn') + }) return NextResponse.json({ success: true, diff --git a/app/api/tasks/[taskId]/chat/v2/route.ts b/app/api/tasks/[taskId]/chat/v2/route.ts index 72d2f9e..23e935a 100644 --- a/app/api/tasks/[taskId]/chat/v2/route.ts +++ b/app/api/tasks/[taskId]/chat/v2/route.ts @@ -1,8 +1,8 @@ -import { after, NextRequest, NextResponse } from 'next/server' +import { NextRequest, NextResponse } from 'next/server' import { and, asc, eq, isNull } from 'drizzle-orm' import { z } from 'zod' import { CodexGatewayApiError } from '@/lib/codex-gateway/client' -import { finalizeTaskChatV2Turn, startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' +import { startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' import { hasActiveTurnCheckpoint, reconcileIncompleteTurnSafely, @@ -25,6 +25,7 @@ export const dynamic = 'force-dynamic' export const maxDuration = 300 const turnSchema = z.object({ + clientMessageId: z.string().trim().min(1).optional(), prompt: z.string().trim().min(1, 'Prompt is required'), }) @@ -158,27 +159,11 @@ export async function POST(request: NextRequest, { params }: RouteParams) { const result = await startTaskChatV2Turn({ task, + clientMessageId: parsed.data.clientMessageId, prompt: parsed.data.prompt, source: 'chat-v2', }) - after(async () => { - try { - await finalizeTaskChatV2Turn(result.startedTurn) - } catch (error) { - console.error('Failed to finalize chat v2 turn:', error) - - await db - .update(tasks) - .set({ - status: 'error', - error: 'Failed to finalize chat turn', - updatedAt: new Date(), - }) - .where(eq(tasks.id, resolvedTaskId)) - } - }) - return NextResponse.json({ success: true, data: { diff --git a/app/api/tasks/[taskId]/chat/v2/stream/route.ts b/app/api/tasks/[taskId]/chat/v2/stream/route.ts index 4c63bb6..da1b9de 100644 --- a/app/api/tasks/[taskId]/chat/v2/stream/route.ts +++ b/app/api/tasks/[taskId]/chat/v2/stream/route.ts @@ -1,4 +1,5 @@ import { NextRequest, NextResponse } from 'next/server' +import { finalizeActiveTurnFailure, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion' import { getCodexGatewayEventStreamUrl } from '@/lib/codex-gateway/client' import { getTaskGatewayContext } from '@/lib/codex-gateway/task' import type { CodexGatewayState } from '@/lib/codex-gateway/types' @@ -114,6 +115,9 @@ async function persistGatewayEvent(input: { if (!state.activeTurn && state.lastTurnStatus) { await closeTaskStream(input.streamId, 'closed') + await reconcileIncompleteTurnSafely(input.taskId, 2_500).catch(() => { + console.error('Failed to reconcile chat v2 stream terminal state') + }) } return @@ -129,9 +133,23 @@ async function persistGatewayEvent(input: { if (eventKind === 'gateway.session.closed') { await closeTaskStream(input.streamId, 'closed') + await reconcileIncompleteTurnSafely(input.taskId, 2_500).catch(() => { + console.error('Failed to reconcile chat v2 session closure') + }) } } +async function persistMissingSessionFailure(taskId: string, sessionId: string) { + await finalizeActiveTurnFailure({ + taskId, + sessionId, + error: 'Codex gateway session is no longer available', + clearGatewaySession: true, + }).catch(() => { + console.error('Failed to persist missing Codex gateway session') + }) +} + export async function GET(request: NextRequest, { params }: RouteParams) { const decoder = new TextDecoder() let streamId: string | null = null @@ -183,6 +201,15 @@ export async function GET(request: NextRequest, { params }: RouteParams) { if (!upstream.ok || !upstream.body) { await closeTaskStream(resolvedStreamId, 'errored') + + if (upstream.status === 404 || upstream.status === 410) { + await persistMissingSessionFailure(resolvedTaskId, stream.sessionId) + } else { + await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => { + console.error('Failed to reconcile chat v2 stream connection error') + }) + } + return NextResponse.json({ error: 'Failed to connect to Codex gateway events' }, { status: 502 }) } @@ -264,6 +291,9 @@ export async function GET(request: NextRequest, { params }: RouteParams) { controller.close() } catch (error) { await closeTaskStream(resolvedStreamId, 'errored') + await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => { + console.error('Failed to reconcile chat v2 stream reader error') + }) try { controller.close() } catch { @@ -289,6 +319,12 @@ export async function GET(request: NextRequest, { params }: RouteParams) { }) } + if (taskId) { + await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => { + console.error('Failed to reconcile chat v2 stream proxy error') + }) + } + console.error('Failed to proxy chat v2 stream:', error) return NextResponse.json({ error: 'Failed to proxy chat stream' }, { status: 500 }) } diff --git a/app/api/tasks/[taskId]/continue/route.ts b/app/api/tasks/[taskId]/continue/route.ts index 02b09f2..cad6db2 100644 --- a/app/api/tasks/[taskId]/continue/route.ts +++ b/app/api/tasks/[taskId]/continue/route.ts @@ -1,14 +1,15 @@ -import { NextRequest, NextResponse, after } from 'next/server' import { and, eq, isNull } from 'drizzle-orm' +import { NextRequest, NextResponse } from 'next/server' +import { startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' import { db } from '@/lib/db/client' import { tasks } from '@/lib/db/schema' -import { startCodexGatewayTaskTurn, waitForCodexGatewayTurnCompletion } from '@/lib/codex-gateway/runner' -import { ensureTaskDevboxRuntime } from '@/lib/devbox/runtime' -import { prependSealosDeployContext } from '@/lib/sealos-deploy-context' -import { checkRateLimit } from '@/lib/utils/rate-limit' -import { createTaskLogger } from '@/lib/utils/task-logger' import { getServerSession } from '@/lib/session/get-server-session' -import { appendTaskMessage } from '@/lib/task-messages' +import { generateId } from '@/lib/utils/id' +import { checkRateLimit } from '@/lib/utils/rate-limit' + +function buildCompatClientMessageId(): string { + return `continue-compat:${generateId(16)}` +} export async function POST(req: NextRequest, context: { params: Promise<{ taskId: string }> }) { try { @@ -32,15 +33,16 @@ export async function POST(req: NextRequest, context: { params: Promise<{ taskId } const { taskId } = await context.params - const body = await req.json() - const { message } = body + const body = (await req.json().catch(() => ({}))) as { + clientMessageId?: string + message?: unknown + } + const message = typeof body.message === 'string' ? body.message.trim() : '' - if (!message || typeof message !== 'string' || !message.trim()) { + if (!message) { return NextResponse.json({ error: 'Message is required' }, { status: 400 }) } - const trimmedMessage = message.trim() - const [task] = await db .select() .from(tasks) @@ -55,57 +57,14 @@ export async function POST(req: NextRequest, context: { params: Promise<{ taskId return NextResponse.json({ error: 'Unsupported agent' }, { status: 400 }) } - let userMessagePersisted = false - - try { - await appendTaskMessage({ - taskId, - role: 'user', - content: trimmedMessage, - }) - userMessagePersisted = true - } catch { - console.error('Failed to persist follow-up user message') - } - - await db - .update(tasks) - .set({ - status: 'processing', - progress: 0, - error: null, - completedAt: null, - updatedAt: new Date(), - }) - .where(eq(tasks.id, taskId)) - - after(async () => { - const logger = createTaskLogger(taskId) - - try { - const runtime = await ensureTaskDevboxRuntime(task, { logger }) - const gatewayPrompt = prependSealosDeployContext(trimmedMessage, runtime.namespace || task.runtimeNamespace) - - const startedTurn = await startCodexGatewayTaskTurn(taskId, gatewayPrompt, { - appendUserMessage: !userMessagePersisted, - model: task.selectedModel, - }) - - await waitForCodexGatewayTurnCompletion(startedTurn) - } catch { - console.error('Failed to finalize Codex gateway follow-up') - - await db - .update(tasks) - .set({ - status: 'error', - error: 'Failed to finalize Codex gateway follow-up', - updatedAt: new Date(), - }) - .where(eq(tasks.id, taskId)) - - await logger.error('Failed to finalize Codex gateway follow-up') - } + await startTaskChatV2Turn({ + task, + clientMessageId: + typeof body.clientMessageId === 'string' && body.clientMessageId.trim() + ? body.clientMessageId.trim() + : buildCompatClientMessageId(), + prompt: message, + source: 'continue-compat', }) return NextResponse.json({ success: true }) diff --git a/app/api/tasks/[taskId]/route.ts b/app/api/tasks/[taskId]/route.ts index 03cc59c..8778a40 100644 --- a/app/api/tasks/[taskId]/route.ts +++ b/app/api/tasks/[taskId]/route.ts @@ -93,6 +93,12 @@ export async function PATCH(request: NextRequest, { params }: RouteParams) { // Log the stop request await logger.info('Stop request received - terminating task execution...') + if (hasActiveTurnCheckpoint(existingTask)) { + await reconcileIncompleteTurnSafely(taskId, 1_500).catch(() => { + console.error('Failed to reconcile task before stop') + }) + } + const { gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(taskId, session.user.id) if (existingTask.gatewaySessionId && gatewayUrl) { diff --git a/app/api/tasks/route.ts b/app/api/tasks/route.ts index cb8cb9b..9823047 100644 --- a/app/api/tasks/route.ts +++ b/app/api/tasks/route.ts @@ -1,6 +1,6 @@ import { NextRequest, NextResponse, after } from 'next/server' import { and, desc, eq, isNull, or } from 'drizzle-orm' -import { finalizeTaskChatV2Turn, startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' +import { startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' import { FORCED_CODEX_MODEL } from '@/lib/codex/defaults' import { db } from '@/lib/db/client' import { insertTaskSchema, tasks } from '@/lib/db/schema' @@ -188,31 +188,12 @@ export async function POST(request: NextRequest) { }) try { - const startedTurn = await startTaskChatV2Turn({ + await startTaskChatV2Turn({ task: newTask, + clientMessageId: `task-create:${taskId}`, prompt: validatedData.prompt, source: 'task-create', }) - - after(async () => { - try { - await finalizeTaskChatV2Turn(startedTurn.startedTurn) - } catch { - console.error('Failed to finalize Codex task') - - await db - .update(tasks) - .set({ - status: 'error', - error: 'Failed to finalize Codex task', - updatedAt: new Date(), - }) - .where(eq(tasks.id, taskId)) - - const logger = createTaskLogger(taskId) - await logger.error('Failed to finalize Codex task') - } - }) } catch { console.error('Failed to start Codex task') diff --git a/components/merge-pr-dialog.tsx b/components/merge-pr-dialog.tsx index 332c236..e2db691 100644 --- a/components/merge-pr-dialog.tsx +++ b/components/merge-pr-dialog.tsx @@ -25,6 +25,14 @@ interface MergePRDialogProps { onMergeInitiated?: () => void } +function buildClientMessageId(prefix: string): string { + if (typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function') { + return `${prefix}:${crypto.randomUUID()}` + } + + return `${prefix}:${Date.now()}-${Math.random().toString(36).slice(2, 10)}` +} + export function MergePRDialog({ taskId, prUrl: _prUrl, @@ -89,13 +97,14 @@ export function MergePRDialog({ try { // Send a follow-up message to the current task to fix merge conflicts - const response = await fetch(`/api/tasks/${taskId}/continue`, { + const response = await fetch(`/api/tasks/${taskId}/chat/v2`, { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ - message: + clientMessageId: buildClientMessageId('merge-conflict-follow-up'), + prompt: 'Fix merge conflicts in the current branch and prepare it for merging. Review the conflicting changes carefully and resolve them intelligently, preserving the intent of both sets of changes where possible.', }), }) diff --git a/lib/codex-gateway/chat-v2-service.ts b/lib/codex-gateway/chat-v2-service.ts index 1beac21..62410a5 100644 --- a/lib/codex-gateway/chat-v2-service.ts +++ b/lib/codex-gateway/chat-v2-service.ts @@ -16,6 +16,7 @@ export interface StartTaskChatV2TurnResult { } export async function startTaskChatV2Turn(input: { + clientMessageId?: string prompt: string source: string task: Task @@ -31,6 +32,7 @@ export async function startTaskChatV2Turn(input: { await appendUserMessageEvent({ taskId: input.task.id, + clientMessageId: input.clientMessageId, content: input.prompt, source: input.source, }) diff --git a/lib/codex-gateway/completion.ts b/lib/codex-gateway/completion.ts index 512c496..c01adf5 100644 --- a/lib/codex-gateway/completion.ts +++ b/lib/codex-gateway/completion.ts @@ -1,10 +1,11 @@ -import { eq } from 'drizzle-orm' +import { and, desc, eq } from 'drizzle-orm' import { CodexGatewayApiError, getCodexGatewaySessionState } from '@/lib/codex-gateway/client' -import { getAssistantContentAfterCursor } from '@/lib/codex-gateway/transcript' +import { getAssistantContentAfterCursor, type TranscriptTextEntry } from '@/lib/codex-gateway/transcript' import { getTaskGatewayContextById } from '@/lib/codex-gateway/task' import { db } from '@/lib/db/client' -import { tasks, type Task } from '@/lib/db/schema' -import { buildProjectedAssistantMessageId, projectAssistantMessage } from '@/lib/task-event-projection' +import { taskEvents, tasks, type Task } from '@/lib/db/schema' +import { projectAssistantMessage } from '@/lib/task-event-projection' +import { buildProjectedAssistantMessageId } from '@/lib/task-message-ids' import { appendProjectedAssistantMessageEvent, recordTaskEvent } from '@/lib/task-events' import { OperationTimeoutError, withTimeout } from '@/lib/utils/async' @@ -32,6 +33,61 @@ interface FinalizeTurnInput { turnStatus?: string | null } +interface FinalizeActiveTurnFailureInput { + clearGatewaySession?: boolean + error: string + sessionId?: string | null + taskId: string +} + +function parseTranscriptEntries(value: unknown): TranscriptTextEntry[] { + if (!Array.isArray(value)) { + return [] + } + + return value.flatMap((entry) => { + if (!entry || typeof entry !== 'object') { + return [] + } + + const role = 'role' in entry && typeof entry.role === 'string' ? entry.role : null + const text = 'text' in entry && typeof entry.text === 'string' ? entry.text : null + + if (!role || text === null) { + return [] + } + + return [{ role, text }] + }) +} + +async function getPersistedAssistantContentForTurn( + taskId: string, + sessionId: string, + transcriptCursor: number, +): Promise { + const [latestSnapshotEvent] = await db + .select({ payload: taskEvents.payload }) + .from(taskEvents) + .where( + and( + eq(taskEvents.taskId, taskId), + eq(taskEvents.kind, 'gateway.state.snapshot'), + eq(taskEvents.sessionId, sessionId), + ), + ) + .orderBy(desc(taskEvents.seq)) + .limit(1) + + const transcript = parseTranscriptEntries(latestSnapshotEvent?.payload?.transcript) + + if (!transcript.length) { + return '' + } + + return getAssistantContentAfterCursor(transcriptCursor, transcript) +} + export function hasActiveTurnCheckpoint(task: Task | null | undefined): boolean { return ( Boolean(task?.activeTurnSessionId) && @@ -160,6 +216,34 @@ export async function finalizeTurnCompletion(input: FinalizeTurnInput): Promise< return updatedTask || null } +export async function finalizeActiveTurnFailure(input: FinalizeActiveTurnFailureInput): Promise { + const [task] = await db.select().from(tasks).where(eq(tasks.id, input.taskId)).limit(1) + + if (!task || !hasActiveTurnCheckpoint(task) || !task.activeTurnSessionId || task.status === 'stopped') { + return task || null + } + + if (input.sessionId && task.activeTurnSessionId !== input.sessionId) { + return task + } + + const assistantContent = await getPersistedAssistantContentForTurn( + input.taskId, + task.activeTurnSessionId, + task.activeTurnTranscriptCursor!, + ) + + return await finalizeTurnCompletion({ + taskId: input.taskId, + sessionId: task.activeTurnSessionId, + transcriptCursor: task.activeTurnTranscriptCursor!, + assistantContent, + success: false, + error: input.error, + clearGatewaySession: input.clearGatewaySession ?? task.gatewaySessionId === task.activeTurnSessionId, + }) +} + export async function reconcileIncompleteTurn(taskId: string): Promise { const { task, gatewayUrl, gatewayAuthToken } = await getTaskGatewayContextById(taskId) @@ -208,11 +292,17 @@ export async function reconcileIncompleteTurn(taskId: string): Promise statement-breakpoint +ALTER TABLE "task_messages" ADD COLUMN "client_message_id" text;--> statement-breakpoint +CREATE UNIQUE INDEX "task_events_task_id_client_message_id_idx" ON "task_events" USING btree ("task_id","client_message_id");--> statement-breakpoint +CREATE UNIQUE INDEX "task_messages_task_id_client_message_id_idx" ON "task_messages" USING btree ("task_id","client_message_id"); \ No newline at end of file diff --git a/lib/db/migrations/meta/0027_snapshot.json b/lib/db/migrations/meta/0027_snapshot.json new file mode 100644 index 0000000..004fe40 --- /dev/null +++ b/lib/db/migrations/meta/0027_snapshot.json @@ -0,0 +1,1149 @@ +{ + "id": "4afeadd8-2c84-4ef7-86a0-5fe005006008", + "prevId": "5d6073f2-db34-472c-997c-eb57bce17c2e", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.accounts": { + "name": "accounts", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "provider": { + "name": "provider", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'github'" + }, + "external_user_id": { + "name": "external_user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "access_token": { + "name": "access_token", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "refresh_token": { + "name": "refresh_token", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "scope": { + "name": "scope", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "username": { + "name": "username", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "accounts_user_id_provider_idx": { + "name": "accounts_user_id_provider_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "provider", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "accounts_user_id_users_id_fk": { + "name": "accounts_user_id_users_id_fk", + "tableFrom": "accounts", + "tableTo": "users", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.connectors": { + "name": "connectors", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'remote'" + }, + "base_url": { + "name": "base_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "oauth_client_id": { + "name": "oauth_client_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "oauth_client_secret": { + "name": "oauth_client_secret", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "command": { + "name": "command", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "env": { + "name": "env", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'disconnected'" + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": {}, + "foreignKeys": { + "connectors_user_id_users_id_fk": { + "name": "connectors_user_id_users_id_fk", + "tableFrom": "connectors", + "tableTo": "users", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.keys": { + "name": "keys", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "provider": { + "name": "provider", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "base_url": { + "name": "base_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "keys_user_id_provider_idx": { + "name": "keys_user_id_provider_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "provider", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "keys_user_id_users_id_fk": { + "name": "keys_user_id_users_id_fk", + "tableFrom": "keys", + "tableTo": "users", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.settings": { + "name": "settings", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "key": { + "name": "key", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "settings_user_id_key_idx": { + "name": "settings_user_id_key_idx", + "columns": [ + { + "expression": "user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "key", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "settings_user_id_users_id_fk": { + "name": "settings_user_id_users_id_fk", + "tableFrom": "settings", + "tableTo": "users", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.task_events": { + "name": "task_events", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "task_id": { + "name": "task_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "seq": { + "name": "seq", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "kind": { + "name": "kind", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "stream_id": { + "name": "stream_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "session_id": { + "name": "session_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "thread_id": { + "name": "thread_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "turn_id": { + "name": "turn_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "client_message_id": { + "name": "client_message_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "payload": { + "name": "payload", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "task_events_task_id_seq_idx": { + "name": "task_events_task_id_seq_idx", + "columns": [ + { + "expression": "task_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "seq", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + }, + "task_events_task_id_client_message_id_idx": { + "name": "task_events_task_id_client_message_id_idx", + "columns": [ + { + "expression": "task_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "client_message_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "task_events_task_id_tasks_id_fk": { + "name": "task_events_task_id_tasks_id_fk", + "tableFrom": "task_events", + "tableTo": "tasks", + "columnsFrom": [ + "task_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.task_messages": { + "name": "task_messages", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "task_id": { + "name": "task_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "role": { + "name": "role", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "content": { + "name": "content", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "client_message_id": { + "name": "client_message_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "task_messages_task_id_client_message_id_idx": { + "name": "task_messages_task_id_client_message_id_idx", + "columns": [ + { + "expression": "task_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "client_message_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "task_messages_task_id_tasks_id_fk": { + "name": "task_messages_task_id_tasks_id_fk", + "tableFrom": "task_messages", + "tableTo": "tasks", + "columnsFrom": [ + "task_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.task_streams": { + "name": "task_streams", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "task_id": { + "name": "task_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "session_id": { + "name": "session_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "thread_id": { + "name": "thread_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "turn_id": { + "name": "turn_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'active'" + }, + "started_at": { + "name": "started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "last_event_at": { + "name": "last_event_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "ended_at": { + "name": "ended_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "task_streams_task_id_tasks_id_fk": { + "name": "task_streams_task_id_tasks_id_fk", + "tableFrom": "task_streams", + "tableTo": "tasks", + "columnsFrom": [ + "task_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.tasks": { + "name": "tasks", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "prompt": { + "name": "prompt", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "repo_url": { + "name": "repo_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "selected_agent": { + "name": "selected_agent", + "type": "text", + "primaryKey": false, + "notNull": false, + "default": "'claude'" + }, + "selected_model": { + "name": "selected_model", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "install_dependencies": { + "name": "install_dependencies", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "default": false + }, + "max_duration": { + "name": "max_duration", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 300 + }, + "keep_alive": { + "name": "keep_alive", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "default": false + }, + "enable_browser": { + "name": "enable_browser", + "type": "boolean", + "primaryKey": false, + "notNull": false, + "default": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "progress": { + "name": "progress", + "type": "integer", + "primaryKey": false, + "notNull": false, + "default": 0 + }, + "logs": { + "name": "logs", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "branch_name": { + "name": "branch_name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "runtime_provider": { + "name": "runtime_provider", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "runtime_name": { + "name": "runtime_name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "runtime_namespace": { + "name": "runtime_namespace", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "runtime_state": { + "name": "runtime_state", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "workspace_prepared_at": { + "name": "workspace_prepared_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "workspace_fingerprint": { + "name": "workspace_fingerprint", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "runtime_checked_at": { + "name": "runtime_checked_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "gateway_ready_at": { + "name": "gateway_ready_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "gateway_url": { + "name": "gateway_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "gateway_session_id": { + "name": "gateway_session_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "active_turn_session_id": { + "name": "active_turn_session_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "active_turn_started_at": { + "name": "active_turn_started_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "active_turn_transcript_cursor": { + "name": "active_turn_transcript_cursor", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "turn_completion_state": { + "name": "turn_completion_state", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "turn_completion_checked_at": { + "name": "turn_completion_checked_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "sandbox_id": { + "name": "sandbox_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "agent_session_id": { + "name": "agent_session_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "sandbox_url": { + "name": "sandbox_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "preview_url": { + "name": "preview_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "pr_url": { + "name": "pr_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "pr_number": { + "name": "pr_number", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "pr_status": { + "name": "pr_status", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "pr_merge_commit_sha": { + "name": "pr_merge_commit_sha", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "mcp_server_ids": { + "name": "mcp_server_ids", + "type": "jsonb", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "completed_at": { + "name": "completed_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamp", + "primaryKey": false, + "notNull": false + } + }, + "indexes": {}, + "foreignKeys": { + "tasks_user_id_users_id_fk": { + "name": "tasks_user_id_users_id_fk", + "tableFrom": "tasks", + "tableTo": "users", + "columnsFrom": [ + "user_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.users": { + "name": "users", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "provider": { + "name": "provider", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "external_id": { + "name": "external_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "access_token": { + "name": "access_token", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "refresh_token": { + "name": "refresh_token", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "scope": { + "name": "scope", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "username": { + "name": "username", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "email": { + "name": "email", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "avatar_url": { + "name": "avatar_url", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + }, + "last_login_at": { + "name": "last_login_at", + "type": "timestamp", + "primaryKey": false, + "notNull": true, + "default": "now()" + } + }, + "indexes": { + "users_provider_external_id_idx": { + "name": "users_provider_external_id_idx", + "columns": [ + { + "expression": "provider", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "external_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + } + }, + "enums": {}, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/lib/db/migrations/meta/_journal.json b/lib/db/migrations/meta/_journal.json index 8c0f278..6badf8a 100644 --- a/lib/db/migrations/meta/_journal.json +++ b/lib/db/migrations/meta/_journal.json @@ -190,6 +190,13 @@ "when": 1776685833943, "tag": "0026_marvelous_firelord", "breakpoints": true + }, + { + "idx": 27, + "version": "7", + "when": 1776766917758, + "tag": "0027_windy_pet_avengers", + "breakpoints": true } ] } \ No newline at end of file diff --git a/lib/db/schema.ts b/lib/db/schema.ts index 4786ce0..8f76e80 100644 --- a/lib/db/schema.ts +++ b/lib/db/schema.ts @@ -437,11 +437,16 @@ export const taskEvents = pgTable( sessionId: text('session_id'), threadId: text('thread_id'), turnId: text('turn_id'), + clientMessageId: text('client_message_id'), payload: jsonb('payload').$type | null>(), createdAt: timestamp('created_at').defaultNow().notNull(), }, (table) => ({ taskSeqUnique: uniqueIndex('task_events_task_id_seq_idx').on(table.taskId, table.seq), + taskClientMessageUnique: uniqueIndex('task_events_task_id_client_message_id_idx').on( + table.taskId, + table.clientMessageId, + ), }), ) @@ -454,6 +459,7 @@ export const insertTaskEventSchema = z.object({ sessionId: z.string().optional(), threadId: z.string().optional(), turnId: z.string().optional(), + clientMessageId: z.string().optional(), payload: z.record(z.string(), z.unknown()).nullable().optional(), createdAt: z.date().optional(), }) @@ -467,6 +473,7 @@ export const selectTaskEventSchema = z.object({ sessionId: z.string().nullable(), threadId: z.string().nullable(), turnId: z.string().nullable(), + clientMessageId: z.string().nullable(), payload: z.record(z.string(), z.unknown()).nullable(), createdAt: z.date(), }) @@ -520,23 +527,34 @@ export type TaskStream = z.infer export type InsertTaskStream = z.infer // Task messages table - stores user and agent messages for each task -export const taskMessages = pgTable('task_messages', { - id: text('id').primaryKey(), - taskId: text('task_id') - .notNull() - .references(() => tasks.id, { onDelete: 'cascade' }), // Foreign key to tasks table - role: text('role', { - enum: ['user', 'agent'], - }).notNull(), // Who sent the message - content: text('content').notNull(), // The message content - createdAt: timestamp('created_at').defaultNow().notNull(), -}) +export const taskMessages = pgTable( + 'task_messages', + { + id: text('id').primaryKey(), + taskId: text('task_id') + .notNull() + .references(() => tasks.id, { onDelete: 'cascade' }), // Foreign key to tasks table + role: text('role', { + enum: ['user', 'agent'], + }).notNull(), // Who sent the message + content: text('content').notNull(), // The message content + clientMessageId: text('client_message_id'), + createdAt: timestamp('created_at').defaultNow().notNull(), + }, + (table) => ({ + taskClientMessageUnique: uniqueIndex('task_messages_task_id_client_message_id_idx').on( + table.taskId, + table.clientMessageId, + ), + }), +) export const insertTaskMessageSchema = z.object({ id: z.string().optional(), taskId: z.string().min(1, 'Task ID is required'), role: z.enum(['user', 'agent']), content: z.string().min(1, 'Content is required'), + clientMessageId: z.string().optional(), createdAt: z.date().optional(), }) @@ -545,6 +563,7 @@ export const selectTaskMessageSchema = z.object({ taskId: z.string(), role: z.enum(['user', 'agent']), content: z.string(), + clientMessageId: z.string().nullable(), createdAt: z.date(), }) diff --git a/lib/devbox/runtime.ts b/lib/devbox/runtime.ts index 3a1f1ec..cb4184a 100644 --- a/lib/devbox/runtime.ts +++ b/lib/devbox/runtime.ts @@ -18,6 +18,7 @@ import { getDevboxArchiveAfterPauseTime, getDevboxDefaultImage, getDevboxNamespa import { createTaskDevboxName, createTaskDevboxUpstreamId } from '@/lib/devbox/naming' import type { DevboxInfo, DevboxSshInfo } from '@/lib/devbox/types' import { getUserGitHubToken } from '@/lib/github/user-token' +import { redactSensitiveInfo } from '@/lib/utils/logging' import type { TaskLogger } from '@/lib/utils/task-logger' import { formatKeyTaskLogMessage, TASK_FLOW_LOGS } from '@/lib/utils/task-flow-logs' import { createAuthenticatedRepoUrl } from '@/lib/sandbox/config' @@ -53,11 +54,84 @@ const DEVBOX_BOOTSTRAP_READY_POLL_MS = 2_000 const DEVBOX_SKILL_INSTALL_MARKER = '__CODEX_SKILL_INSTALLED__:1' const DEVBOX_RUNTIME_READY_TIMEOUT_MS = 60_000 const DEVBOX_RUNTIME_READY_POLL_MS = 2_000 +const DEVBOX_SECRET_READY_MAX_RETRIES = 3 +const DEVBOX_SECRET_READY_RETRY_DELAY_MS = 2_000 +const DEVBOX_BOOTSTRAP_DEBUG_MAX_LINES = 20 +const DEVBOX_BOOTSTRAP_DEBUG_MAX_CHARS = 2_000 function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)) } +function stripAnsi(value: string): string { + return value.replace(/\u001b\[[0-9;]*[A-Za-z]/g, '') +} + +function redactPathLikeInfo(value: string): string { + return value + .replace(/https?:\/\/[^\s'"]+/gi, '[REDACTED_URL]') + .replace(/\b(?:\/[^/\s'"]+)+\/?/g, '[REDACTED_PATH]') + .replace(/\b(?:[A-Za-z]:\\(?:[^\\\s'"]+\\)*[^\\\s'"]*)/g, '[REDACTED_PATH]') +} + +function summarizeBootstrapDebugOutput(value: string): string { + const normalized = stripAnsi(value).replace(/\r\n/g, '\n').trim() + + if (!normalized) { + return '[empty]' + } + + const lines = redactPathLikeInfo(redactSensitiveInfo(normalized)) + .split('\n') + .map((line) => line.trimEnd()) + .filter((line) => line.length > 0) + + if (lines.length === 0) { + return '[empty]' + } + + const summary = lines.slice(-DEVBOX_BOOTSTRAP_DEBUG_MAX_LINES).join('\n') + if (summary.length <= DEVBOX_BOOTSTRAP_DEBUG_MAX_CHARS) { + return summary + } + + return summary.slice(-DEVBOX_BOOTSTRAP_DEBUG_MAX_CHARS) +} + +function logBootstrapDebugOutput(stdout: string, stderr: string): void { + const stdoutSummary = summarizeBootstrapDebugOutput(stdout) + const stderrSummary = summarizeBootstrapDebugOutput(stderr) + + console.error('Devbox workspace bootstrap stdout summary:', stdoutSummary) + console.error('Devbox workspace bootstrap stderr summary:', stderrSummary) +} + +function isDevboxSecretPendingError(error: unknown): error is DevboxApiError { + return ( + error instanceof DevboxApiError && + error.status >= 500 && + error.message.includes('get devbox private key failed') && + error.message.includes('not found') + ) +} + +async function getDevboxWithSecretRetry(name: string): Promise<{ data: DevboxInfo }> { + let attempt = 0 + + while (true) { + try { + return await getDevbox(name) + } catch (error) { + if (!isDevboxSecretPendingError(error) || attempt >= DEVBOX_SECRET_READY_MAX_RETRIES) { + throw error + } + + attempt += 1 + await sleep(DEVBOX_SECRET_READY_RETRY_DELAY_MS) + } + } +} + function shellEscape(value: string): string { return `'${value.replace(/'/g, `'\\''`)}'` } @@ -147,7 +221,7 @@ async function waitForRunningDevbox(runtimeName: string): Promise { const startedAt = Date.now() while (Date.now() - startedAt < DEVBOX_RUNTIME_READY_TIMEOUT_MS) { - const response = await getDevbox(runtimeName) + const response = await getDevboxWithSecretRetry(runtimeName) if (response.data.state.phase === 'Running') { return response.data @@ -160,7 +234,7 @@ async function waitForRunningDevbox(runtimeName: string): Promise { } async function ensureRunningDevbox(task: Task, runtimeName: string): Promise { - const response = await getDevbox(runtimeName) + const response = await getDevboxWithSecretRetry(runtimeName) if (response.data.state.phase === 'Running') { return response.data @@ -311,7 +385,7 @@ ${managedCodexConfigToml}EOF`, while (true) { try { - const runtime = await getDevbox(runtimeName) + const runtime = await getDevboxWithSecretRetry(runtimeName) if (runtime.data.state.phase !== 'Running') { console.info('Devbox workspace bootstrap waiting for runtime') lastPendingError = true @@ -330,7 +404,9 @@ ${managedCodexConfigToml}EOF`, console.info('Devbox workspace bootstrap exec finished') if (execResponse.data.exitCode !== 0) { + await logger?.error('Devbox workspace bootstrap failed') console.error('Devbox workspace bootstrap failed') + logBootstrapDebugOutput(execResponse.data.stdout, execResponse.data.stderr) throw new Error('Failed to bootstrap Devbox workspace') } @@ -539,7 +615,7 @@ export async function ensureTaskDevboxRuntime( ], }) - const infoResponse = await getDevbox(runtimeName) + const infoResponse = await getDevboxWithSecretRetry(runtimeName) const runtimeInfo = infoResponse.data.state.phase === 'Running' ? infoResponse.data : await ensureRunningDevbox(task, runtimeName) const gatewayUrl = resolveCodexGatewayUrl(runtimeName, task.gatewayUrl, runtimeInfo) diff --git a/lib/hooks/use-task-agent-chat-v2.ts b/lib/hooks/use-task-agent-chat-v2.ts index e9e93b8..bedfe45 100644 --- a/lib/hooks/use-task-agent-chat-v2.ts +++ b/lib/hooks/use-task-agent-chat-v2.ts @@ -9,7 +9,8 @@ import { buildStreamingAgentMessageFromState, combineChatMessages, createOptimisticUserMessage, - hasPersistedAssistantContent, + hasPersistedAssistantIdentity, + type LiveAssistantMessageIdentity, reconcileOptimisticMessages, } from '@/lib/task-chat' import { @@ -53,6 +54,23 @@ interface ChatV2TurnResponse { error?: string } +function buildClientMessageId(prefix: string): string { + if (typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function') { + return `${prefix}:${crypto.randomUUID()}` + } + + return `${prefix}:${Date.now()}-${Math.random().toString(36).slice(2, 10)}` +} + +function getLiveTurnIdentity(task: Pick) { + return task.activeTurnSessionId && typeof task.activeTurnTranscriptCursor === 'number' + ? { + sessionId: task.activeTurnSessionId, + transcriptCursor: task.activeTurnTranscriptCursor, + } + : null +} + function isTaskStreaming(task: Task): boolean { return ( Boolean(task.activeTurnSessionId) && @@ -62,10 +80,23 @@ function isTaskStreaming(task: Task): boolean { } export function useTaskAgentChatV2(taskId: string, task: Task) { + const taskCheckpointIdentity = useMemo( + () => + task.activeTurnSessionId && typeof task.activeTurnTranscriptCursor === 'number' + ? { + sessionId: task.activeTurnSessionId, + transcriptCursor: task.activeTurnTranscriptCursor, + } + : null, + [task.activeTurnSessionId, task.activeTurnTranscriptCursor], + ) const [persistedMessages, setPersistedMessages] = useState([]) const [persistedEvents, setPersistedEvents] = useState([]) const [pendingMessages, setPendingMessages] = useState[]>([]) const [liveState, setLiveState] = useState(null) + const [liveTurnIdentity, setLiveTurnIdentity] = useState( + getLiveTurnIdentity(task), + ) const [retainedStreamingMessage, setRetainedStreamingMessage] = useState(null) const [activeStream, setActiveStream] = useState(null) const [isLoading, setIsLoading] = useState(true) @@ -104,6 +135,8 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { } startTransition(() => { + const refreshedLiveTurnIdentity = getLiveTurnIdentity(data.data!.task) + setPersistedMessages((previousMessages) => areTaskMessagesEqual(previousMessages, data.data!.messages) ? previousMessages : data.data!.messages, ) @@ -112,6 +145,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { setActiveStream((previousStream) => previousStream?.streamId === data.data!.stream?.streamId ? previousStream : data.data!.stream, ) + setLiveTurnIdentity(refreshedLiveTurnIdentity) if (!data.data!.stream) { setLiveState(null) } @@ -120,7 +154,13 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { return previousMessage } - return hasPersistedAssistantContent(data.data!.messages, previousMessage.content) ? null : previousMessage + return data.data!.messages.some( + (message) => message.role === 'agent' && message.id === previousMessage.id, + ) || + (refreshedLiveTurnIdentity && + hasPersistedAssistantIdentity(data.data!.messages, refreshedLiveTurnIdentity)) + ? null + : previousMessage }) }) @@ -147,6 +187,12 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { } }, [clearReconnectTimer]) + useEffect(() => { + if (!liveTurnIdentity && taskCheckpointIdentity) { + setLiveTurnIdentity(taskCheckpointIdentity) + } + }, [liveTurnIdentity, taskCheckpointIdentity]) + useEffect(() => { const taskUpdateToken = task.updatedAt ? new Date(task.updatedAt).toISOString() : null if (!taskUpdateToken) { @@ -187,7 +233,8 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { source.addEventListener('state', (event) => { const nextState = JSON.parse(event.data) as CodexGatewayState - const nextStreamingMessage = buildStreamingAgentMessageFromState(taskId, nextState) + const nextIdentity = liveTurnIdentity || taskCheckpointIdentity + const nextStreamingMessage = buildStreamingAgentMessageFromState(taskId, nextState, nextIdentity) startTransition(() => { setLiveState(nextState) @@ -209,6 +256,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { clearReconnectTimer() reconnectAttemptRef.current = 0 setActiveStream(null) + setLiveTurnIdentity(null) source.close() if (eventSourceRef.current === source) { @@ -223,6 +271,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { clearReconnectTimer() reconnectAttemptRef.current = 0 setActiveStream(null) + setLiveTurnIdentity(null) source.close() if (eventSourceRef.current === source) { @@ -262,7 +311,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { eventSourceRef.current = null } } - }, [activeStream, clearReconnectTimer, refreshChat, taskId]) + }, [activeStream, clearReconnectTimer, liveTurnIdentity, refreshChat, taskCheckpointIdentity, taskId]) const sendMessage = useCallback( async (content: string): Promise => { @@ -274,7 +323,8 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { } } - const optimisticMessage = createOptimisticUserMessage(taskId, trimmedContent) + const clientMessageId = buildClientMessageId('chat') + const optimisticMessage = createOptimisticUserMessage(taskId, trimmedContent, clientMessageId) setPendingMessages((previousMessages) => [...previousMessages, optimisticMessage]) setRetainedStreamingMessage(null) setLiveState(null) @@ -287,6 +337,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { 'Content-Type': 'application/json', }, body: JSON.stringify({ + clientMessageId, prompt: trimmedContent, }), }) @@ -307,6 +358,10 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { clearReconnectTimer() reconnectAttemptRef.current = 0 setActiveStream(data.data!.stream) + setLiveTurnIdentity({ + sessionId: data.data!.session.sessionId, + transcriptCursor: data.data!.turn.transcriptCursor, + }) }) void refreshChat(false) @@ -376,8 +431,8 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { }, [activeStream, liveState?.activeTurn, task, taskId]) const streamingMessage = useMemo( - () => buildStreamingAgentMessage(taskId, liveState, persistedMessages), - [liveState, persistedMessages, taskId], + () => buildStreamingAgentMessage(taskId, liveState, persistedMessages, liveTurnIdentity), + [liveState, liveTurnIdentity, persistedMessages, taskId], ) const activeStreamingMessage = streamingMessage || retainedStreamingMessage @@ -390,19 +445,20 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { const activityItems = useMemo(() => { const persistedActivityItems = buildAgentActivityItemsFromTaskEvents(persistedEvents) const liveActivityItems = buildAgentActivityItemsFromState(liveState) - const seen = new Set() + const latestByIdentity = new Map() - return [...persistedActivityItems, ...liveActivityItems] - .toSorted((left, right) => left.occurredAt.localeCompare(right.occurredAt)) - .filter((item) => { - const key = `${item.label}|${item.detail}|${item.occurredAt}` - if (seen.has(key)) { - return false - } + for (const item of [...persistedActivityItems, ...liveActivityItems]) { + const identityKey = `${item.groupKey}|${item.label}|${item.detail}` + const previousItem = latestByIdentity.get(identityKey) - seen.add(key) - return true - }) + if (!previousItem || previousItem.occurredAt.localeCompare(item.occurredAt) <= 0) { + latestByIdentity.set(identityKey, item) + } + } + + return Array.from(latestByIdentity.values()).toSorted((left, right) => + left.occurredAt.localeCompare(right.occurredAt), + ) }, [liveState, persistedEvents]) return { diff --git a/lib/hooks/use-task-chat-messages.ts b/lib/hooks/use-task-chat-messages.ts deleted file mode 100644 index 8f3b868..0000000 --- a/lib/hooks/use-task-chat-messages.ts +++ /dev/null @@ -1,629 +0,0 @@ -'use client' - -import { startTransition, useCallback, useEffect, useMemo, useRef, useState } from 'react' -import type { CodexGatewayState } from '@/lib/codex-gateway/types' -import type { Task, TaskMessage } from '@/lib/db/schema' -import { - areTaskMessagesEqual, - buildStreamingAgentMessageFromState, - buildStreamingAgentMessage, - combineChatMessages, - createOptimisticUserMessage, - hasPersistedAssistantContent, - reconcileOptimisticMessages, -} from '@/lib/task-chat' - -interface ChatRuntimeRouteResponse { - success: boolean - data?: { - runtime: { - status: Task['status'] - runtimeName: Task['runtimeName'] - runtimeState: Task['runtimeState'] - workspacePreparedAt: Task['workspacePreparedAt'] - runtimeCheckedAt: Task['runtimeCheckedAt'] - gatewayReadyAt: Task['gatewayReadyAt'] - gatewaySessionId: string | null - turnCompletionState: Task['turnCompletionState'] - } - session: { - sessionId: string - state: CodexGatewayState - } | null - stream: { - streamTicket: string - streamUrl: string - } | null - } - error?: string -} - -interface ChatActionResult { - success: boolean - error?: string -} - -interface ChatTurnRouteResponse { - success: boolean - data?: { - session?: { - sessionId: string - } - stream?: { - streamTicket: string - streamUrl: string - } - turn?: { - transcriptCursor: number - turnAccepted: boolean - turnStartedAt: string - streamUrl: string - } - } - error?: string -} - -function isTaskProcessing(status: Task['status']): boolean { - return status === 'processing' || status === 'pending' -} - -function shouldPrewarmGatewayTask(task: Task, gatewaySessionId: string | null): boolean { - if (task.selectedAgent !== 'codex' || gatewaySessionId) { - return false - } - - if ( - task.activeTurnSessionId && - task.turnCompletionState && - task.turnCompletionState !== 'completed' && - task.turnCompletionState !== 'failed' - ) { - return false - } - - return !task.runtimeName || task.runtimeState !== 'Running' || !task.workspacePreparedAt || !task.gatewaySessionId -} - -export function useTaskChatMessages(taskId: string, task: Task) { - const [persistedMessages, setPersistedMessages] = useState([]) - const [pendingMessages, setPendingMessages] = useState[]>([]) - const [gatewayState, setGatewayState] = useState(null) - const [retainedStreamingMessage, setRetainedStreamingMessage] = useState(null) - const [gatewaySessionId, setGatewaySessionId] = useState(task.gatewaySessionId) - const [gatewayStreamUrl, setGatewayStreamUrl] = useState(null) - const [gatewayTurnPending, setGatewayTurnPending] = useState( - task.selectedAgent === 'codex' && isTaskProcessing(task.status), - ) - const [isLoading, setIsLoading] = useState(true) - const [error, setError] = useState(null) - const [isSending, setIsSending] = useState(false) - const [isStopping, setIsStopping] = useState(false) - const gatewayEventSourceRef = useRef(null) - const gatewayReconnectTimeoutRef = useRef(null) - const gatewayReconnectAttemptRef = useRef(0) - const lastTaskUpdateRef = useRef(null) - const lastPrewarmTokenRef = useRef(null) - const persistedMessagesRef = useRef([]) - const retainedStreamingContentRef = useRef(null) - const isGatewayTask = task.selectedAgent === 'codex' - - const applyGatewayRuntimeSnapshot = useCallback((data?: ChatRuntimeRouteResponse['data']) => { - if (!data) { - return false - } - - startTransition(() => { - setGatewaySessionId(data.session?.sessionId || data.runtime.gatewaySessionId || null) - setGatewayState(data.session?.state || null) - setGatewayStreamUrl(data.stream?.streamUrl || null) - - if (data.session?.state.activeTurn) { - setGatewayTurnPending(true) - } else if (!isTaskProcessing(data.runtime.status)) { - setGatewayTurnPending(false) - } - }) - - return Boolean(data.session) - }, []) - - useEffect(() => { - persistedMessagesRef.current = persistedMessages - }, [persistedMessages]) - - useEffect(() => { - // Keep the latest streamed content available to SSE callbacks without recreating the connection. - retainedStreamingContentRef.current = retainedStreamingMessage?.content?.trim() || null - }, [retainedStreamingMessage]) - - const clearGatewayReconnectTimer = useCallback(() => { - if (gatewayReconnectTimeoutRef.current !== null) { - window.clearTimeout(gatewayReconnectTimeoutRef.current) - gatewayReconnectTimeoutRef.current = null - } - }, []) - - const refreshMessages = useCallback( - async (showLoading = true): Promise => { - if (showLoading) { - setIsLoading(true) - } - - setError(null) - - try { - const response = await fetch(`/api/tasks/${taskId}/messages`, { - cache: 'no-store', - }) - const data = (await response.json()) as { - success?: boolean - messages?: TaskMessage[] - error?: string - } - - if (!response.ok || !data.success || !data.messages) { - setError(data.error || 'Failed to fetch messages') - return false - } - - startTransition(() => { - setPersistedMessages((previousMessages) => - areTaskMessagesEqual(previousMessages, data.messages as TaskMessage[]) ? previousMessages : data.messages!, - ) - setPendingMessages((previousMessages) => reconcileOptimisticMessages(previousMessages, data.messages!)) - setRetainedStreamingMessage((previousMessage) => { - if (!previousMessage) { - return previousMessage - } - - return hasPersistedAssistantContent(data.messages!, previousMessage.content) ? null : previousMessage - }) - }) - - return true - } catch { - setError('Failed to fetch messages') - return false - } finally { - if (showLoading) { - setIsLoading(false) - } - } - }, - [taskId], - ) - - const syncFinalMessages = useCallback( - async (expectedContent?: string | null) => { - const normalizedExpectedContent = expectedContent?.trim() || null - if (!normalizedExpectedContent) { - void refreshMessages(false) - return - } - - for (let attempt = 0; attempt < 10; attempt += 1) { - const refreshed = await refreshMessages(false) - if (!refreshed) { - await new Promise((resolve) => window.setTimeout(resolve, 400)) - continue - } - - if (hasPersistedAssistantContent(persistedMessagesRef.current, normalizedExpectedContent)) { - return - } - - await new Promise((resolve) => window.setTimeout(resolve, attempt < 4 ? 300 : 700)) - } - }, - [refreshMessages], - ) - - const refreshChatRuntime = useCallback(async (): Promise => { - if (!isGatewayTask) { - return false - } - - try { - const response = await fetch(`/api/tasks/${taskId}/chat/runtime`, { - cache: 'no-store', - }) - - if (!response.ok) { - return false - } - - const data = (await response.json()) as ChatRuntimeRouteResponse - return applyGatewayRuntimeSnapshot(data.data) - } catch { - return false - } - }, [applyGatewayRuntimeSnapshot, isGatewayTask, taskId]) - - const prewarmChatRuntime = useCallback(async (): Promise => { - if (!isGatewayTask) { - return false - } - - try { - const response = await fetch(`/api/tasks/${taskId}/chat/prewarm`, { - method: 'POST', - }) - - if (!response.ok) { - return false - } - - const data = (await response.json()) as ChatRuntimeRouteResponse - return applyGatewayRuntimeSnapshot(data.data) - } catch { - return false - } - }, [applyGatewayRuntimeSnapshot, isGatewayTask, taskId]) - - useEffect(() => { - void refreshMessages(true) - }, [refreshMessages]) - - useEffect(() => { - return () => { - clearGatewayReconnectTimer() - } - }, [clearGatewayReconnectTimer]) - - useEffect(() => { - if (!isGatewayTask) { - setGatewaySessionId(null) - setGatewayStreamUrl(null) - return - } - - if (task.gatewaySessionId) { - setGatewaySessionId((previousSessionId) => previousSessionId || task.gatewaySessionId) - return - } - - if (!isTaskProcessing(task.status)) { - setGatewaySessionId(null) - setGatewayStreamUrl(null) - } - }, [isGatewayTask, task.gatewaySessionId, task.status]) - - useEffect(() => { - if (!isGatewayTask) { - setGatewayTurnPending(false) - setGatewaySessionId(null) - setGatewayStreamUrl(null) - setGatewayState(null) - return - } - - setGatewayTurnPending(isTaskProcessing(task.status)) - }, [isGatewayTask, task.status]) - - useEffect(() => { - const taskUpdateToken = task.updatedAt ? new Date(task.updatedAt).toISOString() : null - if (!taskUpdateToken) { - return - } - - if (!lastTaskUpdateRef.current) { - lastTaskUpdateRef.current = taskUpdateToken - return - } - - if (lastTaskUpdateRef.current === taskUpdateToken) { - return - } - - lastTaskUpdateRef.current = taskUpdateToken - - const hasActiveStream = gatewayState?.activeTurn || gatewayTurnPending - if (!hasActiveStream) { - void refreshMessages(false) - } - }, [gatewayState?.activeTurn, gatewayTurnPending, refreshMessages, task.updatedAt]) - - useEffect(() => { - if (!isGatewayTask) { - return - } - - if (gatewaySessionId || gatewayTurnPending || task.gatewaySessionId) { - void refreshChatRuntime() - } - }, [gatewaySessionId, gatewayTurnPending, isGatewayTask, refreshChatRuntime, task.gatewaySessionId]) - - useEffect(() => { - const prewarmToken = [ - task.runtimeName || '', - task.runtimeState || '', - task.workspacePreparedAt ? new Date(task.workspacePreparedAt).toISOString() : '', - task.gatewaySessionId || '', - gatewaySessionId || '', - ].join('|') - - if (!shouldPrewarmGatewayTask(task, gatewaySessionId) || lastPrewarmTokenRef.current === prewarmToken) { - return - } - - lastPrewarmTokenRef.current = prewarmToken - void prewarmChatRuntime() - }, [gatewaySessionId, prewarmChatRuntime, task]) - - useEffect(() => { - const shouldConnect = - isGatewayTask && Boolean(gatewayStreamUrl) && (isTaskProcessing(task.status) || gatewayTurnPending) - - if (!shouldConnect) { - gatewayEventSourceRef.current?.close() - gatewayEventSourceRef.current = null - return - } - - const source = new EventSource(gatewayStreamUrl!) - gatewayEventSourceRef.current = source - source.onopen = () => { - clearGatewayReconnectTimer() - gatewayReconnectAttemptRef.current = 0 - } - - source.addEventListener('state', (event) => { - const nextState = JSON.parse(event.data) as CodexGatewayState - const finalStreamingMessage = buildStreamingAgentMessageFromState(taskId, nextState) - - startTransition(() => { - setGatewayState(nextState) - - if (finalStreamingMessage) { - setRetainedStreamingMessage((previousMessage) => { - if (!previousMessage) { - return finalStreamingMessage - } - - return previousMessage.content.length >= finalStreamingMessage.content.length - ? previousMessage - : finalStreamingMessage - }) - } - }) - - if (!nextState.activeTurn && nextState.lastTurnStatus) { - clearGatewayReconnectTimer() - gatewayReconnectAttemptRef.current = 0 - setGatewayTurnPending(false) - setGatewayStreamUrl(null) - source.close() - - if (gatewayEventSourceRef.current === source) { - gatewayEventSourceRef.current = null - } - - void syncFinalMessages(finalStreamingMessage?.content) - } - }) - - source.addEventListener('session-closed', () => { - clearGatewayReconnectTimer() - gatewayReconnectAttemptRef.current = 0 - startTransition(() => { - setGatewayTurnPending(false) - setGatewaySessionId(null) - setGatewayStreamUrl(null) - }) - - source.close() - - if (gatewayEventSourceRef.current === source) { - gatewayEventSourceRef.current = null - } - - void syncFinalMessages(retainedStreamingContentRef.current) - }) - - source.onerror = () => { - if (gatewayEventSourceRef.current !== source) { - return - } - - if (!isTaskProcessing(task.status) && !gatewayTurnPending) { - if (source.readyState === EventSource.CLOSED) { - gatewayEventSourceRef.current = null - } - - return - } - - clearGatewayReconnectTimer() - source.close() - - if (gatewayEventSourceRef.current === source) { - gatewayEventSourceRef.current = null - } - - const nextAttempt = gatewayReconnectAttemptRef.current + 1 - gatewayReconnectAttemptRef.current = nextAttempt - const reconnectDelayMs = Math.min(1000 * nextAttempt, 5000) - - gatewayReconnectTimeoutRef.current = window.setTimeout(() => { - gatewayReconnectTimeoutRef.current = null - void refreshChatRuntime() - }, reconnectDelayMs) - } - - return () => { - clearGatewayReconnectTimer() - source.close() - - if (gatewayEventSourceRef.current === source) { - gatewayEventSourceRef.current = null - } - } - }, [ - clearGatewayReconnectTimer, - gatewayStreamUrl, - gatewayTurnPending, - isGatewayTask, - refreshMessages, - refreshChatRuntime, - syncFinalMessages, - task.status, - taskId, - ]) - - const sendMessage = useCallback( - async (content: string): Promise => { - const trimmedContent = content.trim() - if (!trimmedContent || isSending) { - return { - success: false, - error: 'Message is required', - } - } - - const optimisticMessage = createOptimisticUserMessage(taskId, trimmedContent) - setPendingMessages((previousMessages) => [...previousMessages, optimisticMessage]) - setRetainedStreamingMessage(null) - setIsSending(true) - - try { - const response = await fetch( - isGatewayTask ? `/api/tasks/${taskId}/chat/turn` : `/api/tasks/${taskId}/continue`, - { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - ...(isGatewayTask ? { prompt: trimmedContent } : { message: trimmedContent }), - }), - }, - ) - - const data = (await response.json()) as ChatTurnRouteResponse & { - error?: string - } - - if (!response.ok) { - setPendingMessages((previousMessages) => - previousMessages.filter((message) => message.id !== optimisticMessage.id), - ) - - return { - success: false, - error: data.error || 'Failed to send message', - } - } - - startTransition(() => { - setGatewayTurnPending(isGatewayTask) - - if (isGatewayTask) { - clearGatewayReconnectTimer() - gatewayReconnectAttemptRef.current = 0 - setGatewaySessionId(data.data?.session?.sessionId || null) - setGatewayState(null) - setGatewayStreamUrl(data.data?.stream?.streamUrl || data.data?.turn?.streamUrl || null) - } - }) - - void refreshMessages(false) - - return { - success: true, - } - } catch { - setPendingMessages((previousMessages) => - previousMessages.filter((message) => message.id !== optimisticMessage.id), - ) - return { - success: false, - error: 'Failed to send message', - } - } finally { - setIsSending(false) - } - }, - [clearGatewayReconnectTimer, isGatewayTask, isSending, refreshMessages, taskId], - ) - - const retryMessage = useCallback( - async (content: string): Promise => { - return await sendMessage(content) - }, - [sendMessage], - ) - - const stopTask = useCallback(async (): Promise => { - setIsStopping(true) - - try { - const response = await fetch(`/api/tasks/${taskId}`, { - method: 'PATCH', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ action: 'stop' }), - }) - - if (!response.ok) { - const data = (await response.json()) as { - error?: string - } - - return { - success: false, - error: data.error || 'Failed to stop task', - } - } - - startTransition(() => { - clearGatewayReconnectTimer() - gatewayReconnectAttemptRef.current = 0 - setGatewayTurnPending(false) - setGatewaySessionId(null) - setGatewayStreamUrl(null) - setGatewayState(null) - setRetainedStreamingMessage(null) - }) - gatewayEventSourceRef.current?.close() - gatewayEventSourceRef.current = null - - return { - success: true, - } - } catch { - return { - success: false, - error: 'Failed to stop task', - } - } finally { - setIsStopping(false) - } - }, [clearGatewayReconnectTimer, taskId]) - - const streamingMessage = useMemo( - () => buildStreamingAgentMessage(taskId, gatewayState, persistedMessages), - [gatewayState, persistedMessages, taskId], - ) - - const activeStreamingMessage = streamingMessage || retainedStreamingMessage - - const messages = useMemo( - () => combineChatMessages(persistedMessages, pendingMessages, activeStreamingMessage), - [activeStreamingMessage, pendingMessages, persistedMessages], - ) - - return { - error, - isGatewayTask, - isLoading, - isSending, - isStopping, - isStreaming: Boolean(gatewayState?.activeTurn || gatewayTurnPending), - messages, - refreshMessages, - retryMessage, - sendMessage, - stopTask, - } -} diff --git a/lib/task-agent-events.ts b/lib/task-agent-events.ts index 1620a96..4947b07 100644 --- a/lib/task-agent-events.ts +++ b/lib/task-agent-events.ts @@ -206,6 +206,31 @@ function buildSummaryEventItem(summaryEvent: CodexGatewaySummaryEvent, index: nu } } +function buildSummaryItemsFromSnapshotEvent(event: TaskEvent): TaskAgentActivityItem[] { + const recentEvents = Array.isArray(event.payload?.recentEvents) ? event.payload.recentEvents : [] + + return recentEvents.flatMap((summaryEvent, index) => { + if (!summaryEvent || typeof summaryEvent !== 'object') { + return [] + } + + const item = buildSummaryEventItem( + { + at: normalizeOccurredAt(summaryEvent.at, event.createdAt), + type: typeof summaryEvent.type === 'string' ? summaryEvent.type : 'event', + method: typeof summaryEvent.method === 'string' ? summaryEvent.method : null, + itemType: typeof summaryEvent.itemType === 'string' ? summaryEvent.itemType : null, + itemId: typeof summaryEvent.itemId === 'string' ? summaryEvent.itemId : null, + status: typeof summaryEvent.status === 'string' ? summaryEvent.status : null, + textPreview: typeof summaryEvent.textPreview === 'string' ? summaryEvent.textPreview : null, + }, + index, + ) + + return item ? [item] : [] + }) +} + function buildTaskEventItem(event: TaskEvent): TaskAgentActivityItem | null { if (event.kind === 'gateway.warning') { return { @@ -267,34 +292,10 @@ function buildTaskEventItem(event: TaskEvent): TaskAgentActivityItem | null { export function buildAgentActivityItemsFromTaskEvents(events: TaskEvent[]): TaskAgentActivityItem[] { const items: TaskAgentActivityItem[] = [] + const latestStateSnapshot = [...events].reverse().find((event) => event.kind === 'gateway.state.snapshot') for (const event of events) { if (event.kind === 'gateway.state.snapshot') { - const recentEvents = Array.isArray(event.payload?.recentEvents) ? event.payload.recentEvents : [] - - for (const [index, summaryEvent] of recentEvents.entries()) { - if (!summaryEvent || typeof summaryEvent !== 'object') { - continue - } - - const item = buildSummaryEventItem( - { - at: normalizeOccurredAt(summaryEvent.at, event.createdAt), - type: typeof summaryEvent.type === 'string' ? summaryEvent.type : 'event', - method: typeof summaryEvent.method === 'string' ? summaryEvent.method : null, - itemType: typeof summaryEvent.itemType === 'string' ? summaryEvent.itemType : null, - itemId: typeof summaryEvent.itemId === 'string' ? summaryEvent.itemId : null, - status: typeof summaryEvent.status === 'string' ? summaryEvent.status : null, - textPreview: typeof summaryEvent.textPreview === 'string' ? summaryEvent.textPreview : null, - }, - index, - ) - - if (item) { - items.push(item) - } - } - continue } @@ -304,19 +305,24 @@ export function buildAgentActivityItemsFromTaskEvents(events: TaskEvent[]): Task } } - const seenIds = new Set() + if (latestStateSnapshot) { + items.push(...buildSummaryItemsFromSnapshotEvent(latestStateSnapshot)) + } - return items - .toSorted((left, right) => left.occurredAt.localeCompare(right.occurredAt)) - .filter((item) => { - const key = `${item.label}|${item.detail}|${item.occurredAt}` - if (seenIds.has(key)) { - return false - } + const latestByIdentity = new Map() + + for (const item of items) { + const identityKey = `${item.groupKey}|${item.label}|${item.detail}` + const previousItem = latestByIdentity.get(identityKey) + + if (!previousItem || previousItem.occurredAt.localeCompare(item.occurredAt) <= 0) { + latestByIdentity.set(identityKey, item) + } + } - seenIds.add(key) - return true - }) + return Array.from(latestByIdentity.values()).toSorted((left, right) => + left.occurredAt.localeCompare(right.occurredAt), + ) } export function buildAgentActivityItemsFromState(state: CodexGatewayState | null): TaskAgentActivityItem[] { diff --git a/lib/task-chat.ts b/lib/task-chat.ts index 495d5d0..9df2372 100644 --- a/lib/task-chat.ts +++ b/lib/task-chat.ts @@ -1,6 +1,7 @@ import type { CodexGatewayState } from '@/lib/codex-gateway/types' import { getAssistantContentAfterLastUser, getLastUserTranscriptIndex } from '@/lib/codex-gateway/transcript' import type { TaskMessage } from '@/lib/db/schema' +import { buildProjectedAssistantMessageId } from '@/lib/task-message-ids' export interface OptimisticTaskMessage extends TaskMessage { optimistic: true @@ -8,6 +9,11 @@ export interface OptimisticTaskMessage extends TaskMessage { export type ChatTaskMessage = TaskMessage | OptimisticTaskMessage +export interface LiveAssistantMessageIdentity { + sessionId: string + transcriptCursor: number +} + export interface ChatTurn { id: string userMessage: ChatTaskMessage | null @@ -26,12 +32,17 @@ function isOptimisticMessage(message: ChatTaskMessage): message is OptimisticTas return 'optimistic' in message && message.optimistic === true } -export function createOptimisticUserMessage(taskId: string, content: string): OptimisticTaskMessage { +export function createOptimisticUserMessage( + taskId: string, + content: string, + clientMessageId: string, +): OptimisticTaskMessage { return { - id: `optimistic-user-${Date.now()}`, + id: `optimistic-user-${clientMessageId}`, taskId, role: 'user', content, + clientMessageId, createdAt: new Date(), optimistic: true, } @@ -72,9 +83,18 @@ export function reconcileOptimisticMessages( persistedMessages: TaskMessage[], ): OptimisticTaskMessage[] { const persistedUserMessages = persistedMessages.filter((message) => message.role === 'user') + const persistedMessageIds = new Set( + persistedUserMessages + .map((message) => (typeof message.clientMessageId === 'string' ? message.clientMessageId : null)) + .filter((messageId): messageId is string => Boolean(messageId)), + ) const consumedPersistedIds = new Set() return pendingMessages.filter((pendingMessage) => { + if (pendingMessage.clientMessageId && persistedMessageIds.has(pendingMessage.clientMessageId)) { + return false + } + const pendingCreatedAt = getTimestamp(pendingMessage.createdAt) const matchingPersistedMessage = persistedUserMessages.find((persistedMessage) => { if (consumedPersistedIds.has(persistedMessage.id)) { @@ -103,6 +123,7 @@ export function buildStreamingAgentMessage( taskId: string, gatewayState: CodexGatewayState | null, persistedMessages: TaskMessage[], + identity?: LiveAssistantMessageIdentity | null, ): TaskMessage | null { if (!gatewayState) { return null @@ -126,6 +147,18 @@ export function buildStreamingAgentMessage( return null } + const streamingMessageId = + identity && Number.isFinite(identity.transcriptCursor) + ? buildProjectedAssistantMessageId(identity.sessionId, identity.transcriptCursor) + : null + + if ( + streamingMessageId && + persistedMessages.some((message) => message.role === 'agent' && message.id === streamingMessageId) + ) { + return null + } + const latestPersistedAgentMessage = [...persistedMessages].reverse().find((message) => message.role === 'agent') const latestPersistedContent = latestPersistedAgentMessage ? parseTaskAgentMessage(latestPersistedAgentMessage.content).trim() @@ -141,10 +174,11 @@ export function buildStreamingAgentMessage( const latestAssistantEntry = assistantEntries[assistantEntries.length - 1] return { - id: `gateway-stream-${latestAssistantEntry.id}`, + id: streamingMessageId || `gateway-stream-${latestAssistantEntry.id}`, taskId, role: 'agent', content: streamingContent, + clientMessageId: null, createdAt: new Date(latestAssistantEntry.createdAt), } } @@ -152,6 +186,7 @@ export function buildStreamingAgentMessage( export function buildStreamingAgentMessageFromState( taskId: string, gatewayState: CodexGatewayState | null, + identity?: LiveAssistantMessageIdentity | null, ): TaskMessage | null { if (!gatewayState) { return null @@ -176,12 +211,17 @@ export function buildStreamingAgentMessageFromState( } const latestAssistantEntry = assistantEntries[assistantEntries.length - 1] + const streamingMessageId = + identity && Number.isFinite(identity.transcriptCursor) + ? buildProjectedAssistantMessageId(identity.sessionId, identity.transcriptCursor) + : null return { - id: `gateway-stream-${latestAssistantEntry.id}`, + id: streamingMessageId || `gateway-stream-${latestAssistantEntry.id}`, taskId, role: 'agent', content, + clientMessageId: null, createdAt: new Date(latestAssistantEntry.createdAt), } } @@ -209,6 +249,18 @@ export function hasPersistedAssistantContent( }) } +export function hasPersistedAssistantIdentity( + persistedMessages: TaskMessage[], + identity: LiveAssistantMessageIdentity | null | undefined, +): boolean { + if (!identity) { + return false + } + + const messageId = buildProjectedAssistantMessageId(identity.sessionId, identity.transcriptCursor) + return persistedMessages.some((message) => message.role === 'agent' && message.id === messageId) +} + export function combineChatMessages( persistedMessages: TaskMessage[], pendingMessages: OptimisticTaskMessage[], @@ -216,7 +268,15 @@ export function combineChatMessages( ): ChatTaskMessage[] { const combinedMessages: ChatTaskMessage[] = [...persistedMessages, ...pendingMessages] - if (streamingMessage && !hasPersistedAssistantContent(persistedMessages, streamingMessage.content)) { + const hasPersistedStreamingMessage = streamingMessage + ? persistedMessages.some((message) => message.role === 'agent' && message.id === streamingMessage.id) + : false + + if ( + streamingMessage && + !hasPersistedStreamingMessage && + !hasPersistedAssistantContent(persistedMessages, streamingMessage.content) + ) { combinedMessages.push(streamingMessage) } diff --git a/lib/task-event-projection.ts b/lib/task-event-projection.ts index 3f798a5..d6cae62 100644 --- a/lib/task-event-projection.ts +++ b/lib/task-event-projection.ts @@ -1,14 +1,14 @@ import { and, asc, desc, eq, inArray } from 'drizzle-orm' -import { getAssistantContentAfterCursor } from '@/lib/codex-gateway/transcript' import { db } from '@/lib/db/client' import { taskEvents, taskMessages, type TaskEvent } from '@/lib/db/schema' +import { + buildProjectedAssistantMessageId, + buildTaskClientMessageId, + buildTaskEventUserMessageId, +} from '@/lib/task-message-ids' import { generateId } from '@/lib/utils/id' -const PROJECTABLE_EVENT_KINDS = [ - 'user_message.created', - 'assistant.message.projected', - 'gateway.state.snapshot', -] as const +const PROJECTABLE_EVENT_KINDS = ['user_message.created', 'assistant.message.projected'] as const function parseTranscriptCursor(value: unknown): number | null { if (typeof value === 'number' && Number.isFinite(value)) { @@ -23,59 +23,43 @@ function parseTranscriptCursor(value: unknown): number | null { return null } -function extractProjectedAssistantSnapshot(event: TaskEvent): { - content: string - messageId?: string -} | null { - const payload = event.payload || null - const sessionId = typeof payload?.sessionId === 'string' ? payload.sessionId : event.sessionId || undefined - const transcriptCursor = parseTranscriptCursor(payload?.transcriptCursor) - - if (!sessionId || transcriptCursor === null) { - return null - } - - const transcriptValue = payload?.transcript - if (!Array.isArray(transcriptValue)) { - return null - } - - const transcript = transcriptValue.flatMap((entry) => { - if (!entry || typeof entry !== 'object' || typeof entry.role !== 'string' || typeof entry.text !== 'string') { - return [] - } - - return [{ role: entry.role, text: entry.text }] - }) - - const content = getAssistantContentAfterCursor(transcriptCursor, transcript) - if (!content) { - return null - } - - return { - content, - messageId: buildProjectedAssistantMessageId(sessionId, transcriptCursor), - } -} - -export function buildTaskEventUserMessageId(eventId: string): string { - return `task-user-event-${eventId}` -} - -export function buildProjectedAssistantMessageId(sessionId: string, transcriptCursor: number): string { - return `codex-agent-${sessionId}-${transcriptCursor}` -} - export async function projectUserMessageFromEvent( - event: Pick, + event: Pick, ): Promise { const content = typeof event.payload?.content === 'string' ? event.payload.content.trim() : '' + const clientMessageId = + typeof event.clientMessageId === 'string' && event.clientMessageId.trim() + ? event.clientMessageId.trim() + : typeof event.payload?.clientMessageId === 'string' && event.payload.clientMessageId.trim() + ? event.payload.clientMessageId.trim() + : null if (!content) { return } + if (clientMessageId) { + await db + .insert(taskMessages) + .values({ + id: buildTaskClientMessageId(clientMessageId), + taskId: event.taskId, + role: 'user', + content, + clientMessageId, + createdAt: event.createdAt, + }) + .onConflictDoUpdate({ + target: [taskMessages.taskId, taskMessages.clientMessageId], + set: { + content, + createdAt: event.createdAt, + }, + }) + + return + } + await db .insert(taskMessages) .values({ @@ -153,20 +137,17 @@ export async function reconcileProjectedTaskMessages(taskId: string): Promise | null @@ -39,6 +37,18 @@ export async function recordTaskEvent(input: RecordTaskEventInput): Promise { await tx.execute(sql`select pg_advisory_xact_lock(hashtext(${input.taskId}))`) + if (input.clientMessageId) { + const [existingEvent] = await tx + .select() + .from(taskEvents) + .where(and(eq(taskEvents.taskId, input.taskId), eq(taskEvents.clientMessageId, input.clientMessageId))) + .limit(1) + + if (existingEvent) { + return existingEvent + } + } + const [latestEvent] = await tx .select({ seq: taskEvents.seq }) .from(taskEvents) @@ -55,6 +65,7 @@ export async function recordTaskEvent(input: RecordTaskEventInput): Promise