From 19c8fabdc6a8cf2a5da8e5e321086f06671355d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Reme=C5=A1?= Date: Mon, 22 Jun 2026 14:44:54 +0200 Subject: [PATCH 1/2] feat: wire MCP server support into all three providers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add external MCP server integration via streamable HTTP transport. The operator configures endpoints through LIGHTSPEED_MCP_SERVERS env var (JSON array of {name, url, headers?} objects). Configuration: - McpServerConfig dataclass and resolve_mcp_servers() in config.py - mcp_servers field on ResolvedSDK and ProviderQueryOptions - Flows from app startup through routing to provider query Provider wiring: - Claude: mcp_servers dict on ClaudeAgentOptions with mcp__* allowed_tools patterns - Gemini: McpToolset with StreamableHTTPConnectionParams in tools - OpenAI: MCPServerStreamableHttp with connect/cleanup lifecycle Health probes: - R3 check_mcp_endpoints() probes each MCP server URL - Readiness route returns 503 when any MCP endpoint unreachable Signed-off-by: Tomáš Remeš Assisted-by: Claude Code:claude-opus-4-6 --- .ai/spec/what/configuration.md | 6 +- .ai/spec/what/health-probes.md | 4 +- .ai/spec/what/provider-contract.md | 20 ++-- AGENTS.md | 2 + src/lightspeed_agentic/app.py | 1 + src/lightspeed_agentic/config.py | 52 +++++++++- src/lightspeed_agentic/health.py | 15 ++- src/lightspeed_agentic/providers/claude.py | 17 +++- src/lightspeed_agentic/providers/gemini.py | 15 ++- src/lightspeed_agentic/providers/openai.py | 108 ++++++++++++--------- src/lightspeed_agentic/routes/__init__.py | 6 ++ src/lightspeed_agentic/routes/query.py | 7 +- src/lightspeed_agentic/types.py | 6 +- tests/test_config.py | 81 +++++++++++++++- tests/test_ready.py | 94 +++++++++++++++++- 15 files changed, 369 insertions(+), 65 deletions(-) diff --git a/.ai/spec/what/configuration.md b/.ai/spec/what/configuration.md index fbac5df..7fc4379 100644 --- a/.ai/spec/what/configuration.md +++ b/.ai/spec/what/configuration.md @@ -17,6 +17,7 @@ Cross-references: how options are consumed in code → `how/provider-architectur | `LIGHTSPEED_PROVIDER_PROJECT` | When provider=`vertex` | Cloud project ID | | `LIGHTSPEED_PROVIDER_REGION` | When provider=`vertex` or `bedrock` | Cloud region | | `LIGHTSPEED_PROVIDER_API_VERSION` | When provider=`azure` | API version | + | `LIGHTSPEED_MCP_SERVERS` | No | JSON array of MCP server configs: `[{"name":"…","url":"…","headers":{}}]` | Credentials are mounted via `envFrom` (all secret keys as env vars) AND as files at `/var/run/secrets/llm-credentials/`. @@ -66,7 +67,9 @@ Cross-references: how options are consumed in code → `how/provider-architectur 18. **Non-hermetic fallback.** When prefetch directories are absent, the container build recipe may fetch selected binaries from external URLs for developer builds. -19. **System packages — minimum expectations.** Runtime image includes Bash, Git, OpenShift CLI (`oc`), Kubernetes CLI (`kubectl`), ripgrep, Node.js (Claude Code CLI), and supporting OS utilities for debugging and archives per the container recipe. +19. **MCP server configuration.** `LIGHTSPEED_MCP_SERVERS` is an optional JSON-encoded array of MCP server endpoint configs. Each entry MUST have `name` (string) and `url` (string); `headers` (object) is optional. When set, `resolve_mcp_servers()` parses and validates the entries at startup. The resulting `McpServerConfig` tuple is carried on `ResolvedSDK.mcp_servers` and passed through to `ProviderQueryOptions.mcp_servers`. Each provider maps these to its SDK's native MCP integration (Claude: `ClaudeAgentOptions.mcp_servers` dict; Gemini: `McpToolset` with `StreamableHTTPConnectionParams`; OpenAI: `MCPServerStreamableHttp` instances). Only streamable HTTP transport is supported. Health probes (R3) check reachability of each configured MCP endpoint. + +20. **System packages — minimum expectations.** Runtime image includes Bash, Git, OpenShift CLI (`oc`), Kubernetes CLI (`kubectl`), ripgrep, Node.js (Claude Code CLI), and supporting OS utilities for debugging and archives per the container recipe. ## Configuration Surface @@ -81,6 +84,7 @@ Cross-references: how options are consumed in code → `how/provider-architectur | `LIGHTSPEED_PROVIDER_API_VERSION` | API version from operator (see rule 1). | | `ANTHROPIC_MODEL`, `GEMINI_MODEL`, `OPENAI_MODEL` | Internal: SDK-specific model vars. Set by configuration mapping (rule 2), not operator. | | `LIGHTSPEED_SKILLS_DIR` | Skill root and provider working directory default. | +| `LIGHTSPEED_MCP_SERVERS` | Optional JSON array of MCP server endpoint configs (see rule 19). | | `ANTHROPIC_API_KEY` | Claude SDK credential (from credentials secret envFrom). | | `GOOGLE_API_KEY`, `GEMINI_API_KEY` | Google GenAI credential (from credentials secret envFrom). | | `OPENAI_API_KEY` | OpenAI SDK credential (from credentials secret envFrom). | diff --git a/.ai/spec/what/health-probes.md b/.ai/spec/what/health-probes.md index 9ebe0c5..e94e5eb 100644 --- a/.ai/spec/what/health-probes.md +++ b/.ai/spec/what/health-probes.md @@ -51,7 +51,7 @@ Returns HTTP 200 when all checks pass, HTTP 503 when any check fails. Not under | `gemini` | `https://generativelanguage.googleapis.com/` | | `openai` | `OPENAI_BASE_URL` or `https://api.openai.com/` | -**R3 — MCP server reachability.** Same pattern as R2, for each configured MCP endpoint. [PLANNED: when MCP support lands] +**R3 — MCP server reachability.** Same pattern as R2, for each configured MCP endpoint (from `ResolvedSDK.mcp_servers`). Each MCP server URL is probed with an unauthenticated HTTP GET (3-second timeout). Checks are keyed as `mcp_{name}` in the readiness response. Skipped when no MCP servers are configured. ## Recommended Probe Config @@ -96,4 +96,4 @@ Cross-reference: probes are **not** under `/v1/agent` → `run-api.md` rules 2, | [test_ready.py](../../../tests/test_ready.py) | R1 (credential env per backend), R2 (endpoint probe semantics), healthy/unhealthy `/ready` responses | Mocks `probe_provider_endpoint` for route tests; does not hit live provider URLs | | [sandbox_e2e.feature](../../../tests/e2e/features/sandbox_e2e.feature) (Readiness/liveness) | Liveness; readiness when container env satisfies R1 and R2 | E2e covers **happy path** only (200 on `/ready`); negative 503 scenarios stay unit-tested unless spike adds a deliberate misconfigured container | -R3 (MCP reachability) has no tests until MCP support lands. +R3 (MCP reachability) is covered by unit tests in `test_ready.py` (`check_mcp_endpoints`, `run_readiness_checks` with MCP, `/ready` route with failing MCP). diff --git a/.ai/spec/what/provider-contract.md b/.ai/spec/what/provider-contract.md index 987414f..b7235d4 100644 --- a/.ai/spec/what/provider-contract.md +++ b/.ai/spec/what/provider-contract.md @@ -38,23 +38,25 @@ Cross-references: HTTP mapping of prompts and timeouts → `run-api.md`. Env and 16. **ProviderQueryOptions — `stream`.** When true, adapters that support partial streaming should yield deltas; when false, they may batch. The HTTP `POST /run` path does not set this flag from the request body. -17. **Thin-adapter principle.** Providers MUST delegate tool execution, command invocation, and skill discovery to their SDKs. Adapters MUST NOT implement custom tool executors that duplicate SDK behavior except for minimal glue (e.g., auto-confirm, path layout). +17. **ProviderQueryOptions — `mcp_servers`.** Tuple of `McpServerConfig` (name, URL, optional headers) for external MCP server endpoints. When non-empty, each adapter wires MCP tools into its SDK: Claude passes an `mcp_servers` dict on `ClaudeAgentOptions` with `type: "http"` entries and adds `mcp__{name}__*` patterns to `allowed_tools`; Gemini appends `McpToolset` instances with `StreamableHTTPConnectionParams` to the agent tools list; OpenAI creates `MCPServerStreamableHttp` instances, manages their connect/cleanup lifecycle, and passes them to `SandboxAgent`. Only streamable HTTP transport is supported. -18. **Structured output.** When `output_schema` is set: Claude uses the SDK’s JSON-schema output format; Gemini sets native response MIME type and response schema on the content config; OpenAI wraps the schema for the agents SDK output type with strict JSON-schema mode enabled for native OpenAI endpoints (api.openai.com) and disabled for custom endpoints (vLLM etc. via `OPENAI_BASE_URL`). When strict mode is enabled, the schema is transformed to add `additionalProperties: false` and list all properties as required at every object level, as OpenAI’s strict mode requires. +18. **Thin-adapter principle.** Providers MUST delegate tool execution, command invocation, and skill discovery to their SDKs. Adapters MUST NOT implement custom tool executors that duplicate SDK behavior except for minimal glue (e.g., auto-confirm, path layout). -19. **Skills.** Claude discovers skills via SDK skill settings and a writable symlink layout under the effective cwd when the skill root is read-only. Gemini loads a skill toolset from the skill directory listing. OpenAI uses lazy skill loading from a local directory source rooted at `cwd`. +19. **Structured output.** When `output_schema` is set: Claude uses the SDK’s JSON-schema output format; Gemini sets native response MIME type and response schema on the content config; OpenAI wraps the schema for the agents SDK output type with strict JSON-schema mode enabled for native OpenAI endpoints (api.openai.com) and disabled for custom endpoints (vLLM etc. via `OPENAI_BASE_URL`). When strict mode is enabled, the schema is transformed to add `additionalProperties: false` and list all properties as required at every object level, as OpenAI’s strict mode requires. -20. **Default allowed tools list.** Shared default names: `Bash`, `Read`, `Glob`, `Grep`, `Skill`. The HTTP route always passes this list unless a future contract exposes overrides. [PLANNED: OLS-3033] +20. **Skills.** Claude discovers skills via SDK skill settings and a writable symlink layout under the effective cwd when the skill root is read-only. Gemini loads a skill toolset from the skill directory listing. OpenAI uses lazy skill loading from a local directory source rooted at `cwd`. -21. **Event logging.** A phase-tagged logger buffers `thinking_delta` events, flushes when buffer size exceeds an internal threshold or on `content_block_stop` or tool/result events, and logs truncated thinking. Tool calls and results are logged with separate input/output truncation caps. The `result` event logs cost, combined token count, and truncated final text. +21. **Default allowed tools list.** Shared default names: `Bash`, `Read`, `Glob`, `Grep`, `Skill`. The HTTP route always passes this list unless a future contract exposes overrides. [PLANNED: OLS-3033] -22. **Stringifying tool I/O.** Non-string tool arguments and results are JSON-serialized for events when the SDK exposes structured objects. +22. **Event logging.** A phase-tagged logger buffers `thinking_delta` events, flushes when buffer size exceeds an internal threshold or on `content_block_stop` or tool/result events, and logs truncated thinking. Tool calls and results are logged with separate input/output truncation caps. The `result` event logs cost, combined token count, and truncated final text. -23. **Gemini / Vertex.** When Vertex mode is enabled via environment, search-style tools MUST NOT be combined with non-search tools in the same agent tool list; the adapter omits those search tools in that mode. +23. **Stringifying tool I/O.** Non-string tool arguments and results are JSON-serialized for events when the SDK exposes structured objects. -24. **Gemini / exit loop.** When no `output_schema` is set, the adapter registers an SDK exit-loop tool; when `output_schema` is set, that tool is omitted. +24. **Gemini / Vertex.** When Vertex mode is enabled via environment, search-style tools MUST NOT be combined with non-search tools in the same agent tool list; the adapter omits those search tools in that mode. -25. **OpenAI client.** The OpenAI adapter constructs an async OpenAI client with optional base URL override from environment (see `configuration.md`). +25. **Gemini / exit loop.** When no `output_schema` is set, the adapter registers an SDK exit-loop tool; when `output_schema` is set, that tool is omitted. + +26. **OpenAI client.** The OpenAI adapter constructs an async OpenAI client with optional base URL override from environment (see `configuration.md`). ## Configuration Surface diff --git a/AGENTS.md b/AGENTS.md index bdf9ade..ae1e563 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -128,6 +128,7 @@ src/lightspeed_agentic/ | Skills | Native `skills="all"` | Native `SkillToolset` | Native `Skills` capability | | Structured output | `output_format` JSON schema | Native response schema path | `output_type` wrapper | | Streaming | Partial message stream events | `StreamingMode.SSE` | `Runner.run_streamed()` | +| MCP servers | `mcp_servers` dict on `ClaudeAgentOptions` | `McpToolset` with `StreamableHTTPConnectionParams` | `MCPServerStreamableHttp` instances | Keep provider adapters thin. The SDK should own tool execution and skill discovery; shared path logic belongs in `tools.py`, not in duplicated provider @@ -229,6 +230,7 @@ The Konflux pipeline will prefetch the new versions on the next PR. | `LIGHTSPEED_PROVIDER_REGION` | Cloud region (Vertex, Bedrock) | | `LIGHTSPEED_PROVIDER_API_VERSION` | API version (Azure) | | `LIGHTSPEED_SKILLS_DIR` | Skills root mounted by the FastAPI app, default `/app/skills` | +| `LIGHTSPEED_MCP_SERVERS` | Optional JSON array of MCP server configs: `[{"name":"…","url":"…","headers":{}}]` | | `ANTHROPIC_MODEL` | Default Claude model for query routes | | `GEMINI_MODEL` | Default Gemini model for query routes | | `OPENAI_MODEL` | Default OpenAI model for query routes | diff --git a/src/lightspeed_agentic/app.py b/src/lightspeed_agentic/app.py index fc6dd29..97e38da 100644 --- a/src/lightspeed_agentic/app.py +++ b/src/lightspeed_agentic/app.py @@ -33,6 +33,7 @@ provider, skills_dir=os.environ.get("LIGHTSPEED_SKILLS_DIR", "/app/skills"), model=startup_model, + mcp_servers=sdk.mcp_servers, ) app.include_router(router, prefix="/v1/agent") diff --git a/src/lightspeed_agentic/config.py b/src/lightspeed_agentic/config.py index c1b3b60..8989bfc 100644 --- a/src/lightspeed_agentic/config.py +++ b/src/lightspeed_agentic/config.py @@ -8,8 +8,10 @@ from __future__ import annotations import dataclasses +import json import logging import os +from typing import Any logger = logging.getLogger(__name__) @@ -22,6 +24,43 @@ def _llm_credentials_path() -> str: return override or LLM_CREDENTIALS_PATH +@dataclasses.dataclass(frozen=True) +class McpServerConfig: + """A single MCP server endpoint configured by the operator.""" + + name: str + url: str + headers: dict[str, str] = dataclasses.field(default_factory=dict) + + +def resolve_mcp_servers() -> tuple[McpServerConfig, ...]: + """Parse LIGHTSPEED_MCP_SERVERS env var into validated configs.""" + raw = os.environ.get("LIGHTSPEED_MCP_SERVERS", "").strip() + if not raw: + return () + + entries: list[Any] = json.loads(raw) + if not isinstance(entries, list): + raise ValueError("LIGHTSPEED_MCP_SERVERS must be a JSON array") + + servers: list[McpServerConfig] = [] + for i, entry in enumerate(entries): + if not isinstance(entry, dict): + raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{i}] must be an object") + name = entry.get("name") + url = entry.get("url") + if not name or not isinstance(name, str): + raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{i}] missing required 'name' string") + if not url or not isinstance(url, str): + raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{i}] missing required 'url' string") + headers = entry.get("headers", {}) + if not isinstance(headers, dict): + raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{i}] 'headers' must be an object") + servers.append(McpServerConfig(name=name, url=url, headers=headers)) + + return tuple(servers) + + _DEFAULT_VERTEX_REGION = "us-east5" _DEFAULT_BEDROCK_REGION = "us-east-1" @@ -33,6 +72,7 @@ class ResolvedSDK: name: str # "claude", "gemini", "openai" expected_envs: tuple[str, ...] # credential env vars expected from envFrom probe_url: str # R2 reachability probe base URL + mcp_servers: tuple[McpServerConfig, ...] = () def _setenv(key: str, value: str) -> None: @@ -194,5 +234,15 @@ def resolve_sdk() -> ResolvedSDK: "Supported: anthropic, vertex, openai, azure, bedrock" ) - logger.info("Resolved LIGHTSPEED_PROVIDER=%s → SDK=%s", provider, sdk.name) + mcp_servers = resolve_mcp_servers() + if mcp_servers: + sdk = dataclasses.replace(sdk, mcp_servers=mcp_servers) + logger.info( + "Resolved LIGHTSPEED_PROVIDER=%s → SDK=%s (MCP servers: %s)", + provider, + sdk.name, + ", ".join(s.name for s in mcp_servers), + ) + else: + logger.info("Resolved LIGHTSPEED_PROVIDER=%s → SDK=%s", provider, sdk.name) return sdk diff --git a/src/lightspeed_agentic/health.py b/src/lightspeed_agentic/health.py index 213598f..e169d0d 100644 --- a/src/lightspeed_agentic/health.py +++ b/src/lightspeed_agentic/health.py @@ -10,7 +10,7 @@ from fastapi import FastAPI from fastapi.responses import JSONResponse -from lightspeed_agentic.config import ResolvedSDK +from lightspeed_agentic.config import McpServerConfig, ResolvedSDK PROBE_TIMEOUT_SEC = 3.0 @@ -57,11 +57,24 @@ def check_provider_endpoint(probe_url: str) -> str: return probe_provider_endpoint(url) +def check_mcp_endpoints( + servers: tuple[McpServerConfig, ...], + timeout: float = PROBE_TIMEOUT_SEC, +) -> dict[str, str]: + """R3: probe each configured MCP server URL for reachability.""" + results: dict[str, str] = {} + for server in servers: + results[f"mcp_{server.name}"] = probe_provider_endpoint(server.url, timeout) + return results + + def run_readiness_checks(sdk: ResolvedSDK) -> tuple[bool, dict[str, str]]: checks = { "provider_env": check_provider_env(sdk.expected_envs), "provider_endpoint": check_provider_endpoint(sdk.probe_url), } + if sdk.mcp_servers: + checks.update(check_mcp_endpoints(sdk.mcp_servers)) return all(status == "ok" for status in checks.values()), checks diff --git a/src/lightspeed_agentic/providers/claude.py b/src/lightspeed_agentic/providers/claude.py index cba99b8..8670248 100644 --- a/src/lightspeed_agentic/providers/claude.py +++ b/src/lightspeed_agentic/providers/claude.py @@ -87,17 +87,32 @@ async def query(self, options: ProviderQueryOptions) -> AsyncIterator[ProviderEv "schema": options.output_schema, } + mcp_servers: dict[str, object] | None = None + allowed_tools = list(options.allowed_tools) + if options.mcp_servers: + mcp_servers = { + server.name: { + "type": "http", + "url": server.url, + **({"headers": server.headers} if server.headers else {}), + } + for server in options.mcp_servers + } + for server in options.mcp_servers: + allowed_tools.append(f"mcp__{server.name}__*") + sdk_options = ClaudeAgentOptions( model=options.model, max_turns=options.max_turns, max_budget_usd=options.max_budget_usd, system_prompt=options.system_prompt, - allowed_tools=options.allowed_tools, + allowed_tools=allowed_tools, permission_mode="bypassPermissions", cwd=effective_cwd, skills="all", include_partial_messages=True, output_format=output_format, + mcp_servers=mcp_servers, ) _tool_name = "" diff --git a/src/lightspeed_agentic/providers/gemini.py b/src/lightspeed_agentic/providers/gemini.py index 4e4bd55..1664edd 100644 --- a/src/lightspeed_agentic/providers/gemini.py +++ b/src/lightspeed_agentic/providers/gemini.py @@ -88,8 +88,6 @@ async def _auto_confirm_run(*, args: Any, tool_context: Any) -> Any: bash.run_async = _auto_confirm_run # type: ignore[method-assign] - # TODO: investigate more ADK built-in tools: - # load_artifacts, load_memory, computer_use, file_search, mcp_servers is_vertex = os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "").upper() == "TRUE" tools: list[Any] = [bash] # Vertex AI rejects mixing search tools (google_search, url_context) @@ -103,6 +101,19 @@ async def _auto_confirm_run(*, args: Any, tool_context: Any) -> Any: if skill_toolset is not None: tools.append(skill_toolset) + if options.mcp_servers: + from google.adk.tools.mcp_tool.mcp_session_manager import ( + StreamableHTTPConnectionParams, + ) + from google.adk.tools.mcp_tool.mcp_toolset import McpToolset + + for server in options.mcp_servers: + conn = StreamableHTTPConnectionParams( + url=server.url, + headers=server.headers or None, + ) + tools.append(McpToolset(connection_params=conn)) + if not options.output_schema: tools.append(exit_loop) diff --git a/src/lightspeed_agentic/providers/openai.py b/src/lightspeed_agentic/providers/openai.py index 04f569f..b227b83 100644 --- a/src/lightspeed_agentic/providers/openai.py +++ b/src/lightspeed_agentic/providers/openai.py @@ -212,6 +212,17 @@ async def query(self, options: ProviderQueryOptions) -> AsyncIterator[ProviderEv manifest = _build_manifest(options.cwd) + mcp_server_instances: list[Any] = [] + if options.mcp_servers: + from agents.mcp import MCPServerStreamableHttp + + for server in options.mcp_servers: + params: dict[str, Any] = {"url": server.url} + if server.headers: + params["headers"] = server.headers + mcp_srv = MCPServerStreamableHttp(name=server.name, params=params) + mcp_server_instances.append(mcp_srv) + agent_kwargs: dict[str, Any] = { "name": "lightspeed", "instructions": options.system_prompt, @@ -219,53 +230,62 @@ async def query(self, options: ProviderQueryOptions) -> AsyncIterator[ProviderEv "capabilities": capabilities, "default_manifest": manifest, } + if mcp_server_instances: + agent_kwargs["mcp_servers"] = mcp_server_instances if options.output_schema: agent_kwargs["output_type"] = _RawJsonSchema(options.output_schema) agent = SandboxAgent(**agent_kwargs) - run_config = RunConfig( - sandbox=SandboxRunConfig( - client=UnixLocalSandboxClient(), - ), - ) - - result = Runner.run_streamed( - agent, - options.prompt, - max_turns=options.max_turns, - run_config=run_config, - ) - - async for event in result.stream_events(): - if isinstance(event, RawResponsesStreamEvent): - if isinstance(event.data, ResponseTextDeltaEvent) and event.data.delta: - yield TextDeltaEvent(text=event.data.delta) - elif isinstance(event, RunItemStreamEvent): - if isinstance(event.item, ToolCallItem): - raw = event.item.raw_item - name = ( - getattr(raw, "name", None) - or (raw.get("name") if isinstance(raw, dict) else "") - or "" - ) - args = getattr(raw, "arguments", None) or "" - yield ToolCallEvent(name=name, input=args[:TOOL_INPUT_MAX_CHARS]) - elif isinstance(event.item, ToolCallOutputItem): - yield ToolResultEvent( - output=stringify(event.item.output)[:TOOL_OUTPUT_MAX_CHARS] - ) - - yield ContentBlockStopEvent() - - usage = getattr(result, "usage", None) or {} - input_tokens = getattr(usage, "input_tokens", 0) - output_tokens = getattr(usage, "output_tokens", 0) - - yield ResultEvent( - text=stringify(result.final_output), - cost_usd=0, - input_tokens=input_tokens, - output_tokens=output_tokens, - ) + for srv in mcp_server_instances: + await srv.connect() + + try: + run_config = RunConfig( + sandbox=SandboxRunConfig( + client=UnixLocalSandboxClient(), + ), + ) + + result = Runner.run_streamed( + agent, + options.prompt, + max_turns=options.max_turns, + run_config=run_config, + ) + + async for event in result.stream_events(): + if isinstance(event, RawResponsesStreamEvent): + if isinstance(event.data, ResponseTextDeltaEvent) and event.data.delta: + yield TextDeltaEvent(text=event.data.delta) + elif isinstance(event, RunItemStreamEvent): + if isinstance(event.item, ToolCallItem): + raw = event.item.raw_item + name = ( + getattr(raw, "name", None) + or (raw.get("name") if isinstance(raw, dict) else "") + or "" + ) + args = getattr(raw, "arguments", None) or "" + yield ToolCallEvent(name=name, input=args[:TOOL_INPUT_MAX_CHARS]) + elif isinstance(event.item, ToolCallOutputItem): + yield ToolResultEvent( + output=stringify(event.item.output)[:TOOL_OUTPUT_MAX_CHARS] + ) + + yield ContentBlockStopEvent() + + usage = getattr(result, "usage", None) or {} + input_tokens = getattr(usage, "input_tokens", 0) + output_tokens = getattr(usage, "output_tokens", 0) + + yield ResultEvent( + text=stringify(result.final_output), + cost_usd=0, + input_tokens=input_tokens, + output_tokens=output_tokens, + ) + finally: + for srv in mcp_server_instances: + await srv.cleanup() diff --git a/src/lightspeed_agentic/routes/__init__.py b/src/lightspeed_agentic/routes/__init__.py index b243c4c..192ec62 100644 --- a/src/lightspeed_agentic/routes/__init__.py +++ b/src/lightspeed_agentic/routes/__init__.py @@ -8,12 +8,16 @@ from __future__ import annotations import os +from typing import TYPE_CHECKING from fastapi import APIRouter from lightspeed_agentic.routes.query import register_query_routes from lightspeed_agentic.types import DEFAULT_MODEL, AgentProvider +if TYPE_CHECKING: + from lightspeed_agentic.config import McpServerConfig + _MODEL_ENV_VARS = { "claude": "ANTHROPIC_MODEL", "gemini": "GEMINI_MODEL", @@ -52,6 +56,7 @@ def build_router( model: str | None = None, max_turns: int = 200, default_timeout_ms: int = 300_000, + mcp_servers: tuple[McpServerConfig, ...] = (), ) -> APIRouter: resolved_model = _resolve_router_model(provider.name, model) @@ -63,5 +68,6 @@ def build_router( model=resolved_model, max_turns=max_turns, default_timeout_ms=default_timeout_ms, + mcp_servers=mcp_servers, ) return router diff --git a/src/lightspeed_agentic/routes/query.py b/src/lightspeed_agentic/routes/query.py index 9cccd2c..d270106 100644 --- a/src/lightspeed_agentic/routes/query.py +++ b/src/lightspeed_agentic/routes/query.py @@ -9,7 +9,7 @@ import asyncio import json import logging -from typing import Any +from typing import TYPE_CHECKING, Any from fastapi import APIRouter @@ -18,6 +18,9 @@ from lightspeed_agentic.tools import DEFAULT_ALLOWED_TOOLS from lightspeed_agentic.types import AgentProvider, ProviderQueryOptions +if TYPE_CHECKING: + from lightspeed_agentic.config import McpServerConfig + logger = logging.getLogger("lightspeed_agentic") @@ -61,6 +64,7 @@ def register_query_routes( model: str, max_turns: int, default_timeout_ms: int, + mcp_servers: tuple[McpServerConfig, ...] = (), ) -> None: async def run_endpoint(req: RunRequest) -> RunResponse: timeout = req.timeout_ms if req.timeout_ms is not None else default_timeout_ms @@ -84,6 +88,7 @@ async def run_endpoint(req: RunRequest) -> RunResponse: allowed_tools=DEFAULT_ALLOWED_TOOLS, cwd=skills_dir, output_schema=req.outputSchema, + mcp_servers=mcp_servers, ) ) diff --git a/src/lightspeed_agentic/types.py b/src/lightspeed_agentic/types.py index 4612b73..cda69c5 100644 --- a/src/lightspeed_agentic/types.py +++ b/src/lightspeed_agentic/types.py @@ -4,7 +4,10 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator from dataclasses import dataclass, field -from typing import Any, Literal +from typing import TYPE_CHECKING, Any, Literal + +if TYPE_CHECKING: + from lightspeed_agentic.config import McpServerConfig TOOL_INPUT_MAX_CHARS = 300 TOOL_OUTPUT_MAX_CHARS = 500 @@ -79,6 +82,7 @@ class ProviderQueryOptions: cwd: str output_schema: dict[str, Any] | None = None stream: bool = False + mcp_servers: tuple[McpServerConfig, ...] = () class AgentProvider(ABC): diff --git a/tests/test_config.py b/tests/test_config.py index 79c9131..72ee1dd 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -2,11 +2,12 @@ from __future__ import annotations +import json import os import pytest -from lightspeed_agentic.config import resolve_sdk +from lightspeed_agentic.config import resolve_mcp_servers, resolve_sdk @pytest.fixture(autouse=True) @@ -45,6 +46,7 @@ def _clean_env(monkeypatch: pytest.MonkeyPatch) -> None: "AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_VERSION", "AWS_REGION", + "LIGHTSPEED_MCP_SERVERS", ]: monkeypatch.delenv(var, raising=False) @@ -227,3 +229,80 @@ def test_unknown_provider(monkeypatch: pytest.MonkeyPatch) -> None: with pytest.raises(ValueError, match="Unknown provider"): resolve_sdk() + + +# --- MCP server resolution --- + + +def test_mcp_servers_unset(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + assert resolve_mcp_servers() == () + + +def test_mcp_servers_empty_string(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv("LIGHTSPEED_MCP_SERVERS", " ") + assert resolve_mcp_servers() == () + + +def test_mcp_servers_valid(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + '[{"url": "https://mcp.example.com/sse", "name": "jira"},' + ' {"url": "https://mcp2.example.com/", "name": "github", "headers": {"x-api-key": "k"}}]', + ) + servers = resolve_mcp_servers() + assert len(servers) == 2 + assert servers[0].name == "jira" + assert servers[0].url == "https://mcp.example.com/sse" + assert servers[0].headers == {} + assert servers[1].name == "github" + assert servers[1].headers == {"x-api-key": "k"} + + +def test_mcp_servers_invalid_json(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv("LIGHTSPEED_MCP_SERVERS", "not json") + with pytest.raises(json.JSONDecodeError): + resolve_mcp_servers() + + +def test_mcp_servers_not_array(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv("LIGHTSPEED_MCP_SERVERS", '{"url": "x", "name": "y"}') + with pytest.raises(ValueError, match="must be a JSON array"): + resolve_mcp_servers() + + +def test_mcp_servers_missing_name(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv("LIGHTSPEED_MCP_SERVERS", '[{"url": "https://x.com"}]') + with pytest.raises(ValueError, match="missing required 'name'"): + resolve_mcp_servers() + + +def test_mcp_servers_missing_url(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv("LIGHTSPEED_MCP_SERVERS", '[{"name": "test"}]') + with pytest.raises(ValueError, match="missing required 'url'"): + resolve_mcp_servers() + + +def test_resolve_sdk_populates_mcp_servers(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv("LIGHTSPEED_PROVIDER", "anthropic") + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + '[{"url": "https://mcp.example.com", "name": "test"}]', + ) + sdk = resolve_sdk() + assert len(sdk.mcp_servers) == 1 + assert sdk.mcp_servers[0].name == "test" + + +def test_resolve_sdk_no_mcp_servers(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv("LIGHTSPEED_PROVIDER", "anthropic") + sdk = resolve_sdk() + assert sdk.mcp_servers == () diff --git a/tests/test_ready.py b/tests/test_ready.py index 21c85bb..385555d 100644 --- a/tests/test_ready.py +++ b/tests/test_ready.py @@ -9,8 +9,9 @@ from fastapi import FastAPI from httpx import ASGITransport, AsyncClient -from lightspeed_agentic.config import ResolvedSDK +from lightspeed_agentic.config import McpServerConfig, ResolvedSDK from lightspeed_agentic.health import ( + check_mcp_endpoints, check_provider_endpoint, check_provider_env, probe_provider_endpoint, @@ -210,3 +211,94 @@ def test_run_readiness_checks_all_ok(monkeypatch: pytest.MonkeyPatch) -> None: ok, checks = run_readiness_checks(_OPENAI_DIRECT) assert ok is True assert checks == {"provider_env": "ok", "provider_endpoint": "ok"} + + +# --- R3: MCP endpoint reachability --- + +_MCP_SERVERS = ( + McpServerConfig(name="jira", url="https://mcp-jira.example.com/sse"), + McpServerConfig(name="github", url="https://mcp-github.example.com/"), +) + + +def test_check_mcp_endpoints_all_ok() -> None: + with patch("lightspeed_agentic.health.probe_provider_endpoint", return_value="ok"): + results = check_mcp_endpoints(_MCP_SERVERS) + assert results == {"mcp_jira": "ok", "mcp_github": "ok"} + + +def test_check_mcp_endpoints_one_failing() -> None: + def _side_effect(url: str, *_: object) -> str: + if "jira" in url: + return "error: connection refused" + return "ok" + + with patch("lightspeed_agentic.health.probe_provider_endpoint", side_effect=_side_effect): + results = check_mcp_endpoints(_MCP_SERVERS) + assert results["mcp_jira"].startswith("error: ") + assert results["mcp_github"] == "ok" + + +def test_check_mcp_endpoints_empty() -> None: + assert check_mcp_endpoints(()) == {} + + +def test_run_readiness_checks_with_mcp_ok(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OPENAI_API_KEY", "k") + sdk_with_mcp = ResolvedSDK( + "openai", + ("OPENAI_API_KEY",), + "https://api.openai.com/", + mcp_servers=_MCP_SERVERS, + ) + with patch("lightspeed_agentic.health.probe_provider_endpoint", return_value="ok"): + ok, checks = run_readiness_checks(sdk_with_mcp) + assert ok is True + assert "mcp_jira" in checks + assert "mcp_github" in checks + + +def test_run_readiness_checks_with_mcp_failing(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OPENAI_API_KEY", "k") + sdk_with_mcp = ResolvedSDK( + "openai", + ("OPENAI_API_KEY",), + "https://api.openai.com/", + mcp_servers=_MCP_SERVERS, + ) + + def _side_effect(url: str, *_: object) -> str: + if "jira" in url: + return "error: timeout" + return "ok" + + with patch("lightspeed_agentic.health.probe_provider_endpoint", side_effect=_side_effect): + ok, checks = run_readiness_checks(sdk_with_mcp) + assert ok is False + assert checks["mcp_jira"].startswith("error: ") + + +@pytest.mark.asyncio +async def test_ready_route_with_mcp_failing(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OPENAI_API_KEY", "k") + sdk_with_mcp = ResolvedSDK( + "openai", + ("OPENAI_API_KEY",), + "https://api.openai.com/", + mcp_servers=(_MCP_SERVERS[0],), + ) + + def _fail_mcp(url: str, *_: object) -> str: + if "jira" in url: + return "error: connection refused" + return "ok" + + app = FastAPI() + register_ready_route(app, sdk=sdk_with_mcp) + with patch("lightspeed_agentic.health.probe_provider_endpoint", side_effect=_fail_mcp): + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.get("/ready") + assert resp.status_code == 503 + body = resp.json() + assert body["status"] == "error" + assert body["checks"]["mcp_jira"].startswith("error: ") From 84e3fa9db4785597157db2650da3c6ec942b92c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Reme=C5=A1?= Date: Tue, 23 Jun 2026 12:57:40 +0200 Subject: [PATCH 2/2] fix: support operator list-format MCP headers with ServiceAccountToken source The operator injects LIGHTSPEED_MCP_SERVERS headers as an array of {name, source/value} objects, but resolve_mcp_servers() only accepted a dict. This caused a ValueError at startup, crash-looping the pod. Accept both formats: - dict: {"header-name": "static-value"} (existing) - list: [{"name": "h", "source": "ServiceAccountToken"}] (operator) ServiceAccountToken source reads from the mounted SA token file. Co-Authored-By: Claude --- src/lightspeed_agentic/config.py | 50 ++++++++++++++++- tests/test_config.py | 96 ++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 3 deletions(-) diff --git a/src/lightspeed_agentic/config.py b/src/lightspeed_agentic/config.py index 8989bfc..3561786 100644 --- a/src/lightspeed_agentic/config.py +++ b/src/lightspeed_agentic/config.py @@ -16,6 +16,7 @@ logger = logging.getLogger(__name__) LLM_CREDENTIALS_PATH = "/var/run/secrets/llm-credentials" +_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" # noqa: S105 def _llm_credentials_path() -> str: @@ -33,6 +34,51 @@ class McpServerConfig: headers: dict[str, str] = dataclasses.field(default_factory=dict) +def _resolve_header_source(source: str) -> str: + """Resolve a dynamic header source to its value.""" + if source == "ServiceAccountToken": + try: + with open(_SA_TOKEN_PATH) as f: + return f.read().strip() + except FileNotFoundError as err: + raise ValueError( + f"Header source 'ServiceAccountToken' requires {_SA_TOKEN_PATH}" + ) from err + raise ValueError(f"Unknown header source: {source!r}") + + +def _resolve_headers(raw: Any, index: int) -> dict[str, str]: + """Accept headers as a dict or as the operator's list-of-objects format. + + Dict format (static): {"header-name": "value"} + List format (dynamic): [{"name": "header-name", "source": "ServiceAccountToken"}] + """ + if isinstance(raw, dict): + return raw + if isinstance(raw, list): + resolved: dict[str, str] = {} + for j, item in enumerate(raw): + if not isinstance(item, dict): + raise ValueError( + f"LIGHTSPEED_MCP_SERVERS[{index}].headers[{j}] must be an object" + ) + hdr_name = item.get("name") + if not hdr_name or not isinstance(hdr_name, str): + raise ValueError( + f"LIGHTSPEED_MCP_SERVERS[{index}].headers[{j}] missing 'name'" + ) + if "value" in item: + resolved[hdr_name] = str(item["value"]) + elif "source" in item: + resolved[hdr_name] = _resolve_header_source(item["source"]) + else: + raise ValueError( + f"LIGHTSPEED_MCP_SERVERS[{index}].headers[{j}] needs 'value' or 'source'" + ) + return resolved + raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{index}] 'headers' must be an object or array") + + def resolve_mcp_servers() -> tuple[McpServerConfig, ...]: """Parse LIGHTSPEED_MCP_SERVERS env var into validated configs.""" raw = os.environ.get("LIGHTSPEED_MCP_SERVERS", "").strip() @@ -53,9 +99,7 @@ def resolve_mcp_servers() -> tuple[McpServerConfig, ...]: raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{i}] missing required 'name' string") if not url or not isinstance(url, str): raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{i}] missing required 'url' string") - headers = entry.get("headers", {}) - if not isinstance(headers, dict): - raise ValueError(f"LIGHTSPEED_MCP_SERVERS[{i}] 'headers' must be an object") + headers = _resolve_headers(entry.get("headers", {}), i) servers.append(McpServerConfig(name=name, url=url, headers=headers)) return tuple(servers) diff --git a/tests/test_config.py b/tests/test_config.py index 72ee1dd..4fffbcf 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -289,6 +289,102 @@ def test_mcp_servers_missing_url(monkeypatch: pytest.MonkeyPatch) -> None: resolve_mcp_servers() +def test_mcp_servers_headers_list_with_source( + monkeypatch: pytest.MonkeyPatch, tmp_path: object +) -> None: + """Operator-style headers: list of {name, source} objects.""" + _clean_env(monkeypatch) + import lightspeed_agentic.config as cfg + + token_file = tmp_path / "token" # type: ignore[operator] + token_file.write_text("test-sa-token") + monkeypatch.setattr(cfg, "_SA_TOKEN_PATH", str(token_file)) + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + json.dumps( + [ + { + "name": "k8s", + "url": "http://mcp.local/mcp", + "headers": [ + {"name": "kubernetes-authorization", "source": "ServiceAccountToken"} + ], + } + ] + ), + ) + servers = resolve_mcp_servers() + assert len(servers) == 1 + assert servers[0].headers == {"kubernetes-authorization": "test-sa-token"} + + +def test_mcp_servers_headers_list_with_value(monkeypatch: pytest.MonkeyPatch) -> None: + """List-format headers with static value.""" + _clean_env(monkeypatch) + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + json.dumps( + [ + { + "name": "k8s", + "url": "http://mcp.local/mcp", + "headers": [{"name": "x-api-key", "value": "my-key"}], + } + ] + ), + ) + servers = resolve_mcp_servers() + assert servers[0].headers == {"x-api-key": "my-key"} + + +def test_mcp_servers_headers_list_missing_name(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + json.dumps([{ + "name": "k8s", "url": "http://x", + "headers": [{"source": "ServiceAccountToken"}], + }]), + ) + with pytest.raises(ValueError, match="missing 'name'"): + resolve_mcp_servers() + + +def test_mcp_servers_headers_list_missing_value_and_source(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + json.dumps([{"name": "k8s", "url": "http://x", "headers": [{"name": "h"}]}]), + ) + with pytest.raises(ValueError, match="needs 'value' or 'source'"): + resolve_mcp_servers() + + +def test_mcp_servers_headers_unknown_source( + monkeypatch: pytest.MonkeyPatch, +) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + json.dumps([{ + "name": "k8s", "url": "http://x", + "headers": [{"name": "h", "source": "Bogus"}], + }]), + ) + with pytest.raises(ValueError, match="Unknown header source"): + resolve_mcp_servers() + + +def test_mcp_servers_headers_invalid_type(monkeypatch: pytest.MonkeyPatch) -> None: + _clean_env(monkeypatch) + monkeypatch.setenv( + "LIGHTSPEED_MCP_SERVERS", + json.dumps([{"name": "k8s", "url": "http://x", "headers": "bad"}]), + ) + with pytest.raises(ValueError, match="must be an object or array"): + resolve_mcp_servers() + + def test_resolve_sdk_populates_mcp_servers(monkeypatch: pytest.MonkeyPatch) -> None: _clean_env(monkeypatch) monkeypatch.setenv("LIGHTSPEED_PROVIDER", "anthropic")