diff --git a/README.md b/README.md index 40a64f17..8cbffb77 100644 --- a/README.md +++ b/README.md @@ -1425,6 +1425,22 @@ Set `stateless: true` in `MCP::Server::Transports::StreamableHTTPTransport.new` transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, stateless: true) ``` +You can enable JSON response mode, where the server returns `application/json` instead of `text/event-stream`. +Set `enable_json_response: true` in `MCP::Server::Transports::StreamableHTTPTransport.new`: + +```ruby +# JSON response mode +transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, enable_json_response: true) +``` + +In JSON response mode, the POST response is a single JSON object, so server-to-client messages +that need to arrive during request processing are not supported: +request-scoped notifications (`progress`, `log`) are silently dropped, and all server-to-client requests +(`sampling/createMessage`, `roots/list`, `elicitation/create`) raise an error. +Session-scoped standalone notifications (`resources/updated`, `elicitation/complete`) and +broadcast notifications (`tools/list_changed`, etc.) still flow to clients connected to the GET SSE stream. +This mode is suitable for simple tool servers that do not need server-initiated requests. + By default, sessions do not expire. To mitigate session hijacking risks, you can set a `session_idle_timeout` (in seconds). When configured, sessions that receive no HTTP requests for this duration are automatically expired and cleaned up: diff --git a/lib/mcp/server/transports/streamable_http_transport.rb b/lib/mcp/server/transports/streamable_http_transport.rb index 5c6c068e..284e7584 100644 --- a/lib/mcp/server/transports/streamable_http_transport.rb +++ b/lib/mcp/server/transports/streamable_http_transport.rb @@ -22,13 +22,14 @@ class StreamableHTTPTransport < Transport "Connection" => "keep-alive", }.freeze - def initialize(server, stateless: false, session_idle_timeout: nil) + def initialize(server, stateless: false, enable_json_response: false, session_idle_timeout: nil) super(server) # Maps `session_id` to `{ get_sse_stream: stream_object, server_session: ServerSession, last_active_at: float_from_monotonic_clock }`. @sessions = {} @mutex = Mutex.new @stateless = stateless + @enable_json_response = enable_json_response @session_idle_timeout = session_idle_timeout @pending_responses = {} @@ -43,7 +44,8 @@ def initialize(server, stateless: false, session_idle_timeout: nil) start_reaper_thread if @session_idle_timeout end - REQUIRED_POST_ACCEPT_TYPES = ["application/json", "text/event-stream"].freeze + REQUIRED_POST_ACCEPT_TYPES_SSE = ["application/json", "text/event-stream"].freeze + REQUIRED_POST_ACCEPT_TYPES_JSON = ["application/json"].freeze REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze SESSION_REAP_INTERVAL = 60 @@ -94,6 +96,12 @@ def send_notification(method, params = nil, session_id: nil, related_request_id: result = @mutex.synchronize do if session_id + # JSON response mode returns a single JSON object as the POST response, + # so request-scoped notifications (e.g. progress, log) cannot be delivered + # alongside it. Session-scoped standalone notifications + # (e.g. `resources/updated`, `elicitation/complete`) still flow via GET SSE. + next false if @enable_json_response && related_request_id + # Send to specific session if (session = @sessions[session_id]) stream = active_stream(session, related_request_id: related_request_id) @@ -172,6 +180,10 @@ def send_request(method, params = nil, session_id: nil, related_request_id: nil) raise "Stateless mode does not support server-to-client requests." end + if @enable_json_response + raise "JSON response mode does not support server-to-client requests." + end + unless session_id raise "session_id is required for server-to-client requests." end @@ -278,7 +290,8 @@ def send_ping_to_stream(stream) end def handle_post(request) - accept_error = validate_accept_header(request, REQUIRED_POST_ACCEPT_TYPES) + required_types = @enable_json_response ? REQUIRED_POST_ACCEPT_TYPES_JSON : REQUIRED_POST_ACCEPT_TYPES_SSE + accept_error = validate_accept_header(request, required_types) return accept_error if accept_error content_type_error = validate_content_type(request) @@ -519,7 +532,7 @@ def handle_regular_request(body_string, session_id, related_request_id: nil) end end - if session_id && !@stateless + if session_id && !@stateless && !@enable_json_response handle_request_with_sse_response(body_string, session_id, server_session, related_request_id: related_request_id) else response = dispatch_handle_json(body_string, server_session) diff --git a/test/mcp/server/transports/streamable_http_transport_test.rb b/test/mcp/server/transports/streamable_http_transport_test.rb index d3d85f46..6c1983d5 100644 --- a/test/mcp/server/transports/streamable_http_transport_test.rb +++ b/test/mcp/server/transports/streamable_http_transport_test.rb @@ -1549,6 +1549,421 @@ def string assert_equal "456", body["id"] end + test "JSON response mode returns application/json for POST requests" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + init_response = transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + request = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id, + }, + { jsonrpc: "2.0", method: "ping", id: "1" }.to_json, + ) + + response = transport.handle_request(request) + assert_equal(200, response[0]) + assert_equal("application/json", response[1]["Content-Type"]) + + body = JSON.parse(response[2][0]) + assert_equal("1", body["id"]) + ensure + transport.close + end + + test "JSON response mode accepts application/json only in Accept header" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + request = create_rack_request_without_accept( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_ACCEPT" => "application/json", + }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + + response = transport.handle_request(request) + assert_equal(200, response[0]) + ensure + transport.close + end + + test "JSON response mode returns 406 when Accept header is missing" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + request = create_rack_request_without_accept( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + + response = transport.handle_request(request) + assert_equal(406, response[0]) + ensure + transport.close + end + + test "JSON response mode accepts wildcard Accept header" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + request = create_rack_request_without_accept( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_ACCEPT" => "*/*", + }, + { jsonrpc: "2.0", method: "initialize", id: "init" }.to_json, + ) + + response = transport.handle_request(request) + assert_equal(200, response[0]) + ensure + transport.close + end + + test "JSON response mode drops notifications during tool execution" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + server.logging_message_notification = MCP::LoggingMessageNotification.new(level: "debug") + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + server.define_tool(name: "log_tool") do |server_context:| + server_context.notify_log_message(data: "should be dropped", level: "info") + Tool::Response.new([{ type: "text", text: "ok" }]) + end + server.server_context = server + + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { + jsonrpc: "2.0", + method: "initialize", + id: "init", + params: { protocolVersion: "2025-11-25", clientInfo: { name: "test" } }, + }.to_json, + ) + init_response = transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + tool_request = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id, + }, + { + jsonrpc: "2.0", + id: "call-1", + method: "tools/call", + params: { name: "log_tool", arguments: {} }, + }.to_json, + ) + + response = transport.handle_request(tool_request) + assert_equal(200, response[0]) + assert_equal("application/json", response[1]["Content-Type"]) + + body = JSON.parse(response[2][0]) + assert_equal("call-1", body["id"]) + refute_includes(response[2][0], "should be dropped") + ensure + transport.close + end + + test "JSON response mode drops notifications even with GET SSE connected" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + server.logging_message_notification = MCP::LoggingMessageNotification.new(level: "debug") + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + server.define_tool(name: "log_tool") do |server_context:| + server_context.notify_log_message(data: "should not leak", level: "info") + Tool::Response.new([{ type: "text", text: "ok" }]) + end + server.server_context = server + + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { + jsonrpc: "2.0", + method: "initialize", + id: "init", + params: { protocolVersion: "2025-11-25", clientInfo: { name: "test" } }, + }.to_json, + ) + init_response = transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect GET SSE. + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + get_response = transport.handle_request(get_request) + get_response[2].call(io) if get_response[2].is_a?(Proc) + sleep(0.1) + + # Call tool that sends a notification. + tool_request = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id, + }, + { + jsonrpc: "2.0", + id: "call-1", + method: "tools/call", + params: { name: "log_tool", arguments: {} }, + }.to_json, + ) + + response = transport.handle_request(tool_request) + assert_equal(200, response[0]) + assert_equal("application/json", response[1]["Content-Type"]) + + # Notification should NOT leak to GET SSE stream. + io.rewind + refute_includes(io.read, "should not leak") + ensure + transport.close + end + + test "JSON response mode raises on send_request (sampling)" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + error = assert_raises(RuntimeError) do + transport.send_request("sampling/createMessage", { messages: [] }, session_id: "s1") + end + + assert_equal("JSON response mode does not support server-to-client requests.", error.message) + ensure + transport.close + end + + test "JSON response mode raises on send_request (roots/list)" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + error = assert_raises(RuntimeError) do + transport.send_request("roots/list", nil, session_id: "s1") + end + + assert_equal("JSON response mode does not support server-to-client requests.", error.message) + ensure + transport.close + end + + test "JSON response mode raises on send_request (elicitation/create)" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + + error = assert_raises(RuntimeError) do + transport.send_request( + "elicitation/create", + { mode: "form", message: "test", requestedSchema: {} }, + session_id: "s1", + ) + end + + assert_equal("JSON response mode does not support server-to-client requests.", error.message) + ensure + transport.close + end + + test "JSON response mode allows broadcast notifications via GET SSE" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + server.server_context = server + + server.define_tool(name: "notify_tool") do |server_context:| + server_context.notify_tools_list_changed + Tool::Response.new([{ type: "text", text: "ok" }]) + end + + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { + jsonrpc: "2.0", + method: "initialize", + id: "init", + params: { protocolVersion: "2025-11-25", clientInfo: { name: "test" } }, + }.to_json, + ) + init_response = transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + # Connect GET SSE. + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + get_response = transport.handle_request(get_request) + get_response[2].call(io) if get_response[2].is_a?(Proc) + sleep(0.1) + + # Call tool that triggers a broadcast notification. + tool_request = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id, + }, + { + jsonrpc: "2.0", + id: "call-1", + method: "tools/call", + params: { name: "notify_tool", arguments: {} }, + }.to_json, + ) + + response = transport.handle_request(tool_request) + assert_equal(200, response[0]) + assert_equal("application/json", response[1]["Content-Type"]) + + # Broadcast notification should arrive on GET SSE stream. + io.rewind + assert_includes(io.read, "notifications/tools/list_changed") + ensure + transport.close + end + + test "JSON response mode with stateless returns application/json without session ID" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, stateless: true, enable_json_response: true) + server.transport = transport + + request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { jsonrpc: "2.0", method: "ping", id: "1" }.to_json, + ) + + response = transport.handle_request(request) + assert_equal(200, response[0]) + assert_equal("application/json", response[1]["Content-Type"]) + assert_nil(response[1]["Mcp-Session-Id"]) + + body = JSON.parse(response[2][0]) + assert_equal("1", body["id"]) + ensure + transport.close + end + + test "JSON response mode with stateless returns 405 on GET" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, stateless: true, enable_json_response: true) + server.transport = transport + + request = create_rack_request("GET", "/", {}) + + response = transport.handle_request(request) + assert_equal(405, response[0]) + ensure + transport.close + end + + test "JSON response mode delivers session-scoped standalone notifications via GET SSE" do + server = Server.new(name: "test", tools: [], prompts: [], resources: []) + transport = StreamableHTTPTransport.new(server, enable_json_response: true) + server.transport = transport + server.server_context = server + + server.define_tool(name: "touch_tool") do |server_context:| + server_context.notify_resources_updated(uri: "file:///example.txt") + Tool::Response.new([{ type: "text", text: "ok" }]) + end + + init_request = create_rack_request( + "POST", + "/", + { "CONTENT_TYPE" => "application/json" }, + { + jsonrpc: "2.0", + method: "initialize", + id: "init", + params: { protocolVersion: "2025-11-25", clientInfo: { name: "test" } }, + }.to_json, + ) + init_response = transport.handle_request(init_request) + session_id = init_response[1]["Mcp-Session-Id"] + + io = StringIO.new + get_request = create_rack_request( + "GET", + "/", + { "HTTP_MCP_SESSION_ID" => session_id }, + ) + get_response = transport.handle_request(get_request) + get_response[2].call(io) if get_response[2].is_a?(Proc) + sleep(0.1) + + tool_request = create_rack_request( + "POST", + "/", + { + "CONTENT_TYPE" => "application/json", + "HTTP_MCP_SESSION_ID" => session_id, + }, + { + jsonrpc: "2.0", + id: "call-1", + method: "tools/call", + params: { name: "touch_tool", arguments: {} }, + }.to_json, + ) + + response = transport.handle_request(tool_request) + assert_equal(200, response[0]) + assert_equal("application/json", response[1]["Content-Type"]) + + io.rewind + assert_includes(io.read, "notifications/resources/updated") + ensure + transport.close + end + test "handle post request with a standard error" do request = create_rack_request( "POST",