Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion agents/python/src/lib/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ async def replace_marker(match):
goto="chat_node",
update={
"research_question": research_question,
"resources": state.get("resources", []), # Preserve resources
"messages": [
ai_message,
ToolMessage(
Expand Down Expand Up @@ -421,6 +422,7 @@ async def replace_marker(match):
goto="search_node",
update={
"data_questions": data_questions,
"resources": state.get("resources", []), # Preserve resources
"messages": [
ai_message,
ToolMessage(
Expand All @@ -432,4 +434,4 @@ async def replace_marker(match):
)

logger.info(f"=== CHAT_NODE: Routing to {goto} ===")
return Command(goto=goto, update={"messages": response})
return Command(goto=goto, update={"messages": response, "resources": state.get("resources", [])})
143 changes: 70 additions & 73 deletions agents/python/src/lib/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,122 +112,119 @@ async def search_node(state: AgentState, config: RunnableConfig):
if not ENABLE_DEEP_QUERIES:
deep_questions = [q for q in deep_questions if q.get("query_type") == "prediction_market"]

# Add logs for both web and Tako searches
for query in queries:
state["logs"].append({"message": f"Web search for {query}", "done": False})
search_results = []
tako_results = []

# STAGE 1: Run Tako searches (fast + deep) and Tavily web searches in parallel
# Add logs for all searches
for query in queries:
state["logs"].append({"message": f"Web search: {query}", "done": False})
for q_obj in fast_questions:
state["logs"].append({"message": f"Tako fast search: {q_obj['question']}", "done": False})

state["logs"].append({"message": f"Tako search: {q_obj['question']}", "done": False})
for q_obj in deep_questions:
state["logs"].append({"message": f"Tako deep search: {q_obj['question']}", "done": False})
if queries or fast_questions or deep_questions:
await copilotkit_emit_state(config, state)

await copilotkit_emit_state(config, state)

search_results = []
tako_results = []

# Run Tavily web search and Tako knowledge search in parallel
# Build all tasks
tavily_tasks = [async_tavily_search(query) for query in queries]
fast_tako_tasks = [
search_knowledge_base(q["question"], search_effort="fast")
for q in fast_questions
]
deep_tako_tasks = [
search_knowledge_base(q["question"], search_effort="deep")
for q in deep_questions
]

fast_tasks = tavily_tasks + fast_tako_tasks
fast_results = await asyncio.gather(*fast_tasks, return_exceptions=True)

# Split results back into Tavily and Tako fast
num_tavily = len(tavily_tasks)
tavily_results = fast_results[:num_tavily]
fast_tako_results = fast_results[num_tavily:]

# Process Tavily results
for i, result in enumerate(tavily_results):
if isinstance(result, Exception):
search_results.append({"error": str(result)})
else:
search_results.append(result)
state["logs"][i]["done"] = True
await copilotkit_emit_state(config, state)

# Process fast Tako results
for i, result in enumerate(fast_tako_results):
log_index = num_tavily + i
question = fast_questions[i]["question"]
if isinstance(result, Exception):
tako_results.append({"error": str(result)})
elif result: # Tako returned results
tako_results.extend(result)
state["logs"][log_index]["done"] = True
await copilotkit_emit_state(config, state)

# RETRY LOGIC: If no Tako results found, retry with 2 additional web searches
if not tako_results and fast_questions:
logger.info("No Tako results found, retrying with web searches")
# Use up to 2 questions for web search fallback
fallback_queries = [q["question"] for q in fast_questions[:2]]

for query in fallback_queries:
state["logs"].append({"message": f"Tako Web Search: {query}", "done": False})
await copilotkit_emit_state(config, state)
all_tasks = tavily_tasks + fast_tako_tasks + deep_tako_tasks
if all_tasks:
all_results = await asyncio.gather(*all_tasks, return_exceptions=True)

fallback_tasks = [async_tavily_search(query) for query in fallback_queries]
fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True)
# Split results back into tavily, fast tako, and deep tako
num_tavily = len(tavily_tasks)
num_fast = len(fast_tako_tasks)
tavily_results = all_results[:num_tavily]
fast_tako_results = all_results[num_tavily:num_tavily + num_fast]
deep_tako_results = all_results[num_tavily + num_fast:]

for i, result in enumerate(fallback_results):
# Process Tavily results
for i, result in enumerate(tavily_results):
if isinstance(result, Exception):
search_results.append({"error": str(result)})
else:
search_results.append(result)
# Mark the fallback log as done
state["logs"][-(len(fallback_queries) - i)]["done"] = True
state["logs"][i]["done"] = True
await copilotkit_emit_state(config, state)

logger.info(f"Fallback web search completed with {len([r for r in fallback_results if not isinstance(r, Exception)])} results")
# Process fast Tako results
fast_log_offset = num_tavily
for i, result in enumerate(fast_tako_results):
if isinstance(result, Exception):
tako_results.append({"error": str(result)})
elif result:
tako_results.extend(result)
state["logs"][fast_log_offset + i]["done"] = True
await copilotkit_emit_state(config, state)

for i, q_obj in enumerate(deep_questions):
log_index = num_tavily + len(fast_questions) + i
question = q_obj["question"]
try:
result = await search_knowledge_base(question, search_effort="deep")
if result: # Tako returned results
# STREAM: Add resources immediately (incremental emission)
# This allows frontend to show charts as they arrive
# Process deep Tako results (prediction markets, etc.)
deep_log_offset = num_tavily + num_fast
for i, result in enumerate(deep_tako_results):
if isinstance(result, Exception):
tako_results.append({"error": str(result)})
elif result:
# Add resources immediately for streaming
existing_urls = {r.get("url") for r in state["resources"]}
existing_titles = {r.get("title", "").lower() for r in state["resources"] if r.get("resource_type") == "tako_chart"}
for chart in result:
chart_title_lower = chart.get("title", "").lower()
if chart.get("url") not in existing_urls and chart_title_lower not in existing_titles:
# Generate iframe HTML for preview modal
iframe_html = await get_visualization_iframe(
item_id=chart.get("id"),
embed_url=chart.get("embed_url")
)

state["resources"].append({
"url": chart["url"],
"title": chart["title"],
"description": chart["description"],
"content": chart["description"], # Use description as content
"content": chart["description"],
"resource_type": "tako_chart",
"source": "Tako",
"card_id": chart.get("id"), # Use card_id for frontend
"card_id": chart.get("id"),
"embed_url": chart.get("embed_url"),
"iframe_html": iframe_html, # Include iframe HTML for preview
"iframe_html": iframe_html,
})
existing_urls.add(chart["url"])
existing_titles.add(chart_title_lower)

tako_results.extend(result)
state["logs"][deep_log_offset + i]["done"] = True
await copilotkit_emit_state(config, state)

logger.info(f"Search completed: {len(search_results)} web results, {len(tako_results)} Tako results")

# STAGE 2: Only run fallback web searches if Tako returned no results
if not tako_results and fast_questions:
logger.info("No Tako results found, falling back to web searches for Tako questions")
fallback_queries = [q["question"] for q in fast_questions[:2]]

state["logs"][log_index]["done"] = True
await copilotkit_emit_state(config, state) # Stream progress + new resources
except Exception as e:
tako_results.append({"error": str(e)})
state["logs"][log_index]["done"] = True
for query in fallback_queries:
state["logs"].append({"message": f"Fallback web search: {query}", "done": False})
await copilotkit_emit_state(config, state)

fallback_tasks = [async_tavily_search(query) for query in fallback_queries]
fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True)

log_offset = len(state["logs"]) - len(fallback_queries)
for i, result in enumerate(fallback_results):
if isinstance(result, Exception):
search_results.append({"error": str(result)})
else:
search_results.append(result)
state["logs"][log_offset + i]["done"] = True
await copilotkit_emit_state(config, state)

logger.info(f"Fallback web search completed with {len([r for r in fallback_results if not isinstance(r, Exception)])} results")

# Deduplicate Tako charts by title (same chart may appear in multiple searches)
seen_titles = {}
deduped_tako = []
Expand Down