From 1045adb2a43236ee89b61870f22b7e998bce8e94 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 21 Apr 2026 15:57:08 -0500 Subject: [PATCH 1/2] Ensure do_close failures don't halt pipeline shutdown Route plugin do_close calls in register_plugins rollback and shutdown_workers through close_plugin_and_ignore so one plugin raising doesn't leave remaining filters/outputs unclosed. Also fix the log key to use the instance-level config_name. Co-Authored-By: Claude Opus 4.7 (1M context) --- logstash-core/lib/logstash/java_pipeline.rb | 8 +- .../spec/logstash/java_pipeline_spec.rb | 100 ++++++++++++++++++ 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 71dfd5bc50..19147a6fb6 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -255,7 +255,7 @@ def register_plugins(plugins) registered << plugin end rescue => e - registered.each(&:do_close) + registered.each { |plugin| close_plugin_and_ignore(plugin) } raise e end @@ -523,8 +523,8 @@ def shutdown_workers t.join end - filters.each(&:do_close) - outputs.each(&:do_close) + filters.each { |plugin| close_plugin_and_ignore(plugin) } + outputs.each { |plugin| close_plugin_and_ignore(plugin) } end # for backward compatibility in devutils for the rspec helpers, this method is not used @@ -618,7 +618,7 @@ def close_plugin_and_ignore(plugin) rescue => e @logger.warn( "plugin raised exception while closing, ignoring", - exception_logging_keys(e, :plugin => plugin.class.config_name)) + exception_logging_keys(e, :plugin => plugin.config_name)) end end diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 571e9cb806..e75134f2aa 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -790,6 +790,106 @@ def flush(options) expect(pipeline.crashed?).to be false end end + + context "when a filter raises during do_close" do + before(:each) do + allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter) + end + + let(:test_config_with_two_filters) { + <<-eos + input { dummyinput {} } + filter { + dummyfilter { id => "filter_1" } + dummyfilter { id => "filter_2" } + } + output { dummyoutput {} } + eos + } + let(:pipeline) { mock_java_pipeline_from_string(test_config_with_two_filters, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } + let(:raising_filter) { pipeline.filters.first } + let(:remaining_filter) { pipeline.filters.last } + let(:output) { pipeline.outputs.first } + let(:input) { pipeline.inputs.first } + + before { allow(raising_filter).to receive(:do_close).and_raise("boom from filter_1 do_close") } + + it "still closes the remaining filter and the output" do + # Core assertion: shutdown_workers must keep iterating past the raise. + expect(remaining_filter).to receive(:do_close).once + expect(output).to receive(:do_close).once + # Input close runs on a separate path (inputworker#ensure); + # asserted here as a regression guard against future coupling. + expect(input).to receive(:do_close).once + expect(pipeline.logger).to receive(:warn).with( + "plugin raised exception while closing, ignoring", + hash_including(:plugin => "dummyfilter")) + + pipeline.start + pipeline.shutdown + + expect(pipeline.crashed?).to be false + end + end + + context "when an output raises during do_close" do + before(:each) do + allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore) + end + + let(:test_config_with_two_outputs) { + <<-eos + input { dummyinput {} } + output { + dummyoutput {} + dummyoutputmore {} + } + eos + } + let(:pipeline) { mock_java_pipeline_from_string(test_config_with_two_outputs, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } + let(:raising_output) { pipeline.outputs.first } + let(:remaining_output) { pipeline.outputs.last } + let(:input) { pipeline.inputs.first } + + before { allow(raising_output).to receive(:do_close).and_raise("boom from first output do_close") } + + it "still closes the remaining output" do + # Core assertion: shutdown_workers must keep iterating past the raise. + expect(remaining_output).to receive(:do_close).once + # Input close runs on a separate path (inputworker#ensure); + # asserted here as a regression guard against future coupling. + expect(input).to receive(:do_close).once + expect(pipeline.logger).to receive(:warn).with( + "plugin raised exception while closing, ignoring", + hash_including(:plugin => "dummyoutput")) + + pipeline.start + pipeline.shutdown + + expect(pipeline.crashed?).to be false + end + end + + context "when register_plugins rolls back and a registered plugin's do_close raises" do + let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) } + + after { pipeline.close rescue nil } + + it "closes every already-registered plugin and re-raises the original register error" do + good = ::LogStash::Outputs::DummyOutput.new + bad_close = ::LogStash::Outputs::DummyOutput.new + failing_register = ::LogStash::Outputs::DummyOutput.new + register_error = RuntimeError.new("register failed") + + allow(failing_register).to receive(:register).and_raise(register_error) + expect(bad_close).to receive(:do_close).and_raise("boom from do_close") + expect(good).to receive(:do_close).and_call_original + + expect { + pipeline.register_plugins([good, bad_close, failing_register]) + }.to raise_error(register_error) + end + end end context "with no explicit ids declared" do From c5004f7e21c29faff4ef22b83cf57a9931fdeac8 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 21 Apr 2026 18:02:23 -0500 Subject: [PATCH 2/2] Fix CI failures in do_close spec log assertions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Filter context: unrelated safe_pipeline_worker_count warn during start_workers tripped the strict expect().to receive(:warn).with(...) expectation. Add allow(logger).to receive(:warn) as a pass-through. Output context: pipeline.outputs.first on CI resolves to dummyoutputmore, not dummyoutput — output ordering isn't guaranteed to match config text order. Assert against raising_output.config_name so the match tracks the plugin actually stubbed to raise. Co-Authored-By: Claude Opus 4.7 (1M context) --- logstash-core/spec/logstash/java_pipeline_spec.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index e75134f2aa..c318f5a4fd 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -821,9 +821,10 @@ def flush(options) # Input close runs on a separate path (inputworker#ensure); # asserted here as a regression guard against future coupling. expect(input).to receive(:do_close).once + allow(pipeline.logger).to receive(:warn) expect(pipeline.logger).to receive(:warn).with( "plugin raised exception while closing, ignoring", - hash_including(:plugin => "dummyfilter")) + hash_including(:plugin => raising_filter.config_name)) pipeline.start pipeline.shutdown @@ -859,9 +860,10 @@ def flush(options) # Input close runs on a separate path (inputworker#ensure); # asserted here as a regression guard against future coupling. expect(input).to receive(:do_close).once + allow(pipeline.logger).to receive(:warn) expect(pipeline.logger).to receive(:warn).with( "plugin raised exception while closing, ignoring", - hash_including(:plugin => "dummyoutput")) + hash_including(:plugin => raising_output.config_name)) pipeline.start pipeline.shutdown