diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 71dfd5bc50..19147a6fb6 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -255,7 +255,7 @@ def register_plugins(plugins) registered << plugin end rescue => e - registered.each(&:do_close) + registered.each { |plugin| close_plugin_and_ignore(plugin) } raise e end @@ -523,8 +523,8 @@ def shutdown_workers t.join end - filters.each(&:do_close) - outputs.each(&:do_close) + filters.each { |plugin| close_plugin_and_ignore(plugin) } + outputs.each { |plugin| close_plugin_and_ignore(plugin) } end # for backward compatibility in devutils for the rspec helpers, this method is not used @@ -618,7 +618,7 @@ def close_plugin_and_ignore(plugin) rescue => e @logger.warn( "plugin raised exception while closing, ignoring", - exception_logging_keys(e, :plugin => plugin.class.config_name)) + exception_logging_keys(e, :plugin => plugin.config_name)) end end diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 571e9cb806..c318f5a4fd 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -790,6 +790,108 @@ def flush(options) expect(pipeline.crashed?).to be false end end + + context "when a filter raises during do_close" do + before(:each) do + allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter) + end + + let(:test_config_with_two_filters) { + <<-eos + input { dummyinput {} } + filter { + dummyfilter { id => "filter_1" } + dummyfilter { id => "filter_2" } + } + output { dummyoutput {} } + eos + } + let(:pipeline) { mock_java_pipeline_from_string(test_config_with_two_filters, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } + let(:raising_filter) { pipeline.filters.first } + let(:remaining_filter) { pipeline.filters.last } + let(:output) { pipeline.outputs.first } + let(:input) { pipeline.inputs.first } + + before { allow(raising_filter).to receive(:do_close).and_raise("boom from filter_1 do_close") } + + it "still closes the remaining filter and the output" do + # Core assertion: shutdown_workers must keep iterating past the raise. + expect(remaining_filter).to receive(:do_close).once + expect(output).to receive(:do_close).once + # Input close runs on a separate path (inputworker#ensure); + # asserted here as a regression guard against future coupling. + expect(input).to receive(:do_close).once + allow(pipeline.logger).to receive(:warn) + expect(pipeline.logger).to receive(:warn).with( + "plugin raised exception while closing, ignoring", + hash_including(:plugin => raising_filter.config_name)) + + pipeline.start + pipeline.shutdown + + expect(pipeline.crashed?).to be false + end + end + + context "when an output raises during do_close" do + before(:each) do + allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore) + end + + let(:test_config_with_two_outputs) { + <<-eos + input { dummyinput {} } + output { + dummyoutput {} + dummyoutputmore {} + } + eos + } + let(:pipeline) { mock_java_pipeline_from_string(test_config_with_two_outputs, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } + let(:raising_output) { pipeline.outputs.first } + let(:remaining_output) { pipeline.outputs.last } + let(:input) { pipeline.inputs.first } + + before { allow(raising_output).to receive(:do_close).and_raise("boom from first output do_close") } + + it "still closes the remaining output" do + # Core assertion: shutdown_workers must keep iterating past the raise. + expect(remaining_output).to receive(:do_close).once + # Input close runs on a separate path (inputworker#ensure); + # asserted here as a regression guard against future coupling. + expect(input).to receive(:do_close).once + allow(pipeline.logger).to receive(:warn) + expect(pipeline.logger).to receive(:warn).with( + "plugin raised exception while closing, ignoring", + hash_including(:plugin => raising_output.config_name)) + + pipeline.start + pipeline.shutdown + + expect(pipeline.crashed?).to be false + end + end + + context "when register_plugins rolls back and a registered plugin's do_close raises" do + let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } + + after { pipeline.close rescue nil } + + it "closes every already-registered plugin and re-raises the original register error" do + good = ::LogStash::Outputs::DummyOutput.new + bad_close = ::LogStash::Outputs::DummyOutput.new + failing_register = ::LogStash::Outputs::DummyOutput.new + register_error = RuntimeError.new("register failed") + + allow(failing_register).to receive(:register).and_raise(register_error) + expect(bad_close).to receive(:do_close).and_raise("boom from do_close") + expect(good).to receive(:do_close).and_call_original + + expect { + pipeline.register_plugins([good, bad_close, failing_register]) + }.to raise_error(register_error) + end + end end context "with no explicit ids declared" do