Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def __init__(self):
# Per-agent (SmolAgentFactory) execution timeout in seconds. Injected into
# the generated workflow as AGENT_EXECUTION_TIMEOUT. 3600 = 1 hour.
self.agent_execution_timeout: int = 18000
# Maximum input tokens per agent step before the context-window guard fires.
# Prevents runaway context explosion. Default 800k; raise for large-context models.
self.max_context_tokens: int = 800_000
self.runner_default_max_memory_mb: int = 10000
self.runner_default_max_cpu_percent: int = 100
self.runner_temp_dir: str = "./tmp"
Expand Down Expand Up @@ -237,6 +240,7 @@ def jsonify(
"runner_default_python_version": self.runner_default_python_version,
"runner_default_timeout": self.runner_default_timeout,
"agent_execution_timeout": self.agent_execution_timeout,
"max_context_tokens": self.max_context_tokens,
"runner_default_max_memory_mb": self.runner_default_max_memory_mb,
"runner_default_max_cpu_percent": self.runner_default_max_cpu_percent,
"runner_temp_dir": self.runner_temp_dir,
Expand All @@ -256,6 +260,7 @@ def from_json(self, data: dict[str, Any]) -> None:
)
self.smolagent_model_id = data.get("smolagent_model_id", self.smolagent_model_id)
self.judge_model = data.get("judge_model", self.judge_model)
self.capsule_namer_model = data.get("capsule_namer_model", self.capsule_namer_model)
self.engine_name = data.get("engine_name", self.engine_name)
self.openrouter_provider = data.get("openrouter_provider", self.openrouter_provider)
self.prompt_planner = data.get("prompt_planner", self.prompt_planner)
Expand Down Expand Up @@ -299,6 +304,9 @@ def from_json(self, data: dict[str, Any]) -> None:
self.agent_execution_timeout = data.get(
"agent_execution_timeout", self.agent_execution_timeout
)
self.max_context_tokens = data.get(
"max_context_tokens", self.max_context_tokens
)
self.runner_default_max_memory_mb = data.get(
"runner_default_max_memory_mb", self.runner_default_max_memory_mb
)
Expand Down Expand Up @@ -360,6 +368,7 @@ def __str__(self) -> str:
lines.append(f" runner_default_python_version={self.runner_default_python_version}")
lines.append(f" runner_default_timeout={self.runner_default_timeout}")
lines.append(f" agent_execution_timeout={self.agent_execution_timeout}")
lines.append(f" max_context_tokens={self.max_context_tokens}")
lines.append(f" runner_default_max_memory_mb={self.runner_default_max_memory_mb}")
lines.append(f" runner_default_max_cpu_percent={self.runner_default_max_cpu_percent}")
lines.append(f" runner_temp_dir={self.runner_temp_dir}")
Expand Down
25 changes: 21 additions & 4 deletions sources/benchmark_evaluation/csv_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,17 @@ def _generate_task_default(self, row, workspace_subfolder: str | None = None):
All file operations MUST be performed within this subfolder.
"""

# When the Prompt is a self-contained task capsule (non-empty) and the URL
# points to a local file rather than a web resource, the file cannot be
# navigated to by the agent and the capsule already encodes all required
# context. In that case, skip the paper/URL wrapper entirely.
is_local_path = url and not url.startswith("http")
if prompt and is_local_path:
return f"""
{prompt}
{workspace_instruction}
""".strip()

return f"""
Paper title: {paper_title}
Url to paper: {url}
Expand Down Expand Up @@ -500,9 +511,11 @@ def _analyze_results(self, goal: str, results_str: str, execution_time: float) -
Provide your analysis following the specified output format."""

analysis_text = self.result_analyzer(prompt)
import re as _re
_sl = _re.search(r'SUCCESS_LEVEL:\s*(High|Medium|Low|Incomplete|Failed|Error)', analysis_text, _re.IGNORECASE)
analysis = {
"full_analysis": analysis_text,
"success_level": "Medium",
"success_level": _sl.group(1) if _sl else "Medium",
"key_insight": "Analysis completed"
}
return analysis
Expand Down Expand Up @@ -607,6 +620,8 @@ def _create_isolated_config(self, task_id: str) -> Any:
isolated_workspace = Path(self._base_workspace_dir) / f"worker_{task_id}"
isolated_workspace.mkdir(parents=True, exist_ok=True)
isolated_config.workspace_dir = str(isolated_workspace)
# Resolve to absolute so CWD-relative paths don't break in concurrent workers
isolated_config.runs_capsule_dir = str(Path(self.config.runs_capsule_dir).resolve())
return isolated_config

def _cleanup_isolated_workspace(self, task_id: str) -> None:
Expand Down Expand Up @@ -721,7 +736,7 @@ async def _process_single_task(
goal=goal,
judge=True,
enable_evolution=learning,
scenario_rubric=None,
scenario_rubric=scenario_rubric_filename,
single_agent_mode=single_agent_mode
)
results_str = self._format_task_mode_results(runs[-1])
Expand Down Expand Up @@ -827,9 +842,11 @@ def _analyze_results_isolated(self, goal: str, results_str: str, execution_time:
Provide your analysis following the specified output format."""

