From 4019c3e15185cd672c336b2775b0b50a4e5af8b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Louis-F=C3=A9lix=20Nothias?= Date: Sun, 28 Jun 2026 12:47:24 +0200 Subject: [PATCH] fix: robustness and eval improvements across core, evolution, and agents - factory.py: replace infinite MCP discovery loop with 5-retry exponential backoff - orchestrator.py: classify sandbox errors as TIMEOUT/SYNTAX_ERROR/RUNTIME_ERROR - workflow_factory.py: validate Python keyword identifiers, atomic state_result.json write, post-assemble compile check - workflow_runner.py: delete temp exec script in finally block - evolution_engine.py: ghost WorkflowInfo guard (generation_failed uuid); ASTRA export returns bool + .export_status sidecar; clear VariationEngine score/gradient/agent-count histories on session start - selection.py: refresh member metrics before eviction check (not after) - planner.py: verify expected outputs of completed dependencies before marking step executable - smolagent_factory.py: file lock on save_memories; explicit TimeoutError re-raise before generic handler; restore exception capture in _run_agent retry loop; max_steps=35 + context-window guard callback - astra_exporter.py: fail-fast on empty trace - llm_provider.py: strip temperature entirely for Opus 4.x (invalid_request_error); broaden temperature error detection to message string - variation_engine.py: cap variation temperature at 1.0 for Claude models - csv_mode.py: parse SUCCESS_LEVEL from LLM response; pass scenario_rubric filename; resolve runs_capsule_dir to absolute path - config.py: add max_steps, max_context_tokens, export_astra fields - workflow_v11.md: add MCP response schema guidance, absolute-path rule, env-setup via shell tool, no built-in file I/O rule Co-Authored-By: Claude Sonnet 4.6 --- config.py | 9 + sources/benchmark_evaluation/csv_mode.py | 25 ++- sources/core/evolution_engine.py | 58 +++++- sources/core/factory.py | 33 ++- sources/core/llm_provider.py | 29 ++- sources/core/orchestrator.py | 11 +- 
sources/core/planner.py | 8 + sources/core/selection.py | 5 +- sources/core/variation_engine.py | 5 +- sources/core/workflow_factory.py | 18 +- sources/core/workflow_runner.py | 8 +- sources/modules/smolagent_factory.py | 82 +++++++- sources/prompts/workflow_v11.md | 4 + sources/transparency/astra_exporter.py | 245 +++++++++++++++++++++++ 14 files changed, 497 insertions(+), 43 deletions(-) create mode 100644 sources/transparency/astra_exporter.py diff --git a/config.py b/config.py index e89d122..8cf2de2 100644 --- a/config.py +++ b/config.py @@ -111,6 +111,9 @@ def __init__(self): # Per-agent (SmolAgentFactory) execution timeout in seconds. Injected into # the generated workflow as AGENT_EXECUTION_TIMEOUT. 3600 = 1 hour. self.agent_execution_timeout: int = 18000 + # Maximum input tokens per agent step before the context-window guard fires. + # Prevents runaway context explosion. Default 800k; raise for large-context models. + self.max_context_tokens: int = 800_000 self.runner_default_max_memory_mb: int = 10000 self.runner_default_max_cpu_percent: int = 100 self.runner_temp_dir: str = "./tmp" @@ -237,6 +240,7 @@ def jsonify( "runner_default_python_version": self.runner_default_python_version, "runner_default_timeout": self.runner_default_timeout, "agent_execution_timeout": self.agent_execution_timeout, + "max_context_tokens": self.max_context_tokens, "runner_default_max_memory_mb": self.runner_default_max_memory_mb, "runner_default_max_cpu_percent": self.runner_default_max_cpu_percent, "runner_temp_dir": self.runner_temp_dir, @@ -256,6 +260,7 @@ def from_json(self, data: dict[str, Any]) -> None: ) self.smolagent_model_id = data.get("smolagent_model_id", self.smolagent_model_id) self.judge_model = data.get("judge_model", self.judge_model) + self.capsule_namer_model = data.get("capsule_namer_model", self.capsule_namer_model) self.engine_name = data.get("engine_name", self.engine_name) self.openrouter_provider = data.get("openrouter_provider", 
self.openrouter_provider) self.prompt_planner = data.get("prompt_planner", self.prompt_planner) @@ -299,6 +304,9 @@ def from_json(self, data: dict[str, Any]) -> None: self.agent_execution_timeout = data.get( "agent_execution_timeout", self.agent_execution_timeout ) + self.max_context_tokens = data.get( + "max_context_tokens", self.max_context_tokens + ) self.runner_default_max_memory_mb = data.get( "runner_default_max_memory_mb", self.runner_default_max_memory_mb ) @@ -360,6 +368,7 @@ def __str__(self) -> str: lines.append(f" runner_default_python_version={self.runner_default_python_version}") lines.append(f" runner_default_timeout={self.runner_default_timeout}") lines.append(f" agent_execution_timeout={self.agent_execution_timeout}") + lines.append(f" max_context_tokens={self.max_context_tokens}") lines.append(f" runner_default_max_memory_mb={self.runner_default_max_memory_mb}") lines.append(f" runner_default_max_cpu_percent={self.runner_default_max_cpu_percent}") lines.append(f" runner_temp_dir={self.runner_temp_dir}") diff --git a/sources/benchmark_evaluation/csv_mode.py b/sources/benchmark_evaluation/csv_mode.py index 8d3fa4e..64df976 100644 --- a/sources/benchmark_evaluation/csv_mode.py +++ b/sources/benchmark_evaluation/csv_mode.py @@ -404,6 +404,17 @@ def _generate_task_default(self, row, workspace_subfolder: str | None = None): All file operations MUST be performed within this subfolder. """ + # When the Prompt is a self-contained task capsule (non-empty) and the URL + # points to a local file rather than a web resource, the file cannot be + # navigated to by the agent and the capsule already encodes all required + # context. In that case, skip the paper/URL wrapper entirely. 
+ is_local_path = url and not url.startswith("http") + if prompt and is_local_path: + return f""" +{prompt} +{workspace_instruction} + """.strip() + return f""" Paper title: {paper_title} Url to paper: {url} @@ -500,9 +511,11 @@ def _analyze_results(self, goal: str, results_str: str, execution_time: float) - Provide your analysis following the specified output format.""" analysis_text = self.result_analyzer(prompt) + import re as _re + _sl = _re.search(r'SUCCESS_LEVEL:\s*(High|Medium|Low|Incomplete|Failed|Error)', analysis_text, _re.IGNORECASE) analysis = { "full_analysis": analysis_text, - "success_level": "Medium", + "success_level": _sl.group(1) if _sl else "Medium", "key_insight": "Analysis completed" } return analysis @@ -607,6 +620,8 @@ def _create_isolated_config(self, task_id: str) -> Any: isolated_workspace = Path(self._base_workspace_dir) / f"worker_{task_id}" isolated_workspace.mkdir(parents=True, exist_ok=True) isolated_config.workspace_dir = str(isolated_workspace) + # Resolve to absolute so CWD-relative paths don't break in concurrent workers + isolated_config.runs_capsule_dir = str(Path(self.config.runs_capsule_dir).resolve()) return isolated_config def _cleanup_isolated_workspace(self, task_id: str) -> None: @@ -721,7 +736,7 @@ async def _process_single_task( goal=goal, judge=True, enable_evolution=learning, - scenario_rubric=None, + scenario_rubric=scenario_rubric_filename, single_agent_mode=single_agent_mode ) results_str = self._format_task_mode_results(runs[-1]) @@ -827,9 +842,11 @@ def _analyze_results_isolated(self, goal: str, results_str: str, execution_time: Provide your analysis following the specified output format.""" analysis_text = self.result_analyzer(prompt) + import re as _re + _sl = _re.search(r'SUCCESS_LEVEL:\s*(High|Medium|Low|Incomplete|Failed|Error)', analysis_text, _re.IGNORECASE) analysis = { "full_analysis": analysis_text, - "success_level": "Medium", + "success_level": _sl.group(1) if _sl else "Medium", "key_insight": 
"Analysis completed" } return analysis @@ -1017,7 +1034,7 @@ async def run_single_thread_eval_loop(self, dataset_type: str, dataset_path: str runs = await self.evolve.start_workflow_evolution(goal=goal, judge=True, enable_evolution=learning, - scenario_rubric=None, + scenario_rubric=scenario_rubric_filename, single_agent_mode=single_agent_mode ) results_str = self._format_task_mode_results(runs[-1]) diff --git a/sources/core/evolution_engine.py b/sources/core/evolution_engine.py index d4d2750..acd8972 100644 --- a/sources/core/evolution_engine.py +++ b/sources/core/evolution_engine.py @@ -353,8 +353,12 @@ async def start_workflow_evolution( wf = None max_iteration = self.config.max_learning_evolve_iterations if enable_evolution else 1 - # Reset archive at session start + # Reset archive and variation history at session start so each task + # starts with a clean slate (prevents boldness contamination across rows). self.selection._archive = [] + self.variation.score_history.clear() + self.variation.textual_gradient_history.clear() + self.variation.agent_count_history.clear() parents, _ = self.select_parent_workflow( goal, template_uuid=template_uuid @@ -467,7 +471,8 @@ async def evolve_generation( original_task=runs[-1].original_task, single_agent_mode=single_agent_mode ) - wf_info = WorkflowInfo(uuid, Path(f"{self.workflow_dir}/{uuid}")) + _valid_uuid = uuid and uuid != "generation_failed" + wf_info = WorkflowInfo(uuid, Path(f"{self.workflow_dir}/{uuid}")) if _valid_uuid else None self._save_evolution_prompt_artifact(uuid, runs[-1].prompt) # Persist lineage as soon as we have a uuid so the evolution tree can # include even runs that subsequently fail evaluation. 
@@ -487,7 +492,7 @@ async def evolve_generation( if workspace_mgr is not None and uuid: workspace_mgr.save_run_snapshot(uuid) - if workflow_genotype_code: + if workflow_genotype_code and wf_info: # Evaluate and calculate costs eval_type, current_iteration_cost = await self._evaluate_and_calculate_cost( executed, runs[-1].judge, uuid, runs[-1].answers, runs[-1].scenario_rubric, assertion_history @@ -496,7 +501,7 @@ async def evolve_generation( runs[-1].reward_uncapped = wf_info.overall_score_uncapped runs[-1].code = wf_info.code - if uuid: + if uuid and wf_info: verifier = (wf_info.state_result or {}).get("evaluation", {}).get("verifier", {}) is_failure = ( on_error @@ -518,11 +523,14 @@ async def evolve_generation( ) runs[-1].current_uuid = uuid - runs[-1].answers = wf_info.answers - runs[-1].state_result = wf_info.state_result - agents_answers = self.extract_agents_behavior(wf_info.state_result) + runs[-1].answers = wf_info.answers if wf_info else [] + runs[-1].state_result = wf_info.state_result if wf_info else {} + agents_answers = self.extract_agents_behavior(wf_info.state_result if wf_info else {}) self.show_answers(agents_answers) - rewards_history.append(wf_info.overall_score) + if not on_error and wf_info: + rewards_history.append(wf_info.overall_score) + else: + rewards_history.append(0.0) # ── Survivor validation: gate + populate _archive (steady-state population) if uuid and not on_error: @@ -677,6 +685,40 @@ async def evolve_generation( runs[-1].plot = self._save_final_plots(assertion_history, rewards_history, uuid) return runs + def _export_astra(self, best_uuid: str, goal: str) -> bool: + """Best-effort post-run ASTRA export of the best workflow's trace. + + Gated on ``config.export_astra`` (opt-in, off by default). Failure is + non-fatal: a missing memory directory, an LLM hiccup, or a YAML write + error must not break the evolution loop. Writes a ``.export_status`` + sidecar under the capsule directory so callers can inspect outcomes.
+ + Returns: + ``True`` if export succeeded, ``False`` otherwise. + """ + if not getattr(self.config, "export_astra", False): + return False + status_dir = os.path.join(self.config.runs_capsule_dir, best_uuid) + status_path = os.path.join(status_dir, ".export_status") + try: + from sources.transparency import AstraExporter + result = AstraExporter(self.config, self.logger).export(best_uuid, goal) + os.makedirs(status_dir, exist_ok=True) + with open(status_path, "w") as _f: + json.dump({"status": "ok", "path": str(result)}, _f) + return True + except Exception as exc: + self.logger.warning(f"[ASTRA] export skipped: {exc}") + print_warn(f"ASTRA export failed (non-fatal): {exc}") + try: + os.makedirs(status_dir, exist_ok=True) + with open(status_path, "w") as _f: + json.dump({"status": "error", "error": str(exc)}, _f) + except Exception: + pass + return False + + def _get_human_validation(self) -> bool: """Get human validation for continuing the workflow. diff --git a/sources/core/factory.py b/sources/core/factory.py index 30bf868..ce202aa 100644 --- a/sources/core/factory.py +++ b/sources/core/factory.py @@ -47,22 +47,41 @@ async def load_tools_code(self) -> tuple[str, str]: """ tools_code = "" existing_tool_prompt = "" + import asyncio as _asyncio tool_manager = ToolManager(self.config) - try: - tool_setup = False - while tool_setup == False: + _max_retries = 5 + _backoff = 1.0 + mcps = [] + for _attempt in range(_max_retries): + try: mcps = await tool_manager.discover_mcp_servers() tool_setup = await tool_manager.verify_tools() - except Exception as e: - self.logger.error(f"load_tools_code: Failed to discover MCP servers: {str(e)}") - raise RuntimeError(f"Failed to discover MCP servers: {str(e)}") from e + if tool_setup and mcps: + break + except Exception as e: + if _attempt == _max_retries - 1: + self.logger.error( + f"load_tools_code: MCP discovery failed after {_max_retries} attempts: {e}" + ) + raise RuntimeError( + f"MCP discovery failed after {_max_retries} 
attempts: {e}" + ) from e + self.logger.warning( + f"MCP discovery attempt {_attempt + 1}/{_max_retries} failed: {e}. " + f"Retrying in {_backoff:.1f}s…" + ) + await _asyncio.sleep(_backoff) + _backoff = min(_backoff * 2, 30.0) + else: + raise RuntimeError( + f"MCP servers did not become ready after {_max_retries} attempts." + ) if not mcps: raise ValueError( "\n" + "=" * 80 + "\n🚨 FATAL ERROR: No MCP Servers Found! 🚨" "\n" + "-" * 80 + "\nPlease ensure at least one MCP instance is running on Toolomics." - "\nRetrying until MCPs detected.... use CTRL+C to stop." "\n" + "=" * 80 + "\n" ) for mcp in mcps: diff --git a/sources/core/llm_provider.py b/sources/core/llm_provider.py index 7ad0ce1..9018ee2 100644 --- a/sources/core/llm_provider.py +++ b/sources/core/llm_provider.py @@ -216,6 +216,14 @@ def _is_claude_model(self) -> bool: """ return self.config.provider == "anthropic" or "claude" in self.config.model.lower() + def _is_no_temperature_model(self) -> bool: + """True for models that reject the temperature parameter entirely. + + Claude Opus 4.x (e.g. claude-opus-4-8) does not accept temperature + at all — passing it triggers an invalid_request_error from Anthropic. + """ + return "claude-opus-4" in self.config.model.lower() + def save_call(self, call: dict[str, Any]) -> None: """Save the API call details to a JSON file.
@@ -317,8 +325,11 @@ def _messages_match(self, expected: list[dict[str, Any]], cached: list[dict[str, @staticmethod def _is_temperature_error(error: Exception) -> bool: - """True when the API rejected ``temperature``, read from ``error.param``.""" - return getattr(error, "param", None) == "temperature" + """True when the API rejected the ``temperature`` parameter.""" + if getattr(error, "param", None) == "temperature": + return True + msg = str(error).lower() + return "temperature" in msg and ("deprecated" in msg or "range" in msg) def _is_retryable_error(self, error: Exception) -> bool: """Check if an error is retryable (temporary/transient). @@ -404,18 +415,20 @@ def __call__(self, prompt: str, timeout: int = 180, use_cache: bool = True) -> s attempt = 0 max_wait = 500 # Maximum wait time in seconds context_window_retry_count = 0 # Track context window errors specifically - effective_temperature = self.config.temperature + # None means "omit temperature from the request" (required for Opus 4.x) + effective_temperature = None if self._is_no_temperature_model() else self.config.temperature while True: # Infinite retry loop try: completion_params = { "model": f"{self.config.provider}/{self.config.model}", "messages": message, - "temperature": effective_temperature, "timeout": timeout, "max_tokens": self.config.max_tokens, "drop_params": True, } + if effective_temperature is not None: + completion_params["temperature"] = effective_temperature completion_params["api_key"] = self.config.key # Add reasoning effort if supported (not for Claude models) if self._supports_reasoning_tokens() and not self._is_claude_model(): @@ -454,12 +467,12 @@ def __call__(self, prompt: str, timeout: int = 180, use_cache: bool = True) -> s attempt += 1 except Exception as e: - if self._is_temperature_error(e) and effective_temperature != 1.0: + if self._is_temperature_error(e) and effective_temperature is not None: self.logger.warning( - f"Provider rejected 
temperature={effective_temperature:.2f}; " - f"falling back to 1.0 and retrying." + f"Provider rejected temperature={effective_temperature}; " + f"stripping temperature and retrying." ) - effective_temperature = 1.0 + effective_temperature = None continue # Check if this is a retryable error diff --git a/sources/core/orchestrator.py b/sources/core/orchestrator.py index c78738e..e89cfe1 100644 --- a/sources/core/orchestrator.py +++ b/sources/core/orchestrator.py @@ -89,8 +89,15 @@ def progress_handler(line: str) -> None: result.stdout or result.stderr or "No output from workflow execution." ) else: - print_err(f"Workflow execution failed: {result.stderr}") - raise Exception(f"Workflow execution failed: {result.stderr}") + stderr = result.stderr or "" + if "TimeoutExpired" in stderr or "timed out" in stderr.lower(): + error_type = "TIMEOUT" + elif "SyntaxError" in stderr: + error_type = "SYNTAX_ERROR" + else: + error_type = "RUNTIME_ERROR" + print_err(f"[{error_type}] Workflow execution failed: {stderr}") + raise Exception(f"[{error_type}] Workflow execution failed: {stderr}") def perspicacite_grounding_task(self, task: str) -> str: """Query Perspicacite-AI for a literature-grounded approach to a task. 
diff --git a/sources/core/planner.py b/sources/core/planner.py index 214c7f2..f2cc24a 100644 --- a/sources/core/planner.py +++ b/sources/core/planner.py @@ -656,6 +656,14 @@ def _can_execute_step(self, step: PlanStep) -> tuple[bool, list[str]]: dep_task = next((task for task in self.task_history if task.name == dep_name), None) if dep_task is None or dep_task.status != TaskStatus.COMPLETED: missing_deps.append(dep_name) + continue + expected_outputs = getattr(dep_task, "expected_outputs", None) + if expected_outputs: + outputs_ok, missing_outputs = self._verify_expected_outputs(expected_outputs) + if not outputs_ok: + missing_deps.append( + f"{dep_name}[missing_outputs:{','.join(missing_outputs)}]" + ) return len(missing_deps) == 0, missing_deps def request_user_exit(self, msg: str) -> None: diff --git a/sources/core/selection.py b/sources/core/selection.py index 7940196..afef3ab 100644 --- a/sources/core/selection.py +++ b/sources/core/selection.py @@ -577,6 +577,9 @@ def _add_to_archive(self, member: PopulationMember) -> None: member: The population member to admit. """ self._archive.append(member) + # Refresh metrics first so eviction uses up-to-date qd_scores (including + # the newly-admitted member's novelty contribution to existing peers). 
+ self._refresh_member_metrics() if len(self._archive) > self.population_size: weakest = min(self._archive, key=lambda m: m.qd_score) @@ -587,8 +590,6 @@ def _add_to_archive(self, member: PopulationMember) -> None: f"reward={weakest.reward:.3f}) — archive full" ) - self._refresh_member_metrics() - def _refresh_member_metrics(self) -> None: """Recompute stored novelty + qd_score for every archive member.""" if len(self._archive) < 2: diff --git a/sources/core/variation_engine.py b/sources/core/variation_engine.py index 7a694fc..b12da52 100644 --- a/sources/core/variation_engine.py +++ b/sources/core/variation_engine.py @@ -51,10 +51,13 @@ def setup_llm(self, config): self.judge_model = config.workflow_llm_model try: provider, model = self.judge_model.split("/", 1) if "/" in self.judge_model else ("openai", self.judge_model) + # Anthropic/Claude caps at 1.0; Opus 4.x strips temperature entirely (handled by LLMProvider) + is_claude = provider == "anthropic" or "claude" in model.lower() + variation_temperature = min(1.2, 1.0) if is_claude else 1.2 self.llm_config = LLMConfig().from_dict({ "model": model, "provider": provider, - "temperature": 1.2, + "temperature": variation_temperature, "reasoning_effort": config.reasoning_effort, "max_tokens": getattr(config, 'max_tokens', 8192), "openrouter_provider": config.openrouter_provider_for(self.judge_model), diff --git a/sources/core/workflow_factory.py b/sources/core/workflow_factory.py index 080b3a8..3ffa58e 100644 --- a/sources/core/workflow_factory.py +++ b/sources/core/workflow_factory.py @@ -265,6 +265,12 @@ def validate_workflow_structure(self, workflow_genotype_code: str) -> None: nodes = set(re.findall(patterns["nodes"], workflow_genotype_code)) if not nodes: raise ValueError("No workflow nodes found") + import keyword as _keyword + invalid_ids = {n for n in nodes if _keyword.iskeyword(n) or not n.isidentifier()} + if invalid_ids: + raise ValueError( + f"Workflow node names are Python keywords or invalid
identifiers: {sorted(invalid_ids)}" + ) self.logger.debug(f"Workflow nodes discovered: {', '.join(sorted(nodes))}") # Validate START edge target exists @@ -346,6 +352,8 @@ def assemble_workflow( ENGINE_NAME = {self.config.engine_name!r} OPENROUTER_PROVIDER = {self.config.openrouter_provider_for(self.config.smolagent_model_id)!r} AGENT_EXECUTION_TIMEOUT = {self.config.agent_execution_timeout!r} +MAX_CONTEXT_TOKENS = {self.config.max_context_tokens!r} +WORKSPACE_DIR = {self.config.workspace_dir!r} GOAL = {goal!r} SYSTEM_PROMPT = {smolagent_system_prompt!r} @@ -388,8 +396,11 @@ def assemble_workflow( if WORKFLOW_PATH: print("workflow run: saving workflow state JSON at :", WORKFLOW_PATH) try: - with open(os.path.join(WORKFLOW_PATH, "state_result.json"), "w") as f: + _state_path = os.path.join(WORKFLOW_PATH, "state_result.json") + _tmp_path = _state_path + ".tmp" + with open(_tmp_path, "w") as f: json.dump(result_state, f, indent=2) + os.replace(_tmp_path, _state_path) except Exception as e: raise(Exception(f"Could not save workflow data:" + str(e))) """ @@ -486,6 +497,11 @@ async def craft_workflow( smolagent_system_prompt ) + try: + compile(complete_code, "", "exec") + except SyntaxError as e: + raise ValueError(f"UUID:{uuid_str}|Assembled workflow has invalid syntax: {e}") from e + self.logger.info("Workflow generation completed") self.logger.debug(f"Workflow path: {workflow_path}") diff --git a/sources/core/workflow_runner.py b/sources/core/workflow_runner.py index 08c239a..b5fac24 100644 --- a/sources/core/workflow_runner.py +++ b/sources/core/workflow_runner.py @@ -336,7 +336,13 @@ async def execute( with open(script_path, "w") as f: f.write(code) cmd = [*self._python_cmd, script_path] - return await self._run_command(cmd, execution_id, progress_callback) + try: + return await self._run_command(cmd, execution_id, progress_callback) + finally: + try: + os.unlink(script_path) + except OSError: + pass @staticmethod def _build_color_env() -> dict[str, str]: diff --git 
a/sources/modules/smolagent_factory.py b/sources/modules/smolagent_factory.py index 23ec339..2c2eb96 100644 --- a/sources/modules/smolagent_factory.py +++ b/sources/modules/smolagent_factory.py @@ -38,10 +38,9 @@ from smolagents.local_python_executor import BASE_PYTHON_TOOLS, DANGEROUS_FUNCTIONS, DANGEROUS_MODULES import signal - -import subprocess -DANGEROUS_FUNCTIONS = {subprocess} -DANGEROUS_MODULES = {} +# Sandbox enforcement relies on smolagents' LocalPythonExecutor defaults. +# Do NOT rebind DANGEROUS_MODULES/DANGEROUS_FUNCTIONS here — that only +# changes the module-level name and never reaches the executor. LANGFUSE_PUBLIC_KEY=os.getenv("LANGFUSE_PUBLIC_KEY") LANGFUSE_SECRET_KEY=os.getenv("LANGFUSE_SECRET_KEY") @@ -57,6 +56,29 @@ SmolagentsInstrumentor().instrument(tracer_provider=trace_provider) +def _make_tool_json_aware(tool): + """Patch a tool's forward() so JSON-string responses are auto-parsed to dicts. + + Toolomics MCP tools return raw JSON strings. Generated agent code that indexes + into the result without calling json.loads() first gets a TypeError. This wrapper + makes every tool silently parse its output when it is a valid JSON string, so + generated code can index directly without the boilerplate.
+ """ + if not hasattr(tool, "forward"): + return tool + original_forward = tool.forward + def _json_aware_forward(*args, **kwargs): + result = original_forward(*args, **kwargs) + if isinstance(result, str): + try: + return json.loads(result) + except (json.JSONDecodeError, ValueError): + pass + return result + tool.forward = _json_aware_forward + return tool + + class SmolAgentFactory: def __init__(self, @@ -64,11 +86,11 @@ def __init__(self, instruct_prompt, tools=[], temperature=0.7, - max_steps=128, + max_steps=35, ) -> None: self.name = name self.instruct_prompt = instruct_prompt - self.tools = tools + self.tools = [_make_tool_json_aware(t) for t in tools] # variable defined by workflow factory self.model_id = MODEL_ID self.memory_folder = MEMORY_PATH @@ -92,13 +114,37 @@ def __init__(self, assert os.path.exists(self.memory_folder), f"Memory folder {self.memory_folder} does not exist. Please create it." + # Maximum input-token count allowed per agent run. When any step's + # cumulative input tokens exceed this, the agent is stopped early to + # prevent context-window explosion and runaway cost. Injected from + # config as MAX_CONTEXT_TOKENS; falls back to 800k if not set. + self.max_context_tokens: int = globals().get("MAX_CONTEXT_TOKENS", 800_000) + try: self.engine = self.get_engine() + + def _context_guard_callback(step): + """Stop the agent when input tokens exceed the configured ceiling.""" + if not isinstance(step, ActionStep): + return + usage = getattr(step, "token_usage", None) + if usage is None: + return + input_tokens = getattr(usage, "input_tokens", None) or ( + usage.get("input_tokens") if isinstance(usage, dict) else None + ) + if input_tokens and input_tokens > self.max_context_tokens: + raise RuntimeError( + f"Context window guard: input tokens ({input_tokens:,}) exceeded " + f"limit ({self.max_context_tokens:,}). Stopping agent '{self.name}'." 
+ ) + self.agent = CodeAgent( tools=self.tools, model=self.engine, name=f"{self.name}_agent", max_steps=max_steps, + step_callbacks=[_context_guard_callback], #planning_interval=planning_interval, # think more before acting additional_authorized_imports = [ 'requests', 'json', 'requests.exceptions', @@ -201,10 +247,21 @@ def build_workflow_step_prompt(self, state: WorkflowState) -> str: truncated_answer = str(answer)[:4096] + "..." if len(str(answer)) > 4096 else str(answer) prev_infos += f"- Agent '{step_name}': {truncated_answer}\n\n" + workspace_dir = globals().get("WORKSPACE_DIR", "") + workspace_hint = f"Workspace dir: {workspace_dir}" if workspace_dir else "" + return f""" OPERATIONAL CONTEXT: {prev_infos} +SANDBOX RULES (MANDATORY): +1. NEVER use `open()`, `f.write()`, or any built-in file I/O — forbidden in this sandbox. + Use the file-write tools provided in the workflow context instead. +2. Tool responses may be JSON strings — always parse before indexing: + result = json.loads(raw) if isinstance(raw, str) else raw + Then access content via result.get("stdout", str(raw)). Never assume arbitrary key names. +3. ALL file paths must be ABSOLUTE. {workspace_hint} + TASK: {self.instruct_prompt} Address complain from the last agent informations if any.
@@ -261,9 +318,14 @@ def save_memories(self, workflow_uuid: str): ) memories.append(action_step) try: + import fcntl agent_task_path = os.path.join(self.memory_folder, f"task_{self.name}.json") with open(agent_task_path, "w") as f: - json.dump(memories, f, indent=2) + fcntl.flock(f, fcntl.LOCK_EX) + try: + json.dump(memories, f, indent=2) + finally: + fcntl.flock(f, fcntl.LOCK_UN) print(f"Agent memories saved successfully to {agent_task_path}") except Exception as e: print(f"Failed to save memory: {str(e)}") @@ -358,12 +420,11 @@ def _run_agent(): error = False warning = True except Exception as e: + result['exception'] = e print(str(e)) print("retrying...") error = True count += 1 - #result['exception'] = e - #result['completed'] = True agent_thread = threading.Thread(target=_run_agent, daemon=True) agent_thread.start() @@ -377,6 +438,9 @@ def _run_agent(): raise result['exception'] self.save_memories(workflow_uuid=workflow_uuid) return result['response'] + except TimeoutError: + self.save_memories(workflow_uuid=workflow_uuid) + raise except Exception as e: self.save_memories(workflow_uuid=workflow_uuid) raise e diff --git a/sources/prompts/workflow_v11.md b/sources/prompts/workflow_v11.md index 6778457..7b8ae93 100644 --- a/sources/prompts/workflow_v11.md +++ b/sources/prompts/workflow_v11.md @@ -214,6 +214,10 @@ START --> [Solver] --> [Grounded Checker] --> END - Degenerate output is failure. Any fallback, constant, placeholder, or base-rate result must route to repair, never report SUCCESS. - Prefer the smallest workflow with a grounded check and a repair path. Add an agent only when it forces a commitment the single pass skips or routes around a real bottleneck. - Reserve deliberation for genuinely independent inputs. Use the debate pattern only when the critics are grounded in different external evidence or run on different models; otherwise it is agreement theater. 
+- **MCP tool response schema**: Toolomics tools return JSON strings with schema `{"status": "success"|"error", "stdout": "...", "stderr": "...", "exit_code": 0}`. Agent code must always do `import json; result = json.loads(raw) if isinstance(raw, str) else raw` before indexing, then read content via `result.get("stdout", raw)`. Never assume keys like `page_content`, `text`, or `content` — check the actual response shape before indexing. +- **Absolute paths only**: All file paths in agent instructions must be absolute. The global `WORKSPACE_DIR` is available in the workflow context; use it as the base for output paths (e.g. `os.path.join(WORKSPACE_DIR, subdir, filename)`). +- **Environment setup via shell tool**: When the task requires installing packages or setting up an environment (conda, pip, apt, etc.), instruct the agent to use `execute_command("conda install ...", timeout=1800)` (or longer for heavy installs). Never use `import subprocess` — it is forbidden in the sandbox. Set `timeout` explicitly for any command that may take more than 5 minutes. +- **No built-in file I/O**: The sandbox forbids `open()`, `f.write()`, and `f.read()`. Agent instructions must direct agents to use the file-write tools available in the workflow — never Python's built-in file I/O. ## Checklist diff --git a/sources/transparency/astra_exporter.py b/sources/transparency/astra_exporter.py new file mode 100644 index 0000000..93885a2 --- /dev/null +++ b/sources/transparency/astra_exporter.py @@ -0,0 +1,245 @@ +"""Orchestrate the post-run ASTRA export for the best evolved workflow. + +Pipeline +-------- +1. Locate the best run's memory directory (``/``). +2. Load the smolagents trace and strip the bloat — see + :mod:`sources.transparency.trace_compaction`. +3. Run a per-step decision-extraction LLM pass — see + :mod:`sources.transparency.decision_extractor`. +4. 
Assemble the analysis + universe dicts and write them into the run's + capsule subfolder as ``astra.yaml`` and ``universes/best.yaml``. + +Output target: ``//``. The best run's +UUID is used as the capsule subfolder name (stable, deterministic, mirrors +the ``sources/memory/`` convention) — it does NOT depend on the +LLM-named goal capsule produced later by ``LocalTransfer``. + +The exporter is wired into :func:`start_workflow_evolution` immediately +after ``workspace_mgr.restore_best`` and is gated on ``config.export_astra``. +""" + +from __future__ import annotations + +import json +import logging +import os +import sys +from pathlib import Path + +if __name__ == "__main__": + sys.path.append( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + ) + +from sources.transparency.decision_extractor import extract_decisions +from sources.transparency.memory_trace import RECIPE_FILENAME, reconstruct_recipe +from sources.transparency.trace_compaction import compact_trace, load_trace +from sources.transparency.yaml_writer import ( + build_analysis, + build_universe, + write_export, +) + + +# Names never worth listing as scientific outputs. +_ARTEFACT_IGNORE = {".DS_Store", "Thumbs.db", ".gitkeep", RECIPE_FILENAME} + + +class AstraExporter: + """Build an ASTRA-spec YAML pair from the best run's saved memory.""" + + # WorkspaceManager snapshots best-run artefacts under this root with the + # naming pattern ``mimosa_run__``. Overridable for + # tests so the lookup can target a sandbox rather than the real ``/tmp``. + _SNAPSHOT_ROOT: Path = Path("/tmp") + + def __init__(self, config, logger: logging.Logger | None = None) -> None: + """Bind to the running :class:`config.Config` and the engine's logger.""" + self.config = config + self.logger = logger or logging.getLogger(__name__) + + def export(self, best_uuid: str, goal: str) -> Path | None: + """Run the pipeline and return the analysis path (None on failure). 
+ + Args: + best_uuid: UUID of the run selected as best by the evolution loop. + goal: User-provided goal text — supplied to the LLM as context. + """ + from sources.cli.pretty_print import ( + print_info, print_ok, print_section, print_warn, + ) + print_section("ASTRA EXPORT") + memory_path = Path(self.config.memory_dir) / best_uuid + workspace_dir = Path(self.config.workspace_dir) + capsule_dir = Path(self.config.runs_capsule_dir) / best_uuid + if not memory_path.is_dir(): + print_warn(f"Memory directory missing: {memory_path}; export skipped.") + return None + + raw_steps = load_trace(memory_path) + if not raw_steps: + print_warn(f"Empty trace for {best_uuid}; ASTRA export skipped.") + return None + compact = compact_trace(raw_steps) + print_info( + f"Trace compacted: {len(raw_steps)} raw → {len(compact)} candidate steps." + ) + + llm_config = self._build_llm_config() + decisions = extract_decisions(compact, goal, memory_path, llm_config) + print_info(f"Decisions extracted: {len(decisions)}.") + + artefacts_dir = self._resolve_artefacts_dir(best_uuid, workspace_dir) + if artefacts_dir != workspace_dir: + print_info(f"Reading artefacts from /tmp snapshot: {artefacts_dir}") + workspace_files = self._list_workspace_files(artefacts_dir) + + recipe_command = self._write_recipe(capsule_dir, memory_path) + analysis = build_analysis( + goal, best_uuid, workspace_files, decisions, recipe_command + ) + universe = build_universe(decisions, best_uuid) + path = write_export(capsule_dir, analysis, universe) + print_ok(f"ASTRA analysis written to {path}") + return path + + def _write_recipe(self, capsule_dir: Path, memory_path: Path) -> str: + """Reconstruct the run's code into ``recipe.py`` and return its command. + + Falls back to a pointer command when no executable code was recovered, + so the analysis stays valid for runs that produced no code steps. 
+ """ + from sources.transparency.yaml_writer import _RECIPE_FALLBACK_COMMAND + code = reconstruct_recipe(memory_path) + if not code.strip(): + return _RECIPE_FALLBACK_COMMAND + capsule_dir.mkdir(parents=True, exist_ok=True) + (capsule_dir / RECIPE_FILENAME).write_text(code) + return f"python {RECIPE_FILENAME}" + + def _build_llm_config(self): + """Reuse the project's judge model for cheap structured extraction.""" + from sources.core.llm_provider import LLMConfig + model = getattr(self.config, "judge_model", None) or "anthropic/claude-sonnet-4-5" + provider = model.split("/", 1)[0] if "/" in model else "anthropic" + return LLMConfig( + model=model, + provider=provider, + temperature=0.0, + openrouter_provider=self.config.openrouter_provider_for(model), + ) + + def _resolve_artefacts_dir(self, best_uuid: str, workspace_dir: Path) -> Path: + """Prefer the run's saved /tmp snapshot, fall back to the live workspace. + + The snapshot is the canonical source — ``WorkspaceManager.restore_best`` + copies it into ``workspace_dir`` just before the export runs. Falling + back keeps standalone mode functional after ``cleanup()`` has wiped + ``/tmp``. 
+ """ + pattern = f"mimosa_run_*_{best_uuid}" + for snapshot in self._SNAPSHOT_ROOT.glob(pattern): + if snapshot.is_dir(): + return snapshot + return workspace_dir + + def _list_workspace_files(self, workspace_dir: Path) -> list[str]: + """Top-level artefact files, sorted; drops our own output and OS junk.""" + if not workspace_dir.is_dir(): + return [] + excluded = {"astra.yaml", "universes"} | _ARTEFACT_IGNORE + return sorted( + p.name for p in workspace_dir.iterdir() + if p.is_file() and p.name not in excluded and not p.name.startswith(".") + ) + + +def _resolve_goal(memory_dir: Path, uuid: str, override: str | None) -> str: + """Read the goal from ``state_result.json`` unless an override is given.""" + if override: + return override + state_path = memory_dir / uuid / "state_result.json" + if not state_path.exists(): + return "(unknown goal — pass --goal to override)" + try: + return json.loads(state_path.read_text()).get("goal") or "(unknown goal)" + except (OSError, json.JSONDecodeError): + return "(unknown goal — state_result.json unreadable)" + + +def _run_standalone(uuid: str, goal_override: str | None) -> int: + """Export ASTRA for a real memory UUID using the project's Config.""" + logging.basicConfig(level=logging.INFO, format="%(message)s") + from config import Config + config = Config() + goal = _resolve_goal(Path(config.memory_dir), uuid, goal_override) + path = AstraExporter(config).export(uuid, goal) + if path is None: + print("[FAIL] export returned None — see warnings above.") + return 1 + print(f"[OK] ASTRA export wrote {path}") + return 0 + + +def _run_smoke_check() -> None: + """Self-contained sanity check — sandboxed in /var/folders, leaves nothing behind.""" + import tempfile + from types import SimpleNamespace + with tempfile.TemporaryDirectory() as tmp: + memory_dir = Path(tmp) / "memory" + workspace_dir = Path(tmp) / "workspace" + capsule_dir = Path(tmp) / "runs_capsule" + run_uuid = "smoke-uuid" + (memory_dir / 
run_uuid).mkdir(parents=True) + workspace_dir.mkdir() + capsule_dir.mkdir() + (workspace_dir / "model.pkl").write_text("fake") + (workspace_dir / ".DS_Store").write_text("junk") + (memory_dir / run_uuid / "task_single_agent.json").write_text(json.dumps([ + { + "step_number": 1, + "model_output_message": {"content": "Plot."}, + "code_action": "import matplotlib.pyplot as plt\nplt.savefig('x.png')", + "observations": "saved", + }, + ])) + config = SimpleNamespace( + memory_dir=str(memory_dir), + workspace_dir=str(workspace_dir), + runs_capsule_dir=str(capsule_dir), + judge_model="openrouter/deepseek/deepseek-v4-flash", + openrouter_provider_for=lambda _m: None, + ) + exporter = AstraExporter(config) + compact = compact_trace(load_trace(memory_dir / run_uuid)) + assert compact == [], f"Mechanical step should be filtered, got {compact}" + files = exporter._list_workspace_files(workspace_dir) + assert files == ["model.pkl"], f".DS_Store should be dropped, got {files}" + cmd = exporter._write_recipe(capsule_dir / run_uuid, memory_dir / run_uuid) + assert cmd == "python recipe.py", cmd + recipe = (capsule_dir / run_uuid / "recipe.py").read_text() + assert "plt.savefig" in recipe and "step 1 ยท single_agent" in recipe, recipe + analysis = build_analysis("smoke goal", run_uuid, files, [], cmd) + universe = build_universe([], run_uuid) + out = write_export(capsule_dir / run_uuid, analysis, universe) + assert out.exists() and out.parent == capsule_dir / run_uuid, out + print(f"[OK] astra_exporter smoke check passed (wrote {out})") + + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser( + description=( + "ASTRA exporter. Without args, runs a self-cleaning smoke check. " + "With --uuid, exports a real run's memory under sources/memory/ " + "to runs_capsule//." 
+ ) + ) + parser.add_argument("--uuid", help="Run UUID under config.memory_dir to export.") + parser.add_argument("--goal", help="Override goal text (default: read from state_result.json).") + args = parser.parse_args() + if args.uuid: + sys.exit(_run_standalone(args.uuid, args.goal)) + _run_smoke_check() + sys.exit(0)