diff --git a/README.md b/README.md index 612d5f07..8d6355d3 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,20 @@ cc-switch provider stream-check # Run stream health check cc-switch provider fetch-models # Fetch remote model list ``` +#### OpenAI-compatible Provider Options + +When using an OpenAI-compatible provider (e.g. a third-party relay), you can enable token usage reporting in streaming responses by adding `stream_include_usage` to the provider's settings config: + +```json +{ + "stream_include_usage": true +} +``` + +This injects `"stream_options": {"include_usage": true}` into every streaming request sent to the upstream. The proxy then reads the trailing usage chunk that OpenAI-compatible APIs append after `finish_reason`, and surfaces real `input_tokens` / `output_tokens` values in the `message_delta` event instead of `null`. + +To apply this, edit the provider and paste the JSON above into the **Settings Config** field. + ### 🛠️ MCP Server Management Manage Model Context Protocol servers across Claude, Codex, Gemini, and OpenCode. 
diff --git a/README_ZH.md b/README_ZH.md index 20870ccf..4bdfad40 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -258,6 +258,20 @@ cc-switch provider stream-check # 执行流式健康检查 cc-switch provider fetch-models # 拉取远端模型列表 ``` +#### OpenAI 兼容供应商选项 + +使用 OpenAI 兼容供应商(例如第三方中转服务)时,可以在供应商的 Settings Config 中加入 `stream_include_usage`,以开启流式响应中的 token 用量上报: + +```json +{ + "stream_include_usage": true +} +``` + +开启后,代理会在每次流式请求中自动注入 `"stream_options": {"include_usage": true}`。上游返回流结束后附带的用量 chunk 会被读取,并将真实的 `input_tokens` / `output_tokens` 数值填入 `message_delta` 事件,而不是 `null`。 + +编辑对应供应商,将上述 JSON 粘贴到 **Settings Config** 字段中即可生效。 + ### 🛠️ MCP 服务器管理 跨 Claude、Codex、Gemini 与 OpenCode 管理模型上下文协议服务器。 diff --git a/src-tauri/src/provider.rs b/src-tauri/src/provider.rs index 2a885158..1df8fae9 100644 --- a/src-tauri/src/provider.rs +++ b/src-tauri/src/provider.rs @@ -65,6 +65,44 @@ impl Provider { in_failover_queue: false, } } + + /// 读取 stream_include_usage 设置。 + /// 如果 provider 自身未设置,则根据 API 格式自动判断默认值: + /// - openai_chat / openai_responses 兼容模式默认 true + /// - anthropic 原生模式默认 false + pub fn stream_include_usage(&self) -> bool { + let explicit = self + .settings_config + .get("stream_include_usage") + .and_then(|v| v.as_bool()); + + if let Some(value) = explicit { + return value; + } + + // 自动根据 API 格式推断默认值 + let api_format = self + .meta + .as_ref() + .and_then(|meta| meta.api_format.as_deref()) + .or_else(|| { + self.settings_config + .get("api_format") + .and_then(|v| v.as_str()) + }); + + match api_format { + Some("openai_chat") | Some("openai_responses") => true, + _ => { + // 未显式设置 api_format 时,根据常见字段推断是否为 OpenAI 兼容供应商 + let has_openai_fields = self.settings_config.get("baseUrl").is_some() + || self.settings_config.get("base_url").is_some() + || self.settings_config.get("apiKey").is_some() + || self.settings_config.get("api_key").is_some(); + has_openai_fields + } + } + } } /// 供应商管理器 diff --git a/src-tauri/src/proxy/provider_router.rs b/src-tauri/src/proxy/provider_router.rs index 
5d9d828d..805ab4ca 100644 --- a/src-tauri/src/proxy/provider_router.rs +++ b/src-tauri/src/proxy/provider_router.rs @@ -69,6 +69,12 @@ impl ProviderRouter { } else { if let Some(current) = self.current_provider(app_type)? { total_providers = 1; + log::info!( + "[ProviderRouter] [{}] selected provider: {} (stream_include_usage={})", + app_type, + current.name, + current.stream_include_usage() + ); result.push(current); } } diff --git a/src-tauri/src/proxy/providers/claude.rs b/src-tauri/src/proxy/providers/claude.rs index 9fd82fe9..04fad271 100644 --- a/src-tauri/src/proxy/providers/claude.rs +++ b/src-tauri/src/proxy/providers/claude.rs @@ -288,11 +288,21 @@ impl ProviderAdapter for ClaudeAdapter { .and_then(|meta| meta.prompt_cache_key.as_deref()) .unwrap_or(&provider.id); - match self.get_api_format(provider) { + let api_format = self.get_api_format(provider); + let stream_include_usage = provider.stream_include_usage(); + + if stream_include_usage { + log::info!( + "[ClaudeAdapter] Provider '{}' stream_include_usage enabled", + provider.name + ); + } + + match api_format { "openai_responses" => { super::transform_responses::anthropic_to_responses(body, Some(cache_key)) } - _ => super::transform::anthropic_to_openai(body, Some(cache_key)), + _ => super::transform::anthropic_to_openai(body, Some(cache_key), stream_include_usage), } } diff --git a/src-tauri/src/proxy/providers/streaming.rs b/src-tauri/src/proxy/providers/streaming.rs index a164823c..29f7bc38 100644 --- a/src-tauri/src/proxy/providers/streaming.rs +++ b/src-tauri/src/proxy/providers/streaming.rs @@ -98,6 +98,12 @@ pub fn create_anthropic_sse_stream( let mut open_tool_block_indices: HashSet = HashSet::new(); let mut legacy_function_name: Option = None; let mut legacy_function_block_index: Option = None; + // Cache the latest usage chunk; OpenAI sends a trailing choices:[] chunk + // with the real token counts when stream_options.include_usage is enabled. 
+ let mut cached_usage: Option<Usage> = None; + // Defer message_delta until [DONE] so that the trailing usage chunk + // (choices:[], usage:{...}) is guaranteed to arrive first. + let mut pending_stop_reason: Option<String> = None;
+ if chunk.choices.is_empty() { + if let Some(u) = chunk.usage { + cached_usage = Some(u); + } + continue; + } + let Some(choice) = chunk.choices.first() else { continue; }; @@ -582,32 +631,16 @@ pub fn create_anthropic_sse_stream( open_tool_block_indices.clear(); } - let usage_json = chunk.usage.as_ref().map(|usage| { - let mut usage_json = json!({ - "input_tokens": usage.prompt_tokens, - "output_tokens": usage.completion_tokens - }); - if let Some(cached) = extract_cache_read_tokens(usage) { - usage_json["cache_read_input_tokens"] = json!(cached); - } - if let Some(created) = usage.cache_creation_input_tokens { - usage_json["cache_creation_input_tokens"] = json!(created); - } - usage_json - }); - let event = json!({ - "type": "message_delta", - "delta": { - "stop_reason": map_stop_reason(Some(finish_reason)), - "stop_sequence": null - }, - "usage": usage_json - }); - let sse_data = format!( - "event: message_delta\ndata: {}\n\n", - serde_json::to_string(&event).unwrap_or_default() - ); - yield Ok(Bytes::from(sse_data)); + // Defer message_delta to [DONE] so the trailing + // include_usage chunk can be captured first. + // Also absorb any inline usage from this chunk as + // a fallback (providers that don't send a trailing + // chunk will still have usage here). + if cached_usage.is_none() { + cached_usage = chunk.usage; + } + pending_stop_reason = + Some(map_stop_reason(Some(finish_reason)).unwrap_or_default()); } } } @@ -963,4 +996,28 @@ mod tests { "empty content deltas should not open text blocks" ); } + + #[tokio::test] + async fn stream_options_include_usage_trailing_chunk_is_used() { + // OpenAI sends a trailing choices:[] chunk with real usage when + // stream_options.include_usage is enabled. The finish_reason chunk + // itself carries usage:null in this mode. 
+ let input = concat!( + "data: {\"id\":\"chatcmpl_1\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{\"content\":\"Hi\"}}]}\n\n", + "data: {\"id\":\"chatcmpl_1\",\"model\":\"gpt-4o\",\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":null}\n\n", + "data: {\"id\":\"chatcmpl_1\",\"model\":\"gpt-4o\",\"choices\":[],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5}}\n\n", + "data: [DONE]\n\n" + ); + + let events = collect_events(input).await; + let message_delta = events + .iter() + .find(|event| event["type"] == "message_delta") + .expect("message_delta event"); + + assert_eq!(message_delta["usage"]["input_tokens"], 10, + "should pick up prompt_tokens from trailing include_usage chunk"); + assert_eq!(message_delta["usage"]["output_tokens"], 5, + "should pick up completion_tokens from trailing include_usage chunk"); + } } diff --git a/src-tauri/src/proxy/providers/transform.rs b/src-tauri/src/proxy/providers/transform.rs index ee6c2231..048ac4bc 100644 --- a/src-tauri/src/proxy/providers/transform.rs +++ b/src-tauri/src/proxy/providers/transform.rs @@ -1,7 +1,11 @@ use crate::proxy::error::ProxyError; use serde_json::{json, Value}; -pub fn anthropic_to_openai(body: Value, cache_key: Option<&str>) -> Result { +pub fn anthropic_to_openai( + body: Value, + cache_key: Option<&str>, + stream_include_usage: bool, +) -> Result { let mut result = json!({}); if let Some(model) = body.get("model").and_then(|m| m.as_str()) { @@ -50,6 +54,9 @@ pub fn anthropic_to_openai(body: Value, cache_key: Option<&str>) -> Result