diff --git a/Cargo.toml b/Cargo.toml index 1d1743f..c700305 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ once_cell = "1" # Lazy statics async-trait = "0.1" # Async trait support dirs = "5" # User directories regex = "1" # Regular expressions +uuid = { version = "1.0", features = ["v4", "serde"] } # UUID generation for streaming # OAuth & Auth oauth2 = "4" # OAuth 2.0 client diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 5a003ba..18c8d83 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -189,6 +189,57 @@ struct ResponsesUsage { output_tokens: u32, } +/// OpenAI Streaming Chunk (for SSE transformation) +#[derive(Debug, Deserialize)] +struct OpenAIStreamChunk { + id: String, + #[serde(default)] + model: String, + choices: Vec, + #[serde(default)] + created: u64, +} + +#[derive(Debug, Deserialize)] +struct OpenAIStreamChoice { + delta: OpenAIStreamDelta, + #[serde(default)] + index: usize, + #[serde(default)] + finish_reason: Option, +} + +#[derive(Debug, Deserialize)] +struct OpenAIStreamDelta { + #[serde(default)] + content: Option, + #[serde(default)] + reasoning: Option, // For GLM/Cerebras models + #[serde(default)] + role: Option, + #[serde(default)] + tool_calls: Option>, +} + +/// State for OpenAI → Anthropic SSE transformation +/// +/// Tracks streaming state across multiple chunks to properly transform +/// OpenAI's incremental tool call format to Anthropic's content block format. +#[derive(Debug, Default)] +struct StreamTransformState { + /// Has message_start been emitted? + message_started: bool, + /// Is a text content block currently open? + text_block_open: bool, + /// Tool call indices that have had content_block_start emitted + /// Maps OpenAI tool_call index → Anthropic content_block index + tool_blocks: std::collections::HashMap, + /// Next available content block index + next_block_index: u32, + /// Has finish_reason been received? + stream_ended: bool, +} + /// OpenAI provider implementation pub struct OpenAIProvider { name: String, @@ -575,7 +626,23 @@ impl OpenAIProvider { .map(|s| s.to_string()) } - /// Transform Anthropic request to OpenAI format + /// Transform Anthropic request format to OpenAI Chat Completions format. + /// + /// This handles the structural differences between the two APIs: + /// + /// # Message Content Transformation + /// - Anthropic: `content` can be string or array of typed blocks (text, image, tool_use, tool_result) + /// - OpenAI: `content` can be string or array of parts (text, image_url), with tools in separate fields + /// + /// # Key Transformations + /// - `tool_use` blocks → `tool_calls` array on assistant messages + /// - `tool_result` blocks → separate `tool` role messages (must come BEFORE user content) + /// - `image` blocks → `image_url` content parts with data URI encoding + /// - `thinking` blocks → dropped (OpenAI doesn't support this) + /// + /// # Tool Definition Mapping + /// - Anthropic: `{ name, description, input_schema }` + /// - OpenAI: `{ type: "function", function: { name, description, parameters } }` fn transform_request(&self, request: &AnthropicRequest) -> Result { let mut openai_messages = Vec::new(); @@ -684,7 +751,33 @@ impl OpenAIProvider { } } - // Add main message with content and/or tool_calls + // OpenAI Message Ordering for Tool Results + // ========================================== + // OpenAI requires tool response messages to appear BEFORE user content + // when a user message contains both tool_results and text content. + // + // In Anthropic's format, a single user message can contain mixed content: + // { role: "user", content: [tool_result, tool_result, text] } + // + // OpenAI requires separate messages in this order: + // 1. { role: "tool", tool_call_id: "...", content: "..." } // for each result + // 2. { role: "user", content: "..." } // user's text content + // + // This is critical for parallel tool calls where the user provides multiple + // tool results and then adds additional context or instructions. + + // Add separate tool result messages FIRST + for (tool_use_id, result_content) in tool_results { + openai_messages.push(OpenAIMessage { + role: "tool".to_string(), + content: Some(OpenAIContent::String(result_content)), + reasoning: None, + tool_calls: None, + tool_call_id: Some(tool_use_id), + }); + } + + // Then add main message with content and/or tool_calls if !content_parts.is_empty() || !tool_calls.is_empty() { let content = if content_parts.is_empty() { None @@ -707,17 +800,6 @@ impl OpenAIProvider { tool_call_id: None, }); } - - // Add separate tool result messages - for (tool_use_id, result_content) in tool_results { - openai_messages.push(OpenAIMessage { - role: "tool".to_string(), - content: Some(OpenAIContent::String(result_content)), - reasoning: None, - tool_calls: None, - tool_call_id: Some(tool_use_id), - }); - } } } } @@ -752,11 +834,22 @@ impl OpenAIProvider { }) } - /// Transform OpenAI response to Anthropic format + /// Transform OpenAI Chat Completions response to Anthropic Messages format. + /// + /// # Response Structure Mapping + /// - OpenAI: `{ id, model, choices: [{ message: { content, tool_calls }, finish_reason }], usage }` + /// - Anthropic: `{ id, model, content: [...blocks], stop_reason, usage }` + /// + /// # Content Extraction Priority + /// 1. `message.content` (string or parts array) + /// 2. `message.reasoning` (for GLM/Cerebras models with chain-of-thought) + /// 3. `message.tool_calls` → converted to `tool_use` content blocks fn transform_response(&self, response: OpenAIResponse) -> ProviderResponse { let choice = response.choices.into_iter().next() .expect("OpenAI response must have at least one choice"); + let mut content_blocks = Vec::new(); + // Extract text from content or reasoning (for GLM models via Cerebras) let text = if let Some(content) = choice.message.content { match content { @@ -781,15 +874,51 @@ impl OpenAIProvider { String::new() }; + // Add text content if present + if !text.is_empty() { + content_blocks.push(ContentBlock::Text { text }); + } + + // Non-streaming Tool Calls Transformation + // ======================================== + // OpenAI returns tool_calls as an array in the message: + // { id: "call_xxx", type: "function", function: { name: "...", arguments: "{...}" } } + // + // We transform each to Anthropic's tool_use content block: + // { type: "tool_use", id: "...", name: "...", input: {...} } + // + // Note: OpenAI's `arguments` is a JSON string that we parse into `input` object. + if let Some(tool_calls) = choice.message.tool_calls { + for tool_call in tool_calls { + // Parse arguments from JSON string + let input = serde_json::from_str(&tool_call.function.arguments) + .unwrap_or(serde_json::json!({})); + + content_blocks.push(ContentBlock::ToolUse { + id: tool_call.id, + name: tool_call.function.name, + input, + }); + } + } + + // Map OpenAI finish_reason to Anthropic stop_reason + let stop_reason = choice.finish_reason.map(|reason| { + match reason.as_str() { + "stop" => "end_turn".to_string(), + "length" => "max_tokens".to_string(), + "tool_calls" => "tool_use".to_string(), + _ => "end_turn".to_string(), + } + }); + ProviderResponse { id: response.id, r#type: "message".to_string(), role: "assistant".to_string(), - content: vec![ContentBlock::Text { - text, - }], + content: content_blocks, model: response.model, - stop_reason: choice.finish_reason, + stop_reason, stop_sequence: None, usage: Usage { input_tokens: response.usage.prompt_tokens, @@ -828,6 +957,253 @@ impl OpenAIProvider { }, } } + + /// Transform OpenAI streaming chunk to Anthropic SSE format. + /// + /// This function converts OpenAI's Chat Completions streaming format to Anthropic's + /// Messages API streaming format. The transformation is stateful and handles: + /// + /// # Event Mapping (OpenAI → Anthropic) + /// - First chunk → `message_start` (initializes the message envelope) + /// - `delta.content` / `delta.reasoning` → `content_block_start` + `content_block_delta` + /// - `delta.tool_calls` → `content_block_start` (tool_use) + `input_json_delta` (incremental) + /// - `finish_reason` → `content_block_stop` (for all open blocks) + `message_delta` + `message_stop` + /// + /// # Tool Call Streaming + /// OpenAI sends tool calls incrementally: + /// - First chunk: `{ index: 0, id: "call_xxx", function: { name: "get_weather", arguments: "" } }` + /// - Next chunks: `{ index: 0, function: { arguments: "{\"loc" } }` + /// - More chunks: `{ index: 0, function: { arguments: "ation\":" } }` + /// + /// We transform this to Anthropic format: + /// - On first chunk (has id+name): emit `content_block_start` with type=tool_use + /// - On argument chunks: emit `content_block_delta` with partial_json + /// - On finish_reason: emit `content_block_stop` for all open tool blocks + /// + /// # Provider Quirks + /// - GLM/Cerebras models use `reasoning` field instead of `content` for chain-of-thought + /// - Cerebras may close the stream without sending `finish_reason` (handled by caller) + fn transform_openai_chunk_to_anthropic_sse(chunk: &OpenAIStreamChunk, message_id: &str, state: &mut StreamTransformState) -> String { + let mut output = String::new(); + + // First chunk: emit message_start + if !state.message_started { + state.message_started = true; + let message_start = serde_json::json!({ + "type": "message_start", + "message": { + "id": message_id, + "type": "message", + "role": "assistant", + "content": [], + "model": chunk.model, + "stop_reason": null, + "stop_sequence": null, + "usage": { + "input_tokens": 0, + "output_tokens": 0 + } + } + }); + output.push_str(&format!("event: message_start\ndata: {}\n\n", message_start)); + } + + // Process delta content + for choice in &chunk.choices { + // Handle text content (content or reasoning fields) + let text_content = choice.delta.content.as_ref() + .or(choice.delta.reasoning.as_ref()); // Support reasoning field for GLM/Cerebras + + if let Some(text) = text_content { + // Don't use continue for empty text - finish_reason processing + // is required even when content is empty to ensure proper stream termination. + if !text.is_empty() { + + // Emit content_block_start if this is the first text content + if !state.text_block_open { + state.text_block_open = true; + state.next_block_index = 1; // Text block is always index 0 + let block_start = serde_json::json!({ + "type": "content_block_start", + "index": 0, + "content_block": { + "type": "text", + "text": "" + } + }); + output.push_str(&format!("event: content_block_start\ndata: {}\n\n", block_start)); + } + + // Emit content_block_delta + let delta = serde_json::json!({ + "type": "content_block_delta", + "index": 0, + "delta": { + "type": "text_delta", + "text": text + } + }); + output.push_str(&format!("event: content_block_delta\ndata: {}\n\n", delta)); + } + } + + // Tool Calls Transformation (OpenAI function calling → Anthropic tool_use) + // ========================================================================== + // OpenAI sends tool calls incrementally: + // First chunk: { index: 0, id: "call_xxx", function: { name: "...", arguments: "" } } + // Next chunks: { index: 0, function: { arguments: "{\"loc" } } + // + // Anthropic expects: + // content_block_start: { type: "tool_use", id: "...", name: "...", input: {} } + // content_block_delta: { type: "input_json_delta", partial_json: "..." } + // content_block_stop: (only at finish_reason) + if let Some(ref tool_calls) = choice.delta.tool_calls { + // Close text block if open (tool calls come after text) + if state.text_block_open { + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": 0 + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + state.text_block_open = false; + } + + for tool_call in tool_calls { + // Get the tool call index from OpenAI + let tool_index = tool_call.get("index") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as u32; + + // Check if this is the first chunk for this tool (has id and name) + let has_id = tool_call.get("id").and_then(|v| v.as_str()).is_some(); + let has_name = tool_call.get("function") + .and_then(|f| f.get("name")) + .and_then(|n| n.as_str()) + .is_some(); + + if has_id && has_name && !state.tool_blocks.contains_key(&tool_index) { + // First chunk for this tool: emit content_block_start + let tool_id = tool_call.get("id") + .and_then(|v| v.as_str()) + .unwrap_or("tool_0"); + let tool_name = tool_call.get("function") + .and_then(|f| f.get("name")) + .and_then(|n| n.as_str()) + .unwrap_or("unknown"); + + let block_index = state.next_block_index; + state.tool_blocks.insert(tool_index, block_index); + state.next_block_index += 1; + + tracing::debug!("🔧 Tool start: {} (id: {}) at block index {}", tool_name, tool_id, block_index); + + let block_start = serde_json::json!({ + "type": "content_block_start", + "index": block_index, + "content_block": { + "type": "tool_use", + "id": tool_id, + "name": tool_name, + "input": {} + } + }); + output.push_str(&format!("event: content_block_start\ndata: {}\n\n", block_start)); + } + + // Emit argument chunks as input_json_delta + if let Some(args) = tool_call.get("function") + .and_then(|f| f.get("arguments")) + .and_then(|a| a.as_str()) + { + if !args.is_empty() { + // Get the block index for this tool + let block_index = state.tool_blocks.get(&tool_index).copied() + .unwrap_or_else(|| { + // Tool block wasn't started yet (shouldn't happen, but handle gracefully) + let idx = state.next_block_index; + state.tool_blocks.insert(tool_index, idx); + state.next_block_index += 1; + idx + }); + + let input_delta = serde_json::json!({ + "type": "content_block_delta", + "index": block_index, + "delta": { + "type": "input_json_delta", + "partial_json": args + } + }); + output.push_str(&format!("event: content_block_delta\ndata: {}\n\n", input_delta)); + } + } + } + } + + // Stream Termination (finish_reason handling) + // ============================================= + // When OpenAI sends a chunk with finish_reason, we need to emit the + // Anthropic stream termination sequence: + // 1. content_block_stop (for text block if open) + // 2. content_block_stop (for each open tool block) + // 3. message_delta (with stop_reason mapped from finish_reason) + // 4. message_stop (signals end of message) + if let Some(reason) = &choice.finish_reason { + state.stream_ended = true; + + // Close text block if still open + if state.text_block_open { + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": 0 + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + } + + // Close all open tool blocks + for (_, block_index) in &state.tool_blocks { + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": block_index + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + } + + // Emit message_delta with stop reason + // Mapping: OpenAI finish_reason → Anthropic stop_reason + let stop_reason = match reason.as_str() { + "stop" => "end_turn", + "length" => "max_tokens", + "tool_calls" => "tool_use", // Model wants to execute tools + _ => "end_turn" + }; + let message_delta = serde_json::json!({ + "type": "message_delta", + "delta": { + "stop_reason": stop_reason, + "stop_sequence": null + }, + "usage": { + "output_tokens": 0 + } + }); + output.push_str(&format!("event: message_delta\ndata: {}\n\n", message_delta)); + + // Emit message_stop + let message_stop = serde_json::json!({ + "type": "message_stop" + }); + output.push_str(&format!("event: message_stop\ndata: {}\n\n", message_stop)); + } + } + + // If no events were emitted but we processed a chunk, send a ping + if output.is_empty() { + output.push_str(": ping\n\n"); + } + + output + } } #[async_trait] @@ -1120,11 +1496,140 @@ impl AnthropicProvider for OpenAIProvider { }); } - // TODO: Transform OpenAI SSE format to Anthropic SSE format - // For now, just pass through the stream - let stream = response.bytes_stream().map_err(|e| ProviderError::HttpError(e)); + // Transform OpenAI SSE format to Anthropic SSE format + use futures::stream::StreamExt; + use crate::providers::streaming::SseStream; + use std::sync::{Arc, Mutex}; + + let message_id = format!("msg_{}", uuid::Uuid::new_v4()); + + // Streaming State Management + // =========================== + // Using Arc> to track state across async chunks. + // The state tracks: message_started, text_block_open, tool_blocks, stream_ended + let state = Arc::new(Mutex::new(StreamTransformState::default())); + let state_for_cleanup = state.clone(); + + // Convert response bytes stream to SSE events + let sse_stream = SseStream::new(response.bytes_stream()); + + // Transform OpenAI SSE events to Anthropic format + let transformed_stream = sse_stream.then(move |result| { + let message_id = message_id.clone(); + let state = state.clone(); + + async move { + match result { + Ok(sse_event) => { + // If stream already ended, don't process any more chunks + if state.lock().unwrap().stream_ended { + tracing::debug!("⏹️ Stream already ended, skipping chunk"); + return Ok(Bytes::new()); + } + + tracing::debug!("📦 Received SSE chunk: {}", sse_event.data); + + // Skip empty data + if sse_event.data.trim().is_empty() { + tracing::debug!("⏭️ Skipping empty SSE event"); + return Ok(Bytes::new()); + } + + if sse_event.data.trim() == "[DONE]" { + tracing::debug!("✅ Stream finished with [DONE]"); + return Ok(Bytes::new()); + } + + // Parse OpenAI chunk + match serde_json::from_str::(&sse_event.data) { + Ok(chunk) => { + tracing::debug!("✨ Transforming chunk with {} choices", chunk.choices.len()); + + // Transform to Anthropic format (raw SSE bytes) + let sse_output = Self::transform_openai_chunk_to_anthropic_sse( + &chunk, + &message_id, + &mut *state.lock().unwrap() + ); + + if !sse_output.is_empty() { + tracing::debug!("SSE: {} bytes", sse_output.len()); + } + + // Return as raw bytes (already SSE-formatted) + Ok(Bytes::from(sse_output)) + } + Err(e) => { + tracing::warn!("❌ Failed to parse OpenAI chunk: {} - Data: {}", e, sse_event.data); + Ok(Bytes::new()) + } + } + } + Err(e) => { + tracing::error!("💥 Stream error: {}", e); + Err(ProviderError::HttpError(e)) + } + } + } + }) + .try_filter(|bytes| futures::future::ready(!bytes.is_empty())); + + // Add stream finalization to ensure proper termination + // Some providers close streams without sending finish_reason + let finalized_stream = transformed_stream.chain(futures::stream::once(async move { + let state = state_for_cleanup.lock().unwrap(); + + // Only send end events if stream didn't end properly + if state.message_started && !state.stream_ended { + tracing::warn!("⚠️ Stream ended without finish_reason - sending end events"); + + let mut output = String::new(); + + // Close text block if open + if state.text_block_open { + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": 0 + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + } + + // Close all tool blocks + for (_, block_index) in &state.tool_blocks { + let block_stop = serde_json::json!({ + "type": "content_block_stop", + "index": block_index + }); + output.push_str(&format!("event: content_block_stop\ndata: {}\n\n", block_stop)); + } + + // Send message_delta with end_turn (we don't know the real stop_reason) + let message_delta = serde_json::json!({ + "type": "message_delta", + "delta": { + "stop_reason": "end_turn", + "stop_sequence": null + }, + "usage": { + "output_tokens": 0 + } + }); + output.push_str(&format!("event: message_delta\ndata: {}\n\n", message_delta)); + + // Send message_stop + let message_stop = serde_json::json!({ + "type": "message_stop" + }); + output.push_str(&format!("event: message_stop\ndata: {}\n\n", message_stop)); + + Ok(Bytes::from(output)) + } else { + Ok(Bytes::new()) + } + })) + .try_filter(|bytes| futures::future::ready(!bytes.is_empty())); - Ok(Box::pin(stream)) + Ok(Box::pin(finalized_stream)) } fn supports_model(&self, model: &str) -> bool { diff --git a/src/providers/streaming.rs b/src/providers/streaming.rs index c79dc4e..b6b3e5c 100644 --- a/src/providers/streaming.rs +++ b/src/providers/streaming.rs @@ -69,6 +69,8 @@ pub struct SseStream { #[pin] inner: S, buffer: String, + /// Queue of parsed events waiting to be emitted + event_queue: std::collections::VecDeque, } impl SseStream { @@ -76,6 +78,7 @@ impl SseStream { Self { inner: stream, buffer: String::new(), + event_queue: std::collections::VecDeque::new(), } } } @@ -89,6 +92,12 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); + // First, check if we have queued events to emit + if let Some(event) = this.event_queue.pop_front() { + return Poll::Ready(Some(Ok(event))); + } + + // Poll the inner stream for new data match this.inner.poll_next(cx) { Poll::Ready(Some(Ok(bytes))) => { // Add new bytes to buffer @@ -96,12 +105,23 @@ where this.buffer.push_str(text); // Try to parse complete events from buffer - let events = parse_sse_events(this.buffer); - - if let Some(event) = events.first() { - // Clear buffer after parsing - *this.buffer = String::new(); - return Poll::Ready(Some(Ok(event.clone()))); + // Note: We only clear buffer up to the last complete event + if let Some(last_event_end) = this.buffer.rfind("\n\n") { + let complete_portion = &this.buffer[..last_event_end + 2]; + let events = parse_sse_events(complete_portion); + + // Add all parsed events to queue + for event in events { + this.event_queue.push_back(event); + } + + // Keep only the incomplete portion in buffer + *this.buffer = this.buffer[last_event_end + 2..].to_string(); + + // Return the first queued event if available + if let Some(event) = this.event_queue.pop_front() { + return Poll::Ready(Some(Ok(event))); + } } } @@ -116,10 +136,17 @@ where let events = parse_sse_events(this.buffer); *this.buffer = String::new(); - if let Some(event) = events.first() { - return Poll::Ready(Some(Ok(event.clone()))); + // Add all parsed events to queue + for event in events { + this.event_queue.push_back(event); } } + + // Return next queued event, or None if queue is empty + if let Some(event) = this.event_queue.pop_front() { + return Poll::Ready(Some(Ok(event))); + } + Poll::Ready(None) } Poll::Pending => Poll::Pending, diff --git a/src/server/mod.rs b/src/server/mod.rs index df364ab..7a1ca8b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -7,6 +7,7 @@ use crate::router::Router; use crate::providers::ProviderRegistry; use crate::auth::TokenStore; use axum::{ + body::Body, extract::State, http::{HeaderMap, StatusCode}, response::{ @@ -18,7 +19,7 @@ use axum::{ use std::sync::Arc; use tokio::net::TcpListener; use tracing::{error, info}; -use futures::stream::StreamExt; +use futures::stream::TryStreamExt; /// Application state shared across handlers #[derive(Clone)] @@ -704,20 +705,24 @@ async fn handle_messages( Ok(stream) => { info!("✅ Streaming request started with provider: {}", mapping.provider); - // Convert byte stream to SSE response - // The provider returns raw bytes (SSE format), we pass them through - let sse_stream = stream.map(|result| { - result.map(|bytes| { - // Convert bytes to string for SSE event - let data = String::from_utf8_lossy(&bytes).to_string(); - Event::default().data(data) - }).map_err(|e| { - error!("Stream error: {}", e); - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) - }) + // Convert provider stream to HTTP response + // The provider already returns properly formatted SSE bytes (event: + data: lines) + // We pass them through as-is without wrapping + let body_stream = stream.map_err(|e| { + error!("Stream error: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) }); - return Ok(Sse::new(sse_stream).into_response()); + let body = Body::from_stream(body_stream); + let response = Response::builder() + .status(200) + .header("Content-Type", "text/event-stream") + .header("Cache-Control", "no-cache") + .header("Connection", "keep-alive") + .body(body) + .unwrap(); + + return Ok(response); } Err(e) => { info!("⚠️ Provider {} streaming failed: {}, trying next fallback", mapping.provider, e);