analysis_text = self.result_analyzer(prompt)
import re as _re
_sl = _re.search(r'SUCCESS_LEVEL:\s*(High|Medium|Low|Incomplete|Failed|Error)', analysis_text, _re.IGNORECASE)
analysis = {
"full_analysis": analysis_text,
"success_level": "Medium",
"success_level": _sl.group(1) if _sl else "Medium",
"key_insight": "Analysis completed"
}
return analysis
Expand Down Expand Up @@ -1017,7 +1034,7 @@ async def run_single_thread_eval_loop(self, dataset_type: str, dataset_path: str
runs = await self.evolve.start_workflow_evolution(goal=goal,
judge=True,
enable_evolution=learning,
scenario_rubric=None,
scenario_rubric=scenario_rubric_filename,
single_agent_mode=single_agent_mode
)
results_str = self._format_task_mode_results(runs[-1])
Expand Down
58 changes: 50 additions & 8 deletions sources/core/evolution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,12 @@ async def start_workflow_evolution(
wf = None
max_iteration = self.config.max_learning_evolve_iterations if enable_evolution else 1

# Reset archive at session start
# Reset archive and variation history at session start so each task
# starts with a clean slate (prevents boldness contamination across rows).
self.selection._archive = []
self.variation.score_history.clear()
self.variation.textual_gradient_history.clear()
self.variation.agent_count_history.clear()

parents, _ = self.select_parent_workflow(
goal, template_uuid=template_uuid
Expand Down Expand Up @@ -467,7 +471,8 @@ async def evolve_generation(
original_task=runs[-1].original_task,
single_agent_mode=single_agent_mode
)
wf_info = WorkflowInfo(uuid, Path(f"{self.workflow_dir}/{uuid}"))
_valid_uuid = uuid and uuid != "generation_failed"
wf_info = WorkflowInfo(uuid, Path(f"{self.workflow_dir}/{uuid}")) if _valid_uuid else None
self._save_evolution_prompt_artifact(uuid, runs[-1].prompt)
# Persist lineage as soon as we have a uuid so the evolution tree can
# include even runs that subsequently fail evaluation.
Expand All @@ -487,7 +492,7 @@ async def evolve_generation(
if workspace_mgr is not None and uuid:
workspace_mgr.save_run_snapshot(uuid)

if workflow_genotype_code:
if workflow_genotype_code and wf_info:
# Evaluate and calculate costs
eval_type, current_iteration_cost = await self._evaluate_and_calculate_cost(
executed, runs[-1].judge, uuid, runs[-1].answers, runs[-1].scenario_rubric, assertion_history
Expand All @@ -496,7 +501,7 @@ async def evolve_generation(
runs[-1].reward_uncapped = wf_info.overall_score_uncapped
runs[-1].code = wf_info.code

if uuid:
if uuid and wf_info:
verifier = (wf_info.state_result or {}).get("evaluation", {}).get("verifier", {})
is_failure = (
on_error
Expand All @@ -518,11 +523,14 @@ async def evolve_generation(
)

runs[-1].current_uuid = uuid
runs[-1].answers = wf_info.answers
runs[-1].state_result = wf_info.state_result
agents_answers = self.extract_agents_behavior(wf_info.state_result)
runs[-1].answers = wf_info.answers if wf_info else []
runs[-1].state_result = wf_info.state_result if wf_info else {}
agents_answers = self.extract_agents_behavior(wf_info.state_result if wf_info else {})
self.show_answers(agents_answers)
rewards_history.append(wf_info.overall_score)
if not on_error and wf_info:
rewards_history.append(wf_info.overall_score)
else:
rewards_history.append(0.0)

# ── Survivor validation: gate + populate _archive (steady-state population)
if uuid and not on_error:
Expand Down Expand Up @@ -677,6 +685,40 @@ async def evolve_generation(
runs[-1].plot = self._save_final_plots(assertion_history, rewards_history, uuid)
return runs

def _export_astra(self, best_uuid: str, goal: str) -> bool:
"""Best-effort post-run ASTRA export of the best workflow's trace.

Gated on ``config.export_astra`` (opt-in, off by default). Failure is
non-fatal: a missing memory directory, an LLM hiccup, or a YAML write
error must not break the evolution loop. Writes a ``.export_status``
sidecar under the capsule directory so callers can inspect outcomes.

Returns:
``True`` if export succeeded, ``False`` otherwise.
"""
if not getattr(self.config, "export_astra", False):
return False
status_dir = os.path.join(self.config.runs_capsule_dir, best_uuid)
status_path = os.path.join(status_dir, ".export_status")
try:
from sources.transparency import AstraExporter
result = AstraExporter(self.config, self.logger).export(best_uuid, goal)
os.makedirs(status_dir, exist_ok=True)
with open(status_path, "w") as _f:
json.dump({"status": "ok", "path": str(result)}, _f)
return True
except Exception as exc:
self.logger.warning(f"[ASTRA] export skipped: {exc}")
print_warn(f"ASTRA export failed (non-fatal): {exc}")
try:
os.makedirs(status_dir, exist_ok=True)
with open(status_path, "w") as _f:
json.dump({"status": "error", "error": str(exc)}, _f)
except Exception:
pass
return False


def _get_human_validation(self) -> bool:
"""Get human validation for continuing the workflow.

Expand Down
33 changes: 26 additions & 7 deletions sources/core/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,41 @@ async def load_tools_code(self) -> tuple[str, str]:
"""
tools_code = ""
existing_tool_prompt = ""
import asyncio as _asyncio
tool_manager = ToolManager(self.config)
try:
tool_setup = False
while tool_setup == False:
_max_retries = 5
_backoff = 1.0
mcps = []
for _attempt in range(_max_retries):
try:
mcps = await tool_manager.discover_mcp_servers()
tool_setup = await tool_manager.verify_tools()
except Exception as e:
self.logger.error(f"load_tools_code: Failed to discover MCP servers: {str(e)}")
raise RuntimeError(f"Failed to discover MCP servers: {str(e)}") from e
if tool_setup and mcps:
break
except Exception as e:
if _attempt == _max_retries - 1:
self.logger.error(
f"load_tools_code: MCP discovery failed after {_max_retries} attempts: {e}"
)
raise RuntimeError(
f"MCP discovery failed after {_max_retries} attempts: {e}"
) from e
self.logger.warning(
f"MCP discovery attempt {_attempt + 1}/{_max_retries} failed: {e}. "
f"Retrying in {_backoff:.1f}s…"
)
await _asyncio.sleep(_backoff)
_backoff = min(_backoff * 2, 30.0)
else:
raise RuntimeError(
f"MCP servers did not become ready after {_max_retries} attempts."
)
if not mcps:
raise ValueError(
"\n" + "=" * 80 +
"\n🚨 FATAL ERROR: No MCP Servers Found! 🚨"
"\n" + "-" * 80 +
"\nPlease ensure at least one MCP instance is running on Toolomics."
"\nRetrying until MCPs detected.... use CTRL+C to stop."
"\n" + "=" * 80 + "\n"
)
for mcp in mcps:
Expand Down
29 changes: 21 additions & 8 deletions sources/core/llm_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ def _is_claude_model(self) -> bool:
"""
return self.config.provider == "anthropic" or "claude" in self.config.model.lower()

def _is_no_temperature_model(self) -> bool:
"""True for models that reject the temperature parameter entirely.

Claude Opus 4.x (e.g. claude-opus-4-8) does not accept temperature
at all — passing it triggers an invalid_request_error from Anthropic.
"""
return "claude-opus-4" in self.config.model.lower()

def save_call(self, call: dict[str, Any]) -> None:
"""Save the API call details to a JSON file.

Expand Down Expand Up @@ -317,8 +325,11 @@ def _messages_match(self, expected: list[dict[str, Any]], cached: list[dict[str,

@staticmethod
def _is_temperature_error(error: Exception) -> bool:
"""True when the API rejected ``temperature``, read from ``error.param``."""
return getattr(error, "param", None) == "temperature"
"""True when the API rejected the ``temperature`` parameter."""
if getattr(error, "param", None) == "temperature":
return True
msg = str(error).lower()
return "temperature" in msg and ("deprecated" in msg or "range" in msg)

def _is_retryable_error(self, error: Exception) -> bool:
"""Check if an error is retryable (temporary/transient).
Expand Down Expand Up @@ -404,18 +415,20 @@ def __call__(self, prompt: str, timeout: int = 180, use_cache: bool = True) -> s
attempt = 0
max_wait = 500 # Maximum wait time in seconds
context_window_retry_count = 0 # Track context window errors specifically
effective_temperature = self.config.temperature
# None means "omit temperature from the request" (required for Opus 4.x)
effective_temperature = None if self._is_no_temperature_model() else self.config.temperature

while True: # Infinite retry loop
try:
completion_params = {
"model": f"{self.config.provider}/{self.config.model}",
"messages": message,
"temperature": effective_temperature,
"timeout": timeout,
"max_tokens": self.config.max_tokens,
"drop_params": True,
}
if effective_temperature is not None:
completion_params["temperature"] = effective_temperature
completion_params["api_key"] = self.config.key
# Add reasoning effort if supported (not for Claude models)
if self._supports_reasoning_tokens() and not self._is_claude_model():
Expand Down Expand Up @@ -454,12 +467,12 @@ def __call__(self, prompt: str, timeout: int = 180, use_cache: bool = True) -> s
attempt += 1

except Exception as e:
if self._is_temperature_error(e) and effective_temperature != 1.0:
if self._is_temperature_error(e) and effective_temperature is not None:
self.logger.warning(
f"Provider rejected temperature={effective_temperature:.2f}; "
f"falling back to 1.0 and retrying."
f"Provider rejected temperature={effective_temperature}; "
f"stripping temperature and retrying."
)
effective_temperature = 1.0
effective_temperature = None
continue

# Check if this is a retryable error
Expand Down
11 changes: 9 additions & 2 deletions sources/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,15 @@ def progress_handler(line: str) -> None:
result.stdout or result.stderr or "No output from workflow execution."
)
else:
print_err(f"Workflow execution failed: {result.stderr}")
raise Exception(f"Workflow execution failed: {result.stderr}")
stderr = result.stderr or ""
if "TimeoutExpired" in stderr or "timed out" in stderr.lower():
error_type = "TIMEOUT"
elif "SyntaxError" in stderr:
error_type = "SYNTAX_ERROR"
else:
error_type = "RUNTIME_ERROR"
print_err(f"[{error_type}] Workflow execution failed: {stderr}")
raise Exception(f"[{error_type}] Workflow execution failed: {stderr}")

def perspicacite_grounding_task(self, task: str) -> str:
"""Query Perspicacite-AI for a literature-grounded approach to a task.
Expand Down
8 changes: 8 additions & 0 deletions sources/core/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,14 @@ def _can_execute_step(self, step: PlanStep) -> tuple[bool, list[str]]:
dep_task = next((task for task in self.task_history if task.name == dep_name), None)
if dep_task is None or dep_task.status != TaskStatus.COMPLETED:
missing_deps.append(dep_name)
continue
expected_outputs = getattr(dep_task, "expected_outputs", None)
if expected_outputs:
outputs_ok, missing_outputs = self._verify_expected_outputs(expected_outputs)
if not outputs_ok:
missing_deps.append(
f"{dep_name}[missing_outputs:{','.join(missing_outputs)}]"
)
return len(missing_deps) == 0, missing_deps

def request_user_exit(self, msg: str) -> None:
Expand Down
5 changes: 3 additions & 2 deletions sources/core/selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,9 @@ def _add_to_archive(self, member: PopulationMember) -> None:
member: The population member to admit.
"""
self._archive.append(member)
# Refresh metrics first so eviction uses up-to-date qd_scores (including
# the newly-admitted member's novelty contribution to existing peers).
self._refresh_member_metrics()

if len(self._archive) > self.population_size:
weakest = min(self._archive, key=lambda m: m.qd_score)
Expand All @@ -587,8 +590,6 @@ def _add_to_archive(self, member: PopulationMember) -> None:
f"reward={weakest.reward:.3f}) — archive full"
)

self._refresh_member_metrics()

def _refresh_member_metrics(self) -> None:
"""Recompute stored novelty + qd_score for every archive member."""
if len(self._archive) < 2:
Expand Down
Loading