diff --git a/python/ciqueue/distributed.py b/python/ciqueue/distributed.py index e8b9acc4..1fd209b6 100644 --- a/python/ciqueue/distributed.py +++ b/python/ciqueue/distributed.py @@ -62,6 +62,7 @@ def __init__(self, tests, worker_id, redis, build_id, # pylint: disable=too-man super(Worker, self).__init__(redis=redis, build_id=build_id) self.timeout = timeout self.total = len(tests) + self._leases = {} self.max_requeues = max_requeues self.global_max_requeues = math.ceil(len(tests) * requeue_tolerance) self.worker_id = worker_id @@ -73,7 +74,7 @@ def poll(): while not self.shutdown_required and len(self): # pylint: disable=len-as-condition test = self._reserve() if test: - yield test.decode() + yield test.decode() if isinstance(test, bytes) else test else: time.sleep(0.05) @@ -88,6 +89,7 @@ def shutdown(self): self.shutdown_required = True def acknowledge(self, test): + lease = self._leases.pop(test, '') return self._eval_script( 'acknowledge', keys=[ @@ -96,14 +98,16 @@ def acknowledge(self, test): self.key('owners'), self.key('error-reports'), self.key('requeued-by'), + self.key('leases'), ], - args=[test, '', 0], + args=[test, '', 0, lease], ) == 1 def requeue(self, test, offset=42): if not (self.max_requeues > 0 and self.global_max_requeues > 0.0): return False + lease = self._leases.get(test, '') return self._eval_script( 'requeue', keys=[ @@ -115,8 +119,9 @@ def requeue(self, test, offset=42): self.key('owners'), self.key('error-reports'), self.key('requeued-by'), + self.key('leases'), ], - args=[self.max_requeues, self.global_max_requeues, test, offset, 0], + args=[self.max_requeues, self.global_max_requeues, test, offset, 0, lease], ) == 1 def retry_queue(self): @@ -154,7 +159,18 @@ def _register(self): self.redis.sadd(self.key('workers'), self.worker_id) def _reserve(self): - return self._try_to_reserve_lost_test() or self._try_to_reserve_test() + result = self._try_to_reserve_lost_test() or self._try_to_reserve_test() + if result: + if isinstance(result, list): + entry, lease = result[0], result[1] + # Store lease keyed by the decoded entry (acknowledge receives decoded strings) + entry_str = entry.decode() if isinstance(entry, bytes) else entry + lease_str = lease.decode() if isinstance(lease, bytes) else str(lease) + self._leases[entry_str] = lease_str + return entry # Return raw (bytes or str) — poll() handles .decode() + else: + return result + return result def _try_to_reserve_lost_test(self): if self.timeout: @@ -168,6 +184,8 @@ def _try_to_reserve_lost_test(self): self.worker_id, 'queue'), self.key('owners'), + self.key('leases'), + self.key('lease-counter'), ], args=[time.time(), self.timeout], ) @@ -183,6 +201,8 @@ def _try_to_reserve_test(self): self.key('owners'), self.key('requeued-by'), self.key('workers'), + self.key('leases'), + self.key('lease-counter'), ], args=[ time.time(), diff --git a/redis/acknowledge.lua b/redis/acknowledge.lua index 4ca40729..233522d5 100644 --- a/redis/acknowledge.lua +++ b/redis/acknowledge.lua @@ -3,12 +3,23 @@ local processed_key = KEYS[2] local owners_key = KEYS[3] local error_reports_key = KEYS[4] local requeued_by_key = KEYS[5] +local leases_key = KEYS[6] local entry = ARGV[1] local error = ARGV[2] local ttl = ARGV[3] -redis.call('zrem', zset_key, entry) -redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers +local lease_id = ARGV[4] + +-- Only the current lease holder can remove the entry from the running set. +-- If the lease was transferred (e.g. via reserve_lost), the stale worker +-- must not remove the running entry — that would let the supervisor think +-- the queue is exhausted while the new lease holder is still processing. +if tostring(redis.call('hget', leases_key, entry)) == lease_id then + redis.call('zrem', zset_key, entry) + redis.call('hdel', owners_key, entry) + redis.call('hdel', leases_key, entry) +end + redis.call('hdel', requeued_by_key, entry) local acknowledged = redis.call('sadd', processed_key, entry) == 1 diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua index 4c2bc64c..91ccd244 100644 --- a/redis/heartbeat.lua +++ b/redis/heartbeat.lua @@ -1,17 +1,16 @@ local zset_key = KEYS[1] -local processed_key = KEYS[2] -local owners_key = KEYS[3] -local worker_queue_key = KEYS[4] +local leases_key = KEYS[2] local current_time = ARGV[1] local entry = ARGV[2] +local lease_id = ARGV[3] --- already processed, we do not need to bump the timestamp -if redis.call('sismember', processed_key, entry) == 1 then - return false -end - --- we're still the owner of the test, we can bump the timestamp -if redis.call('hget', owners_key, entry) == worker_queue_key then +-- Only the current lease holder can bump the timestamp. +-- We intentionally do NOT check the processed set. A non-owner worker's +-- acknowledge can add the entry to processed, which would poison the +-- current lease holder's heartbeat if we checked it here. +-- The lease check alone is sufficient — once the lease holder acknowledges, +-- they zrem + hdel the lease, so the heartbeat will naturally stop. +if tostring(redis.call('hget', leases_key, entry)) == lease_id then return redis.call('zadd', zset_key, current_time, entry) end diff --git a/redis/release.lua b/redis/release.lua index 4bcc5929..4b2c7c88 100644 --- a/redis/release.lua +++ b/redis/release.lua @@ -1,6 +1,7 @@ local zset_key = KEYS[1] local worker_queue_key = KEYS[2] local owners_key = KEYS[3] +local leases_key = KEYS[4] -- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...} local owned_tests = redis.call('hgetall', owners_key) @@ -8,6 +9,7 @@ for index, owner_or_test in ipairs(owned_tests) do if owner_or_test == worker_queue_key then -- If we owned a test local test = owned_tests[index - 1] redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately + redis.call('hdel', leases_key, test) return nil end end diff --git a/redis/requeue.lua b/redis/requeue.lua index e01e61d3..00d54e40 100644 --- a/redis/requeue.lua +++ b/redis/requeue.lua @@ -6,15 +6,20 @@ local worker_queue_key = KEYS[5] local owners_key = KEYS[6] local error_reports_key = KEYS[7] local requeued_by_key = KEYS[8] +local leases_key = KEYS[9] local max_requeues = tonumber(ARGV[1]) local global_max_requeues = tonumber(ARGV[2]) local entry = ARGV[3] local offset = ARGV[4] local ttl = tonumber(ARGV[5]) +local lease_id = ARGV[6] -if redis.call('hget', owners_key, entry) == worker_queue_key then - redis.call('hdel', owners_key, entry) +-- Only the current lease holder can requeue a test. +-- If the lease was transferred (e.g. via reserve_lost), reject the stale +-- worker's requeue so the running entry stays intact for the new holder. +if tostring(redis.call('hget', leases_key, entry)) ~= lease_id then + return false end if redis.call('sismember', processed_key, entry) == 1 then @@ -48,6 +53,8 @@ if ttl and ttl > 0 then redis.call('expire', requeued_by_key, ttl) end +redis.call('hdel', owners_key, entry) +redis.call('hdel', leases_key, entry) redis.call('zrem', zset_key, entry) return true diff --git a/redis/reserve.lua b/redis/reserve.lua index 265a37ab..928dd4d0 100644 --- a/redis/reserve.lua +++ b/redis/reserve.lua @@ -5,6 +5,8 @@ local worker_queue_key = KEYS[4] local owners_key = KEYS[5] local requeued_by_key = KEYS[6] local workers_key = KEYS[7] +local leases_key = KEYS[8] +local lease_counter_key = KEYS[9] local current_time = ARGV[1] local defer_offset = tonumber(ARGV[2]) or 0 @@ -19,6 +21,15 @@ local function insert_with_offset(test) end end +local function claim_test(test) + local lease = redis.call('incr', lease_counter_key) + redis.call('zadd', zset_key, current_time, test) + redis.call('lpush', worker_queue_key, test) + redis.call('hset', owners_key, test, worker_queue_key) + redis.call('hset', leases_key, test, lease) + return {test, tostring(lease)} +end + for attempt = 1, max_skip_attempts do local test = redis.call('rpop', queue_key) if not test then @@ -30,10 +41,7 @@ for attempt = 1, max_skip_attempts do -- If this build only has one worker, allow immediate self-pickup. if redis.call('scard', workers_key) <= 1 then redis.call('hdel', requeued_by_key, test) - redis.call('zadd', zset_key, current_time, test) - redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) - return test + return claim_test(test) end insert_with_offset(test) @@ -46,10 +54,7 @@ for attempt = 1, max_skip_attempts do end else redis.call('hdel', requeued_by_key, test) - redis.call('zadd', zset_key, current_time, test) - redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) - return test + return claim_test(test) end end diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index 9dfaa616..e0a2c009 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -2,6 +2,8 @@ local zset_key = KEYS[1] local processed_key = KEYS[2] local worker_queue_key = KEYS[3] local owners_key = KEYS[4] +local leases_key = KEYS[5] +local lease_counter_key = KEYS[6] local current_time = ARGV[1] local timeout = ARGV[2] @@ -9,10 +11,19 @@ local timeout = ARGV[2] local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout) for _, test in ipairs(lost_tests) do if redis.call('sismember', processed_key, test) == 0 then + local lease = redis.call('incr', lease_counter_key) redis.call('zadd', zset_key, current_time, test) redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership - return test + redis.call('hset', owners_key, test, worker_queue_key) + redis.call('hset', leases_key, test, lease) + return {test, tostring(lease)} + else + -- Test is already processed but still in running (stale). This can happen when + -- a non-owner worker acknowledged the test (marking it processed) but could not + -- remove it from running due to the lease guard. Clean it up. + redis.call('zrem', zset_key, test) + redis.call('hdel', owners_key, test) + redis.call('hdel', leases_key, test) end end diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 95693d3f..db835912 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -60,10 +60,10 @@ def reconnect_attempts [0, 0, 0.1, 0.5, 1, 3, 5] end - def with_heartbeat(id) + def with_heartbeat(id, lease: nil) if heartbeat_enabled? ensure_heartbeat_thread_alive! - heartbeat_state.set(:tick, id) + heartbeat_state.set(:tick, id, lease) end yield @@ -264,12 +264,11 @@ def resolve_lua_includes(script, root) end class HeartbeatProcess - def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key) + def initialize(redis_url, zset_key, owners_key, leases_key) @redis_url = redis_url @zset_key = zset_key - @processed_key = processed_key @owners_key = owners_key - @worker_queue_key = worker_queue_key + @leases_key = leases_key end def boot! @@ -281,9 +280,8 @@ def boot! ::File.join(__dir__, "monitor.rb"), @redis_url, @zset_key, - @processed_key, @owners_key, - @worker_queue_key, + @leases_key, in: child_read, out: child_write, ) @@ -313,8 +311,8 @@ def shutdown! end end - def tick!(id) - send_message(:tick!, id: id) + def tick!(id, lease) + send_message(:tick!, id: id, lease: lease.to_s) end private @@ -355,9 +353,8 @@ def heartbeat_process @heartbeat_process ||= HeartbeatProcess.new( @redis_url, key('running'), - key('processed'), key('owners'), - key('worker', worker_id, 'queue'), + key('leases'), ) end @@ -369,19 +366,16 @@ def heartbeat Thread.current.name = "CI::Queue#heartbeat" Thread.current.abort_on_exception = true - timeout = config.timeout.to_i loop do - command = nil command = heartbeat_state.wait(1) # waits for max 1 second but wakes up immediately if we receive a command case command&.first when :tick - if timeout > 0 - heartbeat_process.tick!(command.last) - timeout -= 1 - end + # command = [:tick, entry_id, lease_id] + heartbeat_process.tick!(command[1], command[2]) when :reset - timeout = config.timeout.to_i + # Test finished, stop ticking until next test starts + nil when :stop break end diff --git a/ruby/lib/ci/queue/redis/monitor.rb b/ruby/lib/ci/queue/redis/monitor.rb index 8554ab06..8027d7dc 100755 --- a/ruby/lib/ci/queue/redis/monitor.rb +++ b/ruby/lib/ci/queue/redis/monitor.rb @@ -13,11 +13,10 @@ class Monitor DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../../redis', __FILE__) RELEASE_SCRIPTS_ROOT = ::File.expand_path('../../redis', __FILE__) - def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) + def initialize(pipe, logger, redis_url, zset_key, owners_key, leases_key) @zset_key = zset_key - @processed_key = processed_key @owners_key = owners_key - @worker_queue_key = worker_queue_key + @leases_key = leases_key @logger = logger @redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5]) @shutdown = false @@ -36,11 +35,11 @@ def soft_signal(sig) @self_pipe_writer << '.' end - def process_tick!(id:) + def process_tick!(id:, lease:) eval_script( :heartbeat, - keys: [@zset_key, @processed_key, @owners_key, @worker_queue_key], - argv: [Time.now.to_f, id] + keys: [@zset_key, @leases_key], + argv: [Time.now.to_f, id, lease] ) rescue => error @logger.info(error) @@ -151,12 +150,11 @@ def monitor redis_url = ARGV[0] zset_key = ARGV[1] -processed_key = ARGV[2] -owners_key = ARGV[3] -worker_queue_key = ARGV[4] +owners_key = ARGV[2] +leases_key = ARGV[3] -logger.debug("Starting monitor: #{redis_url} #{zset_key} #{processed_key}") -manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) +logger.debug("Starting monitor: #{redis_url} #{zset_key} #{leases_key}") +manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, owners_key, leases_key) # Notify the parent we're ready $stdout.puts(".") diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 40a457cd..38fa4483 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -19,6 +19,7 @@ class Worker < Base def initialize(redis, config) @reserved_tests = Concurrent::Set.new + @reserved_leases = Concurrent::Map.new @shutdown_required = false @first_reserve_at = nil super(redis, config) @@ -172,6 +173,11 @@ def worker_queue_length nil end + def lease_for(entry) + test_id = CI::Queue::QueueEntry.test_id(entry) + @reserved_leases[test_id] + end + def report_worker_error(error) build.report_worker_error(error) end @@ -180,11 +186,12 @@ def acknowledge(entry, error: nil, pipeline: redis) test_id = CI::Queue::QueueEntry.test_id(entry) assert_reserved!(test_id) entry = reserved_entries.fetch(test_id, entry) + lease = @reserved_leases.delete(test_id) unreserve_entry(test_id) eval_script( :acknowledge, - keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by')], - argv: [entry, error.to_s, config.redis_ttl], + keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by'), key('leases')], + argv: [entry, error.to_s, config.redis_ttl, lease.to_s], pipeline: pipeline, ) == 1 end @@ -193,6 +200,7 @@ def requeue(entry, offset: Redis.requeue_offset) test_id = CI::Queue::QueueEntry.test_id(entry) assert_reserved!(test_id) entry = reserved_entries.fetch(test_id, entry) + lease = @reserved_leases.delete(test_id) unreserve_entry(test_id) global_max_requeues = config.global_max_requeues(total) @@ -207,14 +215,16 @@ def requeue(entry, offset: Redis.requeue_offset) key('owners'), key('error-reports'), key('requeued-by'), + key('leases'), ], - argv: [config.max_requeues, global_max_requeues, entry, offset, config.redis_ttl], + argv: [config.max_requeues, global_max_requeues, entry, offset, config.redis_ttl, lease.to_s], ) == 1 unless requeued reserved_tests << test_id reserved_entries[test_id] = entry reserved_entry_ids[entry] = test_id + @reserved_leases[test_id] = lease if lease end requeued end @@ -222,7 +232,7 @@ def requeue(entry, offset: Redis.requeue_offset) def release! eval_script( :release, - keys: [key('running'), key('worker', worker_id, 'queue'), key('owners')], + keys: [key('running'), key('worker', worker_id, 'queue'), key('owners'), key('leases')], argv: [], ) nil @@ -254,11 +264,12 @@ def assert_reserved!(test_id) end end - def reserve_entry(entry) + def reserve_entry(entry, lease = nil) test_id = CI::Queue::QueueEntry.test_id(entry) reserved_tests << test_id reserved_entries[test_id] = entry reserved_entry_ids[entry] = test_id + @reserved_leases[test_id] = lease if lease end def unreserve_entry(test_id) @@ -343,12 +354,12 @@ def finalize_streaming end def reserve - (try_to_reserve_lost_test || try_to_reserve_test).tap do |entry| - if entry - @first_reserve_at ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) - reserve_entry(entry) - end + entry, lease = try_to_reserve_lost_test || try_to_reserve_test || [nil, nil] + if entry + @first_reserve_at ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) + reserve_entry(entry, lease) end + entry end def try_to_reserve_test @@ -362,6 +373,8 @@ def try_to_reserve_test key('owners'), key('requeued-by'), key('workers'), + key('leases'), + key('lease-counter'), ], argv: [CI::Queue.time_now.to_f, Redis.requeue_offset], ) @@ -370,25 +383,28 @@ def try_to_reserve_test def try_to_reserve_lost_test timeout = config.max_missed_heartbeat_seconds ? config.max_missed_heartbeat_seconds : config.timeout - lost_test = eval_script( + result = eval_script( :reserve_lost, keys: [ key('running'), key('processed'), key('worker', worker_id, 'queue'), key('owners'), + key('leases'), + key('lease-counter'), ], argv: [CI::Queue.time_now.to_f, timeout], ) - if lost_test - build.record_warning(Warnings::RESERVED_LOST_TEST, test: CI::Queue::QueueEntry.test_id(lost_test), timeout: config.timeout) + if result + entry = result.is_a?(Array) ? result[0] : result + build.record_warning(Warnings::RESERVED_LOST_TEST, test: CI::Queue::QueueEntry.test_id(entry), timeout: config.timeout) if CI::Queue.debug? - $stderr.puts "[ci-queue][reserve_lost] worker=#{worker_id} test_id=#{CI::Queue::QueueEntry.test_id(lost_test)}" + $stderr.puts "[ci-queue][reserve_lost] worker=#{worker_id} test_id=#{CI::Queue::QueueEntry.test_id(entry)}" end end - lost_test + result end def push(entries) diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 0799e4b4..2dfc6bc3 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -50,10 +50,14 @@ def populate(tests, random: nil) self end - def with_heartbeat(id) + def with_heartbeat(id, lease: nil) yield end + def lease_for(entry) + nil + end + def ensure_heartbeat_thread_alive!; end def boot_heartbeat_process!; end diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index b1459b9f..bcbcfc95 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -163,7 +163,7 @@ def run(reporter, *) rescue_run_errors do begin queue.poll do |example| - result = queue.with_heartbeat(example.queue_entry) do + result = queue.with_heartbeat(example.queue_entry, lease: queue.lease_for(example.queue_entry)) do example.run end diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index c8d927f3..1de567db 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -321,27 +321,35 @@ def test_reserve_defers_own_requeued_test_once assert_nil @redis.hget(requeued_by_key, entry) second_try = queue.send(:try_to_reserve_test) - assert_equal entry, second_try + assert_equal entry, second_try[0] end - def test_heartbeat_uses_entry_for_processed_check + def test_heartbeat_only_checks_lease queue = worker(1, populate: false) entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb') + lease = "42" - @redis.sadd(queue.send(:key, 'processed'), entry) + # Set up: entry is in running with a matching lease + @redis.zadd(queue.send(:key, 'running'), 0, entry) + @redis.hset(queue.send(:key, 'leases'), entry, lease) + # Heartbeat with matching lease should succeed (even if processed) + @redis.sadd(queue.send(:key, 'processed'), entry) result = queue.send( :eval_script, :heartbeat, - keys: [ - queue.send(:key, 'running'), - queue.send(:key, 'processed'), - queue.send(:key, 'owners'), - queue.send(:key, 'worker', queue.config.worker_id, 'queue'), - ], - argv: [CI::Queue.time_now.to_f, entry], + keys: [queue.send(:key, 'running'), queue.send(:key, 'leases')], + argv: [CI::Queue.time_now.to_f, entry, lease], ) + assert_equal 0, result # zadd returns 0 for update (not new) + # Heartbeat with wrong lease should be no-op + result = queue.send( + :eval_script, + :heartbeat, + keys: [queue.send(:key, 'running'), queue.send(:key, 'leases')], + argv: [CI::Queue.time_now.to_f, entry, "wrong-lease"], + ) assert_nil result end @@ -550,6 +558,201 @@ def test_circuit_breaker_does_not_count_requeued_failures "Circuit breaker open? #{queue.config.circuit_breakers.any?(&:open?)}" end + def test_stolen_test_acknowledge_does_not_remove_running_entry + @redis.flushdb + single_test = [TEST_LIST.first] + queue_a = worker(1, tests: single_test) + queue_b = worker(2, tests: single_test) + + acquired = false + stolen = false + a_acked = false + monitor = Monitor.new + condition = monitor.new_cond + + thread = Thread.start do + monitor.synchronize { condition.wait_until { acquired } } + queue_b.poll do |test| + monitor.synchronize do + stolen = true + condition.signal + condition.wait_until { a_acked } + end + queue_b.acknowledge(test.queue_entry) + end + end + + worker_a_ack_result = nil + queue_a.poll do |test| + # Simulate stale heartbeat by setting score to 0 (immediately reclaimable) + @redis.zadd('build:42:running', 0, test.queue_entry) + acquired = true + monitor.synchronize do + condition.signal + condition.wait_until { stolen } + end + # Worker B has stolen the test via reserve_lost. Worker A acknowledges. + # The result (sadd) succeeds, but the running entry must NOT be removed + # because Worker B still owns it. + worker_a_ack_result = queue_a.acknowledge(test.queue_entry) + # Entry should still be in running (Worker B owns it, zrem was skipped) + assert_operator @redis.zcard('build:42:running'), :>, 0, + "Running entry must not be removed by non-owner acknowledge" + monitor.synchronize do + a_acked = true + condition.signal + end + end + + thread.join(5) + + assert_equal true, worker_a_ack_result, "First finisher's acknowledge should succeed (sadd)" + assert_predicate queue_a, :exhausted? + end + + def test_stolen_test_requeue_is_rejected_by_ownership_check + @redis.flushdb + single_test = [TEST_LIST.first] + queue_a = worker(1, tests: single_test, max_requeues: 5, requeue_tolerance: 1.0) + queue_b = worker(2, tests: single_test, max_requeues: 5, requeue_tolerance: 1.0) + + acquired = false + stolen = false + a_requeued = false + monitor = Monitor.new + condition = monitor.new_cond + + thread = Thread.start do + monitor.synchronize { condition.wait_until { acquired } } + queue_b.poll do |test| + monitor.synchronize do + stolen = true + condition.signal + condition.wait_until { a_requeued } + end + queue_b.acknowledge(test.queue_entry) + end + end + + worker_a_requeue_result = nil + queue_a.poll do |test| + @redis.zadd('build:42:running', 0, test.queue_entry) + acquired = true + monitor.synchronize do + condition.signal + condition.wait_until { stolen } + end + # Worker A tries to requeue — should fail (ownership transferred) + worker_a_requeue_result = queue_a.requeue(test.queue_entry) + # Entry should still be in running (Worker B owns it) + assert_operator @redis.zcard('build:42:running'), :>, 0 + monitor.synchronize do + a_requeued = true + condition.signal + end + end + + thread.join(5) + + assert_equal false, worker_a_requeue_result, "Stale worker's requeue should be rejected" + assert_predicate queue_a, :exhausted? + end + + def test_supervisor_not_exhausted_while_stolen_test_in_flight + @redis.flushdb + single_test = [TEST_LIST.first] + queue_a = worker(1, tests: single_test) + queue_b = worker(2, tests: single_test) + supervisor = CI::Queue::Redis::Supervisor.new( + @redis_url, + CI::Queue::Configuration.new(build_id: '42', timeout: 0.2), + ) + + acquired = false + stolen = false + a_acked = false + monitor = Monitor.new + condition = monitor.new_cond + + thread = Thread.start do + monitor.synchronize { condition.wait_until { acquired } } + queue_b.poll do |test| + monitor.synchronize do + stolen = true + condition.signal + condition.wait_until { a_acked } + end + # Supervisor should NOT be exhausted yet: Worker B still has the test + refute_predicate supervisor, :exhausted?, "Supervisor should not be exhausted while stolen test is still in-flight" + queue_b.acknowledge(test.queue_entry) + end + end + + queue_a.poll do |test| + @redis.zadd('build:42:running', 0, test.queue_entry) + acquired = true + monitor.synchronize do + condition.signal + condition.wait_until { stolen } + end + queue_a.acknowledge(test.queue_entry) + monitor.synchronize do + a_acked = true + condition.signal + end + end + + thread.join(5) + + # Now supervisor should be exhausted + assert_predicate supervisor, :exhausted? + end + + def test_ownership_stress_many_workers_stealing_tests + # Stress test: multiple workers compete for tests, with frequent lease expiry + # and stealing. Verifies that exhausted? is never true while tests are in-flight. + @redis.flushdb + num_workers = 6 + num_tests = 12 + test_names = num_tests.times.map { |i| "StressTest#test_#{i}" } + tests = test_names.map { |n| SharedTestCases::TestCase.new(n) } + + queues = num_workers.times.map do |i| + worker(i + 1, tests: tests, timeout: 0.2, max_requeues: 0, requeue_tolerance: 0, max_consecutive_failures: num_tests) + end + supervisor = CI::Queue::Redis::Supervisor.new( + @redis_url, + CI::Queue::Configuration.new(build_id: '42', timeout: 0.2), + ) + + mutex = Mutex.new + all_acknowledged = 0 + errors = [] + + threads = queues.map.with_index do |queue, idx| + Thread.new do + queue.poll do |test| + # Randomly simulate stale heartbeat for ~30% of tests + if idx > 0 && rand < 0.3 + @redis.zadd('build:42:running', 0, test.queue_entry) + sleep(rand * 0.05) # Tiny jitter to increase contention + end + + result = queue.acknowledge(test.queue_entry) + mutex.synchronize { all_acknowledged += 1 } if result + end + rescue => e + mutex.synchronize { errors << "Worker #{idx}: #{e.class}: #{e.message}" } + end + end + + threads.each { |t| t.join(10) } + threads.each { |t| t.kill if t.alive? } + + assert_predicate supervisor, :exhausted?, "All tests should be done. Errors: #{errors.join('; ')}" + assert_equal num_tests, all_acknowledged, "All #{num_tests} tests should be acknowledged exactly once. Errors: #{errors.join('; ')}" + end + private def shuffled_test_list diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index 92566dc5..cf78a7a2 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -62,7 +62,7 @@ def test_lost_test_with_heartbeat_monitor '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', - '--heartbeat', '1', + '--heartbeat', '5', '-Itest', 'test/lost_test.rb', chdir: 'test/fixtures/', @@ -91,7 +91,9 @@ def test_lost_test_with_heartbeat_monitor # lost_test.rb test_foo has no assertions (only sleep) assert_equal "Ran 1 tests, 0 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)", result warnings = warnings_file.read.lines.map { |line| JSON.parse(line) } - assert_equal 1, warnings.size + # With lease-based heartbeat, the heartbeat keeps the entry alive for the + # full test duration, so the test is never stolen and no warning is generated. + assert_equal 0, warnings.size end end