diff --git a/ruby/Gemfile.lock b/ruby/Gemfile.lock index 6f0f5cc8..47237fd7 100644 --- a/ruby/Gemfile.lock +++ b/ruby/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - ci-queue (0.87.0) + ci-queue (0.88.0) logger GEM @@ -43,6 +43,8 @@ GEM builder minitest (>= 5.0) ruby-progressbar + mocha (3.1.0) + ruby2_keywords (>= 0.0.5) msgpack (1.8.0) parallel (1.27.0) parser (3.3.10.2) @@ -86,6 +88,7 @@ GEM parser (>= 3.3.7.2) prism (~> 1.7) ruby-progressbar (1.13.0) + ruby2_keywords (0.0.5) securerandom (0.4.1) simplecov (0.22.0) docile (~> 1.1) @@ -112,6 +115,7 @@ DEPENDENCIES ci-queue! minitest (~> 5.11) minitest-reporters (~> 1.1) + mocha msgpack rake redis diff --git a/ruby/ci-queue.gemspec b/ruby/ci-queue.gemspec index 3f4f69a9..ae55143f 100644 --- a/ruby/ci-queue.gemspec +++ b/ruby/ci-queue.gemspec @@ -40,6 +40,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'redis' spec.add_development_dependency 'simplecov', '~> 0.12' spec.add_development_dependency 'minitest-reporters', '~> 1.1' + spec.add_development_dependency 'mocha' spec.add_development_dependency 'rexml' spec.add_development_dependency 'snappy' diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index db835912..0ead2884 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -264,6 +264,8 @@ def resolve_lua_includes(script, root) end class HeartbeatProcess + MAX_RESTART_ATTEMPTS = 3 + def initialize(redis_url, zset_key, owners_key, leases_key) @redis_url = redis_url @zset_key = zset_key @@ -313,10 +315,28 @@ def shutdown! def tick!(id, lease) send_message(:tick!, id: id, lease: lease.to_s) + @restart_attempts = 0 + rescue IOError, Errno::EPIPE => error + @restart_attempts = (@restart_attempts || 0) + 1 + raise if @restart_attempts > MAX_RESTART_ATTEMPTS + + restart! + retry end private + def restart! + @pipe.close rescue nil + begin + Process.kill(:TERM, @pid) + Process.waitpid2(@pid) + rescue Errno::ESRCH, Errno::ECHILD + nil + end + boot! + end + def send_message(*message) payload = message.to_json @pipe.write([payload.bytesize].pack("L").b, payload) diff --git a/ruby/lib/ci/queue/version.rb b/ruby/lib/ci/queue/version.rb index b142f74c..1a6afbee 100644 --- a/ruby/lib/ci/queue/version.rb +++ b/ruby/lib/ci/queue/version.rb @@ -2,7 +2,7 @@ module CI module Queue - VERSION = '0.87.0' + VERSION = '0.88.0' DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../redis', __FILE__) RELEASE_SCRIPTS_ROOT = ::File.expand_path('../redis', __FILE__) end diff --git a/ruby/test/ci/queue/redis/heartbeat_process_test.rb b/ruby/test/ci/queue/redis/heartbeat_process_test.rb new file mode 100644 index 00000000..31ef1d0c --- /dev/null +++ b/ruby/test/ci/queue/redis/heartbeat_process_test.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true +require 'test_helper' + +class CI::Queue::Redis::Base::HeartbeatProcessTest < Minitest::Test + MAX = CI::Queue::Redis::Base::HeartbeatProcess::MAX_RESTART_ATTEMPTS + + def setup + @hp = CI::Queue::Redis::Base::HeartbeatProcess.new( + 'redis://localhost:6379/0', + 'zset', 'owners', 'leases' + ) + # boot! and restart! must not spawn real processes + @hp.stubs(:boot!) + @hp.stubs(:restart!) + end + + def test_tick_retries_after_pipe_ioerror + @hp.expects(:send_message).twice.raises(IOError, "closed stream").then.returns(nil) + + @hp.tick!("test_id", "lease_id") + end + + def test_tick_retries_after_epipe + @hp.expects(:send_message).twice.raises(Errno::EPIPE).then.returns(nil) + + @hp.tick!("test_id", "lease_id") + end + + def test_tick_calls_restart_on_pipe_error + @hp.stubs(:send_message).raises(IOError, "closed stream").then.returns(nil) + @hp.expects(:restart!).once + + @hp.tick!("test_id", "lease_id") + end + + def test_tick_raises_after_max_restart_attempts + @hp.stubs(:send_message).raises(IOError, "closed stream") + + assert_raises(IOError) do + (MAX + 1).times { @hp.tick!("test_id", "lease_id") } + end + end + + def test_restart_counter_resets_after_success + # Build a sequence: [raise, return] * (MAX+1). + # Without @restart_attempts = 0 on success, the (MAX+1)th failure would exceed MAX and raise. + stub = @hp.stubs(:send_message) + (MAX + 1).times do |i| + stub = stub.raises(IOError, "closed stream").then.returns(nil) + stub = stub.then unless i == MAX + end + + (MAX + 1).times { @hp.tick!("test_id", "lease_id") } + end +end diff --git a/ruby/test/test_helper.rb b/ruby/test/test_helper.rb index 1988b7de..b0a8ed8a 100644 --- a/ruby/test/test_helper.rb +++ b/ruby/test/test_helper.rb @@ -9,6 +9,7 @@ require 'ci/queue/redis' require 'minitest/queue' require 'minitest/autorun' +require 'mocha/minitest' Minitest::Reporters.use!([Minitest::Reporters::SpecReporter.new])