diff --git a/app/chat_gpt_client.py b/app/chat_gpt_client.py
index 4ae8e10dc..0dd213cc9 100644
--- a/app/chat_gpt_client.py
+++ b/app/chat_gpt_client.py
@@ -3,34 +3,46 @@
import os
import logging
from typing import List
+import agentops
+from dotenv import load_dotenv
+# Load environment variables
+load_dotenv()
+
+# Initialize AgentOps
+AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY")
+if not AGENTOPS_API_KEY:
+ raise ValueError("AGENTOPS_API_KEY not found in environment variables")
+
+agentops.init(AGENTOPS_API_KEY)
CHAT_GPT_DEFAULT_MODEL = os.getenv("CHAT_GPT_MODEL", "gpt-4o")
CHAT_GPT_DEFAULT_TEMPERATURE = float(os.getenv("CHAT_GPT_TEMPERATURE", "0.7"))
CHAT_GPT_DEFAULT_MAX_TOKENS = int(os.getenv("CHAT_GPT_MAX_TOKENS", "1500"))
-
class MessageRole(str, Enum):
user = "user"
assistant = "assistant"
system = "system"
-
class Message(BaseModel):
role: MessageRole
content: str
-
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-# Configure OpenAI client
-client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
-if not client.api_key:
- raise ValueError("OPENAI_API_KEY not found in environment variables")
+@agentops.record_function('configure_openai_client')
+def configure_openai_client():
+ client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
+ if not client.api_key:
+ raise ValueError("OPENAI_API_KEY not found in environment variables")
+ return client
+client = configure_openai_client()
+@agentops.record_function('get_chat_response_with_history')
async def get_chat_response_with_history(
messages: List[Message],
system_prompt: str = "You are a helpful assistant that always answers questions.",
@@ -60,5 +72,27 @@ async def get_chat_response_with_history(
)
return response.choices[0].message.content.strip()
except Exception as e:
- logger.error(f"OpenAI API error: {str(e)}")
+ error_message = f"OpenAI API error: {str(e)}"
+ logger.error(error_message)
+ agentops.log_error(error_message)
return f"I'm sorry, but I encountered an error: {str(e)}"
+
+@agentops.record_function('main')
+async def main():
+ # Example usage
+ messages = [
+ Message(role=MessageRole.user, content="Hello, how are you?"),
+ Message(role=MessageRole.assistant, content="I'm doing well, thank you for asking. How can I assist you today?"),
+ Message(role=MessageRole.user, content="Can you tell me about the weather?"),
+ ]
+ response = await get_chat_response_with_history(messages)
+ print(f"Assistant's response: {response}")
+
+if __name__ == "__main__":
+ import asyncio
+ try:
+ asyncio.run(main())
+ except Exception as e:
+ agentops.log_error(str(e))
+ finally:
+ agentops.end_session('Success')
\ No newline at end of file
diff --git a/app/main.py b/app/main.py
index ab1c77bcf..cfa6f56d0 100644
--- a/app/main.py
+++ b/app/main.py
@@ -7,6 +7,7 @@
from typing import List, Dict
import markdown2
import uuid
+import agentops
from app.chat_gpt_client import get_chat_response_with_history, Message, MessageRole
from app.rag_service import RAGService
@@ -15,45 +16,47 @@
# Load environment variables
load_dotenv()
-# Load configuration from environment variables with default values
-CHAT_TITLE = os.getenv(
- "CHAT_TITLE",
- "AI Chat Assistant",
-)
-WELCOME_MESSAGE = os.getenv(
- "WELCOME_MESSAGE",
- "Welcome! How can I assist you today?",
-)
-SYSTEM_PROMPT = os.getenv(
- "SYSTEM_PROMPT",
- "You are a helpful assistant that answers questions based on the given context and chat history.",
-)
-# TODO: Move this to be a Pydantc Field on the AstraDBStore (AstraDBConfig?)
-ASTRA_COLLECTION_NAME = os.getenv("ASTRA_COLLECTION_NAME")
+# Initialize AgentOps
+AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY")
+if not AGENTOPS_API_KEY:
+ raise ValueError("AGENTOPS_API_KEY not found in environment variables")
+agentops.init(AGENTOPS_API_KEY)
-app = FastAPI()
+# Load configuration from environment variables with default values
+CHAT_TITLE = os.getenv("CHAT_TITLE", "AI Chat Assistant")
+WELCOME_MESSAGE = os.getenv("WELCOME_MESSAGE", "Welcome! How can I assist you today?")
+SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT", "You are a helpful assistant that answers questions based on the given context and chat history.")
+ASTRA_COLLECTION_NAME = os.getenv("ASTRA_COLLECTION_NAME")
-templates_directory = os.path.join(os.path.dirname(__file__), "templates")
-templates = Jinja2Templates(directory=templates_directory)
+@agentops.record_function('create_app')
+def create_app():
+ app = FastAPI()
+
+ templates_directory = os.path.join(os.path.dirname(__file__), "templates")
+ templates = Jinja2Templates(directory=templates_directory)
+
+ static_directory = os.path.join(os.path.dirname(__file__), "static")
+ app.mount("/static", StaticFiles(directory=static_directory), name="static")
+
+ return app, templates
-# Mount the static directory
-static_directory = os.path.join(os.path.dirname(__file__), "static")
-app.mount("/static", StaticFiles(directory=static_directory), name="static")
+app, templates = create_app()
-# Simulating a database with an in-memory list
chat_history: List[Message] = []
-# Get the absolute path to the project root
project_root = os.path.dirname(os.path.abspath(__file__))
-# Initialize RAG service with ChromaDBStore
-chroma_db_path = os.path.join(project_root, "db")
-vector_store = AstraDBStore(collection_name=ASTRA_COLLECTION_NAME)
-rag_service = RAGService(vector_store)
+@agentops.record_function('initialize_rag_service')
+def initialize_rag_service():
+ chroma_db_path = os.path.join(project_root, "db")
+ vector_store = AstraDBStore(collection_name=ASTRA_COLLECTION_NAME)
+ return RAGService(vector_store)
+rag_service = initialize_rag_service()
@app.get("/", response_class=HTMLResponse)
+@agentops.record_function('read_root')
async def read_root(request: Request) -> HTMLResponse:
return templates.TemplateResponse(
"chat.html",
@@ -64,48 +67,59 @@ async def read_root(request: Request) -> HTMLResponse:
},
)
-
@app.post("/chat")
+@agentops.record_function('chat')
async def chat(request: Request, message: str = Form(...)) -> HTMLResponse:
- # Prepare messages with the correct order
- prepared_messages, citations = await rag_service.prepare_messages_with_sources(
- system_prompt=f"{SYSTEM_PROMPT}",
- chat_history=chat_history[-5:], # Last 5 messages for context
- user_message=message,
- )
-
- # Get response from ChatGPT using prepared messages
- bot_response = await get_chat_response_with_history(prepared_messages)
-
- # Render Markdown to HTML (with safety features)
- bot_response_html = markdown2.markdown(bot_response, safe_mode="escape")
-
- # Add user message and bot response to chat history
- chat_history.append(Message(role=MessageRole.user, content=message))
- chat_history.append(Message(role=MessageRole.assistant, content=bot_response))
-
- message_id = str(uuid.uuid4())
-
- response_html = templates.TemplateResponse(
- "bot_message.html",
- {
- "request": request,
- "bot_response_html": bot_response_html,
- "citations": citations,
- "message_id": message_id,
- },
- )
-
- return response_html
-
+ try:
+ prepared_messages, citations = await rag_service.prepare_messages_with_sources(
+ system_prompt=f"{SYSTEM_PROMPT}",
+ chat_history=chat_history[-5:],
+ user_message=message,
+ )
+
+ bot_response = await get_chat_response_with_history(prepared_messages)
+ bot_response_html = markdown2.markdown(bot_response, safe_mode="escape")
+
+ chat_history.append(Message(role=MessageRole.user, content=message))
+ chat_history.append(Message(role=MessageRole.assistant, content=bot_response))
+
+ message_id = str(uuid.uuid4())
+
+ response_html = templates.TemplateResponse(
+ "bot_message.html",
+ {
+ "request": request,
+ "bot_response_html": bot_response_html,
+ "citations": citations,
+ "message_id": message_id,
+ },
+ )
+
+ return response_html
+ except Exception as e:
+ agentops.log_error(f"Error in chat endpoint: {str(e)}")
+ raise
@app.get("/api/chat_history")
+@agentops.record_function('get_chat_history')
async def get_chat_history() -> List[Dict[str, str]]:
return [message.model_dump() for message in chat_history]
-
-# Optional: Add a route to clear chat history (for testing/demo purposes)
@app.post("/api/clear_history")
+@agentops.record_function('clear_history')
async def clear_history() -> Dict[str, str]:
chat_history.clear()
return {"message": "Chat history cleared"}
+
+@agentops.record_function('main')
+def main():
+ import uvicorn
+ uvicorn.run(app, host="0.0.0.0", port=8000)
+
+if __name__ == "__main__":
+ try:
+ main()
+ except Exception as e:
+ agentops.log_error(str(e))
+ finally:
+ agentops.end_session('Success')
\ No newline at end of file
diff --git a/app/rag_service.py b/app/rag_service.py
index bac2770fd..f530a1dc1 100644
--- a/app/rag_service.py
+++ b/app/rag_service.py
@@ -1,59 +1,114 @@
from typing import List, Tuple
+import agentops
from app.vector_store import VectorStore
from app.chat_gpt_client import Message, MessageRole
from app.models import RagCitation
-
class RAGService:
def __init__(self, vector_store: VectorStore):
self.vector_store = vector_store
+ @agentops.record_function('get_relevant_context')
async def get_relevant_context(
self,
query: str,
top_k: int = 5,
) -> Tuple[str, List[RagCitation]]:
- results = await self.vector_store.query(query, top_k)
- context = "\n".join([result.content for result in results])
- citations = [
- RagCitation(source=result.metadata.source, content=result.content)
- for result in results
- ]
- return context, citations
+ try:
+ results = await self.vector_store.query(query, top_k)
+ context = "\n".join([result.content for result in results])
+ citations = [
+ RagCitation(source=result.metadata.source, content=result.content)
+ for result in results
+ ]
+ return context, citations
+ except Exception as e:
+ agentops.log_error(f"Error in get_relevant_context: {str(e)}")
+ raise
+ @agentops.record_function('prepare_messages_with_sources')
async def prepare_messages_with_sources(
self,
system_prompt: str,
chat_history: List[Message],
user_message: str,
) -> Tuple[List[Message], List[RagCitation]]:
- context, citations = await self.get_relevant_context(user_message)
+ try:
+ context, citations = await self.get_relevant_context(user_message)
- prepared_messages = [
- Message(
- role=MessageRole.system,
- content=f"{system_prompt}\n\nRelevant context: {context}",
- ),
- *chat_history,
- Message(role=MessageRole.user, content=user_message),
- ]
+ prepared_messages = [
+ Message(
+ role=MessageRole.system,
+ content=f"{system_prompt}\n\nRelevant context: {context}",
+ ),
+ *chat_history,
+ Message(role=MessageRole.user, content=user_message),
+ ]
- return prepared_messages, citations
+ return prepared_messages, citations
+ except Exception as e:
+ agentops.log_error(f"Error in prepare_messages_with_sources: {str(e)}")
+ raise
- # Keep the original prepare_messages method for backwards compatibility
+ @agentops.record_function('prepare_messages')
async def prepare_messages(
self, system_prompt: str, chat_history: List[Message], user_message: str
) -> List[Message]:
- context, _ = await self.get_relevant_context(user_message)
-
- prepared_messages = [
- Message(
- role=MessageRole.system,
- content=f"{system_prompt}\n\nRelevant context: {context}",
- ),
- *chat_history,
- Message(role=MessageRole.user, content=user_message),
- ]
-
- return prepared_messages
+ try:
+ context, _ = await self.get_relevant_context(user_message)
+
+ prepared_messages = [
+ Message(
+ role=MessageRole.system,
+ content=f"{system_prompt}\n\nRelevant context: {context}",
+ ),
+ *chat_history,
+ Message(role=MessageRole.user, content=user_message),
+ ]
+
+ return prepared_messages
+ except Exception as e:
+ agentops.log_error(f"Error in prepare_messages: {str(e)}")
+ raise
+
+# Example usage and testing
+@agentops.record_function('main')
+async def main():
+ # This is a placeholder for testing purposes
+ # You would typically initialize your vector store and RAGService here
+ from app.vector_store import MockVectorStore
+ vector_store = MockVectorStore()
+ rag_service = RAGService(vector_store)
+
+ # Example usage
+ system_prompt = "You are a helpful assistant."
+ chat_history = [
+ Message(role=MessageRole.user, content="Hello!"),
+ Message(role=MessageRole.assistant, content="Hi there! How can I help you today?"),
+ ]
+ user_message = "Tell me about the weather."
+
+ try:
+ prepared_messages, citations = await rag_service.prepare_messages_with_sources(
+ system_prompt, chat_history, user_message
+ )
+ print("Prepared Messages:", prepared_messages)
+ print("Citations:", citations)
+
+ prepared_messages_without_sources = await rag_service.prepare_messages(
+ system_prompt, chat_history, user_message
+ )
+ print("Prepared Messages (without sources):", prepared_messages_without_sources)
+
+ except Exception as e:
+ print(f"An error occurred: {str(e)}")
+
+if __name__ == "__main__":
+ import asyncio
+ try:
+ asyncio.run(main())
+ except Exception as e:
+ agentops.log_error(str(e))
+ finally:
+ agentops.end_session('Success')
\ No newline at end of file
diff --git a/app/vector_store.py b/app/vector_store.py
index 3ac5bb98f..de28881c4 100644
--- a/app/vector_store.py
+++ b/app/vector_store.py
@@ -9,21 +9,26 @@
from chromadb.config import Settings
import chromadb.utils.embedding_functions as embedding_functions
from dotenv import load_dotenv
+import agentops
# Load environment variables
load_dotenv()
+# Initialize AgentOps
+AGENTOPS_API_KEY = os.getenv("AGENTOPS_API_KEY")
+if not AGENTOPS_API_KEY:
+ raise ValueError("AGENTOPS_API_KEY not found in environment variables")
+
+agentops.init(AGENTOPS_API_KEY)
class VectorStoreMetadata(BaseModel):
score: float = Field(..., description="Relevance score of the document")
source: str = Field(..., description="Source of the document")
-
class VectorStoreResult(BaseModel):
content: str = Field(..., description="Content of the document")
metadata: VectorStoreMetadata
-
class VectorStore(ABC):
@abstractmethod
async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]:
@@ -36,7 +41,6 @@ async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]:
"""
pass
-
class MockVectorStore(VectorStore):
def __init__(self):
self.lorem_ipsum = [
@@ -52,86 +56,72 @@ def __init__(self):
"Laboris nisi ut aliquip ex ea commodo consequat.",
]
+ @agentops.record_function('MockVectorStore.query')
async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]:
- # Simulate a delay to mimic a real database query
- await asyncio.sleep(0.1)
-
- # Randomly select 'top_k' sentences from the lorem ipsum list
- selected_sentences = random.sample(
- self.lorem_ipsum, min(top_k, len(self.lorem_ipsum))
- )
-
- # Create a list of VectorStoreResult objects with the selected sentences
- results = [
- VectorStoreResult(
- content=sentence,
- metadata=VectorStoreMetadata(
- score=round(random.uniform(0.5, 1.0), 2),
- source=f"mock_document_{index_n+1}.txt",
- ),
+ try:
+ await asyncio.sleep(0.1)
+ selected_sentences = random.sample(
+ self.lorem_ipsum, min(top_k, len(self.lorem_ipsum))
)
- for index_n, sentence in enumerate(selected_sentences)
- ]
-
- return results
-
+ results = [
+ VectorStoreResult(
+ content=sentence,
+ metadata=VectorStoreMetadata(
+ score=round(random.uniform(0.5, 1.0), 2),
+ source=f"mock_document_{index_n+1}.txt",
+ ),
+ )
+ for index_n, sentence in enumerate(selected_sentences)
+ ]
+ return results
+ except Exception as e:
+ agentops.log_error(f"Error in MockVectorStore query: {str(e)}")
+ raise
-# Placeholder classes for other vector stores
class PineconeStore(VectorStore):
+ @agentops.record_function('PineconeStore.query')
async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]:
# Implement Pinecone-specific query method
pass
-
class ChromaDBStore(VectorStore):
def __init__(self, path: str, collection_name: str = "default_collection"):
self.client = chromadb.PersistentClient(
path=path, settings=Settings(allow_reset=True)
)
-
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
-
self.collection = self.client.get_or_create_collection(
name=collection_name, embedding_function=openai_ef
)
+ @agentops.record_function('ChromaDBStore.query')
async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]:
- results = self.collection.query(query_texts=[query], n_results=top_k)
-
- vector_store_results = []
-
- # Process query results
- # The ChromaDB query method returns results in a specific format:
- # - 'documents', 'metadatas', and 'distances' are lists of lists
- # - The outer list corresponds to the number of queries (in our case, always 1)
- # - The inner lists contain the results for each query
- # We use [0] to access the results of our single query, then zip these lists together
- # enumerate is used to get an index (i) for each result, starting from 0
- for i, (document, metadata, distance) in enumerate(
- zip(
- results["documents"][0],
- results["metadatas"][0],
- results["distances"][0],
- )
- ):
- # Convert distance to a similarity score (assuming cosine distance)
- # Cosine distance ranges from 0 to 2, so we normalize and invert it
- similarity_score = 1 - (distance / 2)
-
- vector_store_results.append(
- VectorStoreResult(
- content=document,
- metadata=VectorStoreMetadata(
- score=similarity_score,
- source=metadata.get("source", f"document_{i}"),
- ),
+ try:
+ results = self.collection.query(query_texts=[query], n_results=top_k)
+ vector_store_results = []
+ for i, (document, metadata, distance) in enumerate(
+ zip(
+ results["documents"][0],
+ results["metadatas"][0],
+ results["distances"][0],
)
- )
-
- return vector_store_results
-
+ ):
+ similarity_score = 1 - (distance / 2)
+ vector_store_results.append(
+ VectorStoreResult(
+ content=document,
+ metadata=VectorStoreMetadata(
+ score=similarity_score,
+ source=metadata.get("source", f"document_{i}"),
+ ),
+ )
+ )
+ return vector_store_results
+ except Exception as e:
+ agentops.log_error(f"Error in ChromaDBStore query: {str(e)}")
+ raise
class AstraDBStore(VectorStore):
def __init__(self, collection_name: str = "default_collection"):
@@ -141,7 +131,6 @@ def __init__(self, collection_name: str = "default_collection"):
raise ValueError(
"ASTRA_DB_ENDPOINT and ASTRA_DB_TOKEN must be set in the environment"
)
-
self.client = DataAPIClient(token=self.astra_db_token)
self.db = self.client.get_database_by_api_endpoint(self.astra_db_endpoint)
self.collection = self.db.get_collection(collection_name)
@@ -151,13 +140,6 @@ def _filter_unique_results(
results: List[Dict[str, Any]],
top_k: int,
) -> List[Dict[str, Any]]:
- """
- Filter results to ensure uniqueness based on document ID.
-
- :param results: List of result dictionaries from AstraDB query
- :param top_k: Maximum number of results to return
- :return: List of unique result dictionaries, up to top_k in length
- """
seen_content = set()
unique_results = []
for result in results:
@@ -169,31 +151,75 @@ def _filter_unique_results(
break
return unique_results
+ @agentops.record_function('AstraDBStore.query')
async def query(self, query: str, top_k: int = 5) -> List[VectorStoreResult]:
- results = self.collection.find(
- sort={"$vectorize": query},
- limit=top_k,
- projection={"$vectorize": True},
- include_similarity=True,
- )
-
- # Filter for unique results
- unique_results = self._filter_unique_results(results, top_k)
-
- vector_store_results = []
- for result in unique_results:
- # Extract the document content and metadata
- # document = result.get("document", {})
- content = result.get("content", "")
- similarity = result.get("$similarity", 0.0)
- metadata = result.get("metadata", {})
- source = metadata.get("source", "Unknown")
-
- vector_store_results.append(
- VectorStoreResult(
- content=content,
- metadata=VectorStoreMetadata(score=similarity, source=source),
- )
+ try:
+ results = self.collection.find(
+ sort={"$vectorize": query},
+ limit=top_k,
+ projection={"$vectorize": True},
+ include_similarity=True,
)
-
- return vector_store_results
+ unique_results = self._filter_unique_results(results, top_k)
+ vector_store_results = []
+ for result in unique_results:
+ content = result.get("content", "")
+ similarity = result.get("$similarity", 0.0)
+ metadata = result.get("metadata", {})
+ source = metadata.get("source", "Unknown")
+ vector_store_results.append(
+ VectorStoreResult(
+ content=content,
+ metadata=VectorStoreMetadata(score=similarity, source=source),
+ )
+ )
+ return vector_store_results
+ except Exception as e:
+ agentops.log_error(f"Error in AstraDBStore query: {str(e)}")
+ raise
+
+@agentops.record_function('main')
+async def main():
+ # This is a placeholder for testing purposes
+ mock_store = MockVectorStore()
+ chroma_store = ChromaDBStore(path="./chromadb")
+ astra_store = AstraDBStore()
+
+ query = "example query"
+ top_k = 3
+
+ try:
+ print("MockVectorStore results:")
+ mock_results = await mock_store.query(query, top_k)
+ for result in mock_results:
+ print(f"Content: {result.content}")
+ print(f"Score: {result.metadata.score}")
+ print(f"Source: {result.metadata.source}")
+ print()
+
+ print("ChromaDBStore results:")
+ chroma_results = await chroma_store.query(query, top_k)
+ for result in chroma_results:
+ print(f"Content: {result.content}")
+ print(f"Score: {result.metadata.score}")
+ print(f"Source: {result.metadata.source}")
+ print()
+
+ print("AstraDBStore results:")
+ astra_results = await astra_store.query(query, top_k)
+ for result in astra_results:
+ print(f"Content: {result.content}")
+ print(f"Score: {result.metadata.score}")
+ print(f"Source: {result.metadata.source}")
+ print()
+
+ except Exception as e:
+ print(f"An error occurred: {str(e)}")
+
+if __name__ == "__main__":
+ try:
+ asyncio.run(main())
+ except Exception as e:
+ agentops.log_error(str(e))
+ finally:
+ agentops.end_session('Success')
\ No newline at end of file