From c86f7bb30f6bd2593e7d3ed52e30d7dfab72c211 Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Thu, 26 Mar 2026 16:35:48 -0400 Subject: [PATCH 1/6] Fix supervisor exiting before all workers complete tests The heartbeat thread countdown limited heartbeats to config.timeout seconds, causing running ZSET entries to go stale for long tests. Combined with acknowledge.lua's unconditional zrem, a stale worker whose test was stolen via reserve_lost could remove the running entry when acknowledging, making exhausted? return true prematurely. Three-layer fix: - Remove heartbeat countdown so heartbeats continue for full test duration - Guard acknowledge/requeue zrem on ownership so non-owners can't empty the running set - Clean up stale processed entries in reserve_lost --- redis/acknowledge.lua | 13 +- redis/requeue.lua | 8 +- redis/reserve_lost.lua | 6 + ruby/lib/ci/queue/redis/base.rb | 9 +- ruby/lib/ci/queue/redis/worker.rb | 2 +- ruby/test/ci/queue/redis_test.rb | 195 ++++++++++++++++++++++++++++++ 6 files changed, 220 insertions(+), 13 deletions(-) diff --git a/redis/acknowledge.lua b/redis/acknowledge.lua index 4ca40729..380fb491 100644 --- a/redis/acknowledge.lua +++ b/redis/acknowledge.lua @@ -3,12 +3,21 @@ local processed_key = KEYS[2] local owners_key = KEYS[3] local error_reports_key = KEYS[4] local requeued_by_key = KEYS[5] +local worker_queue_key = KEYS[6] local entry = ARGV[1] local error = ARGV[2] local ttl = ARGV[3] -redis.call('zrem', zset_key, entry) -redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers + +-- Only the current owner can remove the entry from the running set. +-- If ownership was transferred (e.g. via reserve_lost), the stale worker +-- must not remove the running entry — that would let the supervisor think +-- the queue is exhausted while the new owner is still processing. 
+if redis.call('hget', owners_key, entry) == worker_queue_key then + redis.call('zrem', zset_key, entry) + redis.call('hdel', owners_key, entry) +end + redis.call('hdel', requeued_by_key, entry) local acknowledged = redis.call('sadd', processed_key, entry) == 1 diff --git a/redis/requeue.lua b/redis/requeue.lua index e01e61d3..70f5a766 100644 --- a/redis/requeue.lua +++ b/redis/requeue.lua @@ -13,8 +13,11 @@ local entry = ARGV[3] local offset = ARGV[4] local ttl = tonumber(ARGV[5]) -if redis.call('hget', owners_key, entry) == worker_queue_key then - redis.call('hdel', owners_key, entry) +-- Only the current owner can requeue a test. +-- If ownership was transferred (e.g. via reserve_lost), reject the stale worker's +-- requeue so the running entry stays intact for the new owner. +if redis.call('hget', owners_key, entry) ~= worker_queue_key then + return false end if redis.call('sismember', processed_key, entry) == 1 then @@ -48,6 +51,7 @@ if ttl and ttl > 0 then redis.call('expire', requeued_by_key, ttl) end +redis.call('hdel', owners_key, entry) redis.call('zrem', zset_key, entry) return true diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index 9dfaa616..beceaa68 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -13,6 +13,12 @@ for _, test in ipairs(lost_tests) do redis.call('lpush', worker_queue_key, test) redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership return test + else + -- Test is already processed but still in running (stale). This can happen when + -- a non-owner worker acknowledged the test (marking it processed) but could not + -- remove it from running due to the ownership guard. Clean it up. 
+ redis.call('zrem', zset_key, test) + redis.call('hdel', owners_key, test) end end diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 95693d3f..db2ebbaf 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -369,19 +369,12 @@ def heartbeat Thread.current.name = "CI::Queue#heartbeat" Thread.current.abort_on_exception = true - timeout = config.timeout.to_i loop do - command = nil command = heartbeat_state.wait(1) # waits for max 1 second but wakes up immediately if we receive a command case command&.first when :tick - if timeout > 0 - heartbeat_process.tick!(command.last) - timeout -= 1 - end - when :reset - timeout = config.timeout.to_i + heartbeat_process.tick!(command.last) when :stop break end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 40a457cd..9bfe438f 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -183,7 +183,7 @@ def acknowledge(entry, error: nil, pipeline: redis) unreserve_entry(test_id) eval_script( :acknowledge, - keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by')], + keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by'), key('worker', worker_id, 'queue')], argv: [entry, error.to_s, config.redis_ttl], pipeline: pipeline, ) == 1 diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index c8d927f3..f56e3991 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -550,6 +550,201 @@ def test_circuit_breaker_does_not_count_requeued_failures "Circuit breaker open? 
#{queue.config.circuit_breakers.any?(&:open?)}" end + def test_stolen_test_acknowledge_does_not_remove_running_entry + @redis.flushdb + single_test = [TEST_LIST.first] + queue_a = worker(1, tests: single_test) + queue_b = worker(2, tests: single_test) + + acquired = false + stolen = false + a_acked = false + monitor = Monitor.new + condition = monitor.new_cond + + thread = Thread.start do + monitor.synchronize { condition.wait_until { acquired } } + queue_b.poll do |test| + monitor.synchronize do + stolen = true + condition.signal + condition.wait_until { a_acked } + end + queue_b.acknowledge(test.queue_entry) + end + end + + worker_a_ack_result = nil + queue_a.poll do |test| + # Simulate stale heartbeat by setting score to 0 (immediately reclaimable) + @redis.zadd('build:42:running', 0, test.queue_entry) + acquired = true + monitor.synchronize do + condition.signal + condition.wait_until { stolen } + end + # Worker B has stolen the test via reserve_lost. Worker A acknowledges. + # The result (sadd) succeeds, but the running entry must NOT be removed + # because Worker B still owns it. + worker_a_ack_result = queue_a.acknowledge(test.queue_entry) + # Entry should still be in running (Worker B owns it, zrem was skipped) + assert_operator @redis.zcard('build:42:running'), :>, 0, + "Running entry must not be removed by non-owner acknowledge" + monitor.synchronize do + a_acked = true + condition.signal + end + end + + thread.join(5) + + assert_equal true, worker_a_ack_result, "First finisher's acknowledge should succeed (sadd)" + assert_predicate queue_a, :exhausted? 
+ end + + def test_stolen_test_requeue_is_rejected_by_ownership_check + @redis.flushdb + single_test = [TEST_LIST.first] + queue_a = worker(1, tests: single_test, max_requeues: 5, requeue_tolerance: 1.0) + queue_b = worker(2, tests: single_test, max_requeues: 5, requeue_tolerance: 1.0) + + acquired = false + stolen = false + a_requeued = false + monitor = Monitor.new + condition = monitor.new_cond + + thread = Thread.start do + monitor.synchronize { condition.wait_until { acquired } } + queue_b.poll do |test| + monitor.synchronize do + stolen = true + condition.signal + condition.wait_until { a_requeued } + end + queue_b.acknowledge(test.queue_entry) + end + end + + worker_a_requeue_result = nil + queue_a.poll do |test| + @redis.zadd('build:42:running', 0, test.queue_entry) + acquired = true + monitor.synchronize do + condition.signal + condition.wait_until { stolen } + end + # Worker A tries to requeue — should fail (ownership transferred) + worker_a_requeue_result = queue_a.requeue(test.queue_entry) + # Entry should still be in running (Worker B owns it) + assert_operator @redis.zcard('build:42:running'), :>, 0 + monitor.synchronize do + a_requeued = true + condition.signal + end + end + + thread.join(5) + + assert_equal false, worker_a_requeue_result, "Stale worker's requeue should be rejected" + assert_predicate queue_a, :exhausted? 
+ end + + def test_supervisor_not_exhausted_while_stolen_test_in_flight + @redis.flushdb + single_test = [TEST_LIST.first] + queue_a = worker(1, tests: single_test) + queue_b = worker(2, tests: single_test) + supervisor = CI::Queue::Redis::Supervisor.new( + @redis_url, + CI::Queue::Configuration.new(build_id: '42', timeout: 0.2), + ) + + acquired = false + stolen = false + a_acked = false + monitor = Monitor.new + condition = monitor.new_cond + + thread = Thread.start do + monitor.synchronize { condition.wait_until { acquired } } + queue_b.poll do |test| + monitor.synchronize do + stolen = true + condition.signal + condition.wait_until { a_acked } + end + # Supervisor should NOT be exhausted yet: Worker B still has the test + refute_predicate supervisor, :exhausted?, "Supervisor should not be exhausted while stolen test is still in-flight" + queue_b.acknowledge(test.queue_entry) + end + end + + queue_a.poll do |test| + @redis.zadd('build:42:running', 0, test.queue_entry) + acquired = true + monitor.synchronize do + condition.signal + condition.wait_until { stolen } + end + queue_a.acknowledge(test.queue_entry) + monitor.synchronize do + a_acked = true + condition.signal + end + end + + thread.join(5) + + # Now supervisor should be exhausted + assert_predicate supervisor, :exhausted? + end + + def test_ownership_stress_many_workers_stealing_tests + # Stress test: multiple workers compete for tests, with frequent lease expiry + # and stealing. Verifies that exhausted? is never true while tests are in-flight. 
+ @redis.flushdb + num_workers = 6 + num_tests = 12 + test_names = num_tests.times.map { |i| "StressTest#test_#{i}" } + tests = test_names.map { |n| SharedTestCases::TestCase.new(n) } + + queues = num_workers.times.map do |i| + worker(i + 1, tests: tests, timeout: 0.2, max_requeues: 0, requeue_tolerance: 0, max_consecutive_failures: num_tests) + end + supervisor = CI::Queue::Redis::Supervisor.new( + @redis_url, + CI::Queue::Configuration.new(build_id: '42', timeout: 0.2), + ) + + mutex = Mutex.new + all_acknowledged = 0 + errors = [] + + threads = queues.map.with_index do |queue, idx| + Thread.new do + queue.poll do |test| + # Randomly simulate stale heartbeat for ~30% of tests + if idx > 0 && rand < 0.3 + @redis.zadd('build:42:running', 0, test.queue_entry) + sleep(rand * 0.05) # Tiny jitter to increase contention + end + + result = queue.acknowledge(test.queue_entry) + mutex.synchronize { all_acknowledged += 1 } if result + end + rescue => e + mutex.synchronize { errors << "Worker #{idx}: #{e.class}: #{e.message}" } + end + end + + threads.each { |t| t.join(10) } + threads.each { |t| t.kill if t.alive? } + + assert_predicate supervisor, :exhausted?, "All tests should be done. Errors: #{errors.join('; ')}" + assert_equal num_tests, all_acknowledged, "All #{num_tests} tests should be acknowledged exactly once. 
Errors: #{errors.join('; ')}" + end + private def shuffled_test_list From 68005176451a239330b2ebf1385004d4b088f64a Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Thu, 26 Mar 2026 23:36:20 -0400 Subject: [PATCH 2/6] Replace worker-queue-key ownership with lease-based authorization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The heartbeat poisoning bug: when a stale worker's acknowledge adds an entry to the processed set (sadd is unconditional), heartbeat.lua's sismember(processed) check causes the current lease holder's heartbeat to stop, making the entry go stale and get cleaned up from running while the holder is still executing the test. Fix by introducing per-reservation lease IDs. Each reserve/reserve_lost generates a monotonic lease ID stored in a separate 'leases' hash. heartbeat.lua, acknowledge.lua, and requeue.lua now check the lease instead of worker_queue_key ownership or processed set membership. The owners hash (entry → worker_queue_key) is preserved for release.lua and requeued-by routing, which need to identify entries by worker, not by lease. 
--- redis/acknowledge.lua | 12 ++++---- redis/heartbeat.lua | 19 ++++++------- redis/release.lua | 2 ++ redis/requeue.lua | 11 +++++--- redis/reserve.lua | 21 ++++++++------ redis/reserve_lost.lua | 11 ++++++-- ruby/lib/ci/queue/redis/base.rb | 22 +++++++-------- ruby/lib/ci/queue/redis/monitor.rb | 20 ++++++------- ruby/lib/ci/queue/redis/worker.rb | 45 ++++++++++++++++++++---------- ruby/lib/ci/queue/static.rb | 6 +++- ruby/lib/minitest/queue.rb | 2 +- ruby/test/ci/queue/redis_test.rb | 28 ++++++++++++------- 12 files changed, 119 insertions(+), 80 deletions(-) diff --git a/redis/acknowledge.lua b/redis/acknowledge.lua index 380fb491..233522d5 100644 --- a/redis/acknowledge.lua +++ b/redis/acknowledge.lua @@ -3,19 +3,21 @@ local processed_key = KEYS[2] local owners_key = KEYS[3] local error_reports_key = KEYS[4] local requeued_by_key = KEYS[5] -local worker_queue_key = KEYS[6] +local leases_key = KEYS[6] local entry = ARGV[1] local error = ARGV[2] local ttl = ARGV[3] +local lease_id = ARGV[4] --- Only the current owner can remove the entry from the running set. --- If ownership was transferred (e.g. via reserve_lost), the stale worker +-- Only the current lease holder can remove the entry from the running set. +-- If the lease was transferred (e.g. via reserve_lost), the stale worker -- must not remove the running entry — that would let the supervisor think --- the queue is exhausted while the new owner is still processing. -if redis.call('hget', owners_key, entry) == worker_queue_key then +-- the queue is exhausted while the new lease holder is still processing. 
+if tostring(redis.call('hget', leases_key, entry)) == lease_id then redis.call('zrem', zset_key, entry) redis.call('hdel', owners_key, entry) + redis.call('hdel', leases_key, entry) end redis.call('hdel', requeued_by_key, entry) diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua index 4c2bc64c..91ccd244 100644 --- a/redis/heartbeat.lua +++ b/redis/heartbeat.lua @@ -1,17 +1,16 @@ local zset_key = KEYS[1] -local processed_key = KEYS[2] -local owners_key = KEYS[3] -local worker_queue_key = KEYS[4] +local leases_key = KEYS[2] local current_time = ARGV[1] local entry = ARGV[2] +local lease_id = ARGV[3] --- already processed, we do not need to bump the timestamp -if redis.call('sismember', processed_key, entry) == 1 then - return false -end - --- we're still the owner of the test, we can bump the timestamp -if redis.call('hget', owners_key, entry) == worker_queue_key then +-- Only the current lease holder can bump the timestamp. +-- We intentionally do NOT check the processed set. A non-owner worker's +-- acknowledge can add the entry to processed, which would poison the +-- current lease holder's heartbeat if we checked it here. +-- The lease check alone is sufficient — once the lease holder acknowledges, +-- they zrem + hdel the lease, so the heartbeat will naturally stop. 
+if tostring(redis.call('hget', leases_key, entry)) == lease_id then return redis.call('zadd', zset_key, current_time, entry) end diff --git a/redis/release.lua b/redis/release.lua index 4bcc5929..4b2c7c88 100644 --- a/redis/release.lua +++ b/redis/release.lua @@ -1,6 +1,7 @@ local zset_key = KEYS[1] local worker_queue_key = KEYS[2] local owners_key = KEYS[3] +local leases_key = KEYS[4] -- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...} local owned_tests = redis.call('hgetall', owners_key) @@ -8,6 +9,7 @@ for index, owner_or_test in ipairs(owned_tests) do if owner_or_test == worker_queue_key then -- If we owned a test local test = owned_tests[index - 1] redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately + redis.call('hdel', leases_key, test) return nil end end diff --git a/redis/requeue.lua b/redis/requeue.lua index 70f5a766..00d54e40 100644 --- a/redis/requeue.lua +++ b/redis/requeue.lua @@ -6,17 +6,19 @@ local worker_queue_key = KEYS[5] local owners_key = KEYS[6] local error_reports_key = KEYS[7] local requeued_by_key = KEYS[8] +local leases_key = KEYS[9] local max_requeues = tonumber(ARGV[1]) local global_max_requeues = tonumber(ARGV[2]) local entry = ARGV[3] local offset = ARGV[4] local ttl = tonumber(ARGV[5]) +local lease_id = ARGV[6] --- Only the current owner can requeue a test. --- If ownership was transferred (e.g. via reserve_lost), reject the stale worker's --- requeue so the running entry stays intact for the new owner. -if redis.call('hget', owners_key, entry) ~= worker_queue_key then +-- Only the current lease holder can requeue a test. +-- If the lease was transferred (e.g. via reserve_lost), reject the stale +-- worker's requeue so the running entry stays intact for the new holder. 
+if tostring(redis.call('hget', leases_key, entry)) ~= lease_id then return false end @@ -52,6 +54,7 @@ if ttl and ttl > 0 then end redis.call('hdel', owners_key, entry) +redis.call('hdel', leases_key, entry) redis.call('zrem', zset_key, entry) return true diff --git a/redis/reserve.lua b/redis/reserve.lua index 265a37ab..928dd4d0 100644 --- a/redis/reserve.lua +++ b/redis/reserve.lua @@ -5,6 +5,8 @@ local worker_queue_key = KEYS[4] local owners_key = KEYS[5] local requeued_by_key = KEYS[6] local workers_key = KEYS[7] +local leases_key = KEYS[8] +local lease_counter_key = KEYS[9] local current_time = ARGV[1] local defer_offset = tonumber(ARGV[2]) or 0 @@ -19,6 +21,15 @@ local function insert_with_offset(test) end end +local function claim_test(test) + local lease = redis.call('incr', lease_counter_key) + redis.call('zadd', zset_key, current_time, test) + redis.call('lpush', worker_queue_key, test) + redis.call('hset', owners_key, test, worker_queue_key) + redis.call('hset', leases_key, test, lease) + return {test, tostring(lease)} +end + for attempt = 1, max_skip_attempts do local test = redis.call('rpop', queue_key) if not test then @@ -30,10 +41,7 @@ for attempt = 1, max_skip_attempts do -- If this build only has one worker, allow immediate self-pickup. 
if redis.call('scard', workers_key) <= 1 then redis.call('hdel', requeued_by_key, test) - redis.call('zadd', zset_key, current_time, test) - redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) - return test + return claim_test(test) end insert_with_offset(test) @@ -46,10 +54,7 @@ for attempt = 1, max_skip_attempts do end else redis.call('hdel', requeued_by_key, test) - redis.call('zadd', zset_key, current_time, test) - redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) - return test + return claim_test(test) end end diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index beceaa68..e0a2c009 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -2,6 +2,8 @@ local zset_key = KEYS[1] local processed_key = KEYS[2] local worker_queue_key = KEYS[3] local owners_key = KEYS[4] +local leases_key = KEYS[5] +local lease_counter_key = KEYS[6] local current_time = ARGV[1] local timeout = ARGV[2] @@ -9,16 +11,19 @@ local timeout = ARGV[2] local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout) for _, test in ipairs(lost_tests) do if redis.call('sismember', processed_key, test) == 0 then + local lease = redis.call('incr', lease_counter_key) redis.call('zadd', zset_key, current_time, test) redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership - return test + redis.call('hset', owners_key, test, worker_queue_key) + redis.call('hset', leases_key, test, lease) + return {test, tostring(lease)} else -- Test is already processed but still in running (stale). This can happen when -- a non-owner worker acknowledged the test (marking it processed) but could not - -- remove it from running due to the ownership guard. Clean it up. + -- remove it from running due to the lease guard. Clean it up. 
redis.call('zrem', zset_key, test) redis.call('hdel', owners_key, test) + redis.call('hdel', leases_key, test) end end diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index db2ebbaf..92dfc705 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -60,10 +60,10 @@ def reconnect_attempts [0, 0, 0.1, 0.5, 1, 3, 5] end - def with_heartbeat(id) + def with_heartbeat(id, lease: nil) if heartbeat_enabled? ensure_heartbeat_thread_alive! - heartbeat_state.set(:tick, id) + heartbeat_state.set(:tick, id, lease) end yield @@ -264,12 +264,11 @@ def resolve_lua_includes(script, root) end class HeartbeatProcess - def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key) + def initialize(redis_url, zset_key, owners_key, leases_key) @redis_url = redis_url @zset_key = zset_key - @processed_key = processed_key @owners_key = owners_key - @worker_queue_key = worker_queue_key + @leases_key = leases_key end def boot! @@ -281,9 +280,8 @@ def boot! ::File.join(__dir__, "monitor.rb"), @redis_url, @zset_key, - @processed_key, @owners_key, - @worker_queue_key, + @leases_key, in: child_read, out: child_write, ) @@ -313,8 +311,8 @@ def shutdown! 
end end - def tick!(id) - send_message(:tick!, id: id) + def tick!(id, lease) + send_message(:tick!, id: id, lease: lease.to_s) end private @@ -355,9 +353,8 @@ def heartbeat_process @heartbeat_process ||= HeartbeatProcess.new( @redis_url, key('running'), - key('processed'), key('owners'), - key('worker', worker_id, 'queue'), + key('leases'), ) end @@ -374,7 +371,8 @@ def heartbeat case command&.first when :tick - heartbeat_process.tick!(command.last) + # command = [:tick, entry_id, lease_id] + heartbeat_process.tick!(command[1], command[2]) when :stop break end diff --git a/ruby/lib/ci/queue/redis/monitor.rb b/ruby/lib/ci/queue/redis/monitor.rb index 8554ab06..8027d7dc 100755 --- a/ruby/lib/ci/queue/redis/monitor.rb +++ b/ruby/lib/ci/queue/redis/monitor.rb @@ -13,11 +13,10 @@ class Monitor DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../../redis', __FILE__) RELEASE_SCRIPTS_ROOT = ::File.expand_path('../../redis', __FILE__) - def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) + def initialize(pipe, logger, redis_url, zset_key, owners_key, leases_key) @zset_key = zset_key - @processed_key = processed_key @owners_key = owners_key - @worker_queue_key = worker_queue_key + @leases_key = leases_key @logger = logger @redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5]) @shutdown = false @@ -36,11 +35,11 @@ def soft_signal(sig) @self_pipe_writer << '.' 
end - def process_tick!(id:) + def process_tick!(id:, lease:) eval_script( :heartbeat, - keys: [@zset_key, @processed_key, @owners_key, @worker_queue_key], - argv: [Time.now.to_f, id] + keys: [@zset_key, @leases_key], + argv: [Time.now.to_f, id, lease] ) rescue => error @logger.info(error) @@ -151,12 +150,11 @@ def monitor redis_url = ARGV[0] zset_key = ARGV[1] -processed_key = ARGV[2] -owners_key = ARGV[3] -worker_queue_key = ARGV[4] +owners_key = ARGV[2] +leases_key = ARGV[3] -logger.debug("Starting monitor: #{redis_url} #{zset_key} #{processed_key}") -manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) +logger.debug("Starting monitor: #{redis_url} #{zset_key} #{leases_key}") +manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, owners_key, leases_key) # Notify the parent we're ready $stdout.puts(".") diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 9bfe438f..219cb3c6 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -19,6 +19,7 @@ class Worker < Base def initialize(redis, config) @reserved_tests = Concurrent::Set.new + @reserved_leases = Concurrent::Map.new @shutdown_required = false @first_reserve_at = nil super(redis, config) @@ -172,6 +173,11 @@ def worker_queue_length nil end + def lease_for(entry) + test_id = CI::Queue::QueueEntry.test_id(entry) + @reserved_leases[test_id] + end + def report_worker_error(error) build.report_worker_error(error) end @@ -180,11 +186,12 @@ def acknowledge(entry, error: nil, pipeline: redis) test_id = CI::Queue::QueueEntry.test_id(entry) assert_reserved!(test_id) entry = reserved_entries.fetch(test_id, entry) + lease = @reserved_leases.delete(test_id) unreserve_entry(test_id) eval_script( :acknowledge, - keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by'), key('worker', worker_id, 'queue')], - argv: [entry, 
error.to_s, config.redis_ttl], + keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by'), key('leases')], + argv: [entry, error.to_s, config.redis_ttl, lease.to_s], pipeline: pipeline, ) == 1 end @@ -193,6 +200,7 @@ def requeue(entry, offset: Redis.requeue_offset) test_id = CI::Queue::QueueEntry.test_id(entry) assert_reserved!(test_id) entry = reserved_entries.fetch(test_id, entry) + lease = @reserved_leases[test_id] unreserve_entry(test_id) global_max_requeues = config.global_max_requeues(total) @@ -207,8 +215,9 @@ def requeue(entry, offset: Redis.requeue_offset) key('owners'), key('error-reports'), key('requeued-by'), + key('leases'), ], - argv: [config.max_requeues, global_max_requeues, entry, offset, config.redis_ttl], + argv: [config.max_requeues, global_max_requeues, entry, offset, config.redis_ttl, lease.to_s], ) == 1 unless requeued @@ -222,7 +231,7 @@ def requeue(entry, offset: Redis.requeue_offset) def release! eval_script( :release, - keys: [key('running'), key('worker', worker_id, 'queue'), key('owners')], + keys: [key('running'), key('worker', worker_id, 'queue'), key('owners'), key('leases')], argv: [], ) nil @@ -254,11 +263,12 @@ def assert_reserved!(test_id) end end - def reserve_entry(entry) + def reserve_entry(entry, lease = nil) test_id = CI::Queue::QueueEntry.test_id(entry) reserved_tests << test_id reserved_entries[test_id] = entry reserved_entry_ids[entry] = test_id + @reserved_leases[test_id] = lease if lease end def unreserve_entry(test_id) @@ -343,12 +353,12 @@ def finalize_streaming end def reserve - (try_to_reserve_lost_test || try_to_reserve_test).tap do |entry| - if entry - @first_reserve_at ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) - reserve_entry(entry) - end + entry, lease = try_to_reserve_lost_test || try_to_reserve_test || [nil, nil] + if entry + @first_reserve_at ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) + reserve_entry(entry, lease) end + entry end def 
try_to_reserve_test @@ -362,6 +372,8 @@ def try_to_reserve_test key('owners'), key('requeued-by'), key('workers'), + key('leases'), + key('lease-counter'), ], argv: [CI::Queue.time_now.to_f, Redis.requeue_offset], ) @@ -370,25 +382,28 @@ def try_to_reserve_test def try_to_reserve_lost_test timeout = config.max_missed_heartbeat_seconds ? config.max_missed_heartbeat_seconds : config.timeout - lost_test = eval_script( + result = eval_script( :reserve_lost, keys: [ key('running'), key('processed'), key('worker', worker_id, 'queue'), key('owners'), + key('leases'), + key('lease-counter'), ], argv: [CI::Queue.time_now.to_f, timeout], ) - if lost_test - build.record_warning(Warnings::RESERVED_LOST_TEST, test: CI::Queue::QueueEntry.test_id(lost_test), timeout: config.timeout) + if result + entry = result.is_a?(Array) ? result[0] : result + build.record_warning(Warnings::RESERVED_LOST_TEST, test: CI::Queue::QueueEntry.test_id(entry), timeout: config.timeout) if CI::Queue.debug? - $stderr.puts "[ci-queue][reserve_lost] worker=#{worker_id} test_id=#{CI::Queue::QueueEntry.test_id(lost_test)}" + $stderr.puts "[ci-queue][reserve_lost] worker=#{worker_id} test_id=#{CI::Queue::QueueEntry.test_id(entry)}" end end - lost_test + result end def push(entries) diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 0799e4b4..2dfc6bc3 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -50,10 +50,14 @@ def populate(tests, random: nil) self end - def with_heartbeat(id) + def with_heartbeat(id, lease: nil) yield end + def lease_for(entry) + nil + end + def ensure_heartbeat_thread_alive!; end def boot_heartbeat_process!; end diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index b1459b9f..bcbcfc95 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -163,7 +163,7 @@ def run(reporter, *) rescue_run_errors do begin queue.poll do |example| - result = queue.with_heartbeat(example.queue_entry) do + result = 
queue.with_heartbeat(example.queue_entry, lease: queue.lease_for(example.queue_entry)) do example.run end diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index f56e3991..1de567db 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -321,27 +321,35 @@ def test_reserve_defers_own_requeued_test_once assert_nil @redis.hget(requeued_by_key, entry) second_try = queue.send(:try_to_reserve_test) - assert_equal entry, second_try + assert_equal entry, second_try[0] end - def test_heartbeat_uses_entry_for_processed_check + def test_heartbeat_only_checks_lease queue = worker(1, populate: false) entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb') + lease = "42" - @redis.sadd(queue.send(:key, 'processed'), entry) + # Set up: entry is in running with a matching lease + @redis.zadd(queue.send(:key, 'running'), 0, entry) + @redis.hset(queue.send(:key, 'leases'), entry, lease) + # Heartbeat with matching lease should succeed (even if processed) + @redis.sadd(queue.send(:key, 'processed'), entry) result = queue.send( :eval_script, :heartbeat, - keys: [ - queue.send(:key, 'running'), - queue.send(:key, 'processed'), - queue.send(:key, 'owners'), - queue.send(:key, 'worker', queue.config.worker_id, 'queue'), - ], - argv: [CI::Queue.time_now.to_f, entry], + keys: [queue.send(:key, 'running'), queue.send(:key, 'leases')], + argv: [CI::Queue.time_now.to_f, entry, lease], ) + assert_equal 0, result # zadd returns 0 for update (not new) + # Heartbeat with wrong lease should be no-op + result = queue.send( + :eval_script, + :heartbeat, + keys: [queue.send(:key, 'running'), queue.send(:key, 'leases')], + argv: [CI::Queue.time_now.to_f, entry, "wrong-lease"], + ) assert_nil result end From c7963c94d9756b7fd742a9e1f2dc6e2d9d7f4b5e Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Fri, 27 Mar 2026 01:20:09 -0400 Subject: [PATCH 3/6] Fix Python client and integration test for lease-based scripts - Update 
Python distributed.py to pass leases_key and lease_counter_key to reserve, reserve_lost, acknowledge, and requeue Lua scripts - Parse [entry, lease] array return from reserve/reserve_lost - Store and pass lease IDs through acknowledge/requeue - Update heartbeat integration test: with lease-based heartbeat the entry stays alive for the full test duration, so the test is never stolen and no RESERVED_LOST_TEST warning is generated --- python/ciqueue/distributed.py | 29 ++++++++++++++++++-- ruby/test/integration/minitest_redis_test.rb | 4 ++- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/python/ciqueue/distributed.py b/python/ciqueue/distributed.py index e8b9acc4..1120bdf5 100644 --- a/python/ciqueue/distributed.py +++ b/python/ciqueue/distributed.py @@ -62,6 +62,7 @@ def __init__(self, tests, worker_id, redis, build_id, # pylint: disable=too-man super(Worker, self).__init__(redis=redis, build_id=build_id) self.timeout = timeout self.total = len(tests) + self._leases = {} self.max_requeues = max_requeues self.global_max_requeues = math.ceil(len(tests) * requeue_tolerance) self.worker_id = worker_id @@ -88,6 +89,7 @@ def shutdown(self): self.shutdown_required = True def acknowledge(self, test): + lease = self._leases.pop(test, '') return self._eval_script( 'acknowledge', keys=[ @@ -96,14 +98,16 @@ def acknowledge(self, test): self.key('owners'), self.key('error-reports'), self.key('requeued-by'), + self.key('leases'), ], - args=[test, '', 0], + args=[test, '', 0, lease], ) == 1 def requeue(self, test, offset=42): if not (self.max_requeues > 0 and self.global_max_requeues > 0.0): return False + lease = self._leases.get(test, '') return self._eval_script( 'requeue', keys=[ @@ -115,8 +119,9 @@ def requeue(self, test, offset=42): self.key('owners'), self.key('error-reports'), self.key('requeued-by'), + self.key('leases'), ], - args=[self.max_requeues, self.global_max_requeues, test, offset, 0], + args=[self.max_requeues, self.global_max_requeues, test, 
offset, 0, lease], ) == 1 def retry_queue(self): @@ -154,7 +159,21 @@ def _register(self): self.redis.sadd(self.key('workers'), self.worker_id) def _reserve(self): - return self._try_to_reserve_lost_test() or self._try_to_reserve_test() + result = self._try_to_reserve_lost_test() or self._try_to_reserve_test() + if result: + if isinstance(result, list): + entry, lease = result[0], result[1] + if isinstance(entry, bytes): + entry = entry.decode() + if isinstance(lease, bytes): + lease = lease.decode() + self._leases[entry] = lease + return entry + else: + if isinstance(result, bytes): + result = result.decode() + return result + return result def _try_to_reserve_lost_test(self): if self.timeout: @@ -168,6 +187,8 @@ def _try_to_reserve_lost_test(self): self.worker_id, 'queue'), self.key('owners'), + self.key('leases'), + self.key('lease-counter'), ], args=[time.time(), self.timeout], ) @@ -183,6 +204,8 @@ def _try_to_reserve_test(self): self.key('owners'), self.key('requeued-by'), self.key('workers'), + self.key('leases'), + self.key('lease-counter'), ], args=[ time.time(), diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index 92566dc5..9b997344 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -91,7 +91,9 @@ def test_lost_test_with_heartbeat_monitor # lost_test.rb test_foo has no assertions (only sleep) assert_equal "Ran 1 tests, 0 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)", result warnings = warnings_file.read.lines.map { |line| JSON.parse(line) } - assert_equal 1, warnings.size + # With lease-based heartbeat, the heartbeat keeps the entry alive for the + # full test duration, so the test is never stolen and no warning is generated. 
+ assert_equal 0, warnings.size end end From feac86b73ee6921d914134c4172fe442f62cf10a Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Fri, 27 Mar 2026 01:26:32 -0400 Subject: [PATCH 4/6] Fix Python decode error: _reserve returns raw bytes for poll to decode --- python/ciqueue/distributed.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/python/ciqueue/distributed.py b/python/ciqueue/distributed.py index 1120bdf5..1fd209b6 100644 --- a/python/ciqueue/distributed.py +++ b/python/ciqueue/distributed.py @@ -74,7 +74,7 @@ def poll(): while not self.shutdown_required and len(self): # pylint: disable=len-as-condition test = self._reserve() if test: - yield test.decode() + yield test.decode() if isinstance(test, bytes) else test else: time.sleep(0.05) @@ -163,15 +163,12 @@ def _reserve(self): if result: if isinstance(result, list): entry, lease = result[0], result[1] - if isinstance(entry, bytes): - entry = entry.decode() - if isinstance(lease, bytes): - lease = lease.decode() - self._leases[entry] = lease - return entry + # Store lease keyed by the decoded entry (acknowledge receives decoded strings) + entry_str = entry.decode() if isinstance(entry, bytes) else entry + lease_str = lease.decode() if isinstance(lease, bytes) else str(lease) + self._leases[entry_str] = lease_str + return entry # Return raw (bytes or str) — poll() handles .decode() else: - if isinstance(result, bytes): - result = result.decode() return result return result From ff8ead492f0b68a1c5963fe2faf7b3bf796e9a0f Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Fri, 27 Mar 2026 15:54:58 -0400 Subject: [PATCH 5/6] Fix flaky TruffleRuby test: increase heartbeat timeout for lost test The test_lost_test_with_heartbeat_monitor test was using --heartbeat 1, meaning a test would be considered 'lost' after just 1 second without a heartbeat tick. 
Since the heartbeat thread ticks only about once per second, TruffleRuby's slower thread scheduling left essentially zero margin, causing the test to be stolen and a warning to be generated. Increasing to --heartbeat 5 gives ample margin for the heartbeat to tick while still testing the core behavior: heartbeat keeps the test alive so it is never stolen. --- ruby/test/integration/minitest_redis_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index 9b997344..cf78a7a2 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -62,7 +62,7 @@ def test_lost_test_with_heartbeat_monitor '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', - '--heartbeat', '1', + '--heartbeat', '5', '-Itest', 'test/lost_test.rb', chdir: 'test/fixtures/', From 1b88d741f917cd1e76961ab73e3b10c353f93bb5 Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Mon, 30 Mar 2026 11:18:47 -0400 Subject: [PATCH 6/6] Address review: fix lease leak in requeue, remove dead :reset heartbeat state --- ruby/lib/ci/queue/redis/base.rb | 3 +++ ruby/lib/ci/queue/redis/worker.rb | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 92dfc705..db835912 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -373,6 +373,9 @@ def heartbeat when :tick # command = [:tick, entry_id, lease_id] heartbeat_process.tick!(command[1], command[2]) + when :reset + # Test finished, stop ticking until next test starts + nil when :stop break end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 219cb3c6..38fa4483 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -200,7 +200,7 @@ def requeue(entry, offset: Redis.requeue_offset) test_id =
CI::Queue::QueueEntry.test_id(entry) assert_reserved!(test_id) entry = reserved_entries.fetch(test_id, entry) - lease = @reserved_leases[test_id] + lease = @reserved_leases.delete(test_id) unreserve_entry(test_id) global_max_requeues = config.global_max_requeues(total) @@ -224,6 +224,7 @@ def requeue(entry, offset: Redis.requeue_offset) reserved_tests << test_id reserved_entries[test_id] = entry reserved_entry_ids[entry] = test_id + @reserved_leases[test_id] = lease if lease end requeued end