Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def register_plugins(plugins)
registered << plugin
end
rescue => e
registered.each(&:do_close)
registered.each { |plugin| close_plugin_and_ignore(plugin) }
raise e
end

Expand Down Expand Up @@ -523,8 +523,8 @@ def shutdown_workers
t.join
end

filters.each(&:do_close)
outputs.each(&:do_close)
filters.each { |plugin| close_plugin_and_ignore(plugin) }
outputs.each { |plugin| close_plugin_and_ignore(plugin) }
end

# for backward compatibility in devutils for the rspec helpers, this method is not used
Expand Down Expand Up @@ -618,7 +618,7 @@ def close_plugin_and_ignore(plugin)
rescue => e
@logger.warn(
"plugin raised exception while closing, ignoring",
exception_logging_keys(e, :plugin => plugin.class.config_name))
exception_logging_keys(e, :plugin => plugin.config_name))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delegators prevented the previous code from working for filters / outputs. Now we prefer the instance form.

end
end

Expand Down
102 changes: 102 additions & 0 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,108 @@ def flush(options)
expect(pipeline.crashed?).to be false
end
end

context "when a filter raises during do_close" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
end

let(:test_config_with_two_filters) {
<<-eos
input { dummyinput {} }
filter {
dummyfilter { id => "filter_1" }
dummyfilter { id => "filter_2" }
}
output { dummyoutput {} }
eos
}
let(:pipeline) { mock_java_pipeline_from_string(test_config_with_two_filters, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) }
let(:raising_filter) { pipeline.filters.first }
let(:remaining_filter) { pipeline.filters.last }
let(:output) { pipeline.outputs.first }
let(:input) { pipeline.inputs.first }

before { allow(raising_filter).to receive(:do_close).and_raise("boom from filter_1 do_close") }

it "still closes the remaining filter and the output" do
# Core assertion: shutdown_workers must keep iterating past the raise.
expect(remaining_filter).to receive(:do_close).once
expect(output).to receive(:do_close).once
# Input close runs on a separate path (inputworker#ensure);
# asserted here as a regression guard against future coupling.
expect(input).to receive(:do_close).once
allow(pipeline.logger).to receive(:warn)
expect(pipeline.logger).to receive(:warn).with(
"plugin raised exception while closing, ignoring",
hash_including(:plugin => raising_filter.config_name))

pipeline.start
pipeline.shutdown

expect(pipeline.crashed?).to be false
end
end

context "when an output raises during do_close" do
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore)
end

let(:test_config_with_two_outputs) {
<<-eos
input { dummyinput {} }
output {
dummyoutput {}
dummyoutputmore {}
}
eos
}
let(:pipeline) { mock_java_pipeline_from_string(test_config_with_two_outputs, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) }
let(:raising_output) { pipeline.outputs.first }
let(:remaining_output) { pipeline.outputs.last }
let(:input) { pipeline.inputs.first }

before { allow(raising_output).to receive(:do_close).and_raise("boom from first output do_close") }

it "still closes the remaining output" do
# Core assertion: shutdown_workers must keep iterating past the raise.
expect(remaining_output).to receive(:do_close).once
# Input close runs on a separate path (inputworker#ensure);
# asserted here as a regression guard against future coupling.
expect(input).to receive(:do_close).once
allow(pipeline.logger).to receive(:warn)
expect(pipeline.logger).to receive(:warn).with(
"plugin raised exception while closing, ignoring",
hash_including(:plugin => raising_output.config_name))

pipeline.start
pipeline.shutdown

expect(pipeline.crashed?).to be false
end
end

context "when register_plugins rolls back and a registered plugin's do_close raises" do
let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers, mock_settings("pipeline.batch.metrics.sampling_mode" => batch_sampling_mode)) }

after { pipeline.close rescue nil }

it "closes every already-registered plugin and re-raises the original register error" do
good = ::LogStash::Outputs::DummyOutput.new
bad_close = ::LogStash::Outputs::DummyOutput.new
failing_register = ::LogStash::Outputs::DummyOutput.new
register_error = RuntimeError.new("register failed")

allow(failing_register).to receive(:register).and_raise(register_error)
expect(bad_close).to receive(:do_close).and_raise("boom from do_close")
expect(good).to receive(:do_close).and_call_original

expect {
pipeline.register_plugins([good, bad_close, failing_register])
}.to raise_error(register_error)
end
end
end

context "with no explicit ids declared" do
Expand Down
Loading