Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 46 additions & 192 deletions src/main/java/io/naftiko/engine/exposes/JettyMcpStreamableHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.restlet.Context;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.modelcontextprotocol.spec.McpSchema;

/**
* Jetty Handler implementing the MCP Streamable HTTP transport protocol.
Expand All @@ -39,21 +39,20 @@
* - GET: SSE stream for server-initiated messages (returns 405 - not supported)
* - DELETE: Session termination
*
* Uses MCP Java SDK types (McpSchema) for JSON-RPC message serialization.
* Delegates protocol dispatch to {@link McpProtocolDispatcher} and adds
* HTTP-specific concerns: session management, HTTP status codes, content types.
*/
public class JettyMcpStreamableHandler extends Handler.Abstract {

private static final String JSONRPC_VERSION = "2.0";
private static final String MCP_PROTOCOL_VERSION = "2025-03-26";
private static final String HEADER_MCP_SESSION_ID = "Mcp-Session-Id";

private final McpServerAdapter adapter;
private final McpProtocolDispatcher dispatcher;
private final ObjectMapper mapper;
private final Map<String, Boolean> activeSessions;

public JettyMcpStreamableHandler(McpServerAdapter adapter) {
this.adapter = adapter;
this.mapper = new ObjectMapper();
this.dispatcher = new McpProtocolDispatcher(adapter);
this.mapper = dispatcher.getMapper();
this.activeSessions = new ConcurrentHashMap<>();
}

Expand Down Expand Up @@ -84,169 +83,63 @@ private boolean handlePost(Request request, Response response, Callback callback
String body = Content.Source.asString(request, StandardCharsets.UTF_8);

if (body == null || body.isBlank()) {
writeJsonRpcError(response, callback, null, -32700, "Parse error: empty body");
Context.getCurrentLogger().log(Level.WARNING, "Error processing request. Missing or empty body");
ObjectNode error = dispatcher.buildJsonRpcError(null, -32700,
"Parse error: empty body");
writeJson(response, callback, 200, error);
return true;
}

JsonNode root = mapper.readTree(body);
String jsonrpc = root.path("jsonrpc").asText("");
JsonNode idNode = root.get("id");
String rpcMethod = root.path("method").asText("");
JsonNode params = root.get("params");

if (!JSONRPC_VERSION.equals(jsonrpc)) {
writeJsonRpcError(response, callback, idNode, -32600,
"Invalid Request: jsonrpc must be '2.0'");
return true;
// Handle initialize specially — we need to create the session
if ("initialize".equals(rpcMethod)) {
String sessionId = UUID.randomUUID().toString();
activeSessions.put(sessionId, true);
response.getHeaders().put(HEADER_MCP_SESSION_ID, sessionId);
}

switch (rpcMethod) {
case "initialize":
return handleInitialize(request, response, callback, idNode, params);

case "notifications/initialized":
// Notification — no response needed
response.setStatus(202);
response.write(true, null, callback);
return true;

case "tools/list":
return handleToolsList(request, response, callback, idNode);

case "tools/call":
return handleToolsCall(request, response, callback, idNode, params);
// Handle notifications/initialized — return 202 with no body
if ("notifications/initialized".equals(rpcMethod)) {
response.setStatus(202);
response.write(true, null, callback);
return true;
}

case "ping":
return handlePing(response, callback, idNode);
// Delegate to the shared protocol dispatcher
ObjectNode result = dispatcher.dispatch(root);

default:
writeJsonRpcError(response, callback, idNode, -32601,
"Method not found: " + rpcMethod);
return true;
if (result != null) {
writeJson(response, callback, 200, result);
} else {
// Notification — no response body
response.setStatus(202);
response.write(true, null, callback);
}

return true;
} catch (JsonProcessingException e) {
writeJsonRpcError(response, callback, null, -32700, "Parse error: " + e.getMessage());
Context.getCurrentLogger().log(Level.SEVERE, "Error processing request", e);
ObjectNode error = dispatcher.buildJsonRpcError(null, -32700,
"Parse error: " + e.getMessage());
try {
writeJson(response, callback, 200, error);
} catch (Exception ex) {
writeError(response, callback, 500, "Internal server error");
}
return true;
} catch (Exception e) {
writeJsonRpcError(response, callback, null, -32603,
Context.getCurrentLogger().log(Level.SEVERE, "Error processing request", e);
ObjectNode error = dispatcher.buildJsonRpcError(null, -32603,
"Internal error: " + e.getMessage());
return true;
}
}

/**
* Handle MCP initialize request.
* Returns server capabilities and assigns a session ID.
*/
private boolean handleInitialize(Request request, Response response, Callback callback,
JsonNode id, JsonNode params) throws Exception {

String sessionId = UUID.randomUUID().toString();
activeSessions.put(sessionId, true);

// Build InitializeResult
ObjectNode result = mapper.createObjectNode();
result.put("protocolVersion", MCP_PROTOCOL_VERSION);

// Server capabilities — we support tools
ObjectNode capabilities = mapper.createObjectNode();
capabilities.putObject("tools");
result.set("capabilities", capabilities);

// Server info
ObjectNode serverInfo = mapper.createObjectNode();
serverInfo.put("name", adapter.getMcpServerSpec().getNamespace());
serverInfo.put("version", "1.0.0");
result.set("serverInfo", serverInfo);

// Instructions from the spec description
if (adapter.getMcpServerSpec().getDescription() != null) {
result.put("instructions", adapter.getMcpServerSpec().getDescription());
}

// Set session header
response.getHeaders().put(HEADER_MCP_SESSION_ID, sessionId);

writeJsonRpcResult(response, callback, id, result);
return true;
}

/**
* Handle tools/list request.
* Returns the list of available tools built from the spec.
*/
private boolean handleToolsList(Request request, Response response, Callback callback,
JsonNode id) throws Exception {

ObjectNode result = mapper.createObjectNode();
ArrayNode toolsArray = result.putArray("tools");

for (McpSchema.Tool tool : adapter.getTools()) {
ObjectNode toolNode = mapper.createObjectNode();
toolNode.put("name", tool.name());
if (tool.description() != null) {
toolNode.put("description", tool.description());
}
if (tool.inputSchema() != null) {
toolNode.set("inputSchema", mapper.valueToTree(tool.inputSchema()));
try {
writeJson(response, callback, 200, error);
} catch (Exception ex) {
writeError(response, callback, 500, "Internal server error");
}
toolsArray.add(toolNode);
}

writeJsonRpcResult(response, callback, id, result);
return true;
}

/**
* Handle tools/call request.
* Dispatches tool invocation to the McpToolHandler.
*/
private boolean handleToolsCall(Request request, Response response, Callback callback,
JsonNode id, JsonNode params) throws Exception {

if (params == null) {
writeJsonRpcError(response, callback, id, -32602, "Invalid params: missing params");
return true;
}

String toolName = params.path("name").asText("");
JsonNode argumentsNode = params.get("arguments");

@SuppressWarnings("unchecked")
Map<String, Object> arguments = argumentsNode != null
? mapper.treeToValue(argumentsNode, Map.class)
: new ConcurrentHashMap<>();

try {
McpSchema.CallToolResult toolResult =
adapter.getToolHandler().handleToolCall(toolName, arguments);

ObjectNode result = mapper.valueToTree(toolResult);
writeJsonRpcResult(response, callback, id, result);
} catch (IllegalArgumentException e) {
writeJsonRpcError(response, callback, id, -32602,
"Invalid params: " + e.getMessage());
} catch (Exception e) {
// Tool execution error — return as a tool result with isError=true
ObjectNode result = mapper.createObjectNode();
ArrayNode content = result.putArray("content");
ObjectNode textContent = content.addObject();
textContent.put("type", "text");
textContent.put("text", "Error: " + e.getMessage());
result.put("isError", true);
writeJsonRpcResult(response, callback, id, result);
}

return true;
}

/**
* Handle ping request.
*/
private boolean handlePing(Response response, Callback callback, JsonNode id)
throws Exception {
writeJsonRpcResult(response, callback, id, mapper.createObjectNode());
return true;
}

/**
Expand All @@ -262,45 +155,6 @@ private boolean handleDelete(Request request, Response response, Callback callba
return true;
}

/**
* Write a JSON-RPC success response.
*/
private void writeJsonRpcResult(Response response, Callback callback, JsonNode id,
JsonNode result) throws Exception {
ObjectNode envelope = mapper.createObjectNode();
envelope.put("jsonrpc", JSONRPC_VERSION);
if (id != null) {
envelope.set("id", id);
}
envelope.set("result", result);

writeJson(response, callback, 200, envelope);
}

/**
* Write a JSON-RPC error response.
*/
private void writeJsonRpcError(Response response, Callback callback, JsonNode id, int code,
String message) {
try {
ObjectNode envelope = mapper.createObjectNode();
envelope.put("jsonrpc", JSONRPC_VERSION);
if (id != null) {
envelope.set("id", id);
} else {
envelope.putNull("id");
}

ObjectNode error = envelope.putObject("error");
error.put("code", code);
error.put("message", message);

writeJson(response, callback, 200, envelope);
} catch (Exception e) {
writeError(response, callback, 500, "Internal server error");
}
}

/**
* Write a JSON response body.
*/
Expand Down
Loading