diff --git a/README.md b/README.md
index 7af52c3..6c056b0 100644
--- a/README.md
+++ b/README.md
@@ -39,6 +39,7 @@ Pipes integrate external models, services, or complex workflows into OpenWebUI.
| | [`openrouter_reasoning.py`](pipes/openrouter_reasoning.py) | Integration with OpenRouter reasoning models |
| **DeepSeek** | [`deepseek_reasoning.py`](pipes/deepseek_reasoning.py) | Integration with DeepSeek reasoning models |
| **OAIPro** | [`oaipro_reasoning.py`](pipes/oaipro_reasoning.py) | Integration with OAIPro reasoning |
+| **Doubao** | [`doubao_image.py`](pipes/doubao_image.py) | Image generation using Doubao Seedream |
### Tools
Tools provide specific functionalities that can be called by the LLM (Function Calling).
diff --git a/README_zh-CN.md b/README_zh-CN.md
index 6fb0013..f7a53cb 100644
--- a/README_zh-CN.md
+++ b/README_zh-CN.md
@@ -39,6 +39,7 @@
| | [`openrouter_reasoning.py`](pipes/openrouter_reasoning.py) | 集成 OpenRouter 推理模型 |
| **DeepSeek** | [`deepseek_reasoning.py`](pipes/deepseek_reasoning.py) | 集成 DeepSeek 推理模型 |
| **OAIPro** | [`oaipro_reasoning.py`](pipes/oaipro_reasoning.py) | 集成 OAIPro 推理 |
+| **Doubao** | [`doubao_image.py`](pipes/doubao_image.py) | 使用 Doubao Seedream 生成图像 |
### Tools (工具)
工具提供可由 LLM 调用的特定功能 (Function Calling)。
diff --git a/pipes/doubao_image.py b/pipes/doubao_image.py
index c55e83f..a2d343c 100644
--- a/pipes/doubao_image.py
+++ b/pipes/doubao_image.py
@@ -3,13 +3,12 @@
description: Image generation with Doubao Seedream 5.0
author: yuzukumo
git_url: https://github.com/OVINC-CN/OpenWebUIPlugin.git
-version: 0.1.0
+version: 0.1.1
licence: MIT
"""
import base64
import binascii
-import io
import json
import logging
import mimetypes
@@ -18,13 +17,13 @@
from typing import Any, AsyncIterable, List, Literal, Optional, Tuple
import httpx
-from fastapi import BackgroundTasks, Request, UploadFile
+from fastapi import Request
from httpx import Response
from open_webui.env import GLOBAL_LOG_LEVEL
-from open_webui.models.users import UserModel, Users
-from open_webui.routers.files import get_file_content_by_id, upload_file
+from open_webui.models.users import UserModel
+from open_webui.routers.files import get_file_content_by_id
+from open_webui.routers.images import upload_image
from pydantic import BaseModel, Field
-from starlette.datastructures import Headers
from starlette.responses import StreamingResponse
logger = logging.getLogger(__name__)
@@ -83,16 +82,26 @@ def pipes(self) -> List[dict]:
{"id": model.strip(), "name": model.strip()} for model in self.valves.models.split(",") if model.strip()
]
- async def pipe(self, body: dict, __user__: dict, __request__: Request) -> StreamingResponse:
+ async def pipe(
+ self,
+ body: dict,
+ __user__: dict,
+ __request__: Request,
+ __metadata__: Optional[dict] = None,
+ ) -> StreamingResponse:
return StreamingResponse(
- self._pipe(body=body, __user__=__user__, __request__=__request__),
+ self._pipe(body=body, __user__=__user__, __request__=__request__, __metadata__=__metadata__),
media_type="text/event-stream",
)
- async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable[str]:
- user = Users.get_user_by_id(__user__["id"])
- if not user:
- raise ValueError("user not found")
+ async def _pipe(
+ self,
+ body: dict,
+ __user__: dict,
+ __request__: Request,
+ __metadata__: Optional[dict] = None,
+ ) -> AsyncIterable[str]:
+ user = self._get_user(__user__)
model, payload = await self._build_payload(user=user, body=body, user_valves=__user__.get("valves", {}))
@@ -112,10 +121,11 @@ async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> Async
)
response_json = response.json()
- content, usage = self._parse_response_content(
+ content, usage = await self._parse_response_content(
response_json=response_json,
user=user,
__request__=__request__,
+ metadata=__metadata__,
output_format=payload["json"]["output_format"],
)
if body.get("stream"):
@@ -216,20 +226,22 @@ async def _parse_messages(self, user: UserModel, body: dict) -> Tuple[str, List[
prompt = body.get("prompt", "Please generate an image.")
return prompt, images
- def _parse_response_content(
+ async def _parse_response_content(
self,
response_json: dict,
user: UserModel,
__request__: Request,
+ metadata: Optional[dict],
output_format: str,
) -> Tuple[str, Optional[dict]]:
results: List[str] = []
mime_type = "image/png" if output_format == "png" else "image/jpeg"
for item in response_json.get("data", []):
- image_markdown = self._render_response_item(
+ image_markdown = await self._render_response_item(
item=item,
user=user,
__request__=__request__,
+ metadata=metadata,
fallback_mime_type=mime_type,
)
if image_markdown:
@@ -243,11 +255,12 @@ def _parse_response_content(
return "\n\n".join(results), response_json.get("usage")
- def _render_response_item(
+ async def _render_response_item(
self,
item: dict,
user: UserModel,
__request__: Request,
+ metadata: Optional[dict],
fallback_mime_type: str,
) -> str:
if not isinstance(item, dict):
@@ -258,11 +271,12 @@ def _render_response_item(
mime_type = item.get("mime_type") or fallback_mime_type
if b64_json:
- return self._upload_image(
+ return await self._upload_image(
__request__=__request__,
user=user,
image_data=b64_json,
mime_type=mime_type,
+ metadata=metadata,
)
if image_url:
@@ -270,31 +284,22 @@ def _render_response_item(
return ""
- def _upload_image(
+ async def _upload_image(
self,
__request__: Request,
user: UserModel,
image_data: str,
mime_type: str,
+ metadata: Optional[dict],
) -> str:
image_bytes = self._decode_base64_image(image_data)
- file_ext = mimetypes.guess_extension(mime_type) or ".png"
- if file_ext == ".jpe":
- file_ext = ".jpg"
-
- file_item = upload_file(
+ file_item, image_url = await upload_image(
request=__request__,
- background_tasks=BackgroundTasks(),
- file=UploadFile(
- file=io.BytesIO(image_bytes),
- filename=f"generated-image-{uuid.uuid4().hex}{file_ext}",
- headers=Headers({"content-type": mime_type}),
- ),
- process=False,
+ image_data=image_bytes,
+ content_type=mime_type,
+ metadata={"mime_type": mime_type, **(metadata or {})},
user=user,
- metadata={"mime_type": mime_type},
)
- image_url = __request__.app.url_path_for("get_file_content_by_id", id=file_item.id)
    return f"![doubao-image-{file_item.id}]({image_url})"
async def _get_image_data_url_from_markdown(self, user: UserModel, markdown_string: str) -> str:
@@ -365,6 +370,19 @@ def _normalize_user_valves(user_valves: Any) -> "Pipe.UserValves":
return Pipe.UserValves(**user_valves.dict())
return Pipe.UserValves(**(user_valves or {}))
+ @staticmethod
+ def _get_user(user_data: Any) -> UserModel:
+ if isinstance(user_data, UserModel):
+ return user_data
+ if isinstance(user_data, BaseModel):
+ if hasattr(user_data, "model_dump"):
+ user_data = user_data.model_dump()
+ else:
+ user_data = user_data.dict()
+ if isinstance(user_data, dict):
+ return UserModel(**{k: v for k, v in user_data.items() if k != "valves"})
+ raise ValueError("user not found")
+
@staticmethod
def _format_data(
is_stream: bool,
diff --git a/pipes/gemini_image.py b/pipes/gemini_image.py
index 62a5453..a1f172b 100644
--- a/pipes/gemini_image.py
+++ b/pipes/gemini_image.py
@@ -1,28 +1,27 @@
"""
title: Gemini Image
description: Image generation with Gemini
-author: OVINC CN
+author: OVINC CN, yuzukumo
git_url: https://github.com/OVINC-CN/OpenWebUIPlugin.git
-version: 0.0.11
+version: 0.1.0
licence: MIT
"""
import base64
-import io
import json
import logging
import time
import uuid
-from typing import AsyncIterable, Literal, Optional, Tuple
+from typing import Any, AsyncIterable, List, Literal, Optional, Tuple
import httpx
-from fastapi import BackgroundTasks, Request, UploadFile
+from fastapi import Request
from httpx import Response
from open_webui.env import GLOBAL_LOG_LEVEL
-from open_webui.models.users import UserModel, Users
-from open_webui.routers.files import get_file_content_by_id, upload_file
+from open_webui.models.users import UserModel
+from open_webui.routers.files import get_file_content_by_id
+from open_webui.routers.images import get_image_data, upload_image
from pydantic import BaseModel, Field
-from starlette.datastructures import Headers
from starlette.responses import StreamingResponse
logger = logging.getLogger(__name__)
@@ -58,36 +57,64 @@ class Valves(BaseModel):
api_key: str = Field(default="", title="API Key")
timeout: int = Field(default=600, title="请求超时时间 (秒)")
proxy: Optional[str] = Field(default="", title="代理地址")
- models: str = Field(default="gemini-3-pro-image-preview", title="模型", description="使用英文逗号分隔多个模型")
+ models: str = Field(
+ default="gemini-3.1-flash-image-preview,gemini-3-pro-image-preview",
+ title="模型",
+ description="使用英文逗号分隔多个模型",
+ )
response_modalities: Literal["TEXT", "IMAGE", "TEXT,IMAGE"] = Field(
default="IMAGE", title="响应模态", description="使用英文逗号分隔"
)
class UserValves(BaseModel):
image_size: Literal["512", "1K", "2K", "4K"] = Field(default="1K", title="图片大小 (像素)")
- aspect_ratio: Literal["1:1", "2:3", "3:2", "3:4", "4:3", "4:5", "5:4", "9:16", "16:9", "21:9"] = Field(
- default="1:1", title="图片比例"
- )
+ aspect_ratio: Literal[
+ "1:1",
+ "1:4",
+ "1:8",
+ "2:3",
+ "3:2",
+ "3:4",
+ "4:1",
+ "4:3",
+ "4:5",
+ "5:4",
+ "8:1",
+ "9:16",
+ "16:9",
+ "21:9",
+ ] = Field(default="1:1", title="图片比例")
def __init__(self):
self.valves = self.Valves()
- self.user_valves = self.UserValves()
def pipes(self):
- return [{"id": model, "name": model} for model in self.valves.models.split(",")]
+ return [
+ {"id": model.strip(), "name": model.strip()} for model in self.valves.models.split(",") if model.strip()
+ ]
async def pipe(
self,
body: dict,
__user__: dict,
__request__: Request,
+ __metadata__: Optional[dict] = None,
) -> StreamingResponse:
- return StreamingResponse(self._pipe(body=body, __user__=__user__, __request__=__request__))
+ return StreamingResponse(
+ self._pipe(body=body, __user__=__user__, __request__=__request__, __metadata__=__metadata__),
+ media_type="text/event-stream",
+ )
+
+ async def _pipe(
+ self,
+ body: dict,
+ __user__: dict,
+ __request__: Request,
+ __metadata__: Optional[dict] = None,
+ ) -> AsyncIterable[str]:
+ user = self._get_user(__user__)
+ model, payload = await self._build_payload(user=user, body=body, user_valves=__user__.get("valves", {}))
- async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable:
- user = Users.get_user_by_id(__user__["id"])
- model, payload = await self._build_payload(user=user, body=body, user_valves=__user__["valves"])
- # call client
async with httpx.AsyncClient(
headers={"x-goog-api-key": self.valves.api_key},
proxy=self.valves.proxy or None,
@@ -97,10 +124,10 @@ async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> Async
response = await client.post(**payload)
if response.status_code != 200:
raise APIException(status=response.status_code, content=response.text, response=response)
- response = response.json()
- # upload image
+ response_json = response.json()
+
results = []
- for item in response["candidates"]:
+ for item in response_json.get("candidates", []):
content = item.get("content", {})
if not content:
results.append(item.get("finishReason", ""))
@@ -111,31 +138,26 @@ async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> Async
continue
for part in parts:
if "text" in part:
- if part["text"].endswith("`"):
- results.append(part["text"][:-1])
- else:
- results.append(part["text"])
+ results.append(part["text"])
if "inlineData" in part:
inline_data = part["inlineData"]
results.append(
- self._upload_image(
+ await self._upload_image(
__request__=__request__,
user=user,
image_data=inline_data["data"],
mime_type=inline_data["mimeType"],
+ metadata=__metadata__,
)
)
- # format response data
- usage_metadata = response.get("usageMetadata", None)
+ usage_metadata = dict(response_json.get("usageMetadata") or {})
usage = {
- "prompt_tokens": usage_metadata.pop("promptTokenCount", 0) if usage_metadata else 0,
- "completion_tokens": usage_metadata.pop("candidatesTokenCount", 0) if usage_metadata else 0,
- "total_tokens": usage_metadata.pop("totalTokenCount", 0) if usage_metadata else 0,
- "prompt_tokens_details": {
- "cached_tokens": (usage_metadata.get("cachedContentTokenCount", 0) if usage_metadata else 0)
- },
- "metadata": usage_metadata or {},
+ "prompt_tokens": usage_metadata.pop("promptTokenCount", 0),
+ "completion_tokens": usage_metadata.pop("candidatesTokenCount", 0),
+ "total_tokens": usage_metadata.pop("totalTokenCount", 0),
+ "prompt_tokens_details": {"cached_tokens": usage_metadata.get("cachedContentTokenCount", 0)},
+ "metadata": usage_metadata,
}
if usage_metadata and "toolUsePromptTokenCount" in usage_metadata:
usage["prompt_tokens"] += usage_metadata["toolUsePromptTokenCount"]
@@ -152,69 +174,98 @@ async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> Async
else:
yield self._format_data(is_stream=False, model=model, content=content, usage=usage)
- def _upload_image(self, __request__: Request, user: UserModel, image_data: str, mime_type: str) -> str:
- file_item = upload_file(
+ async def _upload_image(
+ self,
+ __request__: Request,
+ user: UserModel,
+ image_data: str,
+ mime_type: str,
+ metadata: Optional[dict],
+ ) -> str:
+ file_item, image_url = await upload_image(
request=__request__,
- background_tasks=BackgroundTasks(),
- file=UploadFile(
- file=io.BytesIO(base64.b64decode(image_data)),
- filename=f"generated-image-{uuid.uuid4().hex}.png",
- headers=Headers({"content-type": mime_type}),
- ),
- process=False,
+ image_data=base64.b64decode(image_data),
+ content_type=mime_type,
+ metadata={"mime_type": mime_type, **(metadata or {})},
user=user,
- metadata={"mime_type": mime_type},
)
- image_url = __request__.app.url_path_for("get_file_content_by_id", id=file_item.id)
    return f"![gemini-image-{file_item.id}]({image_url})"
- async def _get_image_content(self, user: UserModel, markdown_string: str):
- file_id = markdown_string.split("![gemini-image-")[1].split("]")[0]
+ async def _markdown_to_inline_image_part(self, user: UserModel, markdown_string: str) -> dict:
+ file_id = self._extract_file_id_from_markdown(markdown_string)
+ if not file_id:
+ raise ValueError("invalid image markdown")
+
file_response = await get_file_content_by_id(id=file_id, user=user)
- return open(file_response.path, "rb")
+ with open(file_response.path, "rb") as file_content:
+ image_bytes = file_content.read()
+
+ mime_type = file_response.media_type or "image/png"
+ return {
+ "inline_data": {
+ "mime_type": mime_type,
+ "data": base64.b64encode(image_bytes).decode(),
+ }
+ }
- async def _build_payload(self, user: UserModel, body: dict, user_valves: UserValves) -> Tuple[str, dict]:
- # payload
- model = body["model"].split(".", 1)[1]
- parts = []
+ async def _build_payload(self, user: UserModel, body: dict, user_valves: Any) -> Tuple[str, dict]:
+ model = body["model"].split(".", 1)[1] if "." in body["model"] else body["model"]
+ user_valves = self._normalize_user_valves(user_valves)
+ self._validate_model_options(model=model, user_valves=user_valves)
+ parts: List[dict] = []
+ has_text = False
+ image_count = 0
- # read messages
- messages = body["messages"]
- if len(messages) >= 2:
- messages = messages[-2:]
+ messages = body.get("messages", [])
+ if len(messages) > 6:
+ messages = messages[-6:]
for message in messages:
- # ignore system message
- if message["role"] == "system":
+ role = message.get("role")
+ if role == "system":
continue
- # parse content
- message_content = message["content"]
- # str content
+
+ allow_text = role != "assistant"
+ message_content = message.get("content")
if isinstance(message_content, str):
for item in message_content.split("\n"):
+ item = item.strip()
if not item:
continue
- if item.startswith("![gemini-image-"):
- file = await self._get_image_content(user, item)
- parts.append(
- {"inline_data": {"mime_type": "image/png", "data": base64.b64encode(file.read()).decode()}}
- )
+ if item.startswith("![") and "-image-" in item:
+ parts.append(await self._markdown_to_inline_image_part(user, item))
+ image_count += 1
continue
- parts.append({"text": message_content})
- # list content
- elif isinstance(message_content, list):
+ if allow_text:
+ parts.append({"text": item})
+ has_text = True
+ continue
+
+ if isinstance(message_content, list):
for content in message_content:
- if content["type"] == "text":
- parts.append({"text": content["text"]})
+ content_type = content.get("type")
+ if content_type == "text":
+ text = content.get("text", "").strip()
+ if text and allow_text:
+ parts.append({"text": text})
+ has_text = True
continue
- if content["type"] == "image_url":
- image_url = content["image_url"]["url"]
- header, encoded = image_url.split(",", 1)
- mime_type = header.split(";")[0].split(":")[1]
- parts.append({"inline_data": {"mime_type": mime_type, "data": encoded}})
- else:
- raise TypeError("message content invalid")
+ if content_type in {"image_url", "input_image"}:
+ parts.append(await self._content_to_inline_image_part(content))
+ image_count += 1
+ continue
+ raise TypeError("message content invalid")
+ continue
+
+ raise TypeError("message content invalid")
+
+ if image_count > 14:
+ raise ValueError("Gemini 3 image models support up to 14 reference images.")
+
+ if not has_text:
+ prompt = body.get("prompt", "Please generate an image.").strip()
+ if prompt:
+ parts.append({"text": prompt})
- # init payload
payload = {
"url": self.valves.base_url.format(model=model),
"json": {
@@ -229,12 +280,76 @@ async def _build_payload(self, user: UserModel, body: dict, user_valves: UserVal
},
}
- # check tools
if body.get("tools", []):
payload["json"]["tools"] = body["tools"]
return model, payload
+ async def _content_to_inline_image_part(self, content: dict) -> dict:
+ image_url = self._extract_image_url(content)
+ if not image_url:
+ raise TypeError("message content invalid")
+
+ image_bytes, mime_type = await get_image_data(image_url)
+ if image_bytes is None or not mime_type:
+ raise ValueError("invalid image input")
+
+ return {
+ "inline_data": {
+ "mime_type": mime_type,
+ "data": base64.b64encode(image_bytes).decode(),
+ }
+ }
+
+ @staticmethod
+ def _extract_image_url(content: dict) -> str:
+ image_url = content.get("image_url", "")
+ if isinstance(image_url, dict):
+ image_url = image_url.get("url", "")
+ return image_url if isinstance(image_url, str) else ""
+
+ @staticmethod
+ def _extract_file_id_from_markdown(markdown_string: str) -> str:
+ try:
+ alt_text = markdown_string.split("![", 1)[1].split("]", 1)[0]
+ if "-image-" not in alt_text:
+ return ""
+ return alt_text.rsplit("-image-", 1)[1]
+ except Exception:
+ return ""
+
+ @staticmethod
+ def _normalize_user_valves(user_valves: Any) -> "Pipe.UserValves":
+ if isinstance(user_valves, Pipe.UserValves):
+ return user_valves
+ if isinstance(user_valves, BaseModel):
+ if hasattr(user_valves, "model_dump"):
+ return Pipe.UserValves(**user_valves.model_dump())
+ return Pipe.UserValves(**user_valves.dict())
+ return Pipe.UserValves(**(user_valves or {}))
+
+ @staticmethod
+ def _validate_model_options(model: str, user_valves: "Pipe.UserValves") -> None:
+ flash_only_aspect_ratios = {"1:4", "1:8", "4:1", "8:1"}
+ if model == "gemini-3-pro-image-preview":
+ if user_valves.image_size == "512":
+ raise ValueError("Gemini 3 Pro Image Preview does not support 512 resolution.")
+ if user_valves.aspect_ratio in flash_only_aspect_ratios:
+ raise ValueError("Gemini 3 Pro Image Preview does not support 1:4, 1:8, 4:1, or 8:1 aspect ratios.")
+
+ @staticmethod
+ def _get_user(user_data: UserModel | BaseModel | dict) -> UserModel:
+ if isinstance(user_data, UserModel):
+ return user_data
+ if isinstance(user_data, BaseModel):
+ if hasattr(user_data, "model_dump"):
+ user_data = user_data.model_dump()
+ else:
+ user_data = user_data.dict()
+ if isinstance(user_data, dict):
+ return UserModel(**{k: v for k, v in user_data.items() if k != "valves"})
+ raise ValueError("user not found")
+
def _format_data(
self,
is_stream: bool,
diff --git a/pipes/grok_image.py b/pipes/grok_image.py
index 24a422e..3bc8e46 100644
--- a/pipes/grok_image.py
+++ b/pipes/grok_image.py
@@ -1,28 +1,28 @@
"""
title: Grok Image
description: Image generation with Grok
-author: OVINC CN
+author: OVINC CN, yuzukumo
git_url: https://github.com/OVINC-CN/OpenWebUIPlugin.git
-version: 0.1.1
+version: 0.1.2
licence: MIT
"""
import base64
-import io
import json
import logging
+import mimetypes
import time
import uuid
from typing import AsyncIterable, Literal, Optional, Tuple
import httpx
-from fastapi import BackgroundTasks, Request, UploadFile
+from fastapi import Request
from httpx import Response
from open_webui.env import GLOBAL_LOG_LEVEL
-from open_webui.models.users import UserModel, Users
-from open_webui.routers.files import get_file_content_by_id, upload_file
+from open_webui.models.users import UserModel
+from open_webui.routers.files import get_file_content_by_id
+from open_webui.routers.images import get_image_data, upload_image
from pydantic import BaseModel, Field
-from starlette.datastructures import Headers
from starlette.responses import StreamingResponse
logger = logging.getLogger(__name__)
@@ -56,13 +56,14 @@ class Valves(BaseModel):
num_of_images: int = Field(default=1, title="图片数量", ge=1, le=10)
timeout: int = Field(default=600, title="请求超时时间 (秒)")
proxy: Optional[str] = Field(default="", title="代理地址")
- models: str = Field(default="grok-imagine-image-pro", title="模型", description="使用英文逗号分隔多个模型")
+ models: str = Field(
+ default="grok-imagine-image,grok-imagine-image-pro",
+ title="模型",
+ description="使用英文逗号分隔多个模型",
+ )
class UserValves(BaseModel):
- enable_nsfw: bool = Field(default=False, title="是否启用NSFW内容")
- is_kids_mode: bool = Field(default=False, title="是否启用儿童模式")
resolution: Literal["1k", "2k"] = Field(default="1k", title="图片分辨率")
- quality: Literal["low", "medium", "high"] = Field(default="medium", title="图片质量")
aspect_ratio: Literal[
"1:1", "3:4", "4:3", "9:16", "16:9", "2:3", "3:2", "9:19.5", "19.5:9", "9:20", "20:9", "1:2", "2:1", "auto"
] = Field(default="auto", title="图片比例")
@@ -79,13 +80,23 @@ async def pipe(
body: dict,
__user__: dict,
__request__: Request,
+ __metadata__: Optional[dict] = None,
) -> StreamingResponse:
- return StreamingResponse(self._pipe(body=body, __user__=__user__, __request__=__request__))
+ return StreamingResponse(
+ self._pipe(body=body, __user__=__user__, __request__=__request__, __metadata__=__metadata__)
+ )
- async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable:
- user = Users.get_user_by_id(__user__["id"])
- model, payload = await self._build_payload(user=user, body=body, user_valves=__user__["valves"])
- # call client
+ async def _pipe(
+ self,
+ body: dict,
+ __user__: dict,
+ __request__: Request,
+ __metadata__: Optional[dict] = None,
+ ) -> AsyncIterable[str]:
+ user = self._get_user(__user__)
+ model, payload, multipart_fallback = await self._build_payload(
+ user=user, body=body, user_valves=__user__.get("valves", {})
+ )
async with httpx.AsyncClient(
base_url=self.valves.base_url,
headers={"Authorization": f"Bearer {self.valves.api_key}"},
@@ -94,28 +105,35 @@ async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> Async
timeout=self.valves.timeout,
) as client:
response = await client.post(**payload)
+ if response.status_code != 200 and multipart_fallback and self._should_retry_with_multipart(response.text):
+ response = await client.post(**multipart_fallback)
if response.status_code != 200:
raise APIException(status=response.status_code, content=response.text, response=response)
response = response.json()
# upload image
results = []
- for item in response["data"]:
- results.append(
- self._upload_image(
- __request__=__request__,
- user=user,
- image_data=item["b64_json"],
- mime_type=item["mime_type"],
+ for item in response.get("data", []):
+ b64_json = item.get("b64_json")
+ image_url = item.get("url")
+ if b64_json:
+ results.append(
+ await self._upload_image(
+ __request__=__request__,
+ user=user,
+ image_data=b64_json,
+ mime_type=item.get("mime_type", "image/jpeg"),
+ metadata=__metadata__,
+ )
)
- )
+ elif image_url:
+                results.append(f"![grok-image]({image_url})")
+
+ if not results and response.get("error"):
+ raise ValueError(response["error"].get("message", "Unknown API error"))
+ if not results:
+ raise ValueError("No image returned by xAI image API")
# format response data
- usage_metadata = response.get("usage", None) or {}
- usage = {
- "prompt_tokens": len(payload["json"].get("images") or []),
- "completion_tokens": len(results),
- "metadata": usage_metadata or {},
- }
- usage["total_tokens"] = usage["prompt_tokens"] + usage["completion_tokens"]
+ usage = response.get("usage", None)
# response
content = "\n\n".join(results)
if body.get("stream"):
@@ -124,35 +142,44 @@ async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> Async
else:
yield self._format_data(is_stream=False, model=model, content=content, usage=usage)
- def _upload_image(self, __request__: Request, user: UserModel, image_data: str, mime_type: str) -> str:
- file_item = upload_file(
+ async def _upload_image(
+ self,
+ __request__: Request,
+ user: UserModel,
+ image_data: str,
+ mime_type: str,
+ metadata: Optional[dict],
+ ) -> str:
+ file_item, image_url = await upload_image(
request=__request__,
- background_tasks=BackgroundTasks(),
- file=UploadFile(
- file=io.BytesIO(base64.b64decode(image_data)),
- filename=f"generated-image-{uuid.uuid4().hex}.png",
- headers=Headers({"content-type": mime_type}),
- ),
- process=False,
+ image_data=base64.b64decode(image_data),
+ content_type=mime_type,
+ metadata={"mime_type": mime_type, **(metadata or {})},
user=user,
- metadata={"mime_type": mime_type},
)
- image_url = __request__.app.url_path_for("get_file_content_by_id", id=file_item.id)
    return f"![grok-image-{file_item.id}]({image_url})"
- async def _get_image_content(self, user: UserModel, markdown_string: str):
+ async def _get_image_content(self, user: UserModel, markdown_string: str) -> str:
file_id = markdown_string.split("![grok-image-")[1].split("]")[0]
file_response = await get_file_content_by_id(id=file_id, user=user)
- return open(file_response.path, "rb")
+ mime_type = mimetypes.guess_type(file_response.path)[0] or "image/png"
+ with open(file_response.path, "rb") as file_content:
+ encoded = base64.b64encode(file_content.read()).decode()
+ return f"data:{mime_type};base64,{encoded}"
- async def _build_payload(self, user: UserModel, body: dict, user_valves: UserValves) -> Tuple[str, dict]:
- # payload
- model = body["model"].split(".", 1)[1]
+ async def _build_payload(
+ self,
+ user: UserModel,
+ body: dict,
+ user_valves: UserValves,
+ ) -> Tuple[str, dict, Optional[dict]]:
+ model = body["model"].split(".", 1)[1] if "." in body["model"] else body["model"]
+ user_valves = self._normalize_user_valves(user_valves)
images = []
- prompt = ""
+ prompt_parts = []
# read messages
- messages = body["messages"]
+ messages = body.get("messages", [])
if len(messages) >= 2:
messages = messages[-2:]
for message in messages:
@@ -164,47 +191,147 @@ async def _build_payload(self, user: UserModel, body: dict, user_valves: UserVal
# str content
if isinstance(message_content, str):
for item in message_content.split("\n"):
+ item = item.strip()
if not item:
continue
if item.startswith("![grok-image-"):
- file = await self._get_image_content(user, item)
- images.append({"url": f"data:image/jpeg;base64,{base64.b64encode(file.read()).decode()}"})
+ image_url = await self._get_image_content(user, item)
+ images.append({"type": "image_url", "url": image_url})
continue
- prompt = item
+ prompt_parts.append(item)
# list content
elif isinstance(message_content, list):
for content in message_content:
if content["type"] == "text":
- prompt = content["text"]
+ prompt_parts.append(content["text"])
continue
- if content["type"] == "image_url":
+ if content["type"] in {"image_url", "input_image"}:
image_url = content["image_url"]["url"]
- images.append({"url": image_url})
+ images.append({"type": "image_url", "url": image_url})
else:
raise TypeError("message content invalid")
+ prompt = "\n".join(prompt_parts).strip()
+ if not prompt:
+ prompt = body.get("prompt", "")
+
+ if len(images) > 5:
+ raise ValueError("xAI image editing supports up to 5 input images.")
+
# init payload
payload = {
"url": "/images/generations",
"json": {
"model": model,
- "prompt": (
- f"{str(user_valves.enable_nsfw).lower()}"
- f"{str(user_valves.is_kids_mode).lower()}\n"
- f"{prompt}"
- ),
- "quality": user_valves.quality,
- "aspect_ratio": user_valves.aspect_ratio,
+ "prompt": prompt,
"resolution": user_valves.resolution,
"response_format": "b64_json",
- "n": self.valves.num_of_images,
},
}
- if images:
+
+ if not images:
+ payload["json"]["n"] = self.valves.num_of_images
+ payload["json"]["aspect_ratio"] = user_valves.aspect_ratio
+ return model, payload, None
+
+ if len(images) == 1:
+ payload["json"]["image"] = images[0]
+ else:
+ payload["json"]["aspect_ratio"] = user_valves.aspect_ratio
payload["json"]["images"] = images
- payload["url"] = "/images/edits"
+ payload["url"] = "/images/edits"
+
+ multipart_fallback = await self._build_multipart_edit_payload(
+ images=images,
+ model=model,
+ prompt=prompt,
+ user_valves=user_valves,
+ )
+ return model, payload, multipart_fallback
- return model, payload
+ async def _build_multipart_edit_payload(
+ self,
+ images: list[dict],
+ model: str,
+ prompt: str,
+ user_valves: "Pipe.UserValves",
+ ) -> dict:
+ data = {
+ "model": model,
+ "prompt": prompt,
+ "resolution": user_valves.resolution,
+ "response_format": "b64_json",
+ }
+ if user_valves.aspect_ratio != "auto":
+ data["aspect_ratio"] = user_valves.aspect_ratio
+
+ files = [await self._image_ref_to_multipart_file(image=image, field_name="image[]") for image in images]
+ return {
+ "url": "/images/edits",
+ "data": self._stringify_form_data(data),
+ "files": files,
+ }
+
+ async def _image_ref_to_multipart_file(
+ self,
+ image: dict,
+ field_name: str,
+ ) -> tuple[str, tuple[str, bytes, str]]:
+ image_url = image.get("url", "")
+ if not image_url:
+ raise TypeError("message content invalid")
+
+ image_bytes, mime_type = await get_image_data(image_url)
+ if image_bytes is None or not mime_type:
+ raise ValueError("invalid image input")
+
+ file_ext = mimetypes.guess_extension(mime_type) or ".png"
+ if file_ext == ".jpe":
+ file_ext = ".jpg"
+
+ return (
+ field_name,
+ (f"{uuid.uuid4().hex}{file_ext}", image_bytes, mime_type),
+ )
+
+ @staticmethod
+ def _stringify_form_data(data: dict) -> dict:
+ form_data = {}
+ for key, value in data.items():
+ if value is None:
+ continue
+ if isinstance(value, bool):
+ form_data[key] = "true" if value else "false"
+ else:
+ form_data[key] = str(value)
+ return form_data
+
+ @staticmethod
+ def _should_retry_with_multipart(response_text: str) -> bool:
+ return "failed to parse multipart form" in response_text.lower()
+
+ @staticmethod
+ def _normalize_user_valves(user_valves: UserValves | dict) -> UserValves:
+ if isinstance(user_valves, Pipe.UserValves):
+ return user_valves
+ if isinstance(user_valves, BaseModel):
+ if hasattr(user_valves, "model_dump"):
+ return Pipe.UserValves(**user_valves.model_dump())
+ return Pipe.UserValves(**user_valves.dict())
+ return Pipe.UserValves(**(user_valves or {}))
+
+ @staticmethod
+ def _get_user(user_data: UserModel | BaseModel | dict) -> UserModel:
+ if isinstance(user_data, UserModel):
+ return user_data
+ if isinstance(user_data, BaseModel):
+ if hasattr(user_data, "model_dump"):
+ user_data = user_data.model_dump()
+ else:
+ user_data = user_data.dict()
+ if isinstance(user_data, dict):
+ return UserModel(**{k: v for k, v in user_data.items() if k != "valves"})
+ raise ValueError("user not found")
def _format_data(
self,
diff --git a/pipes/openai_image.py b/pipes/openai_image.py
index a279cca..6f27a9c 100644
--- a/pipes/openai_image.py
+++ b/pipes/openai_image.py
@@ -1,27 +1,28 @@
"""
title: OpenAI Image
-author: OVINC CN
+author: OVINC CN, yuzukumo
git_url: https://github.com/OVINC-CN/OpenWebUIPlugin.git
-version: 0.0.9
+version: 0.1.1
licence: MIT
"""
import base64
-import io
import json
import logging
+import mimetypes
+import re
import time
import uuid
-from typing import AsyncIterable, List, Literal, Optional
+from typing import Any, AsyncIterable, List, Literal, Optional, Tuple
import httpx
-from fastapi import BackgroundTasks, Request, UploadFile
+from fastapi import Request
from httpx import Response
from open_webui.env import GLOBAL_LOG_LEVEL
-from open_webui.models.users import UserModel, Users
-from open_webui.routers.files import get_file_content_by_id, upload_file
+from open_webui.models.users import UserModel
+from open_webui.routers.files import get_file_content_by_id
+from open_webui.routers.images import get_image_data, upload_image
from pydantic import BaseModel, Field
-from starlette.datastructures import Headers
from starlette.responses import StreamingResponse
logger = logging.getLogger(__name__)
@@ -35,12 +36,10 @@ def __init__(self, status: int, content: str, response: Response):
self._response = response
def __str__(self) -> str:
- # error msg
try:
return json.loads(self._content)["error"]["message"]
except Exception:
pass
- # build in error
try:
self._response.raise_for_status()
except Exception as err:
@@ -55,30 +54,71 @@ class Valves(BaseModel):
num_of_images: int = Field(default=1, title="图片数量", ge=1, le=10)
timeout: int = Field(default=600, title="请求超时(秒)")
proxy: str = Field(default="", title="代理地址")
- models: str = Field(default="gpt-image-1", title="支持模型列表", description="多个模型用逗号分隔")
+ models: str = Field(
+ default="gpt-image-2",
+ title="支持模型列表",
+ description="多个模型用逗号分隔",
+ )
class UserValves(BaseModel):
quality: Literal["low", "medium", "high", "auto"] = Field(default="auto", title="图片质量")
- size: Literal["1024x1024", "1536x1024", "1024x1536", "auto"] = Field(default="auto", title="图片比例")
+ size_preset: Literal[
+ "auto",
+ "1024x1024",
+ "1536x1024",
+ "1024x1536",
+ "2048x2048",
+ "2048x1152",
+ "3840x2160",
+ "2160x3840",
+ "customize",
+ ] = Field(
+ default="auto",
+ title="图片分辨率",
+ )
+ custom_size: str = Field(
+ default="1024x1024",
+ title="自定义分辨率",
+ description="仅在分辨率选择 customize 时生效,例如 1024x1024",
+ )
+ moderation: Literal["auto", "low"] = Field(default="auto", title="审核级别")
+ output_format: Literal["png", "jpeg", "webp"] = Field(default="png", title="输出格式")
+ output_compression: int = Field(default=100, title="压缩质量", ge=0, le=100)
+ enable_mask_mode: bool = Field(default=False, title="启用 Mask 模式")
def __init__(self):
self.valves = self.Valves()
def pipes(self) -> List[dict]:
- return [{"id": m.strip(), "name": m.strip()} for m in self.valves.models.split(",") if m.strip()]
+ return [
+ {"id": model.strip(), "name": model.strip()} for model in self.valves.models.split(",") if model.strip()
+ ]
async def pipe(
self,
body: dict,
__user__: dict,
__request__: Request,
+ __metadata__: Optional[dict] = None,
) -> StreamingResponse:
- return StreamingResponse(self._pipe(body=body, __user__=__user__, __request__=__request__))
+ return StreamingResponse(
+ self._pipe(body=body, __user__=__user__, __request__=__request__, __metadata__=__metadata__)
+ )
+
+ async def _pipe(
+ self,
+ body: dict,
+ __user__: dict,
+ __request__: Request,
+ __metadata__: Optional[dict] = None,
+ ) -> AsyncIterable[str]:
+ user = self._get_user(__user__)
+
+ model, payload, output_format = await self._build_payload(
+ user=user, body=body, user_valves=__user__.get("valves", {})
+ )
+ is_stream = bool(body.get("stream"))
- async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> AsyncIterable:
- user = Users.get_user_by_id(__user__["id"])
- model, payload = await self._build_payload(user=user, body=body, user_valves=__user__["valves"])
- # call client
async with httpx.AsyncClient(
base_url=self.valves.base_url,
headers={"Authorization": f"Bearer {self.valves.api_key}"},
@@ -89,118 +129,408 @@ async def _pipe(self, body: dict, __user__: dict, __request__: Request) -> Async
response = await client.post(**payload)
if response.status_code != 200:
raise APIException(status=response.status_code, content=response.text, response=response)
- response = response.json()
-
- # upload image
- results = []
- for item in response["data"]:
- results.append(
- self._upload_image(
- __request__=__request__,
- user=user,
- image_data=item["b64_json"],
- mime_type="image/png",
- )
- )
-
- # format response data
- usage = response.get("usage", None)
-
- # response
- content = "\n\n".join(results)
- if body.get("stream"):
+ response_json = response.json()
+
+ content = await self._parse_response_images(
+ response_json=response_json,
+ output_format=output_format,
+ user=user,
+ __request__=__request__,
+ metadata=__metadata__,
+ )
+ usage = self._extract_usage(response_json)
+ if is_stream:
yield self._format_data(is_stream=True, model=model, content=content, usage=None)
- yield self._format_data(is_stream=True, model=model, content=None, usage=usage)
- else:
- yield self._format_data(is_stream=False, model=model, content=content, usage=usage)
+ if usage:
+ yield self._format_data(is_stream=True, model=model, content=None, usage=usage)
+ return
- def _upload_image(self, __request__: Request, user: UserModel, image_data: str, mime_type: str) -> str:
- file_item = upload_file(
- request=__request__,
- background_tasks=BackgroundTasks(),
- file=UploadFile(
- file=io.BytesIO(base64.b64decode(image_data)),
- filename=f"generated-image-{uuid.uuid4().hex}.png",
- headers=Headers({"content-type": mime_type}),
- ),
- process=False,
- user=user,
- metadata={"mime_type": mime_type},
- )
- image_url = __request__.app.url_path_for("get_file_content_by_id", id=file_item.id)
-        return f"![openai-image-{file_item.id}]({image_url})"
+ yield self._format_data(is_stream=False, model=model, content=content, usage=usage)
- async def _get_image_content(self, user: UserModel, markdown_string: str):
- file_id = markdown_string.split("![openai-image-")[1].split("]")[0]
- file_response = await get_file_content_by_id(id=file_id, user=user)
- return open(file_response.path, "rb")
+ async def _build_payload(self, user: UserModel, body: dict, user_valves: Any) -> Tuple[str, dict, str]:
+ user_valves = self._normalize_user_valves(user_valves)
+ model = body["model"].split(".", 1)[1] if "." in body["model"] else body["model"]
+
+ prompt, images = await self._parse_messages(user=user, body=body)
+ mask = await self._parse_mask(user=user, body=body)
+ size = self._resolve_size(user_valves=user_valves)
- async def _build_payload(self, user: UserModel, body: dict, user_valves: UserValves) -> (str, dict):
- # payload
- model = body["model"].split(".", 1)[1]
data = {
- "image": [],
- "prompt": "",
- "n": self.valves.num_of_images,
"model": model,
- "quality": user_valves.quality,
- "size": user_valves.size,
+ "prompt": prompt,
}
- # read messages
- messages = body["messages"]
- if len(messages) >= 2:
- messages = messages[-2:]
+ if user_valves.quality != "auto":
+ data["quality"] = user_valves.quality
+ if size:
+ data["size"] = size
+ if user_valves.moderation != "auto":
+ data["moderation"] = user_valves.moderation
+ if user_valves.output_format != "png":
+ data["output_format"] = user_valves.output_format
+ if user_valves.output_format in {"jpeg", "webp"} and user_valves.output_compression != 100:
+ data["output_compression"] = user_valves.output_compression
+
+ if not images:
+ data["n"] = self.valves.num_of_images
+ payload = {"url": "/images/generations", "json": data}
+ return model, payload, user_valves.output_format
+
+ if len(images) > 16:
+ raise ValueError("OpenAI image edits support up to 16 input images.")
+
+ if user_valves.enable_mask_mode and len(images) >= 2 and not mask:
+ mask = images[1]
+ images = [images[0], *images[2:]]
+
+ edit_data = dict(data)
+ if self.valves.num_of_images > 1:
+ edit_data["n"] = self.valves.num_of_images
+
+ files = [
+ await self._image_ref_to_multipart_file(user=user, image_ref=image, field_name="image[]")
+ for image in images
+ ]
+ if mask:
+ files.append(await self._image_ref_to_multipart_file(user=user, image_ref=mask, field_name="mask"))
+
+ payload = {
+ "url": "/images/edits",
+ "data": self._stringify_form_data(edit_data),
+ "files": files,
+ }
+ return model, payload, user_valves.output_format
+
+ async def _parse_messages(self, user: UserModel, body: dict) -> Tuple[str, List[dict]]:
+ prompt_parts: List[str] = []
+ images: List[dict] = []
+
+ messages = body.get("messages", [])
+ if len(messages) >= 4:
+ messages = messages[-4:]
+
for message in messages:
- # ignore system message
- if message["role"] == "system":
+ if message.get("role") == "system":
continue
- # parse content
- message_content = message["content"]
- # str content
+
+ allow_text = message.get("role") != "assistant"
+ message_content = message.get("content")
+
if isinstance(message_content, str):
for item in message_content.split("\n"):
+ item = item.strip()
if not item:
continue
if item.startswith("![openai-image-"):
- data["image"].append(await self._get_image_content(user, item))
+ images.append(await self._markdown_to_image_ref(user, item))
continue
- data["prompt"] += f"\n{message_content}"
- # list content
- elif isinstance(message_content, list):
+ if allow_text:
+ prompt_parts.append(item)
+ continue
+
+ if isinstance(message_content, list):
for content in message_content:
if content["type"] == "text":
- data["prompt"] += f"\n{content['text']}"
+ text = content.get("text", "").strip()
+ if text and allow_text:
+ prompt_parts.append(text)
continue
- if content["type"] == "image_url":
- image_url = content["image_url"]["url"]
- header, encoded = image_url.split(",", 1)
- mime_type = header.split(";")[0].split(":")[1]
- file_name = f"{uuid.uuid4().hex}.{mime_type.split('/')[-1]}"
- image_bytes = base64.b64decode(encoded.encode())
- data["image"].append(
- (
- file_name,
- image_bytes,
- mime_type,
- {"content-type": mime_type},
- )
- )
+ if content["type"] in {"image_url", "input_image"}:
+ images.append(self._image_ref_from_content(content))
+ continue
+ raise TypeError("message content invalid")
+ continue
+
+ raise TypeError("message content invalid")
+
+ prompt = "\n".join(prompt_parts).strip()
+ if not prompt:
+ prompt = body.get("prompt", "Please generate an image.")
+ return prompt, images
+
+ async def _parse_mask(self, user: UserModel, body: dict) -> Optional[dict]:
+ mask = body.get("mask")
+ if not mask:
+ return None
+ return await self._normalize_image_ref(user, mask)
+
+ async def _parse_response_images(
+ self,
+ response_json: dict,
+ output_format: str,
+ user: UserModel,
+ __request__: Request,
+ metadata: Optional[dict],
+ ) -> str:
+ results = []
+ for item in self._collect_image_items(response_json):
+ rendered = await self._render_documented_image_item(
+ item=item,
+ output_format=output_format,
+ user=user,
+ __request__=__request__,
+ metadata=metadata,
+ )
+ if rendered:
+ results.append(rendered)
+
+ if not results and response_json.get("error"):
+ raise ValueError(response_json["error"].get("message", "Unknown API error"))
+ if not results:
+ preview = json.dumps(response_json, ensure_ascii=False)
+ if len(preview) > 600:
+ preview = f"{preview[:600]}..."
+ raise ValueError(f"No image returned by OpenAI image API. Response preview: {preview}")
+
+ return "\n\n".join(results)
+
+ async def _render_documented_image_item(
+ self,
+ item: dict,
+ output_format: str,
+ user: UserModel,
+ __request__: Request,
+ metadata: Optional[dict],
+ ) -> str:
+ if not isinstance(item, dict):
+ return ""
+ image_data = item.get("b64_json")
+ if image_data:
+ mime_type = item.get("mime_type") or self._mime_type_for_format(output_format)
+ return await self._upload_image(
+ __request__=__request__,
+ user=user,
+ image_source=image_data,
+ mime_type=mime_type,
+ metadata=metadata,
+ )
+ image_url = item.get("url")
+ if image_url:
+ return await self._upload_image(
+ __request__=__request__,
+ user=user,
+ image_source=image_url,
+ mime_type=self._mime_type_for_format(output_format),
+ metadata=metadata,
+ )
+ return ""
+
+ async def _upload_image(
+ self,
+ __request__: Request,
+ user: UserModel,
+ image_source: str,
+ mime_type: str,
+ metadata: Optional[dict],
+ ) -> str:
+ headers = None
+ if image_source.startswith("http://") or image_source.startswith("https://"):
+ headers = {"Authorization": f"Bearer {self.valves.api_key}"}
+
+ image_bytes, detected_mime_type = await get_image_data(image_source, headers=headers)
+ if image_bytes is None:
+ raise ValueError("invalid image data returned by OpenAI image API")
+
+ file_item, image_url = await upload_image(
+ request=__request__,
+ image_data=image_bytes,
+ content_type=detected_mime_type or mime_type,
+ metadata={"mime_type": detected_mime_type or mime_type, **(metadata or {})},
+ user=user,
+ )
+        return f"![openai-image-{file_item.id}]({image_url})"
+
+ async def _markdown_to_image_ref(self, user: UserModel, markdown_string: str) -> dict:
+ file_id = markdown_string.split("![openai-image-")[1].split("]")[0]
+ return await self._normalize_image_ref(user, file_id)
+
+ async def _normalize_image_ref(self, user: UserModel, value: Any) -> dict:
+ if isinstance(value, dict):
+ if "image_url" in value or "file_id" in value:
+ return value
+ raise TypeError("invalid image reference")
+
+ if not isinstance(value, str):
+ raise TypeError("invalid image reference")
+
+ if value.startswith("![openai-image-"):
+ file_id = value.split("![openai-image-")[1].split("]")[0]
+ return await self._normalize_image_ref(user, file_id)
+
+ if value.startswith("http://") or value.startswith("https://") or value.startswith("data:"):
+ return {"image_url": value}
+
+ file_response = await get_file_content_by_id(id=value, user=user)
+ mime_type = mimetypes.guess_type(file_response.path)[0] or "image/png"
+ with open(file_response.path, "rb") as file_content:
+ encoded = base64.b64encode(file_content.read()).decode()
+ return {"image_url": f"data:{mime_type};base64,{encoded}"}
+
+ @staticmethod
+ def _image_ref_from_content(content: dict) -> dict:
+ image_url = content["image_url"]["url"]
+ return {"image_url": image_url}
+
+ async def _image_ref_to_multipart_file(
+ self,
+ user: UserModel,
+ image_ref: dict,
+ field_name: str,
+ ) -> tuple[str, tuple[str, bytes, str]]:
+ image_bytes, mime_type = await self._image_ref_to_bytes(user=user, image_ref=image_ref)
+ file_ext = mimetypes.guess_extension(mime_type) or ".png"
+ if file_ext == ".jpe":
+ file_ext = ".jpg"
+ return (
+ field_name,
+ (f"{uuid.uuid4().hex}{file_ext}", image_bytes, mime_type),
+ )
+
+ async def _image_ref_to_bytes(self, user: UserModel, image_ref: dict) -> tuple[bytes, str]:
+ if not isinstance(image_ref, dict):
+ raise TypeError("invalid image reference")
+
+ if "file_id" in image_ref:
+ file_response = await get_file_content_by_id(id=image_ref["file_id"], user=user)
+ with open(file_response.path, "rb") as file_content:
+ image_bytes = file_content.read()
+ mime_type = file_response.media_type or mimetypes.guess_type(file_response.path)[0] or "image/png"
+ return image_bytes, mime_type
+
+ image_url = image_ref.get("image_url")
+ if isinstance(image_url, dict):
+ image_url = image_url.get("url", "")
+ if not isinstance(image_url, str) or not image_url:
+ raise TypeError("invalid image reference")
+
+ image_bytes, mime_type = await get_image_data(image_url)
+ if image_bytes is None or not mime_type:
+ raise ValueError("invalid image input")
+ return image_bytes, mime_type
+
+ @staticmethod
+ def _stringify_form_data(data: dict) -> dict:
+ form_data = {}
+ for key, value in data.items():
+ if value is None:
+ continue
+ if isinstance(value, bool):
+ form_data[key] = "true" if value else "false"
else:
- raise TypeError("message content invalid")
+ form_data[key] = str(value)
+ return form_data
+
+ @staticmethod
+ def _collect_image_items(payload: Any) -> list[dict]:
+ if not isinstance(payload, dict):
+ return []
+
+ items = []
+ for container in (payload, payload.get("response")):
+ if not isinstance(container, dict):
+ continue
+
+ data = container.get("data")
+ if isinstance(data, list):
+ items.extend(item for item in data if isinstance(item, dict))
+ elif isinstance(data, dict):
+ items.append(data)
+
+ if any(key in container for key in ("b64_json", "url")):
+ items.append(container)
+
+ return items
+
+ @staticmethod
+ def _extract_usage(payload: Any) -> Optional[dict]:
+ if not isinstance(payload, dict):
+ return None
+
+ usage = payload.get("usage")
+ if isinstance(usage, dict):
+ return usage
+
+ response = payload.get("response")
+ if isinstance(response, dict):
+ usage = response.get("usage")
+ if isinstance(usage, dict):
+ return usage
- # init payload
- if data["image"]:
- files = data.pop("image")
- payload = {"url": "/images/edits", "files": [("image[]", file) for file in files], "data": data}
+ return None
+
+ @staticmethod
+ def _mime_type_for_format(output_format: str) -> str:
+ return {
+ "png": "image/png",
+ "jpeg": "image/jpeg",
+ "webp": "image/webp",
+ }.get(output_format, "image/png")
+
+ @staticmethod
+ def _resolve_size(user_valves: "Pipe.UserValves") -> Optional[str]:
+ if user_valves.size_preset == "auto":
+ return None
+
+ if user_valves.size_preset == "customize":
+ size = user_valves.custom_size.strip()
else:
- data.pop("image", None)
- payload = {"url": "/images/generations", "json": data}
+ size = user_valves.size_preset
+
+ width, height = Pipe._parse_size_string(size)
+ normalized_size = f"{width}x{height}"
+ Pipe._validate_gpt_image_2_size(normalized_size)
+ return normalized_size
+
+ @staticmethod
+ def _validate_gpt_image_2_size(size: str) -> None:
+ width, height = Pipe._parse_size_string(size)
+ pixels = width * height
+ short_edge = min(width, height)
+ long_edge = max(width, height)
- return model, payload
+ if width > 3840 or height > 3840:
+ raise ValueError("gpt-image-2 widths and heights must be 3840 pixels or smaller.")
+ if width % 16 != 0 or height % 16 != 0:
+ raise ValueError("gpt-image-2 widths and heights must be multiples of 16.")
+ if pixels < 655360 or pixels > 8294400:
+ raise ValueError("gpt-image-2 image sizes must stay between 655,360 and 8,294,400 total pixels.")
+ if long_edge > short_edge * 3:
+ raise ValueError("gpt-image-2 aspect ratio cannot exceed 3:1.")
+ @staticmethod
+ def _parse_size_string(size: str) -> tuple[int, int]:
+ normalized = size.strip().lower()
+ match = re.fullmatch(r"(\d+)\s*x\s*(\d+)", normalized)
+ if not match:
+ raise ValueError("Custom size must use WIDTHxHEIGHT format, for example 2048x1152.")
+ return int(match.group(1)), int(match.group(2))
+
+ @staticmethod
+ def _normalize_user_valves(user_valves: Any) -> "Pipe.UserValves":
+ if isinstance(user_valves, Pipe.UserValves):
+ return user_valves
+ if isinstance(user_valves, BaseModel):
+ if hasattr(user_valves, "model_dump"):
+ user_valves = user_valves.model_dump()
+ else:
+ user_valves = user_valves.dict()
+ return Pipe.UserValves(**(user_valves or {}))
+
+ @staticmethod
+ def _get_user(user_data: Any) -> UserModel:
+ if isinstance(user_data, UserModel):
+ return user_data
+ if isinstance(user_data, BaseModel):
+ if hasattr(user_data, "model_dump"):
+ user_data = user_data.model_dump()
+ else:
+ user_data = user_data.dict()
+ if isinstance(user_data, dict):
+ return UserModel(**{k: v for k, v in user_data.items() if k != "valves"})
+ raise ValueError("user not found")
+
+ @staticmethod
def _format_data(
- self,
is_stream: bool,
model: Optional[str] = "",
content: Optional[str] = "",