From 1939d78a8da70f8b9dcf3fbb2259388951e20159 Mon Sep 17 00:00:00 2001 From: Vivian <106744968+fwVivian@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:03:55 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E8=87=B30.1.7=EF=BC=8C=E5=A2=9E=E5=BC=BAAPI=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=8C=E4=BC=98=E5=8C=96=E6=B5=81=E5=BC=8F?= =?UTF-8?q?=E5=93=8D=E5=BA=94=EF=BC=8C=E6=B7=BB=E5=8A=A0Web=E6=90=9C?= =?UTF-8?q?=E7=B4=A2=E5=8A=9F=E8=83=BD=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pipes/openai_responses.py | 724 ++++++++++++++++++++++++++++++++------ 1 file changed, 613 insertions(+), 111 deletions(-) diff --git a/pipes/openai_responses.py b/pipes/openai_responses.py index 521af1f..22efed4 100644 --- a/pipes/openai_responses.py +++ b/pipes/openai_responses.py @@ -2,7 +2,7 @@ title: OpenAI Responses author: OVINC CN git_url: https://github.com/OVINC-CN/OpenWebUIPlugin.git -version: 0.1.2 +version: 0.1.7 licence: MIT """ @@ -10,7 +10,7 @@ import logging import time import uuid -from typing import AsyncIterable, Literal, Optional, Tuple +from typing import Any, AsyncIterable, Literal, Optional, Tuple import httpx from fastapi import Request @@ -24,23 +24,37 @@ class APIException(Exception): - def __init__(self, status: int, content: str, response: Response): + def __init__( + self, + status: int, + content: str, + response: Response, + request_id: str = "", + ): self._status = status self._content = content self._response = response + self._request_id = request_id def __str__(self) -> str: - # error msg + message = "" try: - return json.loads(self._content)["error"]["message"] + message = json.loads(self._content)["error"]["message"] except Exception: pass - # build in error - try: - self._response.raise_for_status() - except Exception as err: - return str(err) - return "Unknown API error" + + if not message: + try: + self._response.raise_for_status() + except Exception as err: + message = str(err) + + if not message: + message = "Unknown API error" + + if self._request_id: + return f"{message} (request_id: {self._request_id})" + return message class Pipe: @@ -49,146 +63,628 @@ class Valves(BaseModel): api_key: str = Field(default="", title="API Key") enable_reasoning: bool = Field(default=True, title="展示思考内容") allow_params: Optional[str] = Field( - default="", title="透传参数", description="允许配置的参数,使用英文逗号分隔,例如 temperature" + default="", + title="透传参数", + description="允许透传的参数,英文逗号分隔,例如 temperature,top_p", ) timeout: int = Field(default=600, title="请求超时时间(秒)") proxy: Optional[str] = Field(default="", title="代理地址") - models: str = Field(default="gpt-5", title="模型", description="使用英文逗号分隔多个模型") + models: str = Field( + default="gpt-5.1,gpt-5", + title="模型", + description="使用英文逗号分隔多个模型", + ) + + enable_web_search: bool = Field(default=True, title="启用 OpenAI Web Search") + web_search_context_size: Literal["low", "medium", "high"] = Field( + default="medium", + title="Web Search 上下文大小", + ) + web_search_domains: Optional[str] = Field( + default="", + title="Web Search 域名白名单", + description="英文逗号分隔,例如 openai.com,platform.openai.com", + ) + web_search_country: Optional[str] = Field( + default="", + title="搜索国家", + description="两位国家代码,例如 US、CN", + ) + web_search_city: Optional[str] = Field(default="", title="搜索城市") + web_search_region: Optional[str] = Field(default="", title="搜索地区/省州") + web_search_timezone: Optional[str] = Field( + default="", + title="搜索时区", + description="IANA 时区,例如 Asia/Shanghai", + ) + append_sources_to_answer: bool = Field( + default=True, + title="在答案末尾追加来源链接", + ) class UserValves(BaseModel): - verbosity: Literal["low", "medium", "high"] = Field(default="medium", title="输出详细程度") - reasoning_effort: Literal["none", "low", "medium", "high", "xhigh"] = Field(default="low", title="思考推理强度") - summary: Literal["auto", "concise", "detailed"] = Field(default="auto", title="思考输出摘要程度") + verbosity: Literal["low", "medium", "high"] = Field( + default="medium", + title="输出详细程度", + ) + reasoning_effort: Literal["none", "low", "medium", "high", "xhigh"] = Field( + default="low", + title="思考推理强度", + ) + summary: Literal["auto", "concise", "detailed"] = Field( + default="auto", + title="思考输出摘要程度", + ) def __init__(self): self.valves = self.Valves() def pipes(self): - return [{"id": model, "name": model} for model in self.valves.models.split(",") if model] + return [ + {"id": model.strip(), "name": model.strip()} + for model in self.valves.models.split(",") + if model.strip() + ] + + async def pipe( + self, body: dict, __user__: dict, __request__: Request + ) -> StreamingResponse: + return StreamingResponse( + self.__stream_pipe(body=body, __user__=__user__, __request__=__request__), + media_type="text/event-stream", + ) - async def pipe(self, body: dict, __user__: dict, __request__: Request) -> StreamingResponse: - return StreamingResponse(self.__stream_pipe(body=body, __user__=__user__, __request__=__request__)) + async def __stream_pipe( + self, body: dict, __user__: dict, __request__: Request + ) -> AsyncIterable[str]: + user_valves = self._coerce_user_valves((__user__ or {}).get("valves")) + model, payload = await self._build_payload(body=body, user_valves=user_valves) - async def __stream_pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable: - model, payload = await self._build_payload(body=body, user_valves=__user__["valves"]) - # call client async with httpx.AsyncClient( - base_url=self.valves.base_url, + base_url=self.valves.base_url.rstrip("/") + "/", headers={"Authorization": f"Bearer {self.valves.api_key}"}, proxy=self.valves.proxy or None, trust_env=True, timeout=self.valves.timeout, ) as client: async with client.stream(**payload) as response: + request_id = response.headers.get("x-request-id", "") + if response.status_code != 200: - text = "" - async for line in response.aiter_lines(): - text += line # pylint: disable=R1713 - logger.error("response invalid with %d: %s", response.status_code, text) - raise APIException(status=response.status_code, content=text, response=response) + text = await self._read_error_text(response) + logger.error( + "response invalid with %d request_id=%s body=%s", + response.status_code, + request_id, + text, + ) + raise APIException( + status=response.status_code, + content=text, + response=response, + request_id=request_id, + ) + is_thinking = self.valves.enable_reasoning + usage = None + annotations = [] + response_sources = [] + async for line in response.aiter_lines(): line = line.strip() if not line: continue if line.startswith("event:") or not line.startswith("data:"): continue - if line.startswith("data: "): - line = line[6:] - if isinstance(line, str): - line = json.loads(line) - match line.get("type"): - case "response.reasoning_summary_text.delta": - if is_thinking: - yield self._format_stream_data(model=model, reasoning_content=line["delta"]) - case "response.output_text.delta": - if is_thinking: - is_thinking = False - yield self._format_stream_data(model=model, content=line["delta"]) - case "response.completed": + + line = line[5:].strip() + if not line or line == "[DONE]": + continue + + try: + event = json.loads(line) + except json.JSONDecodeError: + logger.warning("ignore invalid stream line: %s", line) + continue + + event_type = event.get("type", "") + + if event_type == "response.reasoning_summary_text.delta": + if is_thinking and event.get("delta"): yield self._format_stream_data( - model=model, usage=line["response"]["usage"], if_finished=True + model=model, + reasoning_content=event["delta"], ) - case _: - event_type = line["type"] - if event_type.endswith("in_progress") or event_type.endswith("completed"): - event_type_split = event_type.split(".")[1:] - if len(event_type_split) == 2: - data = { - "event": { - "type": "status", - "data": { - "description": " ".join(event_type_split), - "done": event_type_split[1] == "completed", - }, - } - } - yield f"data: {json.dumps(data)}\n\n" - - async def _build_payload(self, body: dict, user_valves: UserValves, stream: bool = True) -> Tuple[str, dict]: - model = body["model"].split(".", 1)[1] - - # build messages - messages = [] - for message in body["messages"]: - if isinstance(message["content"], str): - messages.append({"content": message["content"], "role": message["role"]}) - elif isinstance(message["content"], list): - content = [] - for item in message["content"]: - if item["type"] == "text": - content.append({"type": "input_text", "text": item["text"]}) - elif item["type"] == "image_url": - content.append( - { - "type": "input_image", - "image_url": item["image_url"]["url"], - } + continue + + if event_type == "response.output_text.delta": + if is_thinking: + is_thinking = False + if event.get("delta"): + yield self._format_stream_data( + model=model, + content=event["delta"], + ) + continue + + if event_type == "response.output_text.annotation.added": + annotation = event.get("annotation", {}) + if annotation.get("type") == "url_citation": + annotations.append(annotation) + continue + + if event_type in { + "response.web_search_call.searching", + "response.web_search_call.in_progress", + }: + yield self._format_status_data( + description="web search", + done=False, ) - else: - raise TypeError("Invalid message content type %s" % item["type"]) - messages.append({"role": message["role"], "content": content}) - else: - raise TypeError("Invalid message content type %s" % type(message["content"])) + continue - # reasoning - reasoning_effort = user_valves.reasoning_effort + if event_type == "response.web_search_call.completed": + yield self._format_status_data( + description="web search completed", + done=True, + ) + continue - # build body - data = { + if event_type == "response.completed": + response_obj = event.get("response", {}) + usage = response_obj.get("usage") + response_sources = self._extract_sources_from_response( + response_obj + ) + continue + + if event_type == "response.incomplete": + reason = ( + event.get("response", {}) + .get("incomplete_details", {}) + .get("reason", "unknown") + ) + raise RuntimeError( + self._format_upstream_error( + f"OpenAI response incomplete: {reason}", + request_id=request_id, + ) + ) + + if event_type == "response.failed": + err = event.get("response", {}).get("error") or event.get( + "error", {} + ) + message = err.get( + "message", + "An error occurred while processing your request.", + ) + code = err.get("code", "") + raise RuntimeError( + self._format_upstream_error( + message=message, + request_id=request_id, + code=code, + ) + ) + + if self.valves.append_sources_to_answer: + source_text = self._render_sources(annotations, response_sources) + if source_text: + yield self._format_stream_data(model=model, content=source_text) + + yield self._format_stream_data( + model=model, + usage=usage, + if_finished=True, + ) + + async def _build_payload( + self, body: dict, user_valves: "Pipe.UserValves", stream: bool = True + ) -> Tuple[str, dict]: + model = self._extract_model_name(body.get("model", "")) + messages = self._convert_messages(body.get("messages", [])) + tools = self._build_tools() + + incoming_tools = body.get("tools") or [] + incoming_tool_choice = body.get("tool_choice") + + if incoming_tools: + logger.info( + "ignoring incoming OpenWebUI tools, using OpenAI web_search only: %s", + self._summarize_tools(incoming_tools), + ) + + if incoming_tool_choice not in (None, "", "auto"): + logger.info( + "ignoring incoming OpenWebUI tool_choice, forcing auto: %s", + incoming_tool_choice, + ) + + data: dict[str, Any] = { "model": model, "input": messages, - "reasoning": { - "effort": reasoning_effort, - "summary": user_valves.summary, - }, - "text": { - "verbosity": user_valves.verbosity, - }, "stream": stream, "store": False, } - # max tokens + if self._is_gpt5_family(model): + data["text"] = {"verbosity": user_valves.verbosity} + reasoning_effort = self._normalize_reasoning_effort( + model=model, + effort=user_valves.reasoning_effort, + using_web_search=bool(tools), + ) + data["reasoning"] = {"effort": reasoning_effort} + if self.valves.enable_reasoning: + data["reasoning"]["summary"] = user_valves.summary + if "max_completion_tokens" in body: data["max_output_tokens"] = body["max_completion_tokens"] elif "max_tokens" in body: data["max_output_tokens"] = body["max_tokens"] - # other parameters - allowed_params = [k for k in self.valves.allow_params.split(",") if k] + allowed_params = [ + key.strip() for key in self.valves.allow_params.split(",") if key.strip() + ] + reserved = { + "model", + "input", + "messages", + "tools", + "tool_choice", + "stream", + "store", + "text", + "reasoning", + "include", + "max_completion_tokens", + "max_tokens", + } for key, val in body.items(): - if key in allowed_params: + if key in allowed_params and key not in reserved: data[key] = val - payload = {"method": "POST", "url": "/responses", "json": data} - # check tools - if body.get("tools", []): - payload["json"]["tools"] = body["tools"] + if tools: + data["tools"] = tools + data["tool_choice"] = "auto" + data["include"] = ["web_search_call.action.sources"] + payload = {"method": "POST", "url": "responses", "json": data} + logger.debug("responses payload=%s", json.dumps(data, ensure_ascii=False)) return model, payload - # pylint: disable=R0913,R0917 + def _extract_model_name(self, raw_model: str) -> str: + raw_model = (raw_model or "").strip() + configured = [ + model.strip() for model in self.valves.models.split(",") if model.strip() + ] + + if raw_model in configured: + return raw_model + + for model in sorted(configured, key=len, reverse=True): + if raw_model.endswith(f".{model}"): + return model + + return raw_model + + def _convert_messages(self, messages: list[dict]) -> list[dict]: + converted = [] + + for message in messages: + role = (message.get("role") or "user").strip() + if role not in {"user", "assistant", "system", "developer"}: + role = "user" + + content_value = message.get("content", "") + + if isinstance(content_value, str): + converted.append( + { + "type": "message", + "role": role, + "content": content_value, + } + ) + continue + + if isinstance(content_value, list): + if role == "assistant": + content = self._convert_assistant_content(content_value) + else: + content = self._convert_input_content(content_value) + + if not content: + content = "" + + converted.append( + { + "type": "message", + "role": role, + "content": content, + } + ) + continue + + raise TypeError("Invalid message content type %s" % type(content_value)) + + return converted + + def _convert_input_content(self, items: list[dict]) -> list[dict]: + content = [] + + for item in items: + item_type = item.get("type") + + if item_type in {"text", "input_text"}: + content.append({"type": "input_text", "text": item.get("text", "")}) + elif item_type == "image_url": + image_url = item.get("image_url", {}) + if isinstance(image_url, dict): + image_url = image_url.get("url", "") + content.append( + { + "type": "input_image", + "image_url": image_url, + } + ) + elif item_type == "input_image": + content.append(item) + elif item_type == "input_file": + content.append(item) + elif item_type == "output_text": + content.append({"type": "input_text", "text": item.get("text", "")}) + elif item_type == "refusal": + content.append({"type": "input_text", "text": item.get("refusal", "")}) + elif item_type == "reasoning_content": + text = item.get("text") or item.get("reasoning_content") or "" + if text: + content.append({"type": "input_text", "text": text}) + else: + text = item.get("text") + if isinstance(text, str): + content.append({"type": "input_text", "text": text}) + else: + raise TypeError("Invalid message content type %s" % item_type) + + return content + + def _convert_assistant_content(self, items: list[dict]) -> list[dict]: + content = [] + + for item in items: + item_type = item.get("type") + + if item_type in {"text", "output_text", "input_text"}: + content.append({"type": "output_text", "text": item.get("text", "")}) + elif item_type == "refusal": + content.append( + { + "type": "refusal", + "refusal": item.get("refusal", ""), + } + ) + elif item_type == "reasoning_content": + continue + else: + text = item.get("text") + if isinstance(text, str): + content.append({"type": "output_text", "text": text}) + + return content + + def _coerce_user_valves(self, raw: Any) -> "Pipe.UserValves": + if isinstance(raw, self.UserValves): + return raw + + if raw is None: + return self.UserValves() + + if isinstance(raw, dict): + return self.UserValves(**raw) + + if isinstance(raw, BaseModel): + if hasattr(raw, "model_dump"): + return self.UserValves(**raw.model_dump()) + return self.UserValves(**raw.dict()) + + fields = getattr(self.UserValves, "model_fields", None) or getattr( + self.UserValves, "__fields__", {} + ) + data = {name: getattr(raw, name) for name in fields if hasattr(raw, name)} + return self.UserValves(**data) + + def _build_tools(self) -> list[dict]: + if not self.valves.enable_web_search: + return [] + + tool = { + "type": "web_search", + "search_context_size": self.valves.web_search_context_size, + } + + domains = self._parse_domains(self.valves.web_search_domains) + if domains: + tool["filters"] = {"allowed_domains": domains} + + user_location = self._build_user_location() + if user_location: + tool["user_location"] = user_location + + return [tool] + + def _build_user_location(self) -> Optional[dict]: + values = { + "type": "approximate", + "country": (self.valves.web_search_country or "").strip().upper(), + "city": (self.valves.web_search_city or "").strip(), + "region": (self.valves.web_search_region or "").strip(), + "timezone": (self.valves.web_search_timezone or "").strip(), + } + if not any( + [ + values["country"], + values["city"], + values["region"], + values["timezone"], + ] + ): + return None + return {k: v for k, v in values.items() if v} + + def _parse_domains(self, raw_domains: Optional[str]) -> list[str]: + domains = [] + for domain in (raw_domains or "").split(","): + domain = domain.strip().lower() + if not domain: + continue + if domain.startswith("http://"): + domain = domain[7:] + elif domain.startswith("https://"): + domain = domain[8:] + domain = domain.rstrip("/") + if domain: + domains.append(domain) + return domains + + def _summarize_tools(self, tools: list[dict]) -> list[str]: + summary = [] + for tool in tools: + if not isinstance(tool, dict): + summary.append(str(type(tool))) + continue + + tool_type = tool.get("type", "") + if tool_type == "function": + fn = tool.get("function", {}) or {} + summary.append(f"function:{fn.get('name', 'unknown')}") + else: + summary.append(tool_type or "unknown") + return summary + + def _is_gpt5_family(self, model: str) -> bool: + model = (model or "").lower() + return model.startswith("gpt-5") + + def _normalize_reasoning_effort( + self, model: str, effort: str, using_web_search: bool + ) -> str: + model = (model or "").lower() + + if model.startswith("gpt-5.1") or model.startswith("gpt-5.2"): + if effort == "xhigh": + return "high" + if effort in {"none", "low", "medium", "high"}: + return effort + return "none" + + if model.startswith("gpt-5"): + if effort == "none": + return "low" if using_web_search else "low" + if effort == "xhigh": + return "high" + if effort in {"low", "medium", "high"}: + return effort + return "low" + + return effort + + def _extract_sources_from_response(self, response: dict) -> list[dict]: + sources = [] + + for item in response.get("output", []): + if item.get("type") == "web_search_call": + action = item.get("action", {}) or {} + for source in action.get("sources", []) or []: + if source.get("url"): + sources.append( + { + "url": source["url"], + "title": source.get("title", ""), + } + ) + + if item.get("type") == "message": + for content in item.get("content", []): + for annotation in content.get("annotations", []) or []: + if annotation.get("type") == "url_citation" and annotation.get( + "url" + ): + sources.append( + { + "url": annotation["url"], + "title": annotation.get("title", ""), + } + ) + + return sources + + def _render_sources(self, annotations: list[dict], sources: list[dict]) -> str: + merged = [] + + for annotation in annotations: + if annotation.get("url"): + merged.append( + { + "url": annotation["url"], + "title": annotation.get("title", ""), + } + ) + + merged.extend(sources) + + deduped = [] + seen = set() + for item in merged: + url = item.get("url", "").strip() + if not url or url in seen: + continue + seen.add(url) + deduped.append( + { + "url": url, + "title": item.get("title", "").strip(), + } + ) + + if not deduped: + return "" + + lines = ["", "", "Sources:"] + for idx, item in enumerate(deduped, start=1): + title = item["title"] or item["url"] + lines.append(f"{idx}. {title} - {item['url']}") + return "\n".join(lines) + + async def _read_error_text(self, response: Response) -> str: + text = "" + async for line in response.aiter_lines(): + text += line + return text + + def _format_upstream_error( + self, message: str, request_id: str = "", code: str = "" + ) -> str: + parts = [message] + if code: + parts.append(f"code={code}") + if request_id: + parts.append(f"request_id={request_id}") + return " | ".join(parts) + + def _format_status_data(self, description: str, done: bool) -> str: + data = { + "event": { + "type": "status", + "data": { + "description": description, + "done": done, + }, + } + } + return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" + def _format_stream_data( self, model: Optional[str] = "", @@ -197,21 +693,27 @@ def _format_stream_data( usage: Optional[dict] = None, if_finished: bool = False, ) -> str: + delta = {} + if content: + delta["content"] = content + if reasoning_content: + delta["reasoning_content"] = reasoning_content + data = { "id": f"chat.{uuid.uuid4().hex}", "object": "chat.completion.chunk", - "choices": [], - "created": int(time.time()), - "model": model, - } - if content or reasoning_content: - data["choices"] = [ + "choices": [ { "finish_reason": "stop" if if_finished else "", "index": 0, - "delta": {"content": content, "reasoning_content": reasoning_content}, + "delta": delta, } - ] + ], + "created": int(time.time()), + "model": model, + } + if usage: data["usage"] = usage - return f"data: {json.dumps(data)}\n\n" + + return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" From e1b24348d5348dc6f00a310d39c69e4576b5dd7d Mon Sep 17 00:00:00 2001 From: Vivian <106744968+fwVivian@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:03:55 +0800 Subject: [PATCH 2/5] Updated to version 0.1.7, enhancing API exception handling, optimizing streaming responses, and adding support for web search functionality. --- pipes/openai_responses.py | 724 ++++++++++++++++++++++++++++++++------ 1 file changed, 613 insertions(+), 111 deletions(-) diff --git a/pipes/openai_responses.py b/pipes/openai_responses.py index 521af1f..22efed4 100644 --- a/pipes/openai_responses.py +++ b/pipes/openai_responses.py @@ -2,7 +2,7 @@ title: OpenAI Responses author: OVINC CN git_url: https://github.com/OVINC-CN/OpenWebUIPlugin.git -version: 0.1.2 +version: 0.1.7 licence: MIT """ @@ -10,7 +10,7 @@ import logging import time import uuid -from typing import AsyncIterable, Literal, Optional, Tuple +from typing import Any, AsyncIterable, Literal, Optional, Tuple import httpx from fastapi import Request @@ -24,23 +24,37 @@ class APIException(Exception): - def __init__(self, status: int, content: str, response: Response): + def __init__( + self, + status: int, + content: str, + response: Response, + request_id: str = "", + ): self._status = status self._content = content self._response = response + self._request_id = request_id def __str__(self) -> str: - # error msg + message = "" try: - return json.loads(self._content)["error"]["message"] + message = json.loads(self._content)["error"]["message"] except Exception: pass - # build in error - try: - self._response.raise_for_status() - except Exception as err: - return str(err) - return "Unknown API error" + + if not message: + try: + self._response.raise_for_status() + except Exception as err: + message = str(err) + + if not message: + message = "Unknown API error" + + if self._request_id: + return f"{message} (request_id: {self._request_id})" + return message class Pipe: @@ -49,146 +63,628 @@ class Valves(BaseModel): api_key: str = Field(default="", title="API Key") enable_reasoning: bool = Field(default=True, title="展示思考内容") allow_params: Optional[str] = Field( - default="", title="透传参数", description="允许配置的参数,使用英文逗号分隔,例如 temperature" + default="", + title="透传参数", + description="允许透传的参数,英文逗号分隔,例如 temperature,top_p", ) timeout: int = Field(default=600, title="请求超时时间(秒)") proxy: Optional[str] = Field(default="", title="代理地址") - models: str = Field(default="gpt-5", title="模型", description="使用英文逗号分隔多个模型") + models: str = Field( + default="gpt-5.1,gpt-5", + title="模型", + description="使用英文逗号分隔多个模型", + ) + + enable_web_search: bool = Field(default=True, title="启用 OpenAI Web Search") + web_search_context_size: Literal["low", "medium", "high"] = Field( + default="medium", + title="Web Search 上下文大小", + ) + web_search_domains: Optional[str] = Field( + default="", + title="Web Search 域名白名单", + description="英文逗号分隔,例如 openai.com,platform.openai.com", + ) + web_search_country: Optional[str] = Field( + default="", + title="搜索国家", + description="两位国家代码,例如 US、CN", + ) + web_search_city: Optional[str] = Field(default="", title="搜索城市") + web_search_region: Optional[str] = Field(default="", title="搜索地区/省州") + web_search_timezone: Optional[str] = Field( + default="", + title="搜索时区", + description="IANA 时区,例如 Asia/Shanghai", + ) + append_sources_to_answer: bool = Field( + default=True, + title="在答案末尾追加来源链接", + ) class UserValves(BaseModel): - verbosity: Literal["low", "medium", "high"] = Field(default="medium", title="输出详细程度") - reasoning_effort: Literal["none", "low", "medium", "high", "xhigh"] = Field(default="low", title="思考推理强度") - summary: Literal["auto", "concise", "detailed"] = Field(default="auto", title="思考输出摘要程度") + verbosity: Literal["low", "medium", "high"] = Field( + default="medium", + title="输出详细程度", + ) + reasoning_effort: Literal["none", "low", "medium", "high", "xhigh"] = Field( + default="low", + title="思考推理强度", + ) + summary: Literal["auto", "concise", "detailed"] = Field( + default="auto", + title="思考输出摘要程度", + ) def __init__(self): self.valves = self.Valves() def pipes(self): - return [{"id": model, "name": model} for model in self.valves.models.split(",") if model] + return [ + {"id": model.strip(), "name": model.strip()} + for model in self.valves.models.split(",") + if model.strip() + ] + + async def pipe( + self, body: dict, __user__: dict, __request__: Request + ) -> StreamingResponse: + return StreamingResponse( + self.__stream_pipe(body=body, __user__=__user__, __request__=__request__), + media_type="text/event-stream", + ) - async def pipe(self, body: dict, __user__: dict, __request__: Request) -> StreamingResponse: - return StreamingResponse(self.__stream_pipe(body=body, __user__=__user__, __request__=__request__)) + async def __stream_pipe( + self, body: dict, __user__: dict, __request__: Request + ) -> AsyncIterable[str]: + user_valves = self._coerce_user_valves((__user__ or {}).get("valves")) + model, payload = await self._build_payload(body=body, user_valves=user_valves) - async def __stream_pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable: - model, payload = await self._build_payload(body=body, user_valves=__user__["valves"]) - # call client async with httpx.AsyncClient( - base_url=self.valves.base_url, + base_url=self.valves.base_url.rstrip("/") + "/", headers={"Authorization": f"Bearer {self.valves.api_key}"}, proxy=self.valves.proxy or None, trust_env=True, timeout=self.valves.timeout, ) as client: async with client.stream(**payload) as response: + request_id = response.headers.get("x-request-id", "") + if response.status_code != 200: - text = "" - async for line in response.aiter_lines(): - text += line # pylint: disable=R1713 - logger.error("response invalid with %d: %s", response.status_code, text) - raise APIException(status=response.status_code, content=text, response=response) + text = await self._read_error_text(response) + logger.error( + "response invalid with %d request_id=%s body=%s", + response.status_code, + request_id, + text, + ) + raise APIException( + status=response.status_code, + content=text, + response=response, + request_id=request_id, + ) + is_thinking = self.valves.enable_reasoning + usage = None + annotations = [] + response_sources = [] + async for line in response.aiter_lines(): line = line.strip() if not line: continue if line.startswith("event:") or not line.startswith("data:"): continue - if line.startswith("data: "): - line = line[6:] - if isinstance(line, str): - line = json.loads(line) - match line.get("type"): - case "response.reasoning_summary_text.delta": - if is_thinking: - yield self._format_stream_data(model=model, reasoning_content=line["delta"]) - case "response.output_text.delta": - if is_thinking: - is_thinking = False - yield self._format_stream_data(model=model, content=line["delta"]) - case "response.completed": + + line = line[5:].strip() + if not line or line == "[DONE]": + continue + + try: + event = json.loads(line) + except json.JSONDecodeError: + logger.warning("ignore invalid stream line: %s", line) + continue + + event_type = event.get("type", "") + + if event_type == "response.reasoning_summary_text.delta": + if is_thinking and event.get("delta"): yield self._format_stream_data( - model=model, usage=line["response"]["usage"], if_finished=True + model=model, + reasoning_content=event["delta"], ) - case _: - event_type = line["type"] - if event_type.endswith("in_progress") or event_type.endswith("completed"): - event_type_split = event_type.split(".")[1:] - if len(event_type_split) == 2: - data = { - "event": { - "type": "status", - "data": { - "description": " ".join(event_type_split), - "done": event_type_split[1] == "completed", - }, - } - } - yield f"data: {json.dumps(data)}\n\n" - - async def _build_payload(self, body: dict, user_valves: UserValves, stream: bool = True) -> Tuple[str, dict]: - model = body["model"].split(".", 1)[1] - - # build messages - messages = [] - for message in body["messages"]: - if isinstance(message["content"], str): - messages.append({"content": message["content"], "role": message["role"]}) - elif isinstance(message["content"], list): - content = [] - for item in message["content"]: - if item["type"] == "text": - content.append({"type": "input_text", "text": item["text"]}) - elif item["type"] == "image_url": - content.append( - { - "type": "input_image", - "image_url": item["image_url"]["url"], - } + continue + + if event_type == "response.output_text.delta": + if is_thinking: + is_thinking = False + if event.get("delta"): + yield self._format_stream_data( + model=model, + content=event["delta"], + ) + continue + + if event_type == "response.output_text.annotation.added": + annotation = event.get("annotation", {}) + if annotation.get("type") == "url_citation": + annotations.append(annotation) + continue + + if event_type in { + "response.web_search_call.searching", + "response.web_search_call.in_progress", + }: + yield self._format_status_data( + description="web search", + done=False, ) - else: - raise TypeError("Invalid message content type %s" % item["type"]) - messages.append({"role": message["role"], "content": content}) - else: - raise TypeError("Invalid message content type %s" % type(message["content"])) + continue - # reasoning - reasoning_effort = user_valves.reasoning_effort + if event_type == "response.web_search_call.completed": + yield self._format_status_data( + description="web search completed", + done=True, + ) + continue - # build body - data = { + if event_type == "response.completed": + response_obj = event.get("response", {}) + usage = response_obj.get("usage") + response_sources = self._extract_sources_from_response( + response_obj + ) + continue + + if event_type == "response.incomplete": + reason = ( + event.get("response", {}) + .get("incomplete_details", {}) + .get("reason", "unknown") + ) + raise RuntimeError( + self._format_upstream_error( + f"OpenAI response incomplete: {reason}", + request_id=request_id, + ) + ) + + if event_type == "response.failed": + err = event.get("response", {}).get("error") or event.get( + "error", {} + ) + message = err.get( + "message", + "An error occurred while processing your request.", + ) + code = err.get("code", "") + raise RuntimeError( + self._format_upstream_error( + message=message, + request_id=request_id, + code=code, + ) + ) + + if self.valves.append_sources_to_answer: + source_text = self._render_sources(annotations, response_sources) + if source_text: + yield self._format_stream_data(model=model, content=source_text) + + yield self._format_stream_data( + model=model, + usage=usage, + if_finished=True, + ) + + async def _build_payload( + self, body: dict, user_valves: "Pipe.UserValves", stream: bool = True + ) -> Tuple[str, dict]: + model = self._extract_model_name(body.get("model", "")) + messages = self._convert_messages(body.get("messages", [])) + tools = self._build_tools() + + incoming_tools = body.get("tools") or [] + incoming_tool_choice = body.get("tool_choice") + + if incoming_tools: + logger.info( + "ignoring incoming OpenWebUI tools, using OpenAI web_search only: %s", + self._summarize_tools(incoming_tools), + ) + + if incoming_tool_choice not in (None, "", "auto"): + logger.info( + "ignoring incoming OpenWebUI tool_choice, forcing auto: %s", + incoming_tool_choice, + ) + + data: dict[str, Any] = { "model": model, "input": messages, - "reasoning": { - "effort": reasoning_effort, - "summary": user_valves.summary, - }, - "text": { - "verbosity": user_valves.verbosity, - }, "stream": stream, "store": False, } - # max tokens + if self._is_gpt5_family(model): + data["text"] = {"verbosity": user_valves.verbosity} + reasoning_effort = self._normalize_reasoning_effort( + model=model, + effort=user_valves.reasoning_effort, + using_web_search=bool(tools), + ) + data["reasoning"] = {"effort": reasoning_effort} + if self.valves.enable_reasoning: + data["reasoning"]["summary"] = user_valves.summary + if "max_completion_tokens" in body: data["max_output_tokens"] = body["max_completion_tokens"] elif "max_tokens" in body: data["max_output_tokens"] = body["max_tokens"] - # other parameters - allowed_params = [k for k in self.valves.allow_params.split(",") if k] + allowed_params = [ + key.strip() for key in self.valves.allow_params.split(",") if key.strip() + ] + reserved = { + "model", + "input", + "messages", + "tools", + "tool_choice", + "stream", + "store", + "text", + "reasoning", + "include", + "max_completion_tokens", + "max_tokens", + } for key, val in body.items(): - if key in allowed_params: + if key in allowed_params and key not in reserved: data[key] = val - payload = {"method": "POST", "url": "/responses", "json": data} - # check tools - if body.get("tools", []): - payload["json"]["tools"] = body["tools"] + if tools: + data["tools"] = tools + data["tool_choice"] = "auto" + data["include"] = ["web_search_call.action.sources"] + payload = {"method": "POST", "url": "responses", "json": data} + logger.debug("responses payload=%s", json.dumps(data, ensure_ascii=False)) return model, payload - # pylint: disable=R0913,R0917 + def _extract_model_name(self, raw_model: str) -> str: + raw_model = (raw_model or "").strip() + configured = [ + model.strip() for model in self.valves.models.split(",") if model.strip() + ] + + if raw_model in configured: + return raw_model + + for model in sorted(configured, key=len, reverse=True): + if raw_model.endswith(f".{model}"): + return model + + return raw_model + + def _convert_messages(self, messages: list[dict]) -> list[dict]: + converted = [] + + for message in messages: + role = (message.get("role") or "user").strip() + if role not in {"user", "assistant", "system", "developer"}: + role = "user" + + content_value = message.get("content", "") + + if isinstance(content_value, str): + converted.append( + { + "type": "message", + "role": role, + "content": content_value, + } + ) + continue + + if isinstance(content_value, list): + if role == "assistant": + content = self._convert_assistant_content(content_value) + else: + content = self._convert_input_content(content_value) + + if not content: + content = "" + + converted.append( + { + "type": "message", + "role": role, + "content": content, + } + ) + continue + + raise TypeError("Invalid message content type %s" % type(content_value)) + + return converted + + def _convert_input_content(self, items: list[dict]) -> list[dict]: + content = [] + + for item in items: + item_type = item.get("type") + + if item_type in {"text", "input_text"}: + content.append({"type": "input_text", "text": item.get("text", "")}) + elif item_type == "image_url": + image_url = item.get("image_url", {}) + if isinstance(image_url, dict): + image_url = image_url.get("url", "") + content.append( + { + "type": "input_image", + "image_url": image_url, + } + ) + elif item_type == "input_image": + content.append(item) + elif item_type == "input_file": + content.append(item) + elif item_type == "output_text": + content.append({"type": "input_text", "text": item.get("text", "")}) + elif item_type == "refusal": + content.append({"type": "input_text", "text": item.get("refusal", "")}) + elif item_type == "reasoning_content": + text = item.get("text") or item.get("reasoning_content") or "" + if text: + content.append({"type": "input_text", "text": text}) + else: + text = item.get("text") + if isinstance(text, str): + content.append({"type": "input_text", "text": text}) + else: + raise TypeError("Invalid message content type %s" % item_type) + + return content + + def _convert_assistant_content(self, items: list[dict]) -> list[dict]: + content = [] + + for item in items: + item_type = item.get("type") + + if item_type in {"text", "output_text", "input_text"}: + content.append({"type": "output_text", "text": item.get("text", "")}) + elif item_type == "refusal": + content.append( + { + "type": "refusal", + "refusal": item.get("refusal", ""), + } + ) + elif item_type == "reasoning_content": + continue + else: + text = item.get("text") + if isinstance(text, str): + content.append({"type": "output_text", "text": text}) + + return content + + def _coerce_user_valves(self, raw: Any) -> "Pipe.UserValves": + if isinstance(raw, self.UserValves): + return raw + + if raw is None: + return self.UserValves() + + if isinstance(raw, dict): + return self.UserValves(**raw) + + if isinstance(raw, BaseModel): + if hasattr(raw, "model_dump"): + return self.UserValves(**raw.model_dump()) + return self.UserValves(**raw.dict()) + + fields = getattr(self.UserValves, "model_fields", None) or getattr( + self.UserValves, "__fields__", {} + ) + data = {name: getattr(raw, name) for name in fields if hasattr(raw, name)} + return self.UserValves(**data) + + def _build_tools(self) -> list[dict]: + if not self.valves.enable_web_search: + return [] + + tool = { + "type": "web_search", + "search_context_size": self.valves.web_search_context_size, + } + + domains = self._parse_domains(self.valves.web_search_domains) + if domains: + tool["filters"] = {"allowed_domains": domains} + + user_location = self._build_user_location() + if user_location: + tool["user_location"] = user_location + + return [tool] + + def _build_user_location(self) -> Optional[dict]: + values = { + "type": "approximate", + "country": (self.valves.web_search_country or "").strip().upper(), + "city": (self.valves.web_search_city or "").strip(), + "region": (self.valves.web_search_region or "").strip(), + "timezone": (self.valves.web_search_timezone or "").strip(), + } + if not any( + [ + values["country"], + values["city"], + values["region"], + values["timezone"], + ] + ): + return None + return {k: v for k, v in values.items() if v} + + def _parse_domains(self, raw_domains: Optional[str]) -> list[str]: + domains = [] + for domain in (raw_domains or "").split(","): + domain = domain.strip().lower() + if not domain: + continue + if domain.startswith("http://"): + domain = domain[7:] + elif domain.startswith("https://"): + domain = domain[8:] + domain = domain.rstrip("/") + if domain: + domains.append(domain) + return domains + + def _summarize_tools(self, tools: list[dict]) -> list[str]: + summary = [] + for tool in tools: + if not isinstance(tool, dict): + summary.append(str(type(tool))) + continue + + tool_type = tool.get("type", "") + if tool_type == "function": + fn = tool.get("function", {}) or {} + summary.append(f"function:{fn.get('name', 'unknown')}") + else: + summary.append(tool_type or "unknown") + return summary + + def _is_gpt5_family(self, model: str) -> bool: + model = (model or "").lower() + return model.startswith("gpt-5") + + def _normalize_reasoning_effort( + self, model: str, effort: str, using_web_search: bool + ) -> str: + model = (model or "").lower() + + if model.startswith("gpt-5.1") or model.startswith("gpt-5.2"): + if effort == "xhigh": + return "high" + if effort in {"none", "low", "medium", "high"}: + return effort + return "none" + + if model.startswith("gpt-5"): + if effort == "none": + return "low" if using_web_search else "low" + if effort == "xhigh": + return "high" + if effort in {"low", "medium", "high"}: + return effort + return "low" + + return effort + + def _extract_sources_from_response(self, response: dict) -> list[dict]: + sources = [] + + for item in response.get("output", []): + if item.get("type") == "web_search_call": + action = item.get("action", {}) or {} + for source in action.get("sources", []) or []: + if source.get("url"): + sources.append( + { + "url": source["url"], + "title": source.get("title", ""), + } + ) + + if item.get("type") == "message": + for content in item.get("content", []): + for annotation in content.get("annotations", []) or []: + if annotation.get("type") == "url_citation" and annotation.get( + "url" + ): + sources.append( + { + "url": annotation["url"], + "title": annotation.get("title", ""), + } + ) + + return sources + + def _render_sources(self, annotations: list[dict], sources: list[dict]) -> str: + merged = [] + + for annotation in annotations: + if annotation.get("url"): + merged.append( + { + "url": annotation["url"], + "title": annotation.get("title", ""), + } + ) + + merged.extend(sources) + + deduped = [] + seen = set() + for item in merged: + url = item.get("url", "").strip() + if not url or url in seen: + continue + seen.add(url) + deduped.append( + { + "url": url, + "title": item.get("title", "").strip(), + } + ) + + if not deduped: + return "" + + lines = ["", "", "Sources:"] + for idx, item in enumerate(deduped, start=1): + title = item["title"] or item["url"] + lines.append(f"{idx}. {title} - {item['url']}") + return "\n".join(lines) + + async def _read_error_text(self, response: Response) -> str: + text = "" + async for line in response.aiter_lines(): + text += line + return text + + def _format_upstream_error( + self, message: str, request_id: str = "", code: str = "" + ) -> str: + parts = [message] + if code: + parts.append(f"code={code}") + if request_id: + parts.append(f"request_id={request_id}") + return " | ".join(parts) + + def _format_status_data(self, description: str, done: bool) -> str: + data = { + "event": { + "type": "status", + "data": { + "description": description, + "done": done, + }, + } + } + return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" + def _format_stream_data( self, model: Optional[str] = "", @@ -197,21 +693,27 @@ def _format_stream_data( usage: Optional[dict] = None, if_finished: bool = False, ) -> str: + delta = {} + if content: + delta["content"] = content + if reasoning_content: + delta["reasoning_content"] = reasoning_content + data = { "id": f"chat.{uuid.uuid4().hex}", "object": "chat.completion.chunk", - "choices": [], - "created": int(time.time()), - "model": model, - } - if content or reasoning_content: - data["choices"] = [ + "choices": [ { "finish_reason": "stop" if if_finished else "", "index": 0, - "delta": {"content": content, "reasoning_content": reasoning_content}, + "delta": delta, } - ] + ], + "created": int(time.time()), + "model": model, + } + if usage: data["usage"] = usage - return f"data: {json.dumps(data)}\n\n" + + return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" From 41db8183c7b5ff1ce3c3d14cf970c323635956a4 Mon Sep 17 00:00:00 2001 From: Vivian <106744968+fwVivian@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:23:54 +0800 Subject: [PATCH 3/5] style(openai_responses): apply black formatting --- pipes/openai_responses.py | 54 ++++++++++----------------------------- 1 file changed, 13 insertions(+), 41 deletions(-) diff --git a/pipes/openai_responses.py b/pipes/openai_responses.py index 22efed4..e557e47 100644 --- a/pipes/openai_responses.py +++ b/pipes/openai_responses.py @@ -121,22 +121,16 @@ def __init__(self): def pipes(self): return [ - {"id": model.strip(), "name": model.strip()} - for model in self.valves.models.split(",") - if model.strip() + {"id": model.strip(), "name": model.strip()} for model in self.valves.models.split(",") if model.strip() ] - async def pipe( - self, body: dict, __user__: dict, __request__: Request - ) -> StreamingResponse: + async def pipe(self, body: dict, __user__: dict, __request__: Request) -> StreamingResponse: return StreamingResponse( self.__stream_pipe(body=body, __user__=__user__, __request__=__request__), media_type="text/event-stream", ) - async def __stream_pipe( - self, body: dict, __user__: dict, __request__: Request - ) -> AsyncIterable[str]: + async def __stream_pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable[str]: user_valves = self._coerce_user_valves((__user__ or {}).get("valves")) model, payload = await self._build_payload(body=body, user_valves=user_valves) @@ -233,17 +227,11 @@ async def __stream_pipe( if event_type == "response.completed": response_obj = event.get("response", {}) usage = response_obj.get("usage") - response_sources = self._extract_sources_from_response( - response_obj - ) + response_sources = self._extract_sources_from_response(response_obj) continue if event_type == "response.incomplete": - reason = ( - event.get("response", {}) - .get("incomplete_details", {}) - .get("reason", "unknown") - ) + reason = event.get("response", {}).get("incomplete_details", {}).get("reason", "unknown") raise RuntimeError( self._format_upstream_error( f"OpenAI response incomplete: {reason}", @@ -252,9 +240,7 @@ async def __stream_pipe( ) if event_type == "response.failed": - err = event.get("response", {}).get("error") or event.get( - "error", {} - ) + err = event.get("response", {}).get("error") or event.get("error", {}) message = err.get( "message", "An error occurred while processing your request.", @@ -279,9 +265,7 @@ async def __stream_pipe( if_finished=True, ) - async def _build_payload( - self, body: dict, user_valves: "Pipe.UserValves", stream: bool = True - ) -> Tuple[str, dict]: + async def _build_payload(self, body: dict, user_valves: "Pipe.UserValves", stream: bool = True) -> Tuple[str, dict]: model = self._extract_model_name(body.get("model", "")) messages = self._convert_messages(body.get("messages", [])) tools = self._build_tools() @@ -324,9 +308,7 @@ async def _build_payload( elif "max_tokens" in body: data["max_output_tokens"] = body["max_tokens"] - allowed_params = [ - key.strip() for key in self.valves.allow_params.split(",") if key.strip() - ] + allowed_params = [key.strip() for key in self.valves.allow_params.split(",") if key.strip()] reserved = { "model", "input", @@ -356,9 +338,7 @@ async def _build_payload( def _extract_model_name(self, raw_model: str) -> str: raw_model = (raw_model or "").strip() - configured = [ - model.strip() for model in self.valves.models.split(",") if model.strip() - ] + configured = [model.strip() for model in self.valves.models.split(",") if model.strip()] if raw_model in configured: return raw_model @@ -489,9 +469,7 @@ def _coerce_user_valves(self, raw: Any) -> "Pipe.UserValves": return self.UserValves(**raw.model_dump()) return self.UserValves(**raw.dict()) - fields = getattr(self.UserValves, "model_fields", None) or getattr( - self.UserValves, "__fields__", {} - ) + fields = getattr(self.UserValves, "model_fields", None) or getattr(self.UserValves, "__fields__", {}) data = {name: getattr(raw, name) for name in fields if hasattr(raw, name)} return self.UserValves(**data) @@ -567,9 +545,7 @@ def _is_gpt5_family(self, model: str) -> bool: model = (model or "").lower() return model.startswith("gpt-5") - def _normalize_reasoning_effort( - self, model: str, effort: str, using_web_search: bool - ) -> str: + def _normalize_reasoning_effort(self, model: str, effort: str, using_web_search: bool) -> str: model = (model or "").lower() if model.startswith("gpt-5.1") or model.startswith("gpt-5.2"): @@ -608,9 +584,7 @@ def _extract_sources_from_response(self, response: dict) -> list[dict]: if item.get("type") == "message": for content in item.get("content", []): for annotation in content.get("annotations", []) or []: - if annotation.get("type") == "url_citation" and annotation.get( - "url" - ): + if annotation.get("type") == "url_citation" and annotation.get("url"): sources.append( { "url": annotation["url"], @@ -663,9 +637,7 @@ async def _read_error_text(self, response: Response) -> str: text += line return text - def _format_upstream_error( - self, message: str, request_id: str = "", code: str = "" - ) -> str: + def _format_upstream_error(self, message: str, request_id: str = "", code: str = "") -> str: parts = [message] if code: parts.append(f"code={code}") From 807bac170988579a2bc9f309e3f9369ab5a80772 Mon Sep 17 00:00:00 2001 From: Vivian <106744968+fwVivian@users.noreply.github.com> Date: Thu, 23 Apr 2026 23:08:55 +0800 Subject: [PATCH 4/5] refactor(pipe): update stream data formatting and normalize reasoning effort logic --- pipes/openai_responses.py | 53 +++++++++++++++------------------------ 1 file changed, 20 insertions(+), 33 deletions(-) diff --git a/pipes/openai_responses.py b/pipes/openai_responses.py index e557e47..369a0bf 100644 --- a/pipes/openai_responses.py +++ b/pipes/openai_responses.py @@ -187,7 +187,7 @@ async def __stream_pipe(self, body: dict, __user__: dict, __request__: Request) if is_thinking and event.get("delta"): yield self._format_stream_data( model=model, - reasoning_content=event["delta"], + delta={"reasoning_content": event["delta"]}, ) continue @@ -197,7 +197,7 @@ async def __stream_pipe(self, body: dict, __user__: dict, __request__: Request) if event.get("delta"): yield self._format_stream_data( model=model, - content=event["delta"], + delta={"content": event["delta"]}, ) continue @@ -257,7 +257,7 @@ async def __stream_pipe(self, body: dict, __user__: dict, __request__: Request) if self.valves.append_sources_to_answer: source_text = self._render_sources(annotations, response_sources) if source_text: - yield self._format_stream_data(model=model, content=source_text) + yield self._format_stream_data(model=model, delta={"content": source_text}) yield self._format_stream_data( model=model, @@ -548,23 +548,20 @@ def _is_gpt5_family(self, model: str) -> bool: def _normalize_reasoning_effort(self, model: str, effort: str, using_web_search: bool) -> str: model = (model or "").lower() - if model.startswith("gpt-5.1") or model.startswith("gpt-5.2"): - if effort == "xhigh": - return "high" - if effort in {"none", "low", "medium", "high"}: - return effort - return "none" - - if model.startswith("gpt-5"): - if effort == "none": - return "low" if using_web_search else "low" - if effort == "xhigh": - return "high" - if effort in {"low", "medium", "high"}: - return effort - return "low" - - return effort + if model.startswith(("gpt-5.1", "gpt-5.2")): + allowed_efforts = {"none", "low", "medium", "high"} + default_effort = "none" + elif model.startswith("gpt-5"): + allowed_efforts = {"low", "medium", "high"} + default_effort = "low" + else: + return effort + + if effort == "xhigh": + return "high" + if effort in allowed_efforts: + return effort + return default_effort def _extract_sources_from_response(self, response: dict) -> list[dict]: sources = [] @@ -632,10 +629,7 @@ def _render_sources(self, annotations: list[dict], sources: list[dict]) -> str: return "\n".join(lines) async def _read_error_text(self, response: Response) -> str: - text = "" - async for line in response.aiter_lines(): - text += line - return text + return "".join([line async for line in response.aiter_lines()]) def _format_upstream_error(self, message: str, request_id: str = "", code: str = "") -> str: parts = [message] @@ -660,17 +654,10 @@ def _format_status_data(self, description: str, done: bool) -> str: def _format_stream_data( self, model: Optional[str] = "", - content: Optional[str] = "", - reasoning_content: Optional[str] = "", + delta: Optional[dict] = None, usage: Optional[dict] = None, if_finished: bool = False, ) -> str: - delta = {} - if content: - delta["content"] = content - if reasoning_content: - delta["reasoning_content"] = reasoning_content - data = { "id": f"chat.{uuid.uuid4().hex}", "object": "chat.completion.chunk", @@ -678,7 +665,7 @@ def _format_stream_data( { "finish_reason": "stop" if if_finished else "", "index": 0, - "delta": delta, + "delta": delta or {}, } ], "created": int(time.time()), From 1d17cc642d094387f66c063c921cb80a10dabf62 Mon Sep 17 00:00:00 2001 From: Vivian <106744968+fwVivian@users.noreply.github.com> Date: Sat, 25 Apr 2026 23:52:09 +0800 Subject: [PATCH 5/5] refactor(openai_responses): simplify API response handling and update model configuration --- pipes/openai_responses.py | 901 ++++++++++++++------------------------ 1 file changed, 328 insertions(+), 573 deletions(-) diff --git a/pipes/openai_responses.py b/pipes/openai_responses.py index 369a0bf..18adb64 100644 --- a/pipes/openai_responses.py +++ b/pipes/openai_responses.py @@ -1,678 +1,433 @@ """ -title: OpenAI Responses +title: OpenAI Responses Minimal author: OVINC CN -git_url: https://github.com/OVINC-CN/OpenWebUIPlugin.git -version: 0.1.7 +version: 0.1.0 licence: MIT """ import json -import logging +import mimetypes import time import uuid -from typing import Any, AsyncIterable, Literal, Optional, Tuple +from pathlib import Path +from typing import AsyncIterable, Optional import httpx from fastapi import Request -from httpx import Response -from open_webui.env import GLOBAL_LOG_LEVEL from pydantic import BaseModel, Field from starlette.responses import StreamingResponse -logger = logging.getLogger(__name__) -logger.setLevel(GLOBAL_LOG_LEVEL) - - -class APIException(Exception): - def __init__( - self, - status: int, - content: str, - response: Response, - request_id: str = "", - ): - self._status = status - self._content = content - self._response = response - self._request_id = request_id - - def __str__(self) -> str: - message = "" - try: - message = json.loads(self._content)["error"]["message"] - except Exception: - pass - - if not message: - try: - self._response.raise_for_status() - except Exception as err: - message = str(err) - - if not message: - message = "Unknown API error" - - if self._request_id: - return f"{message} (request_id: {self._request_id})" - return message - class Pipe: class Valves(BaseModel): base_url: str = Field(default="https://api.openai.com/v1", title="Base URL") api_key: str = Field(default="", title="API Key") - enable_reasoning: bool = Field(default=True, title="展示思考内容") - allow_params: Optional[str] = Field( - default="", - title="透传参数", - description="允许透传的参数,英文逗号分隔,例如 temperature,top_p", - ) + models: str = Field(default="gpt-5.5", title="模型") timeout: int = Field(default=600, title="请求超时时间(秒)") - proxy: Optional[str] = Field(default="", title="代理地址") - models: str = Field( - default="gpt-5.1,gpt-5", - title="模型", - description="使用英文逗号分隔多个模型", - ) - enable_web_search: bool = Field(default=True, title="启用 OpenAI Web Search") - web_search_context_size: Literal["low", "medium", "high"] = Field( - default="medium", - title="Web Search 上下文大小", - ) - web_search_domains: Optional[str] = Field( - default="", - title="Web Search 域名白名单", - description="英文逗号分隔,例如 openai.com,platform.openai.com", - ) - web_search_country: Optional[str] = Field( - default="", - title="搜索国家", - description="两位国家代码,例如 US、CN", - ) - web_search_city: Optional[str] = Field(default="", title="搜索城市") - web_search_region: Optional[str] = Field(default="", title="搜索地区/省州") - web_search_timezone: Optional[str] = Field( - default="", - title="搜索时区", - description="IANA 时区,例如 Asia/Shanghai", - ) - append_sources_to_answer: bool = Field( - default=True, - title="在答案末尾追加来源链接", - ) - - class UserValves(BaseModel): - verbosity: Literal["low", "medium", "high"] = Field( - default="medium", - title="输出详细程度", - ) - reasoning_effort: Literal["none", "low", "medium", "high", "xhigh"] = Field( - default="low", - title="思考推理强度", - ) - summary: Literal["auto", "concise", "detailed"] = Field( - default="auto", - title="思考输出摘要程度", - ) def __init__(self): self.valves = self.Valves() def pipes(self): return [ - {"id": model.strip(), "name": model.strip()} for model in self.valves.models.split(",") if model.strip() + {"id": model.strip(), "name": model.strip()} + for model in self.valves.models.split(",") + if model.strip() ] - async def pipe(self, body: dict, __user__: dict, __request__: Request) -> StreamingResponse: + async def pipe( + self, + body: dict, + __user__: dict, + __request__: Request, + __files__: Optional[list[dict]] = None, + ) -> StreamingResponse: return StreamingResponse( - self.__stream_pipe(body=body, __user__=__user__, __request__=__request__), + self._stream(body, __files__ or []), media_type="text/event-stream", ) - async def __stream_pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable[str]: - user_valves = self._coerce_user_valves((__user__ or {}).get("valves")) - model, payload = await self._build_payload(body=body, user_valves=user_valves) + async def _stream(self, body: dict, injected_files: list[dict]) -> AsyncIterable[str]: + api_key = (self.valves.api_key or "").strip() + if not api_key: + raise RuntimeError("OpenAI API Key is empty. Please set the API Key valve.") + + model = self._extract_model_name(body.get("model", "")) + body_files = list(body.get("files") or []) + body_files.extend(injected_files) + data = { + "model": model, + "input": self._convert_messages( + body.get("messages", []), + body_files=body_files, + ), + "stream": True, + } + + if model.lower().startswith("gpt-5"): + data["reasoning"] = {"effort": "high", "summary": "auto"} + + if self.valves.enable_web_search: + data["tools"] = [{"type": "web_search"}] + data["tool_choice"] = "auto" async with httpx.AsyncClient( base_url=self.valves.base_url.rstrip("/") + "/", - headers={"Authorization": f"Bearer {self.valves.api_key}"}, - proxy=self.valves.proxy or None, - trust_env=True, + headers={"Authorization": f"Bearer {api_key}"}, timeout=self.valves.timeout, ) as client: - async with client.stream(**payload) as response: - request_id = response.headers.get("x-request-id", "") - + async with client.stream("POST", "responses", json=data) as response: if response.status_code != 200: - text = await self._read_error_text(response) - logger.error( - "response invalid with %d request_id=%s body=%s", - response.status_code, - request_id, - text, - ) - raise APIException( - status=response.status_code, - content=text, - response=response, - request_id=request_id, - ) - - is_thinking = self.valves.enable_reasoning - usage = None - annotations = [] - response_sources = [] + error = await response.aread() + raise RuntimeError(error.decode("utf-8", errors="replace")) async for line in response.aiter_lines(): line = line.strip() - if not line: - continue - if line.startswith("event:") or not line.startswith("data:"): + if not line.startswith("data:"): continue - line = line[5:].strip() - if not line or line == "[DONE]": + raw = line[5:].strip() + if not raw or raw == "[DONE]": continue - try: - event = json.loads(line) - except json.JSONDecodeError: - logger.warning("ignore invalid stream line: %s", line) - continue - - event_type = event.get("type", "") + event = json.loads(raw) + event_type = event.get("type") if event_type == "response.reasoning_summary_text.delta": - if is_thinking and event.get("delta"): - yield self._format_stream_data( - model=model, - delta={"reasoning_content": event["delta"]}, - ) - continue + delta = event.get("delta") + if delta: + yield self._chunk(model, {"reasoning_content": delta}) - if event_type == "response.output_text.delta": - if is_thinking: - is_thinking = False - if event.get("delta"): - yield self._format_stream_data( - model=model, - delta={"content": event["delta"]}, - ) - continue - - if event_type == "response.output_text.annotation.added": - annotation = event.get("annotation", {}) - if annotation.get("type") == "url_citation": - annotations.append(annotation) - continue + elif event_type == "response.output_text.delta": + delta = event.get("delta") + if delta: + yield self._chunk(model, {"content": delta}) - if event_type in { + elif event_type in { "response.web_search_call.searching", "response.web_search_call.in_progress", }: - yield self._format_status_data( - description="web search", - done=False, - ) - continue + yield self._status("web search", done=False) - if event_type == "response.web_search_call.completed": - yield self._format_status_data( - description="web search completed", - done=True, - ) - continue + elif event_type == "response.web_search_call.completed": + yield self._status("web search completed", done=True) - if event_type == "response.completed": - response_obj = event.get("response", {}) - usage = response_obj.get("usage") - response_sources = self._extract_sources_from_response(response_obj) - continue + elif event_type == "response.failed": + error = event.get("response", {}).get("error") or {} + raise RuntimeError(error.get("message", "OpenAI response failed")) - if event_type == "response.incomplete": - reason = event.get("response", {}).get("incomplete_details", {}).get("reason", "unknown") - raise RuntimeError( - self._format_upstream_error( - f"OpenAI response incomplete: {reason}", - request_id=request_id, - ) + elif event_type == "response.incomplete": + details = event.get("response", {}).get( + "incomplete_details", {} ) + reason = details.get("reason", "unknown") + raise RuntimeError(f"OpenAI response incomplete: {reason}") - if event_type == "response.failed": - err = event.get("response", {}).get("error") or event.get("error", {}) - message = err.get( - "message", - "An error occurred while processing your request.", - ) - code = err.get("code", "") - raise RuntimeError( - self._format_upstream_error( - message=message, - request_id=request_id, - code=code, - ) - ) - - if self.valves.append_sources_to_answer: - source_text = self._render_sources(annotations, response_sources) - if source_text: - yield self._format_stream_data(model=model, delta={"content": source_text}) - - yield self._format_stream_data( - model=model, - usage=usage, - if_finished=True, - ) - - async def _build_payload(self, body: dict, user_valves: "Pipe.UserValves", stream: bool = True) -> Tuple[str, dict]: - model = self._extract_model_name(body.get("model", "")) - messages = self._convert_messages(body.get("messages", [])) - tools = self._build_tools() - - incoming_tools = body.get("tools") or [] - incoming_tool_choice = body.get("tool_choice") - - if incoming_tools: - logger.info( - "ignoring incoming OpenWebUI tools, using OpenAI web_search only: %s", - self._summarize_tools(incoming_tools), - ) - - if incoming_tool_choice not in (None, "", "auto"): - logger.info( - "ignoring incoming OpenWebUI tool_choice, forcing auto: %s", - incoming_tool_choice, - ) - - data: dict[str, Any] = { - "model": model, - "input": messages, - "stream": stream, - "store": False, - } - - if self._is_gpt5_family(model): - data["text"] = {"verbosity": user_valves.verbosity} - reasoning_effort = self._normalize_reasoning_effort( - model=model, - effort=user_valves.reasoning_effort, - using_web_search=bool(tools), - ) - data["reasoning"] = {"effort": reasoning_effort} - if self.valves.enable_reasoning: - data["reasoning"]["summary"] = user_valves.summary - - if "max_completion_tokens" in body: - data["max_output_tokens"] = body["max_completion_tokens"] - elif "max_tokens" in body: - data["max_output_tokens"] = body["max_tokens"] - - allowed_params = [key.strip() for key in self.valves.allow_params.split(",") if key.strip()] - reserved = { - "model", - "input", - "messages", - "tools", - "tool_choice", - "stream", - "store", - "text", - "reasoning", - "include", - "max_completion_tokens", - "max_tokens", - } - for key, val in body.items(): - if key in allowed_params and key not in reserved: - data[key] = val - - if tools: - data["tools"] = tools - data["tool_choice"] = "auto" - data["include"] = ["web_search_call.action.sources"] - - payload = {"method": "POST", "url": "responses", "json": data} - logger.debug("responses payload=%s", json.dumps(data, ensure_ascii=False)) - return model, payload + yield self._chunk(model, {}, finished=True) def _extract_model_name(self, raw_model: str) -> str: raw_model = (raw_model or "").strip() - configured = [model.strip() for model in self.valves.models.split(",") if model.strip()] + models = [m.strip() for m in self.valves.models.split(",") if m.strip()] - if raw_model in configured: + if raw_model in models: return raw_model - for model in sorted(configured, key=len, reverse=True): + for model in sorted(models, key=len, reverse=True): if raw_model.endswith(f".{model}"): return model - return raw_model + return raw_model or models[0] - def _convert_messages(self, messages: list[dict]) -> list[dict]: + def _convert_messages( + self, messages: list[dict], body_files: Optional[list[dict]] = None + ) -> list[dict]: converted = [] + body_files = body_files or [] + last_user_index = self._last_user_message_index(messages) - for message in messages: - role = (message.get("role") or "user").strip() + for index, message in enumerate(messages): + role = message.get("role") or "user" if role not in {"user", "assistant", "system", "developer"}: role = "user" - content_value = message.get("content", "") - - if isinstance(content_value, str): - converted.append( - { - "type": "message", - "role": role, - "content": content_value, - } - ) - continue - - if isinstance(content_value, list): - if role == "assistant": - content = self._convert_assistant_content(content_value) - else: - content = self._convert_input_content(content_value) - - if not content: - content = "" - - converted.append( - { - "type": "message", - "role": role, - "content": content, - } - ) - continue - - raise TypeError("Invalid message content type %s" % type(content_value)) + files = list(message.get("files") or []) + if body_files and index == last_user_index: + files.extend(body_files) + + content = self._convert_content( + message.get("content", ""), + role=role, + files=files, + ) + + converted.append( + { + "type": "message", + "role": role, + "content": content, + } + ) return converted - def _convert_input_content(self, items: list[dict]) -> list[dict]: - content = [] - - for item in items: - item_type = item.get("type") - - if item_type in {"text", "input_text"}: - content.append({"type": "input_text", "text": item.get("text", "")}) - elif item_type == "image_url": - image_url = item.get("image_url", {}) - if isinstance(image_url, dict): - image_url = image_url.get("url", "") - content.append( - { - "type": "input_image", - "image_url": image_url, - } - ) - elif item_type == "input_image": - content.append(item) - elif item_type == "input_file": - content.append(item) - elif item_type == "output_text": - content.append({"type": "input_text", "text": item.get("text", "")}) - elif item_type == "refusal": - content.append({"type": "input_text", "text": item.get("refusal", "")}) - elif item_type == "reasoning_content": - text = item.get("text") or item.get("reasoning_content") or "" - if text: - content.append({"type": "input_text", "text": text}) - else: - text = item.get("text") - if isinstance(text, str): - content.append({"type": "input_text", "text": text}) - else: - raise TypeError("Invalid message content type %s" % item_type) - - return content - - def _convert_assistant_content(self, items: list[dict]) -> list[dict]: - content = [] - - for item in items: - item_type = item.get("type") - - if item_type in {"text", "output_text", "input_text"}: - content.append({"type": "output_text", "text": item.get("text", "")}) - elif item_type == "refusal": - content.append( - { - "type": "refusal", - "refusal": item.get("refusal", ""), - } - ) - elif item_type == "reasoning_content": - continue - else: - text = item.get("text") - if isinstance(text, str): - content.append({"type": "output_text", "text": text}) - - return content - - def _coerce_user_valves(self, raw: Any) -> "Pipe.UserValves": - if isinstance(raw, self.UserValves): - return raw + def _last_user_message_index(self, messages: list[dict]) -> int: + for index in range(len(messages) - 1, -1, -1): + if messages[index].get("role") == "user": + return index + return len(messages) - 1 + + def _convert_content(self, content: object, role: str, files: list[dict]) -> object: + if isinstance(content, str) and not files: + return content + + parts = [] + + if isinstance(content, str): + if content: + parts.append(self._text_part(content, role)) + elif isinstance(content, list): + for item in content: + part = self._convert_content_part(item, role) + if part: + parts.append(part) + else: + raise TypeError("Invalid message content type %s" % type(content)) - if raw is None: - return self.UserValves() + for file in files: + part = self._convert_file_part(file) + if part: + parts.append(part) - if isinstance(raw, dict): - return self.UserValves(**raw) + return parts or "" - if isinstance(raw, BaseModel): - if hasattr(raw, "model_dump"): - return self.UserValves(**raw.model_dump()) - return self.UserValves(**raw.dict()) + def _convert_content_part(self, item: object, role: str) -> Optional[dict]: + if not isinstance(item, dict): + return None - fields = getattr(self.UserValves, "model_fields", None) or getattr(self.UserValves, "__fields__", {}) - data = {name: getattr(raw, name) for name in fields if hasattr(raw, name)} - return self.UserValves(**data) + item_type = item.get("type") + + if item_type in {"text", "input_text", "output_text"}: + text = item.get("text", "") + if isinstance(text, str): + return self._text_part(text, role) + + if item_type in {"image_url", "input_image"}: + image_url = self._url_string(item.get("image_url") or item.get("url")) + + part = {"type": "input_image"} + if image_url: + part["image_url"] = image_url + if item.get("file_id"): + part["file_id"] = item["file_id"] + if item.get("detail"): + part["detail"] = item["detail"] + return part if len(part) > 1 else None + + if item_type in {"file", "input_file"}: + return self._convert_file_part(item) + + if item_type == "refusal": + refusal = item.get("refusal", "") + if isinstance(refusal, str): + return self._text_part(refusal, role) + + text = item.get("text") + if isinstance(text, str): + return self._text_part(text, role) + + return self._convert_file_part(item) + + def _convert_file_part(self, item: dict) -> Optional[dict]: + file = item.get("file") if isinstance(item.get("file"), dict) else item + if self._is_image_file(file): + return self._convert_image_file_part(file) + + part = {"type": "input_file"} + + file_id = self._openai_file_id(file) + filename = file.get("filename") or file.get("name") + file_url = self._url_string(file.get("file_url") or file.get("url")) + file_data = self._data_url( + file.get("file_data") or file.get("data"), + filename=filename, + mime_type=self._mime_type(file, filename), + ) - def _build_tools(self) -> list[dict]: - if not self.valves.enable_web_search: - return [] + if file_id: + part["file_id"] = file_id + if file_url: + part["file_url"] = file_url + if file_data: + part["file_data"] = file_data + if filename: + part["filename"] = filename + + if file_id or file_url or file_data: + return part + return None + + def _convert_image_file_part(self, file: dict) -> Optional[dict]: + part = {"type": "input_image"} + + file_id = self._openai_file_id(file) + image_url = self._url_string(file.get("image_url") or file.get("url")) + image_data = self._data_url( + file.get("file_data") or file.get("data"), + filename=file.get("filename") or file.get("name"), + mime_type=self._mime_type(file, file.get("filename") or file.get("name")), + default_mime_type="image/png", + ) - tool = { - "type": "web_search", - "search_context_size": self.valves.web_search_context_size, - } + if file_id: + part["file_id"] = file_id + if image_url: + part["image_url"] = image_url + elif image_data: + part["image_url"] = image_data + + if file.get("detail"): + part["detail"] = file["detail"] + + if file_id or image_url or image_data: + return part + return None + + def _is_image_file(self, file: dict) -> bool: + file_type = str( + file.get("mime_type") + or file.get("mime") + or file.get("content_type") + or file.get("type") + or "" + ).lower() + filename = str(file.get("filename") or file.get("name") or "").lower() + url = str(self._url_string(file.get("image_url") or file.get("url")) or "").lower() + data = self._extract_data_string(file.get("file_data") or file.get("data")) + data_prefix = (data or "")[:32].lower() + + return ( + file_type.startswith("image/") + or file_type == "image" + or filename.endswith((".png", ".jpg", ".jpeg", ".webp", ".gif")) + or Path(url).suffix in {".png", ".jpg", ".jpeg", ".webp", ".gif"} + or data_prefix.startswith("data:image/") + ) - domains = self._parse_domains(self.valves.web_search_domains) - if domains: - tool["filters"] = {"allowed_domains": domains} + def _openai_file_id(self, file: dict) -> Optional[str]: + file_id = file.get("file_id") + if isinstance(file_id, str) and file_id: + return file_id - user_location = self._build_user_location() - if user_location: - tool["user_location"] = user_location + raw_id = file.get("id") + if isinstance(raw_id, str) and raw_id.startswith("file-"): + return raw_id - return [tool] + return None - def _build_user_location(self) -> Optional[dict]: - values = { - "type": "approximate", - "country": (self.valves.web_search_country or "").strip().upper(), - "city": (self.valves.web_search_city or "").strip(), - "region": (self.valves.web_search_region or "").strip(), - "timezone": (self.valves.web_search_timezone or "").strip(), - } - if not any( - [ - values["country"], - values["city"], - values["region"], - values["timezone"], - ] - ): + def _data_url( + self, + raw: object, + filename: object = None, + mime_type: Optional[str] = None, + default_mime_type: str = "application/octet-stream", + ) -> Optional[str]: + data = self._extract_data_string(raw) + if not data: return None - return {k: v for k, v in values.items() if v} - - def _parse_domains(self, raw_domains: Optional[str]) -> list[str]: - domains = [] - for domain in (raw_domains or "").split(","): - domain = domain.strip().lower() - if not domain: - continue - if domain.startswith("http://"): - domain = domain[7:] - elif domain.startswith("https://"): - domain = domain[8:] - domain = domain.rstrip("/") - if domain: - domains.append(domain) - return domains - - def _summarize_tools(self, tools: list[dict]) -> list[str]: - summary = [] - for tool in tools: - if not isinstance(tool, dict): - summary.append(str(type(tool))) - continue - - tool_type = tool.get("type", "") - if tool_type == "function": - fn = tool.get("function", {}) or {} - summary.append(f"function:{fn.get('name', 'unknown')}") - else: - summary.append(tool_type or "unknown") - return summary - - def _is_gpt5_family(self, model: str) -> bool: - model = (model or "").lower() - return model.startswith("gpt-5") - - def _normalize_reasoning_effort(self, model: str, effort: str, using_web_search: bool) -> str: - model = (model or "").lower() - - if model.startswith(("gpt-5.1", "gpt-5.2")): - allowed_efforts = {"none", "low", "medium", "high"} - default_effort = "none" - elif model.startswith("gpt-5"): - allowed_efforts = {"low", "medium", "high"} - default_effort = "low" - else: - return effort - - if effort == "xhigh": - return "high" - if effort in allowed_efforts: - return effort - return default_effort - - def _extract_sources_from_response(self, response: dict) -> list[dict]: - sources = [] - - for item in response.get("output", []): - if item.get("type") == "web_search_call": - action = item.get("action", {}) or {} - for source in action.get("sources", []) or []: - if source.get("url"): - sources.append( - { - "url": source["url"], - "title": source.get("title", ""), - } - ) - if item.get("type") == "message": - for content in item.get("content", []): - for annotation in content.get("annotations", []) or []: - if annotation.get("type") == "url_citation" and annotation.get("url"): - sources.append( - { - "url": annotation["url"], - "title": annotation.get("title", ""), - } - ) - - return sources - - def _render_sources(self, annotations: list[dict], sources: list[dict]) -> str: - merged = [] - - for annotation in annotations: - if annotation.get("url"): - merged.append( - { - "url": annotation["url"], - "title": annotation.get("title", ""), - } - ) - - merged.extend(sources) - - deduped = [] - seen = set() - for item in merged: - url = item.get("url", "").strip() - if not url or url in seen: - continue - seen.add(url) - deduped.append( - { - "url": url, - "title": item.get("title", "").strip(), - } - ) + data = data.strip() + if data.startswith("data:"): + return data - if not deduped: - return "" + if data.startswith(("http://", "https://")): + return None - lines = ["", "", "Sources:"] - for idx, item in enumerate(deduped, start=1): - title = item["title"] or item["url"] - lines.append(f"{idx}. {title} - {item['url']}") - return "\n".join(lines) + mime_type = mime_type or self._mime_type({}, filename) or default_mime_type + return f"data:{mime_type};base64,{data}" - async def _read_error_text(self, response: Response) -> str: - return "".join([line async for line in response.aiter_lines()]) + def _url_string(self, raw: object) -> Optional[str]: + if isinstance(raw, str): + return raw - def _format_upstream_error(self, message: str, request_id: str = "", code: str = "") -> str: - parts = [message] - if code: - parts.append(f"code={code}") - if request_id: - parts.append(f"request_id={request_id}") - return " | ".join(parts) + if isinstance(raw, dict): + for key in ("url", "href", "file_url", "image_url"): + value = raw.get(key) + if isinstance(value, str): + return value - def _format_status_data(self, description: str, done: bool) -> str: - data = { - "event": { - "type": "status", - "data": { - "description": description, - "done": done, - }, - } - } - return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" + return None - def _format_stream_data( - self, - model: Optional[str] = "", - delta: Optional[dict] = None, - usage: Optional[dict] = None, - if_finished: bool = False, - ) -> str: + def _extract_data_string(self, raw: object) -> Optional[str]: + if isinstance(raw, str): + return raw + + if isinstance(raw, bytes): + return raw.decode("utf-8", errors="ignore") + + if isinstance(raw, dict): + for key in ( + "data", + "content", + "base64", + "base64_data", + "file_data", + "body", + "value", + ): + value = raw.get(key) + extracted = self._extract_data_string(value) + if extracted: + return extracted + + return None + + def _mime_type(self, file: dict, filename: object = None) -> Optional[str]: + explicit = ( + file.get("mime_type") + or file.get("mime") + or file.get("content_type") + or file.get("type") + ) + if isinstance(explicit, str) and "/" in explicit: + return explicit + + if isinstance(filename, str): + guessed, _ = mimetypes.guess_type(filename) + if guessed: + return guessed + + return None + + def _text_part(self, text: str, role: str) -> dict: + if role == "assistant": + return {"type": "output_text", "text": text} + return {"type": "input_text", "text": text} + + def _chunk(self, model: str, delta: dict, finished: bool = False) -> str: data = { "id": f"chat.{uuid.uuid4().hex}", "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, "choices": [ { - "finish_reason": "stop" if if_finished else "", "index": 0, - "delta": delta or {}, + "finish_reason": "stop" if finished else None, + "delta": delta, } ], - "created": int(time.time()), - "model": model, } + return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" - if usage: - data["usage"] = usage - + def _status(self, description: str, done: bool) -> str: + data = { + "event": { + "type": "status", + "data": { + "description": description, + "done": done, + }, + } + } return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"