-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvector_db.py
More file actions
256 lines (214 loc) · 10.4 KB
/
vector_db.py
File metadata and controls
256 lines (214 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# Import necessary libraries for vector database operations
# os: For handling file paths and environment variables
# chromadb: Core library for creating and managing vector databases
# Settings: Configuration class for ChromaDB client
# GoogleGenerativeAIEmbeddings: LangChain wrapper for Google's embedding model to convert text to vectors
# Chroma: LangChain's vector store implementation using ChromaDB
# Document: LangChain's base class for representing text documents with metadata
# uuid: For generating unique identifiers for documents
# datetime: For timestamping when content is added
import os
import chromadb
from chromadb.config import Settings
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
import uuid
from datetime import datetime
class VectorDatabase:
    """
    Manage the PolySensor vector database.

    Stores AI-generated analysis text as vector embeddings in a persistent
    ChromaDB collection and exposes add / search / list / delete / stats
    operations on it, so that related analyses can be retrieved by meaning
    (Retrieval-Augmented Generation) rather than by exact keyword match.
    """

    # Single source of truth for the collection name. The raw ChromaDB
    # collection and the LangChain wrapper must point at the same collection,
    # so both are built from this constant.
    COLLECTION_NAME = "polysensor_content"

    def __init__(self, persist_directory="./chroma_db"):
        """
        Initialize the vector database with ChromaDB.

        Args:
            persist_directory (str): Path handed to ChromaDB's persistent
                client as its on-disk storage location.

        What happens here:
            1. Create a persistent ChromaDB client rooted at persist_directory.
            2. Set up Google's embedding model to convert text into vectors.
            3. Create or retrieve the collection that stores the vectors.
            4. Wrap the same collection with LangChain's Chroma store.
        """
        self.persist_directory = persist_directory
        self.client = chromadb.PersistentClient(path=persist_directory)

        # Embedding function used by the LangChain wrapper to vectorize text.
        # The API key is read from the environment rather than hard-coded.
        self.embeddings = GoogleGenerativeAIEmbeddings(
            model="models/embedding-001",
            google_api_key=os.getenv("GOOGLE_API_KEY"),
        )

        # Raw ChromaDB collection handle; used directly for counting.
        self.collection = self.client.get_or_create_collection(
            name=self.COLLECTION_NAME,
            metadata={"description": "PolySensor analyzed content embeddings"},
        )

        # LangChain wrapper over the same client/collection; used for
        # adding, searching, listing and deleting documents.
        self.vectorstore = Chroma(
            client=self.client,
            collection_name=self.COLLECTION_NAME,
            embedding_function=self.embeddings,
        )

    @staticmethod
    def _build_metadata(metadata, content_type, doc_id):
        """Return a new metadata dict enriched with type, timestamp and id.

        The caller's dict is copied, never mutated, so a dict reused across
        several add_content() calls does not accumulate stale fields.
        """
        enriched = dict(metadata) if metadata else {}
        enriched.update({
            "content_type": content_type,
            "timestamp": datetime.now().isoformat(),
            "id": doc_id,
        })
        return enriched

    def add_content(self, content, metadata=None, content_type="unknown"):
        """
        Add content to the vector database.

        This method:
            1. Generates a unique ID for the document.
            2. Builds enriched metadata (content_type, timestamp, id) from a
               copy of the supplied metadata.
            3. Creates a LangChain Document object.
            4. Stores it through the vector store under that same ID.

        Args:
            content (str): The text to embed and store.
            metadata (dict, optional): Extra attributes to attach. Not
                modified by this call.
            content_type (str): Category stored in metadata so searches can
                filter on it later.

        Returns:
            str: The document ID if successful, None if any step failed.
        """
        try:
            doc_id = str(uuid.uuid4())
            doc = Document(
                page_content=content,
                metadata=self._build_metadata(metadata, content_type, doc_id),
            )
            # Pass the ID explicitly: without it the store assigns its own
            # ID and the value returned here could not be used to address
            # the stored document.
            self.vectorstore.add_documents([doc], ids=[doc_id])
            print(f"Document {doc_id} added successfully")
            return doc_id
        except Exception as e:
            # Best-effort API: report the failure and signal it with None.
            print(f"Error adding content to vector database: {e}")
            return None

    def search_similar(self, query, n_results=5, content_type=None):
        """
        Search for stored content that is semantically similar to a query.

        Args:
            query (str): Text to search for.
            n_results (int): Maximum number of matches to return (default 5).
            content_type (str, optional): When given, only documents whose
                metadata "content_type" equals this value are considered.

        Returns:
            list: One dict per match with keys "content", "metadata" and
                "similarity_score". The score is whatever the vector store
                reports from similarity_search_with_score(), converted to
                float. Returns an empty list if the search fails.

        NOTE(review): the score is presumably a distance (lower = more
        similar). This class does not configure a distance metric on the
        collection, so Chroma's default applies; do not assume the value is
        a cosine score or confined to 0-1 without confirming.
        """
        try:
            # Only build a filter when a content type was requested.
            filter_dict = {"content_type": content_type} if content_type else None
            matches = self.vectorstore.similarity_search_with_score(
                query=query,
                k=n_results,
                filter=filter_dict,
            )
            return [
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                    "similarity_score": float(score),
                }
                for doc, score in matches
            ]
        except Exception as e:
            print(f"Error searching vector database: {e}")
            return []

    def get_all_content(self, limit=100):
        """
        Get stored content without running a similarity search.

        Args:
            limit (int): Maximum number of documents to fetch.

        Returns:
            The vector store's get() output on success. On failure an empty
            list is returned (kept as-is for existing callers), so callers
            should check for a falsy result before indexing into it.
        """
        try:
            return self.vectorstore.get(limit=limit)
        except Exception as e:
            print(f"Error retrieving content: {e}")
            return []

    def delete_content(self, doc_id):
        """
        Delete a specific document by the ID returned from add_content().

        The document is located through its metadata "id" field, which
        add_content() always writes, and then removed by the store's own
        ID(s). This also works for documents that were stored under a
        store-generated ID.

        Args:
            doc_id (str): The unique identifier from add_content().

        Returns:
            bool: True if a matching document was deleted, False if none was
                found or the operation failed.
        """
        try:
            found = self.vectorstore.get(where={"id": doc_id})
            stored_ids = (found or {}).get("ids") or []
            if not stored_ids:
                print(f"Document {doc_id} not found")
                return False
            self.vectorstore.delete(ids=stored_ids)
            print(f"Document {doc_id} deleted successfully")
            return True
        except Exception as e:
            print(f"Error deleting content: {e}")
            return False

    def get_collection_stats(self):
        """
        Get basic statistics about the vector collection.

        Returns:
            dict: {"total_documents": <count>, "collection_name": <name>} on
                success, or {"error": <message>} if counting failed.
        """
        try:
            return {
                "total_documents": self.collection.count(),
                "collection_name": self.COLLECTION_NAME,
            }
        except Exception as e:
            print(f"Error getting collection stats: {e}")
            return {"error": str(e)}
# Module-level cache for the shared VectorDatabase instance.
# Stays None until get_vector_db() is called for the first time.
vector_db = None


def get_vector_db():
    """
    Return the shared VectorDatabase, building it on first use.

    The instance is created lazily with default arguments and stored in the
    module-level ``vector_db`` name; later calls hand back that same object.

    Returns:
        VectorDatabase: The shared instance.
    """
    global vector_db
    # Fast path: the instance already exists, reuse it.
    if vector_db is not None:
        return vector_db
    # First call: construct and remember the instance.
    vector_db = VectorDatabase()
    return vector_db