diff --git a/docs/mcp-e2e-test-suite.md b/docs/mcp-e2e-test-suite.md new file mode 100644 index 00000000..9e856416 --- /dev/null +++ b/docs/mcp-e2e-test-suite.md @@ -0,0 +1,211 @@ +# MCP Protocol E2E Test Suite + +**Contribution by Flamehaven | CAS-2026-05-ASSET-004 | 2026-05-06** + +--- + +## What This Is + +A pytest-native, environment-agnostic end-to-end test suite for the **MCP stdio transport layer** of AssetOpsBench. + +### The Coverage Gap This Fills + +All existing tests in `src/servers/vibration/tests/test_tools.py` use the following pattern: + +```python +result = await call_tool(mcp, "compute_fft_spectrum", {"data_id": data_id}) +``` + +This calls `mcp.call_tool()` directly in-process. It tests **tool logic** but never touches the MCP wire protocol — no subprocess is spawned, no JSON-RPC frames are exchanged, no stdio streams are opened. + +This suite tests the actual transport stack: + +``` +pytest process + -> spawns servers.vibration.main subprocess via sys.executable + -> communicates over stdin/stdout via MCP JSON-RPC (stdio transport) + -> validates Pydantic schema enforcement, error responses, clean teardown +``` + +--- + +## Requirements + +- Python 3.12+ +- `mcp[cli]` and `fastmcp` installed (included in `pyproject.toml` core deps) +- `pytest`, `pytest-anyio`, `anyio` installed (included in `[dependency-groups].dev`) +- **No CouchDB, no API keys, no external services required** +- **Subprocess spawning required** — each test spawns a real server process via `sys.executable`. Sandboxed environments that block subprocess creation or Windows named pipe access (e.g., restricted container runtimes, some in-browser execution environments) will fail. Standard CI runners (GitHub Actions ubuntu-latest, Windows Server) are confirmed compatible. + +--- + +## Running + +```bash +# All 6 scenarios +pytest test_mcp_e2e.py -v + +# Single scenario by keyword +pytest test_mcp_e2e.py -v -k "sc04" + +# Parallel (xdist-safe) +pytest -n auto test_mcp_e2e.py + +# Legacy standalone (original behavior, backward compatible) +python test_mcp_e2e.py +``` + +Expected output: + +``` +test_mcp_e2e.py::TestVibrationMCPProtocol::test_sc01_tool_listing[asyncio] PASSED +test_mcp_e2e.py::TestVibrationMCPProtocol::test_sc02_static_tool_happy_path[asyncio] PASSED +test_mcp_e2e.py::TestVibrationMCPProtocol::test_sc03_iso_severity_zone_classification[asyncio] PASSED +test_mcp_e2e.py::TestVibrationMCPProtocol::test_sc04_pydantic_boundary_missing_required_field[asyncio] PASSED +test_mcp_e2e.py::TestVibrationMCPProtocol::test_sc05_data_not_found_returns_error_not_crash[asyncio] PASSED +test_mcp_e2e.py::TestVibrationMCPProtocol::test_sc06_session_teardown_exits_cleanly[asyncio] PASSED + +6 passed; runtime varies by local subprocess startup cost +``` + +--- + +## Engineering Design Decisions + +### 1. `sys.executable` — Environment Agnostic + +The original script used `command="python"`, which resolves from OS `PATH` and may point to a system Python without `mcp` installed, causing `ModuleNotFoundError` in the subprocess. + +```python +# Before (fragile) +StdioServerParameters(command="python", args=["-m", "servers.vibration.main"]) + +# After (environment-agnostic) +StdioServerParameters(command=sys.executable, args=["-m", "servers.vibration.main"]) +``` + +`sys.executable` always resolves to the Python running pytest — venv, conda, or pyenv. + +### 2. `anyio.fail_after` — Zombie Process Prevention + +`asyncio.wait_for` raises `TimeoutError` but does not propagate cancellation through anyio task groups. `stdio_client` uses anyio internally to manage its read/write streams and subprocess lifecycle. Using `anyio.fail_after` ensures the cancellation signal travels through the full anyio task group stack, terminating the server subprocess rather than leaving a zombie process. + +```python +@asynccontextmanager +async def _mcp_session(module: str): + async with stdio_client(params) as (r, w): + async with ClientSession(r, w) as session: + with anyio.fail_after(_DEADLINE): # anyio cancel scope + await session.initialize() + yield session + +# SC-06 wraps the full lifecycle including teardown: +with anyio.fail_after(_DEADLINE * 2): + async with _mcp_session("servers.vibration.main") as session: + ... +``` + +### 3. `_SENSITIVE_KEYS` — Air-Gapped Sandbox + +`get_default_environment()` inherits the full OS environment including `WATSONX_APIKEY`, `OPENAI_API_KEY`, etc. If server logic is ever modified to call an LLM, tests would make billable API calls silently. All LLM credentials are stripped before passing the environment to the subprocess. + +```python +_SENSITIVE_KEYS = frozenset({ + "WATSONX_APIKEY", "WATSONX_PROJECT_ID", "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", "LITELLM_API_KEY", "LITELLM_BASE_URL", ... +}) +for key in _SENSITIVE_KEYS: + env.pop(key, None) +``` + +### 4. `OTEL_SDK_DISABLED=true` — xdist Parallel Safe + +Under `pytest -n auto` (xdist), multiple test processes may write to the same `OTEL_TRACES_FILE` simultaneously. Setting `OTEL_SDK_DISABLED=true` in each subprocess's environment disables the OTEL SDK entirely, preventing concurrent JSONL file write collisions. + +For tests that specifically need OTEL tracing, pass an `otel_dir` path to `_server_params` to get a per-process trace file: + +```python +async with _mcp_session("servers.vibration.main", otel_dir=tmp_path) as session: + ... +``` + +### 5. `vibration_session` Fixture — Per-Test Isolation + +The function-scoped `vibration_session` fixture spawns a fresh server subprocess for each test. This eliminates in-memory data store state leaks between tests. + +SC-06 deliberately does **not** use the fixture — it tests the complete lifecycle including teardown within a single `anyio.fail_after` scope. + +--- + +## Test Scenarios + +| ID | Name | What It Tests | +|---|---|---| +| SC-01 | Tool Listing | Server starts over stdio; all 8 expected tools are exposed | +| SC-02 | Static Tool Happy Path | `list_known_bearings` returns bearing DB without CouchDB | +| SC-03 | ISO Severity Classification | `assess_vibration_severity` returns correct ISO 10816 zones (A and D) | +| SC-04 | Pydantic Boundary | `diagnose_vibration` with missing `data_id` → `Field required`, no crash | +| SC-05 | Data Not Found | `compute_fft_spectrum` with unknown `data_id` → structured error dict | +| SC-06 | Teardown Regression | Full lifecycle within `anyio.fail_after(40s)` — FIND-010 deadlock guard | + +--- + +## Extending to Other Servers + +To add E2E coverage for another MCP server (e.g., `servers.utilities.main`), add a new class: + +```python +class TestUtilitiesMCPProtocol: + + @pytest.fixture + async def utilities_session(self): + async with _mcp_session("servers.utilities.main") as session: + yield session + + @pytest.mark.anyio + async def test_sc01_tool_listing(self, utilities_session): + tools = await asyncio.wait_for(utilities_session.list_tools(), timeout=_DEADLINE) + names = {t.name for t in tools.tools} + assert "convert_units" in names # adjust to actual tool names +``` + +Servers requiring CouchDB (`servers.iot`, `servers.wo`) can be added with a skip guard: + +```python +import os +requires_couchdb = pytest.mark.skipif( + os.environ.get("COUCHDB_URL") is None, + reason="CouchDB not available (set COUCHDB_URL)" +) + +@requires_couchdb +@pytest.mark.anyio +async def test_sc01_tool_listing(self, iot_session): ... +``` + +--- + +## Relationship to CAS Audit Findings + +| Finding | Scenario | +|---|---| +| FIND-006: Pydantic strict validation (PASS) | SC-04 | +| FIND-010: Async teardown deadlock (static risk) | SC-06 | +| FIND-002: DSP tools functional (confirmed) | SC-03 via ISO classification | +| FIND-005: MCP tools accessible over protocol | SC-01, SC-02 | + +--- + +## Verified On + +| Field | Value | +|---|---| +| Commit | `a928284b06411ceb9e31663d936cf8b342b15ca2` | +| OS | Windows 11 | +| Python | 3.12.10 | +| Initial result | 6 passed in 29.12s on 2026-05-06 | +| Latest local re-check | 6 passed in 18.54s on 2026-05-07 after repo-root locator hardening | + +--- + +*Flamehaven Contribution | AssetOpsBench MCP E2E Suite v2 | 2026-05-06* diff --git a/test_mcp_e2e.py b/test_mcp_e2e.py new file mode 100644 index 00000000..35b81d14 --- /dev/null +++ b/test_mcp_e2e.py @@ -0,0 +1,322 @@ +"""MCP stdio transport E2E tests — environment-agnostic, CI-safe. + +Uses sys.executable so tests run in any venv/conda/pyenv without +PATH manipulation or shell activation. No CouchDB, API keys, or +external services required for the core scenarios. + +Usage: + pytest test_mcp_e2e.py -v # all 6 scenarios + pytest test_mcp_e2e.py -v -k "sc04" # Pydantic boundary only + pytest -n auto test_mcp_e2e.py # parallel (xdist-safe) + python test_mcp_e2e.py # legacy standalone run + +Design decisions +---------------- +1. sys.executable — env-agnostic subprocess spawn, no PATH dependency +2. anyio.fail_after — propagates cancellation through anyio task groups, + terminating the server subprocess on deadlock instead + of leaving a zombie process (unlike asyncio.wait_for) +3. _SENSITIVE_KEYS — LLM credentials stripped before passing env to server; + prevents billable API calls if server logic changes +4. OTEL_SDK_DISABLED — disables OTEL in the subprocess; avoids concurrent + JSONL file writes under pytest-xdist parallel runs +5. vibration_session — function-scoped pytest fixture; each test gets its own + subprocess, eliminating cross-test state pollution +""" + +from __future__ import annotations + +import asyncio +import json +import sys +from contextlib import asynccontextmanager +from pathlib import Path +from typing import AsyncGenerator + +import anyio +import pytest +from mcp.client.session import ClientSession +from mcp.client.stdio import StdioServerParameters, get_default_environment, stdio_client + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +def _find_repo_root(start: Path) -> Path: + """Locate AssetOpsBench root whether this file is at root or in a suite folder.""" + for candidate in (start, *start.parents): + if (candidate / "pyproject.toml").is_file() and ( + candidate / "src" / "servers" / "vibration" / "main.py" + ).is_file(): + return candidate + raise RuntimeError( + "Could not locate AssetOpsBench repo root. Place this test under the " + "repository root or a child directory containing pyproject.toml and src/." + ) + + +_REPO_ROOT = _find_repo_root(Path(__file__).resolve().parent) +_SRC = _REPO_ROOT / "src" +_DEADLINE = 20 # seconds per operation; anyio enforces this at subprocess level + +# LLM credentials that must not reach the test subprocess. +# Prevents accidental billable API calls if server-side logic is ever changed. +_SENSITIVE_KEYS: frozenset[str] = frozenset({ + "WATSONX_APIKEY", "WATSONX_PROJECT_ID", "WATSONX_URL", + "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "LITELLM_API_KEY", + "LITELLM_BASE_URL", "COHERE_API_KEY", "AZURE_API_KEY", + "AZURE_API_BASE", "HUGGINGFACE_API_KEY", +}) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _server_params( + module: str, + *, + otel_dir: Path | None = None, +) -> StdioServerParameters: + """Build StdioServerParameters for a given server module. + + Uses sys.executable (env-agnostic), strips LLM credentials (air-gap), + and disables OTEL SDK by default to prevent JSONL file collisions under + pytest-xdist. Pass otel_dir to enable per-process OTEL tracing instead. + """ + env = get_default_environment() + env["PYTHONPATH"] = str(_SRC) + + # Air-gap: strip all LLM credentials from the subprocess environment + for key in _SENSITIVE_KEYS: + env.pop(key, None) + + if otel_dir is not None: + # Per-process trace file — safe for xdist and keeps OTEL enabled + env.pop("OTEL_SDK_DISABLED", None) + env["OTEL_TRACES_FILE"] = str(otel_dir / "traces.jsonl") + else: + # Default: disable OTEL entirely in test subprocess + env["OTEL_SDK_DISABLED"] = "true" + + return StdioServerParameters( + command=sys.executable, + args=["-m", module], + env=env, + ) + + +@asynccontextmanager +async def _mcp_session( + module: str, + *, + otel_dir: Path | None = None, +) -> AsyncGenerator[ClientSession, None]: + """Open an MCP session with anyio deadline enforcement. + + anyio.fail_after is used instead of asyncio.wait_for because anyio's + cancel scopes propagate through anyio task groups — which is how + stdio_client manages its read/write streams internally. This ensures + the server subprocess is terminated rather than orphaned on timeout. + + The deadline covers initialization only. Per-call timeouts are applied + at the call site via asyncio.wait_for. + """ + params = _server_params(module, otel_dir=otel_dir) + async with stdio_client(params) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + with anyio.fail_after(_DEADLINE): + await session.initialize() + yield session + + +def _parse_result(result) -> dict: + """Extract the first JSON dict from an MCP tool call result.""" + for content in result.content: + text = getattr(content, "text", None) or str(content) + try: + return json.loads(text) + except (json.JSONDecodeError, TypeError): + return {"_raw": text} + return {} + + +# --------------------------------------------------------------------------- +# Pytest fixture — function-scoped (fresh subprocess per test) +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def vibration_session() -> AsyncGenerator[ClientSession, None]: + """Function-scoped fixture: each test gets a fresh vibration server process. + + Isolation guarantee: no in-memory data store state leaks between tests. + Teardown: stdio_client.__aexit__ terminates the subprocess via anyio + cancel scope when the fixture goes out of scope. + """ + async with _mcp_session("servers.vibration.main") as session: + yield session + + +# --------------------------------------------------------------------------- +# vibration server — no CouchDB required +# --------------------------------------------------------------------------- + + +class TestVibrationMCPProtocol: + """E2E tests over real MCP stdio transport to the vibration server. + + SC-01 to SC-05 use the vibration_session fixture (function-scoped). + SC-06 exercises the full lifecycle directly to validate teardown. + """ + + @pytest.mark.anyio + async def test_sc01_tool_listing(self, vibration_session: ClientSession) -> None: + """SC-01: Server starts and exposes expected tools over stdio.""" + tools = await asyncio.wait_for(vibration_session.list_tools(), timeout=_DEADLINE) + names = {t.name for t in tools.tools} + expected = { + "get_vibration_data", + "list_vibration_sensors", + "compute_fft_spectrum", + "compute_envelope_spectrum", + "assess_vibration_severity", + "calculate_bearing_frequencies", + "list_known_bearings", + "diagnose_vibration", + } + assert expected <= names, f"Missing tools: {expected - names}" + + @pytest.mark.anyio + async def test_sc02_static_tool_happy_path(self, vibration_session: ClientSession) -> None: + """SC-02: list_known_bearings returns static database without CouchDB.""" + result = await asyncio.wait_for( + vibration_session.call_tool("list_known_bearings", {}), + timeout=_DEADLINE, + ) + data = _parse_result(result) + assert "bearings" in data, f"Expected 'bearings' key, got: {data}" + assert len(data["bearings"]) >= 5 + names = [b["name"] for b in data["bearings"]] + assert any("6205" in n for n in names), f"6205 not found in {names}" + + @pytest.mark.anyio + async def test_sc03_iso_severity_zone_classification(self, vibration_session: ClientSession) -> None: + """SC-03: assess_vibration_severity classifies ISO 10816 zones correctly.""" + zone_d = _parse_result(await asyncio.wait_for( + vibration_session.call_tool("assess_vibration_severity", {"rms_velocity_mm_s": 50.0}), + timeout=_DEADLINE, + )) + zone_a = _parse_result(await asyncio.wait_for( + vibration_session.call_tool("assess_vibration_severity", {"rms_velocity_mm_s": 0.5}), + timeout=_DEADLINE, + )) + assert zone_d.get("iso_zone") == "D", f"Expected D, got: {zone_d}" + assert zone_a.get("iso_zone") == "A", f"Expected A, got: {zone_a}" + + @pytest.mark.anyio + async def test_sc04_pydantic_boundary_missing_required_field( + self, vibration_session: ClientSession + ) -> None: + """SC-04: Pydantic rejects diagnose_vibration when data_id is absent. + + Primary MCP schema validation regression test. The FastMCP Pydantic + layer must catch the missing field and return a structured validation + error rather than crashing the subprocess. + """ + result = await asyncio.wait_for( + vibration_session.call_tool( + "diagnose_vibration", + { + "velocity_rms": 15.5, + "machine_class": "Class I", + "envelope_peak_freqs": [50.0, 100.0, 150.0], + "bpfo": 50.0, + "bpfi": 70.0, + "ftf": 10.0, + "bsf": 30.0, + }, + ), + timeout=_DEADLINE, + ) + raw_text = result.content[0].text if result.content else "" + assert "data_id" in raw_text, f"Expected 'data_id' in error, got: {raw_text!r}" + assert any( + kw in raw_text.lower() + for kw in ("field required", "validation error", "missing") + ), f"Expected validation error message, got: {raw_text!r}" + + @pytest.mark.anyio + async def test_sc05_data_not_found_returns_error_not_crash( + self, vibration_session: ClientSession + ) -> None: + """SC-05: compute_fft_spectrum with unknown data_id returns error dict. + + Verifies application-level errors (data not found) are returned as + structured JSON error responses, not unhandled subprocess crashes. + """ + result = await asyncio.wait_for( + vibration_session.call_tool( + "compute_fft_spectrum", {"data_id": "__nonexistent__"} + ), + timeout=_DEADLINE, + ) + data = _parse_result(result) + assert "error" in data, f"Expected error key, got: {data}" + + @pytest.mark.anyio + async def test_sc06_session_teardown_exits_cleanly(self) -> None: + """SC-06: Full session lifecycle completes within 2x_DEADLINE seconds. + + Regression test for FIND-010 (async teardown deadlock). Uses the + context manager directly (not the fixture) to test the complete + lifecycle. anyio.fail_after wraps the entire block — if teardown + hangs, anyio cancels it via the stdio_client task group rather + than leaving a zombie process behind. + """ + exited = False + with anyio.fail_after(_DEADLINE * 2): + async with _mcp_session("servers.vibration.main") as session: + await asyncio.wait_for(session.list_tools(), timeout=_DEADLINE) + exited = True + assert exited, "Session teardown caused a deadlock" + + +# --------------------------------------------------------------------------- +# Legacy standalone entry point (python test_mcp_e2e.py) +# --------------------------------------------------------------------------- + + +async def _legacy_run() -> None: + """Original single-scenario run retained for backward compatibility.""" + print("[E2E] Starting vibration MCP server...") + params = _server_params("servers.vibration.main") + async with stdio_client(params) as (read_stream, write_stream): + print("[E2E] Server connected. Initializing session...") + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + print("[E2E] Session initialized.") + tools = await session.list_tools() + print(f"[E2E] Available tools: {[t.name for t in tools.tools]}") + mock_args = { + "velocity_rms": 15.5, + "machine_class": "Class I", + "envelope_peak_freqs": [50.0, 100.0, 150.0], + "bpfo": 50.0, + "bpfi": 70.0, + "ftf": 10.0, + "bsf": 30.0, + } + print(f"[E2E] Calling 'diagnose_vibration' with mock data: {mock_args}") + try: + result = await session.call_tool("diagnose_vibration", mock_args) + print("[E2E] Tool executed successfully.") + print("[E2E] Result:") + for content in result.content: + print(content.text) + except Exception as e: + print(f"[E2E] ERROR during tool execution: {e}") + + +if __name__ == "__main__": + asyncio.run(_legacy_run())