Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 91 additions & 94 deletions agents/python/src/lib/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import os
from typing import Any, Dict, List, cast

from copilotkit.langgraph import copilotkit_customize_config, copilotkit_emit_state
from copilotkit.langgraph import copilotkit_emit_state
from langchain.tools import tool
from langchain_core.messages import AIMessage, SystemMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
Expand All @@ -17,7 +17,6 @@
from src.lib.model import get_model
from src.lib.state import AgentState
from src.lib.mcp_integration import search_knowledge_base, get_visualization_iframe
from src.lib.chat import ENABLE_DEEP_QUERIES

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -104,49 +103,40 @@ async def search_node(state: AgentState, config: RunnableConfig):

data_questions = state.get("data_questions", [])

# Separate fast and deep questions for staged execution
# Separate fast and prediction market questions
fast_questions = [q for q in data_questions if isinstance(q, dict) and q.get("search_effort") == "fast"]
deep_questions = [q for q in data_questions if isinstance(q, dict) and q.get("search_effort") == "deep"]
prediction_market_questions = [q for q in data_questions if isinstance(q, dict) and q.get("query_type") == "prediction_market"]

# Filter out deep queries if disabled, BUT always allow prediction_market queries
if not ENABLE_DEEP_QUERIES:
deep_questions = [q for q in deep_questions if q.get("query_type") == "prediction_market"]
# All Tako questions run as fast in Phase 1
all_tako_questions = fast_questions + prediction_market_questions

search_results = []
tako_results = []

# STAGE 1: Run Tako searches (fast + deep) and Tavily web searches in parallel
# PHASE 1: Run all Tako searches (as fast) and Tavily web searches in parallel
# Add logs for all searches
for query in queries:
state["logs"].append({"message": f"Web search: {query}", "done": False})
for q_obj in fast_questions:
for q_obj in all_tako_questions:
state["logs"].append({"message": f"Tako search: {q_obj['question']}", "done": False})
for q_obj in deep_questions:
state["logs"].append({"message": f"Tako deep search: {q_obj['question']}", "done": False})
if queries or fast_questions or deep_questions:
if queries or all_tako_questions:
await copilotkit_emit_state(config, state)

# Build all tasks
# Build all tasks - all Tako searches run as "fast" in Phase 1
tavily_tasks = [async_tavily_search(query) for query in queries]
fast_tako_tasks = [
tako_tasks = [
search_knowledge_base(q["question"], search_effort="fast")
for q in fast_questions
]
deep_tako_tasks = [
search_knowledge_base(q["question"], search_effort="deep")
for q in deep_questions
for q in all_tako_questions
]

all_tasks = tavily_tasks + fast_tako_tasks + deep_tako_tasks
all_tasks = tavily_tasks + tako_tasks
if all_tasks:
all_results = await asyncio.gather(*all_tasks, return_exceptions=True)

# Split results back into tavily, fast tako, and deep tako
# Split results back into tavily and tako
num_tavily = len(tavily_tasks)
num_fast = len(fast_tako_tasks)
tavily_results = all_results[:num_tavily]
fast_tako_results = all_results[num_tavily:num_tavily + num_fast]
deep_tako_results = all_results[num_tavily + num_fast:]
tako_fast_results = all_results[num_tavily:]

# Process Tavily results
for i, result in enumerate(tavily_results):
Expand All @@ -157,73 +147,86 @@ async def search_node(state: AgentState, config: RunnableConfig):
state["logs"][i]["done"] = True
await copilotkit_emit_state(config, state)

# Process fast Tako results
fast_log_offset = num_tavily
for i, result in enumerate(fast_tako_results):
# Process Tako results
tako_log_offset = num_tavily
for i, result in enumerate(tako_fast_results):
if isinstance(result, Exception):
tako_results.append({"error": str(result)})
elif result:
tako_results.extend(result)
state["logs"][fast_log_offset + i]["done"] = True
state["logs"][tako_log_offset + i]["done"] = True
await copilotkit_emit_state(config, state)

# Process deep Tako results (prediction markets, etc.)
deep_log_offset = num_tavily + num_fast
for i, result in enumerate(deep_tako_results):
if isinstance(result, Exception):
tako_results.append({"error": str(result)})
elif result:
# Add resources immediately for streaming
existing_urls = {r.get("url") for r in state["resources"]}
existing_titles = {r.get("title", "").lower() for r in state["resources"] if r.get("resource_type") == "tako_chart"}
for chart in result:
chart_title_lower = chart.get("title", "").lower()
if chart.get("url") not in existing_urls and chart_title_lower not in existing_titles:
iframe_html = await get_visualization_iframe(
item_id=chart.get("id"),
embed_url=chart.get("embed_url")
)
state["resources"].append({
"url": chart["url"],
"title": chart["title"],
"description": chart["description"],
"content": chart["description"],
"resource_type": "tako_chart",
"source": "Tako",
"card_id": chart.get("id"),
"embed_url": chart.get("embed_url"),
"iframe_html": iframe_html,
})
existing_urls.add(chart["url"])
existing_titles.add(chart_title_lower)
tako_results.extend(result)
state["logs"][deep_log_offset + i]["done"] = True
await copilotkit_emit_state(config, state)

logger.info(f"Search completed: {len(search_results)} web results, {len(tako_results)} Tako results")

# STAGE 2: Only run fallback web searches if Tako returned no results
if not tako_results and fast_questions:
logger.info("No Tako results found, falling back to web searches for Tako questions")
fallback_queries = [q["question"] for q in fast_questions[:2]]

for query in fallback_queries:
state["logs"].append({"message": f"Fallback web search: {query}", "done": False})
await copilotkit_emit_state(config, state)

fallback_tasks = [async_tavily_search(query) for query in fallback_queries]
fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True)

