From 6ecff592b5eecfe90f7dd127b11751808f875c9c Mon Sep 17 00:00:00 2001 From: bobby abbott Date: Tue, 27 Jan 2026 23:06:30 -0800 Subject: [PATCH] Don't flicker resources --- agents/python/src/lib/search.py | 185 ++++++++++++++++---------------- 1 file changed, 91 insertions(+), 94 deletions(-) diff --git a/agents/python/src/lib/search.py b/agents/python/src/lib/search.py index 98bd01a..ce1999c 100644 --- a/agents/python/src/lib/search.py +++ b/agents/python/src/lib/search.py @@ -7,7 +7,7 @@ import os from typing import Any, Dict, List, cast -from copilotkit.langgraph import copilotkit_customize_config, copilotkit_emit_state +from copilotkit.langgraph import copilotkit_emit_state from langchain.tools import tool from langchain_core.messages import AIMessage, SystemMessage, ToolMessage from langchain_core.runnables import RunnableConfig @@ -17,7 +17,6 @@ from src.lib.model import get_model from src.lib.state import AgentState from src.lib.mcp_integration import search_knowledge_base, get_visualization_iframe -from src.lib.chat import ENABLE_DEEP_QUERIES logger = logging.getLogger(__name__) @@ -104,49 +103,40 @@ async def search_node(state: AgentState, config: RunnableConfig): data_questions = state.get("data_questions", []) - # Separate fast and deep questions for staged execution + # Separate fast and prediction market questions fast_questions = [q for q in data_questions if isinstance(q, dict) and q.get("search_effort") == "fast"] - deep_questions = [q for q in data_questions if isinstance(q, dict) and q.get("search_effort") == "deep"] + prediction_market_questions = [q for q in data_questions if isinstance(q, dict) and q.get("query_type") == "prediction_market"] - # Filter out deep queries if disabled, BUT always allow prediction_market queries - if not ENABLE_DEEP_QUERIES: - deep_questions = [q for q in deep_questions if q.get("query_type") == "prediction_market"] + # All Tako questions run as fast in Phase 1 + all_tako_questions = fast_questions + prediction_market_questions search_results = [] tako_results = [] - # STAGE 1: Run Tako searches (fast + deep) and Tavily web searches in parallel + # PHASE 1: Run all Tako searches (as fast) and Tavily web searches in parallel # Add logs for all searches for query in queries: state["logs"].append({"message": f"Web search: {query}", "done": False}) - for q_obj in fast_questions: + for q_obj in all_tako_questions: state["logs"].append({"message": f"Tako search: {q_obj['question']}", "done": False}) - for q_obj in deep_questions: - state["logs"].append({"message": f"Tako deep search: {q_obj['question']}", "done": False}) - if queries or fast_questions or deep_questions: + if queries or all_tako_questions: await copilotkit_emit_state(config, state) - # Build all tasks + # Build all tasks - all Tako searches run as "fast" in Phase 1 tavily_tasks = [async_tavily_search(query) for query in queries] - fast_tako_tasks = [ + tako_tasks = [ search_knowledge_base(q["question"], search_effort="fast") - for q in fast_questions - ] - deep_tako_tasks = [ - search_knowledge_base(q["question"], search_effort="deep") - for q in deep_questions + for q in all_tako_questions ] - all_tasks = tavily_tasks + fast_tako_tasks + deep_tako_tasks + all_tasks = tavily_tasks + tako_tasks if all_tasks: all_results = await asyncio.gather(*all_tasks, return_exceptions=True) - # Split results back into tavily, fast tako, and deep tako + # Split results back into tavily and tako num_tavily = len(tavily_tasks) - num_fast = len(fast_tako_tasks) tavily_results = all_results[:num_tavily] - fast_tako_results = all_results[num_tavily:num_tavily + num_fast] - deep_tako_results = all_results[num_tavily + num_fast:] + tako_fast_results = all_results[num_tavily:] # Process Tavily results for i, result in enumerate(tavily_results): @@ -157,73 +147,86 @@ async def search_node(state: AgentState, config: RunnableConfig): state["logs"][i]["done"] = True await copilotkit_emit_state(config, state) - # Process fast Tako results - fast_log_offset = num_tavily - for i, result in enumerate(fast_tako_results): + # Process Tako results + tako_log_offset = num_tavily + for i, result in enumerate(tako_fast_results): if isinstance(result, Exception): tako_results.append({"error": str(result)}) elif result: tako_results.extend(result) - state["logs"][fast_log_offset + i]["done"] = True + state["logs"][tako_log_offset + i]["done"] = True await copilotkit_emit_state(config, state) - # Process deep Tako results (prediction markets, etc.) - deep_log_offset = num_tavily + num_fast - for i, result in enumerate(deep_tako_results): - if isinstance(result, Exception): - tako_results.append({"error": str(result)}) - elif result: - # Add resources immediately for streaming - existing_urls = {r.get("url") for r in state["resources"]} - existing_titles = {r.get("title", "").lower() for r in state["resources"] if r.get("resource_type") == "tako_chart"} - for chart in result: - chart_title_lower = chart.get("title", "").lower() - if chart.get("url") not in existing_urls and chart_title_lower not in existing_titles: - iframe_html = await get_visualization_iframe( - item_id=chart.get("id"), - embed_url=chart.get("embed_url") - ) - state["resources"].append({ - "url": chart["url"], - "title": chart["title"], - "description": chart["description"], - "content": chart["description"], - "resource_type": "tako_chart", - "source": "Tako", - "card_id": chart.get("id"), - "embed_url": chart.get("embed_url"), - "iframe_html": iframe_html, - }) - existing_urls.add(chart["url"]) - existing_titles.add(chart_title_lower) - tako_results.extend(result) - state["logs"][deep_log_offset + i]["done"] = True - await copilotkit_emit_state(config, state) - - logger.info(f"Search completed: {len(search_results)} web results, {len(tako_results)} Tako results") - - # STAGE 2: Only run fallback web searches if Tako returned no results - if not tako_results and fast_questions: - logger.info("No Tako results found, falling back to web searches for Tako questions") - fallback_queries = [q["question"] for q in fast_questions[:2]] - - for query in fallback_queries: - state["logs"].append({"message": f"Fallback web search: {query}", "done": False}) - await copilotkit_emit_state(config, state) - - fallback_tasks = [async_tavily_search(query) for query in fallback_queries] - fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True) - - log_offset = len(state["logs"]) - len(fallback_queries) - for i, result in enumerate(fallback_results): - if isinstance(result, Exception): - search_results.append({"error": str(result)}) - else: - search_results.append(result) - state["logs"][log_offset + i]["done"] = True + logger.info(f"Phase 1 completed: {len(search_results)} web results, {len(tako_results)} Tako results") + + # PHASE 2: If Tako returned no results, run fallbacks + if not tako_results: + fallback_tasks = [] + fallback_logs = [] + + # Fallback web searches for fast questions + if fast_questions: + logger.info("No Tako results found, falling back to web searches for Tako questions") + fallback_web_queries = [q["question"] for q in fast_questions[:2]] + for query in fallback_web_queries: + fallback_logs.append(("web", query)) + state["logs"].append({"message": f"Fallback web search: {query}", "done": False}) + fallback_tasks.extend([async_tavily_search(query) for query in fallback_web_queries]) + + # Deep search for prediction market questions + if prediction_market_questions: + logger.info("Re-running prediction market queries with deep search") + for q_obj in prediction_market_questions: + fallback_logs.append(("deep", q_obj["question"])) + state["logs"].append({"message": f"Tako deep search: {q_obj['question']}", "done": False}) + fallback_tasks.extend([ + search_knowledge_base(q["question"], search_effort="deep") + for q in prediction_market_questions + ]) + + if fallback_tasks: await copilotkit_emit_state(config, state) - - logger.info(f"Fallback web search completed with {len([r for r in fallback_results if not isinstance(r, Exception)])} results") + fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True) + + log_offset = len(state["logs"]) - len(fallback_tasks) + for i, result in enumerate(fallback_results): + task_type, _ = fallback_logs[i] + if isinstance(result, Exception): + if task_type == "web": + search_results.append({"error": str(result)}) + else: + tako_results.append({"error": str(result)}) + elif task_type == "web": + search_results.append(result) + elif result: # deep Tako result + # Add resources immediately for streaming + existing_urls = {r.get("url") for r in state["resources"]} + existing_titles = {r.get("title", "").lower() for r in state["resources"] if r.get("resource_type") == "tako_chart"} + for chart in result: + chart_title_lower = chart.get("title", "").lower() + if chart.get("url") not in existing_urls and chart_title_lower not in existing_titles: + iframe_html = await get_visualization_iframe( + item_id=chart.get("id"), + embed_url=chart.get("embed_url") + ) + state["resources"].append({ + "url": chart["url"], + "title": chart["title"], + "description": chart["description"], + "content": chart["description"], + "resource_type": "tako_chart", + "source": "Tako", + "card_id": chart.get("id"), + "embed_url": chart.get("embed_url"), + "iframe_html": iframe_html, + }) + existing_urls.add(chart["url"]) + existing_titles.add(chart_title_lower) + tako_results.extend(result) + state["logs"][log_offset + i]["done"] = True + await copilotkit_emit_state(config, state) + + logger.info("Phase 2 fallback completed") # Deduplicate Tako charts by title (same chart may appear in multiple searches) seen_titles = {} @@ -243,16 +246,10 @@ async def search_node(state: AgentState, config: RunnableConfig): if duplicates_removed > 0: logger.info(f"Removed {duplicates_removed} duplicate Tako charts by title") - config = copilotkit_customize_config( - config, - emit_intermediate_state=[ - { - "state_key": "resources", - "tool": "ExtractResources", - "tool_argument": "resources", - } - ], - ) + # Note: We don't use emit_intermediate_state for resources here because + # we manually manage and emit resources throughout the search process. + # Using emit_intermediate_state would replace accumulated resources with + # just the newly selected ones, causing flicker. model = get_model(state) ainvoke_kwargs = {}