diff --git a/app/chat_gpt_client.py b/app/chat_gpt_client.py index 4ae8e10dc..0dd213cc9 100644 --- a/app/chat_gpt_client.py +++ b/app/chat_gpt_client.py @@ -3,34 +3,46 @@ import os import logging from typing import List +import agentops +from dotenv import load_dotenv +# Load environment variables +load_dotenv() + +# Initialize AgentOps +AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY") +if not AGENTOPS_API_KEY: + raise ValueError("AGENTOPS_API_KEY not found in environment variables") + +agentops.init(AGENTOPS_API_KEY) CHAT_GPT_DEFAULT_MODEL = os.getenv("CHAT_GPT_MODEL", "gpt-4o") CHAT_GPT_DEFAULT_TEMPERATURE = float(os.getenv("CHAT_GPT_TEMPERATURE", "0.7")) CHAT_GPT_DEFAULT_MAX_TOKENS = int(os.getenv("CHAT_GPT_MAX_TOKENS", "1500")) - class MessageRole(str, Enum): user = "user" assistant = "assistant" system = "system" - class Message(BaseModel): role: MessageRole content: str - # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# Configure OpenAI client -client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) -if not client.api_key: - raise ValueError("OPENAI_API_KEY not found in environment variables") +@agentops.record_function('configure_openai_client') +def configure_openai_client(): + client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + if not client.api_key: + raise ValueError("OPENAI_API_KEY not found in environment variables") + return client +client = configure_openai_client() +@agentops.record_function('get_chat_response_with_history') async def get_chat_response_with_history( messages: List[Message], system_prompt: str = "You are a helpful assistant that always answers questions.", @@ -60,5 +72,27 @@ async def get_chat_response_with_history( ) return response.choices[0].message.content.strip() except Exception as e: - logger.error(f"OpenAI API error: {str(e)}") + error_message = f"OpenAI API error: {str(e)}" + logger.error(error_message) + agentops.log_error(error_message) return f"I'm sorry, but I encountered an error: {str(e)}" + +@agentops.record_function('main') +async def main(): + # Example usage + messages = [ + Message(role=MessageRole.user, content="Hello, how are you?"), + Message(role=MessageRole.assistant, content="I'm doing well, thank you for asking. How can I assist you today?"), + Message(role=MessageRole.user, content="Can you tell me about the weather?"), + ] + response = await get_chat_response_with_history(messages) + print(f"Assistant's response: {response}") + +if __name__ == "__main__": + import asyncio + try: + asyncio.run(main()) + except Exception as e: + agentops.log_error(str(e)) + finally: + agentops.end_session('Success') \ No newline at end of file diff --git a/app/main.py b/app/main.py index ab1c77bcf..cfa6f56d0 100644 --- a/app/main.py +++ b/app/main.py @@ -7,6 +7,7 @@ from typing import List, Dict import markdown2 import uuid +import agentops from app.chat_gpt_client import get_chat_response_with_history, Message, MessageRole from app.rag_service import RAGService @@ -15,45 +16,47 @@ # Load environment variables load_dotenv() -# Load configuration from environment variables with default values -CHAT_TITLE = os.getenv( - "CHAT_TITLE", - "AI Chat Assistant", -) -WELCOME_MESSAGE = os.getenv( - "WELCOME_MESSAGE", - "Welcome! How can I assist you today?", -) -SYSTEM_PROMPT = os.getenv( - "SYSTEM_PROMPT", - "You are a helpful assistant that answers questions based on the given context and chat history.", -) -# TODO: Move this to be a Pydantc Field on the AstraDBStore (AstraDBConfig?) -ASTRA_COLLECTION_NAME = os.getenv("ASTRA_COLLECTION_NAME") +# Initialize AgentOps +AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY") +if not AGENTOPS_API_KEY: + raise ValueError("AGENTOPS_API_KEY not found in environment variables") +agentops.init(AGENTOPS_API_KEY) -app = FastAPI() +# Load configuration from environment variables with default values +CHAT_TITLE = os.getenv("CHAT_TITLE", "AI Chat Assistant") +WELCOME_MESSAGE = os.getenv("WELCOME_MESSAGE", "Welcome! How can I assist you today?") +SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", "You are a helpful assistant that answers questions based on the given context and chat history.") +ASTRA_COLLECTION_NAME = os.getenv("ASTRA_COLLECTION_NAME") -templates_directory = os.path.join(os.path.dirname(__file__), "templates") -templates = Jinja2Templates(directory=templates_directory) +@agentops.record_function('create_app') +def create_app(): + app = FastAPI() + + templates_directory = os.path.join(os.path.dirname(__file__), "templates") + templates = Jinja2Templates(directory=templates_directory) + + static_directory = os.path.join(os.path.dirname(__file__), "static") + app.mount("/static", StaticFiles(directory=static_directory), name="static") + + return app, templates -# Mount the static directory -static_directory = os.path.join(os.path.dirname(__file__), "static") -app.mount("/static", StaticFiles(directory=static_directory), name="static") +app, templates = create_app() -# Simulating a database with an in-memory list chat_history: List[Message] = [] -# Get the absolute path to the project root project_root = os.path.dirname(os.path.abspath(__file__)) -# Initialize RAG service with ChromaDBStore -chroma_db_path = os.path.join(project_root, "db") -vector_store = AstraDBStore(collection_name=ASTRA_COLLECTION_NAME) -rag_service = RAGService(vector_store) +@agentops.record_function('initialize_rag_service') +def initialize_rag_service(): + chroma_db_path = os.path.join(project_root, "db") + vector_store = AstraDBStore(collection_name=ASTRA_COLLECTION_NAME) + return RAGService(vector_store) +rag_service = initialize_rag_service() @app.get("/", response_class=HTMLResponse) +@agentops.record_function('read_root') async def read_root(request: Request) -> HTMLResponse: return templates.TemplateResponse( "chat.html", @@ -64,48 +67,59 @@ async def read_root(request: Request) -> HTMLResponse: }, ) - @app.post("/chat") +@agentops.record_function('chat') async def chat(request: Request, message: str = Form(...)) -> HTMLResponse: - # Prepare messages with the correct order - prepared_messages, citations = await rag_service.prepare_messages_with_sources( - system_prompt=f"{SYSTEM_PROMPT}", - chat_history=chat_history[-5:], # Last 5 messages for context - user_message=message, - ) - - # Get response from ChatGPT using prepared messages - bot_response = await get_chat_response_with_history(prepared_messages) - - # Render Markdown to HTML (with safety features) - bot_response_html = markdown2.markdown(bot_response, safe_mode="escape") - - # Add user message and bot response to chat history - chat_history.append(Message(role=MessageRole.user, content=message)) - chat_history.append(Message(role=MessageRole.assistant, content=bot_response)) - - message_id = str(uuid.uuid4()) - - response_html = templates.TemplateResponse( - "bot_message.html", - { - "request": request, - "bot_response_html": bot_response_html, - "citations": citations, - "message_id": message_id, - }, - ) - - return response_html - + try: + prepared_messages, citations = await rag_service.prepare_messages_with_sources( + system_prompt=f"{SYSTEM_PROMPT}", + chat_history=chat_history[-5:], + user_message=message, + ) + + bot_response = await get_chat_response_with_history(prepared_messages) + bot_response_html = markdown2.markdown(bot_response, safe_mode="escape") + + chat_history.append(Message(role=MessageRole.user, content=message)) + chat_history.append(Message(role=MessageRole.assistant, content=bot_response)) + + message_id = str(uuid.uuid4()) + + response_html = templates.TemplateResponse( + "bot_message.html", + { + "request": request, + "bot_response_html": bot_response_html, + "citations": citations, + "message_id": message_id, + }, + ) + + return response_html + except Exception as e: + agentops.log_error(f"Error in chat endpoint: {str(e)}") + raise @app.get("/api/chat_history") +@agentops.record_function('get_chat_history') async def get_chat_history() -> List[Dict[str, str]]: return [message.model_dump() for message in chat_history] - -# Optional: Add a route to clear chat history (for testing/demo purposes) @app.post("/api/clear_history") +@agentops.record_function('clear_history') async def clear_history() -> Dict[str, str]: chat_history.clear() return {"message": "Chat history cleared"} + +@agentops.record_function('main') +def main(): + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) + +if __name__ == "__main__": + try: + main() + except Exception as e: + agentops.log_error(str(e)) + finally: + agentops.end_session('Success') \ No newline at end of file diff --git a/app/rag_service.py b/app/rag_service.py index bac2770fd..f530a1dc1 100644 --- a/app/rag_service.py +++ b/app/rag_service.py @@ -1,59 +1,114 @@ from typing import List, Tuple +import agentops from app.vector_store import VectorStore from app.chat_gpt_client import Message, MessageRole from app.models import RagCitation - class RAGService: def __init__(self, vector_store: VectorStore): self.vector_store = vector_store + @agentops.record_function('get_relevant_context') async def get_relevant_context( self, query: str, top_k: int = 5, ) -> Tuple[str, List[RagCitation]]: - results = await self.vector_store.query(query, top_k) - context = "\n".join([result.content for result in results]) - citations = [ - RagCitation(source=result.metadata.source, content=result.content) - for result in results - ] - return context, citations + try: + results = await self.vector_store.query(query, top_k) + context = "\n".join([result.content for result in results]) + citations = [ + RagCitation(source=result.metadata.source, content=result.content) + for result in results + ] + return context, citations + except Exception as e: + agentops.log_error(f"Error in get_relevant_context: {str(e)}") + raise + @agentops.record_function('prepare_messages_with_sources') async def prepare_messages_with_sources( self, system_prompt: str, chat_history: List[Message], user_message: str, ) -> Tuple[List[Message], List[RagCitation]]: - context, citations = await self.get_relevant_context(user_message) + try: + context, citations = await self.get_relevant_context(user_message) - prepared_messages = [ - Message( - role=MessageRole.system, - content=f"{system_prompt}\n\nRelevant context: {context}", - ), - *chat_history, - Message(role=MessageRole.user, content=user_message), - ] + prepared_messages = [ + Message( + role=MessageRole.system, + content=f"{system_prompt}\n\nRelevant context: {context}", + ), + *chat_history, + Message(role=MessageRole.user, content=user_message), + ] - return prepared_messages, citations + return prepared_messages, citations + except Exception as e: + agentops.log_error(f"Error in prepare_messages_with_sources: {str(e)}") + raise - # Keep the original prepare_messages method for backwards compatibility + @agentops.record_function('prepare_messages') async def prepare_messages( self, system_prompt: str, chat_history: List[Message], user_message: str ) -> List[Message]: - context, _ = await self.get_relevant_context(user_message) - - prepared_messages = [ - Message( - role=MessageRole.system, - content=f"{system_prompt}\n\nRelevant context: {context}", - ), - *chat_history, - Message(role=MessageRole.user, content=user_message), - ] - - return prepared_messages + try: + context, _ = await self.get_relevant_context(user_message) + + prepared_messages = [ + Message( + role=MessageRole.system, + content=f"{system_prompt}\n\nRelevant context: {context}", + ), + *chat_history, + Message(role=MessageRole.user, content=user_message), + ] + + return prepared_messages + except Exception as e: + agentops.log_error(f"Error in prepare_messages: {str(e)}") + raise + +# Example usage and testing +@agentops.record_function('main') +async def main(): + # This is a placeholder for testing purposes + # You would typically initialize your vector store and RAGService here + from app.vector_store import MockVectorStore + vector_store = MockVectorStore() + rag_service = RAGService(vector_store) + + # Example usage + system_prompt = "You are a helpful assistant." + chat_history = [ + Message(role=MessageRole.user, content="Hello!"), + Message(role=MessageRole.assistant, content="Hi there! How can I help you today?"), + ] + user_message = "Tell me about the weather." + + try: + prepared_messages, citations = await rag_service.prepare_messages_with_sources( + system_prompt, chat_history, user_message + ) + print("Prepared Messages:", prepared_messages) + print("Citations:", citations) + + prepared_messages_without_sources = await rag_service.prepare_messages( + system_prompt, chat_history, user_message + ) + print("Prepared Messages (without sources):", prepared_messages_without_sources) + + except Exception as e: + print(f"An error occurred: {str(e)}") + +if __name__ == "__main__": + import asyncio + try: + asyncio.run(main()) + except Exception as e: + agentops.log_error(str(e)) + finally: + agentops.end_session('Success') \ No newline at end of file diff --git a/app/vector_store.py b/app/vector_store.py index 3ac5bb98f..de28881c4 100644 --- a/app/vector_store.py +++ b/app/vector_store.py @@ -9,21 +9,26 @@ from chromadb.config import Settings import chromadb.utils.embedding_functions as embedding_functions from dotenv import load_dotenv +import agentops # Load environment variables load_dotenv() +# Initialize AgentOps +AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY") +if not AGENTOPS_API_KEY: + raise ValueError("AGENTOPS_API_KEY not found in environment variables") + +agentops.init(AGENTOPS_API_KEY) class VectorStoreMetadata(BaseModel): score: float = Field(..., description="Relevance score of the document") source: str = Field(..., description="Source of the document") - class VectorStoreResult(BaseModel): content: str = Field(..., description="Content of the document") metadata: VectorStoreMetadata - class VectorStore(ABC): @abstractmethod async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]: @@ -36,7 +41,6 @@ async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]: """ pass - class MockVectorStore(VectorStore): def __init__(self): self.lorem_ipsum = [ @@ -52,86 +56,72 @@ def __init__(self): "Laboris nisi ut aliquip ex ea commodo consequat.", ] + @agentops.record_function('MockVectorStore.query') async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]: - # Simulate a delay to mimic a real database query - await asyncio.sleep(0.1) - - # Randomly select 'top_k' sentences from the lorem ipsum list - selected_sentences = random.sample( - self.lorem_ipsum, min(top_k, len(self.lorem_ipsum)) - ) - - # Create a list of VectorStoreResult objects with the selected sentences - results = [ - VectorStoreResult( - content=sentence, - metadata=VectorStoreMetadata( - score=round(random.uniform(0.5, 1.0), 2), - source=f"mock_document_{index_n+1}.txt", - ), + try: + await asyncio.sleep(0.1) + selected_sentences = random.sample( + self.lorem_ipsum, min(top_k, len(self.lorem_ipsum)) ) - for index_n, sentence in enumerate(selected_sentences) - ] - - return results - + results = [ + VectorStoreResult( + content=sentence, + metadata=VectorStoreMetadata( + score=round(random.uniform(0.5, 1.0), 2), + source=f"mock_document_{index_n+1}.txt", + ), + ) + for index_n, sentence in enumerate(selected_sentences) + ] + return results + except Exception as e: + agentops.log_error(f"Error in MockVectorStore query: {str(e)}") + raise -# Placeholder classes for other vector stores class PineconeStore(VectorStore): + @agentops.record_function('PineconeStore.query') async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]: # Implement Pinecone-specific query method pass - class ChromaDBStore(VectorStore): def __init__(self, path: str, collection_name: str = "default_collection"): self.client = chromadb.PersistentClient( path=path, settings=Settings(allow_reset=True) ) - openai_ef = embedding_functions.OpenAIEmbeddingFunction( api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small" ) - self.collection = self.client.get_or_create_collection( name=collection_name, embedding_function=openai_ef ) + @agentops.record_function('ChromaDBStore.query') async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]: - results = self.collection.query(query_texts=[query], n_results=top_k) - - vector_store_results = [] - - # Process query results - # The ChromaDB query method returns results in a specific format: - # - 'documents', 'metadatas', and 'distances' are lists of lists - # - The outer list corresponds to the number of queries (in our case, always 1) - # - The inner lists contain the results for each query - # We use [0] to access the results of our single query, then zip these lists together - # enumerate is used to get an index (i) for each result, starting from 0 - for i, (document, metadata, distance) in enumerate( - zip( - results["documents"][0], - results["metadatas"][0], - results["distances"][0], - ) - ): - # Convert distance to a similarity score (assuming cosine distance) - # Cosine distance ranges from 0 to 2, so we normalize and invert it - similarity_score = 1 - (distance / 2) - - vector_store_results.append( - VectorStoreResult( - content=document, - metadata=VectorStoreMetadata( - score=similarity_score, - source=metadata.get("source", f"document_{i}"), - ), + try: + results = self.collection.query(query_texts=[query], n_results=top_k) + vector_store_results = [] + for i, (document, metadata, distance) in enumerate( + zip( + results["documents"][0], + results["metadatas"][0], + results["distances"][0], ) - ) - - return vector_store_results - + ): + similarity_score = 1 - (distance / 2) + vector_store_results.append( + VectorStoreResult( + content=document, + metadata=VectorStoreMetadata( + score=similarity_score, + source=metadata.get("source", f"document_{i}"), + ), + ) + ) + return vector_store_results + except Exception as e: + agentops.log_error(f"Error in ChromaDBStore query: {str(e)}") + raise class AstraDBStore(VectorStore): def __init__(self, collection_name: str = "default_collection"): @@ -141,7 +131,6 @@ def __init__(self, collection_name: str = "default_collection"): raise ValueError( "ASTRA_DB_ENDPOINT and ASTRA_DB_TOKEN must be set in the environment" ) - self.client = DataAPIClient(token=self.astra_db_token) self.db = self.client.get_database_by_api_endpoint(self.astra_db_endpoint) self.collection = self.db.get_collection(collection_name) @@ -151,13 +140,6 @@ def _filter_unique_results( results: List[Dict[str, Any]], top_k: int, ) -> List[Dict[str, Any]]: - """ - Filter results to ensure uniqueness based on document ID. - - :param results: List of result dictionaries from AstraDB query - :param top_k: Maximum number of results to return - :return: List of unique result dictionaries, up to top_k in length - """ seen_content = set() unique_results = [] for result in results: @@ -169,31 +151,75 @@ def _filter_unique_results( break return unique_results + @agentops.record_function('AstraDBStore.query') async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]: - results = self.collection.find( - sort={"$vectorize": query}, - limit=top_k, - projection={"$vectorize": True}, - include_similarity=True, - ) - - # Filter for unique results - unique_results = self._filter_unique_results(results, top_k) - - vector_store_results = [] - for result in unique_results: - # Extract the document content and metadata - # document = result.get("document", {}) - content = result.get("content", "") - similarity = result.get("$similarity", 0.0) - metadata = result.get("metadata", {}) - source = metadata.get("source", "Unknown") - - vector_store_results.append( - VectorStoreResult( - content=content, - metadata=VectorStoreMetadata(score=similarity, source=source), - ) + try: + results = self.collection.find( + sort={"$vectorize": query}, + limit=top_k, + projection={"$vectorize": True}, + include_similarity=True, ) - - return vector_store_results + unique_results = self._filter_unique_results(results, top_k) + vector_store_results = [] + for result in unique_results: + content = result.get("content", "") + similarity = result.get("$similarity", 0.0) + metadata = result.get("metadata", {}) + source = metadata.get("source", "Unknown") + vector_store_results.append( + VectorStoreResult( + content=content, + metadata=VectorStoreMetadata(score=similarity, source=source), + ) + ) + return vector_store_results + except Exception as e: + agentops.log_error(f"Error in AstraDBStore query: {str(e)}") + raise + +@agentops.record_function('main') +async def main(): + # This is a placeholder for testing purposes + mock_store = MockVectorStore() + chroma_store = ChromaDBStore(path="./chromadb") + astra_store = AstraDBStore() + + query = "example query" + top_k = 3 + + try: + print("MockVectorStore results:") + mock_results = await mock_store.query(query, top_k) + for result in mock_results: + print(f"Content: {result.content}") + print(f"Score: {result.metadata.score}") + print(f"Source: {result.metadata.source}") + print() + + print("ChromaDBStore results:") + chroma_results = await chroma_store.query(query, top_k) + for result in chroma_results: + print(f"Content: {result.content}") + print(f"Score: {result.metadata.score}") + print(f"Source: {result.metadata.source}") + print() + + print("AstraDBStore results:") + astra_results = await astra_store.query(query, top_k) + for result in astra_results: + print(f"Content: {result.content}") + print(f"Score: {result.metadata.score}") + print(f"Source: {result.metadata.source}") + print() + + except Exception as e: + print(f"An error occurred: {str(e)}") + +if __name__ == "__main__": + try: + asyncio.run(main()) + except Exception as e: + agentops.log_error(str(e)) + finally: + agentops.end_session('Success') \ No newline at end of file