diff --git a/.github/workflows/pr-checks.yml b/.github/workflows/pr-checks.yml index b1dd71b..7f95c42 100644 --- a/.github/workflows/pr-checks.yml +++ b/.github/workflows/pr-checks.yml @@ -19,8 +19,6 @@ jobs: - name: Setup pnpm uses: pnpm/action-setup@v4 - with: - version: 9 - name: Get pnpm store directory shell: bash @@ -46,4 +44,3 @@ jobs: - name: Run build run: pnpm build - diff --git a/app/api/tasks/[taskId]/chat/interrupt/route.ts b/app/api/tasks/[taskId]/chat/interrupt/route.ts index e0093bc..f510ccf 100644 --- a/app/api/tasks/[taskId]/chat/interrupt/route.ts +++ b/app/api/tasks/[taskId]/chat/interrupt/route.ts @@ -1,7 +1,11 @@ import { and, eq, isNull } from 'drizzle-orm' import { NextResponse } from 'next/server' import { CodexGatewayApiError, interruptCodexGatewayTurn } from '@/lib/codex-gateway/client' -import { hasActiveTurnCheckpoint, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion' +import { + finalizeActiveTurnInterrupted, + hasActiveTurnCheckpoint, + reconcileIncompleteTurnSafely, +} from '@/lib/codex-gateway/completion' import { getTaskGatewayContext } from '@/lib/codex-gateway/task' import { db } from '@/lib/db/client' import { tasks } from '@/lib/db/schema' @@ -44,22 +48,56 @@ export async function POST(_request: Request, { params }: RouteParams) { } const { gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(taskId, session.user.id) + + const finalizeInterruptedLocally = async () => { + await finalizeActiveTurnInterrupted({ + taskId, + sessionId: task.activeTurnSessionId, + clearGatewaySession: true, + }) + } + if (!gatewayUrl) { - return NextResponse.json({ error: 'Gateway URL is not configured' }, { status: 400 }) + await finalizeInterruptedLocally() + return NextResponse.json({ + success: true, + data: { + sessionId: task.activeTurnSessionId, + state: null, + }, + }) } - const result = await interruptCodexGatewayTurn(gatewayUrl, task.activeTurnSessionId, gatewayAuthToken) - await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => { - console.error('Failed to reconcile interrupted chat turn') - }) + try { + const result = await interruptCodexGatewayTurn(gatewayUrl, task.activeTurnSessionId, gatewayAuthToken) + const reconciledTask = await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => { + console.error('Failed to reconcile interrupted chat turn') + return null + }) - return NextResponse.json({ - success: true, - data: { - sessionId: result.sessionId, - state: result.state, - }, - }) + if (reconciledTask && hasActiveTurnCheckpoint(reconciledTask)) { + await finalizeInterruptedLocally() + } else if (!reconciledTask) { + await finalizeInterruptedLocally() + } + + return NextResponse.json({ + success: true, + data: { + sessionId: result.sessionId, + state: result.state, + }, + }) + } catch { + await finalizeInterruptedLocally() + return NextResponse.json({ + success: true, + data: { + sessionId: task.activeTurnSessionId, + state: null, + }, + }) + } } catch (error) { if (error instanceof CodexGatewayApiError) { return NextResponse.json( diff --git a/app/api/tasks/[taskId]/chat/v2/stream/route.ts b/app/api/tasks/[taskId]/chat/v2/stream/route.ts index da1b9de..ed8a620 100644 --- a/app/api/tasks/[taskId]/chat/v2/stream/route.ts +++ b/app/api/tasks/[taskId]/chat/v2/stream/route.ts @@ -1,14 +1,18 @@ import { NextRequest, NextResponse } from 'next/server' import { finalizeActiveTurnFailure, reconcileIncompleteTurnSafely } from '@/lib/codex-gateway/completion' import { getCodexGatewayEventStreamUrl } from '@/lib/codex-gateway/client' +import { diagnoseCodexTurnFailure } from '@/lib/codex-gateway/failure-diagnostics' import { getTaskGatewayContext } from '@/lib/codex-gateway/task' import type { CodexGatewayState } from '@/lib/codex-gateway/types' import { closeTaskStream, getTaskStream, recordTaskEvent, touchTaskStream } from '@/lib/task-events' +import { formatKeyTaskLogMessage, TASK_FLOW_LOGS } from '@/lib/utils/task-flow-logs' import { getServerSession } from '@/lib/session/get-server-session' export const runtime = 'nodejs' export const dynamic = 'force-dynamic' -export const maxDuration = 300 +export const maxDuration = 1800 + +const STREAM_HEARTBEAT_INTERVAL_MS = 10_000 interface RouteParams { params: Promise<{ @@ -63,6 +67,10 @@ function parseSseBlock(block: string): { } } +function encodeSseBlock(encoder: TextEncoder, block: string): Uint8Array { + return encoder.encode(`${block}\n\n`) +} + async function persistGatewayEvent(input: { eventName: string payload: Record | null @@ -150,9 +158,38 @@ async function persistMissingSessionFailure(taskId: string, sessionId: string) { }) } +async function logStreamFailureDiagnostic(input: { + fallbackError: string + httpStatus?: number + sessionId?: string | null + taskId: string + turnStatus?: string | null +}) { + const diagnostic = await diagnoseCodexTurnFailure({ + taskId: input.taskId, + sessionId: input.sessionId, + fallbackError: input.fallbackError, + httpStatus: input.httpStatus, + turnStatus: input.turnStatus, + }) + + console.info( + formatKeyTaskLogMessage(TASK_FLOW_LOGS.GATEWAY_STREAM_RECONNECTING, { + sessionId: input.sessionId ?? null, + streamState: 'errored', + errorSource: diagnostic.source, + httpStatus: input.httpStatus, + turnStatus: input.turnStatus ?? null, + }), + ) + console.error('Chat v2 stream upstream failed', diagnostic) +} + export async function GET(request: NextRequest, { params }: RouteParams) { const decoder = new TextDecoder() + const encoder = new TextEncoder() let streamId: string | null = null + let streamSessionId: string | null = null let taskId: string | null = null try { @@ -175,6 +212,7 @@ export async function GET(request: NextRequest, { params }: RouteParams) { if (!stream || stream.taskId !== resolvedTaskId || stream.status !== 'active') { return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) } + streamSessionId = stream.sessionId const { task, gatewayUrl, gatewayAuthToken } = await getTaskGatewayContext(resolvedTaskId, session.user.id) @@ -191,19 +229,45 @@ export async function GET(request: NextRequest, { params }: RouteParams) { return NextResponse.json({ error: 'Stream session is no longer active' }, { status: 410 }) } + const streamState = + stream.lastEventAt.getTime() > stream.startedAt.getTime() ? ('resumed' as const) : ('connected' as const) + console.info( + formatKeyTaskLogMessage( + streamState === 'resumed' ? TASK_FLOW_LOGS.GATEWAY_STREAM_RESUMED : TASK_FLOW_LOGS.GATEWAY_STREAM_CONNECTED, + { + sessionId: stream.sessionId, + streamState, + }, + ), + ) + const upstream = await fetch(getCodexGatewayEventStreamUrl(gatewayUrl, stream.sessionId, gatewayAuthToken), { headers: { accept: 'text/event-stream', }, cache: 'no-store', - signal: AbortSignal.timeout(15_000), }) if (!upstream.ok || !upstream.body) { await closeTaskStream(resolvedStreamId, 'errored') + await logStreamFailureDiagnostic({ + taskId: resolvedTaskId, + sessionId: stream.sessionId, + fallbackError: 'Codex gateway turn failed', + httpStatus: upstream.status, + }) if (upstream.status === 404 || upstream.status === 410) { await persistMissingSessionFailure(resolvedTaskId, stream.sessionId) + } else if (upstream.status >= 500 && upstream.status < 600) { + await finalizeActiveTurnFailure({ + taskId: resolvedTaskId, + sessionId: stream.sessionId, + error: 'Codex gateway turn failed', + clearGatewaySession: true, + }).catch(() => { + console.error('Failed to force finalize chat v2 stream connection error') + }) } else { await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => { console.error('Failed to reconcile chat v2 stream connection error') @@ -226,11 +290,12 @@ export async function GET(request: NextRequest, { params }: RouteParams) { const reader = upstream.body.getReader() let sseBuffer = '' + const heartbeatChunk = encoder.encode(': ping\n\n') const handleSseBlock = async (block: string) => { const parsedBlock = parseSseBlock(block) if (!parsedBlock || !parsedBlock.dataText) { - return + return encodeSseBlock(encoder, block) } try { @@ -243,12 +308,16 @@ export async function GET(request: NextRequest, { params }: RouteParams) { payload, transcriptCursor, }) - } catch { - console.error('Failed to persist gateway stream event') + } catch (error) { + console.error('Failed to persist gateway stream event', error) } + + return encodeSseBlock(encoder, block) } const flushBufferedEvents = async (flushAll: boolean) => { + const encodedBlocks: Uint8Array[] = [] + while (true) { const separatorMatch = sseBuffer.match(/\r?\n\r?\n/) if (!separatorMatch || separatorMatch.index === undefined) { @@ -257,18 +326,32 @@ export async function GET(request: NextRequest, { params }: RouteParams) { const block = sseBuffer.slice(0, separatorMatch.index) sseBuffer = sseBuffer.slice(separatorMatch.index + separatorMatch[0].length) - await handleSseBlock(block) + encodedBlocks.push(await handleSseBlock(block)) } if (flushAll && sseBuffer.trim()) { - const finalBlock = sseBuffer + console.error('Chat v2 stream ended with incomplete SSE block') sseBuffer = '' - await handleSseBlock(finalBlock) } + + return encodedBlocks } const streamResponse = new ReadableStream({ async start(controller) { + let closed = false + const heartbeatTimer = setInterval(() => { + if (closed || sseBuffer.trim()) { + return + } + + try { + controller.enqueue(heartbeatChunk) + } catch { + // Ignore enqueue errors after stream shutdown. + } + }, STREAM_HEARTBEAT_INTERVAL_MS) + try { while (true) { const { done, value } = await reader.read() @@ -281,25 +364,44 @@ export async function GET(request: NextRequest, { params }: RouteParams) { continue } - controller.enqueue(value) sseBuffer += decoder.decode(value, { stream: true }) - await flushBufferedEvents(false) + const encodedBlocks = await flushBufferedEvents(false) + for (const encodedBlock of encodedBlocks) { + controller.enqueue(encodedBlock) + } } sseBuffer += decoder.decode() - await flushBufferedEvents(true) + const encodedBlocks = await flushBufferedEvents(true) + for (const encodedBlock of encodedBlocks) { + controller.enqueue(encodedBlock) + } + closed = true controller.close() } catch (error) { await closeTaskStream(resolvedStreamId, 'errored') - await reconcileIncompleteTurnSafely(resolvedTaskId, 2_500).catch(() => { - console.error('Failed to reconcile chat v2 stream reader error') + await logStreamFailureDiagnostic({ + taskId: resolvedTaskId, + sessionId: stream.sessionId, + fallbackError: 'Codex gateway turn failed', + }) + await finalizeActiveTurnFailure({ + taskId: resolvedTaskId, + sessionId: stream.sessionId, + error: 'Codex gateway turn failed', + clearGatewaySession: true, + }).catch(() => { + console.error('Failed to force finalize chat v2 stream reader error') }) try { + closed = true controller.close() } catch { // Ignore close errors after upstream socket termination. } } finally { + closed = true + clearInterval(heartbeatTimer) reader.releaseLock() } }, @@ -320,8 +422,20 @@ export async function GET(request: NextRequest, { params }: RouteParams) { } if (taskId) { - await reconcileIncompleteTurnSafely(taskId, 2_500).catch(() => { - console.error('Failed to reconcile chat v2 stream proxy error') + await logStreamFailureDiagnostic({ + taskId, + sessionId: streamSessionId, + fallbackError: 'Codex gateway turn failed', + }).catch(() => { + console.error('Failed to log chat v2 stream proxy diagnostic') + }) + await finalizeActiveTurnFailure({ + taskId, + sessionId: streamSessionId, + error: 'Codex gateway turn failed', + clearGatewaySession: true, + }).catch(() => { + console.error('Failed to force finalize chat v2 stream proxy error') }) } diff --git a/lib/codex-gateway/completion.ts b/lib/codex-gateway/completion.ts index c01adf5..cf131ed 100644 --- a/lib/codex-gateway/completion.ts +++ b/lib/codex-gateway/completion.ts @@ -1,13 +1,16 @@ import { and, desc, eq } from 'drizzle-orm' import { CodexGatewayApiError, getCodexGatewaySessionState } from '@/lib/codex-gateway/client' +import { diagnoseCodexTurnFailure } from '@/lib/codex-gateway/failure-diagnostics' import { getAssistantContentAfterCursor, type TranscriptTextEntry } from '@/lib/codex-gateway/transcript' import { getTaskGatewayContextById } from '@/lib/codex-gateway/task' import { db } from '@/lib/db/client' import { taskEvents, tasks, type Task } from '@/lib/db/schema' +import { closeTaskChatV2StreamDescriptor } from '@/lib/task-chat-v2' import { projectAssistantMessage } from '@/lib/task-event-projection' import { buildProjectedAssistantMessageId } from '@/lib/task-message-ids' import { appendProjectedAssistantMessageEvent, recordTaskEvent } from '@/lib/task-events' import { OperationTimeoutError, withTimeout } from '@/lib/utils/async' +import { formatKeyTaskLogMessage, TASK_FLOW_LOGS } from '@/lib/utils/task-flow-logs' export const TURN_COMPLETION_STATES = ['pending', 'running', 'completed', 'failed'] as const @@ -40,6 +43,14 @@ interface FinalizeActiveTurnFailureInput { taskId: string } +interface ForceFinalizeActiveTurnInput { + clearGatewaySession?: boolean + error?: string | null + mode: 'failed' | 'interrupted' + sessionId?: string | null + taskId: string +} + function parseTranscriptEntries(value: unknown): TranscriptTextEntry[] { if (!Array.isArray(value)) { return [] @@ -217,6 +228,29 @@ export async function finalizeTurnCompletion(input: FinalizeTurnInput): Promise< } export async function finalizeActiveTurnFailure(input: FinalizeActiveTurnFailureInput): Promise { + return await forceFinalizeActiveTurn({ + taskId: input.taskId, + sessionId: input.sessionId, + mode: 'failed', + error: input.error, + clearGatewaySession: input.clearGatewaySession, + }) +} + +export async function finalizeActiveTurnInterrupted(input: { + clearGatewaySession?: boolean + sessionId?: string | null + taskId: string +}): Promise { + return await forceFinalizeActiveTurn({ + taskId: input.taskId, + sessionId: input.sessionId, + mode: 'interrupted', + clearGatewaySession: input.clearGatewaySession, + }) +} + +async function forceFinalizeActiveTurn(input: ForceFinalizeActiveTurnInput): Promise { const [task] = await db.select().from(tasks).where(eq(tasks.id, input.taskId)).limit(1) if (!task || !hasActiveTurnCheckpoint(task) || !task.activeTurnSessionId || task.status === 'stopped') { @@ -233,14 +267,49 @@ export async function finalizeActiveTurnFailure(input: FinalizeActiveTurnFailure task.activeTurnTranscriptCursor!, ) + await closeTaskChatV2StreamDescriptor(input.taskId, input.mode === 'failed' ? 'errored' : 'closed').catch(() => { + console.error('Failed to close active chat stream during turn finalize') + }) + + if (input.mode === 'failed') { + const fallbackError = input.error || 'Codex gateway turn failed' + const diagnostic = await diagnoseCodexTurnFailure({ + taskId: input.taskId, + sessionId: task.activeTurnSessionId, + fallbackError, + turnStatus: 'failed', + }) + + console.info( + formatKeyTaskLogMessage(TASK_FLOW_LOGS.GATEWAY_TURN_FAILED, { + sessionId: task.activeTurnSessionId, + errorSource: diagnostic.source, + turnStatus: 'failed', + }), + ) + console.error('Chat v2 turn finalized as failed', diagnostic) + + return await finalizeTurnCompletion({ + taskId: input.taskId, + sessionId: task.activeTurnSessionId, + transcriptCursor: task.activeTurnTranscriptCursor!, + assistantContent, + success: false, + error: fallbackError, + clearGatewaySession: input.clearGatewaySession ?? true, + turnStatus: 'failed', + }) + } + return await finalizeTurnCompletion({ taskId: input.taskId, sessionId: task.activeTurnSessionId, transcriptCursor: task.activeTurnTranscriptCursor!, assistantContent, - success: false, - error: input.error, - clearGatewaySession: input.clearGatewaySession ?? task.gatewaySessionId === task.activeTurnSessionId, + success: true, + error: null, + clearGatewaySession: input.clearGatewaySession ?? true, + turnStatus: 'interrupted', }) } @@ -273,6 +342,28 @@ export async function reconcileIncompleteTurn(taskId: string): Promise= 500 && error.status < 600) { + return await finalizeActiveTurnFailure({ + taskId, + sessionId: task.activeTurnSessionId, + error: 'Codex gateway turn failed', + clearGatewaySession: true, + }) + } + + if (!(error instanceof CodexGatewayApiError)) { + return await finalizeActiveTurnFailure({ + taskId, + sessionId: task.activeTurnSessionId, + error: 'Codex gateway turn failed', + clearGatewaySession: true, + }) + } + throw error } } @@ -321,6 +444,7 @@ export async function reconcileIncompleteTurnSafely(taskId: string, timeoutMs = return null } + console.error('Chat v2 reconcile failed', error) throw error } } diff --git a/lib/codex-gateway/failure-diagnostics.ts b/lib/codex-gateway/failure-diagnostics.ts new file mode 100644 index 0000000..46b5641 --- /dev/null +++ b/lib/codex-gateway/failure-diagnostics.ts @@ -0,0 +1,232 @@ +import { and, desc, eq, inArray } from 'drizzle-orm' +import { db } from '@/lib/db/client' +import { taskEvents } from '@/lib/db/schema' + +type FailureSource = + | 'gateway-fallback' + | 'gateway-notification' + | 'gateway-session' + | 'gateway-state' + | 'gateway-warning' + +export interface CodexTurnFailureDiagnostic { + error: string + httpStatus?: number + source: FailureSource + turnStatus?: string | null +} + +interface DiagnosticCandidate { + error: string + priority: number + source: Exclude +} + +function normalizeDiagnosticText(value: string): string { + return value.replace(/\s+/g, ' ').trim() +} + +function isReconnectWarning(value: string): boolean { + return /^reconnecting\.\.\./i.test(value.trim()) +} + +function extractTextValue(record: Record, key: string): string | null { + const value = record[key] + + if (typeof value !== 'string') { + return null + } + + const normalizedValue = normalizeDiagnosticText(value) + return normalizedValue || null +} + +function extractPayloadTextCandidates(payload: Record | null | undefined): string[] { + if (!payload) { + return [] + } + + const candidates = [ + extractTextValue(payload, 'message'), + extractTextValue(payload, 'error'), + extractTextValue(payload, 'error_message'), + extractTextValue(payload, 'errorMessage'), + extractTextValue(payload, 'detail'), + extractTextValue(payload, 'textPreview'), + ] + + return candidates.filter((candidate): candidate is string => candidate !== null && !isReconnectWarning(candidate)) +} + +function buildCandidate( + error: string, + source: DiagnosticCandidate['source'], + priority: number, +): DiagnosticCandidate | null { + const normalizedError = normalizeDiagnosticText(error) + + if (!normalizedError || isReconnectWarning(normalizedError)) { + return null + } + + return { + error: normalizedError, + source, + priority, + } +} + +function classifyDiagnosticPriority(error: string): number { + const normalizedError = error.toLowerCase() + + if (normalizedError.includes('stream disconnected before completion')) { + return 100 + } + + if (normalizedError.includes('systemerror') || normalizedError.includes('system error')) { + return 90 + } + + if (normalizedError.includes('session is no longer available')) { + return 80 + } + + return 50 +} + +function extractNotificationCandidates(payload: Record | null | undefined): DiagnosticCandidate[] { + if (!payload) { + return [] + } + + const candidates = extractPayloadTextCandidates(payload) + .map((candidate) => buildCandidate(candidate, 'gateway-notification', classifyDiagnosticPriority(candidate))) + .filter((candidate): candidate is DiagnosticCandidate => Boolean(candidate)) + + const method = extractTextValue(payload, 'method') + const nestedStatus = payload.status + const statusType = + nestedStatus && typeof nestedStatus === 'object' && 'type' in nestedStatus && typeof nestedStatus.type === 'string' + ? normalizeDiagnosticText(nestedStatus.type) + : null + + if (method === 'thread/status/changed' && statusType) { + const candidate = buildCandidate(statusType, 'gateway-notification', classifyDiagnosticPriority(statusType)) + if (candidate) { + candidates.push(candidate) + } + } + + return candidates +} + +function extractStateCandidates(payload: Record | null | undefined): DiagnosticCandidate[] { + if (!payload) { + return [] + } + + const recentEvents = Array.isArray(payload.recentEvents) ? payload.recentEvents : [] + const candidates: DiagnosticCandidate[] = [] + + for (const recentEvent of [...recentEvents].reverse()) { + if (!recentEvent || typeof recentEvent !== 'object') { + continue + } + + const textPreview = + 'textPreview' in recentEvent && typeof recentEvent.textPreview === 'string' + ? normalizeDiagnosticText(recentEvent.textPreview) + : null + const method = + 'method' in recentEvent && typeof recentEvent.method === 'string' + ? normalizeDiagnosticText(recentEvent.method) + : null + const status = + 'status' in recentEvent && typeof recentEvent.status === 'string' + ? normalizeDiagnosticText(recentEvent.status) + : null + + if (textPreview) { + const candidate = buildCandidate(textPreview, 'gateway-state', classifyDiagnosticPriority(textPreview)) + if (candidate) { + candidates.push(candidate) + } + } + + if (method === 'thread/status/changed' && status) { + const candidate = buildCandidate(status, 'gateway-state', classifyDiagnosticPriority(status)) + if (candidate) { + candidates.push(candidate) + } + } + } + + return candidates +} + +export async function diagnoseCodexTurnFailure(input: { + fallbackError: string + httpStatus?: number + sessionId?: string | null + taskId: string + turnStatus?: string | null +}): Promise { + if (input.httpStatus === 404 || input.httpStatus === 410) { + return { + error: 'Codex gateway session is no longer available', + httpStatus: input.httpStatus, + source: 'gateway-session', + turnStatus: input.turnStatus ?? null, + } + } + + const eventFilters = [eq(taskEvents.taskId, input.taskId)] + if (input.sessionId) { + eventFilters.push(eq(taskEvents.sessionId, input.sessionId)) + } + + const recentEvents = await db + .select({ + kind: taskEvents.kind, + payload: taskEvents.payload, + }) + .from(taskEvents) + .where( + and( + ...eventFilters, + inArray(taskEvents.kind, ['gateway.warning', 'gateway.notification', 'gateway.state.snapshot']), + ), + ) + .orderBy(desc(taskEvents.seq)) + .limit(40) + + let bestCandidate: DiagnosticCandidate | null = null + + for (const event of recentEvents) { + let candidates: DiagnosticCandidate[] = [] + + if (event.kind === 'gateway.warning') { + candidates = extractPayloadTextCandidates(event.payload).flatMap((candidate) => { + const nextCandidate = buildCandidate(candidate, 'gateway-warning', classifyDiagnosticPriority(candidate)) + return nextCandidate ? [nextCandidate] : [] + }) + } else if (event.kind === 'gateway.notification') { + candidates = extractNotificationCandidates(event.payload) + } else if (event.kind === 'gateway.state.snapshot') { + candidates = extractStateCandidates(event.payload) + } + + for (const candidate of candidates) { + if (!bestCandidate || candidate.priority > bestCandidate.priority) { + bestCandidate = candidate + } + } + } + + return { + error: bestCandidate?.error || input.fallbackError, + httpStatus: input.httpStatus, + source: bestCandidate?.source || 'gateway-fallback', + turnStatus: input.turnStatus ?? null, + } +} diff --git a/lib/hooks/use-task-agent-chat-v2.ts b/lib/hooks/use-task-agent-chat-v2.ts index bedfe45..75e44c8 100644 --- a/lib/hooks/use-task-agent-chat-v2.ts +++ b/lib/hooks/use-task-agent-chat-v2.ts @@ -99,6 +99,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { ) const [retainedStreamingMessage, setRetainedStreamingMessage] = useState(null) const [activeStream, setActiveStream] = useState(null) + const [streamReconnectNonce, setStreamReconnectNonce] = useState(0) const [isLoading, setIsLoading] = useState(true) const [error, setError] = useState(null) const [isSending, setIsSending] = useState(false) @@ -226,13 +227,45 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { const source = new EventSource(streamUrl) eventSourceRef.current = source + const closeSource = () => { + source.close() + + if (eventSourceRef.current === source) { + eventSourceRef.current = null + } + } + + const scheduleReconnect = () => { + clearReconnectTimer() + closeSource() + + const nextAttempt = reconnectAttemptRef.current + 1 + reconnectAttemptRef.current = nextAttempt + const reconnectDelayMs = Math.min(1000 * nextAttempt, 5000) + + reconnectTimeoutRef.current = window.setTimeout(() => { + reconnectTimeoutRef.current = null + setStreamReconnectNonce((previousValue) => previousValue + 1) + void refreshChat(false) + }, reconnectDelayMs) + } + source.onopen = () => { clearReconnectTimer() reconnectAttemptRef.current = 0 } source.addEventListener('state', (event) => { - const nextState = JSON.parse(event.data) as CodexGatewayState + let nextState: CodexGatewayState + + try { + nextState = JSON.parse(event.data) as CodexGatewayState + } catch (error) { + console.error('Chat v2 state event payload parse failed', error) + scheduleReconnect() + return + } + const nextIdentity = liveTurnIdentity || taskCheckpointIdentity const nextStreamingMessage = buildStreamingAgentMessageFromState(taskId, nextState, nextIdentity) @@ -257,11 +290,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { reconnectAttemptRef.current = 0 setActiveStream(null) setLiveTurnIdentity(null) - source.close() - - if (eventSourceRef.current === source) { - eventSourceRef.current = null - } + closeSource() void refreshChat(false) } @@ -272,11 +301,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { reconnectAttemptRef.current = 0 setActiveStream(null) setLiveTurnIdentity(null) - source.close() - - if (eventSourceRef.current === source) { - eventSourceRef.current = null - } + closeSource() void refreshChat(false) }) @@ -286,32 +311,22 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { return } - clearReconnectTimer() - source.close() - - if (eventSourceRef.current === source) { - eventSourceRef.current = null - } - - const nextAttempt = reconnectAttemptRef.current + 1 - reconnectAttemptRef.current = nextAttempt - const reconnectDelayMs = Math.min(1000 * nextAttempt, 5000) - - reconnectTimeoutRef.current = window.setTimeout(() => { - reconnectTimeoutRef.current = null - void refreshChat(false) - }, reconnectDelayMs) + scheduleReconnect() } return () => { clearReconnectTimer() - source.close() - - if (eventSourceRef.current === source) { - eventSourceRef.current = null - } + closeSource() } - }, [activeStream, clearReconnectTimer, liveTurnIdentity, refreshChat, taskCheckpointIdentity, taskId]) + }, [ + activeStream, + clearReconnectTimer, + liveTurnIdentity, + refreshChat, + streamReconnectNonce, + taskCheckpointIdentity, + taskId, + ]) const sendMessage = useCallback( async (content: string): Promise => { @@ -417,6 +432,15 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { } } + clearReconnectTimer() + eventSourceRef.current?.close() + eventSourceRef.current = null + setActiveStream(null) + setLiveTurnIdentity(null) + setLiveState(null) + setRetainedStreamingMessage(null) + void refreshChat(false) + return { success: true, } @@ -428,7 +452,7 @@ export function useTaskAgentChatV2(taskId: string, task: Task) { } finally { setIsStopping(false) } - }, [activeStream, liveState?.activeTurn, task, taskId]) + }, [activeStream, clearReconnectTimer, liveState?.activeTurn, refreshChat, task, taskId]) const streamingMessage = useMemo( () => buildStreamingAgentMessage(taskId, liveState, persistedMessages, liveTurnIdentity), diff --git a/lib/utils/task-flow-logs.ts b/lib/utils/task-flow-logs.ts index b727e46..f8f0997 100644 --- a/lib/utils/task-flow-logs.ts +++ b/lib/utils/task-flow-logs.ts @@ -8,9 +8,13 @@ export const TASK_FLOW_LOGS = { DEVBOX_RUNTIME_READY: '[KEY][DEVBOX] Runtime ready', GATEWAY_SESSION_PREPARING: '[KEY][GATEWAY] Preparing session', GATEWAY_SESSION_READY: '[KEY][GATEWAY] Session ready', + GATEWAY_STREAM_CONNECTED: '[KEY][GATEWAY] Stream connected', + GATEWAY_STREAM_RECONNECTING: '[KEY][GATEWAY] Stream reconnecting', + GATEWAY_STREAM_RESUMED: '[KEY][GATEWAY] Stream resumed', GATEWAY_TURN_SENDING: '[KEY][GATEWAY] Sending user input', GATEWAY_TURN_WAITING: '[KEY][GATEWAY] Waiting for response', GATEWAY_TURN_COMPLETED: '[KEY][GATEWAY] Response received', + GATEWAY_TURN_FAILED: '[KEY][GATEWAY] Response failed', } as const const KEY_TASK_LOG_PATTERN = /^\[KEY\]\[(USER|DEVBOX|GATEWAY)\]\s*([^|]*?)(?:\s+\|\s+(.+))?$/ @@ -23,8 +27,11 @@ const KEY_TASK_LOG_METADATA_KEYS = [ 'sessionId', 'threadId', 'selectedModel', + 'streamState', 'transcriptCursor', 'turnStatus', + 'errorSource', + 'httpStatus', 'installedSkill', ] as const