Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions app/api/tasks/[taskId]/gateway/turn/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { tasks } from '@/lib/db/schema'
import { CodexGatewayApiError } from '@/lib/codex-gateway/client'
import { startCodexGatewayTaskTurn, waitForCodexGatewayTurnCompletion } from '@/lib/codex-gateway/runner'
import { getTaskGatewayContext } from '@/lib/codex-gateway/task'
import { prependSealosDeployContext } from '@/lib/sealos-deploy-context'
import { getServerSession } from '@/lib/session/get-server-session'
import { appendTaskMessage } from '@/lib/task-messages'
import { createTaskLogger } from '@/lib/utils/task-logger'
Expand Down Expand Up @@ -58,14 +57,11 @@ export async function POST(request: NextRequest, { params }: RouteParams) {
console.error('Failed to persist gateway turn user message')
}

const startedTurn = await startCodexGatewayTaskTurn(
taskId,
prependSealosDeployContext(prompt, task.runtimeNamespace),
{
appendUserMessage: false,
model: task.selectedModel,
},
)
const startedTurn = await startCodexGatewayTaskTurn(taskId, prompt, {
appendUserMessage: false,
model: task.selectedModel,
runtimeNamespace: task.runtimeNamespace,
})

after(async () => {
try {
Expand Down
5 changes: 2 additions & 3 deletions lib/codex-gateway/chat-v2-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { startCodexGatewayTaskTurn, waitForCodexGatewayTurnCompletion } from '@/
import { db } from '@/lib/db/client'
import { tasks, type Task } from '@/lib/db/schema'
import { ensureTaskDevboxRuntime } from '@/lib/devbox/runtime'
import { prependSealosDeployContext } from '@/lib/sealos-deploy-context'
import { ensureTaskChatV2StreamDescriptor, type TaskChatV2StreamDescriptor } from '@/lib/task-chat-v2'
import { appendUserMessageEvent, recordTaskEvent } from '@/lib/task-events'
import { createTaskLogger } from '@/lib/utils/task-logger'
Expand Down Expand Up @@ -56,11 +55,11 @@ export async function startTaskChatV2Turn(input: {
.where(eq(tasks.id, input.task.id))

const runtime = await ensureTaskDevboxRuntime(input.task, { logger })
const gatewayPrompt = prependSealosDeployContext(input.prompt, runtime.namespace || input.task.runtimeNamespace)

const startedTurn = await startCodexGatewayTaskTurn(input.task.id, gatewayPrompt, {
const startedTurn = await startCodexGatewayTaskTurn(input.task.id, input.prompt, {
appendUserMessage: false,
model: input.task.selectedModel,
runtimeNamespace: runtime.namespace || input.task.runtimeNamespace,
})

await recordTaskEvent({
Expand Down
35 changes: 30 additions & 5 deletions lib/codex-gateway/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ import { FORCED_CODEX_MODEL } from '@/lib/codex/defaults'
import { CodexGatewayApiError, getCodexGatewaySessionState, sendCodexGatewayTurn } from '@/lib/codex-gateway/client'
import { finalizeTurnCompletion, markTurnCompletionRunning, recordTurnCheckpoint } from '@/lib/codex-gateway/completion'
import { ensureCodexGatewaySession } from '@/lib/codex-gateway/session'
import type { CodexGatewayState, CodexGatewaySessionResponse } from '@/lib/codex-gateway/types'
import { getTaskGatewayContextById } from '@/lib/codex-gateway/task'
import { getAssistantContentAfterCursor } from '@/lib/codex-gateway/transcript'
import { db } from '@/lib/db/client'
import { taskMessages, tasks } from '@/lib/db/schema'
import { refreshTaskDevboxLease } from '@/lib/devbox/runtime'
import { prependSealosDeployContext } from '@/lib/sealos-deploy-context'
import { generateId } from '@/lib/utils/id'
import { createTaskLogger } from '@/lib/utils/task-logger'
import { formatKeyTaskLogMessage, TASK_FLOW_LOGS } from '@/lib/utils/task-flow-logs'
import type { CodexGatewaySessionResponse } from '@/lib/codex-gateway/types'

interface StartCodexGatewayTurnOptions {
appendUserMessage?: boolean
model?: string | null
runtimeNamespace?: string | null
}

export interface StartedCodexGatewayTurn {
Expand Down Expand Up @@ -60,6 +62,22 @@ function getTurnTranscriptCursor(
return transcript.length
}

function isFirstSessionTurn(state: CodexGatewayState | null | undefined): boolean {
return !state?.transcript?.some((entry) => entry.role === 'user' || entry.role === 'assistant')
}

function buildGatewayPrompt(
prompt: string,
state: CodexGatewayState | null | undefined,
runtimeNamespace?: string | null,
): string {
if (!isFirstSessionTurn(state)) {
return prompt
}

return prependSealosDeployContext(prompt, runtimeNamespace)
}

export async function startCodexGatewayTaskTurn(
taskId: string,
prompt: string,
Expand Down Expand Up @@ -96,6 +114,7 @@ export async function startCodexGatewayTaskTurn(
}
let gatewaySessionId = task.gatewaySessionId
let gatewayThreadId: string | null | undefined = null
let gatewayState: CodexGatewayState | null = null
let turnResponse: CodexGatewaySessionResponse

if (!gatewaySessionId) {
Expand All @@ -108,26 +127,30 @@ export async function startCodexGatewayTaskTurn(

gatewaySessionId = ensuredSession.sessionId
gatewayThreadId = ensuredSession.state.threadId
gatewayState = ensuredSession.state
} else {
try {
const existingSession = await getCodexGatewaySessionState(gatewayUrl, gatewaySessionId, gatewayAuthToken)
gatewayThreadId = existingSession.state.threadId
gatewayState = existingSession.state
} catch (error) {
if (!(error instanceof CodexGatewayApiError && error.status === 404)) {
throw error
}
}
}

let gatewayPrompt = buildGatewayPrompt(prompt, gatewayState, options.runtimeNamespace)

const turnSendingLog = formatKeyTaskLogMessage(TASK_FLOW_LOGS.GATEWAY_TURN_SENDING, {
promptChars: prompt.length,
promptChars: gatewayPrompt.length,
sessionId: gatewaySessionId,
threadId: gatewayThreadId,
})
await logger.info(turnSendingLog)
console.info(turnSendingLog)
try {
turnResponse = await sendCodexGatewayTurn(gatewayUrl, gatewaySessionId, { prompt }, gatewayAuthToken)
turnResponse = await sendCodexGatewayTurn(gatewayUrl, gatewaySessionId, { prompt: gatewayPrompt }, gatewayAuthToken)
} catch (error) {
if (!(error instanceof CodexGatewayApiError && error.status === 404)) {
throw error
Expand All @@ -154,15 +177,17 @@ export async function startCodexGatewayTaskTurn(

gatewaySessionId = refreshedSession.sessionId
gatewayThreadId = refreshedSession.state.threadId
gatewayState = refreshedSession.state
gatewayPrompt = buildGatewayPrompt(prompt, gatewayState, options.runtimeNamespace)
const retryTurnSendingLog = formatKeyTaskLogMessage(TASK_FLOW_LOGS.GATEWAY_TURN_SENDING, {
mode: 'recreated',
promptChars: prompt.length,
promptChars: gatewayPrompt.length,
sessionId: gatewaySessionId,
threadId: gatewayThreadId,
})
await logger.info(retryTurnSendingLog)
console.info(retryTurnSendingLog)
turnResponse = await sendCodexGatewayTurn(gatewayUrl, gatewaySessionId, { prompt }, gatewayAuthToken)
turnResponse = await sendCodexGatewayTurn(gatewayUrl, gatewaySessionId, { prompt: gatewayPrompt }, gatewayAuthToken)
}

const startedAt = new Date()
Expand Down
Loading