From 21d279085105975486e8e95c6355d80361f4db2f Mon Sep 17 00:00:00 2001 From: bobby abbott Date: Tue, 27 Jan 2026 22:46:51 -0800 Subject: [PATCH] save resources --- agents/python/src/lib/chat.py | 4 +- agents/python/src/lib/search.py | 143 ++++++++++++++++---------------- 2 files changed, 73 insertions(+), 74 deletions(-) diff --git a/agents/python/src/lib/chat.py b/agents/python/src/lib/chat.py index 33563ce..1c2a404 100644 --- a/agents/python/src/lib/chat.py +++ b/agents/python/src/lib/chat.py @@ -387,6 +387,7 @@ async def replace_marker(match): goto="chat_node", update={ "research_question": research_question, + "resources": state.get("resources", []), # Preserve resources "messages": [ ai_message, ToolMessage( @@ -421,6 +422,7 @@ async def replace_marker(match): goto="search_node", update={ "data_questions": data_questions, + "resources": state.get("resources", []), # Preserve resources "messages": [ ai_message, ToolMessage( @@ -432,4 +434,4 @@ async def replace_marker(match): ) logger.info(f"=== CHAT_NODE: Routing to {goto} ===") - return Command(goto=goto, update={"messages": response}) + return Command(goto=goto, update={"messages": response, "resources": state.get("resources", [])}) diff --git a/agents/python/src/lib/search.py b/agents/python/src/lib/search.py index c0194f0..98bd01a 100644 --- a/agents/python/src/lib/search.py +++ b/agents/python/src/lib/search.py @@ -112,122 +112,119 @@ async def search_node(state: AgentState, config: RunnableConfig): if not ENABLE_DEEP_QUERIES: deep_questions = [q for q in deep_questions if q.get("query_type") == "prediction_market"] - # Add logs for both web and Tako searches - for query in queries: - state["logs"].append({"message": f"Web search for {query}", "done": False}) + search_results = [] + tako_results = [] + # STAGE 1: Run Tako searches (fast + deep) and Tavily web searches in parallel + # Add logs for all searches + for query in queries: + state["logs"].append({"message": f"Web search: {query}", "done": False}) for q_obj in fast_questions: - state["logs"].append({"message": f"Tako fast search: {q_obj['question']}", "done": False}) - + state["logs"].append({"message": f"Tako search: {q_obj['question']}", "done": False}) for q_obj in deep_questions: state["logs"].append({"message": f"Tako deep search: {q_obj['question']}", "done": False}) + if queries or fast_questions or deep_questions: + await copilotkit_emit_state(config, state) - await copilotkit_emit_state(config, state) - - search_results = [] - tako_results = [] - - # Run Tavily web search and Tako knowledge search in parallel + # Build all tasks tavily_tasks = [async_tavily_search(query) for query in queries] fast_tako_tasks = [ search_knowledge_base(q["question"], search_effort="fast") for q in fast_questions ] + deep_tako_tasks = [ + search_knowledge_base(q["question"], search_effort="deep") + for q in deep_questions + ] - fast_tasks = tavily_tasks + fast_tako_tasks - fast_results = await asyncio.gather(*fast_tasks, return_exceptions=True) - - # Split results back into Tavily and Tako fast - num_tavily = len(tavily_tasks) - tavily_results = fast_results[:num_tavily] - fast_tako_results = fast_results[num_tavily:] - - # Process Tavily results - for i, result in enumerate(tavily_results): - if isinstance(result, Exception): - search_results.append({"error": str(result)}) - else: - search_results.append(result) - state["logs"][i]["done"] = True - await copilotkit_emit_state(config, state) - - # Process fast Tako results - for i, result in enumerate(fast_tako_results): - log_index = num_tavily + i - question = fast_questions[i]["question"] - if isinstance(result, Exception): - tako_results.append({"error": str(result)}) - elif result: # Tako returned results - tako_results.extend(result) - state["logs"][log_index]["done"] = True - await copilotkit_emit_state(config, state) - - # RETRY LOGIC: If no Tako results found, retry with 2 additional web searches - if not tako_results and fast_questions: - logger.info("No Tako results found, retrying with web searches") - # Use up to 2 questions for web search fallback - fallback_queries = [q["question"] for q in fast_questions[:2]] - - for query in fallback_queries: - state["logs"].append({"message": f"Tako Web Search: {query}", "done": False}) - await copilotkit_emit_state(config, state) + all_tasks = tavily_tasks + fast_tako_tasks + deep_tako_tasks + if all_tasks: + all_results = await asyncio.gather(*all_tasks, return_exceptions=True) - fallback_tasks = [async_tavily_search(query) for query in fallback_queries] - fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True) + # Split results back into tavily, fast tako, and deep tako + num_tavily = len(tavily_tasks) + num_fast = len(fast_tako_tasks) + tavily_results = all_results[:num_tavily] + fast_tako_results = all_results[num_tavily:num_tavily + num_fast] + deep_tako_results = all_results[num_tavily + num_fast:] - for i, result in enumerate(fallback_results): + # Process Tavily results + for i, result in enumerate(tavily_results): if isinstance(result, Exception): search_results.append({"error": str(result)}) else: search_results.append(result) - # Mark the fallback log as done - state["logs"][-(len(fallback_queries) - i)]["done"] = True + state["logs"][i]["done"] = True await copilotkit_emit_state(config, state) - logger.info(f"Fallback web search completed with {len([r for r in fallback_results if not isinstance(r, Exception)])} results") + # Process fast Tako results + fast_log_offset = num_tavily + for i, result in enumerate(fast_tako_results): + if isinstance(result, Exception): + tako_results.append({"error": str(result)}) + elif result: + tako_results.extend(result) + state["logs"][fast_log_offset + i]["done"] = True + await copilotkit_emit_state(config, state) - for i, q_obj in enumerate(deep_questions): - log_index = num_tavily + len(fast_questions) + i - question = q_obj["question"] - try: - result = await search_knowledge_base(question, search_effort="deep") - if result: # Tako returned results - # STREAM: Add resources immediately (incremental emission) - # This allows frontend to show charts as they arrive + # Process deep Tako results (prediction markets, etc.) + deep_log_offset = num_tavily + num_fast + for i, result in enumerate(deep_tako_results): + if isinstance(result, Exception): + tako_results.append({"error": str(result)}) + elif result: + # Add resources immediately for streaming existing_urls = {r.get("url") for r in state["resources"]} existing_titles = {r.get("title", "").lower() for r in state["resources"] if r.get("resource_type") == "tako_chart"} for chart in result: chart_title_lower = chart.get("title", "").lower() if chart.get("url") not in existing_urls and chart_title_lower not in existing_titles: - # Generate iframe HTML for preview modal iframe_html = await get_visualization_iframe( item_id=chart.get("id"), embed_url=chart.get("embed_url") ) - state["resources"].append({ "url": chart["url"], "title": chart["title"], "description": chart["description"], - "content": chart["description"], # Use description as content + "content": chart["description"], "resource_type": "tako_chart", "source": "Tako", - "card_id": chart.get("id"), # Use card_id for frontend + "card_id": chart.get("id"), "embed_url": chart.get("embed_url"), - "iframe_html": iframe_html, # Include iframe HTML for preview + "iframe_html": iframe_html, }) existing_urls.add(chart["url"]) existing_titles.add(chart_title_lower) - tako_results.extend(result) + state["logs"][deep_log_offset + i]["done"] = True + await copilotkit_emit_state(config, state) + + logger.info(f"Search completed: {len(search_results)} web results, {len(tako_results)} Tako results") + + # STAGE 2: Only run fallback web searches if Tako returned no results + if not tako_results and fast_questions: + logger.info("No Tako results found, falling back to web searches for Tako questions") + fallback_queries = [q["question"] for q in fast_questions[:2]] - state["logs"][log_index]["done"] = True - await copilotkit_emit_state(config, state) # Stream progress + new resources - except Exception as e: - tako_results.append({"error": str(e)}) - state["logs"][log_index]["done"] = True + for query in fallback_queries: + state["logs"].append({"message": f"Fallback web search: {query}", "done": False}) + await copilotkit_emit_state(config, state) + + fallback_tasks = [async_tavily_search(query) for query in fallback_queries] + fallback_results = await asyncio.gather(*fallback_tasks, return_exceptions=True) + + log_offset = len(state["logs"]) - len(fallback_queries) + for i, result in enumerate(fallback_results): + if isinstance(result, Exception): + search_results.append({"error": str(result)}) + else: + search_results.append(result) + state["logs"][log_offset + i]["done"] = True await copilotkit_emit_state(config, state) + logger.info(f"Fallback web search completed with {len([r for r in fallback_results if not isinstance(r, Exception)])} results") + # Deduplicate Tako charts by title (same chart may appear in multiple searches) seen_titles = {} deduped_tako = []