From b549addb83f6bb89dd7f963e5cf839806e2a9f7c Mon Sep 17 00:00:00 2001 From: zjy365 <3161362058@qq.com> Date: Mon, 20 Apr 2026 21:09:51 +0800 Subject: [PATCH 1/7] refactor(chat): make codex runtime event-first and harden stream recovery --- .../tasks/[taskId]/chat/interrupt/route.ts | 75 ++ app/api/tasks/[taskId]/chat/turn/route.ts | 72 +- app/api/tasks/[taskId]/chat/v2/route.ts | 228 ++++ .../tasks/[taskId]/chat/v2/stream/route.ts | 295 +++++ app/api/tasks/[taskId]/events/route.ts | 49 + app/api/tasks/[taskId]/messages/route.ts | 17 +- app/api/tasks/[taskId]/route.ts | 19 +- app/api/tasks/route.ts | 43 +- codex-gateway | 1 + components/home-page-content.tsx | 5 +- components/repo-selector.tsx | 182 ++- components/sealos-home-page-content.tsx | 1 + components/task-agent-activity.tsx | 75 ++ components/task-chat-transcript.tsx | 9 +- components/task-chat.tsx | 18 +- components/task-form.tsx | 52 +- lib/codex-gateway/chat-v2-service.ts | 95 ++ lib/codex-gateway/client.ts | 20 + lib/codex-gateway/completion.ts | 132 +- lib/codex-gateway/runner.ts | 16 +- lib/db/migrations/0026_marvelous_firelord.sql | 28 + lib/db/migrations/meta/0026_snapshot.json | 1094 +++++++++++++++++ lib/db/migrations/meta/_journal.json | 7 + lib/db/schema.ts | 113 ++ lib/devbox/client.ts | 5 + lib/hooks/use-task-agent-chat-v2.ts | 422 +++++++ lib/hooks/use-task.ts | 7 + lib/task-agent-events.ts | 233 ++++ lib/task-chat-v2.ts | 84 ++ lib/task-event-projection.ts | 182 +++ lib/task-events.ts | 219 ++++ lib/utils/async.ts | 25 + 32 files changed, 3529 insertions(+), 294 deletions(-) create mode 100644 app/api/tasks/[taskId]/chat/interrupt/route.ts create mode 100644 app/api/tasks/[taskId]/chat/v2/route.ts create mode 100644 app/api/tasks/[taskId]/chat/v2/stream/route.ts create mode 100644 app/api/tasks/[taskId]/events/route.ts create mode 160000 codex-gateway create mode 100644 components/task-agent-activity.tsx create mode 100644 lib/codex-gateway/chat-v2-service.ts create mode 100644 lib/db/migrations/0026_marvelous_firelord.sql create mode 100644 lib/db/migrations/meta/0026_snapshot.json create mode 100644 lib/hooks/use-task-agent-chat-v2.ts create mode 100644 lib/task-agent-events.ts create mode 100644 lib/task-chat-v2.ts create mode 100644 lib/task-event-projection.ts create mode 100644 lib/task-events.ts create mode 100644 lib/utils/async.ts diff --git a/app/api/tasks/[taskId]/chat/interrupt/route.ts b/app/api/tasks/[taskId]/chat/interrupt/route.ts new file mode 100644 index 0000000..5f24454 --- /dev/null +++ b/app/api/tasks/[taskId]/chat/interrupt/route.ts @@ -0,0 +1,75 @@ +import { and, eq, isNull } from 'drizzle-orm' +import { NextResponse } from 'next/server' +import { CodexGatewayApiError, interruptCodexGatewayTurn } from '@/lib/codex-gateway/client' +import { hasActiveTurnCheckpoint } from '@/lib/codex-gateway/completion' +import { getTaskGatewayContext } from '@/lib/codex-gateway/task' +import { db } from '@/lib/db/client' +import { tasks } from '@/lib/db/schema' +import { getServerSession } from '@/lib/session/get-server-session' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' +export const maxDuration = 60 + +interface RouteParams { + params: Promise<{ + taskId: string + }> +} + +export async function POST(_request: Request, { params }: RouteParams) { + try { + const session = await getServerSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { taskId } = await params + const [task] = await db + .select() + .from(tasks) + .where(and(eq(tasks.id, taskId), eq(tasks.userId, session.user.id), isNull(tasks.deletedAt))) + .limit(1) + + if (!task) { + return NextResponse.json({ error: 'Task not found' }, { status: 404 }) + } + + if (task.selectedAgent !== 'codex') { + return NextResponse.json({ error: 'Unsupported agent' }, { status: 400 }) + } + + if (!hasActiveTurnCheckpoint(task) || !task.activeTurnSessionId) { + return NextResponse.json({ error: 'Task does not have an active turn' }, { status: 409 }) + } + + const { gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(taskId, session.user.id) + if (!gatewayUrl) { + return NextResponse.json({ error: 'Gateway URL is not configured' }, { status: 400 }) + } + + const result = await interruptCodexGatewayTurn(gatewayUrl, task.activeTurnSessionId, gatewayAuthToken) + + return NextResponse.json({ + success: true, + data: { + sessionId: result.sessionId, + state: result.state, + }, + }) + } catch (error) { + if (error instanceof CodexGatewayApiError) { + return NextResponse.json( + { + error: 'Failed to interrupt active turn', + statusCode: error.status, + message: error.message, + }, + { status: error.status >= 400 && error.status < 500 ? error.status : 502 }, + ) + } + + console.error('Failed to interrupt chat turn:', error) + return NextResponse.json({ error: 'Failed to interrupt active turn' }, { status: 500 }) + } +} diff --git a/app/api/tasks/[taskId]/chat/turn/route.ts b/app/api/tasks/[taskId]/chat/turn/route.ts index 4370553..53f814d 100644 --- a/app/api/tasks/[taskId]/chat/turn/route.ts +++ b/app/api/tasks/[taskId]/chat/turn/route.ts @@ -2,16 +2,11 @@ import { after, NextRequest, NextResponse } from 'next/server' import { and, eq, isNull } from 'drizzle-orm' import { z } from 'zod' import { CodexGatewayApiError } from '@/lib/codex-gateway/client' -import { startCodexGatewayTaskTurn, waitForCodexGatewayTurnCompletion } from '@/lib/codex-gateway/runner' -import { createTaskChatStreamDescriptor } from '@/lib/codex-gateway/stream-ticket' +import { finalizeTaskChatV2Turn, startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' import { db } from '@/lib/db/client' import { tasks } from '@/lib/db/schema' -import { ensureTaskDevboxRuntime } from '@/lib/devbox/runtime' import { getServerSession } from '@/lib/session/get-server-session' -import { appendTaskMessage } from '@/lib/task-messages' import { checkRateLimit } from '@/lib/utils/rate-limit' -import { createTaskLogger } from '@/lib/utils/task-logger' -import { formatKeyTaskLogMessage, TASK_FLOW_LOGS } from '@/lib/utils/task-flow-logs' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' @@ -80,58 +75,15 @@ export async function POST(request: NextRequest, { params }: RouteParams) { } const prompt = parsed.data.prompt || parsed.data.message || '' - const logger = createTaskLogger(taskId) - let userMessagePersisted = false - const userInputReceivedLog = formatKeyTaskLogMessage(TASK_FLOW_LOGS.USER_INPUT_RECEIVED, { - promptChars: prompt.length, + const result = await startTaskChatV2Turn({ + task, + prompt, source: 'chat-turn', }) - await logger.info(userInputReceivedLog) - console.info(userInputReceivedLog) - - try { - await appendTaskMessage({ - taskId: resolvedTaskId, - role: 'user', - content: prompt, - }) - userMessagePersisted = true - const userInputSavedLog = formatKeyTaskLogMessage(TASK_FLOW_LOGS.USER_INPUT_SAVED, { - promptChars: prompt.length, - source: 'chat-turn', - }) - await logger.info(userInputSavedLog) - console.info(userInputSavedLog) - } catch { - console.error('Failed to persist chat turn user message') - } - - await db - .update(tasks) - .set({ - status: 'processing', - progress: 0, - error: null, - completedAt: null, - updatedAt: new Date(), - }) - .where(eq(tasks.id, resolvedTaskId)) - - await ensureTaskDevboxRuntime(task, { logger }) - - const startedTurn = await startCodexGatewayTaskTurn(resolvedTaskId, prompt, { - appendUserMessage: !userMessagePersisted, - model: task.selectedModel, - }) - const stream = await createTaskChatStreamDescriptor({ - taskId: resolvedTaskId, - userId: session.user.id, - sessionId: startedTurn.sessionId, - }) after(async () => { try { - await waitForCodexGatewayTurnCompletion(startedTurn) + await finalizeTaskChatV2Turn(result.startedTurn) } catch (error) { console.error('Failed to finalize chat turn:', error) @@ -143,8 +95,6 @@ export async function POST(request: NextRequest, { params }: RouteParams) { updatedAt: new Date(), }) .where(eq(tasks.id, resolvedTaskId)) - - await logger.error('Failed to finalize chat turn') } }) @@ -152,14 +102,16 @@ export async function POST(request: NextRequest, { params }: RouteParams) { success: true, data: { session: { - sessionId: startedTurn.sessionId, + sessionId: result.startedTurn.sessionId, + threadId: result.startedTurn.threadId, + turnId: result.startedTurn.turnId, }, - stream, + stream: result.stream, turn: { - transcriptCursor: startedTurn.transcriptCursor, + transcriptCursor: result.startedTurn.transcriptCursor, turnAccepted: true, - turnStartedAt: startedTurn.startedAt.toISOString(), - streamUrl: stream.streamUrl, + turnStartedAt: result.startedTurn.startedAt.toISOString(), + streamUrl: result.stream.streamUrl, }, }, }) diff --git a/app/api/tasks/[taskId]/chat/v2/route.ts b/app/api/tasks/[taskId]/chat/v2/route.ts new file mode 100644 index 0000000..72d2f9e --- /dev/null +++ b/app/api/tasks/[taskId]/chat/v2/route.ts @@ -0,0 +1,228 @@ +import { after, NextRequest, NextResponse } from 'next/server' +import { and, asc, eq, isNull } from 'drizzle-orm' +import { z } from 'zod' +import { CodexGatewayApiError } from '@/lib/codex-gateway/client' +import { finalizeTaskChatV2Turn, startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' +import { + hasActiveTurnCheckpoint, + reconcileIncompleteTurnSafely, + shouldAttemptTurnReconciliation, +} from '@/lib/codex-gateway/completion' +import { db } from '@/lib/db/client' +import { taskMessages, tasks } from '@/lib/db/schema' +import { reconcileProjectedTaskMessages } from '@/lib/task-event-projection' +import { + closeTaskChatV2StreamDescriptor, + ensureTaskChatV2StreamDescriptor, + getActiveTaskChatV2StreamDescriptor, +} from '@/lib/task-chat-v2' +import { listTaskEvents } from '@/lib/task-events' +import { getServerSession } from '@/lib/session/get-server-session' +import { checkRateLimit } from '@/lib/utils/rate-limit' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' +export const maxDuration = 300 + +const turnSchema = z.object({ + prompt: z.string().trim().min(1, 'Prompt is required'), +}) + +interface RouteParams { + params: Promise<{ + taskId: string + }> +} + +async function getOwnedTask(taskId: string, userId: string) { + const [task] = await db + .select() + .from(tasks) + .where(and(eq(tasks.id, taskId), eq(tasks.userId, userId), isNull(tasks.deletedAt))) + .limit(1) + + return task || null +} + +export async function GET(_request: NextRequest, { params }: RouteParams) { + try { + const session = await getServerSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { taskId } = await params + let task = await getOwnedTask(taskId, session.user.id) + + if (!task) { + return NextResponse.json({ error: 'Task not found' }, { status: 404 }) + } + + if ( + task.selectedAgent === 'codex' && + hasActiveTurnCheckpoint(task) && + shouldAttemptTurnReconciliation(task, 5_000) + ) { + try { + task = (await reconcileIncompleteTurnSafely(task.id, 2_500)) || task + } catch { + console.error('Failed to reconcile incomplete Codex turn') + } + } + + await reconcileProjectedTaskMessages(taskId) + + const refreshedTask = (await getOwnedTask(taskId, session.user.id)) || task + const messages = await db + .select() + .from(taskMessages) + .where(eq(taskMessages.taskId, taskId)) + .orderBy(asc(taskMessages.createdAt)) + + let stream = await getActiveTaskChatV2StreamDescriptor(taskId) + + if ( + refreshedTask.selectedAgent === 'codex' && + hasActiveTurnCheckpoint(refreshedTask) && + refreshedTask.activeTurnSessionId + ) { + stream = await ensureTaskChatV2StreamDescriptor({ + taskId, + sessionId: refreshedTask.activeTurnSessionId, + threadId: null, + turnId: null, + startedAt: refreshedTask.activeTurnStartedAt || undefined, + }) + } else if (stream) { + await closeTaskChatV2StreamDescriptor(taskId) + stream = null + } + + const events = await listTaskEvents(taskId, { limit: 200 }) + + return NextResponse.json({ + success: true, + data: { + task: refreshedTask, + messages, + events, + stream, + }, + }) + } catch (error) { + console.error('Failed to fetch chat v2 state:', error) + return NextResponse.json({ error: 'Failed to fetch chat state' }, { status: 500 }) + } +} + +export async function POST(request: NextRequest, { params }: RouteParams) { + let taskId: string | null = null + + try { + const session = await getServerSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const rateLimit = await checkRateLimit(session.user.id) + if (!rateLimit.allowed) { + return NextResponse.json( + { + error: 'Rate limit exceeded', + message: `You have reached the daily limit of ${rateLimit.total} messages (tasks + follow-ups). Your limit will reset at ${rateLimit.resetAt.toISOString()}`, + remaining: rateLimit.remaining, + total: rateLimit.total, + resetAt: rateLimit.resetAt.toISOString(), + }, + { status: 429 }, + ) + } + + ;({ taskId } = await params) + const resolvedTaskId = taskId + + const body = await request.json().catch(() => ({})) + const parsed = turnSchema.safeParse(body) + if (!parsed.success) { + return NextResponse.json({ error: 'Prompt is required' }, { status: 400 }) + } + + const task = await getOwnedTask(resolvedTaskId, session.user.id) + if (!task) { + return NextResponse.json({ error: 'Task not found' }, { status: 404 }) + } + + if (task.selectedAgent !== 'codex') { + return NextResponse.json({ error: 'Unsupported agent' }, { status: 400 }) + } + + const result = await startTaskChatV2Turn({ + task, + prompt: parsed.data.prompt, + source: 'chat-v2', + }) + + after(async () => { + try { + await finalizeTaskChatV2Turn(result.startedTurn) + } catch (error) { + console.error('Failed to finalize chat v2 turn:', error) + + await db + .update(tasks) + .set({ + status: 'error', + error: 'Failed to finalize chat turn', + updatedAt: new Date(), + }) + .where(eq(tasks.id, resolvedTaskId)) + } + }) + + return NextResponse.json({ + success: true, + data: { + session: { + sessionId: result.startedTurn.sessionId, + threadId: result.startedTurn.threadId, + turnId: result.startedTurn.turnId, + }, + stream: result.stream, + turn: { + transcriptCursor: result.startedTurn.transcriptCursor, + turnAccepted: true, + turnStartedAt: result.startedTurn.startedAt.toISOString(), + }, + }, + }) + } catch (error) { + if (taskId) { + try { + await db + .update(tasks) + .set({ + status: 'error', + error: 'Failed to start chat turn', + updatedAt: new Date(), + }) + .where(eq(tasks.id, taskId)) + } catch { + console.error('Failed to mark chat v2 turn as errored') + } + } + + if (error instanceof CodexGatewayApiError) { + return NextResponse.json( + { + error: 'Failed to start chat turn', + statusCode: error.status, + message: error.message, + }, + { status: error.status >= 400 && error.status < 500 ? error.status : 502 }, + ) + } + + console.error('Failed to start chat v2 turn:', error) + return NextResponse.json({ error: 'Failed to start chat turn' }, { status: 500 }) + } +} diff --git a/app/api/tasks/[taskId]/chat/v2/stream/route.ts b/app/api/tasks/[taskId]/chat/v2/stream/route.ts new file mode 100644 index 0000000..4c63bb6 --- /dev/null +++ b/app/api/tasks/[taskId]/chat/v2/stream/route.ts @@ -0,0 +1,295 @@ +import { NextRequest, NextResponse } from 'next/server' +import { getCodexGatewayEventStreamUrl } from '@/lib/codex-gateway/client' +import { getTaskGatewayContext } from '@/lib/codex-gateway/task' +import type { CodexGatewayState } from '@/lib/codex-gateway/types' +import { closeTaskStream, getTaskStream, recordTaskEvent, touchTaskStream } from '@/lib/task-events' +import { getServerSession } from '@/lib/session/get-server-session' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' +export const maxDuration = 300 + +interface RouteParams { + params: Promise<{ + taskId: string + }> +} + +function mapGatewayEventKind(eventName: string) { + switch (eventName) { + case 'session': + return 'gateway.session.opened' as const + case 'state': + return 'gateway.state.snapshot' as const + case 'notification': + return 'gateway.notification' as const + case 'server-request': + return 'gateway.server_request' as const + case 'warning': + return 'gateway.warning' as const + case 'session-closed': + return 'gateway.session.closed' as const + default: + return null + } +} + +function parseSseBlock(block: string): { + dataText: string + eventName: string +} | null { + if (!block.trim()) { + return null + } + + let eventName = 'message' + const dataLines: string[] = [] + + for (const line of block.split(/\r?\n/)) { + if (line.startsWith('event:')) { + eventName = line.slice('event:'.length).trim() || 'message' + continue + } + + if (line.startsWith('data:')) { + dataLines.push(line.slice('data:'.length).trimStart()) + } + } + + return { + eventName, + dataText: dataLines.join('\n'), + } +} + +async function persistGatewayEvent(input: { + eventName: string + payload: Record | null + sessionId: string + streamId: string + taskId: string + transcriptCursor: number | null +}) { + const eventKind = mapGatewayEventKind(input.eventName) + + if (!eventKind) { + return + } + + if (eventKind === 'gateway.session.opened') { + const nextSessionId = + typeof input.payload?.id === 'string' && input.payload.id.trim() ? input.payload.id.trim() : input.sessionId + + await touchTaskStream(input.streamId, { sessionId: nextSessionId }) + await recordTaskEvent({ + taskId: input.taskId, + streamId: input.streamId, + kind: eventKind, + sessionId: nextSessionId, + payload: input.payload, + }) + return + } + + if (eventKind === 'gateway.state.snapshot') { + const state = (input.payload || {}) as CodexGatewayState & Record + + await touchTaskStream(input.streamId, { + threadId: state.threadId || null, + turnId: state.currentTurnId || null, + }) + + await recordTaskEvent({ + taskId: input.taskId, + streamId: input.streamId, + kind: eventKind, + sessionId: input.sessionId, + threadId: state.threadId || null, + turnId: state.currentTurnId || null, + payload: { + ...state, + transcriptCursor: input.transcriptCursor, + }, + }) + + if (!state.activeTurn && state.lastTurnStatus) { + await closeTaskStream(input.streamId, 'closed') + } + + return + } + + await recordTaskEvent({ + taskId: input.taskId, + streamId: input.streamId, + kind: eventKind, + sessionId: input.sessionId, + payload: input.payload, + }) + + if (eventKind === 'gateway.session.closed') { + await closeTaskStream(input.streamId, 'closed') + } +} + +export async function GET(request: NextRequest, { params }: RouteParams) { + const decoder = new TextDecoder() + let streamId: string | null = null + let taskId: string | null = null + + try { + const session = await getServerSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + ;({ taskId } = await params) + const resolvedTaskId = taskId + streamId = request.nextUrl.searchParams.get('streamId') + + if (!streamId) { + return NextResponse.json({ error: 'Missing stream id' }, { status: 400 }) + } + + const resolvedStreamId = streamId + + const stream = await getTaskStream(resolvedStreamId) + if (!stream || stream.taskId !== resolvedTaskId || stream.status !== 'active') { + return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) + } + + const { task, gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(resolvedTaskId, session.user.id) + + if (!task) { + return NextResponse.json({ error: 'Task not found' }, { status: 404 }) + } + + if (!gatewayUrl) { + return NextResponse.json({ error: 'Gateway URL is not configured' }, { status: 400 }) + } + + if (task.activeTurnSessionId && task.activeTurnSessionId !== stream.sessionId) { + await closeTaskStream(resolvedStreamId, 'errored') + return NextResponse.json({ error: 'Stream session is no longer active' }, { status: 410 }) + } + + const upstream = await fetch(getCodexGatewayEventStreamUrl(gatewayUrl, stream.sessionId, gatewayAuthToken), { + headers: { + accept: 'text/event-stream', + }, + cache: 'no-store', + signal: AbortSignal.timeout(15_000), + }) + + if (!upstream.ok || !upstream.body) { + await closeTaskStream(resolvedStreamId, 'errored') + return NextResponse.json({ error: 'Failed to connect to Codex gateway events' }, { status: 502 }) + } + + const headers = new Headers() + headers.set('content-type', 'text/event-stream; charset=utf-8') + headers.set('cache-control', 'no-cache, no-transform') + headers.set('connection', 'keep-alive') + headers.set('x-accel-buffering', 'no') + + const transcriptCursor = + task.activeTurnSessionId === stream.sessionId && typeof task.activeTurnTranscriptCursor === 'number' + ? task.activeTurnTranscriptCursor + : null + + const reader = upstream.body.getReader() + let sseBuffer = '' + + const handleSseBlock = async (block: string) => { + const parsedBlock = parseSseBlock(block) + if (!parsedBlock || !parsedBlock.dataText) { + return + } + + try { + const payload = JSON.parse(parsedBlock.dataText) as Record + await persistGatewayEvent({ + taskId: resolvedTaskId, + streamId: resolvedStreamId, + sessionId: stream.sessionId, + eventName: parsedBlock.eventName, + payload, + transcriptCursor, + }) + } catch { + console.error('Failed to persist gateway stream event') + } + } + + const flushBufferedEvents = async (flushAll: boolean) => { + while (true) { + const separatorMatch = sseBuffer.match(/\r?\n\r?\n/) + if (!separatorMatch || separatorMatch.index === undefined) { + break + } + + const block = sseBuffer.slice(0, separatorMatch.index) + sseBuffer = sseBuffer.slice(separatorMatch.index + separatorMatch[0].length) + await handleSseBlock(block) + } + + if (flushAll && sseBuffer.trim()) { + const finalBlock = sseBuffer + sseBuffer = '' + await handleSseBlock(finalBlock) + } + } + + const streamResponse = new ReadableStream({ + async start(controller) { + try { + while (true) { + const { done, value } = await reader.read() + + if (done) { + break + } + + if (!value) { + continue + } + + controller.enqueue(value) + sseBuffer += decoder.decode(value, { stream: true }) + await flushBufferedEvents(false) + } + + sseBuffer += decoder.decode() + await flushBufferedEvents(true) + controller.close() + } catch (error) { + await closeTaskStream(resolvedStreamId, 'errored') + try { + controller.close() + } catch { + // Ignore close errors after upstream socket termination. + } + } finally { + reader.releaseLock() + } + }, + async cancel() { + await reader.cancel() + }, + }) + + return new Response(streamResponse, { + status: 200, + headers, + }) + } catch (error) { + if (streamId) { + await closeTaskStream(streamId, 'errored').catch(() => { + console.error('Failed to close chat v2 stream after proxy error') + }) + } + + console.error('Failed to proxy chat v2 stream:', error) + return NextResponse.json({ error: 'Failed to proxy chat stream' }, { status: 500 }) + } +} diff --git a/app/api/tasks/[taskId]/events/route.ts b/app/api/tasks/[taskId]/events/route.ts new file mode 100644 index 0000000..71d4761 --- /dev/null +++ b/app/api/tasks/[taskId]/events/route.ts @@ -0,0 +1,49 @@ +import { and, eq, isNull } from 'drizzle-orm' +import { NextRequest, NextResponse } from 'next/server' +import { db } from '@/lib/db/client' +import { tasks } from '@/lib/db/schema' +import { listTaskEvents } from '@/lib/task-events' +import { getServerSession } from '@/lib/session/get-server-session' + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' +export const maxDuration = 60 + +interface RouteParams { + params: Promise<{ + taskId: string + }> +} + +export async function GET(request: NextRequest, { params }: RouteParams) { + try { + const session = await getServerSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { taskId } = await params + const [task] = await db + .select({ id: tasks.id }) + .from(tasks) + .where(and(eq(tasks.id, taskId), eq(tasks.userId, session.user.id), isNull(tasks.deletedAt))) + .limit(1) + + if (!task) { + return NextResponse.json({ error: 'Task not found' }, { status: 404 }) + } + + const limitParam = Number.parseInt(request.nextUrl.searchParams.get('limit') || '', 10) + const events = await listTaskEvents(taskId, { + limit: Number.isFinite(limitParam) ? limitParam : 200, + }) + + return NextResponse.json({ + success: true, + events, + }) + } catch (error) { + console.error('Failed to fetch task events:', error) + return NextResponse.json({ error: 'Failed to fetch task events' }, { status: 500 }) + } +} diff --git a/app/api/tasks/[taskId]/messages/route.ts b/app/api/tasks/[taskId]/messages/route.ts index a67f58c..7344724 100644 --- a/app/api/tasks/[taskId]/messages/route.ts +++ b/app/api/tasks/[taskId]/messages/route.ts @@ -3,7 +3,12 @@ import { getServerSession } from '@/lib/session/get-server-session' import { db } from '@/lib/db/client' import { taskMessages, tasks } from '@/lib/db/schema' import { eq, and, asc, isNull } from 'drizzle-orm' -import { hasActiveTurnCheckpoint, reconcileIncompleteTurn } from '@/lib/codex-gateway/completion' +import { + hasActiveTurnCheckpoint, + reconcileIncompleteTurnSafely, + shouldAttemptTurnReconciliation, +} from '@/lib/codex-gateway/completion' +import { reconcileProjectedTaskMessages } from '@/lib/task-event-projection' export async function GET(req: NextRequest, context: { params: Promise<{ taskId: string }> }) { try { @@ -26,14 +31,20 @@ export async function GET(req: NextRequest, context: { params: Promise<{ taskId: return NextResponse.json({ error: 'Task not found' }, { status: 404 }) } - if (task[0].selectedAgent === 'codex' && hasActiveTurnCheckpoint(task[0])) { + if ( + task[0].selectedAgent === 'codex' && + hasActiveTurnCheckpoint(task[0]) && + shouldAttemptTurnReconciliation(task[0], 5_000) + ) { try { - await reconcileIncompleteTurn(taskId) + await reconcileIncompleteTurnSafely(taskId, 2_500) } catch { console.error('Failed to reconcile incomplete Codex turn') } } + await reconcileProjectedTaskMessages(taskId) + // Fetch all messages for this task, ordered by creation time const messages = await db .select() diff --git a/app/api/tasks/[taskId]/route.ts b/app/api/tasks/[taskId]/route.ts index 602570d..03cc59c 100644 --- a/app/api/tasks/[taskId]/route.ts +++ b/app/api/tasks/[taskId]/route.ts @@ -6,8 +6,13 @@ import { createTaskLogger } from '@/lib/utils/task-logger' import { deleteDevbox, DevboxApiError } from '@/lib/devbox/client' import { getServerSession } from '@/lib/session/get-server-session' import { CodexGatewayApiError, deleteCodexGatewaySession } from '@/lib/codex-gateway/client' -import { hasActiveTurnCheckpoint, reconcileIncompleteTurn } from '@/lib/codex-gateway/completion' +import { + hasActiveTurnCheckpoint, + reconcileIncompleteTurnSafely, + shouldAttemptTurnReconciliation, +} from '@/lib/codex-gateway/completion' import { getTaskGatewayContext } from '@/lib/codex-gateway/task' +import { closeTaskChatV2StreamDescriptor } from '@/lib/task-chat-v2' interface RouteParams { params: Promise<{ @@ -35,9 +40,13 @@ export async function GET(request: NextRequest, { params }: RouteParams) { let currentTask = task[0] - if (currentTask.selectedAgent === 'codex' && hasActiveTurnCheckpoint(currentTask)) { + if ( + currentTask.selectedAgent === 'codex' && + hasActiveTurnCheckpoint(currentTask) && + shouldAttemptTurnReconciliation(currentTask) + ) { try { - currentTask = (await reconcileIncompleteTurn(currentTask.id)) || currentTask + currentTask = (await reconcileIncompleteTurnSafely(currentTask.id)) || currentTask } catch { console.error('Failed to reconcile incomplete Codex turn') } @@ -127,6 +136,10 @@ export async function PATCH(request: NextRequest, { params }: RouteParams) { .where(eq(tasks.id, taskId)) .returning() + await closeTaskChatV2StreamDescriptor(taskId).catch(() => { + console.error('Failed to close active chat stream during stop') + }) + if (existingTask.runtimeName) { try { await deleteDevbox(existingTask.runtimeName) diff --git a/app/api/tasks/route.ts b/app/api/tasks/route.ts index 1fbac32..cb8cb9b 100644 --- a/app/api/tasks/route.ts +++ b/app/api/tasks/route.ts @@ -1,17 +1,14 @@ import { NextRequest, NextResponse, after } from 'next/server' import { and, desc, eq, isNull, or } from 'drizzle-orm' -import { startCodexGatewayTaskTurn, waitForCodexGatewayTurnCompletion } from '@/lib/codex-gateway/runner' +import { finalizeTaskChatV2Turn, startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service' import { FORCED_CODEX_MODEL } from '@/lib/codex/defaults' import { db } from '@/lib/db/client' import { insertTaskSchema, tasks } from '@/lib/db/schema' -import { ensureTaskDevboxRuntime } from '@/lib/devbox/runtime' import { getServerSession } from '@/lib/session/get-server-session' -import { appendTaskMessage } from '@/lib/task-messages' import { generateBranchName, createFallbackBranchName } from '@/lib/utils/branch-name-generator' import { generateId } from '@/lib/utils/id' import { checkRateLimit } from '@/lib/utils/rate-limit' import { createTaskLogger } from '@/lib/utils/task-logger' -import { formatKeyTaskLogMessage, TASK_FLOW_LOGS } from '@/lib/utils/task-flow-logs' import { generateTaskTitle, createFallbackTitle } from '@/lib/utils/title-generator' export const runtime = 'nodejs' @@ -190,44 +187,16 @@ export async function POST(request: NextRequest) { } }) - const logger = createTaskLogger(taskId) - let userMessagePersisted = false - const userInputReceivedLog = formatKeyTaskLogMessage(TASK_FLOW_LOGS.USER_INPUT_RECEIVED, { - promptChars: validatedData.prompt.length, - source: 'task-create', - selectedModel, - }) - await logger.info(userInputReceivedLog) - console.info(userInputReceivedLog) - try { - await appendTaskMessage({ - taskId, - role: 'user', - content: validatedData.prompt, - }) - userMessagePersisted = true - const userInputSavedLog = formatKeyTaskLogMessage(TASK_FLOW_LOGS.USER_INPUT_SAVED, { - promptChars: validatedData.prompt.length, + const startedTurn = await startTaskChatV2Turn({ + task: newTask, + prompt: validatedData.prompt, source: 'task-create', }) - await logger.info(userInputSavedLog) - console.info(userInputSavedLog) - } catch { - console.error('Failed to persist task user message') - } - - try { - await ensureTaskDevboxRuntime(newTask, { logger }) - - const startedTurn = await startCodexGatewayTaskTurn(taskId, validatedData.prompt, { - appendUserMessage: !userMessagePersisted, - model: selectedModel, - }) after(async () => { try { - await waitForCodexGatewayTurnCompletion(startedTurn) + await finalizeTaskChatV2Turn(startedTurn.startedTurn) } catch { console.error('Failed to finalize Codex task') @@ -240,6 +209,7 @@ export async function POST(request: NextRequest) { }) .where(eq(tasks.id, taskId)) + const logger = createTaskLogger(taskId) await logger.error('Failed to finalize Codex task') } }) @@ -255,6 +225,7 @@ export async function POST(request: NextRequest) { }) .where(eq(tasks.id, taskId)) + const logger = createTaskLogger(taskId) await logger.error('Failed to start Codex gateway task') return NextResponse.json({ error: 'Failed to start Codex gateway task' }, { status: 500 }) } diff --git a/codex-gateway b/codex-gateway new file mode 160000 index 0000000..be4e22c --- /dev/null +++ b/codex-gateway @@ -0,0 +1 @@ +Subproject commit be4e22c8e134048bb84e46cb31f55397c5393c04 diff --git a/components/home-page-content.tsx b/components/home-page-content.tsx index 4ac7a0c..05c14ef 100644 --- a/components/home-page-content.tsx +++ b/components/home-page-content.tsx @@ -29,7 +29,6 @@ import { multiRepoModeAtom, selectedReposAtom } from '@/lib/atoms/multi-repo' import { sessionAtom } from '@/lib/atoms/session' import { githubConnectionAtom, githubConnectionInitializedAtom } from '@/lib/atoms/github-connection' import { OpenRepoUrlDialog } from '@/components/open-repo-url-dialog' -import { MultiRepoDialog } from '@/components/multi-repo-dialog' interface HomePageContentProps { initialSelectedOwner?: string @@ -62,7 +61,6 @@ export function HomePageContent({ const [loadingGitHub, setLoadingGitHub] = useState(false) const [isRefreshing, setIsRefreshing] = useState(false) const [showOpenRepoDialog, setShowOpenRepoDialog] = useState(false) - const [showMultiRepoDialog, setShowMultiRepoDialog] = useState(false) const router = useRouter() const searchParams = useSearchParams() const { refreshTasks, addTaskOptimistically } = useTasks() @@ -265,7 +263,6 @@ export function HomePageContent({ onOwnerChange={handleOwnerChange} onRepoChange={handleRepoChange} size="sm" - onMultiRepoClick={() => setShowMultiRepoDialog(true)} /> @@ -557,6 +554,7 @@ export function HomePageContent({ - {/* Sign In Dialog */} diff --git a/components/repo-selector.tsx b/components/repo-selector.tsx index 83708d0..4cc66a6 100644 --- a/components/repo-selector.tsx +++ b/components/repo-selector.tsx @@ -3,11 +3,10 @@ import { useState, useEffect, useRef } from 'react' import { Input } from '@/components/ui/input' import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/ui/select' -import { Lock, Loader2, Layers } from 'lucide-react' +import { Lock, Loader2 } from 'lucide-react' import { useAtomValue, useSetAtom, useAtom } from 'jotai' import { githubConnectionAtom } from '@/lib/atoms/github-connection' import { githubOwnersAtom, githubReposAtomFamily } from '@/lib/atoms/github-cache' -import { multiRepoModeAtom, selectedReposAtom } from '@/lib/atoms/multi-repo' interface GitHubOwner { login: string @@ -31,7 +30,6 @@ interface RepoSelectorProps { onRepoChange: (repo: string) => void disabled?: boolean size?: 'sm' | 'default' - onMultiRepoClick?: () => void } export function RepoSelector({ @@ -41,7 +39,6 @@ export function RepoSelector({ onRepoChange, disabled = false, size = 'default', - onMultiRepoClick, }: RepoSelectorProps) { const [repoFilter, setRepoFilter] = useState('') // Initialize with selected owner to prevent flash @@ -54,10 +51,6 @@ export function RepoSelector({ const [temporaryOwner, setTemporaryOwner] = useState(null) const [temporaryRepo, setTemporaryRepo] = useState(null) - // Multi-repo mode state - const [multiRepoMode, setMultiRepoMode] = useAtom(multiRepoModeAtom) - const selectedRepos = useAtomValue(selectedReposAtom) - // Ref for the filter input to focus it when dropdown opens const filterInputRef = useRef(null) @@ -413,16 +406,6 @@ export function RepoSelector({ } const handleOwnerChange = (value: string) => { - if (value === '__many__') { - // Enable multi-repo mode - setMultiRepoMode(true) - onMultiRepoClick?.() - return - } - - // Disable multi-repo mode when selecting a specific owner - setMultiRepoMode(false) - onOwnerChange(value) onRepoChange('') // Reset repo when owner changes setRepoFilter('') // Reset filter when owner changes @@ -482,22 +465,13 @@ export function RepoSelector({ return (
- {showOwnersLoading ? (
Loading...
- ) : multiRepoMode ? ( -
- - Multi-repo -
) : size === 'sm' && selectedOwnerData ? ( // Mobile: Show only avatar
@@ -515,14 +489,6 @@ export function RepoSelector({ )} - {/* Multi-repo option */} - -
- - Multi-repo -
-
-
{displayedOwners && displayedOwners.map((owner) => ( @@ -535,84 +501,74 @@ export function RepoSelector({ - {/* Show "X repo(s) selected" button in multi-repo mode, or regular repo dropdown otherwise */} - {multiRepoMode ? ( - - ) : ( - selectedOwner && ( - <> - / - - 50 - ? `Filter ${repos?.length || 0} repositories...` - : 'Filter repositories...' - } - value={repoFilter} - onChange={(e) => setRepoFilter(e.target.value)} - disabled={disabled} - className="text-base md:text-sm h-8" - onClick={(e) => e.stopPropagation()} - onKeyDown={(e) => e.stopPropagation()} - /> -
- )} - {filteredRepos.length === 0 && repoFilter ? ( -
- No repositories match "{repoFilter}" -
- ) : showReposLoading ? ( -
- - Loading repositories... -
- ) : ( - <> - {displayedRepos.map((repo) => ( - -
- {repo.name} - {repo.private && } -
-
- ))} - {hasMoreRepos && ( -
- Showing first 50 of {repos?.length || 0} repositories. Use filter to find more. + {selectedOwner && ( + <> + / + + 50 + ? `Filter ${repos?.length || 0} repositories...` + : 'Filter repositories...' + } + value={repoFilter} + onChange={(e) => setRepoFilter(e.target.value)} + disabled={disabled} + className="text-base md:text-sm h-8" + onClick={(e) => e.stopPropagation()} + onKeyDown={(e) => e.stopPropagation()} + /> +
+ )} + {filteredRepos.length === 0 && repoFilter ? ( +
+ No repositories match "{repoFilter}" +
+ ) : showReposLoading ? ( +
+ + Loading repositories... +
+ ) : ( + <> + {displayedRepos.map((repo) => ( + +
+ {repo.name} + {repo.private && }
- )} - - )} -
- - - ) + + ))} + {hasMoreRepos && ( +
+ Showing first 50 of {repos?.length || 0} repositories. Use filter to find more. +
+ )} + + )} + + + )}
) diff --git a/components/sealos-home-page-content.tsx b/components/sealos-home-page-content.tsx index 9d1d5e8..7cc0435 100644 --- a/components/sealos-home-page-content.tsx +++ b/components/sealos-home-page-content.tsx @@ -169,6 +169,7 @@ export function SealosHomePageContent({ + } + + if (tone === 'error') { + return + } + + if (tone === 'warning') { + return + } + + return +} + +export const TaskAgentActivity = memo(function TaskAgentActivity({ isStreaming, items }: TaskAgentActivityProps) { + const visibleItems = items.slice(-8).toReversed() + + if (visibleItems.length === 0) { + return null + } + + return ( +
+
+
Agent Activity
+ {isStreaming ? ( +
+ + Live +
+ ) : null} +
+
+ {visibleItems.map((item) => ( +
+
{getToneIcon(item.tone)}
+
+
+ {item.label} + + {new Date(item.occurredAt).toLocaleTimeString()} + +
+ {item.detail ? ( +
+ {item.detail} +
+ ) : null} +
+
+ ))} +
+
+ ) +}) diff --git a/components/task-chat-transcript.tsx b/components/task-chat-transcript.tsx index 46f089c..eda76b4 100644 --- a/components/task-chat-transcript.tsx +++ b/components/task-chat-transcript.tsx @@ -3,14 +3,18 @@ import { memo, useDeferredValue, useEffect, useMemo, useRef } from 'react' import { Check, Copy, Loader2, RotateCcw } from 'lucide-react' import type { LogEntry, Task } from '@/lib/db/schema' +import { TaskAgentActivity } from '@/components/task-agent-activity' import type { ChatTaskMessage } from '@/lib/task-chat' import { buildChatTurns, parseTaskAgentMessage } from '@/lib/task-chat' +import type { TaskAgentActivityItem } from '@/lib/task-agent-events' import { TaskChatMarkdown } from '@/components/task-chat-markdown' import { cn } from '@/lib/utils' interface TaskChatTranscriptProps { + activityItems: TaskAgentActivityItem[] copiedMessageId: string | null isGatewayTask: boolean + isStreaming: boolean logs: LogEntry[] messages: ChatTaskMessage[] onCopyMessage: (messageId: string, content: string) => void @@ -23,8 +27,10 @@ function isTaskProcessing(status: Task['status']): boolean { } export const TaskChatTranscript = memo(function TaskChatTranscript({ + activityItems, copiedMessageId, isGatewayTask, + isStreaming, logs, messages, onCopyMessage, @@ -35,7 +41,7 @@ export const TaskChatTranscript = memo(function TaskChatTranscript({ const wasAtBottomRef = useRef(true) const deferredMessages = useDeferredValue(messages) const turns = useMemo(() => buildChatTurns(deferredMessages), [deferredMessages]) - const isProcessing = isTaskProcessing(status) + const isProcessing = isStreaming || isTaskProcessing(status) const visibleLogs = useMemo(() => logs.filter((entry) => !entry.message.startsWith('[SERVER]')).slice(-6), [logs]) @@ -90,6 +96,7 @@ export const TaskChatTranscript = memo(function TaskChatTranscript({ return (
+ {isGatewayTask ? : null} {turns.map((turn, index) => { const isLastTurn = index === turns.length - 1 const showWaitingState = isLastTurn && isProcessing && turn.agentMessages.length === 0 diff --git a/components/task-chat.tsx b/components/task-chat.tsx index 218c093..45e9d9b 100644 --- a/components/task-chat.tsx +++ b/components/task-chat.tsx @@ -21,7 +21,7 @@ import { TaskChatComposer } from '@/components/task-chat-composer' import { TaskChatMarkdown } from '@/components/task-chat-markdown' import { TaskChatTranscript } from '@/components/task-chat-transcript' import { DropdownMenu, DropdownMenuContent, DropdownMenuItem, DropdownMenuTrigger } from '@/components/ui/dropdown-menu' -import { useTaskChatMessages } from '@/lib/hooks/use-task-chat-messages' +import { useTaskAgentChatV2 } from '@/lib/hooks/use-task-agent-chat-v2' interface TaskChatProps { taskId: string @@ -57,10 +57,6 @@ interface DeploymentInfo { createdAt?: string } -function isTaskProcessing(status: Task['status']): boolean { - return status === 'processing' || status === 'pending' -} - export function TaskChat({ taskId, task, chatOnly = false }: TaskChatProps) { const [activeTab, setActiveTab] = useState<'chat' | 'comments' | 'actions' | 'deployments'>('chat') const [newMessage, setNewMessage] = useAtom(taskChatInputAtomFamily(taskId)) @@ -79,17 +75,19 @@ export function TaskChat({ taskId, task, chatOnly = false }: TaskChatProps) { const deploymentLoadedRef = useRef(false) const { + activityItems, error: chatError, isGatewayTask, isLoading, isSending, isStopping, + isStreaming, messages, refreshMessages, retryMessage, sendMessage, stopTask, - } = useTaskChatMessages(taskId, task) + } = useTaskAgentChatV2(taskId, task) const fetchPRComments = useCallback( async (showLoading = true) => { @@ -247,11 +245,11 @@ export function TaskChat({ taskId, task, chatOnly = false }: TaskChatProps) { const handleStopTask = useCallback(async () => { const result = await stopTask() if (result.success) { - toast.success('Task stopped successfully') + toast.success('Generation stopped') return } - toast.error(result.error || 'Failed to stop task') + toast.error(result.error || 'Failed to stop generation') }, [stopTask]) const handleCopyMessage = useCallback(async (messageId: string, content: string) => { @@ -584,8 +582,10 @@ export function TaskChat({ taskId, task, chatOnly = false }: TaskChatProps) { return ( void isSubmitting: boolean + isAuthenticated: boolean selectedOwner: string selectedRepo: string initialInstallDependencies?: boolean @@ -45,6 +46,7 @@ const FIXED_TASK_MODEL = 'gpt-5.4' export function TaskForm({ onSubmit, isSubmitting, + isAuthenticated, selectedOwner, selectedRepo, initialMaxDuration = 300, @@ -119,28 +121,54 @@ export function TaskForm({ }) } + const hasSelectedRepo = Boolean(selectedOwner && selectedRepo) + + const title = !isAuthenticated + ? 'Sign In to Deploy on Sealos' + : hasSelectedRepo + ? 'Deploy Your Project to Sealos' + : 'Choose a Repository to Deploy' + + const description = !isAuthenticated + ? 'Sign in first, then choose a GitHub repository and tell Sealos how it should be analyzed, built, and deployed.' + : hasSelectedRepo + ? 'Tell Sealos what you want to do with this repository. A simple deployment request is enough.' + : 'Connect GitHub if needed, choose a repository, then describe how Sealos should analyze, build, and deploy it.' + + const repoBannerText = hasSelectedRepo + ? `${selectedOwner}/${selectedRepo}` + : !isAuthenticated + ? 'Sign in to choose a GitHub repository.' + : 'Choose a GitHub repository from the header to begin.' + + const placeholder = !isAuthenticated + ? 'Sign in first. After that, choose a repository and describe the deployment task here.' + : hasSelectedRepo + ? 'For example: deploy this repository to Sealos.' + : 'Choose a repository first. Then describe how Sealos should build and deploy it.' + + const helperText = !isAuthenticated + ? 'Start by signing in. After you pick a repository, you can describe the deployment task here.' + : hasSelectedRepo + ? '' + : 'After you pick a repository, describe what Sealos should build and deploy.' + return (
-

Deploy Your Project to Sealos

-

- Select a GitHub repository and describe how it should be analyzed, built, and deployed. -

+

{title}

+

{description}

-
- {selectedOwner && selectedRepo - ? `${selectedOwner}/${selectedRepo}` - : 'Select a GitHub repository to begin.'} -
+
{repoBannerText}