From 1412c33597d503cad4fdac2453f17dac1a1c8cd9 Mon Sep 17 00:00:00 2001 From: Slava Trofimov Date: Fri, 29 May 2026 19:12:18 -0400 Subject: [PATCH] harden MCP config and action-state retries --- src/octopal/infrastructure/mcp/manager.py | 54 +++++++++++++++------ src/octopal/runtime/octo/router.py | 36 ++++++++++++-- src/octopal/tools/catalog.py | 8 +-- src/octopal/tools/filesystem/path_safety.py | 19 ++++++-- tests/test_filesystem_hardening.py | 13 +++++ tests/test_mcp_manager.py | 49 +++++++++++++++++++ tests/test_router_tool_budget.py | 42 +++++++++++----- 7 files changed, 184 insertions(+), 37 deletions(-) diff --git a/src/octopal/infrastructure/mcp/manager.py b/src/octopal/infrastructure/mcp/manager.py index ac187fe1..835889e6 100644 --- a/src/octopal/infrastructure/mcp/manager.py +++ b/src/octopal/infrastructure/mcp/manager.py @@ -50,21 +50,32 @@ class MCPServerConfig: "upstream_5xx", "unknown_error", } -_MCP_SLOW_TOOL_HINTS = ( - "search", - "fetch", - "crawl", +_MCP_SLOW_TOOL_HINTS = ( + "search", + "fetch", + "crawl", "thread", "inbox", "mail", "list_", - "query", -) - -class MCPManager: + "query", +) + + +def _extract_mcp_server_configs(config_data: Any) -> Any: + if not isinstance(config_data, dict): + return None + if isinstance(config_data.get("servers"), dict): + return config_data["servers"] + if isinstance(config_data.get("mcpServers"), dict): + return config_data["mcpServers"] + return config_data + + +class MCPManager: def __init__(self, workspace_dir: Path): self.workspace_dir = workspace_dir - self.sessions: dict[str, ClientSession] = {} + self.sessions: dict[str, ClientSession] = {} # Stores the background task that keeps the session alive self._tasks: dict[str, asyncio.Task] = {} # Communication queues for disconnect signals @@ -79,12 +90,19 @@ def __init__(self, workspace_dir: Path): self._shutdown_requested = False self.config_path = workspace_dir / "mcp_servers.json" self.legacy_config_path = workspace_dir / "config" / "mcp.json" + self.root_config_path = workspace_dir / "mcp.json" + self.claude_config_path = workspace_dir / ".mcp.json" self._configs_loaded = False def _config_paths(self) -> list[Path]: - # Keep supporting the newer flat mcp_servers.json layout while also - # reading the legacy workspace/config/mcp.json workspace file. - return [self.legacy_config_path, self.config_path] + # Read broad compatibility paths first, then let canonical Octopal + # config files override duplicates from imported MCP client configs. + return [ + self.claude_config_path, + self.root_config_path, + self.legacy_config_path, + self.config_path, + ] def _load_configs_from_disk(self) -> dict[str, MCPServerConfig]: if self._configs_loaded: @@ -97,8 +115,16 @@ def _load_configs_from_disk(self) -> dict[str, MCPServerConfig]: for config_path in self._config_paths(): if not config_path.exists(): continue - config_data = json.loads(config_path.read_text(encoding="utf-8")) - server_configs = config_data.get("servers", config_data) + config_text = config_path.read_text(encoding="utf-8").strip() + if not config_text: + logger.warning("Skipping empty MCP config file", path=str(config_path)) + continue + try: + config_data = json.loads(config_text) + except json.JSONDecodeError: + logger.warning("Skipping invalid MCP config file", path=str(config_path), exc_info=True) + continue + server_configs = _extract_mcp_server_configs(config_data) if not isinstance(server_configs, dict): continue for server_id, cfg in server_configs.items(): diff --git a/src/octopal/runtime/octo/router.py b/src/octopal/runtime/octo/router.py index 5633172a..9121c160 100644 --- a/src/octopal/runtime/octo/router.py +++ b/src/octopal/runtime/octo/router.py @@ -1546,9 +1546,6 @@ def _looks_like_internal_worker_followup_leak(text: str) -> bool: "current tool set", "bounded route", "on the next turn", - "полный режим", - "нет доступа к a2a", - "нет `a2a_send_message`", ) orchestration_phrases = ( "can't modify", @@ -3647,6 +3644,8 @@ async def _needs_action_or_blocked_retry( ) -> bool: if not normalize_plain_text(candidate or "") or should_suppress_user_delivery(candidate): return False + if _looks_like_unbacked_action_commitment(candidate): + return True prompt = ( "Classify whether the draft assistant response is safe to deliver as the final answer for this turn.\n" "Return JSON only with this shape:\n" @@ -3684,6 +3683,37 @@ async def _needs_action_or_blocked_retry( return verdict == "requires_runtime_action_state" and confidence >= 0.55 +_UNBACKED_ACTION_BLOCKED_MARKERS = ( + "blocked", + "cannot", + "can't", + "could not", + "i need", + "need you", + "please confirm", +) + +_UNBACKED_ACTION_PATTERNS = ( + re.compile( + r"\b(?:i(?:'m| am)\s+)?(" + r"installing|activating|creating|starting|running|connecting|configuring|" + r"restarting|updating|saving|adding|writing|checking" + r")\b", + re.IGNORECASE, + ), +) + + +def _looks_like_unbacked_action_commitment(candidate: str) -> bool: + cleaned = normalize_plain_text(candidate or "") + if not cleaned: + return False + lowered = cleaned.lower() + if any(marker in lowered for marker in _UNBACKED_ACTION_BLOCKED_MARKERS): + return False + return any(pattern.search(cleaned) for pattern in _UNBACKED_ACTION_PATTERNS) + + async def _verify_final_response( provider: InferenceProvider, messages: list[Message | dict[str, Any]], diff --git a/src/octopal/tools/catalog.py b/src/octopal/tools/catalog.py index a828c7e6..37adc03e 100644 --- a/src/octopal/tools/catalog.py +++ b/src/octopal/tools/catalog.py @@ -1334,10 +1334,10 @@ def get_tools(mcp_manager=None) -> list[ToolSpec]: parameters={ "type": "object", "properties": { - "path": { - "type": "string", - "description": "Workspace-relative path to write.", - }, + "path": { + "type": "string", + "description": "Workspace-relative path to write, for example config/mcp.json instead of workspace/config/mcp.json.", + }, "content": {"type": "string", "description": "File contents."}, }, "required": ["path", "content"], diff --git a/src/octopal/tools/filesystem/path_safety.py b/src/octopal/tools/filesystem/path_safety.py index 8399d972..2218c4dd 100644 --- a/src/octopal/tools/filesystem/path_safety.py +++ b/src/octopal/tools/filesystem/path_safety.py @@ -23,6 +23,7 @@ def resolve_workspace_path( raise WorkspacePathError("path contains null byte") root = base_dir.resolve() + raw = _strip_redundant_workspace_prefix(raw, root) candidate = Path(os.path.normpath(str(root / raw))) _assert_within(root, candidate) @@ -37,11 +38,11 @@ def resolve_workspace_path( for p in allowed_paths: p_path = (root / p).resolve(strict=False) allowed_resolved.append(p_path) - + # Check if candidate is within any of the allowed paths # or if it's the worker's own specific directory which isn't part of allowed_paths but should be implicit # Actually, worker's directory check is handled implicitly if it's not restricted or we just check allowed_paths - + # We need to resolve candidate strictly if it exists, or loosely if it doesn't resolved_candidate = candidate.resolve(strict=False) is_allowed = False @@ -49,7 +50,7 @@ def resolve_workspace_path( if resolved_candidate == allowed_root or allowed_root in resolved_candidate.parents: is_allowed = True break - + if not is_allowed: raise WorkspacePathError(f"access denied: path '{raw}' is outside allowed paths") @@ -96,3 +97,15 @@ def _assert_existing_ancestor_within(root: Path, candidate: Path) -> None: if ancestor.exists(): resolved_ancestor = ancestor.resolve(strict=True) _assert_within(root, resolved_ancestor) + + +def _strip_redundant_workspace_prefix(raw: str, root: Path) -> str: + if Path(raw).is_absolute(): + return raw + normalized = raw.replace("\\", "/") + if normalized.startswith("/"): + return raw + parts = [part for part in normalized.split("/") if part and part != "."] + if root.name == "workspace" and parts[:1] == ["workspace"]: + return "/".join(parts[1:]) or "." + return raw diff --git a/tests/test_filesystem_hardening.py b/tests/test_filesystem_hardening.py index 3b47eb69..2c07f6f7 100644 --- a/tests/test_filesystem_hardening.py +++ b/tests/test_filesystem_hardening.py @@ -51,6 +51,19 @@ def test_fs_write_rejects_symlink_escape(tmp_path: Path) -> None: assert not (outside / "pwn.txt").exists() +def test_fs_write_treats_leading_workspace_as_redundant_at_workspace_root( + tmp_path: Path, +) -> None: + workspace = tmp_path / "workspace" + workspace.mkdir(parents=True, exist_ok=True) + + result = fs_write({"path": "workspace/config/mcp.json", "content": "{}"}, workspace) + + assert result == "fs_write ok" + assert (workspace / "config" / "mcp.json").read_text(encoding="utf-8") == "{}" + assert not (workspace / "workspace" / "config" / "mcp.json").exists() + + def test_fs_delete_unlinks_symlink_without_touching_target(tmp_path: Path) -> None: _ensure_symlink_supported(tmp_path) diff --git a/tests/test_mcp_manager.py b/tests/test_mcp_manager.py index 08b9a309..19dac6b9 100644 --- a/tests/test_mcp_manager.py +++ b/tests/test_mcp_manager.py @@ -235,6 +235,55 @@ def test_mcp_manager_reads_legacy_workspace_mcp_config(tmp_path) -> None: assert resolved == ["AgentMail"] +def test_mcp_manager_reads_claude_style_mcp_servers_config(tmp_path) -> None: + (tmp_path / ".mcp.json").write_text( + """ +{ + "mcpServers": { + "minimax": { + "command": "uvx", + "args": ["minimax-coding-plan-mcp"], + "tools": ["mcp_minimax_web_search"] + } + } +} +""".strip(), + encoding="utf-8", + ) + + manager = MCPManager(tmp_path) + + resolved = manager.resolve_configured_server_ids_for_tools(["mcp_minimax_web_search"]) + + assert resolved == ["minimax"] + + +def test_mcp_manager_skips_empty_compat_config_and_reads_canonical(tmp_path) -> None: + (tmp_path / ".mcp.json").write_text("", encoding="utf-8") + config_dir = tmp_path / "config" + config_dir.mkdir() + (config_dir / "mcp.json").write_text( + """ +{ + "servers": { + "docs": { + "command": "uvx", + "args": ["docs-mcp"], + "tools": ["mcp_docs_search"] + } + } +} +""".strip(), + encoding="utf-8", + ) + + manager = MCPManager(tmp_path) + + resolved = manager.resolve_configured_server_ids_for_tools(["mcp_docs_search"]) + + assert resolved == ["docs"] + + def test_resolve_configured_server_id_for_tool_name_is_case_insensitive(tmp_path) -> None: configs = { "AgentMail": MCPServerConfig( diff --git a/tests/test_router_tool_budget.py b/tests/test_router_tool_budget.py index 87997649..49630543 100644 --- a/tests/test_router_tool_budget.py +++ b/tests/test_router_tool_budget.py @@ -8,8 +8,8 @@ from octopal.infrastructure.config.models import A2AConfig, A2APeerConfig from octopal.infrastructure.providers.base import Message from octopal.runtime.octo.delivery import ( - restore_user_delivery, resolve_user_delivery, + restore_user_delivery, suppress_user_delivery, ) from octopal.runtime.octo.router import ( @@ -25,6 +25,7 @@ _get_scheduled_octo_control_tools, _get_scheduler_tools, _get_worker_followup_tools, + _needs_action_or_blocked_retry, _normalize_worker_followup_reply, _recover_textual_tool_call, _sanitize_messages_for_complete, @@ -32,8 +33,8 @@ route_or_reply, route_worker_results_back_to_octo, ) -from octopal.tools.communication.send_file import send_file_to_user from octopal.runtime.workers.contracts import WorkerResult +from octopal.tools.communication.send_file import send_file_to_user from octopal.tools.registry import ToolSpec from octopal.tools.tools import get_tools @@ -985,21 +986,21 @@ async def scenario() -> None: def test_normalize_worker_followup_reply_uses_structured_user_response() -> None: raw = """ { - "user_response": "Брифинг готов.", + "user_response": "Briefing is ready.", "no_user_response": false, "actions_taken": [{"type": "get_worker_output_path", "summary": "checked output"}], "reason": "worker completed" } """ - assert _normalize_worker_followup_reply(raw) == "Брифинг готов." + assert _normalize_worker_followup_reply(raw) == "Briefing is ready." def test_normalize_worker_followup_reply_strips_noisy_user_visible_wrapper() -> None: raw = ( "I checked internal worker state and should only show the marked part.\n\n" - "Брифинг готов." + "Briefing is ready." ) - assert _normalize_worker_followup_reply(raw) == "Брифинг готов." + assert _normalize_worker_followup_reply(raw) == "Briefing is ready." def test_normalize_worker_followup_reply_suppresses_structured_no_response() -> None: @@ -1040,7 +1041,7 @@ def test_normalize_worker_followup_reply_suppresses_internal_mode_leak() -> None def test_normalize_worker_followup_reply_suppresses_a2a_mode_leak() -> None: raw = """ { - "user_response": "I don't have the A2A messaging tools available in my current tool set. I need to send the message from my full orchestration context. Отправлю сообщение как только вернусь в полный режим — сейчас у меня нет доступа к A2A инструментам из этого контекста.", + "user_response": "I don't have the A2A messaging tools available in my current tool set. I need to send the message from my full orchestration context. I will send it once I am back in full mode.", "no_user_response": false, "actions_taken": [], "reason": "needs orchestration" @@ -1504,7 +1505,7 @@ async def complete_stream(self, messages, *, on_partial, **kwargs): async def complete_with_tools(self, messages, *, tools, tool_choice="auto", **kwargs): self.calls += 1 if self.calls == 1: - return {"content": "Проверю это сейчас.", "tool_calls": []} + return {"content": "I will take care of that.", "tool_calls": []} if self.calls == 2: self.retry_prompt_seen = any( "previous answer was classified as requiring concrete runtime action state" @@ -1521,7 +1522,7 @@ async def complete_with_tools(self, messages, *, tools, tool_choice="auto", **kw } ], } - return {"content": "Проверила: ok.", "tool_calls": []} + return {"content": "Checked: ok.", "tool_calls": []} class DummyMemory: async def add_message(self, role, content, metadata=None): @@ -1574,11 +1575,11 @@ async def scenario() -> None: DummyOcto(), provider, DummyMemory(), - "проверь статус", + "check status", 123, "", ) - assert response == "Проверила: ok." + assert response == "Checked: ok." assert provider.calls == 3 assert provider.verifier_seen is True assert provider.retry_prompt_seen is True @@ -1586,6 +1587,21 @@ async def scenario() -> None: asyncio.run(scenario()) +def test_action_state_retry_catches_present_tense_action_commitment_without_verifier() -> None: + class DummyProvider: + async def complete(self, messages, **kwargs): + raise AssertionError("deterministic action heuristic should run before verifier") + + async def scenario() -> None: + assert await _needs_action_or_blocked_retry( + provider=DummyProvider(), + messages=[Message(role="user", content="install mcp, then activate it")], + candidate="`uvx` is available. Installing MiniMax MCP...", + ) + + asyncio.run(scenario()) + + def test_route_retries_with_fewer_tools_after_invalid_tool_payload(monkeypatch) -> None: class DummyProvider: def __init__(self) -> None: @@ -2165,7 +2181,7 @@ async def complete_with_tools(self, messages, *, tools, tool_choice="auto", **kw } ], } - return {"content": "Проверяю и вернусь с итогом.", "tool_calls": []} + return {"content": "Checking now; I will follow up with the result.", "tool_calls": []} class DummyMemory: async def add_message(self, role, content, metadata=None): @@ -2224,7 +2240,7 @@ async def scenario() -> None: 123, "", ) - assert response == "Проверяю и вернусь с итогом." + assert response == "Checking now; I will follow up with the result." assert octo.followup_marked == 1 asyncio.run(scenario())