diff --git a/.gitignore b/.gitignore index 494bece..67b948b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .venv/ +.vscode/ __pycache__/ *.pyc .env @@ -16,4 +17,5 @@ coverage.xml .scannerwork/ opencode.json .vscode/ -agents \ No newline at end of file +agents +opencode.json diff --git a/pyproject.toml b/pyproject.toml index 57db886..5422d74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "arize-phoenix-otel==0.15.0", "openinference-instrumentation-langchain==0.1.62", "arize-phoenix-client==2.3.0", - + "tenacity>=8.0.0" ] [dependency-groups] dev = [ diff --git a/src/application/requests/prompt.py b/src/application/requests/prompt.py index 729473c..220f9e7 100644 --- a/src/application/requests/prompt.py +++ b/src/application/requests/prompt.py @@ -1,17 +1,56 @@ -from pydantic import BaseModel, Field +from enum import Enum +from pydantic import BaseModel, Field, field_validator, model_validator + + +class MessageRole(str, Enum): + system = "system" + user = "user" + assistant = "assistant" + + +class PromptMessage(BaseModel): + role: MessageRole + content: str = Field(..., min_length=1) class CreatePromptRequest(BaseModel): - identifier: str = Field(..., min_length=1) - content: list[dict[str, str]] - model_name: str + identifier: str = Field(..., min_length=1, pattern=r"^[a-zA-Z0-9_-]+$") + content: list[PromptMessage] = Field(..., min_length=1) + model_name: str = Field(..., min_length=1) description: str | None = None tags: list[str] | None = None - metadata : dict | None = None + metadata: dict | None = None + + @model_validator(mode="after") + def validate_content_roles(self) -> "CreatePromptRequest": + roles = [msg.role for msg in self.content] + if MessageRole.user not in roles and MessageRole.system not in roles: + raise ValueError("content must contain at least one 'user' or 'system' message") + return self + + @field_validator("content", mode="before") + @classmethod + def validate_message_format(cls, v: list) -> list: + if not v: + raise ValueError("content cannot be empty") + for i, msg in enumerate(v): + if isinstance(msg, dict): + if "role" not in msg: + raise ValueError(f"message[{i}] missing 'role' field") + if "content" not in msg: + raise ValueError(f"message[{i}] missing 'content' field") + return v class UpdatePromptRequest(BaseModel): - content: list[dict[str, str]] | None = None + content: list[PromptMessage] | None = None model_name: str | None = None description: str | None = None - metadata : dict | None = None + metadata: dict | None = None + + @field_validator("content", mode="before") + @classmethod + def validate_content_not_empty(cls, v: list | None) -> list | None: + if v is not None and len(v) == 0: + raise ValueError("content cannot be an empty list") + return v diff --git a/src/application/routes/prompt.py b/src/application/routes/prompt.py index 8e6cde0..6cb254d 100644 --- a/src/application/routes/prompt.py +++ b/src/application/routes/prompt.py @@ -1,6 +1,7 @@ import logging from fastapi import APIRouter, Depends, HTTPException +from httpx import HTTPStatusError from src.application.requests.prompt import ( CreatePromptRequest, @@ -9,7 +10,7 @@ from src.application.use_cases.create_prompt import CreatePromptUseCase from src.application.use_cases.get_prompt import GetPromptUseCase from src.application.use_cases.update_prompt import UpdatePromptUseCase -from src.dependencies import get_prompt_manager # We'll add this +from src.dependencies import get_prompt_manager from src.domain.ports.prompt_manager import PromptManager logger = logging.getLogger("composable-agents") @@ -17,7 +18,23 @@ router = APIRouter(prefix="/prompts", tags=["prompts"]) -@router.post("/create") +def _handle_http_error(e: Exception, identifier: str | None = None) -> HTTPException: + """Map exceptions to appropriate HTTP status codes.""" + if isinstance(e, ValueError) and "not found" in str(e).lower(): + return HTTPException(status_code=404, detail=str(e)) + if isinstance(e, HTTPStatusError): + if e.response.status_code == 404: + return HTTPException(status_code=404, detail=f"Prompt not found: {identifier}") + if e.response.status_code == 409: + return HTTPException(status_code=409, detail=f"Prompt already exists: {identifier}") + if e.response.status_code == 400: + return HTTPException(status_code=400, detail=str(e)) + if isinstance(e, ValueError): + return HTTPException(status_code=400, detail=str(e)) + return HTTPException(status_code=500, detail=str(e)) + + +@router.post("/create", status_code=201) async def create_prompt( request: CreatePromptRequest, prompt_manager: PromptManager = Depends(get_prompt_manager), @@ -25,9 +42,10 @@ async def create_prompt( """Create a new prompt.""" use_case = CreatePromptUseCase(prompt_manager) try: + content_dicts = [msg.model_dump() for msg in request.content] prompt = await use_case.execute( identifier=request.identifier, - content=request.content, + content=content_dicts, model_name=request.model_name, description=request.description, tags=request.tags, @@ -35,8 +53,8 @@ async def create_prompt( ) return {"status": "success", "prompt": prompt} except Exception as e: - logger.error(f"Error creating prompt: {e}") - raise HTTPException(status_code=500, detail=str(e)) + logger.error(f"Error creating prompt '{request.identifier}': {e}") + raise _handle_http_error(e, request.identifier) @router.get("/get/{identifier}") @@ -56,8 +74,8 @@ async def get_prompt( ) return {"status": "success", "prompt": prompt} except Exception as e: - logger.error(f"Error getting prompt: {e}") - raise HTTPException(status_code=404, detail=str(e)) + logger.error(f"Error getting prompt '{identifier}': {e}") + raise _handle_http_error(e, identifier) @router.put("/update/{identifier}") @@ -69,14 +87,15 @@ async def update_prompt( """Update a prompt.""" use_case = UpdatePromptUseCase(prompt_manager) try: + content_dicts = [msg.model_dump() for msg in request.content] if request.content else None prompt = await use_case.execute( identifier=identifier, - content=request.content, + content=content_dicts, model_name=request.model_name, description=request.description, metadata=request.metadata, ) return {"status": "success", "prompt": prompt} except Exception as e: - logger.error(f"Error updating prompt: {e}") - raise HTTPException(status_code=500, detail=str(e)) + logger.error(f"Error updating prompt '{identifier}': {e}") + raise _handle_http_error(e, identifier) diff --git a/src/infrastructure/prompt_management/phoenix_prompt_adapter.py b/src/infrastructure/prompt_management/phoenix_prompt_adapter.py index 3dc5c7c..951e2da 100644 --- a/src/infrastructure/prompt_management/phoenix_prompt_adapter.py +++ b/src/infrastructure/prompt_management/phoenix_prompt_adapter.py @@ -1,42 +1,91 @@ import logging import os +from functools import wraps +from typing import TypeVar, Callable, Any +import asyncio +import httpx from cachetools import TTLCache, cached from phoenix.client import Client +from phoenix.client.client import _update_headers, _WrappedClient from phoenix.client.resources.prompts import PromptVersion as PhoenixPromptVersion +from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type from src.domain.entities.prompt import Prompt, PromptVersion from src.domain.ports.prompt_manager import PromptManager logger = logging.getLogger("composable-agents") +# Retry config +_RETRY_ATTEMPTS = 3 +_RETRY_MIN_WAIT = 1 +_RETRY_MAX_WAIT = 10 + +_phoenix_retry = retry( + stop=stop_after_attempt(_RETRY_ATTEMPTS), + wait=wait_exponential(multiplier=1, min=_RETRY_MIN_WAIT, max=_RETRY_MAX_WAIT), + retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError)), + reraise=True, +) + + +class PhoenixUnavailableError(RuntimeError): + """Raised when Phoenix is unreachable or returns a server error.""" + pass + + +def _wrap_phoenix_error(operation: str, identifier: str, e: Exception) -> Exception: + """Convert raw Phoenix/httpx exceptions to meaningful domain errors.""" + if isinstance(e, (httpx.TimeoutException, httpx.ConnectError)): + return PhoenixUnavailableError( + f"Phoenix unavailable during '{operation}' for '{identifier}': {e}" + ) + if isinstance(e, httpx.HTTPStatusError): + if e.response.status_code == 404: + return ValueError(f"Prompt not found: {identifier}") + if e.response.status_code >= 500: + return PhoenixUnavailableError( + f"Phoenix server error ({e.response.status_code}) during '{operation}' for '{identifier}'" + ) + return e + class PhoenixPromptManagerProvider(PromptManager): """Phoenix implementation of PromptManager port.""" - def __init__(self, base_url: str | None = None, api_key: str | None = None): + def __init__( + self, + base_url: str | None = None, + api_key: str | None = None, + timeout: float = 10.0, # configurable timeout + ): base_url = base_url or os.getenv("PHOENIX_COLLECTOR_ENDPOINT", "http://localhost:6006") api_key = api_key or os.getenv("PHOENIX_API_KEY") + self._timeout = timeout try: self._client = Client( base_url=base_url, api_key=api_key, + http_client=httpx.Client( + base_url=base_url, + headers={"Authorization": f"Bearer {api_key}"} if api_key else {}, + timeout=httpx.Timeout(connect=10.0, read=timeout, write=timeout, pool=10.0), + ), ) - logger.info(f"PhoenixPromptManagerProvider initialized with base_url={base_url}") + logger.info("PhoenixPromptManagerProvider initialized base_url=%s timeout=%ss", base_url, timeout) except Exception as e: - logger.error(f"Failed to initialize Phoenix client: {e}") + logger.error("Failed to initialize Phoenix client: %s", e) self._client = None + @_phoenix_retry async def get_prompt( self, identifier: str, version_id: str | None = None, tag: str | None = None, ) -> Prompt: - """Retrieve a prompt from Phoenix.""" if not self._client: - raise RuntimeError("Phoenix client not initialized") - + raise PhoenixUnavailableError("Phoenix client not initialized") try: prompt_obj: PhoenixPromptVersion = self._client.prompts.get( prompt_identifier=identifier, @@ -46,11 +95,14 @@ async def get_prompt( if not prompt_obj: raise ValueError(f"Prompt not found: {identifier}") return self._to_domain_prompt(prompt_obj, identifier=identifier, description=prompt_obj._description) - except Exception as e: - logger.error(f"Error getting prompt {identifier}: {e}") + except (ValueError, PhoenixUnavailableError): raise + except Exception as e: + logger.error("Error getting prompt '%s': %s", identifier, e) + raise _wrap_phoenix_error("get_prompt", identifier, e) from e @cached(cache=TTLCache(maxsize=10, ttl=300)) + @_phoenix_retry async def get_prompt_content( self, identifier: str, @@ -58,7 +110,7 @@ async def get_prompt_content( tag: str | None = None, ) -> dict[str, str]: if not self._client: - raise RuntimeError("Phoenix client not initialized") + raise PhoenixUnavailableError("Phoenix client not initialized") try: prompt_obj = self._client.prompts.get( prompt_identifier=identifier, @@ -66,13 +118,15 @@ async def get_prompt_content( tag=tag, ) domain = self._to_domain_prompt(prompt_obj, identifier=identifier) - # Return first message (system prompt) or empty messages = domain.current_version.content return messages[0] if messages else {} - except Exception as e: - logger.error(f"Error getting prompt content {identifier}: {e}") + except (ValueError, PhoenixUnavailableError): raise + except Exception as e: + logger.error("Error getting prompt content '%s': %s", identifier, e) + raise _wrap_phoenix_error("get_prompt_content", identifier, e) from e + @_phoenix_retry async def create_prompt( self, identifier: str, @@ -82,10 +136,8 @@ async def create_prompt( tags: list[str] | None = None, metadata: dict | None = None, ) -> PhoenixPromptVersion: - """Create a new prompt in Phoenix.""" if not self._client: - raise RuntimeError("Phoenix client not initialized") - + raise PhoenixUnavailableError("Phoenix client not initialized") try: prompt_obj = self._client.prompts.create( name=identifier, @@ -93,7 +145,6 @@ async def create_prompt( prompt_description=description, prompt_metadata=metadata, ) - if tags and prompt_obj.id: for tag in tags: try: @@ -102,50 +153,59 @@ async def create_prompt( name=tag, ) except Exception as tag_error: - logger.warning(f"Failed to add tag {tag}: {tag_error}") + logger.warning("Failed to add tag '%s' to '%s': %s", tag, identifier, tag_error) - logger.info(f"Created prompt {identifier}") + logger.info("Created prompt '%s'", identifier) return prompt_obj - except Exception as e: - logger.error(f"Error creating prompt {identifier}: {e}") + except (ValueError, PhoenixUnavailableError): raise + except Exception as e: + logger.error("Error creating prompt '%s': %s", identifier, e) + raise _wrap_phoenix_error("create_prompt", identifier, e) from e + @_phoenix_retry async def update_prompt( self, identifier: str, content: list[dict[str, str]] | None = None, model_name: str | None = None, description: str | None = None, + metadata: dict | None = None, ) -> PhoenixPromptVersion: - """Update a prompt (creates new version).""" if not self._client: - raise RuntimeError("Phoenix client not initialized") - + raise PhoenixUnavailableError("Phoenix client not initialized") + if description is not None: + logger.warning( + "Phoenix does not support updating description on existing prompts — description change ignored for '%s'", + identifier + ) try: current = await self.get_prompt(identifier) - updated = self._client.prompts.create( name=identifier, version=PhoenixPromptVersion(content, model_name=model_name), prompt_description=description or current.description, ) - logger.info(f"Updated prompt {identifier}") + logger.info("Updated prompt '%s'", identifier) return updated - except Exception as e: - logger.error(f"Error updating prompt {identifier}: {e}") + except (ValueError, PhoenixUnavailableError): raise + except Exception as e: + logger.error("Error updating prompt '%s': %s", identifier, e) + raise _wrap_phoenix_error("update_prompt", identifier, e) from e async def add_tag(self, identifier: str, tag: str) -> None: - """Add a tag to a prompt.""" if not self._client: - raise RuntimeError("Phoenix client not initialized") - + raise PhoenixUnavailableError("Phoenix client not initialized") try: - self._client.prompts.tag(prompt_identifier=identifier, tag=tag) - logger.info(f"Added tag {tag} to prompt {identifier}") + self._client.prompts.tags.create( + prompt_version_id=identifier, + name=tag, + ) + logger.info("Added tag '%s' to prompt '%s'", tag, identifier) except Exception as e: - logger.error(f"Error adding tag: {e}") - raise + logger.error("Error adding tag '%s' to '%s': %s", tag, identifier, e) + raise _wrap_phoenix_error("add_tag", identifier, e) from e def _to_domain_prompt( self, @@ -153,17 +213,13 @@ def _to_domain_prompt( identifier: str | None = None, description: str | None = None, ) -> Prompt: - """Convert Phoenix PromptVersion to domain entity.""" - template = getattr(phoenix_prompt, "_template", {}) raw_messages = template.get("messages", []) if isinstance(template, dict) else [] - # Normalize Phoenix message format → domain format messages = [] for msg in raw_messages: role = msg.get("role", "") raw_content = msg.get("content", "") - # Phoenix stores content as list of blocks or plain string if isinstance(raw_content, list): text = " ".join( block.get("text", "") for block in raw_content diff --git a/tests/unit/test_phoenix_prompt_manager.py b/tests/unit/test_phoenix_prompt_manager.py index bf4e66b..28eeb24 100644 --- a/tests/unit/test_phoenix_prompt_manager.py +++ b/tests/unit/test_phoenix_prompt_manager.py @@ -72,8 +72,11 @@ async def test_get_prompt_not_found(self, manager): @pytest.mark.asyncio async def test_add_tag(self, manager): - manager._client.prompts.tag = MagicMock() + manager._client.prompts.tag.create = MagicMock() await manager.add_tag("test-prompt", "new-tag") - manager._client.prompts.tag.assert_called_once_with(prompt_identifier="test-prompt", tag="new-tag") + manager._client.prompts.tags.create.assert_called_once_with( + prompt_version_id="test-prompt", + name="new-tag", + ) diff --git a/tests/unit/test_prompt_use_cases.py b/tests/unit/test_prompt_use_cases.py new file mode 100644 index 0000000..32c5628 --- /dev/null +++ b/tests/unit/test_prompt_use_cases.py @@ -0,0 +1,198 @@ +# tests/unit/test_prompt_use_cases.py +from datetime import datetime +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from src.application.use_cases.create_prompt import CreatePromptUseCase +from src.application.use_cases.get_prompt import GetPromptUseCase +from src.application.use_cases.update_prompt import UpdatePromptUseCase +from src.domain.entities.prompt import Prompt, PromptVersion + + +def _make_prompt(identifier: str = "test-prompt") -> Prompt: + return Prompt( + identifier=identifier, + description="Test description", + current_version=PromptVersion( + version_id="v1", + content=[{"role": "user", "content": "Hello"}], + model_name="gpt-4", + tags=[], + ), + created_at=datetime.now(), + updated_at=datetime.now(), + ) + + +@pytest.fixture +def mock_prompt_manager(): + manager = MagicMock() + manager.get_prompt = AsyncMock() + manager.create_prompt = AsyncMock() + manager.update_prompt = AsyncMock() + return manager + + +class TestCreatePromptUseCase: + async def test_execute_success(self, mock_prompt_manager): + expected = _make_prompt() + mock_prompt_manager.create_prompt.return_value = expected + + use_case = CreatePromptUseCase(mock_prompt_manager) + result = await use_case.execute( + identifier="test-prompt", + content=[{"role": "user", "content": "Hello"}], + model_name="gpt-4", + description="Test description", + tags=["tag1"], + ) + + assert result == expected + mock_prompt_manager.create_prompt.assert_called_once_with( + identifier="test-prompt", + content=[{"role": "user", "content": "Hello"}], + model_name="gpt-4", + description="Test description", + tags=["tag1"], + metadata=None, + ) + + async def test_execute_propagates_exception(self, mock_prompt_manager): + mock_prompt_manager.create_prompt.side_effect = ValueError("already exists") + + use_case = CreatePromptUseCase(mock_prompt_manager) + with pytest.raises(ValueError, match="already exists"): + await use_case.execute( + identifier="test-prompt", + content=[{"role": "user", "content": "Hello"}], + model_name="gpt-4", + ) + + async def test_execute_without_optional_fields(self, mock_prompt_manager): + expected = _make_prompt() + mock_prompt_manager.create_prompt.return_value = expected + + use_case = CreatePromptUseCase(mock_prompt_manager) + result = await use_case.execute( + identifier="test-prompt", + content=[{"role": "user", "content": "Hello"}], + model_name="gpt-4", + ) + + assert result == expected + mock_prompt_manager.create_prompt.assert_called_once_with( + identifier="test-prompt", + content=[{"role": "user", "content": "Hello"}], + model_name="gpt-4", + description=None, + tags=None, + metadata=None, + ) + + +class TestGetPromptUseCase: + async def test_execute_success(self, mock_prompt_manager): + expected = _make_prompt() + mock_prompt_manager.get_prompt.return_value = expected + + use_case = GetPromptUseCase(mock_prompt_manager) + result = await use_case.execute(identifier="test-prompt") + + assert result == expected + mock_prompt_manager.get_prompt.assert_called_once_with( + identifier="test-prompt", + version_id=None, + tag=None, + ) + + async def test_execute_with_version_id(self, mock_prompt_manager): + expected = _make_prompt() + mock_prompt_manager.get_prompt.return_value = expected + + use_case = GetPromptUseCase(mock_prompt_manager) + result = await use_case.execute(identifier="test-prompt", version_id="v2") + + assert result == expected + mock_prompt_manager.get_prompt.assert_called_once_with( + identifier="test-prompt", + version_id="v2", + tag=None, + ) + + async def test_execute_with_tag(self, mock_prompt_manager): + expected = _make_prompt() + mock_prompt_manager.get_prompt.return_value = expected + + use_case = GetPromptUseCase(mock_prompt_manager) + result = await use_case.execute(identifier="test-prompt", tag="production") + + assert result == expected + mock_prompt_manager.get_prompt.assert_called_once_with( + identifier="test-prompt", + version_id=None, + tag="production", + ) + + async def test_execute_not_found_raises(self, mock_prompt_manager): + mock_prompt_manager.get_prompt.side_effect = ValueError("Prompt not found: test-prompt") + + use_case = GetPromptUseCase(mock_prompt_manager) + with pytest.raises(ValueError, match="not found"): + await use_case.execute(identifier="test-prompt") + + +class TestUpdatePromptUseCase: + async def test_execute_success(self, mock_prompt_manager): + expected = _make_prompt() + mock_prompt_manager.update_prompt.return_value = expected + + use_case = UpdatePromptUseCase(mock_prompt_manager) + result = await use_case.execute( + identifier="test-prompt", + content=[{"role": "user", "content": "Updated"}], + model_name="gpt-4", + description="Updated description", + ) + + assert result == expected + mock_prompt_manager.update_prompt.assert_called_once_with( + identifier="test-prompt", + content=[{"role": "user", "content": "Updated"}], + model_name="gpt-4", + description="Updated description", + metadata=None, + ) + + async def test_execute_partial_update(self, mock_prompt_manager): + expected = _make_prompt() + mock_prompt_manager.update_prompt.return_value = expected + + use_case = UpdatePromptUseCase(mock_prompt_manager) + result = await use_case.execute( + identifier="test-prompt", + description="Only description updated", + ) + + assert result == expected + mock_prompt_manager.update_prompt.assert_called_once_with( + identifier="test-prompt", + content=None, + model_name=None, + description="Only description updated", + metadata=None, + ) + + async def test_execute_not_found_raises(self, mock_prompt_manager): + mock_prompt_manager.update_prompt.side_effect = ValueError("Prompt not found: test-prompt") + + use_case = UpdatePromptUseCase(mock_prompt_manager) + with pytest.raises(ValueError, match="not found"): + await use_case.execute(identifier="test-prompt") + + async def test_execute_propagates_exception(self, mock_prompt_manager): + mock_prompt_manager.update_prompt.side_effect = RuntimeError("Phoenix unavailable") + + use_case = UpdatePromptUseCase(mock_prompt_manager) + with pytest.raises(RuntimeError, match="Phoenix unavailable"): + await use_case.execute(identifier="test-prompt") \ No newline at end of file diff --git a/uv.lock b/uv.lock index aa1ed17..ea4d40c 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.11" resolution-markers = [ "python_full_version >= '3.14'", @@ -562,6 +562,7 @@ dependencies = [ { name = "requests" }, { name = "sqlalchemy", extra = ["asyncio"] }, { name = "sse-starlette" }, + { name = "tenacity" }, { name = "uvicorn", extra = ["standard"] }, ] @@ -599,6 +600,7 @@ requires-dist = [ { name = "requests", specifier = ">=2.33.0" }, { name = "sqlalchemy", extras = ["asyncio"], specifier = ">=2.0.0" }, { name = "sse-starlette", specifier = ">=3.2.0" }, + { name = "tenacity", specifier = ">=8.0.0" }, { name = "uvicorn", extras = ["standard"], specifier = ">=0.40.0" }, ]