Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion app/api/tasks/[taskId]/chat/interrupt/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { and, eq, isNull } from 'drizzle-orm'
import { NextResponse } from 'next/server'
import { CodexGatewayApiError, interruptCodexGatewayTurn } from '@/lib/codex-gateway/client'
import { hasActiveTurnCheckpoint } from '@/lib/codex-gateway/completion'
import { hasActiveTurnCheckpoint, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion'
import { getTaskGatewayContext } from '@/lib/codex-gateway/task'
import { db } from '@/lib/db/client'
import { tasks } from '@/lib/db/schema'
Expand Down Expand Up @@ -49,6 +49,9 @@ export async function POST(_request: Request, { params }: RouteParams) {
}

const result = await interruptCodexGatewayTurn(gatewayUrl, task.activeTurnSessionId, gatewayAuthToken)
await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => {
console.error('Failed to reconcile interrupted chat turn')
})

return NextResponse.json({
success: true,
Expand Down
23 changes: 4 additions & 19 deletions app/api/tasks/[taskId]/chat/v2/route.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { after, NextRequest, NextResponse } from 'next/server'
import { NextRequest, NextResponse } from 'next/server'
import { and, asc, eq, isNull } from 'drizzle-orm'
import { z } from 'zod'
import { CodexGatewayApiError } from '@/lib/codex-gateway/client'
import { finalizeTaskChatV2Turn, startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service'
import { startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service'
import {
hasActiveTurnCheckpoint,
reconcileIncompleteTurnSafely,
Expand All @@ -25,6 +25,7 @@ export const dynamic = 'force-dynamic'
export const maxDuration = 300

const turnSchema = z.object({
clientMessageId: z.string().trim().min(1).optional(),
prompt: z.string().trim().min(1, 'Prompt is required'),
})

Expand Down Expand Up @@ -158,27 +159,11 @@ export async function POST(request: NextRequest, { params }: RouteParams) {

const result = await startTaskChatV2Turn({
task,
clientMessageId: parsed.data.clientMessageId,
prompt: parsed.data.prompt,
source: 'chat-v2',
})

after(async () => {
try {
await finalizeTaskChatV2Turn(result.startedTurn)
} catch (error) {
console.error('Failed to finalize chat v2 turn:', error)

await db
.update(tasks)
.set({
status: 'error',
error: 'Failed to finalize chat turn',
updatedAt: new Date(),
})
.where(eq(tasks.id, resolvedTaskId))
}
})

return NextResponse.json({
success: true,
data: {
Expand Down
36 changes: 36 additions & 0 deletions app/api/tasks/[taskId]/chat/v2/stream/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { NextRequest, NextResponse } from 'next/server'
import { finalizeActiveTurnFailure, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion'
import { getCodexGatewayEventStreamUrl } from '@/lib/codex-gateway/client'
import { getTaskGatewayContext } from '@/lib/codex-gateway/task'
import type { CodexGatewayState } from '@/lib/codex-gateway/types'
Expand Down Expand Up @@ -114,6 +115,9 @@ async function persistGatewayEvent(input: {

if (!state.activeTurn && state.lastTurnStatus) {
await closeTaskStream(input.streamId, 'closed')
await reconcileIncompleteTurnSafely(input.taskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 stream terminal state')
})
}

return
Expand All @@ -129,9 +133,23 @@ async function persistGatewayEvent(input: {

if (eventKind === 'gateway.session.closed') {
await closeTaskStream(input.streamId, 'closed')
await reconcileIncompleteTurnSafely(input.taskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 session closure')
})
}
}

async function persistMissingSessionFailure(taskId: string, sessionId: string) {
await finalizeActiveTurnFailure({
taskId,
sessionId,
error: 'Codex gateway session is no longer available',
clearGatewaySession: true,
}).catch(() => {
console.error('Failed to persist missing Codex gateway session')
})
}

export async function GET(request: NextRequest, { params }: RouteParams) {
const decoder = new TextDecoder()
let streamId: string | null = null
Expand Down Expand Up @@ -183,6 +201,15 @@ export async function GET(request: NextRequest, { params }: RouteParams) {

if (!upstream.ok || !upstream.body) {
await closeTaskStream(resolvedStreamId, 'errored')

if (upstream.status === 404 || upstream.status === 410) {
await persistMissingSessionFailure(resolvedTaskId, stream.sessionId)
} else {
await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 stream connection error')
})
}

return NextResponse.json({ error: 'Failed to connect to Codex gateway events' }, { status: 502 })
}

Expand Down Expand Up @@ -264,6 +291,9 @@ export async function GET(request: NextRequest, { params }: RouteParams) {
controller.close()
} catch (error) {
await closeTaskStream(resolvedStreamId, 'errored')
await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 stream reader error')
})
try {
controller.close()
} catch {
Expand All @@ -289,6 +319,12 @@ export async function GET(request: NextRequest, { params }: RouteParams) {
})
}

if (taskId) {
await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => {
console.error('Failed to reconcile chat v2 stream proxy error')
})
}

console.error('Failed to proxy chat v2 stream:', error)
return NextResponse.json({ error: 'Failed to proxy chat stream' }, { status: 500 })
}
Expand Down
85 changes: 22 additions & 63 deletions app/api/tasks/[taskId]/continue/route.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { NextRequest, NextResponse, after } from 'next/server'
import { and, eq, isNull } from 'drizzle-orm'
import { NextRequest, NextResponse } from 'next/server'
import { startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service'
import { db } from '@/lib/db/client'
import { tasks } from '@/lib/db/schema'
import { startCodexGatewayTaskTurn, waitForCodexGatewayTurnCompletion } from '@/lib/codex-gateway/runner'
import { ensureTaskDevboxRuntime } from '@/lib/devbox/runtime'
import { prependSealosDeployContext } from '@/lib/sealos-deploy-context'
import { checkRateLimit } from '@/lib/utils/rate-limit'
import { createTaskLogger } from '@/lib/utils/task-logger'
import { getServerSession } from '@/lib/session/get-server-session'
import { appendTaskMessage } from '@/lib/task-messages'
import { generateId } from '@/lib/utils/id'
import { checkRateLimit } from '@/lib/utils/rate-limit'

function buildCompatClientMessageId(): string {
return `continue-compat:${generateId(16)}`
}

export async function POST(req: NextRequest, context: { params: Promise<{ taskId: string }> }) {
try {
Expand All @@ -32,15 +33,16 @@ export async function POST(req: NextRequest, context: { params: Promise<{ taskId
}

const { taskId } = await context.params
const body = await req.json()
const { message } = body
const body = (await req.json().catch(() => ({}))) as {
clientMessageId?: string
message?: unknown
}
const message = typeof body.message === 'string' ? body.message.trim() : ''

if (!message || typeof message !== 'string' || !message.trim()) {
if (!message) {
return NextResponse.json({ error: 'Message is required' }, { status: 400 })
}

const trimmedMessage = message.trim()

const [task] = await db
.select()
.from(tasks)
Expand All @@ -55,57 +57,14 @@ export async function POST(req: NextRequest, context: { params: Promise<{ taskId
return NextResponse.json({ error: 'Unsupported agent' }, { status: 400 })
}

let userMessagePersisted = false

try {
await appendTaskMessage({
taskId,
role: 'user',
content: trimmedMessage,
})
userMessagePersisted = true
} catch {
console.error('Failed to persist follow-up user message')
}

await db
.update(tasks)
.set({
status: 'processing',
progress: 0,
error: null,
completedAt: null,
updatedAt: new Date(),
})
.where(eq(tasks.id, taskId))

after(async () => {
const logger = createTaskLogger(taskId)

try {
const runtime = await ensureTaskDevboxRuntime(task, { logger })
const gatewayPrompt = prependSealosDeployContext(trimmedMessage, runtime.namespace || task.runtimeNamespace)

const startedTurn = await startCodexGatewayTaskTurn(taskId, gatewayPrompt, {
appendUserMessage: !userMessagePersisted,
model: task.selectedModel,
})

await waitForCodexGatewayTurnCompletion(startedTurn)
} catch {
console.error('Failed to finalize Codex gateway follow-up')

await db
.update(tasks)
.set({
status: 'error',
error: 'Failed to finalize Codex gateway follow-up',
updatedAt: new Date(),
})
.where(eq(tasks.id, taskId))

await logger.error('Failed to finalize Codex gateway follow-up')
}
await startTaskChatV2Turn({
task,
clientMessageId:
typeof body.clientMessageId === 'string' && body.clientMessageId.trim()
? body.clientMessageId.trim()
: buildCompatClientMessageId(),
prompt: message,
source: 'continue-compat',
})

return NextResponse.json({ success: true })
Expand Down
6 changes: 6 additions & 0 deletions app/api/tasks/[taskId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ export async function PATCH(request: NextRequest, { params }: RouteParams) {
// Log the stop request
await logger.info('Stop request received - terminating task execution...')

if (hasActiveTurnCheckpoint(existingTask)) {
await reconcileIncompleteTurnSafely(taskId, 1_500).catch(() => {
console.error('Failed to reconcile task before stop')
})
}

const { gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(taskId, session.user.id)

if (existingTask.gatewaySessionId && gatewayUrl) {
Expand Down
25 changes: 3 additions & 22 deletions app/api/tasks/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { NextRequest, NextResponse, after } from 'next/server'
import { and, desc, eq, isNull, or } from 'drizzle-orm'
import { finalizeTaskChatV2Turn, startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service'
import { startTaskChatV2Turn } from '@/lib/codex-gateway/chat-v2-service'
import { FORCED_CODEX_MODEL } from '@/lib/codex/defaults'
import { db } from '@/lib/db/client'
import { insertTaskSchema, tasks } from '@/lib/db/schema'
Expand Down Expand Up @@ -188,31 +188,12 @@ export async function POST(request: NextRequest) {
})

try {
const startedTurn = await startTaskChatV2Turn({
await startTaskChatV2Turn({
task: newTask,
clientMessageId: `task-create:${taskId}`,
prompt: validatedData.prompt,
source: 'task-create',
})

after(async () => {
try {
await finalizeTaskChatV2Turn(startedTurn.startedTurn)
} catch {
console.error('Failed to finalize Codex task')

await db
.update(tasks)
.set({
status: 'error',
error: 'Failed to finalize Codex task',
updatedAt: new Date(),
})
.where(eq(tasks.id, taskId))

const logger = createTaskLogger(taskId)
await logger.error('Failed to finalize Codex task')
}
})
} catch {
console.error('Failed to start Codex task')

Expand Down
13 changes: 11 additions & 2 deletions components/merge-pr-dialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ interface MergePRDialogProps {
onMergeInitiated?: () => void
}

function buildClientMessageId(prefix: string): string {
if (typeof crypto !== 'undefined' && typeof crypto.randomUUID === 'function') {
return `${prefix}:${crypto.randomUUID()}`
}

return `${prefix}:${Date.now()}-${Math.random().toString(36).slice(2, 10)}`
}

export function MergePRDialog({
taskId,
prUrl: _prUrl,
Expand Down Expand Up @@ -89,13 +97,14 @@ export function MergePRDialog({

try {
// Send a follow-up message to the current task to fix merge conflicts
const response = await fetch(`/api/tasks/${taskId}/continue`, {
const response = await fetch(`/api/tasks/${taskId}/chat/v2`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
message:
clientMessageId: buildClientMessageId('merge-conflict-follow-up'),
prompt:
'Fix merge conflicts in the current branch and prepare it for merging. Review the conflicting changes carefully and resolve them intelligently, preserving the intent of both sets of changes where possible.',
}),
})
Expand Down
2 changes: 2 additions & 0 deletions lib/codex-gateway/chat-v2-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface StartTaskChatV2TurnResult {
}

export async function startTaskChatV2Turn(input: {
clientMessageId?: string
prompt: string
source: string
task: Task
Expand All @@ -31,6 +32,7 @@ export async function startTaskChatV2Turn(input: {

await appendUserMessageEvent({
taskId: input.task.id,
clientMessageId: input.clientMessageId,
content: input.prompt,
source: input.source,
})
Expand Down
Loading
Loading