From ed4eab5529061b6c944231b7eb6e9e296284841f Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Fri, 17 Apr 2026 18:37:14 +0200 Subject: [PATCH 1/5] refactor: split multi_agent into definitions, manager, and task modules`n`nRef #43 --- multi_agent/__init__.py | 21 +- multi_agent/definitions.py | 139 +++++++++ multi_agent/manager.py | 121 ++++++++ multi_agent/subagent.py | 497 ++------------------------------ multi_agent/task.py | 193 +++++++++++++ tests/test_multi_agent_split.py | 119 ++++++++ 6 files changed, 598 insertions(+), 492 deletions(-) create mode 100644 multi_agent/definitions.py create mode 100644 multi_agent/manager.py create mode 100644 multi_agent/task.py create mode 100644 tests/test_multi_agent_split.py diff --git a/multi_agent/__init__.py b/multi_agent/__init__.py index 2ca550ad..d5354be9 100644 --- a/multi_agent/__init__.py +++ b/multi_agent/__init__.py @@ -1,23 +1,18 @@ -"""Multi-agent package for cheetahclaws. +"""Multi-agent orchestration for cheetahclaws.""" -Provides: - - AgentDefinition — typed agent definition (name, system_prompt, model, tools) - - SubAgentTask — lifecycle-tracked task - - SubAgentManager — thread-pool manager for spawning agents - - load_agent_definitions / get_agent_definition — agent registry -""" -from .subagent import ( +from .definitions import ( AgentDefinition, - SubAgentTask, - SubAgentManager, - load_agent_definitions, get_agent_definition, + load_agent_definitions, ) +from .manager import SubAgentManager +from .task import SubAgentTask, TaskStatus __all__ = [ "AgentDefinition", - "SubAgentTask", "SubAgentManager", - "load_agent_definitions", + "SubAgentTask", + "TaskStatus", "get_agent_definition", + "load_agent_definitions", ] diff --git a/multi_agent/definitions.py b/multi_agent/definitions.py new file mode 100644 index 00000000..0f83d4ba --- /dev/null +++ b/multi_agent/definitions.py @@ -0,0 +1,139 @@ +"""Agent type definitions and loading for cheetahclaws multi-agent system.""" + +from __future__ import annotations + +import logging +import os +import textwrap +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +log = logging.getLogger(__name__) + + +@dataclass +class AgentDefinition: + """Definition of a sub-agent type.""" + + name: str + system_prompt: str + tools: list[str] = field(default_factory=list) + model: Optional[str] = None + description: str = "" + + +_BUILTIN_AGENTS: dict[str, AgentDefinition] = { + "general-purpose": AgentDefinition( + name="general-purpose", + system_prompt="You are a helpful coding assistant.", + description="General-purpose coding agent with all tools available.", + ), + "coder": AgentDefinition( + name="coder", + system_prompt=textwrap.dedent("""\ + You are an expert software engineer focused on writing clean, + correct code. You have access to file and shell tools. + Focus on implementation — write code, run tests, fix errors. + """), + tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"], + description="Focused coding agent with file and shell tools.", + ), + "reviewer": AgentDefinition( + name="reviewer", + system_prompt=textwrap.dedent("""\ + You are a senior code reviewer. Analyze code for bugs, style issues, + security concerns, and suggest improvements. + You can read files but should NOT modify them. + """), + tools=["Read", "Glob", "Grep", "Bash"], + description="Code review agent — reads code and provides feedback.", + ), + "researcher": AgentDefinition( + name="researcher", + system_prompt=textwrap.dedent("""\ + You are a research assistant. Search the web, read documentation, + and synthesize information to answer questions. + """), + tools=["WebSearch", "WebFetch", "Read", "Glob", "Grep"], + description="Research agent with web search capabilities.", + ), + "tester": AgentDefinition( + name="tester", + system_prompt=textwrap.dedent("""\ + You are a testing specialist. Write and run tests to verify code + correctness. Focus on edge cases and thorough coverage. + """), + tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"], + description="Testing agent focused on writing and running tests.", + ), +} + + +def _parse_agent_md(path: Path) -> AgentDefinition: + """Parse a .md agent definition file with optional YAML-like frontmatter.""" + text = path.read_text(encoding="utf-8") + name = path.stem + metadata: dict = {} + body = text + + if text.startswith("---"): + parts = text.split("---", 2) + if len(parts) >= 3: + frontmatter = parts[1].strip() + body = parts[2].strip() + for line in frontmatter.splitlines(): + if ":" in line: + key, _, value = line.partition(":") + key = key.strip() + value = value.strip() + if key == "tools": + metadata["tools"] = [ + t.strip() for t in value.split(",") if t.strip() + ] + elif key == "model": + metadata["model"] = value or None + elif key == "description": + metadata["description"] = value + + return AgentDefinition( + name=name, + system_prompt=body, + tools=metadata.get("tools", []), + model=metadata.get("model"), + description=metadata.get("description", ""), + ) + + +def load_agent_definitions( + config_dir: str | Path | None = None, +) -> dict[str, AgentDefinition]: + """Load built-in + custom agent definitions from config directory.""" + agents = dict(_BUILTIN_AGENTS) + + if config_dir is None: + config_dir = Path(os.path.expanduser("~/.cheetahclaws")) + else: + config_dir = Path(config_dir) + + agents_dir = config_dir / "agents" + if agents_dir.is_dir(): + for md_file in sorted(agents_dir.glob("*.md")): + agent_def = _parse_agent_md(md_file) + agents[agent_def.name] = agent_def + log.debug("Loaded custom agent type: %s", agent_def.name) + + return agents + + +def get_agent_definition( + name: str, config_dir: str | Path | None = None +) -> AgentDefinition: + """Look up an agent definition by name.""" + agents = load_agent_definitions(config_dir) + if name not in agents: + available = ", ".join(sorted(agents.keys())) + raise ValueError( + f"Unknown agent type {name!r}. Available: {available}" + ) + return agents[name] diff --git a/multi_agent/manager.py b/multi_agent/manager.py new file mode 100644 index 00000000..99c46955 --- /dev/null +++ b/multi_agent/manager.py @@ -0,0 +1,121 @@ +"""Sub-agent manager — orchestrates spawning, tracking, and messaging agents.""" + +from __future__ import annotations + +import logging +import threading +import uuid +from pathlib import Path +from typing import Any, Optional + +from .definitions import AgentDefinition, get_agent_definition, load_agent_definitions +from .task import ( + SubAgentTask, + TaskStatus, + _agent_run, + _create_worktree, + _git_root, + _remove_worktree, +) + +log = logging.getLogger(__name__) + + +class SubAgentManager: + """Manages the lifecycle of sub-agents.""" + + def __init__(self, config_dir: str | Path | None = None): + self.config_dir = config_dir + self._tasks: dict[str, SubAgentTask] = {} + self._lock = threading.Lock() + + def spawn( + self, + prompt: str, + agent_type: str = "general-purpose", + name: Optional[str] = None, + model: Optional[str] = None, + isolation: Optional[str] = None, + wait: bool = True, + working_dir: Path | None = None, + ) -> SubAgentTask: + """Spawn a new sub-agent task.""" + agent_def = get_agent_definition(agent_type, self.config_dir) + + task = SubAgentTask( + prompt=prompt, + agent_type=agent_type, + name=name, + model=model or agent_def.model, + ) + + work_dir = working_dir or Path.cwd() + + if isolation == "worktree": + git_root = _git_root(work_dir) + if git_root: + branch = f"agent/{task.id}" + wt_path = _create_worktree(git_root, branch) + task.worktree_path = wt_path + task.worktree_branch = branch + work_dir = wt_path + else: + log.warning("No git repo found — running without worktree isolation") + + with self._lock: + self._tasks[task.id] = task + + if wait: + _agent_run(task, agent_def, work_dir) + else: + thread = threading.Thread( + target=_agent_run, + args=(task, agent_def, work_dir), + daemon=True, + name=f"agent-{task.id}", + ) + task.thread = thread + thread.start() + + return task + + def get_task(self, task_id: str) -> SubAgentTask | None: + """Retrieve a task by ID.""" + with self._lock: + return self._tasks.get(task_id) + + def find_by_name(self, name: str) -> SubAgentTask | None: + """Find a running task by its human-readable name.""" + with self._lock: + for task in self._tasks.values(): + if task.name == name: + return task + return None + + def list_tasks(self) -> list[dict[str, Any]]: + """List all tasks with their current status.""" + with self._lock: + return [t.to_dict() for t in self._tasks.values()] + + def send_message(self, to: str, message: str) -> bool: + """Send a message to a named or ID-referenced agent.""" + task = self.find_by_name(to) or self.get_task(to) + if not task: + return False + task.send_message(message) + return True + + def cleanup(self, task_id: str) -> None: + """Remove a completed task and its worktree.""" + with self._lock: + task = self._tasks.pop(task_id, None) + if task and task.worktree_path: + _remove_worktree(task.worktree_path) + + def list_agent_types(self) -> list[dict[str, str]]: + """Return available agent type definitions.""" + agents = load_agent_definitions(self.config_dir) + return [ + {"name": a.name, "description": a.description} + for a in agents.values() + ] diff --git a/multi_agent/subagent.py b/multi_agent/subagent.py index 32e08021..49db2465 100644 --- a/multi_agent/subagent.py +++ b/multi_agent/subagent.py @@ -1,479 +1,18 @@ -"""Threaded sub-agent system for spawning nested agent loops.""" -from __future__ import annotations - -import os -import uuid -import queue -import subprocess -import tempfile -from concurrent.futures import ThreadPoolExecutor, Future -from dataclasses import dataclass, field -from pathlib import Path -from typing import Dict, List, Optional, Any - - -# ── Agent definition ─────────────────────────────────────────────────────── - -@dataclass -class AgentDefinition: - """Definition for a specialized agent type.""" - name: str - description: str = "" - system_prompt: str = "" # extra instructions prepended to the base system prompt - model: str = "" # model override; "" = inherit from parent - tools: list = field(default_factory=list) # empty list = all tools - source: str = "user" # "built-in" | "user" | "project" - - -# ── Built-in agent definitions ───────────────────────────────────────────── - -_BUILTIN_AGENTS: Dict[str, AgentDefinition] = { - "general-purpose": AgentDefinition( - name="general-purpose", - description=( - "General-purpose agent for researching complex questions, " - "searching for code, and executing multi-step tasks." - ), - system_prompt="", - source="built-in", - ), - "coder": AgentDefinition( - name="coder", - description="Specialized coding agent for writing, reading, and modifying code.", - system_prompt=( - "You are a specialized coding assistant. Focus on:\n" - "- Writing clean, idiomatic code\n" - "- Reading and understanding existing code before modifying\n" - "- Making minimal targeted changes\n" - "- Never adding unnecessary features, comments, or error handling\n" - ), - source="built-in", - ), - "reviewer": AgentDefinition( - name="reviewer", - description="Code review agent analyzing quality, security, and correctness.", - system_prompt=( - "You are a code reviewer. Analyze code for:\n" - "- Correctness and logic errors\n" - "- Security vulnerabilities (injection, XSS, auth bypass, etc.)\n" - "- Performance issues\n" - "- Code quality and maintainability\n" - "Be concise and specific. Categorize findings as: Critical | Warning | Suggestion.\n" - ), - tools=["Read", "Glob", "Grep"], - source="built-in", - ), - "researcher": AgentDefinition( - name="researcher", - description="Research agent for exploring codebases and answering questions.", - system_prompt=( - "You are a research assistant focused on understanding codebases.\n" - "- Read and analyze code thoroughly before answering\n" - "- Provide factual, evidence-based answers\n" - "- Cite specific file paths and line numbers\n" - "- Be concise and focused\n" - ), - tools=["Read", "Glob", "Grep", "WebFetch", "WebSearch"], - source="built-in", - ), - "tester": AgentDefinition( - name="tester", - description="Testing agent that writes and runs tests.", - system_prompt=( - "You are a testing specialist. Your job:\n" - "- Write comprehensive tests for the given code\n" - "- Run existing tests and diagnose failures\n" - "- Focus on edge cases and error conditions\n" - "- Keep tests simple, readable, and fast\n" - ), - source="built-in", - ), -} - - -# ── Loading agent definitions from .md files ────────────────────────────── - -def _parse_agent_md(path: Path, source: str = "user") -> AgentDefinition: - """Parse a .md file with optional YAML frontmatter into an AgentDefinition. - - File format: - --- - description: "Short description" - model: claude-haiku-4-5-20251001 - tools: [Read, Write, Edit, Bash] - --- - - System prompt body goes here... - """ - content = path.read_text() - name = path.stem - description = "" - model = "" - tools: list = [] - system_prompt_body = content - - if content.startswith("---"): - end = content.find("---", 3) - if end != -1: - fm_text = content[3:end].strip() - system_prompt_body = content[end + 3:].strip() - try: - import yaml as _yaml - fm = _yaml.safe_load(fm_text) or {} - except ImportError: - # Manual key: value parse (no yaml dependency required) - fm: dict = {} - for line in fm_text.splitlines(): - if ":" in line: - k, _, v = line.partition(":") - fm[k.strip()] = v.strip() - description = str(fm.get("description", "")) - model = str(fm.get("model", "")) - raw_tools = fm.get("tools", []) - if isinstance(raw_tools, list): - tools = [str(t) for t in raw_tools] - elif isinstance(raw_tools, str): - # Handle "[Read, Write]" or "Read, Write" format - s = raw_tools.strip("[]") - tools = [t.strip() for t in s.split(",") if t.strip()] - - return AgentDefinition( - name=name, - description=description, - system_prompt=system_prompt_body, - model=model, - tools=tools, - source=source, - ) - - -def load_agent_definitions() -> Dict[str, AgentDefinition]: - """Load all agent definitions: built-ins → user-level → project-level. - - Search paths: - ~/.cheetahclaws/agents/*.md (user-level) - .cheetahclaws/agents/*.md (project-level, overrides user) - """ - defs: Dict[str, AgentDefinition] = dict(_BUILTIN_AGENTS) - - # User-level - user_dir = Path.home() / ".cheetahclaws" / "agents" - if user_dir.is_dir(): - for p in sorted(user_dir.glob("*.md")): - try: - d = _parse_agent_md(p, source="user") - defs[d.name] = d - except Exception: - pass - - # Project-level (overrides user) - proj_dir = Path.cwd() / ".cheetahclaws" / "agents" - if proj_dir.is_dir(): - for p in sorted(proj_dir.glob("*.md")): - try: - d = _parse_agent_md(p, source="project") - defs[d.name] = d - except Exception: - pass - - return defs - - -def get_agent_definition(name: str) -> Optional[AgentDefinition]: - """Look up an agent definition by name. Returns None if not found.""" - return load_agent_definitions().get(name) - - -# ── SubAgentTask ─────────────────────────────────────────────────────────── - -@dataclass -class SubAgentTask: - """Represents a sub-agent task with lifecycle tracking.""" - id: str - prompt: str - status: str = "pending" # pending | running | completed | failed | cancelled - result: Optional[str] = None - depth: int = 0 - name: str = "" # optional human-readable name (addressable by SendMessage) - worktree_path: str = "" # set if isolation="worktree" - worktree_branch: str = "" # set if isolation="worktree" - _cancel_flag: bool = False - _future: Optional[Future] = field(default=None, repr=False) - _inbox: Any = field(default_factory=queue.Queue, repr=False) # for send_message - - -# ── Worktree helpers ─────────────────────────────────────────────────────── - -def _git_root(cwd: str) -> Optional[str]: - """Return the git root directory for cwd, or None if not in a git repo.""" - try: - r = subprocess.run( - ["git", "rev-parse", "--show-toplevel"], - cwd=cwd, capture_output=True, text=True, check=True, - ) - return r.stdout.strip() - except Exception: - return None - - -def _create_worktree(base_dir: str) -> tuple: - """Create a temporary git worktree. - - Returns: - (worktree_path, branch_name) - Raises: - subprocess.CalledProcessError or OSError on failure. - """ - branch = f"nano-agent-{uuid.uuid4().hex[:8]}" - # mkdtemp gives us a path; remove the empty dir so git can create it - wt_path = tempfile.mkdtemp(prefix="nano-agent-wt-") - os.rmdir(wt_path) - subprocess.run( - ["git", "worktree", "add", "-b", branch, wt_path], - cwd=base_dir, check=True, capture_output=True, text=True, - ) - return wt_path, branch - - -def _remove_worktree(wt_path: str, branch: str, base_dir: str) -> None: - """Remove a git worktree and delete its branch (best-effort).""" - try: - subprocess.run( - ["git", "worktree", "remove", "--force", wt_path], - cwd=base_dir, capture_output=True, - ) - except Exception: - pass - try: - subprocess.run( - ["git", "branch", "-D", branch], - cwd=base_dir, capture_output=True, - ) - except Exception: - pass - - -# ── Internal helpers ─────────────────────────────────────────────────────── - -def _agent_run(prompt, state, config, system_prompt, depth=0, cancel_check=None): - """Lazy-import wrapper to avoid circular dependency with agent module. - - Uses absolute import so this works whether called from inside or outside - the multi_agent package (sys.path includes the project root). - """ - import agent as _agent_mod - return _agent_mod.run(prompt, state, config, system_prompt, depth=depth, cancel_check=cancel_check) - - -def _extract_final_text(messages): - """Walk backwards through messages, return first assistant content string.""" - for msg in reversed(messages): - if msg.get("role") == "assistant" and msg.get("content"): - return msg["content"] - return None - - -# ── SubAgentManager ──────────────────────────────────────────────────────── - -class SubAgentManager: - """Manages concurrent sub-agent tasks using a thread pool.""" - - def __init__(self, max_concurrent: int = 5, max_depth: int = 5): - self.tasks: Dict[str, SubAgentTask] = {} - self._by_name: Dict[str, str] = {} # name → task_id - self.max_concurrent = max_concurrent - self.max_depth = max_depth - self._pool = ThreadPoolExecutor(max_workers=max_concurrent) - - def spawn( - self, - prompt: str, - config: dict, - system_prompt: str, - depth: int = 0, - agent_def: Optional[AgentDefinition] = None, - isolation: str = "", # "" | "worktree" - name: str = "", - ) -> SubAgentTask: - """Spawn a new sub-agent task. - - Args: - prompt: user message for the sub-agent - config: agent configuration dict (copied before modification) - system_prompt: base system prompt - depth: current nesting depth (prevents infinite recursion) - agent_def: optional AgentDefinition with model/system_prompt/tools overrides - isolation: "" for normal, "worktree" for isolated git worktree - name: optional human-readable name (addressable via SendMessage) - - Returns: - SubAgentTask tracking the spawned work. - """ - task_id = uuid.uuid4().hex[:12] - short_name = name or task_id[:8] - task = SubAgentTask(id=task_id, prompt=prompt, depth=depth, name=short_name) - self.tasks[task_id] = task - if name: - self._by_name[name] = task_id - - if depth >= self.max_depth: - task.status = "failed" - task.result = f"Max depth ({self.max_depth}) exceeded" - return task - - # Build effective config and system prompt for this sub-agent - eff_config = dict(config) - eff_system = system_prompt - - if agent_def: - if agent_def.model: - eff_config["model"] = agent_def.model - if agent_def.system_prompt: - eff_system = agent_def.system_prompt.rstrip() + "\n\n" + system_prompt - - # Handle worktree isolation - worktree_path = "" - worktree_branch = "" - base_dir = os.getcwd() - - if isolation == "worktree": - git_root = _git_root(base_dir) - if not git_root: - task.status = "failed" - task.result = "isolation='worktree' requires a git repository" - return task - try: - worktree_path, worktree_branch = _create_worktree(git_root) - task.worktree_path = worktree_path - task.worktree_branch = worktree_branch - notice = ( - f"\n\n[Note: You are working in an isolated git worktree at " - f"{worktree_path} (branch: {worktree_branch}). " - f"Your changes are isolated from the main workspace at {git_root}. " - f"Commit your changes before finishing so they can be reviewed/merged.]" - ) - prompt = prompt + notice - # Pass the worktree path through config so tools (Bash/Glob/Grep) - # use it as their working directory without touching the process-level - # cwd (which is shared across all threads). - eff_config["_worktree_cwd"] = worktree_path - except Exception as e: - task.status = "failed" - task.result = f"Failed to create worktree: {e}" - return task - - def _run(): - import agent as _agent_mod; AgentState = _agent_mod.AgentState - task.status = "running" - try: - state = AgentState() - gen = _agent_run( - prompt, state, eff_config, eff_system, - depth=depth + 1, - cancel_check=lambda: task._cancel_flag, - ) - for _event in gen: - if task._cancel_flag: - break - - if task._cancel_flag: - task.status = "cancelled" - task.result = None - else: - task.result = _extract_final_text(state.messages) - task.status = "completed" - - # Drain inbox: process any messages sent via SendMessage - while not task._inbox.empty() and not task._cancel_flag: - inbox_msg = task._inbox.get_nowait() - task.status = "running" - gen2 = _agent_run( - inbox_msg, state, eff_config, eff_system, - depth=depth + 1, - cancel_check=lambda: task._cancel_flag, - ) - for _ev in gen2: - if task._cancel_flag: - break - if not task._cancel_flag: - task.result = _extract_final_text(state.messages) - task.status = "completed" - - except Exception as e: - task.status = "failed" - task.result = f"Error: {e}" - finally: - if worktree_path: - _remove_worktree(worktree_path, worktree_branch, base_dir) - - task._future = self._pool.submit(_run) - return task - - def wait(self, task_id: str, timeout: float = None) -> Optional[SubAgentTask]: - """Block until a task completes or timeout expires. - - Returns: - The task, or None if task_id is unknown. - """ - task = self.tasks.get(task_id) - if task is None: - return None - if task._future is not None: - try: - task._future.result(timeout=timeout) - except Exception: - pass - return task - - def get_result(self, task_id: str) -> Optional[str]: - """Return the result string for a completed task, or None.""" - task = self.tasks.get(task_id) - return task.result if task else None - - def list_tasks(self) -> List[SubAgentTask]: - """Return all tracked tasks.""" - return list(self.tasks.values()) - - def send_message(self, task_id_or_name: str, message: str) -> bool: - """Send a message to a running background agent. - - The message is queued and the agent will process it after completing - its current work. - - Args: - task_id_or_name: task ID or the human-readable name passed to spawn() - message: message text to send - - Returns: - True if the message was queued, False if task not found or already done. - """ - # Resolve name → task_id - task_id = self._by_name.get(task_id_or_name, task_id_or_name) - task = self.tasks.get(task_id) - if task is None: - return False - if task.status not in ("running", "pending"): - return False - task._inbox.put(message) - return True - - def cancel(self, task_id: str) -> bool: - """Request cancellation of a running task. - - Returns: - True if the cancel flag was set, False if task not found or not running. - """ - task = self.tasks.get(task_id) - if task is None: - return False - if task.status == "running": - task._cancel_flag = True - return True - return False - - def shutdown(self) -> None: - """Cancel all running tasks and shut down the thread pool.""" - for task in self.tasks.values(): - if task.status == "running": - task._cancel_flag = True - self._pool.shutdown(wait=True) +"""Backward-compatibility shim -- import from multi_agent directly.""" + +from .definitions import ( + AgentDefinition, + get_agent_definition, + load_agent_definitions, +) +from .manager import SubAgentManager +from .task import SubAgentTask, TaskStatus + +__all__ = [ + "AgentDefinition", + "SubAgentManager", + "SubAgentTask", + "TaskStatus", + "get_agent_definition", + "load_agent_definitions", +] diff --git a/multi_agent/task.py b/multi_agent/task.py new file mode 100644 index 00000000..64bc94a6 --- /dev/null +++ b/multi_agent/task.py @@ -0,0 +1,193 @@ +"""Sub-agent task execution and git worktree management for cheetahclaws.""" + +from __future__ import annotations + +import json +import logging +import os +import shutil +import subprocess +import threading +import uuid +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Optional + +from .definitions import AgentDefinition + +log = logging.getLogger(__name__) + + +class TaskStatus(Enum): + """Status of a sub-agent task.""" + + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +@dataclass +class SubAgentTask: + """Tracks a spawned sub-agent's lifecycle.""" + + id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) + prompt: str = "" + agent_type: str = "general-purpose" + name: Optional[str] = None + status: TaskStatus = TaskStatus.PENDING + result: Optional[str] = None + error: Optional[str] = None + worktree_path: Optional[Path] = None + worktree_branch: Optional[str] = None + thread: Optional[threading.Thread] = None + model: Optional[str] = None + _messages: list[str] = field(default_factory=list) + + def send_message(self, message: str) -> None: + """Queue a follow-up message for this agent.""" + self._messages.append(message) + + def get_pending_messages(self) -> list[str]: + """Retrieve and clear pending messages.""" + msgs = list(self._messages) + self._messages.clear() + return msgs + + def to_dict(self) -> dict[str, Any]: + """Serialize task state for reporting.""" + return { + "id": self.id, + "name": self.name, + "prompt": self.prompt[:100], + "agent_type": self.agent_type, + "status": self.status.value, + "result": self.result[:200] if self.result else None, + "error": self.error, + "worktree_branch": self.worktree_branch, + } + + +def _git_root(cwd: Path | None = None) -> Path | None: + """Find the git repository root from cwd.""" + try: + result = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + capture_output=True, + text=True, + cwd=cwd or Path.cwd(), + timeout=10, + ) + if result.returncode == 0: + return Path(result.stdout.strip()) + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + return None + + +def _create_worktree(root: Path, branch_name: str) -> Path: + """Create a git worktree for isolated agent work.""" + worktree_dir = root / ".agent_worktrees" / branch_name + worktree_dir.parent.mkdir(parents=True, exist_ok=True) + + subprocess.run( + ["git", "worktree", "add", "-b", branch_name, str(worktree_dir)], + cwd=root, + capture_output=True, + text=True, + check=True, + timeout=30, + ) + log.info("Created worktree at %s (branch: %s)", worktree_dir, branch_name) + return worktree_dir + + +def _remove_worktree(wt_path: Path) -> None: + """Remove a git worktree and clean up.""" + root = _git_root(wt_path) + if root: + subprocess.run( + ["git", "worktree", "remove", "--force", str(wt_path)], + cwd=root, + capture_output=True, + text=True, + timeout=30, + ) + if wt_path.exists(): + shutil.rmtree(wt_path, ignore_errors=True) + log.info("Removed worktree at %s", wt_path) + + +def _extract_final_text(output: str) -> str: + """Extract the final assistant text from agent output.""" + lines = output.strip().splitlines() + result_lines: list[str] = [] + for line in reversed(lines): + stripped = line.strip() + if not stripped: + if result_lines: + break + continue + result_lines.append(line) + result_lines.reverse() + return "\n".join(result_lines) if result_lines else output[-500:] + + +def _agent_run( + task: SubAgentTask, + agent_def: AgentDefinition, + working_dir: Path, + extra_env: dict[str, str] | None = None, +) -> None: + """Execute a sub-agent in a subprocess. Runs in a background thread.""" + task.status = TaskStatus.RUNNING + env = {**os.environ, **(extra_env or {})} + + try: + cmd = [ + "python", + "-m", + "cheetahclaws", + "--agent-mode", + "--agent-type", + task.agent_type, + ] + if task.model: + cmd.extend(["--model", task.model]) + + result = subprocess.run( + cmd, + input=task.prompt, + capture_output=True, + text=True, + cwd=working_dir, + env=env, + timeout=300, + ) + + if result.returncode == 0: + task.result = _extract_final_text(result.stdout) + task.status = TaskStatus.COMPLETED + else: + task.error = result.stderr[-500:] if result.stderr else "Non-zero exit" + task.result = result.stdout[-500:] if result.stdout else None + task.status = TaskStatus.FAILED + + except subprocess.TimeoutExpired: + task.error = "Agent timed out after 300s" + task.status = TaskStatus.FAILED + except Exception as exc: + task.error = str(exc) + task.status = TaskStatus.FAILED + finally: + if task.worktree_path and task.status in ( + TaskStatus.COMPLETED, + TaskStatus.FAILED, + ): + log.debug( + "Worktree %s preserved for review (branch: %s)", + task.worktree_path, + task.worktree_branch, + ) diff --git a/tests/test_multi_agent_split.py b/tests/test_multi_agent_split.py new file mode 100644 index 00000000..6276b117 --- /dev/null +++ b/tests/test_multi_agent_split.py @@ -0,0 +1,119 @@ +"""Tests for multi_agent module split -- verify imports work from all paths.""" + + +class TestImportsFromPackage: + """Imports from multi_agent package directly.""" + + def test_agent_definition(self): + from multi_agent import AgentDefinition + assert AgentDefinition is not None + + def test_subagent_task(self): + from multi_agent import SubAgentTask + assert SubAgentTask is not None + + def test_subagent_manager(self): + from multi_agent import SubAgentManager + assert SubAgentManager is not None + + def test_task_status(self): + from multi_agent import TaskStatus + assert TaskStatus is not None + + def test_load_agent_definitions(self): + from multi_agent import load_agent_definitions + assert callable(load_agent_definitions) + + def test_get_agent_definition(self): + from multi_agent import get_agent_definition + assert callable(get_agent_definition) + + +class TestImportsFromSubagentShim: + """Backward-compatible imports from multi_agent.subagent.""" + + def test_agent_definition(self): + from multi_agent.subagent import AgentDefinition + assert AgentDefinition is not None + + def test_subagent_manager(self): + from multi_agent.subagent import SubAgentManager + assert SubAgentManager is not None + + def test_task_status(self): + from multi_agent.subagent import TaskStatus + assert TaskStatus is not None + + +class TestImportsFromNewModules: + """Direct imports from the new sub-modules.""" + + def test_definitions(self): + from multi_agent.definitions import AgentDefinition, get_agent_definition + assert AgentDefinition is not None + assert callable(get_agent_definition) + + def test_task(self): + from multi_agent.task import SubAgentTask + assert SubAgentTask is not None + + def test_manager(self): + from multi_agent.manager import SubAgentManager + assert SubAgentManager is not None + + +class TestConsistency: + """Verify all import paths resolve to the same objects.""" + + def test_same_agent_definition(self): + from multi_agent import AgentDefinition as A1 + from multi_agent.subagent import AgentDefinition as A2 + from multi_agent.definitions import AgentDefinition as A3 + assert A1 is A3 + assert A2 is A3 + + def test_same_manager(self): + from multi_agent import SubAgentManager as M1 + from multi_agent.subagent import SubAgentManager as M2 + from multi_agent.manager import SubAgentManager as M3 + assert M1 is M3 + assert M2 is M3 + + def test_same_task(self): + from multi_agent import SubAgentTask as T1 + from multi_agent.subagent import SubAgentTask as T2 + from multi_agent.task import SubAgentTask as T3 + assert T1 is T3 + assert T2 is T3 + + def test_same_task_status(self): + from multi_agent import TaskStatus as S1 + from multi_agent.subagent import TaskStatus as S2 + from multi_agent.task import TaskStatus as S3 + assert S1 is S3 + assert S2 is S3 + + +class TestBuiltinAgents: + """Verify built-in agent definitions load correctly.""" + + def test_load_returns_builtins(self): + from multi_agent.definitions import load_agent_definitions + agents = load_agent_definitions() + assert "general-purpose" in agents + assert "coder" in agents + assert "reviewer" in agents + assert "researcher" in agents + assert "tester" in agents + + def test_get_known_agent(self): + from multi_agent.definitions import get_agent_definition + agent = get_agent_definition("coder") + assert agent.name == "coder" + assert len(agent.tools) > 0 + + def test_get_unknown_agent_raises(self): + import pytest + from multi_agent.definitions import get_agent_definition + with pytest.raises(ValueError, match="Unknown agent type"): + get_agent_definition("nonexistent-agent-type") From 60bcf50770e4e2a0dfb04bf002d78bc5fa3860ca Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Fri, 17 Apr 2026 19:16:48 +0200 Subject: [PATCH 2/5] fix: correct backward-compat shim imports and test assertions --- subagent.py | 12 +- tests/test_subagent.py | 294 ++++++++++++++++++++++------------------- 2 files changed, 168 insertions(+), 138 deletions(-) diff --git a/subagent.py b/subagent.py index 9ca5d860..8097fae8 100644 --- a/subagent.py +++ b/subagent.py @@ -1,11 +1,13 @@ -"""Backward-compatibility shim — real implementation is in multi_agent/subagent.py.""" -from multi_agent.subagent import ( # noqa: F401 +"""Backward-compatibility shim — real implementation is in multi_agent/.""" +from multi_agent.definitions import ( # noqa: F401 AgentDefinition, - SubAgentTask, - SubAgentManager, load_agent_definitions, get_agent_definition, + _BUILTIN_AGENTS, +) +from multi_agent.manager import SubAgentManager # noqa: F401 +from multi_agent.task import ( # noqa: F401 + SubAgentTask, _extract_final_text, _agent_run, - _BUILTIN_AGENTS, ) diff --git a/tests/test_subagent.py b/tests/test_subagent.py index d7682357..93e509e5 100644 --- a/tests/test_subagent.py +++ b/tests/test_subagent.py @@ -1,136 +1,164 @@ -"""Tests for the sub-agent system (subagent.py).""" -import time -import threading +"""Tests for multi_agent package split (PR5). + +Verifies that the 3-module split (definitions, manager, task) works correctly +and the backward-compat shim re-exports everything. +""" + +import textwrap +from pathlib import Path import pytest -from multi_agent.subagent import SubAgentManager, SubAgentTask, _extract_final_text - - -# ── Mock for _agent_run ────────────────────────────────────────────────── - -def _make_mock_agent_run(sleep_per_iter=0.05, iters=3): - """Return a mock _agent_run that simulates work and checks cancellation.""" - - def mock_agent_run(prompt, state, config, system_prompt, depth=0, cancel_check=None): - for i in range(iters): - if cancel_check and cancel_check(): - return - time.sleep(sleep_per_iter) - # Append an assistant message to state - state.messages.append({ - "role": "assistant", - "content": f"Result for: {prompt}", - "tool_calls": [], - }) - # Yield a TurnDone-like event (generator protocol) - yield None - - return mock_agent_run - - -def _make_slow_mock(sleep_per_iter=0.2, iters=10): - """Return a slow mock for cancellation testing.""" - return _make_mock_agent_run(sleep_per_iter=sleep_per_iter, iters=iters) - - -@pytest.fixture -def manager(monkeypatch): - """Create a SubAgentManager with mocked _agent_run.""" - mock = _make_mock_agent_run() - monkeypatch.setattr("multi_agent.subagent._agent_run", mock) - mgr = SubAgentManager(max_concurrent=3, max_depth=3) - yield mgr - mgr.shutdown() - - -@pytest.fixture -def slow_manager(monkeypatch): - """Create a SubAgentManager with a slow mock for cancel testing.""" - mock = _make_slow_mock() - monkeypatch.setattr("multi_agent.subagent._agent_run", mock) - mgr = SubAgentManager(max_concurrent=3, max_depth=3) - yield mgr - mgr.shutdown() - - -# ── Tests ──────────────────────────────────────────────────────────────── - -class TestSpawnAndWait: - def test_spawn_and_wait_completes(self, manager): - task = manager.spawn("hello", {}, "system") - result_task = manager.wait(task.id, timeout=5) - assert result_task is not None - assert result_task.status == "completed" - assert result_task.result == "Result for: hello" - - def test_spawn_returns_immediately(self, manager): - task = manager.spawn("hello", {}, "system") - # Task should be pending or running, not yet completed - assert task.status in ("pending", "running") - - -class TestListTasks: - def test_list_tasks(self, manager): - t1 = manager.spawn("task1", {}, "system") - t2 = manager.spawn("task2", {}, "system") - tasks = manager.list_tasks() - task_ids = [t.id for t in tasks] - assert t1.id in task_ids - assert t2.id in task_ids - assert len(tasks) == 2 - - -class TestCancel: - def test_cancel_running_task(self, slow_manager): - task = slow_manager.spawn("slow task", {}, "system") - # Wait briefly to ensure the task starts running - time.sleep(0.1) - assert task.status == "running" - success = slow_manager.cancel(task.id) - assert success is True - # Wait for the task to actually finish - slow_manager.wait(task.id, timeout=5) - assert task.status == "cancelled" - - -class TestDepthLimit: - def test_spawn_at_max_depth_fails(self, manager): - task = manager.spawn("deep", {}, "system", depth=3) - assert task.status == "failed" - assert "Max depth" in task.result - - -class TestGetResult: - def test_get_result_completed(self, manager): - task = manager.spawn("hello", {}, "system") - manager.wait(task.id, timeout=5) - result = manager.get_result(task.id) - assert result == "Result for: hello" - - def test_get_result_unknown_id(self, manager): - result = manager.get_result("nonexistent_id") - assert result is None - - -class TestExtractFinalText: - def test_extracts_last_assistant(self): - messages = [ - {"role": "user", "content": "hi"}, - {"role": "assistant", "content": "first"}, - {"role": "user", "content": "more"}, - {"role": "assistant", "content": "second"}, - ] - assert _extract_final_text(messages) == "second" - - def test_returns_none_for_empty(self): - assert _extract_final_text([]) is None - - def test_returns_none_no_assistant(self): - messages = [{"role": "user", "content": "hi"}] - assert _extract_final_text(messages) is None - - -class TestWaitUnknown: - def test_wait_unknown_returns_none(self, manager): - assert manager.wait("nonexistent") is None +from multi_agent.definitions import ( + AgentDefinition, + _parse_agent_md, + get_agent_definition, + load_agent_definitions, +) +from multi_agent.manager import SubAgentManager +from multi_agent.task import SubAgentTask, TaskStatus, _extract_final_text + + +# --- definitions.py tests --- + + +def test_builtin_agents_loaded(): + agents = load_agent_definitions(config_dir=Path("nonexistent_dir")) + assert "general-purpose" in agents + assert "coder" in agents + assert "reviewer" in agents + assert "researcher" in agents + assert "tester" in agents + + +def test_get_agent_definition_valid(): + defn = get_agent_definition("coder", config_dir=Path("nonexistent_dir")) + assert defn.name == "coder" + assert isinstance(defn.tools, list) + assert "Read" in defn.tools + + +def test_get_agent_definition_invalid(): + with pytest.raises(ValueError, match="Unknown agent type"): + get_agent_definition("nonexistent_type", config_dir=Path("nonexistent_dir")) + + +def test_parse_agent_md(tmp_path): + md = tmp_path / "custom.md" + md.write_text(textwrap.dedent("""\ + --- + tools: Read, Write, Bash + model: gpt-4 + description: A custom agent + --- + You are a custom agent for testing. + """), encoding="utf-8") + defn = _parse_agent_md(md) + assert defn.name == "custom" + assert defn.tools == ["Read", "Write", "Bash"] + assert defn.model == "gpt-4" + assert defn.description == "A custom agent" + assert "custom agent for testing" in defn.system_prompt + + +def test_parse_agent_md_no_frontmatter(tmp_path): + md = tmp_path / "simple.md" + md.write_text("Just a system prompt.", encoding="utf-8") + defn = _parse_agent_md(md) + assert defn.name == "simple" + assert defn.system_prompt == "Just a system prompt." + assert defn.tools == [] + + +def test_custom_agents_loaded_from_dir(tmp_path): + agents_dir = tmp_path / "agents" + agents_dir.mkdir() + (agents_dir / "mybot.md").write_text("---\ndescription: My bot\n---\nHello!", encoding="utf-8") + agents = load_agent_definitions(config_dir=tmp_path) + assert "mybot" in agents + assert agents["mybot"].description == "My bot" + assert "general-purpose" in agents # builtins still there + + +# --- task.py tests --- + + +def test_extract_final_text_simple(): + text = "line1\nline2\nline3\n" + result = _extract_final_text(text) + assert "line3" in result + + +def test_extract_final_text_with_blanks(): + text = "intro\n\nresult line 1\nresult line 2\n" + result = _extract_final_text(text) + assert "result line 1" in result + assert "result line 2" in result + + +def test_extract_final_text_empty(): + assert _extract_final_text("") == "" + + +def test_task_status_enum(): + assert TaskStatus.PENDING.value == "pending" + assert TaskStatus.RUNNING.value == "running" + assert TaskStatus.COMPLETED.value == "completed" + assert TaskStatus.FAILED.value == "failed" + + +def test_task_creation(): + task = SubAgentTask(prompt="Do something", agent_type="coder", name="test-agent") + assert len(task.id) == 12 + assert task.prompt == "Do something" + assert task.agent_type == "coder" + assert task.name == "test-agent" + assert task.status == TaskStatus.PENDING + + +def test_task_messaging(): + task = SubAgentTask(prompt="test") + task.send_message("hello") + task.send_message("world") + msgs = task.get_pending_messages() + assert msgs == ["hello", "world"] + assert task.get_pending_messages() == [] # cleared + + +def test_task_to_dict(): + task = SubAgentTask(prompt="Do X", agent_type="coder", name="bob") + d = task.to_dict() + assert d["name"] == "bob" + assert d["agent_type"] == "coder" + assert d["status"] == "pending" + + +# --- manager.py tests --- + + +def test_manager_creation(): + mgr = SubAgentManager(config_dir=Path("nonexistent")) + assert mgr._tasks == {} + + +def test_manager_list_agent_types(): + mgr = SubAgentManager(config_dir=Path("nonexistent")) + types = mgr.list_agent_types() + names = [t["name"] for t in types] + assert "general-purpose" in names + assert "coder" in names + + +# --- backward compat shim --- + + +def test_backward_compat_imports(): + """The subagent.py shim re-exports all public names.""" + import subagent + assert hasattr(subagent, "AgentDefinition") + assert hasattr(subagent, "SubAgentTask") + assert hasattr(subagent, "SubAgentManager") + assert hasattr(subagent, "load_agent_definitions") + assert hasattr(subagent, "get_agent_definition") + assert hasattr(subagent, "_extract_final_text") + assert hasattr(subagent, "_agent_run") From f0ce13a5795030aeb02198f01bed643034e75e3d Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Fri, 17 Apr 2026 21:59:05 +0200 Subject: [PATCH 3/5] fix: restore main API, add proxy modules for split import paths --- multi_agent/__init__.py | 22 +- multi_agent/definitions.py | 149 +--------- multi_agent/manager.py | 123 +------- multi_agent/subagent.py | 498 ++++++++++++++++++++++++++++++-- multi_agent/task.py | 204 +------------ multi_agent/tools.py | 1 + subagent.py | 13 +- tests/test_multi_agent_split.py | 162 ++++------- tests/test_subagent.py | 295 +++++++++---------- 9 files changed, 708 insertions(+), 759 deletions(-) diff --git a/multi_agent/__init__.py b/multi_agent/__init__.py index d5354be9..4c0e48f4 100644 --- a/multi_agent/__init__.py +++ b/multi_agent/__init__.py @@ -1,18 +1,24 @@ -"""Multi-agent orchestration for cheetahclaws.""" +# [desc] Multi-agent package init: exports AgentDefinition, SubAgentTask, SubAgentManager and registry utils [/desc] +"""Multi-agent package for cheetahclaws. -from .definitions import ( +Provides: + - AgentDefinition — typed agent definition (name, system_prompt, model, tools) + - SubAgentTask — lifecycle-tracked task + - SubAgentManager — thread-pool manager for spawning agents + - load_agent_definitions / get_agent_definition — agent registry +""" +from .subagent import ( AgentDefinition, - get_agent_definition, + SubAgentTask, + SubAgentManager, load_agent_definitions, + get_agent_definition, ) -from .manager import SubAgentManager -from .task import SubAgentTask, TaskStatus __all__ = [ "AgentDefinition", - "SubAgentManager", "SubAgentTask", - "TaskStatus", - "get_agent_definition", + "SubAgentManager", "load_agent_definitions", + "get_agent_definition", ] diff --git a/multi_agent/definitions.py b/multi_agent/definitions.py index 0f83d4ba..362f1e0f 100644 --- a/multi_agent/definitions.py +++ b/multi_agent/definitions.py @@ -1,139 +1,10 @@ -"""Agent type definitions and loading for cheetahclaws multi-agent system.""" - -from __future__ import annotations - -import logging -import os -import textwrap -from dataclasses import dataclass, field -from pathlib import Path -from typing import Optional - -log = logging.getLogger(__name__) - - -@dataclass -class AgentDefinition: - """Definition of a sub-agent type.""" - - name: str - system_prompt: str - tools: list[str] = field(default_factory=list) - model: Optional[str] = None - description: str = "" - - -_BUILTIN_AGENTS: dict[str, AgentDefinition] = { - "general-purpose": AgentDefinition( - name="general-purpose", - system_prompt="You are a helpful coding assistant.", - description="General-purpose coding agent with all tools available.", - ), - "coder": AgentDefinition( - name="coder", - system_prompt=textwrap.dedent("""\ - You are an expert software engineer focused on writing clean, - correct code. You have access to file and shell tools. - Focus on implementation — write code, run tests, fix errors. - """), - tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"], - description="Focused coding agent with file and shell tools.", - ), - "reviewer": AgentDefinition( - name="reviewer", - system_prompt=textwrap.dedent("""\ - You are a senior code reviewer. Analyze code for bugs, style issues, - security concerns, and suggest improvements. - You can read files but should NOT modify them. - """), - tools=["Read", "Glob", "Grep", "Bash"], - description="Code review agent — reads code and provides feedback.", - ), - "researcher": AgentDefinition( - name="researcher", - system_prompt=textwrap.dedent("""\ - You are a research assistant. Search the web, read documentation, - and synthesize information to answer questions. - """), - tools=["WebSearch", "WebFetch", "Read", "Glob", "Grep"], - description="Research agent with web search capabilities.", - ), - "tester": AgentDefinition( - name="tester", - system_prompt=textwrap.dedent("""\ - You are a testing specialist. Write and run tests to verify code - correctness. Focus on edge cases and thorough coverage. - """), - tools=["Read", "Write", "Edit", "Bash", "Glob", "Grep"], - description="Testing agent focused on writing and running tests.", - ), -} - - -def _parse_agent_md(path: Path) -> AgentDefinition: - """Parse a .md agent definition file with optional YAML-like frontmatter.""" - text = path.read_text(encoding="utf-8") - name = path.stem - metadata: dict = {} - body = text - - if text.startswith("---"): - parts = text.split("---", 2) - if len(parts) >= 3: - frontmatter = parts[1].strip() - body = parts[2].strip() - for line in frontmatter.splitlines(): - if ":" in line: - key, _, value = line.partition(":") - key = key.strip() - value = value.strip() - if key == "tools": - metadata["tools"] = [ - t.strip() for t in value.split(",") if t.strip() - ] - elif key == "model": - metadata["model"] = value or None - elif key == "description": - metadata["description"] = value - - return AgentDefinition( - name=name, - system_prompt=body, - tools=metadata.get("tools", []), - model=metadata.get("model"), - description=metadata.get("description", ""), - ) - - -def load_agent_definitions( - config_dir: str | Path | None = None, -) -> dict[str, AgentDefinition]: - """Load built-in + custom agent definitions from config directory.""" - agents = dict(_BUILTIN_AGENTS) - - if config_dir is None: - config_dir = Path(os.path.expanduser("~/.cheetahclaws")) - else: - config_dir = Path(config_dir) - - agents_dir = config_dir / "agents" - if agents_dir.is_dir(): - for md_file in sorted(agents_dir.glob("*.md")): - agent_def = _parse_agent_md(md_file) - agents[agent_def.name] = agent_def - log.debug("Loaded custom agent type: %s", agent_def.name) - - return agents - - -def get_agent_definition( - name: str, config_dir: str | Path | None = None -) -> AgentDefinition: - """Look up an agent definition by name.""" - agents = load_agent_definitions(config_dir) - if name not in agents: - available = ", ".join(sorted(agents.keys())) - raise ValueError( - f"Unknown agent type {name!r}. Available: {available}" - ) - return agents[name] +"""Proxy -- re-exports agent definitions from multi_agent.subagent.""" +from .subagent import ( + AgentDefinition, + get_agent_definition, + load_agent_definitions, + _parse_agent_md, + _BUILTIN_AGENTS, +) + +__all__ = ["AgentDefinition", "get_agent_definition", "load_agent_definitions"] diff --git a/multi_agent/manager.py b/multi_agent/manager.py index 99c46955..4252fb93 100644 --- a/multi_agent/manager.py +++ b/multi_agent/manager.py @@ -1,121 +1,4 @@ -"""Sub-agent manager — orchestrates spawning, tracking, and messaging agents.""" +"""Proxy -- re-exports SubAgentManager from multi_agent.subagent.""" +from .subagent import SubAgentManager -from __future__ import annotations - -import logging -import threading -import uuid -from pathlib import Path -from typing import Any, Optional - -from .definitions import AgentDefinition, get_agent_definition, load_agent_definitions -from .task import ( - SubAgentTask, - TaskStatus, - _agent_run, - _create_worktree, - _git_root, - _remove_worktree, -) - -log = logging.getLogger(__name__) - - -class SubAgentManager: - """Manages the lifecycle of sub-agents.""" - - def __init__(self, config_dir: str | Path | None = None): - self.config_dir = config_dir - self._tasks: dict[str, SubAgentTask] = {} - self._lock = threading.Lock() - - def spawn( - self, - prompt: str, - agent_type: str = "general-purpose", - name: Optional[str] = None, - model: Optional[str] = None, - isolation: Optional[str] = None, - wait: bool = True, - working_dir: Path | None = None, - ) -> SubAgentTask: - """Spawn a new sub-agent task.""" - agent_def = get_agent_definition(agent_type, self.config_dir) - - task = SubAgentTask( - prompt=prompt, - agent_type=agent_type, - name=name, - model=model or agent_def.model, - ) - - work_dir = working_dir or Path.cwd() - - if isolation == "worktree": - git_root = _git_root(work_dir) - if git_root: - branch = f"agent/{task.id}" - wt_path = _create_worktree(git_root, branch) - task.worktree_path = wt_path - task.worktree_branch = branch - work_dir = wt_path - else: - log.warning("No git repo found — running without worktree isolation") - - with self._lock: - self._tasks[task.id] = task - - if wait: - _agent_run(task, agent_def, work_dir) - else: - thread = threading.Thread( - target=_agent_run, - args=(task, agent_def, work_dir), - daemon=True, - name=f"agent-{task.id}", - ) - task.thread = thread - thread.start() - - return task - - def get_task(self, task_id: str) -> SubAgentTask | None: - """Retrieve a task by ID.""" - with self._lock: - return self._tasks.get(task_id) - - def find_by_name(self, name: str) -> SubAgentTask | None: - """Find a running task by its human-readable name.""" - with self._lock: - for task in self._tasks.values(): - if task.name == name: - return task - return None - - def list_tasks(self) -> list[dict[str, Any]]: - """List all tasks with their current status.""" - with self._lock: - return [t.to_dict() for t in self._tasks.values()] - - def send_message(self, to: str, message: str) -> bool: - """Send a message to a named or ID-referenced agent.""" - task = self.find_by_name(to) or self.get_task(to) - if not task: - return False - task.send_message(message) - return True - - def cleanup(self, task_id: str) -> None: - """Remove a completed task and its worktree.""" - with self._lock: - task = self._tasks.pop(task_id, None) - if task and task.worktree_path: - _remove_worktree(task.worktree_path) - - def list_agent_types(self) -> list[dict[str, str]]: - """Return available agent type definitions.""" - agents = load_agent_definitions(self.config_dir) - return [ - {"name": a.name, "description": a.description} - for a in agents.values() - ] +__all__ = ["SubAgentManager"] diff --git a/multi_agent/subagent.py b/multi_agent/subagent.py index 49db2465..48bc5b31 100644 --- a/multi_agent/subagent.py +++ b/multi_agent/subagent.py @@ -1,18 +1,480 @@ -"""Backward-compatibility shim -- import from multi_agent directly.""" - -from .definitions import ( - AgentDefinition, - get_agent_definition, - load_agent_definitions, -) -from .manager import SubAgentManager -from .task import SubAgentTask, TaskStatus - -__all__ = [ - "AgentDefinition", - "SubAgentManager", - "SubAgentTask", - "TaskStatus", - "get_agent_definition", - "load_agent_definitions", -] +# [desc] Defines sub-agent types, parses agent definitions from .md files, and manages threaded nested agent loops. [/desc] +"""Threaded sub-agent system for spawning nested agent loops.""" +from __future__ import annotations + +import os +import uuid +import queue +import subprocess +import tempfile +from concurrent.futures import ThreadPoolExecutor, Future +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional, Any + + +# ── Agent definition ─────────────────────────────────────────────────────── + +@dataclass +class AgentDefinition: + """Definition for a specialized agent type.""" + name: str + description: str = "" + system_prompt: str = "" # extra instructions prepended to the base system prompt + model: str = "" # model override; "" = inherit from parent + tools: list = field(default_factory=list) # empty list = all tools + source: str = "user" # "built-in" | "user" | "project" + + +# ── Built-in agent definitions ───────────────────────────────────────────── + +_BUILTIN_AGENTS: Dict[str, AgentDefinition] = { + "general-purpose": AgentDefinition( + name="general-purpose", + description=( + "General-purpose agent for researching complex questions, " + "searching for code, and executing multi-step tasks." + ), + system_prompt="", + source="built-in", + ), + "coder": AgentDefinition( + name="coder", + description="Specialized coding agent for writing, reading, and modifying code.", + system_prompt=( + "You are a specialized coding assistant. Focus on:\n" + "- Writing clean, idiomatic code\n" + "- Reading and understanding existing code before modifying\n" + "- Making minimal targeted changes\n" + "- Never adding unnecessary features, comments, or error handling\n" + ), + source="built-in", + ), + "reviewer": AgentDefinition( + name="reviewer", + description="Code review agent analyzing quality, security, and correctness.", + system_prompt=( + "You are a code reviewer. Analyze code for:\n" + "- Correctness and logic errors\n" + "- Security vulnerabilities (injection, XSS, auth bypass, etc.)\n" + "- Performance issues\n" + "- Code quality and maintainability\n" + "Be concise and specific. Categorize findings as: Critical | Warning | Suggestion.\n" + ), + tools=["Read", "Glob", "Grep"], + source="built-in", + ), + "researcher": AgentDefinition( + name="researcher", + description="Research agent for exploring codebases and answering questions.", + system_prompt=( + "You are a research assistant focused on understanding codebases.\n" + "- Read and analyze code thoroughly before answering\n" + "- Provide factual, evidence-based answers\n" + "- Cite specific file paths and line numbers\n" + "- Be concise and focused\n" + ), + tools=["Read", "Glob", "Grep", "WebFetch", "WebSearch"], + source="built-in", + ), + "tester": AgentDefinition( + name="tester", + description="Testing agent that writes and runs tests.", + system_prompt=( + "You are a testing specialist. Your job:\n" + "- Write comprehensive tests for the given code\n" + "- Run existing tests and diagnose failures\n" + "- Focus on edge cases and error conditions\n" + "- Keep tests simple, readable, and fast\n" + ), + source="built-in", + ), +} + + +# ── Loading agent definitions from .md files ────────────────────────────── + +def _parse_agent_md(path: Path, source: str = "user") -> AgentDefinition: + """Parse a .md file with optional YAML frontmatter into an AgentDefinition. + + File format: + --- + description: "Short description" + model: claude-haiku-4-5-20251001 + tools: [Read, Write, Edit, Bash] + --- + + System prompt body goes here... + """ + content = path.read_text() + name = path.stem + description = "" + model = "" + tools: list = [] + system_prompt_body = content + + if content.startswith("---"): + end = content.find("---", 3) + if end != -1: + fm_text = content[3:end].strip() + system_prompt_body = content[end + 3:].strip() + try: + import yaml as _yaml + fm = _yaml.safe_load(fm_text) or {} + except ImportError: + # Manual key: value parse (no yaml dependency required) + fm: dict = {} + for line in fm_text.splitlines(): + if ":" in line: + k, _, v = line.partition(":") + fm[k.strip()] = v.strip() + description = str(fm.get("description", "")) + model = str(fm.get("model", "")) + raw_tools = fm.get("tools", []) + if isinstance(raw_tools, list): + tools = [str(t) for t in raw_tools] + elif isinstance(raw_tools, str): + # Handle "[Read, Write]" or "Read, Write" format + s = raw_tools.strip("[]") + tools = [t.strip() for t in s.split(",") if t.strip()] + + return AgentDefinition( + name=name, + description=description, + system_prompt=system_prompt_body, + model=model, + tools=tools, + source=source, + ) + + +def load_agent_definitions() -> Dict[str, AgentDefinition]: + """Load all agent definitions: built-ins → user-level → project-level. + + Search paths: + ~/.cheetahclaws/agents/*.md (user-level) + .cheetahclaws/agents/*.md (project-level, overrides user) + """ + defs: Dict[str, AgentDefinition] = dict(_BUILTIN_AGENTS) + + # User-level + user_dir = Path.home() / ".cheetahclaws" / "agents" + if user_dir.is_dir(): + for p in sorted(user_dir.glob("*.md")): + try: + d = _parse_agent_md(p, source="user") + defs[d.name] = d + except Exception: + pass + + # Project-level (overrides user) + proj_dir = Path.cwd() / ".cheetahclaws" / "agents" + if proj_dir.is_dir(): + for p in sorted(proj_dir.glob("*.md")): + try: + d = _parse_agent_md(p, source="project") + defs[d.name] = d + except Exception: + pass + + return defs + + +def get_agent_definition(name: str) -> Optional[AgentDefinition]: + """Look up an agent definition by name. Returns None if not found.""" + return load_agent_definitions().get(name) + + +# ── SubAgentTask ─────────────────────────────────────────────────────────── + +@dataclass +class SubAgentTask: + """Represents a sub-agent task with lifecycle tracking.""" + id: str + prompt: str + status: str = "pending" # pending | running | completed | failed | cancelled + result: Optional[str] = None + depth: int = 0 + name: str = "" # optional human-readable name (addressable by SendMessage) + worktree_path: str = "" # set if isolation="worktree" + worktree_branch: str = "" # set if isolation="worktree" + _cancel_flag: bool = False + _future: Optional[Future] = field(default=None, repr=False) + _inbox: Any = field(default_factory=queue.Queue, repr=False) # for send_message + + +# ── Worktree helpers ─────────────────────────────────────────────────────── + +def _git_root(cwd: str) -> Optional[str]: + """Return the git root directory for cwd, or None if not in a git repo.""" + try: + r = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + cwd=cwd, capture_output=True, text=True, check=True, + ) + return r.stdout.strip() + except Exception: + return None + + +def _create_worktree(base_dir: str) -> tuple: + """Create a temporary git worktree. + + Returns: + (worktree_path, branch_name) + Raises: + subprocess.CalledProcessError or OSError on failure. + """ + branch = f"nano-agent-{uuid.uuid4().hex[:8]}" + # mkdtemp gives us a path; remove the empty dir so git can create it + wt_path = tempfile.mkdtemp(prefix="nano-agent-wt-") + os.rmdir(wt_path) + subprocess.run( + ["git", "worktree", "add", "-b", branch, wt_path], + cwd=base_dir, check=True, capture_output=True, text=True, + ) + return wt_path, branch + + +def _remove_worktree(wt_path: str, branch: str, base_dir: str) -> None: + """Remove a git worktree and delete its branch (best-effort).""" + try: + subprocess.run( + ["git", "worktree", "remove", "--force", wt_path], + cwd=base_dir, capture_output=True, + ) + except Exception: + pass + try: + subprocess.run( + ["git", "branch", "-D", branch], + cwd=base_dir, capture_output=True, + ) + except Exception: + pass + + +# ── Internal helpers ─────────────────────────────────────────────────────── + +def _agent_run(prompt, state, config, system_prompt, depth=0, cancel_check=None): + """Lazy-import wrapper to avoid circular dependency with agent module. + + Uses absolute import so this works whether called from inside or outside + the multi_agent package (sys.path includes the project root). + """ + import agent as _agent_mod + return _agent_mod.run(prompt, state, config, system_prompt, depth=depth, cancel_check=cancel_check) + + +def _extract_final_text(messages): + """Walk backwards through messages, return first assistant content string.""" + for msg in reversed(messages): + if msg.get("role") == "assistant" and msg.get("content"): + return msg["content"] + return None + + +# ── SubAgentManager ──────────────────────────────────────────────────────── + +class SubAgentManager: + """Manages concurrent sub-agent tasks using a thread pool.""" + + def __init__(self, max_concurrent: int = 5, max_depth: int = 5): + self.tasks: Dict[str, SubAgentTask] = {} + self._by_name: Dict[str, str] = {} # name → task_id + self.max_concurrent = max_concurrent + self.max_depth = max_depth + self._pool = ThreadPoolExecutor(max_workers=max_concurrent) + + def spawn( + self, + prompt: str, + config: dict, + system_prompt: str, + depth: int = 0, + agent_def: Optional[AgentDefinition] = None, + isolation: str = "", # "" | "worktree" + name: str = "", + ) -> SubAgentTask: + """Spawn a new sub-agent task. + + Args: + prompt: user message for the sub-agent + config: agent configuration dict (copied before modification) + system_prompt: base system prompt + depth: current nesting depth (prevents infinite recursion) + agent_def: optional AgentDefinition with model/system_prompt/tools overrides + isolation: "" for normal, "worktree" for isolated git worktree + name: optional human-readable name (addressable via SendMessage) + + Returns: + SubAgentTask tracking the spawned work. + """ + task_id = uuid.uuid4().hex[:12] + short_name = name or task_id[:8] + task = SubAgentTask(id=task_id, prompt=prompt, depth=depth, name=short_name) + self.tasks[task_id] = task + if name: + self._by_name[name] = task_id + + if depth >= self.max_depth: + task.status = "failed" + task.result = f"Max depth ({self.max_depth}) exceeded" + return task + + # Build effective config and system prompt for this sub-agent + eff_config = dict(config) + eff_system = system_prompt + + if agent_def: + if agent_def.model: + eff_config["model"] = agent_def.model + if agent_def.system_prompt: + eff_system = agent_def.system_prompt.rstrip() + "\n\n" + system_prompt + + # Handle worktree isolation + worktree_path = "" + worktree_branch = "" + base_dir = os.getcwd() + + if isolation == "worktree": + git_root = _git_root(base_dir) + if not git_root: + task.status = "failed" + task.result = "isolation='worktree' requires a git repository" + return task + try: + worktree_path, worktree_branch = _create_worktree(git_root) + task.worktree_path = worktree_path + task.worktree_branch = worktree_branch + notice = ( + f"\n\n[Note: You are working in an isolated git worktree at " + f"{worktree_path} (branch: {worktree_branch}). " + f"Your changes are isolated from the main workspace at {git_root}. " + f"Commit your changes before finishing so they can be reviewed/merged.]" + ) + prompt = prompt + notice + # Pass the worktree path through config so tools (Bash/Glob/Grep) + # use it as their working directory without touching the process-level + # cwd (which is shared across all threads). + eff_config["_worktree_cwd"] = worktree_path + except Exception as e: + task.status = "failed" + task.result = f"Failed to create worktree: {e}" + return task + + def _run(): + import agent as _agent_mod; AgentState = _agent_mod.AgentState + task.status = "running" + try: + state = AgentState() + gen = _agent_run( + prompt, state, eff_config, eff_system, + depth=depth + 1, + cancel_check=lambda: task._cancel_flag, + ) + for _event in gen: + if task._cancel_flag: + break + + if task._cancel_flag: + task.status = "cancelled" + task.result = None + else: + task.result = _extract_final_text(state.messages) + task.status = "completed" + + # Drain inbox: process any messages sent via SendMessage + while not task._inbox.empty() and not task._cancel_flag: + inbox_msg = task._inbox.get_nowait() + task.status = "running" + gen2 = _agent_run( + inbox_msg, state, eff_config, eff_system, + depth=depth + 1, + cancel_check=lambda: task._cancel_flag, + ) + for _ev in gen2: + if task._cancel_flag: + break + if not task._cancel_flag: + task.result = _extract_final_text(state.messages) + task.status = "completed" + + except Exception as e: + task.status = "failed" + task.result = f"Error: {e}" + finally: + if worktree_path: + _remove_worktree(worktree_path, worktree_branch, base_dir) + + task._future = self._pool.submit(_run) + return task + + def wait(self, task_id: str, timeout: float = None) -> Optional[SubAgentTask]: + """Block until a task completes or timeout expires. + + Returns: + The task, or None if task_id is unknown. + """ + task = self.tasks.get(task_id) + if task is None: + return None + if task._future is not None: + try: + task._future.result(timeout=timeout) + except Exception: + pass + return task + + def get_result(self, task_id: str) -> Optional[str]: + """Return the result string for a completed task, or None.""" + task = self.tasks.get(task_id) + return task.result if task else None + + def list_tasks(self) -> List[SubAgentTask]: + """Return all tracked tasks.""" + return list(self.tasks.values()) + + def send_message(self, task_id_or_name: str, message: str) -> bool: + """Send a message to a running background agent. + + The message is queued and the agent will process it after completing + its current work. + + Args: + task_id_or_name: task ID or the human-readable name passed to spawn() + message: message text to send + + Returns: + True if the message was queued, False if task not found or already done. + """ + # Resolve name → task_id + task_id = self._by_name.get(task_id_or_name, task_id_or_name) + task = self.tasks.get(task_id) + if task is None: + return False + if task.status not in ("running", "pending"): + return False + task._inbox.put(message) + return True + + def cancel(self, task_id: str) -> bool: + """Request cancellation of a running task. + + Returns: + True if the cancel flag was set, False if task not found or not running. + """ + task = self.tasks.get(task_id) + if task is None: + return False + if task.status == "running": + task._cancel_flag = True + return True + return False + + def shutdown(self) -> None: + """Cancel all running tasks and shut down the thread pool.""" + for task in self.tasks.values(): + if task.status == "running": + task._cancel_flag = True + self._pool.shutdown(wait=True) diff --git a/multi_agent/task.py b/multi_agent/task.py index 64bc94a6..5d478ed5 100644 --- a/multi_agent/task.py +++ b/multi_agent/task.py @@ -1,193 +1,11 @@ -"""Sub-agent task execution and git worktree management for cheetahclaws.""" - -from __future__ import annotations - -import json -import logging -import os -import shutil -import subprocess -import threading -import uuid -from dataclasses import dataclass, field -from enum import Enum -from pathlib import Path -from typing import Any, Optional - -from .definitions import AgentDefinition - -log = logging.getLogger(__name__) - - -class TaskStatus(Enum): - """Status of a sub-agent task.""" - - PENDING = "pending" - RUNNING = "running" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - - -@dataclass -class SubAgentTask: - """Tracks a spawned sub-agent's lifecycle.""" - - id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) - prompt: str = "" - agent_type: str = "general-purpose" - name: Optional[str] = None - status: TaskStatus = TaskStatus.PENDING - result: Optional[str] = None - error: Optional[str] = None - worktree_path: Optional[Path] = None - worktree_branch: Optional[str] = None - thread: Optional[threading.Thread] = None - model: Optional[str] = None - _messages: list[str] = field(default_factory=list) - - def send_message(self, message: str) -> None: - """Queue a follow-up message for this agent.""" - self._messages.append(message) - - def get_pending_messages(self) -> list[str]: - """Retrieve and clear pending messages.""" - msgs = list(self._messages) - self._messages.clear() - return msgs - - def to_dict(self) -> dict[str, Any]: - """Serialize task state for reporting.""" - return { - "id": self.id, - "name": self.name, - "prompt": self.prompt[:100], - "agent_type": self.agent_type, - "status": self.status.value, - "result": self.result[:200] if self.result else None, - "error": self.error, - "worktree_branch": self.worktree_branch, - } - - -def _git_root(cwd: Path | None = None) -> Path | None: - """Find the git repository root from cwd.""" - try: - result = subprocess.run( - ["git", "rev-parse", "--show-toplevel"], - capture_output=True, - text=True, - cwd=cwd or Path.cwd(), - timeout=10, - ) - if result.returncode == 0: - return Path(result.stdout.strip()) - except (subprocess.TimeoutExpired, FileNotFoundError): - pass - return None - - -def _create_worktree(root: Path, branch_name: str) -> Path: - """Create a git worktree for isolated agent work.""" - worktree_dir = root / ".agent_worktrees" / branch_name - worktree_dir.parent.mkdir(parents=True, exist_ok=True) - - subprocess.run( - ["git", "worktree", "add", "-b", branch_name, str(worktree_dir)], - cwd=root, - capture_output=True, - text=True, - check=True, - timeout=30, - ) - log.info("Created worktree at %s (branch: %s)", worktree_dir, branch_name) - return worktree_dir - - -def _remove_worktree(wt_path: Path) -> None: - """Remove a git worktree and clean up.""" - root = _git_root(wt_path) - if root: - subprocess.run( - ["git", "worktree", "remove", "--force", str(wt_path)], - cwd=root, - capture_output=True, - text=True, - timeout=30, - ) - if wt_path.exists(): - shutil.rmtree(wt_path, ignore_errors=True) - log.info("Removed worktree at %s", wt_path) - - -def _extract_final_text(output: str) -> str: - """Extract the final assistant text from agent output.""" - lines = output.strip().splitlines() - result_lines: list[str] = [] - for line in reversed(lines): - stripped = line.strip() - if not stripped: - if result_lines: - break - continue - result_lines.append(line) - result_lines.reverse() - return "\n".join(result_lines) if result_lines else output[-500:] - - -def _agent_run( - task: SubAgentTask, - agent_def: AgentDefinition, - working_dir: Path, - extra_env: dict[str, str] | None = None, -) -> None: - """Execute a sub-agent in a subprocess. Runs in a background thread.""" - task.status = TaskStatus.RUNNING - env = {**os.environ, **(extra_env or {})} - - try: - cmd = [ - "python", - "-m", - "cheetahclaws", - "--agent-mode", - "--agent-type", - task.agent_type, - ] - if task.model: - cmd.extend(["--model", task.model]) - - result = subprocess.run( - cmd, - input=task.prompt, - capture_output=True, - text=True, - cwd=working_dir, - env=env, - timeout=300, - ) - - if result.returncode == 0: - task.result = _extract_final_text(result.stdout) - task.status = TaskStatus.COMPLETED - else: - task.error = result.stderr[-500:] if result.stderr else "Non-zero exit" - task.result = result.stdout[-500:] if result.stdout else None - task.status = TaskStatus.FAILED - - except subprocess.TimeoutExpired: - task.error = "Agent timed out after 300s" - task.status = TaskStatus.FAILED - except Exception as exc: - task.error = str(exc) - task.status = TaskStatus.FAILED - finally: - if task.worktree_path and task.status in ( - TaskStatus.COMPLETED, - TaskStatus.FAILED, - ): - log.debug( - "Worktree %s preserved for review (branch: %s)", - task.worktree_path, - task.worktree_branch, - ) +"""Proxy -- re-exports task classes from multi_agent.subagent.""" +from .subagent import ( + SubAgentTask, + _extract_final_text, + _git_root, + _create_worktree, + _remove_worktree, + _agent_run, +) + +__all__ = ["SubAgentTask"] diff --git a/multi_agent/tools.py b/multi_agent/tools.py index cfa10c7a..1b2cf782 100644 --- a/multi_agent/tools.py +++ b/multi_agent/tools.py @@ -1,3 +1,4 @@ +# [desc] Multi-agent tool registrations: Agent spawn, SendMessage, CheckAgentResult, ListAgentTasks, ListAgentTypes [/desc] """Multi-agent tool registrations. Registers the following tools into the central tool_registry: diff --git a/subagent.py b/subagent.py index 8097fae8..a2d8bb1a 100644 --- a/subagent.py +++ b/subagent.py @@ -1,13 +1,12 @@ -"""Backward-compatibility shim — real implementation is in multi_agent/.""" -from multi_agent.definitions import ( # noqa: F401 +# [desc] Backward-compatibility shim re-exporting all symbols from multi_agent/subagent.py [/desc] +"""Backward-compatibility shim — real implementation is in multi_agent/subagent.py.""" +from multi_agent.subagent import ( # noqa: F401 AgentDefinition, + SubAgentTask, + SubAgentManager, load_agent_definitions, get_agent_definition, - _BUILTIN_AGENTS, -) -from multi_agent.manager import SubAgentManager # noqa: F401 -from multi_agent.task import ( # noqa: F401 - SubAgentTask, _extract_final_text, _agent_run, + _BUILTIN_AGENTS, ) diff --git a/tests/test_multi_agent_split.py b/tests/test_multi_agent_split.py index 6276b117..12a01676 100644 --- a/tests/test_multi_agent_split.py +++ b/tests/test_multi_agent_split.py @@ -1,119 +1,55 @@ -"""Tests for multi_agent module split -- verify imports work from all paths.""" - - -class TestImportsFromPackage: - """Imports from multi_agent package directly.""" - - def test_agent_definition(self): - from multi_agent import AgentDefinition - assert AgentDefinition is not None - - def test_subagent_task(self): - from multi_agent import SubAgentTask - assert SubAgentTask is not None - - def test_subagent_manager(self): - from multi_agent import SubAgentManager - assert SubAgentManager is not None - - def test_task_status(self): - from multi_agent import TaskStatus - assert TaskStatus is not None - - def test_load_agent_definitions(self): - from multi_agent import load_agent_definitions - assert callable(load_agent_definitions) - - def test_get_agent_definition(self): - from multi_agent import get_agent_definition - assert callable(get_agent_definition) - - -class TestImportsFromSubagentShim: - """Backward-compatible imports from multi_agent.subagent.""" - - def test_agent_definition(self): - from multi_agent.subagent import AgentDefinition - assert AgentDefinition is not None - - def test_subagent_manager(self): - from multi_agent.subagent import SubAgentManager - assert SubAgentManager is not None - - def test_task_status(self): - from multi_agent.subagent import TaskStatus - assert TaskStatus is not None - - -class TestImportsFromNewModules: - """Direct imports from the new sub-modules.""" - - def test_definitions(self): - from multi_agent.definitions import AgentDefinition, get_agent_definition - assert AgentDefinition is not None - assert callable(get_agent_definition) - - def test_task(self): - from multi_agent.task import SubAgentTask - assert SubAgentTask is not None - - def test_manager(self): - from multi_agent.manager import SubAgentManager - assert SubAgentManager is not None - - -class TestConsistency: - """Verify all import paths resolve to the same objects.""" - - def test_same_agent_definition(self): - from multi_agent import AgentDefinition as A1 - from multi_agent.subagent import AgentDefinition as A2 - from multi_agent.definitions import AgentDefinition as A3 - assert A1 is A3 - assert A2 is A3 - - def test_same_manager(self): - from multi_agent import SubAgentManager as M1 - from multi_agent.subagent import SubAgentManager as M2 - from multi_agent.manager import SubAgentManager as M3 - assert M1 is M3 - assert M2 is M3 - - def test_same_task(self): - from multi_agent import SubAgentTask as T1 - from multi_agent.subagent import SubAgentTask as T2 - from multi_agent.task import SubAgentTask as T3 - assert T1 is T3 - assert T2 is T3 - - def test_same_task_status(self): - from multi_agent import TaskStatus as S1 - from multi_agent.subagent import TaskStatus as S2 - from multi_agent.task import TaskStatus as S3 - assert S1 is S3 - assert S2 is S3 - - -class TestBuiltinAgents: - """Verify built-in agent definitions load correctly.""" - - def test_load_returns_builtins(self): +"""Tests for multi_agent package split import paths (PR #51).""" +from multi_agent.subagent import ( + AgentDefinition, + SubAgentManager, + SubAgentTask, + get_agent_definition, + load_agent_definitions, + _extract_final_text, +) + + +class TestSplitImportPaths: + def test_definitions_reexports(self): + from multi_agent.definitions import ( + AgentDefinition as AD, + get_agent_definition as gad, + load_agent_definitions as lad, + ) + assert AD is AgentDefinition + assert gad is get_agent_definition + assert lad is load_agent_definitions + + def test_manager_reexports(self): + from multi_agent.manager import SubAgentManager as SAM + assert SAM is SubAgentManager + + def test_task_reexports(self): + from multi_agent.task import SubAgentTask as SAT + from multi_agent.task import _extract_final_text as eft + assert SAT is SubAgentTask + assert eft is _extract_final_text + + def test_backward_compat_root_shim(self): + import subagent + assert hasattr(subagent, 'SubAgentManager') + assert hasattr(subagent, 'SubAgentTask') + assert hasattr(subagent, 'AgentDefinition') + + +class TestAgentDefinitionsViaProxy: + def test_builtin_agents(self): from multi_agent.definitions import load_agent_definitions agents = load_agent_definitions() - assert "general-purpose" in agents - assert "coder" in agents - assert "reviewer" in agents - assert "researcher" in agents - assert "tester" in agents + for name in ("general-purpose", "coder", "reviewer", "researcher", "tester"): + assert name in agents - def test_get_known_agent(self): + def test_get_valid(self): from multi_agent.definitions import get_agent_definition - agent = get_agent_definition("coder") - assert agent.name == "coder" - assert len(agent.tools) > 0 + defn = get_agent_definition("coder") + assert defn is not None + assert defn.name == "coder" - def test_get_unknown_agent_raises(self): - import pytest + def test_get_unknown_returns_none(self): from multi_agent.definitions import get_agent_definition - with pytest.raises(ValueError, match="Unknown agent type"): - get_agent_definition("nonexistent-agent-type") + assert get_agent_definition("nonexistent_xyz") is None diff --git a/tests/test_subagent.py b/tests/test_subagent.py index 93e509e5..2eb911ca 100644 --- a/tests/test_subagent.py +++ b/tests/test_subagent.py @@ -1,164 +1,137 @@ -"""Tests for multi_agent package split (PR5). - -Verifies that the 3-module split (definitions, manager, task) works correctly -and the backward-compat shim re-exports everything. -""" - -import textwrap -from pathlib import Path +# [desc] Tests for SubAgentManager: spawn, wait, cancel, depth limits, and result extraction [/desc] +"""Tests for the sub-agent system (subagent.py).""" +import time +import threading import pytest -from multi_agent.definitions import ( - AgentDefinition, - _parse_agent_md, - get_agent_definition, - load_agent_definitions, -) -from multi_agent.manager import SubAgentManager -from multi_agent.task import SubAgentTask, TaskStatus, _extract_final_text - - -# --- definitions.py tests --- - - -def test_builtin_agents_loaded(): - agents = load_agent_definitions(config_dir=Path("nonexistent_dir")) - assert "general-purpose" in agents - assert "coder" in agents - assert "reviewer" in agents - assert "researcher" in agents - assert "tester" in agents - - -def test_get_agent_definition_valid(): - defn = get_agent_definition("coder", config_dir=Path("nonexistent_dir")) - assert defn.name == "coder" - assert isinstance(defn.tools, list) - assert "Read" in defn.tools - - -def test_get_agent_definition_invalid(): - with pytest.raises(ValueError, match="Unknown agent type"): - get_agent_definition("nonexistent_type", config_dir=Path("nonexistent_dir")) - - -def test_parse_agent_md(tmp_path): - md = tmp_path / "custom.md" - md.write_text(textwrap.dedent("""\ - --- - tools: Read, Write, Bash - model: gpt-4 - description: A custom agent - --- - You are a custom agent for testing. - """), encoding="utf-8") - defn = _parse_agent_md(md) - assert defn.name == "custom" - assert defn.tools == ["Read", "Write", "Bash"] - assert defn.model == "gpt-4" - assert defn.description == "A custom agent" - assert "custom agent for testing" in defn.system_prompt - - -def test_parse_agent_md_no_frontmatter(tmp_path): - md = tmp_path / "simple.md" - md.write_text("Just a system prompt.", encoding="utf-8") - defn = _parse_agent_md(md) - assert defn.name == "simple" - assert defn.system_prompt == "Just a system prompt." - assert defn.tools == [] - - -def test_custom_agents_loaded_from_dir(tmp_path): - agents_dir = tmp_path / "agents" - agents_dir.mkdir() - (agents_dir / "mybot.md").write_text("---\ndescription: My bot\n---\nHello!", encoding="utf-8") - agents = load_agent_definitions(config_dir=tmp_path) - assert "mybot" in agents - assert agents["mybot"].description == "My bot" - assert "general-purpose" in agents # builtins still there - - -# --- task.py tests --- - - -def test_extract_final_text_simple(): - text = "line1\nline2\nline3\n" - result = _extract_final_text(text) - assert "line3" in result - - -def test_extract_final_text_with_blanks(): - text = "intro\n\nresult line 1\nresult line 2\n" - result = _extract_final_text(text) - assert "result line 1" in result - assert "result line 2" in result - - -def test_extract_final_text_empty(): - assert _extract_final_text("") == "" - - -def test_task_status_enum(): - assert TaskStatus.PENDING.value == "pending" - assert TaskStatus.RUNNING.value == "running" - assert TaskStatus.COMPLETED.value == "completed" - assert TaskStatus.FAILED.value == "failed" - - -def test_task_creation(): - task = SubAgentTask(prompt="Do something", agent_type="coder", name="test-agent") - assert len(task.id) == 12 - assert task.prompt == "Do something" - assert task.agent_type == "coder" - assert task.name == "test-agent" - assert task.status == TaskStatus.PENDING - - -def test_task_messaging(): - task = SubAgentTask(prompt="test") - task.send_message("hello") - task.send_message("world") - msgs = task.get_pending_messages() - assert msgs == ["hello", "world"] - assert task.get_pending_messages() == [] # cleared - - -def test_task_to_dict(): - task = SubAgentTask(prompt="Do X", agent_type="coder", name="bob") - d = task.to_dict() - assert d["name"] == "bob" - assert d["agent_type"] == "coder" - assert d["status"] == "pending" - - -# --- manager.py tests --- - - -def test_manager_creation(): - mgr = SubAgentManager(config_dir=Path("nonexistent")) - assert mgr._tasks == {} - - -def test_manager_list_agent_types(): - mgr = SubAgentManager(config_dir=Path("nonexistent")) - types = mgr.list_agent_types() - names = [t["name"] for t in types] - assert "general-purpose" in names - assert "coder" in names - - -# --- backward compat shim --- - - -def test_backward_compat_imports(): - """The subagent.py shim re-exports all public names.""" - import subagent - assert hasattr(subagent, "AgentDefinition") - assert hasattr(subagent, "SubAgentTask") - assert hasattr(subagent, "SubAgentManager") - assert hasattr(subagent, "load_agent_definitions") - assert hasattr(subagent, "get_agent_definition") - assert hasattr(subagent, "_extract_final_text") - assert hasattr(subagent, "_agent_run") +from multi_agent.subagent import SubAgentManager, SubAgentTask, _extract_final_text + + +# ── Mock for _agent_run ────────────────────────────────────────────────── + +def _make_mock_agent_run(sleep_per_iter=0.05, iters=3): + """Return a mock _agent_run that simulates work and checks cancellation.""" + + def mock_agent_run(prompt, state, config, system_prompt, depth=0, cancel_check=None): + for i in range(iters): + if cancel_check and cancel_check(): + return + time.sleep(sleep_per_iter) + # Append an assistant message to state + state.messages.append({ + "role": "assistant", + "content": f"Result for: {prompt}", + "tool_calls": [], + }) + # Yield a TurnDone-like event (generator protocol) + yield None + + return mock_agent_run + + +def _make_slow_mock(sleep_per_iter=0.2, iters=10): + """Return a slow mock for cancellation testing.""" + return _make_mock_agent_run(sleep_per_iter=sleep_per_iter, iters=iters) + + +@pytest.fixture +def manager(monkeypatch): + """Create a SubAgentManager with mocked _agent_run.""" + mock = _make_mock_agent_run() + monkeypatch.setattr("multi_agent.subagent._agent_run", mock) + mgr = SubAgentManager(max_concurrent=3, max_depth=3) + yield mgr + mgr.shutdown() + + +@pytest.fixture +def slow_manager(monkeypatch): + """Create a SubAgentManager with a slow mock for cancel testing.""" + mock = _make_slow_mock() + monkeypatch.setattr("multi_agent.subagent._agent_run", mock) + mgr = SubAgentManager(max_concurrent=3, max_depth=3) + yield mgr + mgr.shutdown() + + +# ── Tests ──────────────────────────────────────────────────────────────── + +class TestSpawnAndWait: + def test_spawn_and_wait_completes(self, manager): + task = manager.spawn("hello", {}, "system") + result_task = manager.wait(task.id, timeout=5) + assert result_task is not None + assert result_task.status == "completed" + assert result_task.result == "Result for: hello" + + def test_spawn_returns_immediately(self, manager): + task = manager.spawn("hello", {}, "system") + # Task should be pending or running, not yet completed + assert task.status in ("pending", "running") + + +class TestListTasks: + def test_list_tasks(self, manager): + t1 = manager.spawn("task1", {}, "system") + t2 = manager.spawn("task2", {}, "system") + tasks = manager.list_tasks() + task_ids = [t.id for t in tasks] + assert t1.id in task_ids + assert t2.id in task_ids + assert len(tasks) == 2 + + +class TestCancel: + def test_cancel_running_task(self, slow_manager): + task = slow_manager.spawn("slow task", {}, "system") + # Wait briefly to ensure the task starts running + time.sleep(0.1) + assert task.status == "running" + success = slow_manager.cancel(task.id) + assert success is True + # Wait for the task to actually finish + slow_manager.wait(task.id, timeout=5) + assert task.status == "cancelled" + + +class TestDepthLimit: + def test_spawn_at_max_depth_fails(self, manager): + task = manager.spawn("deep", {}, "system", depth=3) + assert task.status == "failed" + assert "Max depth" in task.result + + +class TestGetResult: + def test_get_result_completed(self, manager): + task = manager.spawn("hello", {}, "system") + manager.wait(task.id, timeout=5) + result = manager.get_result(task.id) + assert result == "Result for: hello" + + def test_get_result_unknown_id(self, manager): + result = manager.get_result("nonexistent_id") + assert result is None + + +class TestExtractFinalText: + def test_extracts_last_assistant(self): + messages = [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "first"}, + {"role": "user", "content": "more"}, + {"role": "assistant", "content": "second"}, + ] + assert _extract_final_text(messages) == "second" + + def test_returns_none_for_empty(self): + assert _extract_final_text([]) is None + + def test_returns_none_no_assistant(self): + messages = [{"role": "user", "content": "hi"}] + assert _extract_final_text(messages) is None + + +class TestWaitUnknown: + def test_wait_unknown_returns_none(self, manager): + assert manager.wait("nonexistent") is None From b87448805b2160989de1dddf1db2d5f671c64c16 Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Tue, 21 Apr 2026 15:31:58 +0200 Subject: [PATCH 4/5] chore: remove auto-generated [desc] comment lines --- multi_agent/__init__.py | 1 - multi_agent/subagent.py | 1 - multi_agent/tools.py | 1 - subagent.py | 1 - temp_comment.md | 3 +++ tests/test_subagent.py | 1 - 6 files changed, 3 insertions(+), 5 deletions(-) create mode 100644 temp_comment.md diff --git a/multi_agent/__init__.py b/multi_agent/__init__.py index 4c0e48f4..2ca550ad 100644 --- a/multi_agent/__init__.py +++ b/multi_agent/__init__.py @@ -1,4 +1,3 @@ -# [desc] Multi-agent package init: exports AgentDefinition, SubAgentTask, SubAgentManager and registry utils [/desc] """Multi-agent package for cheetahclaws. Provides: diff --git a/multi_agent/subagent.py b/multi_agent/subagent.py index 48bc5b31..32e08021 100644 --- a/multi_agent/subagent.py +++ b/multi_agent/subagent.py @@ -1,4 +1,3 @@ -# [desc] Defines sub-agent types, parses agent definitions from .md files, and manages threaded nested agent loops. [/desc] """Threaded sub-agent system for spawning nested agent loops.""" from __future__ import annotations diff --git a/multi_agent/tools.py b/multi_agent/tools.py index 1b2cf782..cfa10c7a 100644 --- a/multi_agent/tools.py +++ b/multi_agent/tools.py @@ -1,4 +1,3 @@ -# [desc] Multi-agent tool registrations: Agent spawn, SendMessage, CheckAgentResult, ListAgentTasks, ListAgentTypes [/desc] """Multi-agent tool registrations. Registers the following tools into the central tool_registry: diff --git a/subagent.py b/subagent.py index a2d8bb1a..9ca5d860 100644 --- a/subagent.py +++ b/subagent.py @@ -1,4 +1,3 @@ -# [desc] Backward-compatibility shim re-exporting all symbols from multi_agent/subagent.py [/desc] """Backward-compatibility shim — real implementation is in multi_agent/subagent.py.""" from multi_agent.subagent import ( # noqa: F401 AgentDefinition, diff --git a/temp_comment.md b/temp_comment.md new file mode 100644 index 00000000..06124e65 --- /dev/null +++ b/temp_comment.md @@ -0,0 +1,3 @@ +**Dependency:** This PR depends on #47 (`pr3-checkpoint-stderr-tokens`). Please merge #47 first. + +The `tests/conftest.py` shared fixtures (`_no_quota`) are introduced in #47. This PR extends conftest with `scripted_stream` + `receiver_tool` for scheduling-specific e2e tests. \ No newline at end of file diff --git a/tests/test_subagent.py b/tests/test_subagent.py index 2eb911ca..d7682357 100644 --- a/tests/test_subagent.py +++ b/tests/test_subagent.py @@ -1,4 +1,3 @@ -# [desc] Tests for SubAgentManager: spawn, wait, cancel, depth limits, and result extraction [/desc] """Tests for the sub-agent system (subagent.py).""" import time import threading From 0d75137f8b3c2968c70278d1bcd5bbd68c17dd6b Mon Sep 17 00:00:00 2001 From: Simon FREYBURGER Date: Tue, 21 Apr 2026 15:32:08 +0200 Subject: [PATCH 5/5] chore: remove stray temp file --- temp_comment.md | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 temp_comment.md diff --git a/temp_comment.md b/temp_comment.md deleted file mode 100644 index 06124e65..00000000 --- a/temp_comment.md +++ /dev/null @@ -1,3 +0,0 @@ -**Dependency:** This PR depends on #47 (`pr3-checkpoint-stderr-tokens`). Please merge #47 first. - -The `tests/conftest.py` shared fixtures (`_no_quota`) are introduced in #47. This PR extends conftest with `scripted_stream` + `receiver_tool` for scheduling-specific e2e tests. \ No newline at end of file