diff --git a/docs/configuration/workspaceinfo-customization.md b/docs/configuration/workspaceinfo-customization.md
index c3ca4f6a..cba8f981 100644
--- a/docs/configuration/workspaceinfo-customization.md
+++ b/docs/configuration/workspaceinfo-customization.md
@@ -26,12 +26,39 @@ codeCollections:
- # Another code collections to scan
+# Information about MCP servers to discover tools from
+mcpConfig:
+ servers:
+ - display_name: jira
+ url: https://jira-mcp.internal:443/mcp
+ secret_ref: jira-mcp-token
+
+
# Custom information about specific code bundles
custom:
prometheus_provider: gmp
# More custom configuration
```
+#### MCP Server Discovery (`mcpConfig`)
+
+The optional `mcpConfig` block declares one or more private MCP servers to introspect during the indexer phase. For each entry the workspace builder calls `initialize` + `tools/list` on the server, then emits one `mcp_tool` resource per discovered tool. The `mcp-tool-proxy` codebundle in [rw-generic-codecollection](https://github.com/runwhen-contrib/rw-generic-codecollection) renders one SLX + Runbook per `mcp_tool` resource via its generation rule.
+
+
| Field | Required | Description |
|---|
display_name | yes | Short identifier used in generated SLX names and tags (e.g. jira, linear). Lowercase, alphanumeric / underscores. |
url | yes | Full HTTPS URL of the MCP endpoint, including the path (commonly /mcp). Must be reachable from the workspace-builder pod's network at index time, and from runner pods at execution time. |
secret_ref | yes | Name of a Kubernetes Secret in the same namespace whose data.token is the bearer token to send as Authorization: Bearer <token>. Token is base64-decoded by the indexer before use. |
verify_tls | no (default true) | Set to false to skip TLS certificate verification for this server. Intended for environments where the pod's CA bundle does not yet trust the server's issuer; a warning is logged for every cycle in which it is disabled. Prefer extending the CA bundle for production use. |
+
+> The generated Runbook's codeBundle.ref is pinned to the ref of the codecollection the template was loaded from (the ref standard template variable). Point your codeCollections entry for rw-generic-codecollection at the branch/tag you want runners to execute against — no extra knob in mcpConfig.
+
+A single server's `tools/list` failure logs a warning and skips that server only — other servers and the rest of the cycle continue normally.
+
+##### Required secret shape
+
+```bash
+kubectl -n create secret generic jira-mcp-token \
+ --from-literal=token=''
+```
+
+The runner uses the same `secret_ref` at execution time (via `secretsProvided.workspaceKey`), so the secret must be reachable from the runner namespace too.
+
#### Basic Workspace Configuration Info
There are several settings that are used to configure information in the workspace that's generated to be uploaded to the RunWhen platform to. The available settings are:
diff --git a/src/component.py b/src/component.py
index 5dd857f1..0a111533 100644
--- a/src/component.py
+++ b/src/component.py
@@ -248,7 +248,7 @@ def init_components():
# be added here, which is less than ideal, although practically may not be
# a huge deal.
component_stages_init = (
- (Stage.INDEXER, ["load_resources", "kubeapi", "cloudquery", "azure_devops"]),
+ (Stage.INDEXER, ["load_resources", "kubeapi", "cloudquery", "azure_devops", "mcp_tools"]),
(Stage.ENRICHER, ["generation_rules"]),
(Stage.RENDERER, ["render_output_items", "dump_resources"])
)
diff --git a/src/enrichers/generation_rules.py b/src/enrichers/generation_rules.py
index 7ad470b1..7642fea8 100644
--- a/src/enrichers/generation_rules.py
+++ b/src/enrichers/generation_rules.py
@@ -35,6 +35,7 @@
from .gcp import GCPPlatformHandler, GCP_PLATFORM
from .aws import AWSPlatformHandler, AWS_PLATFORM
from .azure_devops import AzureDevOpsPlatformHandler
+from .mcp import McpPlatformHandler, MCP_PLATFORM
from renderers.render_output_items import OUTPUT_ITEMS_PROPERTY
from renderers.render_output_items import OutputItem as RendererOutputItem
@@ -1110,6 +1111,7 @@ def load(context: Context) -> None:
GCP_PLATFORM: GCPPlatformHandler(),
AWS_PLATFORM: AWSPlatformHandler(),
"azure_devops": AzureDevOpsPlatformHandler(),
+ MCP_PLATFORM: McpPlatformHandler(),
}
context.set_property(PLATFORM_HANDLERS_PROPERTY_NAME, platform_handlers)
request_code_collections = context.get_setting("CODE_COLLECTIONS")
diff --git a/src/enrichers/mcp.py b/src/enrichers/mcp.py
new file mode 100644
index 00000000..9a06998a
--- /dev/null
+++ b/src/enrichers/mcp.py
@@ -0,0 +1,35 @@
+from typing import Any, Optional
+
+from resources import Resource
+
+from .generation_rule_types import PlatformHandler
+
+# Source of truth for the platform name lives with the indexer; mirror it
+# here for the enricher's registration without forcing a cross-package
+# dependency at the top level.
+from indexers.mcp_tools import PLATFORM_NAME as MCP_PLATFORM
+
+
+class McpPlatformHandler(PlatformHandler):
+ """Platform handler for MCP-discovered tool resources.
+
+ Resources are emitted by `indexers.mcp_tools._emit_tool_resource` with a
+ nested `spec` dict carrying the server/tool fields. This handler surfaces
+ those fields as qualifier values and resource property values so the
+ generic generation-rules engine can match and name SLXs against them.
+ """
+
+ def __init__(self):
+ super().__init__(MCP_PLATFORM)
+
+ def _spec_value(self, resource: Resource, key: str) -> Optional[str]:
+ spec = getattr(resource, "spec", None) or {}
+ value = spec.get(key)
+ return value if value is not None else None
+
+ def get_resource_qualifier_value(self, resource: Resource, qualifier_name: str) -> Optional[str]:
+ return self._spec_value(resource, qualifier_name)
+
+ def get_resource_property_values(self, resource: Resource, property_name: str) -> Optional[list[Any]]:
+ value = self._spec_value(resource, property_name)
+ return [value] if value is not None else None
diff --git a/src/indexers/mcp_tools.py b/src/indexers/mcp_tools.py
new file mode 100644
index 00000000..c2aa65fc
--- /dev/null
+++ b/src/indexers/mcp_tools.py
@@ -0,0 +1,307 @@
+"""Indexer that discovers MCP tools by:
+ 1. Reading the workspace's MCP server list from the MCP_CONFIG setting
+ (populated from Helm `mcpConfig:` values via the runner's existing
+ workspaceInfo plumbing — Approach D2).
+ 2. For each configured server, calling its `tools/list` MCP endpoint
+ (in-VPC, outbound from the runner).
+ 3. Emitting one `mcp_tool` resource per tool to the Registry, with the
+ server's URL/secret-ref/display-name and the tool's input schema.
+
+A subsequent generation-rule run (enrichers/generation_rules) matches these
+resources and renders one SLX + Runbook per tool via the mcp-tool-proxy
+templates in rw-generic-codecollection.
+"""
+
+import logging
+from typing import Any
+
+import requests
+
+from component import Setting, SettingDependency, Context
+from resources import Registry, REGISTRY_PROPERTY_NAME
+
+logger = logging.getLogger(__name__)
+
+DOCUMENTATION = "Discovers MCP tools from Helm-configured MCP servers (Approach D2)."
+
+# Same pattern as CLOUD_CONFIG_SETTING in src/indexers/common.py — a DICT
+# setting populated from the workspaceInfo YAML's `mcpConfig:` key, which the
+# runner Helm chart writes from its values.yaml `mcpConfig:` block:
+#
+# mcpConfig:
+# servers:
+# - display_name: jira
+# url: https://jira-mcp.internal:443/mcp
+# secret_ref: jira-mcp-token
+# - display_name: linear
+# url: https://linear-mcp.internal:443/mcp
+# secret_ref: linear-mcp-token
+MCP_CONFIG_SETTING = Setting(
+ "MCP_CONFIG",
+ "mcpConfig",
+ Setting.Type.DICT,
+ "Configuration for MCP servers to introspect for tool discovery.",
+ dict(),
+)
+
+SETTINGS = (
+ SettingDependency(MCP_CONFIG_SETTING, False),
+)
+
+PLATFORM_NAME = "mcp"
+RESOURCE_TYPE = "mcp_tool"
+TOOLS_LIST_TIMEOUT = 15
+
+# Verbs that strongly suggest a tool only reads state. Used as a fallback when
+# the MCP server doesn't set `annotations.readOnlyHint=true` — most servers
+# don't bother to set the hint, so we fall back on the tool name's leading
+# verb to avoid defaulting every tool to read-write.
+READ_ONLY_VERBS = (
+ "get", "list", "search", "read", "fetch",
+ "describe", "find", "query", "show", "view",
+)
+
+
+def _name_suggests_read_only(name: str) -> bool:
+ """True if the tool name's leading token (split on `_` or `-`) is one of
+ READ_ONLY_VERBS. Examples that match: `list_teams`, `get-project`,
+ `search_documentation`. Examples that don't: `create_issue`,
+ `update_project`, `delete_attachment`."""
+ if not name:
+ return False
+ lower = name.lower()
+ for verb in READ_ONLY_VERBS:
+ if lower == verb or lower.startswith(verb + "_") or lower.startswith(verb + "-"):
+ return True
+ return False
+
+
+def _compute_access(tool: dict[str, Any]) -> str:
+ """Determine the SLX `access` tag for an MCP tool.
+
+ - `annotations.readOnlyHint=true` is authoritative → "read-only".
+ - Otherwise (hint is false, missing, or annotations absent) fall back on
+ the tool name's leading verb. Many MCP servers leave readOnlyHint unset
+ or default it to false even for clearly read-only tools, so we'd flag
+ half the catalog as read-write without the heuristic.
+ - Default is "read-write" — safer to over-mark write capability than to
+ silently let a write tool through as read-only.
+ """
+ annotations = tool.get("annotations") or {}
+ if annotations.get("readOnlyHint") is True:
+ return "read-only"
+ if _name_suggests_read_only(tool.get("name", "")):
+ return "read-only"
+ return "read-write"
+
+
+def index(context: Context) -> None:
+ logger.info("mcp_tools: indexer starting")
+ config = context.get_setting(MCP_CONFIG_SETTING) or {}
+ raw_count = len((config or {}).get("servers") or [])
+ logger.info("mcp_tools: MCP_CONFIG has %d raw server entries", raw_count)
+ servers = _load_servers_from_setting(config, on_warning=context.add_warning)
+ logger.info("mcp_tools: %d server entries passed validation", len(servers))
+ if not servers:
+ logger.info("mcp_tools: no MCP servers configured; skipping.")
+ return
+
+ registry: Registry = context.get_property(REGISTRY_PROPERTY_NAME)
+
+ total_tools = 0
+ for server in servers:
+ name = server.get("display_name")
+ url = server.get("url")
+ logger.info("mcp_tools: calling tools/list on %s (url=%s, secret_ref=%s)",
+ name, url, server.get("secret_ref"))
+ try:
+ tools = _list_tools(server)
+ except Exception as exc:
+ # Preserve previous SLXs on failure (per design §7.9). We simply
+ # don't emit fresh resources for this server; the existing SLXs
+ # from the previous successful cycle stay in place upstream.
+ logger.warning("mcp_tools: tools/list failed for %s: %s", name, exc)
+ context.add_warning(
+ f"MCP tools/list failed for {name}: {exc}")
+ continue
+ logger.info("mcp_tools: %s returned %d tools", name, len(tools))
+ for tool in tools:
+ logger.debug("mcp_tools: emitting resource for %s/%s", name, tool.get("name"))
+ _emit_tool_resource(registry, server, tool)
+ total_tools += 1
+ logger.info("mcp_tools: indexer complete; emitted %d mcp_tool resources across %d servers",
+ total_tools, len(servers))
+
+
+REQUIRED_SERVER_FIELDS = ("display_name", "url", "secret_ref")
+
+
+def _load_servers_from_setting(config, on_warning=None) -> list[dict[str, Any]]:
+ """Parse the MCP_CONFIG setting (a DICT mirroring the Helm values block)
+ into a list of validated server entries. Skips malformed entries with a
+ warning so a single bad config row doesn't prevent the rest from working.
+ """
+ if not config:
+ return []
+ raw = config.get("servers") or []
+ valid: list[dict[str, Any]] = []
+ for entry in raw:
+ if not isinstance(entry, dict):
+ if on_warning:
+ on_warning(f"mcpConfig.servers entry is not a dict: {entry!r}")
+ continue
+ missing = [f for f in REQUIRED_SERVER_FIELDS if not entry.get(f)]
+ if missing:
+ label = entry.get("display_name") or entry.get("url") or ""
+ if on_warning:
+ on_warning(
+ f"mcpConfig.servers[{label}] missing required field(s) "
+ f"{missing}; skipping")
+ continue
+ valid.append(entry)
+ return valid
+
+
+def _resolve_secret(secret_ref: str) -> str:
+ """Read a workspace secret and return the token value. Resolved here so
+ tests can monkey-patch this single function rather than threading a
+ fetcher parameter through every call site.
+
+ k8s_utils.get_secret returns secret.data from the Kubernetes Python
+ client unchanged — values are still base64-encoded. Must be decoded
+ before use as a bearer token (see azure_utils.py:95 for prior art).
+ """
+ import base64
+ from k8s_utils import get_secret
+ data = get_secret(secret_ref)
+ # Secret convention: stored under key "token"; fall back to single-key shape.
+ encoded = data.get("token") or next(iter(data.values()))
+ return base64.b64decode(encoded).decode("utf-8")
+
+
+def _list_tools(server: dict[str, Any],
+ fetch_secret=None) -> list[dict[str, Any]]:
+ """Calls the MCP server's initialize/notifications/tools/list handshake
+ and returns the `tools` array from the result.
+
+ `fetch_secret` is injected for testability. Defaults to _resolve_secret
+ which talks to the k8s secret store at runtime.
+
+ The `verify_tls` field on a server entry (default True) is forwarded to
+ the underlying `requests` call. Set it to False for a server whose cert
+ can't be validated by the pod's CA bundle (temporary escape hatch — the
+ proper fix is to add the issuer to the bundle).
+ """
+ if fetch_secret is None:
+ fetch_secret = _resolve_secret
+ token = fetch_secret(server["secret_ref"])
+
+ verify_tls = server.get("verify_tls", True)
+ if verify_tls is False:
+ logger.warning(
+ "mcp_tools: TLS verification DISABLED for %s — temporary debug "
+ "mode. The proper fix is to trust the server's CA in the pod's "
+ "CA bundle. Do not leave verify_tls=false in production.",
+ server.get("display_name"),
+ )
+ # Silence requests' per-call InsecureRequestWarning so the warning
+ # above is the only noise per server (logged once at start).
+ try:
+ from urllib3 import disable_warnings
+ from urllib3.exceptions import InsecureRequestWarning
+ disable_warnings(InsecureRequestWarning)
+ except Exception:
+ pass
+
+ s = requests.Session()
+ # Note: setting s.verify alone isn't enough when REQUESTS_CA_BUNDLE is set
+ # in the environment — requests.Session.merge_environment_settings reads
+ # the env path into the per-request verify (when the request-level value
+ # is None/True) and then merge_setting returns the per-request value over
+ # the session-level one. We pass verify=verify_tls to each post() call so
+ # the env override is skipped when verify_tls is False.
+ s.verify = verify_tls
+ s.headers.update({
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {token}",
+ "Accept": "application/json, text/event-stream",
+ })
+ init = s.post(server["url"],
+ json={"jsonrpc": "2.0", "id": 1, "method": "initialize",
+ "params": {"protocolVersion": "2025-03-26",
+ "capabilities": {},
+ "clientInfo": {"name": "runwhen-builder", "version": "1.0.0"}}},
+ timeout=TOOLS_LIST_TIMEOUT,
+ verify=verify_tls)
+ init.raise_for_status()
+ sid = init.headers.get("Mcp-Session-Id")
+ if sid:
+ s.headers["Mcp-Session-Id"] = sid
+ s.post(server["url"],
+ json={"jsonrpc": "2.0", "method": "notifications/initialized"},
+ timeout=TOOLS_LIST_TIMEOUT,
+ verify=verify_tls)
+ resp = s.post(server["url"],
+ json={"jsonrpc": "2.0", "id": 2, "method": "tools/list"},
+ timeout=TOOLS_LIST_TIMEOUT,
+ verify=verify_tls)
+ resp.raise_for_status()
+ parsed = _parse_jsonrpc_response(resp)
+ if parsed is None:
+ raise RuntimeError("tools/list returned no parseable JSON-RPC envelope")
+ if "error" in parsed:
+ raise RuntimeError(f"tools/list error: {parsed['error']}")
+ return parsed.get("result", {}).get("tools", [])
+
+
+def _parse_jsonrpc_response(resp) -> dict[str, Any] | None:
+ """Parse a JSON or SSE (text/event-stream) MCP response body. Mirrors
+ `_parse_response` in the rw-generic-codecollection mcp-tool-proxy client:
+ streamable HTTP MCP servers may answer in either content type, and the
+ SSE form is `event: message\\ndata: \\n\\n` per RPC envelope.
+ """
+ import json as _json
+ ct = resp.headers.get("Content-Type", "")
+ if "text/event-stream" in ct:
+ for line in resp.text.split("\n"):
+ if line.startswith("data: "):
+ try:
+ msg = _json.loads(line[len("data: "):])
+ if "id" in msg or "result" in msg or "error" in msg:
+ return msg
+ except _json.JSONDecodeError:
+ continue
+ return None
+ try:
+ return resp.json()
+ except Exception:
+ return None
+
+
+def _emit_tool_resource(registry: Registry,
+ server: dict[str, Any],
+ tool: dict[str, Any]) -> None:
+ """Add one `mcp_tool` resource to the registry under platform=mcp."""
+ server_name = server["display_name"]
+ tool_name = tool["name"]
+ qualified = f"{server_name}__{tool_name}"
+ spec = {
+ "server_id": server.get("server_id"),
+ "server_display_name": server_name,
+ "server_url": server["url"],
+ "secret_ref": server["secret_ref"],
+ "verify_tls": server.get("verify_tls", True),
+ "tool_name": tool_name,
+ "description": tool.get("description", ""),
+ "input_schema": tool.get("inputSchema") or tool.get("input_schema") or {
+ "type": "object", "properties": {}, "required": [],
+ },
+ "access": _compute_access(tool),
+ }
+ registry.add_resource(
+ platform_name=PLATFORM_NAME,
+ resource_type_name=RESOURCE_TYPE,
+ resource_name=qualified,
+ resource_qualified_name=qualified,
+ resource_attributes={"spec": spec},
+ )
diff --git a/src/renderers/render_output_items.py b/src/renderers/render_output_items.py
index fb72b0c2..2889a599 100644
--- a/src/renderers/render_output_items.py
+++ b/src/renderers/render_output_items.py
@@ -88,6 +88,16 @@ def compute_resource_path_from_hierarchy(data: dict) -> None:
if name and name not in tag_lookup and value is not None:
tag_lookup[name] = str(value)
+ # If the template already set an explicit resourcePath, honor it — some
+ # platforms (e.g. mcp) want hierarchy and resourcePath to be different
+ # depths (3-key hierarchy for UI grouping, 2-key resourcePath for the
+ # underlying resource identity). For platforms that don't set it, we
+ # still compute from hierarchy so they get the auto-behavior unchanged.
+ if additional_context.get('resourcePath'):
+ logger.debug(
+ f"Honoring explicit resourcePath from template: {additional_context['resourcePath']}")
+ return
+
# Build resourcePath from the hierarchy entries in order.
# resource_name always appears in the path even when its value duplicates
# a parent entry — the hierarchy is the single source of truth.
diff --git a/src/run.py b/src/run.py
index 049e0144..0b612ebf 100755
--- a/src/run.py
+++ b/src/run.py
@@ -226,7 +226,7 @@ def main():
help=f'Host/port info for where the {SERVICE_NAME} REST service is running. '
f'Format is :')
parser.add_argument('-c', '--components', action='store',
- default="load_resources,kubeapi,cloudquery,azure_devops,generation_rules,render_output_items,dump_resources")
+ default="load_resources,kubeapi,cloudquery,azure_devops,mcp_tools,generation_rules,render_output_items,dump_resources")
parser.add_argument('-o', '--output', action='store', dest='output_path', default="output",
help="Path to output directory for generated files. "
"The path is relative to the base directory.")
@@ -433,6 +433,7 @@ def main():
code_collections = workspace_info.get("codeCollections")
overrides = workspace_info.get("overrides", {})
task_tag_exclusions = workspace_info.get("taskTagExclusions")
+ mcp_config = workspace_info.get("mcpConfig")
# ------------------------------------------------------------------ 4. validation guards
missing = []
@@ -682,6 +683,8 @@ def main():
request_data['codeCollections'] = code_collections
if cloud_config:
request_data['cloudConfig'] = cloud_config
+ if mcp_config:
+ request_data['mcpConfig'] = mcp_config
if overrides:
request_data['overrides'] = overrides
if task_tag_exclusions:
diff --git a/src/run.sh b/src/run.sh
index 1571bab5..e4d86735 100755
--- a/src/run.sh
+++ b/src/run.sh
@@ -54,9 +54,9 @@ fi
touch "$LOCK_FILE"
# Construct components string based on whether --disable-cloudquery is set
-COMPONENTS="load_resources,kubeapi,azure_devops,generation_rules,render_output_items,dump_resources"
+COMPONENTS="load_resources,kubeapi,azure_devops,mcp_tools,generation_rules,render_output_items,dump_resources"
if [ $DISABLE_CLOUDQUERY -eq 0 ]; then
- COMPONENTS="load_resources,kubeapi,cloudquery,azure_devops,generation_rules,render_output_items,dump_resources"
+ COMPONENTS="load_resources,kubeapi,cloudquery,azure_devops,mcp_tools,generation_rules,render_output_items,dump_resources"
fi
# Run the Python script with your specified arguments
diff --git a/src/tests.py b/src/tests.py
index d67c5812..75c9bcd2 100644
--- a/src/tests.py
+++ b/src/tests.py
@@ -464,4 +464,613 @@ def test_code_bundle_black_list(self):
# return local_repo_url
#
# def test_no_code_bundle_config(self):
-# pass
\ No newline at end of file
+# pass
+
+
+# ---------------------------------------------------------------------------
+# mcp_tools indexer
+# ---------------------------------------------------------------------------
+
+import json
+import responses
+from unittest import mock
+
+from indexers import mcp_tools
+from enrichers.mcp import McpPlatformHandler, MCP_PLATFORM
+
+
+class McpPlatformHandlerTest(TestCase):
+ def setUp(self):
+ self.handler = McpPlatformHandler()
+
+ def test_platform_name_matches_indexer(self):
+ self.assertEqual(MCP_PLATFORM, mcp_tools.PLATFORM_NAME)
+ self.assertEqual(self.handler.name, MCP_PLATFORM)
+
+ def test_qualifier_value_pulled_from_spec(self):
+ class R: # minimal resource stand-in
+ spec = {"server_display_name": "jira", "tool_name": "create_issue"}
+
+ self.assertEqual(
+ self.handler.get_resource_qualifier_value(R(), "server_display_name"),
+ "jira",
+ )
+ self.assertEqual(
+ self.handler.get_resource_qualifier_value(R(), "tool_name"),
+ "create_issue",
+ )
+
+ def test_qualifier_value_returns_none_when_missing(self):
+ class R:
+ spec = {"server_display_name": "jira"}
+
+ self.assertIsNone(self.handler.get_resource_qualifier_value(R(), "tool_name"))
+
+ def test_property_values_wrap_spec_value_in_list(self):
+ class R:
+ spec = {"tool_name": "create_issue"}
+
+ self.assertEqual(
+ self.handler.get_resource_property_values(R(), "tool_name"),
+ ["create_issue"],
+ )
+ self.assertIsNone(self.handler.get_resource_property_values(R(), "missing"))
+
+ def test_handles_resource_without_spec(self):
+ class R:
+ pass
+
+ self.assertIsNone(self.handler.get_resource_qualifier_value(R(), "tool_name"))
+ self.assertIsNone(self.handler.get_resource_property_values(R(), "tool_name"))
+
+
+class LoadServersFromSettingTest(TestCase):
+ def test_returns_servers_list_from_well_formed_config(self):
+ config = {"servers": [
+ {"display_name": "jira",
+ "url": "https://jira-mcp.internal/mcp",
+ "secret_ref": "jira-mcp-token"},
+ {"display_name": "linear",
+ "url": "https://linear-mcp.internal/mcp",
+ "secret_ref": "linear-mcp-token"},
+ ]}
+ servers = mcp_tools._load_servers_from_setting(config)
+ self.assertEqual(len(servers), 2)
+ self.assertEqual(servers[0]["display_name"], "jira")
+
+ def test_returns_empty_when_no_servers_key(self):
+ self.assertEqual(mcp_tools._load_servers_from_setting({}), [])
+ self.assertEqual(mcp_tools._load_servers_from_setting(None), [])
+
+ def test_skips_entries_missing_required_fields_and_warns(self):
+ warnings = []
+ config = {"servers": [
+ {"display_name": "ok",
+ "url": "https://ok.internal/mcp",
+ "secret_ref": "ok-token"},
+ {"display_name": "broken"}, # missing url + secret_ref
+ {"url": "https://anon.internal/mcp",
+ "secret_ref": "anon-token"}, # missing display_name
+ ]}
+ servers = mcp_tools._load_servers_from_setting(
+ config, on_warning=warnings.append)
+ self.assertEqual([s["display_name"] for s in servers], ["ok"])
+ self.assertEqual(len(warnings), 2)
+ self.assertTrue(any("broken" in w for w in warnings))
+
+
+class ListToolsTest(TestCase):
+ @responses.activate
+ def test_lists_tools_via_initialize_and_tools_list(self):
+ url = "https://jira-mcp.internal/mcp"
+
+ def init_cb(request):
+ body = json.loads(request.body)
+ assert body["method"] == "initialize"
+ return (200, {"Content-Type": "application/json",
+ "Mcp-Session-Id": "s1"},
+ json.dumps({"jsonrpc": "2.0", "id": body["id"],
+ "result": {"protocolVersion": "2025-03-26",
+ "capabilities": {},
+ "serverInfo": {"name": "x", "version": "0"}}}))
+
+ def notify_or_list_cb(request):
+ body = json.loads(request.body)
+ if body.get("method") == "notifications/initialized":
+ return (200, {}, "")
+ assert body["method"] == "tools/list"
+ return (200, {"Content-Type": "application/json"},
+ json.dumps({"jsonrpc": "2.0", "id": body["id"],
+ "result": {"tools": [
+ {"name": "create_issue",
+ "description": "Create a Jira issue",
+ "inputSchema": {"type": "object",
+ "properties": {"project": {"type": "string"}},
+ "required": ["project"]}},
+ ]}}))
+
+ responses.add_callback(responses.POST, url, callback=init_cb)
+ responses.add_callback(responses.POST, url, callback=notify_or_list_cb)
+ responses.add_callback(responses.POST, url, callback=notify_or_list_cb)
+
+ server = {"display_name": "jira", "url": url, "secret_ref": "tok"}
+ tools = mcp_tools._list_tools(server, fetch_secret=lambda _: "stub-token")
+ self.assertEqual(len(tools), 1)
+ self.assertEqual(tools[0]["name"], "create_issue")
+ self.assertEqual(tools[0]["inputSchema"]["required"], ["project"])
+
+ def test_verify_tls_flag_propagates_to_requests(self):
+ """verify_tls=False on a server entry should disable cert verification
+ on the per-request `verify=` kwarg (not just session-level), because
+ REQUESTS_CA_BUNDLE in the environment can override session.verify.
+ Default (omitted) stays True."""
+ captured = {"calls": []}
+ real_session_post = mcp_tools.requests.Session.post
+
+ def post_spy(self, url, **kwargs):
+ captured["calls"].append({
+ "session_verify": self.verify,
+ "kwarg_verify": kwargs.get("verify"),
+ })
+ class _Resp:
+ status_code = 200
+ headers = {}
+ def raise_for_status(self): pass
+ def json(self_inner): return {"jsonrpc": "2.0", "id": 1,
+ "result": {"tools": []}}
+ return _Resp()
+
+ mcp_tools.requests.Session.post = post_spy
+ try:
+ mcp_tools._list_tools(
+ {"display_name": "x", "url": "https://x.invalid/mcp",
+ "secret_ref": "tok", "verify_tls": False},
+ fetch_secret=lambda _: "stub",
+ )
+ # Every post() call must carry verify=False as a kwarg so the
+ # env-var override path in requests is skipped.
+ self.assertTrue(captured["calls"])
+ for call in captured["calls"]:
+ self.assertEqual(call["session_verify"], False)
+ self.assertEqual(call["kwarg_verify"], False)
+
+ captured["calls"] = []
+ mcp_tools._list_tools(
+ {"display_name": "x", "url": "https://x.invalid/mcp",
+ "secret_ref": "tok"}, # verify_tls omitted → default True
+ fetch_secret=lambda _: "stub",
+ )
+ for call in captured["calls"]:
+ self.assertEqual(call["session_verify"], True)
+ self.assertEqual(call["kwarg_verify"], True)
+ finally:
+ mcp_tools.requests.Session.post = real_session_post
+
+
+from resources import Registry, REGISTRY_PROPERTY_NAME
+from component import Context
+
+
+from renderers.render_output_items import compute_resource_path_from_hierarchy
+
+
+class ComputeResourcePathFromHierarchyTest(TestCase):
+ """The post-render hook in render_output_items.py derives `resourcePath`
+ from `additionalContext.hierarchy` + `tags` so they stay in sync. But
+ some platforms (e.g. mcp) need hierarchy and resourcePath at different
+ depths — 3-key hierarchy for UI grouping, 2-key resourcePath for the
+ underlying resource identity. Test that an explicit `resourcePath` in
+ the template is honored, while the auto-compute path stays unchanged
+ for templates that don't set it.
+ """
+
+ def _make_data(self, hierarchy, tags, explicit_resource_path=None):
+ additional_context = {"hierarchy": hierarchy}
+ if explicit_resource_path is not None:
+ additional_context["resourcePath"] = explicit_resource_path
+ return {"spec": {"additionalContext": additional_context, "tags": tags}}
+
+ def test_auto_computes_when_resource_path_not_set(self):
+ data = self._make_data(
+ hierarchy=["platform", "mcp_server", "mcp_tool"],
+ tags=[{"name": "platform", "value": "mcp"},
+ {"name": "mcp_server", "value": "linear-mcp"},
+ {"name": "mcp_tool", "value": "list_teams"}],
+ )
+ compute_resource_path_from_hierarchy(data)
+ self.assertEqual(
+ data["spec"]["additionalContext"]["resourcePath"],
+ "mcp/linear-mcp/list_teams")
+
+ def test_honors_explicit_resource_path_from_template(self):
+ # Same hierarchy as above, but template already set resourcePath to a
+ # different depth — must not be overwritten.
+ data = self._make_data(
+ hierarchy=["platform", "mcp_server", "mcp_tool"],
+ tags=[{"name": "platform", "value": "mcp"},
+ {"name": "mcp_server", "value": "linear-mcp"},
+ {"name": "mcp_tool", "value": "list_teams"}],
+ explicit_resource_path="mcp/linear-mcp",
+ )
+ compute_resource_path_from_hierarchy(data)
+ self.assertEqual(
+ data["spec"]["additionalContext"]["resourcePath"],
+ "mcp/linear-mcp")
+
+ def test_empty_explicit_resource_path_falls_through_to_auto_compute(self):
+ # `resourcePath: ""` in a template (e.g. when an upstream value was
+ # missing) should not block auto-compute — we only honor truthy values.
+ data = self._make_data(
+ hierarchy=["platform", "cluster"],
+ tags=[{"name": "platform", "value": "kubernetes"},
+ {"name": "cluster", "value": "prod"}],
+ explicit_resource_path="",
+ )
+ compute_resource_path_from_hierarchy(data)
+ self.assertEqual(
+ data["spec"]["additionalContext"]["resourcePath"],
+ "kubernetes/prod")
+
+
+class ComputeAccessTest(TestCase):
+ def test_read_only_hint_true_is_authoritative(self):
+ tool = {"name": "create_issue", "annotations": {"readOnlyHint": True}}
+ self.assertEqual(mcp_tools._compute_access(tool), "read-only")
+
+ def test_read_only_hint_false_falls_back_to_name_heuristic(self):
+ # Hint says not read-only, but verb says it is — heuristic wins.
+ # Some MCP servers leave readOnlyHint at the default `false` even for
+ # tools that clearly only read; the verb is the more reliable signal.
+ tool = {"name": "list_teams", "annotations": {"readOnlyHint": False}}
+ self.assertEqual(mcp_tools._compute_access(tool), "read-only")
+
+ def test_missing_annotations_uses_name_heuristic(self):
+ for name in ("list_teams", "get_project", "search_documentation",
+ "read_file", "describe_table", "fetch_user", "find_issue",
+ "query_db", "show_status", "view_doc"):
+ self.assertEqual(mcp_tools._compute_access({"name": name}),
+ "read-only", msg=name)
+
+ def test_write_verbs_default_to_read_write(self):
+ for name in ("create_issue", "update_project", "delete_attachment",
+ "save_comment", "send_message", "post_status"):
+ self.assertEqual(mcp_tools._compute_access({"name": name}),
+ "read-write", msg=name)
+
+ def test_heuristic_matches_kebab_case_names(self):
+ self.assertEqual(
+ mcp_tools._compute_access({"name": "list-teams"}), "read-only")
+ self.assertEqual(
+ mcp_tools._compute_access({"name": "create-issue"}), "read-write")
+
+ def test_heuristic_does_not_match_substring(self):
+ # `listen_for_events` starts with "listen", not "list_" — must not
+ # be flagged read-only just because "list" is a prefix of "listen".
+ self.assertEqual(
+ mcp_tools._compute_access({"name": "listen_for_events"}),
+ "read-write")
+ # Same for `gettysburg_addresses` — not "get_".
+ self.assertEqual(
+ mcp_tools._compute_access({"name": "gettysburg_addresses"}),
+ "read-write")
+
+ def test_empty_or_missing_name_defaults_to_read_write(self):
+ self.assertEqual(mcp_tools._compute_access({}), "read-write")
+ self.assertEqual(mcp_tools._compute_access({"name": ""}), "read-write")
+
+
+class EmitToolResourceTest(TestCase):
+ def test_emits_resource_with_expected_shape(self):
+ reg = Registry()
+ server = {"server_id": "u1", "display_name": "jira",
+ "url": "https://jira-mcp.internal/mcp",
+ "secret_ref": "jira-mcp-token"}
+ tool = {"name": "create_issue",
+ "description": "Create a Jira issue",
+ "inputSchema": {"type": "object",
+ "properties": {"project": {"type": "string"}},
+ "required": ["project"]}}
+ mcp_tools._emit_tool_resource(reg, server, tool)
+
+ rt = reg.lookup_resource_type("mcp", "mcp_tool")
+ self.assertIsNotNone(rt)
+ self.assertEqual(len(rt.instances), 1)
+ res = next(iter(rt.instances.values()))
+ self.assertEqual(res.spec["server_display_name"], "jira")
+ self.assertEqual(res.spec["tool_name"], "create_issue")
+ self.assertEqual(res.spec["secret_ref"], "jira-mcp-token")
+ self.assertEqual(res.spec["input_schema"]["required"], ["project"])
+ # `create_*` is a write verb → access defaults to read-write.
+ self.assertEqual(res.spec["access"], "read-write")
+
+ def test_emits_access_read_only_for_list_verb(self):
+ reg = Registry()
+ server = {"display_name": "jira",
+ "url": "https://jira-mcp.internal/mcp",
+ "secret_ref": "jira-mcp-token"}
+ mcp_tools._emit_tool_resource(
+ reg, server, {"name": "list_projects", "inputSchema": {}})
+ res = next(iter(
+ reg.lookup_resource_type("mcp", "mcp_tool").instances.values()))
+ self.assertEqual(res.spec["access"], "read-only")
+
+ def test_index_skips_when_config_empty(self):
+ ctx = Context({}, mock.MagicMock())
+ ctx.set_property(REGISTRY_PROPERTY_NAME, Registry())
+ mcp_tools.index(ctx) # should not raise
+ reg = ctx.get_property(REGISTRY_PROPERTY_NAME)
+ self.assertEqual(reg.platforms, {})
+
+ def test_index_preserves_on_tools_list_failure(self):
+ # MCP_CONFIG lists one server; tools/list raises → no resources
+ # emitted, warning recorded, no exception propagated.
+ ctx = Context({
+ "MCP_CONFIG": {"servers": [
+ {"display_name": "jira",
+ "url": "https://jira-mcp.internal/mcp",
+ "secret_ref": "tok"},
+ ]},
+ }, mock.MagicMock())
+ ctx.set_property(REGISTRY_PROPERTY_NAME, Registry())
+ with mock.patch.object(mcp_tools, "_list_tools",
+ side_effect=RuntimeError("boom")) as patched:
+ mcp_tools.index(ctx)
+ self.assertEqual(patched.call_count, 1)
+ reg = ctx.get_property(REGISTRY_PROPERTY_NAME)
+ self.assertEqual(reg.platforms, {})
+ self.assertTrue(any("boom" in w for w in ctx.warnings))
+
+
+import yaml as _yaml
+
+
+class EndToEndMcpIndexingTest(TestCase):
+ """Runs the full mcp_tools indexer against an in-memory MCP_CONFIG +
+ stub MCP server, then renders the rw-generic-codecollection templates
+ against the resulting registry and asserts the SLX + Runbook YAML
+ reflect the discovered tool."""
+
+ @responses.activate
+ def test_indexer_to_template_render(self):
+ # 1. Helm-provided MCP_CONFIG (single server)
+ mcp_config = {"servers": [
+ {"display_name": "jira",
+ "url": "https://jira-mcp.internal/mcp",
+ "secret_ref": "jira-mcp-token"},
+ ]}
+
+ # 2. Stub MCP tools/list
+ url = "https://jira-mcp.internal/mcp"
+
+ def init_cb(request):
+ body = json.loads(request.body)
+ return (200, {"Content-Type": "application/json",
+ "Mcp-Session-Id": "s1"},
+ json.dumps({"jsonrpc": "2.0", "id": body["id"],
+ "result": {"protocolVersion": "2025-03-26",
+ "capabilities": {},
+ "serverInfo": {"name": "x", "version": "0"}}}))
+
+ def list_cb(request):
+ body = json.loads(request.body)
+ if body.get("method") == "notifications/initialized":
+ return (200, {}, "")
+ assert body["method"] == "tools/list"
+ return (200, {"Content-Type": "application/json"},
+ json.dumps({"jsonrpc": "2.0", "id": body["id"],
+ "result": {"tools": [
+ {"name": "create_issue",
+ "description": "Create a Jira issue",
+ "inputSchema": {
+ "type": "object",
+ "properties": {
+ "project": {"type": "string", "description": "Project key"},
+ "summary": {"type": "string"},
+ },
+ "required": ["project", "summary"]}},
+ ]}}))
+
+ responses.add_callback(responses.POST, url, callback=init_cb)
+ responses.add_callback(responses.POST, url, callback=list_cb)
+ responses.add_callback(responses.POST, url, callback=list_cb)
+
+ # 3. Run indexer with the secret resolver stubbed out
+ ctx = Context({"MCP_CONFIG": mcp_config}, mock.MagicMock())
+ ctx.set_property(REGISTRY_PROPERTY_NAME, Registry())
+ with mock.patch.object(mcp_tools, "_resolve_secret",
+ return_value="stub-token"):
+ mcp_tools.index(ctx)
+ reg = ctx.get_property(REGISTRY_PROPERTY_NAME)
+ instances = reg.lookup_resource_type("mcp", "mcp_tool").instances
+ self.assertEqual(len(instances), 1)
+ match_resource = next(iter(instances.values()))
+
+ # 4. Render Runbook template from the codecollection.
+ import jinja2
+ cb_path = os.environ.get(
+ "MCP_TOOL_PROXY_PATH",
+ "/Users/prats/Documents/work/rw-generic-codecollection/codebundles/mcp-tool-proxy",
+ )
+ env = jinja2.Environment(
+ loader=jinja2.FileSystemLoader(os.path.join(cb_path, ".runwhen/templates")),
+ undefined=jinja2.StrictUndefined,
+ )
+ t = env.get_template("mcp-tool-proxy-runbook.yaml")
+ out = t.render(slx_name="mcp-jira-create-issue",
+ default_location="loc1",
+ match_resource=match_resource)
+ parsed = _yaml.safe_load(out)
+ runtime_vars = parsed["spec"]["runtimeVarsProvided"]
+ names = {v["name"] for v in runtime_vars}
+ self.assertEqual(names, {"project", "summary"})
+ # RuntimeVarEntry only allows name/default/description/validation
+ # (corestate-operator api/v1/common_types.go) — guard against drift
+ # that would put unknown fields on the rendered Runbook.
+ allowed_fields = {"name", "default", "description", "validation"}
+ for v in runtime_vars:
+ extra = set(v.keys()) - allowed_fields
+ self.assertFalse(extra, f"runtimeVarsProvided[{v.get('name')}] has unexpected field(s): {extra}")
+ # Every var must carry a validation block with a CRD-allowed type
+ # (the CRD enum is {enum, regex}); when the MCP property has neither
+ # enum nor pattern the template falls back to regex .*
+ self.assertIn("validation", v, f"runtimeVarsProvided[{v['name']}] missing validation")
+ self.assertIn(v["validation"]["type"], {"enum", "regex"},
+ f"runtimeVarsProvided[{v['name']}] has invalid validation.type")
+ # Static config var carries the input schema as JSON for the codebundle.
+ # MCP_VERIFY_TLS is forwarded from the indexer's per-server verify_tls
+ # flag — defaults to "true" when the field is omitted from mcpConfig.
+ config_names = {c["name"] for c in parsed["spec"]["configProvided"]}
+ self.assertEqual(
+ config_names,
+ {"MCP_SERVER_URL", "MCP_SERVER_DISPLAY_NAME", "MCP_TOOL_NAME",
+ "MCP_INPUT_SCHEMA", "MCP_VERIFY_TLS"},
+ )
+
+
+class RunbookDefaultsAreStringsTest(TestCase):
+ """The runbook template must coerce every runtimeVar `default` to a YAML
+ string. Robot Framework treats runtime vars as strings — if the template
+ leaves a JSON schema default as a raw number/bool/list/dict, YAML parses
+ it as that native type and the runner barfs on the type mismatch."""
+
+ def _render(self, properties: dict, required: list | None = None) -> dict:
+ import jinja2
+ from indexers import mcp_tools
+ from resources import Registry, REGISTRY_PROPERTY_NAME
+ from component import Context
+
+ # Build a fake registry entry by running _emit_tool_resource directly
+ # — no MCP server needed; we only care about template rendering.
+ reg = Registry()
+ input_schema = {"type": "object", "properties": properties}
+ if required is not None:
+ input_schema["required"] = required
+ mcp_tools._emit_tool_resource(
+ reg,
+ {"display_name": "srv",
+ "url": "https://srv.local/mcp",
+ "secret_ref": "srv-token"},
+ {"name": "do_thing",
+ "description": "",
+ "inputSchema": input_schema},
+ )
+ match_resource = next(iter(
+ reg.lookup_resource_type("mcp", "mcp_tool").instances.values()))
+
+ cb_path = os.environ.get(
+ "MCP_TOOL_PROXY_PATH",
+ "/Users/prats/Documents/work/rw-generic-codecollection/codebundles/mcp-tool-proxy",
+ )
+ env = jinja2.Environment(
+ loader=jinja2.FileSystemLoader(os.path.join(cb_path, ".runwhen/templates")),
+ undefined=jinja2.StrictUndefined,
+ )
+ t = env.get_template("mcp-tool-proxy-runbook.yaml")
+ out = t.render(slx_name="srv-do-thing",
+ default_location="loc1",
+ match_resource=match_resource)
+ return _yaml.safe_load(out)
+
+ def _defaults_by_name(self, parsed: dict) -> dict:
+ return {v["name"]: v["default"] for v in parsed["spec"]["runtimeVarsProvided"]}
+
+ def test_string_default_passes_through(self):
+ defaults = self._defaults_by_name(self._render({
+ "project": {"type": "string", "default": "RW"},
+ }))
+ self.assertEqual(defaults["project"], "RW")
+ self.assertIsInstance(defaults["project"], str)
+
+ def test_int_default_becomes_string(self):
+ defaults = self._defaults_by_name(self._render({
+ "limit": {"type": "integer", "default": 42},
+ }))
+ self.assertEqual(defaults["limit"], "42")
+ self.assertIsInstance(defaults["limit"], str)
+
+ def test_bool_default_becomes_lowercase_json_string(self):
+ # tojson on True yields "true" (JSON), not "True" (Python repr) —
+ # matters because Robot/downstream consumers expect JSON-shaped bools.
+ defaults = self._defaults_by_name(self._render({
+ "dry_run": {"type": "boolean", "default": True},
+ }))
+ self.assertEqual(defaults["dry_run"], "true")
+ self.assertIsInstance(defaults["dry_run"], str)
+
+ def test_list_default_becomes_json_string(self):
+ defaults = self._defaults_by_name(self._render({
+ "tags": {"type": "array", "default": ["a", "b"]},
+ }))
+ self.assertEqual(defaults["tags"], '["a", "b"]')
+ self.assertIsInstance(defaults["tags"], str)
+
+ def test_dict_default_becomes_json_string(self):
+ defaults = self._defaults_by_name(self._render({
+ "filters": {"type": "object", "default": {"k": "v"}},
+ }))
+ self.assertEqual(defaults["filters"], '{"k": "v"}')
+ self.assertIsInstance(defaults["filters"], str)
+
+ def test_missing_default_becomes_empty_string(self):
+ defaults = self._defaults_by_name(self._render({
+ "project": {"type": "string"},
+ }))
+ self.assertEqual(defaults["project"], "")
+ self.assertIsInstance(defaults["project"], str)
+
+ def test_null_default_becomes_empty_string(self):
+ defaults = self._defaults_by_name(self._render({
+ "project": {"type": "string", "default": None},
+ }))
+ self.assertEqual(defaults["project"], "")
+ self.assertIsInstance(defaults["project"], str)
+
+
+class RunbookRequiredParamMarkingTest(RunbookDefaultsAreStringsTest):
+ """The runbook template should mark required parameters in the rendered
+ description so downstream UI/agent surfaces know which inputs are
+ mandatory. JSON Schema lists required fields at the top level (not as a
+ per-property flag), so this needs the schema's `required` array."""
+
+ def _descriptions_by_name(self, parsed: dict) -> dict:
+ return {v["name"]: v["description"] for v in parsed["spec"]["runtimeVarsProvided"]}
+
+ def test_required_param_with_description_gets_suffix(self):
+ descs = self._descriptions_by_name(self._render(
+ {"project": {"type": "string", "description": "Project key"}},
+ required=["project"],
+ ))
+ self.assertEqual(descs["project"], "Project key (required)")
+
+ def test_required_param_with_empty_description_gets_sentence(self):
+ descs = self._descriptions_by_name(self._render(
+ {"project": {"type": "string"}},
+ required=["project"],
+ ))
+ self.assertEqual(descs["project"], "Required parameter.")
+
+ def test_optional_param_description_is_unchanged(self):
+ descs = self._descriptions_by_name(self._render(
+ {"summary": {"type": "string", "description": "Issue title"}},
+ required=["project"], # different param required; summary is not
+ ))
+ self.assertEqual(descs["summary"], "Issue title")
+
+ def test_no_required_array_in_schema(self):
+ # Schemas may omit `required` entirely — must not crash, no suffix.
+ descs = self._descriptions_by_name(self._render(
+ {"project": {"type": "string", "description": "Project key"}},
+ required=None,
+ ))
+ self.assertEqual(descs["project"], "Project key")
+
+ def test_mixed_required_and_optional(self):
+ descs = self._descriptions_by_name(self._render(
+ {"project": {"type": "string", "description": "Project key"},
+ "summary": {"type": "string", "description": "Issue title"},
+ "labels": {"type": "string", "description": "CSV labels"}},
+ required=["project", "summary"],
+ ))
+ self.assertEqual(descs["project"], "Project key (required)")
+ self.assertEqual(descs["summary"], "Issue title (required)")
+ self.assertEqual(descs["labels"], "CSV labels")
\ No newline at end of file