From dab88f19153164b4af836b1dccde8cd066bfc4e5 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Tue, 9 Jun 2026 16:58:19 +0900 Subject: [PATCH 1/2] Experimental: Replace Yajl to JSON for handling JSON stream Signed-off-by: Shizuo Fujita --- Gemfile | 1 + fluentd.gemspec | 1 - lib/fluent/compat/exec_util.rb | 13 +++++++++---- lib/fluent/config/literal_parser.rb | 1 - lib/fluent/load.rb | 1 - lib/fluent/plugin/in_forward.rb | 16 +++++++++------- lib/fluent/plugin/in_unix.rb | 8 +++++--- lib/fluent/plugin/parser_json.rb | 25 +++++++++++++------------ test/plugin/test_parser_json.rb | 2 +- test/test_event_time.rb | 8 +------- test/test_formatter.rb | 12 ++++++------ 11 files changed, 45 insertions(+), 43 deletions(-) diff --git a/Gemfile b/Gemfile index 2420c512b6..14eb9b4e17 100644 --- a/Gemfile +++ b/Gemfile @@ -3,6 +3,7 @@ source 'https://rubygems.org/' gemspec gem 'benchmark' +gem 'json', git: 'https://github.com/byroot/json.git', branch: 'resumable-parser' local_gemfile = File.join(File.dirname(__FILE__), "Gemfile.local") if File.exist?(local_gemfile) diff --git a/fluentd.gemspec b/fluentd.gemspec index 7d34ddab72..96c763fd3e 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -29,7 +29,6 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("bundler") gem.add_runtime_dependency("msgpack", [">= 1.3.1", "< 2.0.0"]) - gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) gem.add_runtime_dependency("cool.io", [">= 1.4.5", "< 2.0.0"]) gem.add_runtime_dependency("serverengine", [">= 2.3.2", "< 3.0.0"]) gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.9.0"]) diff --git a/lib/fluent/compat/exec_util.rb b/lib/fluent/compat/exec_util.rb index 6a8eec2d9c..cfb5c9ac1c 100644 --- a/lib/fluent/compat/exec_util.rb +++ b/lib/fluent/compat/exec_util.rb @@ -16,7 +16,6 @@ require 'msgpack' require 'json' -require 'yajl' require 'fluent/engine' require 'fluent/plugin' @@ -78,9 +77,15 @@ def each_line(line) class JSONParser < Parser def call(io) - y = Yajl::Parser.new - y.on_parse_complete = @on_message - y.parse(io) + parser = JSON::Ext::ResumableParser.new({}) + buffer_size = 8192 + + while (chunk = io.read(buffer_size)) + parser << chunk + while parser.parse + @on_message.call(parser.value) + end + end end end diff --git a/lib/fluent/config/literal_parser.rb b/lib/fluent/config/literal_parser.rb index c30f752de9..a953c8d697 100644 --- a/lib/fluent/config/literal_parser.rb +++ b/lib/fluent/config/literal_parser.rb @@ -17,7 +17,6 @@ require 'stringio' require 'json' -require 'yajl' require 'socket' require 'ripper' diff --git a/lib/fluent/load.rb b/lib/fluent/load.rb index 1410536920..a5608350c4 100644 --- a/lib/fluent/load.rb +++ b/lib/fluent/load.rb @@ -4,7 +4,6 @@ require 'stringio' require 'fileutils' require 'json' -require 'yajl' require 'uri' require 'msgpack' require 'strptime' diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 489c474261..d928f8c30a 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -17,7 +17,7 @@ require 'fluent/plugin/input' require 'fluent/msgpack_factory' -require 'yajl' +require 'json' require 'digest' require 'securerandom' @@ -248,13 +248,15 @@ def read_messages(conn, &block) unless feeder first = data[0] if first == '{' || first == '[' # json - parser = Yajl::Parser.new - parser.on_parse_complete = ->(obj){ - block.call(obj, bytes, serializer) - bytes = 0 - } + parser = JSON::Ext::ResumableParser.new({}) serializer = :to_json.to_proc - feeder = ->(d){ parser << d } + feeder = ->(d){ + parser << d + while parser.parse + block.call(parser.value, bytes, serializer) + bytes = 0 + end + } else # msgpack parser = Fluent::MessagePackFactory.msgpack_unpacker serializer = :to_msgpack.to_proc diff --git a/lib/fluent/plugin/in_unix.rb b/lib/fluent/plugin/in_unix.rb index 7f342871c2..9b3ad1be32 100644 --- a/lib/fluent/plugin/in_unix.rb +++ b/lib/fluent/plugin/in_unix.rb @@ -19,7 +19,7 @@ require 'fluent/msgpack_factory' require 'cool.io' -require 'yajl' +require 'json' require 'fileutils' require 'socket' @@ -158,8 +158,7 @@ def on_read(data) first = data[0] if first == '{'.freeze || first == '['.freeze m = method(:on_read_json) - @parser = Yajl::Parser.new - @parser.on_parse_complete = @on_message + @parser = JSON::Ext::ResumableParser.new({}) else m = method(:on_read_msgpack) @parser = Fluent::MessagePackFactory.msgpack_unpacker @@ -173,6 +172,9 @@ def on_read(data) def on_read_json(data) @parser << data + while @parser.parse + @on_message.call(@parser.value) + end rescue => e @log.error "unexpected error in json payload", error: e.to_s @log.error_backtrace diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb index 05d2fe4921..34591af3d0 100644 --- a/lib/fluent/plugin/parser_json.rb +++ b/lib/fluent/plugin/parser_json.rb @@ -18,7 +18,6 @@ require 'fluent/time' require 'fluent/oj_options' -require 'yajl' require 'json' module Fluent @@ -28,12 +27,10 @@ class JSONParser < Parser config_set_default :time_key, 'time' desc 'Set JSON parser' + # NOTE: Contains yajl for backward compatibility config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj - # The Yajl library defines a default buffer size of 8KiB when parsing - # from IO streams, so maintain this for backwards-compatibility. - # https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse - desc 'Set the buffer size that Yajl will use when parsing streaming input' + desc 'Set the buffer size that JSON parser will use when parsing streaming input' config_param :stream_buffer_size, :integer, default: 8192 config_set_default :time_type, :float @@ -54,8 +51,8 @@ def configure_json_parser(name) log&.info "Oj is not installed, and failing back to JSON for json parser" configure_json_parser(:json) - when :json then [JSON.method(:parse), JSON::ParserError] - when :yajl then [Yajl.method(:load), Yajl::ParseError] + when :yajl, :json # NOTE: Fallback yajl to json for backward compatibility + [JSON.method(:parse), JSON::ParserError] else raise "BUG: unknown json parser specified: #{name}" end @@ -94,11 +91,15 @@ def parser_type end def parse_io(io, &block) - y = Yajl::Parser.new - y.on_parse_complete = ->(record){ - block.call(parse_time(record), record) - } - y.parse(io, @stream_buffer_size) + parser = JSON::Ext::ResumableParser.new({}) + while (chunk = io.read(@stream_buffer_size)) + parser << chunk + + while parser.parse + record = parser.value + block.call(parse_time(record), record) + end + end end end end diff --git a/test/plugin/test_parser_json.rb b/test/plugin/test_parser_json.rb index 1829d84087..86ff087057 100644 --- a/test/plugin/test_parser_json.rb +++ b/test/plugin/test_parser_json.rb @@ -11,7 +11,7 @@ def setup sub_test_case "configure_json_parser" do data("oj", [:oj, [Oj.method(:load), Oj::ParseError]]) data("json", [:json, [JSON.method(:parse), JSON::ParserError]]) - data("yajl", [:yajl, [Yajl.method(:load), Yajl::ParseError]]) + data("yajl", [:yajl, [JSON.method(:parse), JSON::ParserError]]) def test_return_each_loader((input, expected_return)) result = @parser.instance.configure_json_parser(input) assert_equal expected_return, result diff --git a/test/test_event_time.rb b/test/test_event_time.rb index 6ce82f5cc8..da1aadd554 100644 --- a/test/test_event_time.rb +++ b/test/test_event_time.rb @@ -1,7 +1,7 @@ require_relative 'helper' require 'timecop' require 'oj' -require 'yajl' +require 'json' class EventTimeTest < Test::Unit::TestCase setup do @@ -70,12 +70,6 @@ class EventTimeTest < Test::Unit::TestCase assert_equal('["tag",100,{"key":"value"}]', Oj.dump(["tag", time, {"key" => "value"}], mode: :compat)) end - test 'Yajl.dump' do - time = Fluent::EventTime.new(100) - assert_equal('{"time":100}', Yajl.dump({'time' => time})) - assert_equal('["tag",100,{"key":"value"}]', Yajl.dump(["tag", time, {"key" => "value"}])) - end - test '.from_time' do sec = 1000 usec = 2 diff --git a/test/test_formatter.rb b/test/test_formatter.rb index f0f67d3cdf..2204201bab 100644 --- a/test/test_formatter.rb +++ b/test/test_formatter.rb @@ -68,28 +68,28 @@ def test_format configure({}) formatted = @formatter.format(tag, @time, record) - assert_equal("#{time2str(@time)}\t#{tag}\t#{Yajl.dump(record)}#{@newline}", formatted) + assert_equal("#{time2str(@time)}\t#{tag}\t#{JSON.generate(record)}#{@newline}", formatted) end def test_format_without_time configure('output_time' => 'false') formatted = @formatter.format(tag, @time, record) - assert_equal("#{tag}\t#{Yajl.dump(record)}#{@newline}", formatted) + assert_equal("#{tag}\t#{JSON.generate(record)}#{@newline}", formatted) end def test_format_without_tag configure('output_tag' => 'false') formatted = @formatter.format(tag, @time, record) - assert_equal("#{time2str(@time)}\t#{Yajl.dump(record)}#{@newline}", formatted) + assert_equal("#{time2str(@time)}\t#{JSON.generate(record)}#{@newline}", formatted) end def test_format_without_time_and_tag configure('output_tag' => 'false', 'output_time' => 'false') formatted = @formatter.format('tag', @time, record) - assert_equal("#{Yajl.dump(record)}#{@newline}", formatted) + assert_equal("#{JSON.generate(record)}#{@newline}", formatted) end def test_format_without_time_and_tag_against_string_literal_configure @@ -100,7 +100,7 @@ def test_format_without_time_and_tag_against_string_literal_configure ]) formatted = @formatter.format('tag', @time, record) - assert_equal("#{Yajl.dump(record)}#{@newline}", formatted) + assert_equal("#{JSON.generate(record)}#{@newline}", formatted) end end @@ -122,7 +122,7 @@ def test_format(data) @formatter.configure('json_parser' => data) formatted = @formatter.format(tag, @time, record) - assert_equal("#{Yajl.dump(record)}#{@newline}", formatted) + assert_equal("#{JSON.generate(record)}#{@newline}", formatted) end end From 167a8c4bc86e6c8d818380b541891c92bf8acaf9 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Tue, 16 Jun 2026 10:35:23 +0900 Subject: [PATCH 2/2] Adapt to upstream namespace change for JSON::ResumableParser Signed-off-by: Shizuo Fujita --- Gemfile | 2 +- lib/fluent/compat/exec_util.rb | 2 +- lib/fluent/plugin/in_forward.rb | 2 +- lib/fluent/plugin/in_unix.rb | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Gemfile b/Gemfile index 14eb9b4e17..6a65e8462a 100644 --- a/Gemfile +++ b/Gemfile @@ -3,7 +3,7 @@ source 'https://rubygems.org/' gemspec gem 'benchmark' -gem 'json', git: 'https://github.com/byroot/json.git', branch: 'resumable-parser' +gem 'json', git: 'https://github.com/ruby/json.git' local_gemfile = File.join(File.dirname(__FILE__), "Gemfile.local") if File.exist?(local_gemfile) diff --git a/lib/fluent/compat/exec_util.rb b/lib/fluent/compat/exec_util.rb index cfb5c9ac1c..1876c2c0ae 100644 --- a/lib/fluent/compat/exec_util.rb +++ b/lib/fluent/compat/exec_util.rb @@ -77,7 +77,7 @@ def each_line(line) class JSONParser < Parser def call(io) - parser = JSON::Ext::ResumableParser.new({}) + parser = JSON::ResumableParser.new({}) buffer_size = 8192 while (chunk = io.read(buffer_size)) diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index d928f8c30a..dde260a613 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -248,7 +248,7 @@ def read_messages(conn, &block) unless feeder first = data[0] if first == '{' || first == '[' # json - parser = JSON::Ext::ResumableParser.new({}) + parser = JSON::ResumableParser.new({}) serializer = :to_json.to_proc feeder = ->(d){ parser << d diff --git a/lib/fluent/plugin/in_unix.rb b/lib/fluent/plugin/in_unix.rb index 9b3ad1be32..1196e54175 100644 --- a/lib/fluent/plugin/in_unix.rb +++ b/lib/fluent/plugin/in_unix.rb @@ -158,7 +158,7 @@ def on_read(data) first = data[0] if first == '{'.freeze || first == '['.freeze m = method(:on_read_json) - @parser = JSON::Ext::ResumableParser.new({}) + @parser = JSON::ResumableParser.new({}) else m = method(:on_read_msgpack) @parser = Fluent::MessagePackFactory.msgpack_unpacker