Skip to content

Commit 5c0fee3

Browse files
authored
Merge pull request #322 from atesgoral/ag/client-sse-parser
Parse SSE responses in HTTP client via event_stream_parser
2 parents 2edba77 + b832933 commit 5c0fee3

5 files changed

Lines changed: 156 additions & 12 deletions

File tree

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ gem "yard", "~> 0.9"
2323
gem "yard-sorbet", "~> 0.9" if RUBY_VERSION >= "3.1"
2424

2525
group :test do
26+
gem "event_stream_parser", ">= 1.0"
2627
gem "faraday", ">= 2.0"
2728
gem "minitest", "~> 5.1", require: false
2829
gem "mocha"

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1635,11 +1635,12 @@ The stdio transport automatically handles:
16351635

16361636
Use the `MCP::Client::HTTP` transport to interact with MCP servers using simple HTTP requests.
16371637

1638-
You'll need to add `faraday` as a dependency in order to use the HTTP transport layer:
1638+
You'll need to add `faraday` as a dependency in order to use the HTTP transport layer. Add `event_stream_parser` as well if the server uses SSE (`text/event-stream`) responses:
16391639

16401640
```ruby
16411641
gem 'mcp'
16421642
gem 'faraday', '>= 2.0'
1643+
gem 'event_stream_parser', '>= 1.0' # optional, required only for SSE responses
16431644
```
16441645

16451646
Example usage:

docs/building-clients.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@ stdio_transport.close
5151

5252
## HTTP Transport
5353

54-
Use `MCP::Client::HTTP` to interact with MCP servers over HTTP. Requires the `faraday` gem:
54+
Use `MCP::Client::HTTP` to interact with MCP servers over HTTP. Requires the `faraday` gem, plus `event_stream_parser` if the server uses SSE (`text/event-stream`) responses:
5555

5656
```ruby
5757
gem 'mcp'
5858
gem 'faraday', '>= 2.0'
59+
gem 'event_stream_parser', '>= 1.0' # optional, required only for SSE responses
5960
```
6061

