From ce08de9e1f94b3a6599c93fc2376b860c053103b Mon Sep 17 00:00:00 2001 From: technowhizz <7688823+technowhizz@users.noreply.github.com> Date: Sun, 10 May 2026 20:03:04 +0000 Subject: [PATCH 1/5] Add OpenAI Responses API compatibility route Adds a minimal /v1/responses endpoint that translates supported Responses API input shapes into the existing chat completions request path, returning JSON when stream=false and Responses-style SSE events when stream=true. The response is adapted back into the core Responses fields expected by OpenAI-compatible clients, including output message content, output_text, timestamps, status, and usage where available. --- claude_code_api/api/chat.py | 548 ++++++++++++++++++++++++++++++- claude_code_api/main.py | 1 + claude_code_api/models/openai.py | 87 +++++ tests/test_responses_api.py | 155 +++++++++ 4 files changed, 790 insertions(+), 1 deletion(-) create mode 100644 tests/test_responses_api.py diff --git a/claude_code_api/api/chat.py b/claude_code_api/api/chat.py index 5f41d99..21c7b18 100644 --- a/claude_code_api/api/chat.py +++ b/claude_code_api/api/chat.py @@ -2,7 +2,8 @@ import hashlib import json -from typing import Any, Dict, Optional, Tuple +import uuid +from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple import structlog from fastapi import APIRouter, HTTPException, Request, status @@ -20,6 +21,8 @@ ChatCompletionRequest, ChatCompletionResponse, ErrorResponse, + ResponsesCreateRequest, + ResponsesResponse, ) from claude_code_api.utils.parser import ( ClaudeOutputParser, @@ -31,6 +34,7 @@ create_non_streaming_response, create_sse_response, ) +from claude_code_api.utils.time import utc_timestamp logger = structlog.get_logger() router = APIRouter() @@ -53,6 +57,23 @@ 500: {"model": ErrorResponse}, } +RESPONSES_API_RESPONSES = { + 200: { + "model": ResponsesResponse, + "description": "Responses API response (JSON when stream=false, SSE when stream=true).", + "content": { + "text/event-stream": {"schema": {"type": "string"}}, + }, + }, + 400: {"model": ErrorResponse}, + 422: 
{"model": ErrorResponse}, + 503: {"model": ErrorResponse}, + 500: {"model": ErrorResponse}, +} + +RESPONSE_TEXT_BLOCK_TYPES = {"input_text", "output_text", "text"} +RESPONSE_INPUT_ROLES = {"system", "user", "assistant", "tool"} + def _http_error( status_code: int, message: str, error_type: str, code: str @@ -63,6 +84,15 @@ def _http_error( ) +def _input_error(message: str, code: str = "invalid_input") -> HTTPException: + return _http_error( + status.HTTP_400_BAD_REQUEST, + message, + "invalid_request_error", + code, + ) + + async def _log_raw_request(req: Request) -> None: raw_body = await req.body() content_type = req.headers.get("content-type", "unknown") @@ -116,6 +146,470 @@ def _extract_prompts(request: ChatCompletionRequest) -> Tuple[str, str]: return user_prompt, system_prompt +def _coerce_response_content_block(block: Any, location: str) -> str: + if isinstance(block, str): + return block + + if not isinstance(block, dict): + raise _input_error( + f"Unsupported content block at {location}: expected an object or string.", + "unsupported_input_block", + ) + + block_type = block.get("type") + if block_type in RESPONSE_TEXT_BLOCK_TYPES: + if "text" not in block: + raise _input_error( + f"Text content block at {location} is missing the 'text' field.", + "invalid_input_block", + ) + return str(block["text"]) + + if block_type is None: + if "text" in block: + return str(block["text"]) + if "content" in block: + return str(block["content"]) + + raise _input_error( + f"Unsupported content block type at {location}: {block_type!r}.", + "unsupported_input_block", + ) + + +def _coerce_response_content(content: Any, location: str) -> str: + if content is None: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + text_parts = [ + _coerce_response_content_block(block, f"{location}.content[{index}]") + for index, block in enumerate(content) + ] + return "\n".join(part for part in text_parts if part) + if isinstance(content, dict): + 
return _coerce_response_content_block(content, f"{location}.content") + + raise _input_error( + f"Unsupported content at {location}: expected a string or content block array.", + "unsupported_input_content", + ) + + +def _coerce_response_role(role: Any, location: str) -> str: + if not isinstance(role, str) or not role: + raise _input_error( + f"Message at {location} is missing a valid 'role'.", + "invalid_input_message", + ) + + if role == "developer": + return "system" + + if role not in RESPONSE_INPUT_ROLES: + raise _input_error( + f"Unsupported message role at {location}: {role!r}.", + "unsupported_input_role", + ) + + return role + + +def _responses_input_to_chat_messages(input_value: Any) -> List[Dict[str, Any]]: + if isinstance(input_value, str): + return [{"role": "user", "content": input_value}] + + if not isinstance(input_value, list): + raise _input_error( + "The 'input' field must be a string or an array of message objects.", + "invalid_input", + ) + + messages: List[Dict[str, Any]] = [] + for index, item in enumerate(input_value): + location = f"input[{index}]" + if not isinstance(item, dict): + raise _input_error( + f"Message at {location} must be an object.", + "invalid_input_message", + ) + + item_type = item.get("type") + if item_type not in (None, "message"): + raise _input_error( + f"Unsupported input item type at {location}: {item_type!r}.", + "unsupported_input_item", + ) + + role = _coerce_response_role(item.get("role"), location) + content = _coerce_response_content(item.get("content"), location) + message: Dict[str, Any] = {"role": role, "content": content} + + for optional_field in ("name", "tool_call_id", "tool_calls"): + if optional_field in item: + message[optional_field] = item[optional_field] + + messages.append(message) + + return messages + + +def _responses_request_to_chat_request( + request: ResponsesCreateRequest, stream: bool = False +) -> ChatCompletionRequest: + messages = _responses_input_to_chat_messages(request.input) + 
system_prompt = request.instructions if request.instructions else None + + return ChatCompletionRequest( + model=request.model, + messages=messages, + temperature=request.temperature, + max_tokens=request.max_output_tokens, + stream=stream, + project_id=request.project_id, + session_id=request.session_id, + system_prompt=system_prompt, + ) + + +def _extract_chat_response_text(chat_response: Dict[str, Any]) -> str: + choices = chat_response.get("choices") or [] + if not choices or not isinstance(choices[0], dict): + return "" + + message = choices[0].get("message") or {} + if not isinstance(message, dict): + return "" + + content = message.get("content") + if isinstance(content, str): + return content + if content is None: + return "" + return str(content) + + +def _responses_usage_from_chat(chat_response: Dict[str, Any]) -> Dict[str, Any]: + usage = chat_response.get("usage") or {} + if not isinstance(usage, dict): + usage = {} + + return { + "input_tokens": usage.get("prompt_tokens"), + "output_tokens": usage.get("completion_tokens"), + "total_tokens": usage.get("total_tokens"), + } + + +def _chat_response_to_responses_response( + request: ResponsesCreateRequest, chat_response: Dict[str, Any] +) -> Dict[str, Any]: + created_at = chat_response.get("created") or utc_timestamp() + completed_at = utc_timestamp() + output_text = _extract_chat_response_text(chat_response) + + return { + "id": f"resp_{uuid.uuid4().hex}", + "object": "response", + "created_at": created_at, + "status": "completed", + "completed_at": completed_at, + "error": None, + "incomplete_details": None, + "instructions": None, + "max_output_tokens": request.max_output_tokens, + "model": chat_response.get("model") or request.model, + "output": [ + { + "id": f"msg_{uuid.uuid4().hex}", + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": output_text, + "annotations": [], + } + ], + } + ], + "output_text": output_text, + "usage": 
_responses_usage_from_chat(chat_response), + } + + +def _responses_stream_event(event_type: str, data: Dict[str, Any]) -> str: + payload = {"type": event_type, **data} + json_data = json.dumps(payload, separators=(",", ":")) + return f"event: {event_type}\ndata: {json_data}\n\n" + + +def _responses_stream_error(message: str) -> str: + return _responses_stream_event( + "response.failed", + { + "response": { + "id": f"resp_{uuid.uuid4().hex}", + "object": "response", + "created_at": utc_timestamp(), + "status": "failed", + "error": { + "message": message, + "type": "server_error", + "code": "stream_error", + }, + } + }, + ) + + +async def _iter_sse_events(body_iterator: Any) -> AsyncGenerator[str, None]: + buffer = "" + async for chunk in body_iterator: + if isinstance(chunk, bytes): + buffer += chunk.decode("utf-8") + else: + buffer += str(chunk) + + while "\n\n" in buffer: + raw_event, buffer = buffer.split("\n\n", 1) + if raw_event.strip(): + yield raw_event + + if buffer.strip(): + yield buffer + + +def _sse_data(raw_event: str) -> Optional[str]: + data_lines = [] + for line in raw_event.splitlines(): + if line.startswith("data:"): + data_lines.append(line[5:].lstrip()) + if not data_lines: + return None + return "\n".join(data_lines) + + +def _responses_completed_payload( + response_id: str, + message_id: str, + created_at: int, + completed_at: int, + request: ResponsesCreateRequest, + model: str, + output_text: str, +) -> Dict[str, Any]: + return { + "id": response_id, + "object": "response", + "created_at": created_at, + "status": "completed", + "completed_at": completed_at, + "error": None, + "incomplete_details": None, + "instructions": None, + "max_output_tokens": request.max_output_tokens, + "model": model, + "output": [ + { + "id": message_id, + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": output_text, + "annotations": [], + } + ], + } + ], + "output_text": output_text, + "usage": 
{ + "input_tokens": None, + "output_tokens": None, + "total_tokens": None, + }, + } + + +async def _create_responses_sse_from_chat_stream( + chat_stream_response: StreamingResponse, + request: ResponsesCreateRequest, +) -> AsyncGenerator[str, None]: + response_id = f"resp_{uuid.uuid4().hex}" + message_id = f"msg_{uuid.uuid4().hex}" + created_at = utc_timestamp() + model = request.model + output_parts: List[str] = [] + content_started = False + + yield _responses_stream_event( + "response.created", + { + "response": { + "id": response_id, + "object": "response", + "created_at": created_at, + "status": "in_progress", + "model": model, + } + }, + ) + yield _responses_stream_event( + "response.output_item.added", + { + "output_index": 0, + "item": { + "id": message_id, + "type": "message", + "status": "in_progress", + "role": "assistant", + "content": [], + }, + }, + ) + + try: + async for raw_event in _iter_sse_events(chat_stream_response.body_iterator): + payload = _sse_data(raw_event) + if payload is None: + continue + if payload == "[DONE]": + break + + try: + chunk = json.loads(payload) + except json.JSONDecodeError: + continue + + if "error" in chunk: + yield _responses_stream_event( + "response.failed", {"response": {"id": response_id, **chunk}} + ) + return + + model = chunk.get("model") or model + choices = chunk.get("choices") or [] + if not choices: + continue + + choice = choices[0] + delta = choice.get("delta") or {} + text_delta = delta.get("content") + if not text_delta: + continue + + if not content_started: + content_started = True + yield _responses_stream_event( + "response.content_part.added", + { + "item_id": message_id, + "output_index": 0, + "content_index": 0, + "part": { + "type": "output_text", + "text": "", + "annotations": [], + }, + }, + ) + + output_parts.append(str(text_delta)) + yield _responses_stream_event( + "response.output_text.delta", + { + "item_id": message_id, + "output_index": 0, + "content_index": 0, + "delta": 
str(text_delta), + }, + ) + + output_text = "".join(output_parts) + if not content_started: + yield _responses_stream_event( + "response.content_part.added", + { + "item_id": message_id, + "output_index": 0, + "content_index": 0, + "part": { + "type": "output_text", + "text": "", + "annotations": [], + }, + }, + ) + + yield _responses_stream_event( + "response.output_text.done", + { + "item_id": message_id, + "output_index": 0, + "content_index": 0, + "text": output_text, + }, + ) + yield _responses_stream_event( + "response.content_part.done", + { + "item_id": message_id, + "output_index": 0, + "content_index": 0, + "part": { + "type": "output_text", + "text": output_text, + "annotations": [], + }, + }, + ) + yield _responses_stream_event( + "response.output_item.done", + { + "output_index": 0, + "item": { + "id": message_id, + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + { + "type": "output_text", + "text": output_text, + "annotations": [], + } + ], + }, + }, + ) + + completed_at = utc_timestamp() + yield _responses_stream_event( + "response.completed", + { + "response": _responses_completed_payload( + response_id=response_id, + message_id=message_id, + created_at=created_at, + completed_at=completed_at, + request=request, + model=model, + output_text=output_text, + ) + }, + ) + yield "data: [DONE]\n\n" + + except Exception as e: + logger.error("Responses streaming error", error=str(e), exc_info=True) + yield _responses_stream_error("Stream error") + + async def _resolve_session( session_manager: SessionManager, request: ChatCompletionRequest, @@ -255,6 +749,58 @@ def _log_response_payload(response: Dict[str, Any]) -> None: ) +@router.post( + "/responses", + responses=RESPONSES_API_RESPONSES, +) +async def create_response(request: ResponsesCreateRequest, req: Request) -> Any: + """Create a minimal OpenAI Responses API response.""" + logger.info( + "Responses API request validated", + model=request.model, + 
stream=request.stream, + max_output_tokens=request.max_output_tokens, + project_id=request.project_id, + session_id=request.session_id, + ) + + chat_request = _responses_request_to_chat_request( + request, stream=bool(request.stream) + ) + chat_response = await create_chat_completion(chat_request, req) + + if request.stream: + if not isinstance(chat_response, StreamingResponse): + raise _http_error( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "Unexpected chat completion streaming response type.", + "internal_error", + "unexpected_response_type", + ) + return StreamingResponse( + _create_responses_sse_from_chat_stream(chat_response, request), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + if hasattr(chat_response, "model_dump"): + chat_response = chat_response.model_dump() + + if not isinstance(chat_response, dict): + raise _http_error( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "Unexpected chat completion response type.", + "internal_error", + "unexpected_response_type", + ) + + return _chat_response_to_responses_response(request, chat_response) + + @router.post( "/chat/completions", response_model=ChatCompletionResponse, diff --git a/claude_code_api/main.py b/claude_code_api/main.py index 8d5c630..0681cbc 100644 --- a/claude_code_api/main.py +++ b/claude_code_api/main.py @@ -200,6 +200,7 @@ async def root(): "description": "OpenAI-compatible API for Claude Code", "endpoints": { "chat": "/v1/chat/completions", + "responses": "/v1/responses", "models": "/v1/models", "projects": "/v1/projects", "sessions": "/v1/sessions", diff --git a/claude_code_api/models/openai.py b/claude_code_api/models/openai.py index f538096..896f5f1 100644 --- a/claude_code_api/models/openai.py +++ b/claude_code_api/models/openai.py @@ -208,6 +208,93 @@ class ChatCompletionResponse(BaseModel): ) +class ResponsesCreateRequest(BaseModel): + """Minimal OpenAI Responses API request model.""" + + 
model: str = Field(..., description="ID of the model to use") + input: Union[str, List[Any]] = Field( + ..., description="Text input or message-like input objects" + ) + temperature: Optional[float] = Field( + None, ge=0.0, le=2.0, description="Sampling temperature" + ) + max_output_tokens: Optional[int] = Field( + None, ge=1, description="Maximum number of tokens to generate" + ) + stream: Optional[bool] = Field( + False, description="Whether to stream response events" + ) + instructions: Optional[str] = Field(None, description="System instructions") + + # Extension fields for Claude Code + project_id: Optional[str] = Field( + None, description="Project ID for Claude Code context" + ) + session_id: Optional[str] = Field( + None, description="Session ID to continue conversation" + ) + + +class ResponsesOutputText(BaseModel): + """Responses API output text content block.""" + + type: Literal["output_text"] = Field( + "output_text", description=OBJECT_TYPE_DESC + ) + text: str = Field(..., description="Assistant output text") + annotations: List[Any] = Field( + default_factory=list, description="Output text annotations" + ) + + +class ResponsesOutputMessage(BaseModel): + """Responses API output message.""" + + id: str = Field(..., description="Message ID") + type: Literal["message"] = Field("message", description=OBJECT_TYPE_DESC) + status: Literal["completed"] = Field("completed", description="Message status") + role: Literal["assistant"] = Field("assistant", description="Message role") + content: List[ResponsesOutputText] = Field( + ..., description="Message content blocks" + ) + + +class ResponsesUsage(BaseModel): + """Responses API token usage.""" + + input_tokens: Optional[int] = Field(None, description="Input token count") + output_tokens: Optional[int] = Field(None, description="Output token count") + total_tokens: Optional[int] = Field(None, description="Total token count") + + +class ResponsesResponse(BaseModel): + """Minimal OpenAI Responses API response 
model.""" + + id: str = Field(..., description="Response ID") + object: Literal["response"] = Field("response", description=OBJECT_TYPE_DESC) + created_at: int = Field( + ..., description="Unix timestamp of when the response was created" + ) + status: Literal["completed"] = Field("completed", description="Response status") + completed_at: int = Field( + ..., description="Unix timestamp of when the response completed" + ) + error: Optional[Dict[str, Any]] = Field(None, description="Response error") + incomplete_details: Optional[Dict[str, Any]] = Field( + None, description="Incomplete response details" + ) + instructions: Optional[str] = Field(None, description="System instructions") + max_output_tokens: Optional[int] = Field( + None, description="Maximum number of output tokens requested" + ) + model: str = Field(..., description="Model used for the response") + output: List[ResponsesOutputMessage] = Field( + ..., description="Response output items" + ) + output_text: str = Field(..., description="Concatenated assistant output text") + usage: ResponsesUsage = Field(..., description="Token usage") + + # Streaming Models class ChatCompletionChunkDelta(BaseModel): """Delta object for streaming responses.""" diff --git a/tests/test_responses_api.py b/tests/test_responses_api.py new file mode 100644 index 0000000..8a1de3a --- /dev/null +++ b/tests/test_responses_api.py @@ -0,0 +1,155 @@ +"""Tests for the minimal OpenAI Responses API compatibility route.""" + +import json + +from tests.model_utils import get_test_model_id + + +DEFAULT_MODEL = get_test_model_id() + + +def parse_sse_events(body_text: str): + events = [] + current_event = {"event": None, "data": []} + for line in body_text.splitlines(): + if not line: + if current_event["data"]: + payload = "\n".join(current_event["data"]) + if payload == "[DONE]": + events.append({"event": current_event["event"], "data": "[DONE]"}) + else: + events.append( + { + "event": current_event["event"], + "data": 
json.loads(payload), + } + ) + current_event = {"event": None, "data": []} + continue + + if line.startswith("event: "): + current_event["event"] = line[7:] + elif line.startswith("data: "): + current_event["data"].append(line[6:]) + + return events + + +def test_responses_string_input(test_client): + response = test_client.post( + "/v1/responses", + json={ + "model": DEFAULT_MODEL, + "input": "Hi", + "stream": False, + "temperature": 0.2, + "max_output_tokens": 16, + }, + ) + + assert response.status_code == 200 + data = response.json() + + assert data["id"].startswith("resp_") + assert data["object"] == "response" + assert data["status"] == "completed" + assert data["model"] == DEFAULT_MODEL + assert data["max_output_tokens"] == 16 + assert data["output_text"] == "Hello! How can I help today?" + assert data["output"][0]["type"] == "message" + assert data["output"][0]["role"] == "assistant" + assert data["output"][0]["content"][0]["type"] == "output_text" + assert data["output"][0]["content"][0]["text"] == data["output_text"] + assert data["usage"]["input_tokens"] is not None + assert data["usage"]["output_tokens"] is not None + assert data["usage"]["total_tokens"] is not None + + +def test_responses_message_array_input_text_blocks(test_client): + response = test_client.post( + "/v1/responses", + json={ + "model": DEFAULT_MODEL, + "input": [ + {"role": "system", "content": "Keep replies short."}, + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Hi"}], + }, + ], + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["object"] == "response" + assert data["output_text"] == "Hello! How can I help today?" 
+ + +def test_responses_streaming_text_events(test_client): + response = test_client.post( + "/v1/responses", + json={"model": DEFAULT_MODEL, "input": "Hi", "stream": True}, + ) + + assert response.status_code == 200 + assert "text/event-stream" in response.headers["content-type"] + + events = parse_sse_events(response.text) + event_names = [event["event"] for event in events] + + assert "response.created" in event_names + assert "response.output_item.added" in event_names + assert "response.content_part.added" in event_names + assert "response.output_text.delta" in event_names + assert "response.output_text.done" in event_names + assert "response.content_part.done" in event_names + assert "response.output_item.done" in event_names + assert "response.completed" in event_names + assert events[-1]["data"] == "[DONE]" + + deltas = [ + event["data"]["delta"] + for event in events + if event["event"] == "response.output_text.delta" + ] + assert "".join(deltas) == "Hello! How can I help today?" + + completed = next( + event["data"] + for event in events + if event["event"] == "response.completed" + ) + assert completed["response"]["object"] == "response" + assert completed["response"]["status"] == "completed" + assert completed["response"]["output_text"] == "Hello! How can I help today?" 
+ + +def test_responses_rejects_unsupported_content_block(test_client): + response = test_client.post( + "/v1/responses", + json={ + "model": DEFAULT_MODEL, + "input": [ + { + "role": "user", + "content": [{"type": "input_image", "image_url": "https://x"}], + } + ], + }, + ) + + assert response.status_code == 400 + data = response.json() + assert data["error"]["code"] == "unsupported_input_block" + + +def test_openapi_responses_schema(test_client): + response = test_client.get("/openapi.json") + assert response.status_code == 200 + + schema = response.json() + assert "/v1/responses" in schema["paths"] + assert "ResponsesCreateRequest" in schema["components"]["schemas"] + assert "ResponsesResponse" in schema["components"]["schemas"] From c48f4999993e742559eee35d9b01f510413b700f Mon Sep 17 00:00:00 2001 From: technowhizz <7688823+technowhizz@users.noreply.github.com> Date: Sun, 10 May 2026 20:03:13 +0000 Subject: [PATCH 2/5] Add configurable HTTP access logging Introduces request logging through the application middleware using the existing access_log setting as the source of truth. This keeps HTTP method, path, status, duration, and client details available when access logging is enabled without forcing noisy request logs for installations that have disabled them. 
--- claude_code_api/core/config.py | 1 + claude_code_api/core/logging_config.py | 27 +++++++++--- claude_code_api/main.py | 24 ++++++++++ tests/test_logging_config.py | 40 +++++++++++++++++ tests/test_request_logging.py | 61 ++++++++++++++++++++++++++ 5 files changed, 148 insertions(+), 5 deletions(-) create mode 100644 tests/test_request_logging.py diff --git a/claude_code_api/core/config.py b/claude_code_api/core/config.py index 6c0b31e..46fd222 100644 --- a/claude_code_api/core/config.py +++ b/claude_code_api/core/config.py @@ -161,6 +161,7 @@ def parse_api_keys(cls, v): # Logging Configuration log_level: str = "INFO" + access_log: bool = False log_format: str = "json" log_file_path: str = default_log_file_path() log_to_file: bool = True diff --git a/claude_code_api/core/logging_config.py b/claude_code_api/core/logging_config.py index c121235..e30a8ea 100644 --- a/claude_code_api/core/logging_config.py +++ b/claude_code_api/core/logging_config.py @@ -60,7 +60,9 @@ def _create_file_handler( return handler -def _minimal_event_filter(debug_enabled: bool, min_level_name: str | None): +def _minimal_event_filter( + debug_enabled: bool, min_level_name: str | None, access_log_enabled: bool = False +): if debug_enabled: return None @@ -76,6 +78,8 @@ def _processor( return event_dict if event_dict.get("lifecycle") is True: return event_dict + if access_log_enabled and event_dict.get("access_log") is True: + return event_dict if event_dict.get("event") in _LIFECYCLE_EVENTS: return event_dict raise structlog.DropEvent @@ -84,10 +88,15 @@ def _processor( def _build_processors( - debug_enabled: bool, log_format: str, min_level_name: str | None + debug_enabled: bool, + log_format: str, + min_level_name: str | None, + access_log_enabled: bool = False, ) -> list[Any]: processors: list[Any] = [structlog.stdlib.filter_by_level] - minimal_filter = _minimal_event_filter(debug_enabled, min_level_name) + minimal_filter = _minimal_event_filter( + debug_enabled, min_level_name, 
access_log_enabled + ) if minimal_filter: processors.append(minimal_filter) @@ -122,6 +131,10 @@ def configure_logging(settings: Any) -> None: log_backup_count = int(getattr(settings, "log_backup_count", _DEFAULT_BACKUP_COUNT)) log_to_console = bool(getattr(settings, "log_to_console", True)) log_min_level = getattr(settings, "log_min_level_when_not_debug", "WARNING") + access_log_enabled = bool(getattr(settings, "access_log", False)) + + if access_log_enabled and log_level > logging.INFO: + log_level = logging.INFO handlers: list[logging.Handler] = [] if log_to_file and log_file_path: @@ -151,11 +164,15 @@ def configure_logging(settings: Any) -> None: root_logger.addHandler(handler) if not debug_enabled: - logging.getLogger("uvicorn.access").setLevel(logging.ERROR) + logging.getLogger("uvicorn.access").setLevel( + logging.INFO if access_log_enabled else logging.ERROR + ) logging.getLogger("uvicorn.error").setLevel(logging.ERROR) structlog.configure( - processors=_build_processors(debug_enabled, log_format, log_min_level), + processors=_build_processors( + debug_enabled, log_format, log_min_level, access_log_enabled + ), context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, diff --git a/claude_code_api/main.py b/claude_code_api/main.py index 0681cbc..d9c0b95 100644 --- a/claude_code_api/main.py +++ b/claude_code_api/main.py @@ -5,6 +5,7 @@ while leveraging Claude Code's powerful workflow capabilities. 
""" +import time from contextlib import asynccontextmanager from typing import AsyncGenerator @@ -121,6 +122,28 @@ def custom_openapi(): app.middleware("http")(auth_middleware) +@app.middleware("http") +async def request_logging_middleware(request, call_next): + if not settings.access_log: + return await call_next(request) + + start = time.perf_counter() + response = await call_next(request) + duration_ms = round((time.perf_counter() - start) * 1000, 2) + + logger.info( + "HTTP request", + access_log=True, + method=request.method, + path=request.url.path, + status_code=response.status_code, + duration_ms=duration_ms, + client_host=request.client.host if request.client else None, + ) + + return response + + @app.exception_handler(HTTPException) async def http_exception_handler(request, exc): """Custom handler for HTTP exceptions to support OpenAI error format.""" @@ -226,4 +249,5 @@ async def root(): port=settings.port, reload=True, log_level=settings.log_level.lower(), + access_log=settings.access_log, ) diff --git a/tests/test_logging_config.py b/tests/test_logging_config.py index 6f2acb6..edc9604 100644 --- a/tests/test_logging_config.py +++ b/tests/test_logging_config.py @@ -22,6 +22,14 @@ def test_minimal_event_filter_allows_warning_and_lifecycle_info(): lifecycle_event = {"event": "Starting Claude Code API Gateway"} assert processor(None, "info", lifecycle_event) is lifecycle_event + access_event = {"event": "HTTP request", "access_log": True} + with pytest.raises(structlog.DropEvent): + processor(None, "info", access_event) + + access_processor = logging_config._minimal_event_filter(False, "WARNING", True) + assert access_processor is not None + assert access_processor(None, "info", access_event) is access_event + with pytest.raises(structlog.DropEvent): processor(None, "info", {"event": "suppressed info"}) @@ -46,6 +54,7 @@ def raise_oserror(*args, **kwargs): log_backup_count=1, log_to_console=False, log_min_level_when_not_debug="WARNING", + 
access_log=False, ) try: @@ -60,3 +69,34 @@ def raise_oserror(*args, **kwargs): root_logger.addHandler(handler) root_logger.setLevel(original_level) structlog.reset_defaults() + + +def test_configure_logging_keeps_info_enabled_for_access_logs(): + original_root = logging.getLogger() + original_handlers = list(original_root.handlers) + original_level = original_root.level + + settings = SimpleNamespace( + debug=False, + log_level="ERROR", + log_format="json", + log_file_path="", + log_to_file=False, + log_max_bytes=1024, + log_backup_count=1, + log_to_console=True, + log_min_level_when_not_debug="WARNING", + access_log=True, + ) + + try: + logging_config.configure_logging(settings) + assert logging.getLogger().level == logging.INFO + assert logging.getLogger("uvicorn.access").level == logging.INFO + finally: + root_logger = logging.getLogger() + root_logger.handlers.clear() + for handler in original_handlers: + root_logger.addHandler(handler) + root_logger.setLevel(original_level) + structlog.reset_defaults() diff --git a/tests/test_request_logging.py b/tests/test_request_logging.py new file mode 100644 index 0000000..baf8e01 --- /dev/null +++ b/tests/test_request_logging.py @@ -0,0 +1,61 @@ +"""Tests for HTTP request access logging middleware.""" + +import asyncio +from types import SimpleNamespace + +from claude_code_api import main as main_module + + +class FakeLogger: + def __init__(self): + self.calls = [] + + def info(self, *args, **kwargs): + self.calls.append((args, kwargs)) + + +def _request(path: str = "/health"): + return SimpleNamespace( + method="GET", + url=SimpleNamespace(path=path), + client=SimpleNamespace(host="127.0.0.1"), + ) + + +async def _response(request): + return SimpleNamespace(status_code=204) + + +def test_request_logging_middleware_skips_when_access_log_disabled(monkeypatch): + fake_logger = FakeLogger() + monkeypatch.setattr(main_module.settings, "access_log", False) + monkeypatch.setattr(main_module, "logger", fake_logger) + + 
response = asyncio.run( + main_module.request_logging_middleware(_request(), _response) + ) + + assert response.status_code == 204 + assert fake_logger.calls == [] + + +def test_request_logging_middleware_logs_when_access_log_enabled(monkeypatch): + fake_logger = FakeLogger() + monkeypatch.setattr(main_module.settings, "access_log", True) + monkeypatch.setattr(main_module, "logger", fake_logger) + + response = asyncio.run( + main_module.request_logging_middleware(_request("/v1/models"), _response) + ) + + assert response.status_code == 204 + assert len(fake_logger.calls) == 1 + + args, kwargs = fake_logger.calls[0] + assert args == ("HTTP request",) + assert kwargs["access_log"] is True + assert kwargs["method"] == "GET" + assert kwargs["path"] == "/v1/models" + assert kwargs["status_code"] == 204 + assert kwargs["client_host"] == "127.0.0.1" + assert isinstance(kwargs["duration_ms"], float) From 1e9eabe2ed4dfbac2875fe039abc6fed9521fb29 Mon Sep 17 00:00:00 2001 From: technowhizz <7688823+technowhizz@users.noreply.github.com> Date: Sun, 10 May 2026 20:03:21 +0000 Subject: [PATCH 3/5] Improve Docker build cache reuse Reorders the Docker build so dependency installation and slow setup steps can be cached independently from application source changes. This reduces rebuild time during local iteration while keeping the final runtime image behavior and startup command unchanged. --- .dockerignore | 34 ++++++++++++++++++++++++++++++ docker/Dockerfile | 53 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 74 insertions(+), 13 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..22c7f0f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,34 @@ +.git +.github +.pytest_cache +__pycache__ +*.py[cod] +*.pyo +*.pyd +*.egg-info +.eggs +build +dist +.coverage +htmlcov +.mypy_cache +.tox +.venv +venv + +# Local runtime state and secrets should not affect Docker build cache. 
+.env +.env.* +*.db +*.sqlite +*.sqlite3 +claude_projects +claude_sessions +workspace +config +claude-config + +# Not needed in the runtime image. +tests +docs +assets diff --git a/docker/Dockerfile b/docker/Dockerfile index 0f9aa03..0678ee3 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,8 +1,17 @@ +# syntax=docker/dockerfile:1.7 + FROM ubuntu:24.04 +ARG APP_UID=1001 +ARG APP_GID=1001 + ENV DEBIAN_FRONTEND=noninteractive +ENV VIRTUAL_ENV=/home/claudeuser/venv +ENV PATH="${VIRTUAL_ENV}/bin:/home/claudeuser/.local/bin:/home/claudeuser/.bun/bin:${PATH}" -RUN apt-get update && apt-get install -y \ +RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt/lists,sharing=locked \ + apt-get update && apt-get install -y --no-install-recommends \ bash \ ca-certificates \ curl \ @@ -11,30 +20,43 @@ RUN apt-get update && apt-get install -y \ python3 \ python3-pip \ python3-venv \ - sudo \ - && rm -rf /var/lib/apt/lists/* + sudo # Create non-root user -RUN useradd -m -s /bin/bash claudeuser && \ - echo "claudeuser ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers +RUN groupadd --gid "${APP_GID}" claudeuser && \ + useradd --uid "${APP_UID}" --gid "${APP_GID}" -m -s /bin/bash claudeuser && \ + echo "claudeuser ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers && \ + mkdir -p /home/claudeuser/app /home/claudeuser/.config/claude && \ + chown -R claudeuser:claudeuser /home/claudeuser # Set up application directory WORKDIR /home/claudeuser/app -COPY . /home/claudeuser/app -RUN chown -R claudeuser:claudeuser /home/claudeuser/app USER claudeuser # Install Claude CLI using the official installer (no npm required) RUN curl -fsSL https://claude.ai/install.sh | bash -# Create virtualenv and install dependencies -RUN python3 -m venv /home/claudeuser/venv && \ - /home/claudeuser/venv/bin/pip install --upgrade pip setuptools wheel && \ - /home/claudeuser/venv/bin/pip install -e . --use-pep517 || \ - /home/claudeuser/venv/bin/pip install -e . 
+# Create virtualenv and install dependency metadata before copying source. +RUN python3 -m venv "${VIRTUAL_ENV}" + +RUN --mount=type=cache,id=claude-api-pip-cache,target=/home/claudeuser/.cache/pip,uid=1001,gid=1001,mode=0775 \ + pip install --upgrade pip setuptools wheel + +COPY --chown=claudeuser:claudeuser pyproject.toml ./ + +RUN --mount=type=cache,id=claude-api-pip-cache,target=/home/claudeuser/.cache/pip,uid=1001,gid=1001,mode=0775 \ + python3 - <<'PY' > /tmp/runtime-requirements.txt && \ + pip install -r /tmp/runtime-requirements.txt +import tomllib +from pathlib import Path + +metadata = tomllib.loads(Path("pyproject.toml").read_text(encoding="utf-8")) +for dependency in metadata["project"]["dependencies"]: + print(dependency) +PY -ENV PATH="/home/claudeuser/venv/bin:/home/claudeuser/.local/bin:/home/claudeuser/.bun/bin:${PATH}" +COPY --chown=claudeuser:claudeuser setup.py setup.cfg README.md VERSION ./ # Create Claude config and workspace directories RUN mkdir -p /home/claudeuser/.config/claude /home/claudeuser/app/workspace @@ -100,4 +122,9 @@ exec python3 -m claude_code_api.main EOF RUN chmod +x /home/claudeuser/entrypoint.sh +COPY --chown=claudeuser:claudeuser claude_code_api ./claude_code_api + +RUN --mount=type=cache,id=claude-api-pip-cache,target=/home/claudeuser/.cache/pip,uid=1001,gid=1001,mode=0775 \ + pip install --no-deps --no-build-isolation . + ENTRYPOINT ["/home/claudeuser/entrypoint.sh"] From 5163fe3222340d9b062e9af298646754abe6b951 Mon Sep 17 00:00:00 2001 From: technowhizz <7688823+technowhizz@users.noreply.github.com> Date: Sun, 10 May 2026 20:03:24 +0000 Subject: [PATCH 4/5] Avoid Claude CLI stdin wait warnings Redirects the Claude CLI's stdin to the null device (DEVNULL) for requests that do not provide stdin data, so the CLI does not wait for input before continuing. This removes the recurring warning and avoids unnecessary latency on simple non-interactive API calls. 
--- claude_code_api/core/claude_manager.py | 2 +- tests/test_claude_manager_unit.py | 38 ++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/claude_code_api/core/claude_manager.py b/claude_code_api/core/claude_manager.py index 66330e4..a4fa10e 100644 --- a/claude_code_api/core/claude_manager.py +++ b/claude_code_api/core/claude_manager.py @@ -100,7 +100,7 @@ async def start( cwd=src_dir, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.DEVNULL, ) self.is_running = True diff --git a/tests/test_claude_manager_unit.py b/tests/test_claude_manager_unit.py index 8648fd5..a70a2c9 100644 --- a/tests/test_claude_manager_unit.py +++ b/tests/test_claude_manager_unit.py @@ -1,5 +1,6 @@ """Unit tests for Claude manager helpers.""" +import asyncio import os import types @@ -51,6 +52,43 @@ def test_decode_output_line(): assert data["type"] == "text" +@pytest.mark.asyncio +async def test_claude_process_redirects_stdin_to_devnull(monkeypatch): + process = cm.ClaudeProcess(session_id="sess", project_path="/tmp") + captured_kwargs = {} + + class EmptyStream: + async def readline(self): + return b"" + + class FakeProcess: + stdout = EmptyStream() + stderr = EmptyStream() + returncode = None + + def terminate(self): + pass + + async def wait(self): + return 0 + + async def fake_create_subprocess_exec(*_args, **kwargs): + captured_kwargs.update(kwargs) + return FakeProcess() + + async def fake_verify_startup(self): + return True + + monkeypatch.setattr(cm.asyncio, "create_subprocess_exec", fake_create_subprocess_exec) + monkeypatch.setattr(cm.ClaudeProcess, "_verify_startup", fake_verify_startup) + + try: + assert await process.start(prompt="hello") is True + assert captured_kwargs["stdin"] == asyncio.subprocess.DEVNULL + finally: + await process.stop() + + @pytest.mark.asyncio async def test_create_session_rejects_duplicate_active_session(monkeypatch, tmp_path): manager = 
cm.ClaudeManager() From 15d38ee5f59a809fa3a98586e7d7806c7becab34 Mon Sep 17 00:00:00 2001 From: technowhizz <7688823+technowhizz@users.noreply.github.com> Date: Sun, 10 May 2026 20:08:18 +0000 Subject: [PATCH 5/5] Add current Claude Opus and Sonnet models Updates the default model catalog with current Claude Opus and Sonnet aliases while preserving existing canonical model entries. The aliases now resolve to the latest documented model IDs, and the model API tests cover the new defaults and compatibility mappings. --- claude_code_api/config/models.json | 39 ++++++++++++++++++++++++++---- tests/test_models_unit.py | 17 ++++++++++--- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/claude_code_api/config/models.json b/claude_code_api/config/models.json index bc770a2..1993c1f 100644 --- a/claude_code_api/config/models.json +++ b/claude_code_api/config/models.json @@ -1,15 +1,34 @@ { - "default_model": "claude-sonnet-4-5-20250929", + "default_model": "claude-sonnet-4-6", "aliases": { - "opus": "claude-opus-4-6-20260205", - "sonnet": "claude-sonnet-4-5-20250929", + "opus": "claude-opus-4-7", + "sonnet": "claude-sonnet-4-6", "haiku": "claude-haiku-4-5-20251001", - "claude-opus-latest": "claude-opus-4-6-20260205", + + "claude-opus-latest": "claude-opus-4-7", + "claude-opus-4-7": "claude-opus-4-7", "claude-opus-4-6": "claude-opus-4-6-20260205", "claude-opus-4-5": "claude-opus-4-5-20251101", - "claude-opus-4-5-latest": "claude-opus-4-5-20251101" + "claude-opus-4-5-latest": "claude-opus-4-5-20251101", + + "claude-sonnet-latest": "claude-sonnet-4-6", + "claude-sonnet-4-6": "claude-sonnet-4-6", + "claude-sonnet-4-5": "claude-sonnet-4-5-20250929", + + "claude-haiku-latest": "claude-haiku-4-5-20251001", + "claude-haiku-4-5": "claude-haiku-4-5-20251001" }, "models": [ + { + "id": "claude-opus-4-7", + "name": "Claude Opus 4.7", + "description": "Most capable generally available Claude model for complex reasoning and agentic coding", + "max_tokens": 131072, + 
"input_cost_per_1k": 0.005, + "output_cost_per_1k": 0.025, + "supports_streaming": true, + "supports_tools": true + }, { "id": "claude-opus-4-6-20260205", "name": "Claude Opus 4.6", @@ -30,6 +49,16 @@ "supports_streaming": true, "supports_tools": true }, + { + "id": "claude-sonnet-4-6", + "name": "Claude Sonnet 4.6", + "description": "Best combination of speed and intelligence", + "max_tokens": 65536, + "input_cost_per_1k": 0.003, + "output_cost_per_1k": 0.015, + "supports_streaming": true, + "supports_tools": true + }, { "id": "claude-sonnet-4-5-20250929", "name": "Claude Sonnet 4.5", diff --git a/tests/test_models_unit.py b/tests/test_models_unit.py index 7168d0d..0c29836 100644 --- a/tests/test_models_unit.py +++ b/tests/test_models_unit.py @@ -14,17 +14,28 @@ def clear_models_cache(): claude_models._load_models_config.cache_clear() -def test_opus_46_is_available(): +def test_latest_opus_and_sonnet_are_available(): available_models = {model.id for model in claude_models.get_available_models()} + assert "claude-opus-4-7" in available_models assert "claude-opus-4-6-20260205" in available_models + assert "claude-sonnet-4-6" in available_models -def test_opus_alias_resolves_to_canonical_model(): +def test_model_aliases_resolve_to_current_models(): + assert claude_models.validate_claude_model("claude-opus-4-7") == "claude-opus-4-7" + assert claude_models.validate_claude_model("opus") == "claude-opus-4-7" + assert claude_models.validate_claude_model("sonnet") == "claude-sonnet-4-6" + assert ( + claude_models.validate_claude_model("claude-sonnet-latest") + == "claude-sonnet-4-6" + ) + + +def test_opus_46_alias_resolves_to_canonical_model(): assert ( claude_models.validate_claude_model("claude-opus-4-6") == "claude-opus-4-6-20260205" ) - assert claude_models.validate_claude_model("opus") == "claude-opus-4-6-20260205" def test_opus_45_falls_forward_to_latest_opus_when_missing(tmp_path, monkeypatch):