Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
613 changes: 441 additions & 172 deletions src/app/v1/_lib/proxy/response-handler.ts

Large diffs are not rendered by default.

87 changes: 71 additions & 16 deletions src/lib/async-task-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@ interface TaskInfo {
promise: Promise<void>;
abortController: AbortController;
createdAt: number;
lastActivityAt: number;
taskType: string;
staleTimeoutMs: number;
}

interface RegisterTaskOptions {
taskType?: string;
abortController?: AbortController;
staleTimeoutMs?: number;
}

const DEFAULT_STALE_TASK_TIMEOUT_MS = 10 * 60 * 1000;

class AsyncTaskManagerClass {
private tasks: Map<string, TaskInfo> = new Map();
private cleanupInterval: NodeJS.Timeout | null = null;
Expand Down Expand Up @@ -60,7 +70,7 @@ class AsyncTaskManagerClass {
this.cleanupAll();
});

// 每分钟检查并清理超时任务(>10 分钟未完成,防止内存泄漏)
// 每分钟检查并清理空闲超时任务,防止挂死后台任务长期强引用上下文。
this.cleanupInterval = setInterval(() => {
this.cleanupCompletedTasks();
}, 60000);
Expand All @@ -74,25 +84,42 @@ class AsyncTaskManagerClass {
* @param taskType 任务类型(用于日志)
* @returns AbortController(可用于取消任务)
*/
register(taskId: string, promise: Promise<void>, taskType = "unknown"): AbortController {
register(
taskId: string,
promise: Promise<void>,
taskTypeOrOptions: string | RegisterTaskOptions = "unknown"
): AbortController {
this.initializeIfNeeded();

const options =
typeof taskTypeOrOptions === "string" ? { taskType: taskTypeOrOptions } : taskTypeOrOptions;
const taskType = options.taskType ?? "unknown";

// 如果任务已存在,先取消旧任务
if (this.tasks.has(taskId)) {
const oldTaskInfo = this.tasks.get(taskId);
if (oldTaskInfo) {
logger.warn("[AsyncTaskManager] Task already exists, cancelling old task", {
taskId,
taskType,
});
this.cancel(taskId);
this.cleanup(taskId, oldTaskInfo);
}

const abortController = new AbortController();
const abortController = options.abortController ?? new AbortController();
const staleTimeoutMs =
options.staleTimeoutMs === undefined || options.staleTimeoutMs <= 0
? DEFAULT_STALE_TASK_TIMEOUT_MS
: options.staleTimeoutMs;
const now = Date.now();

const taskInfo: TaskInfo = {
promise,
abortController,
createdAt: Date.now(),
createdAt: now,
lastActivityAt: now,
taskType,
staleTimeoutMs,
};

this.tasks.set(taskId, taskInfo);
Expand Down Expand Up @@ -126,7 +153,7 @@ class AsyncTaskManagerClass {
}
})
.finally(() => {
this.cleanup(taskId);
this.cleanup(taskId, taskInfo);
});

logger.debug("[AsyncTaskManager] Task registered", {
Expand All @@ -138,6 +165,20 @@ class AsyncTaskManagerClass {
return abortController;
}

/**
* 标记任务仍在推进。流式任务每次读到 chunk 都应 touch,避免长时间活跃流被
* wall-clock stale cleanup 误判为挂死任务。
*/
touch(taskId: string): boolean {
const taskInfo = this.tasks.get(taskId);
if (!taskInfo) {
return false;
}

taskInfo.lastActivityAt = Date.now();
return true;
}

/**
* 取消一个任务
*
Expand All @@ -150,7 +191,9 @@ class AsyncTaskManagerClass {
return;
}

taskInfo.abortController.abort();
if (!taskInfo.abortController.signal.aborted) {
taskInfo.abortController.abort();
}

logger.info("[AsyncTaskManager] Task cancelled", {
taskId,
Expand All @@ -160,45 +203,56 @@ class AsyncTaskManagerClass {
}

/**
* 清理单个任务
* 清理单个任务。必须带上注册时的任务实例,避免旧任务 finally 误删同 taskId 的新任务。
*
* @param taskId 任务唯一标识
*/
cleanup(taskId: string): void {
private cleanup(taskId: string, expectedTask: TaskInfo): boolean {
if (this.tasks.get(taskId) !== expectedTask) {
return false;
}

const deleted = this.tasks.delete(taskId);
if (deleted) {
logger.debug("[AsyncTaskManager] Task cleaned up", {
taskId,
remainingTasks: this.tasks.size,
});
}
return deleted;
}

/**
* 检查并清理超时任务
*
* 遍历所有活跃任务,对于超过 10 分钟还未完成的任务
* 遍历所有活跃任务,对于空闲时间超过任务级 staleTimeoutMs 的任务
* 1. 记录警告日志
* 2. 触发 AbortController 取消任务
* 3. 从任务 Map 中移除
*
* ⚠️ 注意:这不是清理"已完成"的任务,而是清理"超时未完成"的任务
* 注意:这是清理"空闲超时"的任务。活跃流应在收到上游 chunk 时
* 调用 touch() 更新 lastActivityAt,避免被误判为挂死任务。
*/
private cleanupCompletedTasks(): void {
const now = Date.now();
const staleThreshold = 10 * 60 * 1000; // 10 分钟

for (const [taskId, taskInfo] of this.tasks.entries()) {
const age = now - taskInfo.createdAt;
const idleAge = now - taskInfo.lastActivityAt;

const staleTimeoutMs = taskInfo.staleTimeoutMs || DEFAULT_STALE_TASK_TIMEOUT_MS;

// 如果任务超过 10 分钟还没完成,记录警告并取消
if (age > staleThreshold) {
logger.warn("[AsyncTaskManager] Task timeout, cancelling", {
// 如果任务超过阈值没有任何进展,记录警告、取消并从 Map 断开强引用。
if (idleAge > staleTimeoutMs) {
logger.warn("[AsyncTaskManager] Task timeout, cancelling and detaching", {
taskId,
taskType: taskInfo.taskType,
age,
idleAge,
staleTimeoutMs,
});
this.cancel(taskId);
this.cleanup(taskId, taskInfo);
}
}
}
Expand All @@ -211,8 +265,9 @@ class AsyncTaskManagerClass {
count: this.tasks.size,
});

for (const taskId of this.tasks.keys()) {
for (const [taskId, taskInfo] of Array.from(this.tasks.entries())) {
this.cancel(taskId);
this.cleanup(taskId, taskInfo);
}

if (this.cleanupInterval) {
Expand Down
116 changes: 105 additions & 11 deletions src/lib/langfuse/emit-proxy-trace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import type { ProxySession } from "@/app/v1/_lib/proxy/session";
import { logger } from "@/lib/logger";
import type { CostBreakdown } from "@/lib/utils/cost-calculation";

const LANGFUSE_RESPONSE_TEXT_MAX_CHARS = 1024 * 1024;
const LANGFUSE_RESPONSE_TEXT_EDGE_CHARS = 128 * 1024;
const LANGFUSE_TRUNCATED_MARKER = "\n\n[langfuse_response_truncated]\n\n";

export interface EmitProxyLangfuseTraceData {
responseHeaders: Headers;
responseText: string;
Expand All @@ -16,6 +20,81 @@ export interface EmitProxyLangfuseTraceData {
errorMessage?: string;
}

function truncateResponseTextForLangfuse(text: string): string {
if (text.length <= LANGFUSE_RESPONSE_TEXT_MAX_CHARS) {
return text;
}

return `${text.slice(0, LANGFUSE_RESPONSE_TEXT_EDGE_CHARS)}${LANGFUSE_TRUNCATED_MARKER}${text.slice(
-LANGFUSE_RESPONSE_TEXT_EDGE_CHARS
)}`;
}

function buildRequestMessagePreview(message: Record<string, unknown>): Record<string, unknown> {
return {
truncatedForLangfuse: true,
model: typeof message.model === "string" ? message.model : undefined,
stream: typeof message.stream === "boolean" ? message.stream : undefined,
max_tokens: typeof message.max_tokens === "number" ? message.max_tokens : undefined,
temperature: typeof message.temperature === "number" ? message.temperature : undefined,
messageCount: Array.isArray(message.messages) ? message.messages.length : undefined,
contentsCount: Array.isArray(message.contents) ? message.contents.length : undefined,
toolsCount: Array.isArray(message.tools) ? message.tools.length : undefined,
hasSystemPrompt:
(Array.isArray(message.system) && message.system.length > 0) ||
(typeof message.system === "string" && message.system.length > 0),
};
}
Comment on lines +33 to +47

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The newly introduced buildRequestMessagePreview function truncates the request message to avoid holding large objects in memory. However, it completely omits the system and tools fields, only providing hasSystemPrompt and toolsCount.

This causes a silent bug in buildRequestBodySummary (inside trace-proxy-request.ts), which directly accesses msg.system and msg.tools to compute hasSystemPrompt and toolsCount. Because these fields are missing on the preview object, hasSystemPrompt will always evaluate to false and toolsCount will always evaluate to 0 in Langfuse traces for all snapshotted sessions.

To fix this without modifying trace-proxy-request.ts (which is outside the active diff hunk), we can return dummy arrays for system and tools in buildRequestMessagePreview that match the expected checks (Array.isArray and .length).

function buildRequestMessagePreview(message: Record<string, unknown>): Record<string, unknown> {
  if (!message) {
    return { truncatedForLangfuse: true };
  }

  const hasSystemPrompt =
    (Array.isArray(message.system) && message.system.length > 0) ||
    (typeof message.system === "string" && message.system.length > 0);
  const toolsCount = Array.isArray(message.tools) ? message.tools.length : 0;

  return {
    truncatedForLangfuse: true,
    model: typeof message.model === "string" ? message.model : undefined,
    stream: typeof message.stream === "boolean" ? message.stream : undefined,
    max_tokens: typeof message.max_tokens === "number" ? message.max_tokens : undefined,
    temperature: typeof message.temperature === "number" ? message.temperature : undefined,
    messageCount: Array.isArray(message.messages) ? message.messages.length : undefined,
    contentsCount: Array.isArray(message.contents) ? message.contents.length : undefined,
    system: hasSystemPrompt ? [null] : [],
    tools: new Array(toolsCount).fill(null),
  };
}


function buildLangfuseSessionSnapshot(session: ProxySession): ProxySession {
const providerChain = session.getProviderChain().map((item) => ({ ...item }));
const specialSettings = session.getSpecialSettings();
const cacheTtlResolved = session.getCacheTtlResolved();
const context1mApplied = session.getContext1mApplied();
const currentModel = session.getCurrentModel();
const originalModel = session.getOriginalModel();
const modelRedirected = session.isModelRedirected();
const endpoint = session.getEndpoint();
const requestSequence = session.getRequestSequence();
const messagesLength = session.getMessagesLength();
const forwardedRequestBody =
typeof session.forwardedRequestBody === "string"
? truncateResponseTextForLangfuse(session.forwardedRequestBody)
: null;
const requestMessage = buildRequestMessagePreview(session.request.message);

return {
startTime: session.startTime,
method: session.method,
headers: new Headers(session.headers),
request: {
message: requestMessage,
log: truncateResponseTextForLangfuse(session.request.log ?? ""),
note: session.request.note,
model: session.request.model,
imageRequestMetadata: null,
},
userAgent: session.userAgent,
provider: session.provider,
messageContext: session.messageContext,
ttfbMs: session.ttfbMs,
forwardStartTime: session.forwardStartTime,
forwardedRequestBody,
sessionId: session.sessionId,
originalFormat: session.originalFormat,
getMessagesLength: () => messagesLength,
getEndpoint: () => endpoint,
getCurrentModel: () => currentModel,
getProviderChain: () => providerChain,
getRequestSequence: () => requestSequence,
getOriginalModel: () => originalModel,
isModelRedirected: () => modelRedirected,
getSpecialSettings: () => specialSettings,
getCacheTtlResolved: () => cacheTtlResolved,
getContext1mApplied: () => context1mApplied,
} as unknown as ProxySession;
Comment on lines +66 to +95

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 buildRequestBodySummary reads wrong keys from the snapshot object

buildLangfuseSessionSnapshot replaces session.request.message with the output of buildRequestMessagePreview, which stores hasSystemPrompt: bool and toolsCount: number as flat scalar fields. But buildRequestBodySummary in trace-proxy-request.ts reads msg.system (an array) to compute hasSystemPrompt and msg.tools (an array) to compute toolsCount. Because the preview object never sets system or tools, every Langfuse trace will show hasSystemPrompt: false and toolsCount: 0 regardless of what the original request contained, silently corrupting the requestSummary metadata for all requests after this change ships.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/lib/langfuse/emit-proxy-trace.ts
Line: 66-95

Comment:
**`buildRequestBodySummary` reads wrong keys from the snapshot object**

`buildLangfuseSessionSnapshot` replaces `session.request.message` with the output of `buildRequestMessagePreview`, which stores `hasSystemPrompt: bool` and `toolsCount: number` as flat scalar fields. But `buildRequestBodySummary` in `trace-proxy-request.ts` reads `msg.system` (an array) to compute `hasSystemPrompt` and `msg.tools` (an array) to compute `toolsCount`. Because the preview object never sets `system` or `tools`, every Langfuse trace will show `hasSystemPrompt: false` and `toolsCount: 0` regardless of what the original request contained, silently corrupting the `requestSummary` metadata for all requests after this change ships.

How can I resolve this? If you propose a fix, please make it concise.

}

/**
* 异步发送代理请求的 Langfuse trace。
*
Expand All @@ -27,20 +106,35 @@ export function emitProxyLangfuseTrace(
): void {
if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;

// 必须在异步 import 之前截断,避免动态加载/SDK 发送期间闭包继续强引用完整大响应。
const responseText = truncateResponseTextForLangfuse(data.responseText);
const sessionSnapshot = buildLangfuseSessionSnapshot(session);
const {
responseHeaders,
durationMs,
statusCode,
isStreaming,
usageMetrics,
costUsd,
costBreakdown,
sseEventCount,
errorMessage,
} = data;

void import("@/lib/langfuse/trace-proxy-request")
.then(({ traceProxyRequest }) => {
void traceProxyRequest({
session,
responseHeaders: data.responseHeaders,
durationMs: data.durationMs,
statusCode: data.statusCode,
isStreaming: data.isStreaming,
responseText: data.responseText,
usageMetrics: data.usageMetrics,
costUsd: data.costUsd,
costBreakdown: data.costBreakdown,
sseEventCount: data.sseEventCount,
errorMessage: data.errorMessage,
session: sessionSnapshot,
responseHeaders,
durationMs,
statusCode,
isStreaming,
responseText,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Stop capturing large Langfuse trace inputs

When Langfuse is enabled for a large response, this passes the truncated responseText, but the async callback still closes over the original data object via the adjacent data.* reads; that object still contains the full data.responseText, so the full response stays strongly reachable until the dynamic import and trace submission finish. Destructure the non-large fields before the import() and avoid capturing data so the truncation actually releases the original payload.

Useful? React with 👍 / 👎.

usageMetrics,
costUsd,
costBreakdown,
sseEventCount,
errorMessage,
});
})
.catch((err) => {
Expand Down
31 changes: 29 additions & 2 deletions src/lib/langfuse/trace-proxy-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,27 @@ import { isLangfuseEnabled } from "@/lib/langfuse/index";
import { logger } from "@/lib/logger";
import type { CostBreakdown } from "@/lib/utils/cost-calculation";

const LANGFUSE_JSON_PARSE_MAX_CHARS = 1024 * 1024;
const LANGFUSE_TEXT_PREVIEW_EDGE_CHARS = 128 * 1024;

function buildRequestBodySummary(session: ProxySession): Record<string, unknown> {
const msg = session.request.message as Record<string, unknown>;
const hasSystemPrompt =
typeof msg.hasSystemPrompt === "boolean"
? msg.hasSystemPrompt
: Array.isArray(msg.system) && msg.system.length > 0;
const toolsCount =
typeof msg.toolsCount === "number"
? msg.toolsCount
: Array.isArray(msg.tools)
? msg.tools.length
: 0;

return {
model: session.request.model,
messageCount: session.getMessagesLength(),
hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0,
toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0,
hasSystemPrompt,
toolsCount,
stream: msg.stream === true,
maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined,
temperature: typeof msg.temperature === "number" ? msg.temperature : undefined,
Expand Down Expand Up @@ -126,6 +140,15 @@ function buildResponseOutput(ctx: TraceContext): unknown {
return output;
}

function buildLargeTextPreview(text: string): Record<string, unknown> {
return {
truncated: true,
totalChars: text.length,
head: text.slice(0, LANGFUSE_TEXT_PREVIEW_EDGE_CHARS),
tail: text.slice(-LANGFUSE_TEXT_PREVIEW_EDGE_CHARS),
};
}

/**
* Send a trace to Langfuse for a completed proxy request.
* Fully async and non-blocking. Errors are caught and logged.
Expand Down Expand Up @@ -422,6 +445,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
}

function tryParseJsonSafe(text: string): unknown {
if (text.length > LANGFUSE_JSON_PARSE_MAX_CHARS) {
return buildLargeTextPreview(text);
}

try {
return JSON.parse(text);
} catch {
Expand Down
Loading
Loading