From 3d54069f1b5e90d1406fb0521048b8096876e5ef Mon Sep 17 00:00:00 2001 From: oldjs Date: Wed, 6 Aug 2025 08:03:58 +0800 Subject: [PATCH 1/3] feat(proxy): add OpenAI-compatible streaming response format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feat(proxy): 添加OpenAI兼容的流式响应格式 --- docker-compose.yml | 2 +- proxy_handler.py | 103 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 5e8822d..3b2c273 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,7 +47,7 @@ services: - REFRESH_CHECK_INTERVAL=${REFRESH_CHECK_INTERVAL:-3600} # 刷新检查间隔(秒) volumes: # 可选:挂载日志目录 - - ./logs:/app/logs + - logs:/app/logs restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] diff --git a/proxy_handler.py b/proxy_handler.py index 4507688..e220a12 100644 --- a/proxy_handler.py +++ b/proxy_handler.py @@ -264,10 +264,105 @@ async def handle_chat_completion(self, request: ChatCompletionRequest): async def stream_response( self, response: httpx.Response, model: str ) -> AsyncGenerator[str, None]: - """Generate streaming response""" - async for parsed in self.process_streaming_response(response): - yield f"data: {json.dumps(parsed)}\n\n" - yield "data: [DONE]\n\n" + """Generate streaming response in OpenAI format""" + import uuid + import time + + # Generate a unique completion ID + completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}" + + # Track content for transformation if needed + accumulated_content = "" + current_phase = None + + try: + async for parsed in self.process_streaming_response(response): + try: + data = parsed.get("data", {}) + delta_content = data.get("delta_content", "") + phase = data.get("phase", "") + + # Track phase changes + if phase != current_phase: + current_phase = phase + logger.debug(f"Phase changed to: {phase}") + + # For SHOW_THINK_TAGS=false, only send content during 
answer phase + if ( + not settings.SHOW_THINK_TAGS + and phase != "answer" + and delta_content + ): + logger.debug( + f"Skipping content in {phase} phase (SHOW_THINK_TAGS=false)" + ) + accumulated_content += delta_content # Still accumulate for potential transformation + continue + + # Accumulate all content for potential transformation + accumulated_content += delta_content + + # Apply content transformation to the delta + if delta_content: + # For streaming, we need to be careful about transformation + # Only transform if we have complete thinking blocks + if settings.SHOW_THINK_TAGS: + # Convert
<details> to <think> tags on the fly + transformed_delta = delta_content + transformed_delta = re.sub( + r"<details[^>]*>", "<think>", transformed_delta + ) + transformed_delta = transformed_delta.replace( + "</details>
", "</think>" + ) + transformed_delta = re.sub( + r"<summary>.*?</summary>", + "", + transformed_delta, + flags=re.DOTALL, + ) + else: + # For non-think mode in streaming, just pass through answer content + transformed_delta = delta_content + + # Create OpenAI-compatible streaming chunk + openai_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "delta": {"content": transformed_delta}, + "finish_reason": None, + } + ], + } + + yield f"data: {json.dumps(openai_chunk)}\n\n" + + except Exception as e: + logger.error(f"Error processing streaming chunk: {e}") + continue + + # Send final completion chunk + final_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], + } + + yield f"data: {json.dumps(final_chunk)}\n\n" + yield "data: [DONE]\n\n" + + except Exception as e: + logger.error(f"Streaming error: {e}") + # Send error in OpenAI format + error_chunk = {"error": {"message": str(e), "type": "server_error"}} + yield f"data: {json.dumps(error_chunk)}\n\n" async def non_stream_response( self, response: httpx.Response, model: str From e525bbe690f1c910f62f289e317b7f7644882e9d Mon Sep 17 00:00:00 2001 From: oldjs Date: Wed, 6 Aug 2025 08:07:58 +0800 Subject: [PATCH 2/3] feat(proxy): add OpenAI-compatible streaming response format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feat(proxy): 添加OpenAI兼容的流式响应格式 --- docker-compose.yml | 2 +- proxy_handler.py | 103 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 5e8822d..3b2c273 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,7 +47,7 @@ services: - REFRESH_CHECK_INTERVAL=${REFRESH_CHECK_INTERVAL:-3600} # 刷新检查间隔(秒) volumes: # 可选:挂载日志目录 - - ./logs:/app/logs + -
logs:/app/logs restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] diff --git a/proxy_handler.py b/proxy_handler.py index 4507688..e220a12 100644 --- a/proxy_handler.py +++ b/proxy_handler.py @@ -264,10 +264,105 @@ async def handle_chat_completion(self, request: ChatCompletionRequest): async def stream_response( self, response: httpx.Response, model: str ) -> AsyncGenerator[str, None]: - """Generate streaming response""" - async for parsed in self.process_streaming_response(response): - yield f"data: {json.dumps(parsed)}\n\n" - yield "data: [DONE]\n\n" + """Generate streaming response in OpenAI format""" + import uuid + import time + + # Generate a unique completion ID + completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}" + + # Track content for transformation if needed + accumulated_content = "" + current_phase = None + + try: + async for parsed in self.process_streaming_response(response): + try: + data = parsed.get("data", {}) + delta_content = data.get("delta_content", "") + phase = data.get("phase", "") + + # Track phase changes + if phase != current_phase: + current_phase = phase + logger.debug(f"Phase changed to: {phase}") + + # For SHOW_THINK_TAGS=false, only send content during answer phase + if ( + not settings.SHOW_THINK_TAGS + and phase != "answer" + and delta_content + ): + logger.debug( + f"Skipping content in {phase} phase (SHOW_THINK_TAGS=false)" + ) + accumulated_content += delta_content # Still accumulate for potential transformation + continue + + # Accumulate all content for potential transformation + accumulated_content += delta_content + + # Apply content transformation to the delta + if delta_content: + # For streaming, we need to be careful about transformation + # Only transform if we have complete thinking blocks + if settings.SHOW_THINK_TAGS: + # Convert
<details> to <think> tags on the fly + transformed_delta = delta_content + transformed_delta = re.sub( + r"<details[^>]*>", "<think>", transformed_delta + ) + transformed_delta = transformed_delta.replace( + "</details>
", "</think>" + ) + transformed_delta = re.sub( + r"<summary>.*?</summary>", + "", + transformed_delta, + flags=re.DOTALL, + ) + else: + # For non-think mode in streaming, just pass through answer content + transformed_delta = delta_content + + # Create OpenAI-compatible streaming chunk + openai_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "delta": {"content": transformed_delta}, + "finish_reason": None, + } + ], + } + + yield f"data: {json.dumps(openai_chunk)}\n\n" + + except Exception as e: + logger.error(f"Error processing streaming chunk: {e}") + continue + + # Send final completion chunk + final_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}], + } + + yield f"data: {json.dumps(final_chunk)}\n\n" + yield "data: [DONE]\n\n" + + except Exception as e: + logger.error(f"Streaming error: {e}") + # Send error in OpenAI format + error_chunk = {"error": {"message": str(e), "type": "server_error"}} + yield f"data: {json.dumps(error_chunk)}\n\n" async def non_stream_response( self, response: httpx.Response, model: str From 8a55e7a8b0515b66339542a06a7d4929f280a067 Mon Sep 17 00:00:00 2001 From: oldjs Date: Wed, 6 Aug 2025 08:16:05 +0800 Subject: [PATCH 3/3] ci(docker): expand docker build workflow trigger branches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ci(docker): 扩展docker构建工作流触发分支 --- .github/workflows/docker-build.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml index 437a1a7..f2338b8 100644 --- a/.github/workflows/docker-build.yml +++ b/.github/workflows/docker-build.yml @@ -2,10 +2,10 @@ name: Build and Push Docker Image on: push: - branches: [main, master, develop, docker] + branches: ["*"] tags: ["v*"]
pull_request: - branches: [main, master, develop, docker] + branches: ["*"] env: REGISTRY: ghcr.io