6162
```ruby

lib/mcp/client/http.rb

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ def send_request(request:)
1818
params = request[:params] || request["params"]
1919

2020
response = client.post("", request)
21-
validate_response_content_type!(response, method, params)
22-
response.body
21+
parse_response_body(response, method, params)
2322
rescue Faraday::BadRequestError => e
2423
raise RequestHandlerError.new(
2524
"The #{method} request is invalid",
@@ -92,14 +91,52 @@ def require_faraday!
9291
"See https://rubygems.org/gems/faraday for more details."
9392
end
9493

95-
def validate_response_content_type!(response, method, params)
94+
def require_event_stream_parser!
95+
require "event_stream_parser"
96+
rescue LoadError
97+
raise LoadError, "The 'event_stream_parser' gem is required to parse SSE responses. " \
98+
"Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'. " \
99+
"See https://rubygems.org/gems/event_stream_parser for more details."
100+
end
101+
102+
def parse_response_body(response, method, params)
96103
content_type = response.headers["Content-Type"]
97-
return if content_type&.include?("application/json")
104+
105+
if content_type&.include?("text/event-stream")
106+
parse_sse_response(response.body, method, params)
107+
elsif content_type&.include?("application/json")
108+
response.body
109+
else
110+
raise RequestHandlerError.new(
111+
"Unsupported Content-Type: #{content_type.inspect}. Expected application/json or text/event-stream.",
112+
{ method: method, params: params },
113+
error_type: :unsupported_media_type,
114+
)
115+
end
116+
end
117+
118+
def parse_sse_response(body, method, params)
119+
require_event_stream_parser!
120+
121+
json_rpc_response = nil
122+
parser = EventStreamParser::Parser.new
123+
parser.feed(body.to_s) do |_type, data, _id|
124+
next if data.empty?
125+
126+
begin
127+
parsed = JSON.parse(data)
128+
json_rpc_response = parsed if parsed.is_a?(Hash) && (parsed.key?("result") || parsed.key?("error"))
129+
rescue JSON::ParserError
130+
next
131+
end
132+
end
133+
134+
return json_rpc_response if json_rpc_response
98135

99136
raise RequestHandlerError.new(
100-
"Unsupported Content-Type: #{content_type.inspect}. This client only supports JSON responses.",
137+
"No valid JSON-RPC response found in SSE stream",
101138
{ method: method, params: params },
102-
error_type: :unsupported_media_type,
139+
error_type: :parse_error,
103140
)
104141
end
105142
end

test/mcp/client/http_test.rb

Lines changed: 108 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "test_helper"
4+
require "event_stream_parser"
45
require "faraday"
56
require "webmock/minitest"
67
require "mcp/client/http"
@@ -25,6 +26,26 @@ def test_raises_load_error_when_faraday_not_available
2526
assert_includes(error.message, "Add it to your Gemfile: gem 'faraday', '>= 2.0'")
2627
end
2728

29+
def test_raises_load_error_when_event_stream_parser_not_available
30+
stub_request(:post, url)
31+
.to_return(
32+
status: 200,
33+
headers: { "Content-Type" => "text/event-stream" },
34+
body: "data: {}\n\n",
35+
)
36+
37+
HTTP.any_instance.stubs(:require).with("faraday").returns(true)
38+
HTTP.any_instance.stubs(:require).with("event_stream_parser")
39+
.raises(LoadError, "cannot load such file -- event_stream_parser")
40+
41+
error = assert_raises(LoadError) do
42+
client.send_request(request: { method: "tools/list" })
43+
end
44+
45+
assert_includes(error.message, "The 'event_stream_parser' gem is required to parse SSE responses")
46+
assert_includes(error.message, "Add it to your Gemfile: gem 'event_stream_parser', '>= 1.0'")
47+
end
48+
2849
def test_headers_are_added_to_the_request
2950
headers = { "Authorization" => "Bearer token" }
3051
client = HTTP.new(url: url, headers: headers)
@@ -267,7 +288,7 @@ def test_block_customizes_faraday_connection
267288
custom_client.send_request(request: request)
268289
end
269290

270-
def test_send_request_raises_error_for_non_json_response
291+
def test_send_request_raises_error_for_unsupported_content_type
271292
request = {
272293
jsonrpc: "2.0",
273294
id: "test_id",
@@ -278,22 +299,105 @@ def test_send_request_raises_error_for_non_json_response
278299
.with(body: request.to_json)
279300
.to_return(
280301
status: 200,
281-
headers: { "Content-Type" => "text/event-stream" },
282-
body: "data: {}\n\n",
302+
headers: { "Content-Type" => "text/html" },
303+
body: "<html></html>",
283304
)
284305

285306
error = assert_raises(RequestHandlerError) do
286307
client.send_request(request: request)
287308
end
288309

289310
assert_equal(
290-
'Unsupported Content-Type: "text/event-stream". This client only supports JSON responses.',
311+
'Unsupported Content-Type: "text/html". Expected application/json or text/event-stream.',
291312
error.message,
292313
)
293314
assert_equal(:unsupported_media_type, error.error_type)
294315
assert_equal({ method: "tools/list", params: nil }, error.request)
295316
end
296317

318+
def test_send_request_parses_sse_response
319+
request = {
320+
jsonrpc: "2.0",
321+
id: "test_id",
322+
method: "tools/list",
323+
}
324+
325+
sse_body = <<~SSE
326+
: comment
327+
data: {"jsonrpc":"2.0","method":"notifications/progress","params":{}}
328+
329+
data: {"jsonrpc":"2.0","id":"test_id","result":{"tools":[{"name":"echo"}]}}
330+
331+
SSE
332+
333+
stub_request(:post, url)
334+
.with(body: request.to_json)
335+
.to_return(
336+
status: 200,
337+
headers: { "Content-Type" => "text/event-stream" },
338+
body: sse_body,
339+
)
340+
341+
response = client.send_request(request: request)
342+
343+
assert_equal({ "tools" => [{ "name" => "echo" }] }, response["result"])
344+
end
345+
346+
def test_send_request_parses_sse_error_response
347+
request = {
348+
jsonrpc: "2.0",
349+
id: "test_id",
350+
method: "tools/list",
351+
}
352+
353+
sse_body = <<~SSE
354+
data: {"jsonrpc":"2.0","id":"test_id","error":{"code":-32600,"message":"Invalid request"}}
355+
356+
SSE
357+
358+
stub_request(:post, url)
359+
.with(body: request.to_json)
360+
.to_return(
361+
status: 200,
362+
headers: { "Content-Type" => "text/event-stream" },
363+
body: sse_body,
364+
)
365+
366+
response = client.send_request(request: request)
367+
368+
assert_equal(-32600, response.dig("error", "code"))
369+
assert_equal("Invalid request", response.dig("error", "message"))
370+
end
371+
372+
def test_send_request_raises_error_for_sse_without_response
373+
request = {
374+
jsonrpc: "2.0",
375+
id: "test_id",
376+
method: "tools/list",
377+
}
378+
379+
sse_body = <<~SSE
380+
: just a comment
381+
data: {"jsonrpc":"2.0","method":"notifications/progress","params":{}}
382+
383+
SSE
384+
385+
stub_request(:post, url)
386+
.with(body: request.to_json)
387+
.to_return(
388+
status: 200,
389+
headers: { "Content-Type" => "text/event-stream" },
390+
body: sse_body,
391+
)
392+
393+
error = assert_raises(RequestHandlerError) do
394+
client.send_request(request: request)
395+
end
396+
397+
assert_includes(error.message, "No valid JSON-RPC response found in SSE stream")
398+
assert_equal(:parse_error, error.error_type)
399+
end
400+
297401
private
298402

299403
def stub_request(method, url)

0 commit comments

Comments
 (0)