log_offset = len(state["logs"]) - len(fallback_queries)
for i, result in enumerate(fallback_results):
if isinstance(result, Exception):
search_results.append({"error": str(result)})
else:
search_results.append(result)
state["logs"][log_offset + i]["done"] = True
logger.info(f"Phase 1 completed: {len(search_results)} web results, {len(tako_results)} Tako results")

# PHASE 2: If Tako returned no results, run fallbacks
if not tako_results:
fallback_tasks = []
fallback_logs = []

# Fallback web searches for fast questions
if fast_questions:
logger.info("No Tako results found, falling back to web searches for Tako questions")
fallback_web_queries = [q["question"] for q in fast_questions[:2]]
for query in fallback_web_queries:
fallback_logs.append(("web", query))
state["logs"].append({"message": f"Fallback web search: {query}", "done": False})
fallback_tasks.extend([async_tavily_search(query) for query in fallback_web_queries])

# Deep search for prediction market questions
if prediction_market_questions:
logger.info("Re-running prediction market queries with deep search")
for q_obj in prediction_market_questions:
fallback_logs.append(("deep", q_obj["question"]))
state["logs"].append({"message": f"Tako deep search: {q_obj['question']}", "done": False})
fallback_tasks.extend([
search_knowledge_base(q["question"], search_effort="deep")
for q in prediction_market_questions
])

if fallback_tasks:
await copilotkit_emit_state(config, state)

logger.info(f"Fallback web search completed with {len([r for r in fallback_results if not isinstance(r, Exception)])} results")
fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True)

log_offset = len(state["logs"]) - len(fallback_tasks)
for i, result in enumerate(fallback_results):
task_type, _ = fallback_logs[i]
if isinstance(result, Exception):
if task_type == "web":
search_results.append({"error": str(result)})
else:
tako_results.append({"error": str(result)})
elif task_type == "web":
search_results.append(result)
elif result: # deep Tako result
# Add resources immediately for streaming
existing_urls = {r.get("url") for r in state["resources"]}
existing_titles = {r.get("title", "").lower() for r in state["resources"] if r.get("resource_type") == "tako_chart"}
for chart in result:
chart_title_lower = chart.get("title", "").lower()
if chart.get("url") not in existing_urls and chart_title_lower not in existing_titles:
iframe_html = await get_visualization_iframe(
item_id=chart.get("id"),
embed_url=chart.get("embed_url")
)
state["resources"].append({
"url": chart["url"],
"title": chart["title"],
"description": chart["description"],
"content": chart["description"],
"resource_type": "tako_chart",
"source": "Tako",
"card_id": chart.get("id"),
"embed_url": chart.get("embed_url"),
"iframe_html": iframe_html,
})
existing_urls.add(chart["url"])
existing_titles.add(chart_title_lower)
tako_results.extend(result)
state["logs"][log_offset + i]["done"] = True
await copilotkit_emit_state(config, state)

logger.info("Phase 2 fallback completed")

# Deduplicate Tako charts by title (same chart may appear in multiple searches)
seen_titles = {}
Expand All @@ -243,16 +246,10 @@ async def search_node(state: AgentState, config: RunnableConfig):
if duplicates_removed > 0:
logger.info(f"Removed {duplicates_removed} duplicate Tako charts by title")

config = copilotkit_customize_config(
config,
emit_intermediate_state=[
{
"state_key": "resources",
"tool": "ExtractResources",
"tool_argument": "resources",
}
],
)
# Note: We don't use emit_intermediate_state for resources here because
# we manually manage and emit resources throughout the search process.
# Using emit_intermediate_state would replace accumulated resources with
# just the newly selected ones, causing flicker.

model = get_model(state)
ainvoke_kwargs = {}
Expand Down