diff --git a/src/main/java/io/naftiko/engine/exposes/JettyMcpStreamableHandler.java b/src/main/java/io/naftiko/engine/exposes/JettyMcpStreamableHandler.java index 09429ed..f737ce7 100644 --- a/src/main/java/io/naftiko/engine/exposes/JettyMcpStreamableHandler.java +++ b/src/main/java/io/naftiko/engine/exposes/JettyMcpStreamableHandler.java @@ -18,18 +18,18 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.Callback; +import org.restlet.Context; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.modelcontextprotocol.spec.McpSchema; /** * Jetty Handler implementing the MCP Streamable HTTP transport protocol. @@ -39,21 +39,20 @@ * - GET: SSE stream for server-initiated messages (returns 405 - not supported) * - DELETE: Session termination * - * Uses MCP Java SDK types (McpSchema) for JSON-RPC message serialization. + * Delegates protocol dispatch to {@link McpProtocolDispatcher} and adds + * HTTP-specific concerns: session management, HTTP status codes, content types. */ public class JettyMcpStreamableHandler extends Handler.Abstract { - private static final String JSONRPC_VERSION = "2.0"; - private static final String MCP_PROTOCOL_VERSION = "2025-03-26"; private static final String HEADER_MCP_SESSION_ID = "Mcp-Session-Id"; - private final McpServerAdapter adapter; + private final McpProtocolDispatcher dispatcher; private final ObjectMapper mapper; private final Map activeSessions; public JettyMcpStreamableHandler(McpServerAdapter adapter) { - this.adapter = adapter; - this.mapper = new ObjectMapper(); + this.dispatcher = new McpProtocolDispatcher(adapter); + this.mapper = dispatcher.getMapper(); this.activeSessions = new ConcurrentHashMap<>(); } @@ -84,169 +83,63 @@ private boolean handlePost(Request request, Response response, Callback callback String body = Content.Source.asString(request, StandardCharsets.UTF_8); if (body == null || body.isBlank()) { - writeJsonRpcError(response, callback, null, -32700, "Parse error: empty body"); + Context.getCurrentLogger().log(Level.WARNING, "Error processing request. Missing or empty body"); + ObjectNode error = dispatcher.buildJsonRpcError(null, -32700, + "Parse error: empty body"); + writeJson(response, callback, 200, error); return true; } JsonNode root = mapper.readTree(body); - String jsonrpc = root.path("jsonrpc").asText(""); - JsonNode idNode = root.get("id"); String rpcMethod = root.path("method").asText(""); - JsonNode params = root.get("params"); - if (!JSONRPC_VERSION.equals(jsonrpc)) { - writeJsonRpcError(response, callback, idNode, -32600, - "Invalid Request: jsonrpc must be '2.0'"); - return true; + // Handle initialize specially — we need to create the session + if ("initialize".equals(rpcMethod)) { + String sessionId = UUID.randomUUID().toString(); + activeSessions.put(sessionId, true); + response.getHeaders().put(HEADER_MCP_SESSION_ID, sessionId); } - switch (rpcMethod) { - case "initialize": - return handleInitialize(request, response, callback, idNode, params); - - case "notifications/initialized": - // Notification — no response needed - response.setStatus(202); - response.write(true, null, callback); - return true; - - case "tools/list": - return handleToolsList(request, response, callback, idNode); - - case "tools/call": - return handleToolsCall(request, response, callback, idNode, params); + // Handle notifications/initialized — return 202 with no body + if ("notifications/initialized".equals(rpcMethod)) { + response.setStatus(202); + response.write(true, null, callback); + return true; + } - case "ping": - return handlePing(response, callback, idNode); + // Delegate to the shared protocol dispatcher + ObjectNode result = dispatcher.dispatch(root); - default: - writeJsonRpcError(response, callback, idNode, -32601, - "Method not found: " + rpcMethod); - return true; + if (result != null) { + writeJson(response, callback, 200, result); + } else { + // Notification — no response body + response.setStatus(202); + response.write(true, null, callback); } + + return true; } catch (JsonProcessingException e) { - writeJsonRpcError(response, callback, null, -32700, "Parse error: " + e.getMessage()); + Context.getCurrentLogger().log(Level.SEVERE, "Error processing request", e); + ObjectNode error = dispatcher.buildJsonRpcError(null, -32700, + "Parse error: " + e.getMessage()); + try { + writeJson(response, callback, 200, error); + } catch (Exception ex) { + writeError(response, callback, 500, "Internal server error"); + } return true; } catch (Exception e) { - writeJsonRpcError(response, callback, null, -32603, + Context.getCurrentLogger().log(Level.SEVERE, "Error processing request", e); + ObjectNode error = dispatcher.buildJsonRpcError(null, -32603, "Internal error: " + e.getMessage()); - return true; - } - } - - /** - * Handle MCP initialize request. - * Returns server capabilities and assigns a session ID. - */ - private boolean handleInitialize(Request request, Response response, Callback callback, - JsonNode id, JsonNode params) throws Exception { - - String sessionId = UUID.randomUUID().toString(); - activeSessions.put(sessionId, true); - - // Build InitializeResult - ObjectNode result = mapper.createObjectNode(); - result.put("protocolVersion", MCP_PROTOCOL_VERSION); - - // Server capabilities — we support tools - ObjectNode capabilities = mapper.createObjectNode(); - capabilities.putObject("tools"); - result.set("capabilities", capabilities); - - // Server info - ObjectNode serverInfo = mapper.createObjectNode(); - serverInfo.put("name", adapter.getMcpServerSpec().getNamespace()); - serverInfo.put("version", "1.0.0"); - result.set("serverInfo", serverInfo); - - // Instructions from the spec description - if (adapter.getMcpServerSpec().getDescription() != null) { - result.put("instructions", adapter.getMcpServerSpec().getDescription()); - } - - // Set session header - response.getHeaders().put(HEADER_MCP_SESSION_ID, sessionId); - - writeJsonRpcResult(response, callback, id, result); - return true; - } - - /** - * Handle tools/list request. - * Returns the list of available tools built from the spec. - */ - private boolean handleToolsList(Request request, Response response, Callback callback, - JsonNode id) throws Exception { - - ObjectNode result = mapper.createObjectNode(); - ArrayNode toolsArray = result.putArray("tools"); - - for (McpSchema.Tool tool : adapter.getTools()) { - ObjectNode toolNode = mapper.createObjectNode(); - toolNode.put("name", tool.name()); - if (tool.description() != null) { - toolNode.put("description", tool.description()); - } - if (tool.inputSchema() != null) { - toolNode.set("inputSchema", mapper.valueToTree(tool.inputSchema())); + try { + writeJson(response, callback, 200, error); + } catch (Exception ex) { + writeError(response, callback, 500, "Internal server error"); } - toolsArray.add(toolNode); - } - - writeJsonRpcResult(response, callback, id, result); - return true; - } - - /** - * Handle tools/call request. - * Dispatches tool invocation to the McpToolHandler. - */ - private boolean handleToolsCall(Request request, Response response, Callback callback, - JsonNode id, JsonNode params) throws Exception { - - if (params == null) { - writeJsonRpcError(response, callback, id, -32602, "Invalid params: missing params"); return true; } - - String toolName = params.path("name").asText(""); - JsonNode argumentsNode = params.get("arguments"); - - @SuppressWarnings("unchecked") - Map arguments = argumentsNode != null - ? mapper.treeToValue(argumentsNode, Map.class) - : new ConcurrentHashMap<>(); - - try { - McpSchema.CallToolResult toolResult = - adapter.getToolHandler().handleToolCall(toolName, arguments); - - ObjectNode result = mapper.valueToTree(toolResult); - writeJsonRpcResult(response, callback, id, result); - } catch (IllegalArgumentException e) { - writeJsonRpcError(response, callback, id, -32602, - "Invalid params: " + e.getMessage()); - } catch (Exception e) { - // Tool execution error — return as a tool result with isError=true - ObjectNode result = mapper.createObjectNode(); - ArrayNode content = result.putArray("content"); - ObjectNode textContent = content.addObject(); - textContent.put("type", "text"); - textContent.put("text", "Error: " + e.getMessage()); - result.put("isError", true); - writeJsonRpcResult(response, callback, id, result); - } - - return true; - } - - /** - * Handle ping request. - */ - private boolean handlePing(Response response, Callback callback, JsonNode id) - throws Exception { - writeJsonRpcResult(response, callback, id, mapper.createObjectNode()); - return true; } /** @@ -262,45 +155,6 @@ private boolean handleDelete(Request request, Response response, Callback callba return true; } - /** - * Write a JSON-RPC success response. - */ - private void writeJsonRpcResult(Response response, Callback callback, JsonNode id, - JsonNode result) throws Exception { - ObjectNode envelope = mapper.createObjectNode(); - envelope.put("jsonrpc", JSONRPC_VERSION); - if (id != null) { - envelope.set("id", id); - } - envelope.set("result", result); - - writeJson(response, callback, 200, envelope); - } - - /** - * Write a JSON-RPC error response. - */ - private void writeJsonRpcError(Response response, Callback callback, JsonNode id, int code, - String message) { - try { - ObjectNode envelope = mapper.createObjectNode(); - envelope.put("jsonrpc", JSONRPC_VERSION); - if (id != null) { - envelope.set("id", id); - } else { - envelope.putNull("id"); - } - - ObjectNode error = envelope.putObject("error"); - error.put("code", code); - error.put("message", message); - - writeJson(response, callback, 200, envelope); - } catch (Exception e) { - writeError(response, callback, 500, "Internal server error"); - } - } - /** * Write a JSON response body. */ diff --git a/src/main/java/io/naftiko/engine/exposes/McpProtocolDispatcher.java b/src/main/java/io/naftiko/engine/exposes/McpProtocolDispatcher.java new file mode 100644 index 0000000..da6ab1f --- /dev/null +++ b/src/main/java/io/naftiko/engine/exposes/McpProtocolDispatcher.java @@ -0,0 +1,218 @@ +/** + * Copyright 2025-2026 Naftiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.naftiko.engine.exposes; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import org.restlet.Context; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.modelcontextprotocol.spec.McpSchema; + +/** + * Transport-agnostic MCP JSON-RPC protocol dispatcher. + * + * Handles MCP protocol methods (initialize, tools/list, tools/call, ping) + * and produces JSON-RPC response envelopes. Used by both the Streamable HTTP + * handler and the stdio handler. + */ +public class McpProtocolDispatcher { + + static final String JSONRPC_VERSION = "2.0"; + static final String MCP_PROTOCOL_VERSION = "2025-03-26"; + + private final McpServerAdapter adapter; + private final ObjectMapper mapper; + + public McpProtocolDispatcher(McpServerAdapter adapter) { + this.adapter = adapter; + this.mapper = new ObjectMapper(); + } + + /** + * Dispatch a JSON-RPC request and return the response envelope. + * + * @param request the parsed JSON-RPC request + * @return the JSON-RPC response envelope, or {@code null} for notifications + */ + public ObjectNode dispatch(JsonNode request) { + try { + String jsonrpc = request.path("jsonrpc").asText(""); + JsonNode idNode = request.get("id"); + String rpcMethod = request.path("method").asText(""); + JsonNode params = request.get("params"); + + if (!JSONRPC_VERSION.equals(jsonrpc)) { + return buildJsonRpcError(idNode, -32600, + "Invalid Request: jsonrpc must be '2.0'"); + } + + switch (rpcMethod) { + case "initialize": + return handleInitialize(idNode); + + case "notifications/initialized": + // Notification — no response + return null; + + case "tools/list": + return handleToolsList(idNode); + + case "tools/call": + return handleToolsCall(idNode, params); + + case "ping": + return buildJsonRpcResult(idNode, mapper.createObjectNode()); + + default: + return buildJsonRpcError(idNode, -32601, + "Method not found: " + rpcMethod); + } + } catch (Exception e) { + Context.getCurrentLogger().log(Level.SEVERE, "Error processing request", e); + return buildJsonRpcError(null, -32603, "Internal error: " + e.getMessage()); + } + } + + /** + * Handle MCP initialize request. + */ + private ObjectNode handleInitialize(JsonNode id) { + ObjectNode result = mapper.createObjectNode(); + result.put("protocolVersion", MCP_PROTOCOL_VERSION); + + // Server capabilities — we support tools + ObjectNode capabilities = mapper.createObjectNode(); + capabilities.putObject("tools"); + result.set("capabilities", capabilities); + + // Server info + ObjectNode serverInfo = mapper.createObjectNode(); + serverInfo.put("name", adapter.getMcpServerSpec().getNamespace()); + serverInfo.put("version", "1.0.0"); + result.set("serverInfo", serverInfo); + + // Instructions from the spec description + if (adapter.getMcpServerSpec().getDescription() != null) { + result.put("instructions", adapter.getMcpServerSpec().getDescription()); + } + + return buildJsonRpcResult(id, result); + } + + /** + * Handle tools/list request. + */ + private ObjectNode handleToolsList(JsonNode id) { + ObjectNode result = mapper.createObjectNode(); + ArrayNode toolsArray = result.putArray("tools"); + + for (McpSchema.Tool tool : adapter.getTools()) { + ObjectNode toolNode = mapper.createObjectNode(); + toolNode.put("name", tool.name()); + + if (tool.description() != null) { + toolNode.put("description", tool.description()); + } + + if (tool.inputSchema() != null) { + toolNode.set("inputSchema", mapper.valueToTree(tool.inputSchema())); + } + + toolsArray.add(toolNode); + } + + return buildJsonRpcResult(id, result); + } + + /** + * Handle tools/call request. + */ + @SuppressWarnings("unchecked") + private ObjectNode handleToolsCall(JsonNode id, JsonNode params) { + if (params == null) { + return buildJsonRpcError(id, -32602, "Invalid params: missing params"); + } + + String toolName = params.path("name").asText(""); + JsonNode argumentsNode = params.get("arguments"); + + try { + Map arguments = argumentsNode != null + ? mapper.treeToValue(argumentsNode, Map.class) + : new ConcurrentHashMap<>(); + McpSchema.CallToolResult toolResult = + adapter.getToolHandler().handleToolCall(toolName, arguments); + ObjectNode result = mapper.valueToTree(toolResult); + return buildJsonRpcResult(id, result); + } catch (IllegalArgumentException e) { + Context.getCurrentLogger().log(Level.SEVERE, "Error handling tools call", e); + return buildJsonRpcError(id, -32602, "Invalid params: " + e.getMessage()); + } catch (Exception e) { + Context.getCurrentLogger().log(Level.SEVERE, "Error handling tools call", e); + // Tool execution error — return as a tool result with isError=true + ObjectNode result = mapper.createObjectNode(); + ArrayNode content = result.putArray("content"); + ObjectNode textContent = content.addObject(); + textContent.put("type", "text"); + textContent.put("text", "Error: " + e.getMessage()); + result.put("isError", true); + return buildJsonRpcResult(id, result); + } + } + + /** + * Build a JSON-RPC success response envelope. + */ + ObjectNode buildJsonRpcResult(JsonNode id, JsonNode result) { + ObjectNode envelope = mapper.createObjectNode(); + envelope.put("jsonrpc", JSONRPC_VERSION); + Context.getCurrentLogger().log(Level.INFO, "Building JSON-RPC result for id: " + id); + + if (id != null) { + envelope.set("id", id); + } + + envelope.set("result", result); + return envelope; + } + + /** + * Build a JSON-RPC error response envelope. + */ + ObjectNode buildJsonRpcError(JsonNode id, int code, String message) { + ObjectNode envelope = mapper.createObjectNode(); + envelope.put("jsonrpc", JSONRPC_VERSION); + Context.getCurrentLogger().log(Level.INFO, "Building JSON-RPC error for id: " + id); + + if (id != null) { + envelope.set("id", id); + } else { + envelope.putNull("id"); + } + + ObjectNode error = envelope.putObject("error"); + error.put("code", code); + error.put("message", message); + return envelope; + } + + ObjectMapper getMapper() { + return mapper; + } + +} diff --git a/src/main/java/io/naftiko/engine/exposes/McpServerAdapter.java b/src/main/java/io/naftiko/engine/exposes/McpServerAdapter.java index 4d24da8..903f525 100644 --- a/src/main/java/io/naftiko/engine/exposes/McpServerAdapter.java +++ b/src/main/java/io/naftiko/engine/exposes/McpServerAdapter.java @@ -18,8 +18,10 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.logging.Level; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; +import org.restlet.Context; import io.modelcontextprotocol.spec.McpSchema; import io.naftiko.Capability; import io.naftiko.spec.InputParameterSpec; @@ -29,13 +31,24 @@ /** * MCP Server Adapter implementation. * - * Sets up a Jetty HTTP server with the MCP Streamable HTTP transport, - * exposing tools defined in the MCP server specification. Each tool maps - * to consumed HTTP operations via the shared HttpClientAdapter infrastructure. + * Supports two transports, selected via the spec's {@code transport} field: + *
    + *
  • {@code http} (default) — Jetty-based Streamable HTTP server
  • + *
  • {@code stdio} — stdin/stdout JSON-RPC for local IDE integration
  • + *
+ * + * In both modes, tool definitions and the {@link McpToolHandler} are shared. Only the I/O layer + * differs. */ public class McpServerAdapter extends ServerAdapter { - private final Server jettyServer; + /** Jetty server — only initialized for HTTP transport */ + private Server jettyServer; + + /** Stdio handler and thread — only initialized for stdio transport */ + private volatile StdioJsonRpcHandler stdioHandler; + private volatile Thread stdioThread; + private final McpToolHandler toolHandler; private final List tools; @@ -44,14 +57,26 @@ public McpServerAdapter(Capability capability, McpServerSpec serverSpec) { // Build MCP Tool definitions from the spec this.tools = new ArrayList<>(); + Context.getCurrentLogger().log(Level.INFO, "Building MCP Tool definitions from the spec"); + for (McpServerToolSpec toolSpec : serverSpec.getTools()) { this.tools.add(buildMcpTool(toolSpec)); } - // Create the tool handler + // Create the tool handler (transport-agnostic) this.toolHandler = new McpToolHandler(capability, serverSpec.getTools()); - // Set up Jetty server + if (serverSpec.isStdio()) { + initStdioTransport(); + } else { + initHttpTransport(serverSpec); + } + } + + /** + * Initialize the Streamable HTTP transport (Jetty). + */ + private void initHttpTransport(McpServerSpec serverSpec) { this.jettyServer = new Server(); ServerConnector connector = new ServerConnector(jettyServer); String address = serverSpec.getAddress() != null ? serverSpec.getAddress() : "localhost"; @@ -59,7 +84,8 @@ public McpServerAdapter(Capability capability, McpServerSpec serverSpec) { connector.setPort(serverSpec.getPort()); // TODO: Make idle timeout configurable - connector.setIdleTimeout(120000); // 2 minutes — tool calls may involve upstream HTTP requests + connector.setIdleTimeout(120000); // 2 minutes — tool calls may involve upstream HTTP + // requests jettyServer.addConnector(connector); // Set the MCP handler @@ -67,8 +93,16 @@ public McpServerAdapter(Capability capability, McpServerSpec serverSpec) { } /** - * Build an MCP Tool from a tool spec. - * Converts input parameters to a JSON Schema map for the tool's inputSchema. + * Initialize the stdio transport (stdin/stdout JSON-RPC). + */ + private void initStdioTransport() { + McpProtocolDispatcher dispatcher = new McpProtocolDispatcher(this); + this.stdioHandler = new StdioJsonRpcHandler(dispatcher); + } + + /** + * Build an MCP Tool from a tool spec. Converts input parameters to a JSON Schema map for the + * tool's inputSchema. */ private McpSchema.Tool buildMcpTool(McpServerToolSpec toolSpec) { // Build JSON Schema properties from input parameters @@ -79,9 +113,13 @@ private McpSchema.Tool buildMcpTool(McpServerToolSpec toolSpec) { for (InputParameterSpec param : toolSpec.getInputParameters()) { Map property = new HashMap<>(); property.put("type", param.getType() != null ? param.getType() : "string"); + if (param.getDescription() != null) { property.put("description", param.getDescription()); } + + Context.getCurrentLogger().log(Level.INFO, + "Adding parameter to schema: " + param.getName()); schemaProperties.put(param.getName(), property); // By default, all parameters are required unless explicitly marked otherwise @@ -90,17 +128,12 @@ private McpSchema.Tool buildMcpTool(McpServerToolSpec toolSpec) { } // Build the input schema using McpSchema.JsonSchema - McpSchema.JsonSchema inputSchema = new McpSchema.JsonSchema( - "object", + McpSchema.JsonSchema inputSchema = new McpSchema.JsonSchema("object", schemaProperties.isEmpty() ? null : schemaProperties, - required.isEmpty() ? null : required, - null, null, null); - - return McpSchema.Tool.builder() - .name(toolSpec.getName()) - .description(toolSpec.getDescription()) - .inputSchema(inputSchema) - .build(); + required.isEmpty() ? null : required, null, null, null); + + return McpSchema.Tool.builder().name(toolSpec.getName()) + .description(toolSpec.getDescription()).inputSchema(inputSchema).build(); } public McpServerSpec getMcpServerSpec() { @@ -117,15 +150,41 @@ public List getTools() { @Override public void start() throws Exception { - jettyServer.start(); - System.out.println("MCP Server started on " - + getMcpServerSpec().getAddress() + ":" + getMcpServerSpec().getPort() - + " (namespace: " + getMcpServerSpec().getNamespace() + ")"); + if (getMcpServerSpec().isStdio()) { + stdioThread = new Thread(stdioHandler, "mcp-stdio"); + stdioThread.setDaemon(true); + stdioThread.start(); + System.err.println("MCP Server started on stdio" + " (namespace: " + + getMcpServerSpec().getNamespace() + ")"); + Context.getCurrentLogger().log(Level.INFO, "MCP Server started on stdio" + + " (namespace: " + getMcpServerSpec().getNamespace() + ")"); + } else { + jettyServer.start(); + System.out.println("MCP Server started on " + getMcpServerSpec().getAddress() + ":" + + getMcpServerSpec().getPort() + " (namespace: " + + getMcpServerSpec().getNamespace() + ")"); + Context.getCurrentLogger().log(Level.INFO, + "MCP Server started on " + getMcpServerSpec().getAddress() + ":" + + getMcpServerSpec().getPort() + " (namespace: " + + getMcpServerSpec().getNamespace() + ")"); + } } @Override public void stop() throws Exception { - jettyServer.stop(); + if (getMcpServerSpec().isStdio()) { + if (stdioHandler != null) { + stdioHandler.shutdown(); + stdioThread.join(5000); // Wait for the stdio thread to finish + Context.getCurrentLogger().log(Level.INFO, "MCP Server stopped on stdio"); + } + } else { + if (jettyServer != null) { + jettyServer.stop(); + Context.getCurrentLogger().log(Level.INFO, "MCP Server stopped on " + + getMcpServerSpec().getAddress() + ":" + getMcpServerSpec().getPort()); + } + } } } diff --git a/src/main/java/io/naftiko/engine/exposes/McpToolHandler.java b/src/main/java/io/naftiko/engine/exposes/McpToolHandler.java index e468ca2..4aaca82 100644 --- a/src/main/java/io/naftiko/engine/exposes/McpToolHandler.java +++ b/src/main/java/io/naftiko/engine/exposes/McpToolHandler.java @@ -28,7 +28,6 @@ import com.fasterxml.jackson.databind.node.NullNode; import io.modelcontextprotocol.spec.McpSchema; import io.naftiko.Capability; -import io.naftiko.engine.Converter; import io.naftiko.engine.Resolver; import io.naftiko.engine.consumes.ClientAdapter; import io.naftiko.engine.consumes.HttpClientAdapter; @@ -168,9 +167,13 @@ private McpSchema.CallToolResult buildToolResult(McpServerToolSpec toolSpec, true, null, null); } + // Buffer entity text before any mapping to avoid double-read issues + String responseText = found.clientResponse.getEntity().getText(); + // Apply output parameter mappings if defined - if (toolSpec.getOutputParameters() != null && !toolSpec.getOutputParameters().isEmpty()) { - String mapped = mapOutputParameters(toolSpec, found); + if (toolSpec.getOutputParameters() != null && !toolSpec.getOutputParameters().isEmpty() + && responseText != null && !responseText.isEmpty()) { + String mapped = mapOutputParameters(toolSpec, responseText); if (mapped != null) { return new McpSchema.CallToolResult( List.of(new McpSchema.TextContent(mapped)), isError, null, null); @@ -178,24 +181,22 @@ private McpSchema.CallToolResult buildToolResult(McpServerToolSpec toolSpec, } // Fall back to raw response - String responseText = found.clientResponse.getEntity().getText(); return new McpSchema.CallToolResult( List.of(new McpSchema.TextContent(responseText != null ? responseText : "")), isError, null, null); } /** - * Map client response to the tool's declared outputParameters and return a JSON string. - * Returns null when mapping could not be applied. + * Map pre-buffered response text to the tool's declared outputParameters and return a JSON + * string. Returns null when mapping could not be applied. */ - private String mapOutputParameters(McpServerToolSpec toolSpec, HandlingContext found) + private String mapOutputParameters(McpServerToolSpec toolSpec, String responseText) throws IOException { - if (found == null || found.clientResponse == null - || found.clientResponse.getEntity() == null) { + if (responseText == null || responseText.isEmpty()) { return null; } - JsonNode root = Converter.convertToJson(null, null, found.clientResponse.getEntity()); + JsonNode root = mapper.readTree(responseText); for (OutputParameterSpec outputParameter : toolSpec.getOutputParameters()) { JsonNode mapped = Resolver.resolveOutputMappings(outputParameter, root, mapper); @@ -219,15 +220,18 @@ private HandlingContext findClientRequestFor(ApiServerCallSpec call, } Map merged = new HashMap<>(); + if (requestParams != null) { merged.putAll(requestParams); } + if (call.getWith() != null) { merged.putAll(call.getWith()); } if (call.getOperation() != null) { String[] tokens = call.getOperation().split("\\."); + if (tokens.length == 2) { return findClientRequestFor(tokens[0], tokens[1], merged); } diff --git a/src/main/java/io/naftiko/engine/exposes/StdioJsonRpcHandler.java b/src/main/java/io/naftiko/engine/exposes/StdioJsonRpcHandler.java new file mode 100644 index 0000000..7fe5b4f --- /dev/null +++ b/src/main/java/io/naftiko/engine/exposes/StdioJsonRpcHandler.java @@ -0,0 +1,124 @@ +/** + * Copyright 2025-2026 Naftiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.naftiko.engine.exposes; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import org.restlet.Context; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * MCP stdio transport handler. + * + * Reads newline-delimited JSON-RPC messages from stdin and writes responses + * to stdout. Designed for local IDE integration where the capability process + * is launched as a subprocess. + * + *

All diagnostic logging goes to stderr since stdout is reserved for + * the JSON-RPC protocol.

+ */ +public class StdioJsonRpcHandler implements Runnable { + + private final McpProtocolDispatcher dispatcher; + private final InputStream input; + private final PrintStream output; + private volatile boolean running; + + /** + * Create a stdio handler using System.in / System.out. + */ + public StdioJsonRpcHandler(McpProtocolDispatcher dispatcher) { + this(dispatcher, System.in, System.out); + } + + /** + * Create a stdio handler with explicit streams (for testing). + */ + public StdioJsonRpcHandler(McpProtocolDispatcher dispatcher, + InputStream input, OutputStream output) { + this.dispatcher = dispatcher; + this.input = input; + this.output = (output instanceof PrintStream) + ? (PrintStream) output + : new PrintStream(output, true, StandardCharsets.UTF_8); + this.running = true; + } + + @Override + public void run() { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(input, StandardCharsets.UTF_8))) { + + String line; + while (running && (line = reader.readLine()) != null) { + if (line.isBlank()) { + continue; + } + + try { + JsonNode request = dispatcher.getMapper().readTree(line); + ObjectNode response = dispatcher.dispatch(request); + + // Notifications return null — no response to write + if (response != null) { + String json = dispatcher.getMapper().writeValueAsString(response); + output.println(json); + output.flush(); + } + } catch (JsonProcessingException e) { + Context.getCurrentLogger().log(Level.SEVERE, "Error processing request", e); + + + // Malformed JSON — write parse error response + ObjectNode errorResponse = dispatcher.buildJsonRpcError( + null, -32700, "Parse error: " + e.getMessage()); + try { + output.println(dispatcher.getMapper().writeValueAsString(errorResponse)); + output.flush(); + } catch (JsonProcessingException ignored) { + Context.getCurrentLogger().log(Level.SEVERE, "Error serializing error response", ignored); + // Cannot serialize the error response — nothing more we can do + } + } + } + } catch (IOException e) { + if (running) { + System.err.println("MCP stdio error: " + e.getMessage()); + Context.getCurrentLogger().log(Level.SEVERE, "MCP stdio error", e); + } + } + } + + /** + * Signal the handler to stop reading. + */ + public void shutdown() { + this.running = false; + try { + input.close(); + } catch (IOException ignored) { + Context.getCurrentLogger().log(Level.SEVERE, "Error closing input stream", ignored); + // Best-effort close to interrupt the read loop + } + } + +} diff --git a/src/main/java/io/naftiko/spec/exposes/McpServerSpec.java b/src/main/java/io/naftiko/spec/exposes/McpServerSpec.java index 0d30d5a..23d4720 100644 --- a/src/main/java/io/naftiko/spec/exposes/McpServerSpec.java +++ b/src/main/java/io/naftiko/spec/exposes/McpServerSpec.java @@ -20,11 +20,19 @@ /** * MCP Server Specification Element. * - * Defines an MCP server that exposes tools over Streamable HTTP transport. + * Defines an MCP server that exposes tools over a configurable transport. + * Supported transports: + *
    + *
  • {@code http} (default) — Streamable HTTP via Jetty, for networked deployments
  • + *
  • {@code stdio} — stdin/stdout JSON-RPC, for local IDE development
  • + *
* Each tool maps to one or more consumed HTTP operations. */ public class McpServerSpec extends ServerSpec { + @JsonInclude(JsonInclude.Include.NON_NULL) + private volatile String transport; + @JsonInclude(JsonInclude.Include.NON_NULL) private volatile String namespace; @@ -45,6 +53,21 @@ public McpServerSpec(String address, int port, String namespace, String descript this.tools = new CopyOnWriteArrayList<>(); } + /** + * Returns the transport type. Defaults to {@code "http"} when not set. + */ + public String getTransport() { + return transport != null ? transport : "http"; + } + + public void setTransport(String transport) { + this.transport = transport; + } + + public boolean isStdio() { + return "stdio".equals(getTransport()); + } + public String getNamespace() { return namespace; } diff --git a/src/main/resources/schemas/capability-schema.json b/src/main/resources/schemas/capability-schema.json index b76b961..8894090 100644 --- a/src/main/resources/schemas/capability-schema.json +++ b/src/main/resources/schemas/capability-schema.json @@ -744,12 +744,21 @@ }, "ExposesMcp": { "type": "object", - "description": "MCP Server exposition configuration. Exposes tools over MCP Streamable HTTP transport.", + "description": "MCP Server exposition configuration. Exposes tools over MCP transport (Streamable HTTP or stdio).", "properties": { "type": { "type": "string", "const": "mcp" }, + "transport": { + "type": "string", + "enum": [ + "http", + "stdio" + ], + "default": "http", + "description": "The MCP transport to use. 'http' (default) exposes a Streamable HTTP server; 'stdio' uses stdin/stdout JSON-RPC for local IDE integration." + }, "address": { "$ref": "#/$defs/Address" }, @@ -777,10 +786,36 @@ }, "required": [ "type", - "port", "namespace", "tools" ], + "oneOf": [ + { + "properties": { + "transport": { + "const": "stdio" + } + }, + "required": [ + "transport" + ], + "not": { + "required": [ + "port" + ] + } + }, + { + "properties": { + "transport": { + "const": "http" + } + }, + "required": [ + "port" + ] + } + ], "additionalProperties": false }, "McpTool": { diff --git a/src/test/java/io/naftiko/engine/CapabilityMcpStdioIntegrationTest.java b/src/test/java/io/naftiko/engine/CapabilityMcpStdioIntegrationTest.java new file mode 100644 index 0000000..23b6133 --- /dev/null +++ b/src/test/java/io/naftiko/engine/CapabilityMcpStdioIntegrationTest.java @@ -0,0 +1,241 @@ +/** + * Copyright 2025-2026 Naftiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package io.naftiko.engine; + +import static org.junit.jupiter.api.Assertions.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import io.naftiko.Capability; +import io.naftiko.engine.exposes.McpProtocolDispatcher; +import io.naftiko.engine.exposes.McpServerAdapter; +import io.naftiko.engine.exposes.ServerAdapter; +import io.naftiko.engine.exposes.StdioJsonRpcHandler; +import io.naftiko.spec.NaftikoSpec; +import io.naftiko.spec.exposes.McpServerSpec; + +/** + * Integration tests for MCP Server Adapter with stdio transport. + * Tests YAML deserialization, transport selection, and stdio JSON-RPC protocol. + */ +public class CapabilityMcpStdioIntegrationTest { + + private Capability capability; + + @BeforeEach + public void setUp() throws Exception { + String resourcePath = "src/test/resources/mcp-stdio-capability.yaml"; + File file = new File(resourcePath); + + assertTrue(file.exists(), + "MCP stdio capability test file should exist at " + resourcePath); + + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + NaftikoSpec spec = mapper.readValue(file, NaftikoSpec.class); + + capability = new Capability(spec); + } + + @Test + public void testStdioTransportDeserialized() { + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + McpServerSpec spec = adapter.getMcpServerSpec(); + + assertEquals("mcp", spec.getType(), "Type should be 'mcp'"); + assertEquals("stdio", spec.getTransport(), "Transport should be 'stdio'"); + assertTrue(spec.isStdio(), "isStdio() should return true"); + assertEquals("test-mcp-stdio", spec.getNamespace(), + "Namespace should be 'test-mcp-stdio'"); + } + + @Test + public void testStdioAdapterIsMcpServerAdapter() { + ServerAdapter adapter = capability.getServerAdapters().get(0); + assertInstanceOf(McpServerAdapter.class, adapter, + "Stdio transport should use the same McpServerAdapter class"); + } + + @Test + public void testStdioToolsBuilt() { + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + assertNotNull(adapter.getTools(), "Tools should be built"); + assertEquals(1, adapter.getTools().size(), "Should have one tool"); + assertEquals("query-database", adapter.getTools().get(0).name(), + "Tool name should match"); + } + + @Test + public void testStdioAdapterStartAndStop() throws Exception { + // Verify lifecycle works — but don't actually start on System.in + // (that would interfere with the test runner's own stdin). + // Instead, verify the adapter is correctly configured for stdio. + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + assertTrue(adapter.getMcpServerSpec().isStdio(), + "Adapter should be configured for stdio transport"); + assertNotNull(adapter.getToolHandler(), + "Tool handler should be initialized regardless of transport"); + } + + @Test + public void testStdioInitializeProtocol() throws Exception { + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + + // Simulate an initialize request via the protocol dispatcher + McpProtocolDispatcher dispatcher = new McpProtocolDispatcher(adapter); + ObjectMapper mapper = new ObjectMapper(); + + String initRequest = "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\"," + + "\"params\":{\"protocolVersion\":\"2025-03-26\"," + + "\"clientInfo\":{\"name\":\"test\",\"version\":\"1.0\"}," + + "\"capabilities\":{}}}"; + + JsonNode request = mapper.readTree(initRequest); + JsonNode response = dispatcher.dispatch(request); + + assertNotNull(response, "Initialize should return a response"); + assertEquals("2.0", response.path("jsonrpc").asText()); + assertEquals(1, response.path("id").asInt()); + + JsonNode result = response.get("result"); + assertNotNull(result, "Should have a result"); + assertEquals("2025-03-26", result.path("protocolVersion").asText()); + assertEquals("test-mcp-stdio", + result.path("serverInfo").path("name").asText()); + } + + @Test + public void testStdioToolsListProtocol() throws Exception { + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + + McpProtocolDispatcher dispatcher = new McpProtocolDispatcher(adapter); + ObjectMapper mapper = new ObjectMapper(); + + String listRequest = "{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/list\"}"; + + JsonNode response = dispatcher.dispatch(mapper.readTree(listRequest)); + + assertNotNull(response); + JsonNode tools = response.path("result").path("tools"); + assertTrue(tools.isArray(), "tools should be an array"); + assertEquals(1, tools.size(), "Should list one tool"); + assertEquals("query-database", tools.get(0).path("name").asText()); + } + + @Test + public void testStdioNotificationReturnsNull() throws Exception { + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + + McpProtocolDispatcher dispatcher = new McpProtocolDispatcher(adapter); + ObjectMapper mapper = new ObjectMapper(); + + String notification = "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}"; + + JsonNode response = dispatcher.dispatch(mapper.readTree(notification)); + assertNull(response, "Notifications should return null (no response)"); + } + + @Test + public void testStdioPingProtocol() throws Exception { + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + + McpProtocolDispatcher dispatcher = new McpProtocolDispatcher(adapter); + ObjectMapper mapper = new ObjectMapper(); + + String pingRequest = "{\"jsonrpc\":\"2.0\",\"id\":3,\"method\":\"ping\"}"; + + JsonNode response = dispatcher.dispatch(mapper.readTree(pingRequest)); + + assertNotNull(response); + assertEquals(3, response.path("id").asInt()); + assertNotNull(response.get("result"), "Ping should return an empty result"); + } + + @Test + public void testStdioHandlerEndToEnd() throws Exception { + McpServerAdapter adapter = (McpServerAdapter) capability.getServerAdapters().get(0); + + McpProtocolDispatcher dispatcher = new McpProtocolDispatcher(adapter); + + // Build a multi-line input: initialize + tools/list + ping + String input = "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\"," + + "\"params\":{\"protocolVersion\":\"2025-03-26\"," + + "\"clientInfo\":{\"name\":\"test\",\"version\":\"1.0\"}," + + "\"capabilities\":{}}}\n" + + "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n" + + "{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/list\"}\n" + + "{\"jsonrpc\":\"2.0\",\"id\":3,\"method\":\"ping\"}\n"; + + ByteArrayInputStream in = new ByteArrayInputStream( + input.getBytes(StandardCharsets.UTF_8)); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + StdioJsonRpcHandler handler = new StdioJsonRpcHandler(dispatcher, in, out); + handler.run(); // Runs synchronously since input is finite + + String output = out.toString(StandardCharsets.UTF_8); + String[] lines = output.strip().split("\\n"); + + // Should have 3 responses (notification has no response) + assertEquals(3, lines.length, + "Should have 3 response lines (initialize, tools/list, ping)"); + + ObjectMapper mapper = new ObjectMapper(); + + // Verify initialize response + JsonNode initResponse = mapper.readTree(lines[0]); + assertEquals(1, initResponse.path("id").asInt()); + assertEquals("2025-03-26", + initResponse.path("result").path("protocolVersion").asText()); + + // Verify tools/list response + JsonNode toolsResponse = mapper.readTree(lines[1]); + assertEquals(2, toolsResponse.path("id").asInt()); + assertEquals("query-database", + toolsResponse.path("result").path("tools").get(0).path("name").asText()); + + // Verify ping response + JsonNode pingResponse = mapper.readTree(lines[2]); + assertEquals(3, pingResponse.path("id").asInt()); + } + + @Test + public void testHttpTransportDefaultWhenNotSet() throws Exception { + // Load the original MCP capability (no transport field) + String resourcePath = "src/test/resources/mcp-capability.yaml"; + File file = new File(resourcePath); + assertTrue(file.exists()); + + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + NaftikoSpec spec = mapper.readValue(file, NaftikoSpec.class); + + Capability httpCapability = new Capability(spec); + McpServerAdapter adapter = + (McpServerAdapter) httpCapability.getServerAdapters().get(0); + + assertEquals("http", adapter.getMcpServerSpec().getTransport(), + "Transport should default to 'http' when not set"); + assertFalse(adapter.getMcpServerSpec().isStdio(), + "isStdio() should return false for default transport"); + } + +} diff --git a/src/test/resources/mcp-stdio-capability.yaml b/src/test/resources/mcp-stdio-capability.yaml new file mode 100644 index 0000000..a8cc15c --- /dev/null +++ b/src/test/resources/mcp-stdio-capability.yaml @@ -0,0 +1,64 @@ +--- +naftiko: "0.4" +info: + label: "MCP Stdio Test Capability" + description: "Test capability for MCP Server Adapter with stdio transport" + tags: + - Test + - MCP + - Stdio + created: "2026-03-02" + modified: "2026-03-02" + +capability: + exposes: + - type: "mcp" + transport: "stdio" + namespace: "test-mcp-stdio" + description: "Test MCP server using stdio transport for local IDE development." + + tools: + - name: "query-database" + description: "Query the test database to retrieve committed participants." + call: "mock-api.query-db" + with: + datasource_id: "test-db-id-123" + outputParameters: + - type: "array" + mapping: "$.results" + items: + - type: "object" + properties: + name: + type: "string" + mapping: "$.properties.Name.title[0].text.content" + status: + type: "string" + mapping: "$.properties.Status.select.name" + + consumes: + - type: "http" + namespace: "mock-api" + baseUri: "http://localhost:8080/v1/" + inputParameters: + - name: "Content-Type" + in: "header" + value: "application/json" + + resources: + - path: "databases/{{datasource_id}}/query" + name: "query" + label: "Query database resource" + operations: + - method: "POST" + name: "query-db" + label: "Query Database" + body: | + { + "filter": { + "property": "Status", + "select": { + "equals": "Committed" + } + } + }