-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdocument_agent.py
More file actions
346 lines (280 loc) · 13.3 KB
/
Copy pathdocument_agent.py
File metadata and controls
346 lines (280 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
"""Document Agent node for LangGraph multi-agent orchestration."""
from typing import Dict, Any, List, Optional
import sys
import os
import hashlib
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from sqlalchemy import text
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import chromadb
from ..state import MultiAgentState, AgentResponse
from config.database import get_db_session
# Initialize ChromaDB client directly (simpler approach)
_chroma_client = None
# Simple in-memory cache for Document agent results
# Key: hash of (question, company_id), Value: result dict
_doc_cache: Dict[str, dict] = {}
_CACHE_MAX_SIZE = 50 # Maximum number of cached entries
def _get_cache_key(question: str, company_id: int) -> str:
"""Generate a cache key from question and company_id."""
cache_string = f"{company_id}:{question.lower().strip()}"
return hashlib.md5(cache_string.encode()).hexdigest()
def _get_cached_result(question: str, company_id: int) -> Optional[dict]:
"""Check if we have a cached result for this query."""
cache_key = _get_cache_key(question, company_id)
if cache_key in _doc_cache:
print(f"✅ CACHE HIT: Returning cached document result")
return _doc_cache[cache_key]
return None
def _cache_result(question: str, company_id: int, result: dict) -> None:
"""Cache the result for future use."""
global _doc_cache
# Evict oldest entries if cache is full
if len(_doc_cache) >= _CACHE_MAX_SIZE:
# Remove first 10 entries (simple eviction)
keys_to_remove = list(_doc_cache.keys())[:10]
for key in keys_to_remove:
del _doc_cache[key]
print(f"🗑️ CACHE EVICTION: Removed {len(keys_to_remove)} old entries")
cache_key = _get_cache_key(question, company_id)
_doc_cache[cache_key] = result
print(f"💾 CACHE STORE: Saved document result (cache size: {len(_doc_cache)})")
def _get_chroma_client():
"""Get ChromaDB client (lazy initialization)."""
global _chroma_client
if _chroma_client is None:
chroma_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'database', 'chromadb', 'data')
_chroma_client = chromadb.PersistentClient(path=chroma_path)
return _chroma_client
def _retrieve_company_documents(company_id: int) -> List[dict]:
"""Retrieve all documents for a company."""
sql = """
SELECT d.id, d.metadata->>'filename' as filename, d.metadata->>'content_type' as content_type,
d.metadata->>'file_size' as file_size, d.parsed_content, d.document_summary,
d.bucket_name, d.object_name, d.created_at
FROM public.documents_01_14 d
JOIN public.companies_documents_join cdj ON d.id = cdj.attachment_id
WHERE cdj.company_id = :company_id ORDER BY d.created_at DESC
"""
session = get_db_session()
try:
result = session.execute(text(sql), {"company_id": company_id})
documents = []
for row in result:
doc = {"id": row[0], "filename": row[1], "content_type": row[2], "file_size": row[3],
"parsed_content": row[4], "document_summary": row[5], "bucket_name": row[6],
"object_name": row[7], "created_at": str(row[8]) if row[8] else None}
documents.append(doc)
return documents
finally:
session.close()
def _search_document_content(documents: List[dict], search_terms: List[str]) -> List[dict]:
"""Search for terms within parsed document content."""
results = []
for doc in documents:
content = doc.get("parsed_content")
if not content:
continue
content_lower = content.lower()
matches = []
for term in search_terms:
if term.lower() in content_lower:
idx = content_lower.find(term.lower())
start, end = max(0, idx - 100), min(len(content), idx + len(term) + 100)
matches.append({"term": term, "snippet": f"...{content[start:end]}..."})
if matches:
results.append({"id": doc["id"], "filename": doc["filename"], "content_type": doc["content_type"],
"matches": matches, "full_content": content[:2000] if len(content) > 2000 else content})
return results
def _get_all_company_documents_from_chromadb(company_id: int) -> List[dict]:
"""
Get ALL document summaries for a company from ChromaDB.
Simple approach: No similarity filtering - just get all documents for the company
and let the LLM find the relevant information.
This works because:
- Each company has ~5-28 documents max
- All summaries fit easily in LLM context (~7k tokens worst case)
- LLM is better at finding specific info than vector similarity
Args:
company_id: Company ID to filter by
Returns:
List of all document summaries for the company
"""
try:
client = _get_chroma_client()
collection = client.get_collection('document_summaries')
# Get ALL documents for this company (no similarity threshold)
results = collection.get(
where={'company_id': company_id},
include=['documents', 'metadatas']
)
# Format results
documents = []
if results['ids']:
for i, doc_id in enumerate(results['ids']):
metadata = results['metadatas'][i]
documents.append({
'document_id': metadata.get('document_id'),
'filename': metadata.get('filename', 'Unknown'),
'content_type': metadata.get('content_type', 'Unknown'),
'summary': results['documents'][i], # The actual summary text
'type': metadata.get('type', 'summary')
})
return documents
except Exception as e:
print(f"ChromaDB retrieval failed: {e}")
return []
def _format_company_documents(documents: List[dict]) -> str:
"""Format all company documents for LLM context."""
if not documents:
return "No documents found for this company."
formatted = f"**All Documents for this Company ({len(documents)} total):**\n\n"
for i, doc in enumerate(documents, 1):
formatted += f"**Document {i}: {doc['filename']}**\n"
formatted += f"- Type: {doc['content_type']}\n"
formatted += f"- Document ID: {doc.get('document_id', 'N/A')}\n"
# Include the full summary
if doc.get("summary"):
formatted += f"\n**Summary:**\n{doc['summary']}\n"
formatted += "\n" + "-"*60 + "\n\n"
return formatted
def _format_documents_for_llm(documents: List[dict], with_content: bool = False) -> str:
"""Format documents for LLM (OLD - kept for fallback)."""
if not documents:
return "No documents found for this company."
parts = []
parsed_docs = [d for d in documents if d.get("parsed_content")]
metadata_only = [d for d in documents if not d.get("parsed_content")]
if parsed_docs:
parts.append(f"**Documents with searchable content ({len(parsed_docs)}):**")
for doc in parsed_docs[:5]:
parts.append(f"- {doc['filename']} ({doc['content_type']})")
if doc.get("document_summary"):
parts.append(f" Summary: {doc['document_summary'][:200]}...")
if with_content and doc.get("parsed_content"):
parts.append(f" Content preview: {doc['parsed_content'][:500]}...")
if metadata_only:
parts.append(f"\n**Documents with metadata only ({len(metadata_only)}):**")
for doc in metadata_only[:5]:
parts.append(f"- {doc['filename']} ({doc['content_type']})")
return "\n".join(parts)
def _format_document_agent_memory(memory: List[dict]) -> str:
"""Format document agent's memory for context."""
if not memory:
return "No previous document searches."
context_parts = []
for i, entry in enumerate(memory[-3:], 1):
q = entry.get('question', '')[:80]
result = entry.get('answer', '')[:100]
context_parts.append(f"[{i}] Question: {q}... -> {result}")
return "\n".join(context_parts)
def document_agent_node(state: MultiAgentState) -> Dict[str, Any]:
"""Document Agent node - retrieves all company documents and lets LLM find the answer."""
print("\n" + "="*60)
print("DOCUMENT AGENT NODE - Company Document Retrieval")
print("="*60)
company_id = state["company_id"]
question = state["user_question"]
# Use document agent's own memory
document_agent_memory = state.get("document_agent_memory", [])
print(f"Document Agent memory entries: {len(document_agent_memory)}")
# Check cache first
cached_result = _get_cached_result(question, company_id)
if cached_result:
print(f"Document Agent completed (FROM CACHE)")
print("="*60 + "\n")
agent_response = AgentResponse(
agent_name="document_agent",
content=cached_result.get("natural_response", ""),
data=None,
sql=None,
documents=cached_result.get("documents", []),
confidence=cached_result.get("confidence", 0.85),
error=None
)
return {
"agent_responses": [agent_response],
"retrieved_documents": cached_result.get("documents", []),
"document_summary": cached_result.get("natural_response", ""),
"execution_path": ["document_agent (cached)"],
"document_agent_memory": document_agent_memory # No new memory entry for cached results
}
# Cache miss - proceed with document retrieval
print("🔍 CACHE MISS: Retrieving documents...")
print(f"Retrieving all documents for company {company_id}...")
print(f"Question: {question}")
# Get ALL documents for this company from ChromaDB (no similarity filtering)
company_docs = _get_all_company_documents_from_chromadb(company_id)
print(f"Found {len(company_docs)} documents for company")
# Log documents for trace
if company_docs:
print("\n📄 COMPANY DOCUMENTS:")
print("="*60)
for i, doc in enumerate(company_docs, 1):
print(f"\n📄 Document {i}: {doc['filename']}")
print(f" Type: {doc.get('content_type', 'N/A')}")
print(f" Document ID: {doc.get('document_id', 'N/A')}")
if doc.get("summary"):
summary_preview = doc["summary"][:300]
print(f" Summary preview: {summary_preview}...")
print("-"*40)
# Format all documents for LLM
docs_info = _format_company_documents(company_docs)
print(f"\n📝 Formatted docs info length: {len(docs_info)} characters")
# Also retrieve full document metadata from PostgreSQL
documents = _retrieve_company_documents(company_id)
print("Generating natural language response...")
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
prompt = ChatPromptTemplate.from_messages([
("system", """You are a document analyst for Harper Insurance. You have access to ALL document summaries for this company.
## YOUR TASK:
Search through the document summaries below and find the answer to the user's question.
## DOCUMENTS:
{documents}
## INSTRUCTIONS:
1. Read through ALL the document summaries above
2. Find the specific information the user is asking about
3. Quote the exact values from the summaries (premiums, dates, coverage types, etc.)
4. Cite which document contains the information
## RESPONSE FORMAT:
- Start with the direct answer
- Quote the specific text from the document
- Reference the document filename
## EXAMPLE:
User: "What is the premium?"
Response: "According to **NXTKCJ99LW-00-GL-policy-0000.pdf**, the premium is **$1,036.00**. The document states: 'Premium: $1,036.00'"
## IMPORTANT:
- The answer IS in the documents - search carefully
- If you truly cannot find the answer, say which document might contain it"""),
("human", "{question}")
])
try:
chain = prompt | llm
response = chain.invoke({"documents": docs_info, "question": question})
natural_response = response.content
except Exception as e:
print(f"LLM response generation failed: {e}")
natural_response = docs_info if company_docs else "No documents found for this company."
# Set confidence based on whether we found documents
confidence = 0.85 if company_docs else 0.3
# Cache the result for future use
cache_data = {
"natural_response": natural_response,
"documents": documents,
"confidence": confidence
}
_cache_result(question, company_id, cache_data)
agent_response = AgentResponse(agent_name="document_agent", content=natural_response, data=None, sql=None,
documents=documents, confidence=confidence, error=None)
print(f"Document Agent completed (confidence: {confidence})")
print("="*60 + "\n")
# Create memory entry for this document retrieval
result_summary = f"Found {len(company_docs)} docs; Response: {natural_response[:100]}..."
memory_entry = {
"question": question,
"answer": result_summary
}
return {"agent_responses": [agent_response], "retrieved_documents": documents,
"document_summary": natural_response, "execution_path": ["document_agent"],
"document_agent_memory": document_agent_memory + [memory_entry]}