From bc8323e3246237cca8fcfa80a4eb0e2c470907e9 Mon Sep 17 00:00:00 2001 From: Nicholas Montgomery Date: Mon, 24 Nov 2025 22:48:06 -0500 Subject: [PATCH 1/3] fix(openai): properly transform streaming tool_calls and handle incomplete streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fix addresses two critical issues with OpenAI-compatible provider streaming: 1. **Tool Calls Transformation**: OpenAI streaming sends tool_calls in a different format than Anthropic. This commit implements full transformation: - Detects tool_calls in OpenAI streaming chunks - Transforms to Anthropic tool_use format with proper event structure - Closes text content blocks before tool_use blocks - Sends content_block_start/delta/stop for each tool 2. **Incomplete Stream Handling**: Some OpenAI-compatible providers (notably Cerebras) close the stream without sending a finish_reason chunk. This commit adds: - Stream finalization that detects incomplete streams - Automatic end event generation (content_block_stop, message_delta, message_stop) - Prevents duplicate end events when finish_reason IS sent The fix ensures: - ✅ Streaming works with tool calls (TodoWrite, etc.) - ✅ No duplicate messages - ✅ Graceful handling of provider-specific streaming bugs - ✅ Full Anthropic API compatibility Tested with: Cerebras (zai-glm-4.6), streaming tool calls work perfectly. --- Cargo.toml | 1 + src/providers/openai.rs | 330 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 327 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1d1743f..c700305 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ once_cell = "1" # Lazy statics async-trait = "0.1" # Async trait support dirs = "5" # User directories regex = "1" # Regular expressions +uuid = { version = "1.0", features = ["v4", "serde"] } # UUID generation for streaming # OAuth & Auth oauth2 = "4" # OAuth 2.0 client diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 5a003ba..e86189a 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -189,6 +189,38 @@ struct ResponsesUsage { output_tokens: u32, } +/// OpenAI Streaming Chunk (for SSE transformation) +#[derive(Debug, Deserialize)] +struct OpenAIStreamChunk { + id: String, + #[serde(default)] + model: String, + choices: Vec, + #[serde(default)] + created: u64, +} + +#[derive(Debug, Deserialize)] +struct OpenAIStreamChoice { + delta: OpenAIStreamDelta, + #[serde(default)] + index: usize, + #[serde(default)] + finish_reason: Option, +} + +#[derive(Debug, Deserialize)] +struct OpenAIStreamDelta { + #[serde(default)] + content: Option, + #[serde(default)] + reasoning: Option, // For GLM/Cerebras models + #[serde(default)] + role: Option, + #[serde(default)] + tool_calls: Option>, +} + /// OpenAI provider implementation pub struct OpenAIProvider { name: String, @@ -828,6 +860,184 @@ impl OpenAIProvider { }, } } + + /// Transform OpenAI streaming chunk to Anthropic format + /// Returns raw SSE-formatted bytes (event: type / data: json) + fn transform_openai_chunk_to_anthropic_sse(chunk: &OpenAIStreamChunk, message_id: &str, is_first: &mut bool, has_content_block: &mut bool, stream_ended: &mut bool) -> String { + let mut output = String::new(); + + // First chunk: emit message_start + if *is_first { + *is_first = false; + let message_start = serde_json::json!({ + "type": "message_start", + "message": { + "id": message_id, + "type": "message", + "role": "assistant", + "content": [], + "model": chunk.model, + "stop_reason": null, + "stop_sequence": null, + "usage": { + "input_tokens": 0, + "output_tokens": 0 + } + } + }); + output.push_str(&format!("event: message_start\ndata: {}\n\n", message_start)); + } + + // Process delta content + for choice in &chunk.choices { + // Handle text content (content or reasoning fields) + let text_content = choice.delta.content.as_ref() + .or(choice.delta.reasoning.as_ref()); // Support reasoning field for GLM/Cerebras + + if let Some(text) = text_content { + // Skip empty text + if text.is_empty() { + continue; + } + + // Emit content_block_start if this is the first content + if !*has_content_block { + *has_content_block = true; + let block_start = serde_json::json!({ + "type": "content_block_start", + "index": 0, + "content_block": { + "type": "text", + "text": "" + } + }); + output.push_str(&format!("event: content_block_start\ndata: {}\n\n", block_start)); + } + + // Emit content_block_delta + let delta = serde_json::json!({ + "type": "content_block_delta", + "index": 0, + "delta": { + "type": "text_delta", + "text": text + } + }); + output.push_str(&format!("event: content_block_delta\ndata: {}\n\n", delta)); + } + + // Handle tool calls (OpenAI function calling → Anthropic tool_use) + if let Some(ref tool_calls) = choice.delta.tool_calls { + tracing::debug!("🔧 Transforming {} tool_calls to Anthropic tool_use format", tool_calls.len()); + + // First, close any open text content block + if *has_content_block { + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": 0 + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + *has_content_block = false; + } + + // Transform each tool call to Anthropic format + for (idx, tool_call) in tool_calls.iter().enumerate() { + // Extract tool info + if let Some(ref function) = tool_call.get("function") { + let default_tool_id = format!("tool_{}", idx); + let tool_id = tool_call.get("id") + .and_then(|v| v.as_str()) + .unwrap_or(&default_tool_id); + let tool_name = function.get("name") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + let tool_args = function.get("arguments") + .and_then(|v| v.as_str()) + .unwrap_or("{}"); + + tracing::debug!("🔨 Tool: {} (id: {})", tool_name, tool_id); + + // Parse arguments to validate JSON + let tool_input: serde_json::Value = serde_json::from_str(tool_args) + .unwrap_or(serde_json::json!({})); + + // Send tool_use content_block_start + let block_start = serde_json::json!({ + "type": "content_block_start", + "index": idx + 1, // Index after text content + "content_block": { + "type": "tool_use", + "id": tool_id, + "name": tool_name + } + }); + output.push_str(&format!("event: content_block_start\ndata: {}\n\n", block_start)); + + // Send tool input as delta + let input_delta = serde_json::json!({ + "type": "content_block_delta", + "index": idx + 1, + "delta": { + "type": "input_json_delta", + "partial_json": serde_json::to_string(&tool_input).unwrap_or_default() + } + }); + output.push_str(&format!("event: content_block_delta\ndata: {}\n\n", input_delta)); + + // Close tool_use block + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": idx + 1 + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + } + } + + continue; + } + + // Handle finish_reason (stream end) + if let Some(reason) = &choice.finish_reason { + *stream_ended = true; // Mark that stream ended properly + + if *has_content_block { + // Emit content_block_stop + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": 0 + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + } + + // Emit message_delta with stop reason + let stop_reason = match reason.as_str() { + "stop" => "end_turn", + "length" => "max_tokens", + "tool_calls" => "end_turn", // Tool calls also end the turn + _ => "end_turn" + }; + let message_delta = serde_json::json!({ + "type": "message_delta", + "delta": { + "stop_reason": stop_reason, + "stop_sequence": null + }, + "usage": { + "output_tokens": 0 + } + }); + output.push_str(&format!("event: message_delta\ndata: {}\n\n", message_delta)); + + // Emit message_stop + let message_stop = serde_json::json!({ + "type": "message_stop" + }); + output.push_str(&format!("event: message_stop\ndata: {}\n\n", message_stop)); + } + } + + output + } } #[async_trait] @@ -1120,11 +1330,123 @@ impl AnthropicProvider for OpenAIProvider { }); } - // TODO: Transform OpenAI SSE format to Anthropic SSE format - // For now, just pass through the stream - let stream = response.bytes_stream().map_err(|e| ProviderError::HttpError(e)); + // Transform OpenAI SSE format to Anthropic SSE format + use futures::stream::StreamExt; + use crate::providers::streaming::SseStream; + use std::sync::{Arc, Mutex}; + + let message_id = format!("msg_{}", uuid::Uuid::new_v4()); + let is_first = Arc::new(Mutex::new(true)); + let has_content_block = Arc::new(Mutex::new(false)); + let stream_ended_properly = Arc::new(Mutex::new(false)); // Track if finish_reason was received + let has_content_for_cleanup = has_content_block.clone(); // Clone before moving into closure + let stream_ended_for_cleanup = stream_ended_properly.clone(); + + // Convert response bytes stream to SSE events + let sse_stream = SseStream::new(response.bytes_stream()); + + // Transform OpenAI SSE events to Anthropic format + let transformed_stream = sse_stream.then(move |result| { + let message_id = message_id.clone(); + let is_first = is_first.clone(); + let has_content_block = has_content_block.clone(); + let stream_ended_properly = stream_ended_properly.clone(); + + async move { + match result { + Ok(sse_event) => { + tracing::debug!("📦 Received SSE chunk: {}", sse_event.data.chars().take(100).collect::()); + + // Skip empty data + if sse_event.data.trim().is_empty() { + tracing::debug!("⏭️ Skipping empty SSE event"); + return Ok(Bytes::new()); + } + + if sse_event.data.trim() == "[DONE]" { + tracing::debug!("✅ Stream finished with [DONE]"); + return Ok(Bytes::new()); + } + + // Parse OpenAI chunk + match serde_json::from_str::(&sse_event.data) { + Ok(chunk) => { + tracing::debug!("✨ Transforming chunk with {} choices", chunk.choices.len()); + + // Transform to Anthropic format (raw SSE bytes) + let sse_output = Self::transform_openai_chunk_to_anthropic_sse( + &chunk, + &message_id, + &mut *is_first.lock().unwrap(), + &mut *has_content_block.lock().unwrap(), + &mut *stream_ended_properly.lock().unwrap() + ); + + tracing::debug!("📤 Sending {} bytes", sse_output.len()); + + // Return as raw bytes (already SSE-formatted) + Ok(Bytes::from(sse_output)) + } + Err(e) => { + tracing::warn!("❌ Failed to parse OpenAI chunk: {} - Data: {}", e, sse_event.data); + Ok(Bytes::new()) + } + } + } + Err(e) => { + tracing::error!("💥 Stream error: {}", e); + Err(ProviderError::HttpError(e)) + } + } + } + }) + .try_filter(|bytes| futures::future::ready(!bytes.is_empty())); + + // Add stream finalization to handle Cerebras bug where stream closes without finish_reason + let finalized_stream = transformed_stream.chain(futures::stream::once(async move { + // Only send end events if stream didn't end properly AND we have an open content block + let has_content = *has_content_for_cleanup.lock().unwrap(); + let ended_properly = *stream_ended_for_cleanup.lock().unwrap(); + + if has_content && !ended_properly { + tracing::warn!("⚠️ Stream ended without finish_reason - sending end events (Cerebras bug workaround)"); + + let mut output = String::new(); + + // Close content block + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": 0 + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + + // Send message_delta with end_turn + let message_delta = serde_json::json!({ + "type": "message_delta", + "delta": { + "stop_reason": "end_turn", + "stop_sequence": null + }, + "usage": { + "output_tokens": 0 + } + }); + output.push_str(&format!("event: message_delta\ndata: {}\n\n", message_delta)); + + // Send message_stop + let message_stop = serde_json::json!({ + "type": "message_stop" + }); + output.push_str(&format!("event: message_stop\ndata: {}\n\n", message_stop)); + + Ok(Bytes::from(output)) + } else { + Ok(Bytes::new()) + } + })) + .try_filter(|bytes| futures::future::ready(!bytes.is_empty())); - Ok(Box::pin(stream)) + Ok(Box::pin(finalized_stream)) } fn supports_model(&self, model: &str) -> bool { From 67cca748e488e1dc7453d4c52db2204f801a3fe6 Mon Sep 17 00:00:00 2001 From: Eli Dickinson Date: Thu, 27 Nov 2025 21:29:55 -0500 Subject: [PATCH 2/3] fix(openai): transform tool_calls to Anthropic tool_use format in non-streaming responses The transform_response function was only extracting text content and ignoring tool_calls from OpenAI responses. This caused tool calling to fail silently. Now properly transforms OpenAI tool_calls format: - OpenAI: message.tool_calls[].function.{name, arguments} - Anthropic: content[].tool_use.{id, name, input} Streaming already had this transformation, now non-streaming matches. --- src/providers/openai.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/providers/openai.rs b/src/providers/openai.rs index e86189a..2acbd68 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -789,6 +789,8 @@ impl OpenAIProvider { let choice = response.choices.into_iter().next() .expect("OpenAI response must have at least one choice"); + let mut content_blocks = Vec::new(); + // Extract text from content or reasoning (for GLM models via Cerebras) let text = if let Some(content) = choice.message.content { match content { @@ -813,13 +815,31 @@ impl OpenAIProvider { String::new() }; + // Add text content if present + if !text.is_empty() { + content_blocks.push(ContentBlock::Text { text }); + } + + // Transform tool_calls to Anthropic tool_use format + if let Some(tool_calls) = choice.message.tool_calls { + for tool_call in tool_calls { + // Parse arguments from JSON string + let input = serde_json::from_str(&tool_call.function.arguments) + .unwrap_or(serde_json::json!({})); + + content_blocks.push(ContentBlock::ToolUse { + id: tool_call.id, + name: tool_call.function.name, + input, + }); + } + } + ProviderResponse { id: response.id, r#type: "message".to_string(), role: "assistant".to_string(), - content: vec![ContentBlock::Text { - text, - }], + content: content_blocks, model: response.model, stop_reason: choice.finish_reason, stop_sequence: None, From 7a3fb3380b04afb1132fb55ff5695240b321bdde Mon Sep 17 00:00:00 2001 From: Eli Dickinson Date: Fri, 28 Nov 2025 22:20:41 -0500 Subject: [PATCH 3/3] refactor(openai): remove Cerebras incomplete stream workaround The workaround was added to handle Cerebras closing streams without finish_reason. However, the root cause was the SSE event queue bug in streaming.rs that dropped finish_reason events when multiple events arrived in the same TCP chunk. Now that the event queue bug is fixed, the workaround is no longer needed. This simplifies the streaming implementation by removing: - Stream finalization logic that generated synthetic end events - Arc variables used to track stream state for cleanup The streaming implementation now relies on providers sending proper finish_reason events, which works correctly with the fixed event queue. --- src/providers/openai.rs | 50 ++--------------------------------------- 1 file changed, 2 insertions(+), 48 deletions(-) diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 2acbd68..009f3e2 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -1358,9 +1358,7 @@ impl AnthropicProvider for OpenAIProvider { let message_id = format!("msg_{}", uuid::Uuid::new_v4()); let is_first = Arc::new(Mutex::new(true)); let has_content_block = Arc::new(Mutex::new(false)); - let stream_ended_properly = Arc::new(Mutex::new(false)); // Track if finish_reason was received - let has_content_for_cleanup = has_content_block.clone(); // Clone before moving into closure - let stream_ended_for_cleanup = stream_ended_properly.clone(); + let stream_ended_properly = Arc::new(Mutex::new(false)); // Convert response bytes stream to SSE events let sse_stream = SseStream::new(response.bytes_stream()); @@ -1422,51 +1420,7 @@ impl AnthropicProvider for OpenAIProvider { }) .try_filter(|bytes| futures::future::ready(!bytes.is_empty())); - // Add stream finalization to handle Cerebras bug where stream closes without finish_reason - let finalized_stream = transformed_stream.chain(futures::stream::once(async move { - // Only send end events if stream didn't end properly AND we have an open content block - let has_content = *has_content_for_cleanup.lock().unwrap(); - let ended_properly = *stream_ended_for_cleanup.lock().unwrap(); - - if has_content && !ended_properly { - tracing::warn!("⚠️ Stream ended without finish_reason - sending end events (Cerebras bug workaround)"); - - let mut output = String::new(); - - // Close content block - let block_stop = serde_json::json!({ - "type": "content_block_stop", - "index": 0 - }); - output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); - - // Send message_delta with end_turn - let message_delta = serde_json::json!({ - "type": "message_delta", - "delta": { - "stop_reason": "end_turn", - "stop_sequence": null - }, - "usage": { - "output_tokens": 0 - } - }); - output.push_str(&format!("event: message_delta\ndata: {}\n\n", message_delta)); - - // Send message_stop - let message_stop = serde_json::json!({ - "type": "message_stop" - }); - output.push_str(&format!("event: message_stop\ndata: {}\n\n", message_stop)); - - Ok(Bytes::from(output)) - } else { - Ok(Bytes::new()) - } - })) - .try_filter(|bytes| futures::future::ready(!bytes.is_empty())); - - Ok(Box::pin(finalized_stream)) + Ok(Box::pin(transformed_stream)) } fn supports_model(&self, model: &str) -> bool {