Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions python/ciqueue/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, tests, worker_id, redis, build_id, # pylint: disable=too-man
super(Worker, self).__init__(redis=redis, build_id=build_id)
self.timeout = timeout
self.total = len(tests)
self._leases = {}
self.max_requeues = max_requeues
self.global_max_requeues = math.ceil(len(tests) * requeue_tolerance)
self.worker_id = worker_id
Expand All @@ -73,7 +74,7 @@ def poll():
while not self.shutdown_required and len(self): # pylint: disable=len-as-condition
test = self._reserve()
if test:
yield test.decode()
yield test.decode() if isinstance(test, bytes) else test
else:
time.sleep(0.05)

Expand All @@ -88,6 +89,7 @@ def shutdown(self):
self.shutdown_required = True

def acknowledge(self, test):
lease = self._leases.pop(test, '')
return self._eval_script(
'acknowledge',
keys=[
Expand All @@ -96,14 +98,16 @@ def acknowledge(self, test):
self.key('owners'),
self.key('error-reports'),
self.key('requeued-by'),
self.key('leases'),
],
args=[test, '', 0],
args=[test, '', 0, lease],
) == 1

def requeue(self, test, offset=42):
if not (self.max_requeues > 0 and self.global_max_requeues > 0.0):
return False

lease = self._leases.get(test, '')
return self._eval_script(
'requeue',
keys=[
Expand All @@ -115,8 +119,9 @@ def requeue(self, test, offset=42):
self.key('owners'),
self.key('error-reports'),
self.key('requeued-by'),
self.key('leases'),
],
args=[self.max_requeues, self.global_max_requeues, test, offset, 0],
args=[self.max_requeues, self.global_max_requeues, test, offset, 0, lease],
) == 1

def retry_queue(self):
Expand Down Expand Up @@ -154,7 +159,18 @@ def _register(self):
self.redis.sadd(self.key('workers'), self.worker_id)

def _reserve(self):
return self._try_to_reserve_lost_test() or self._try_to_reserve_test()
result = self._try_to_reserve_lost_test() or self._try_to_reserve_test()
if result:
if isinstance(result, list):
entry, lease = result[0], result[1]
# Store lease keyed by the decoded entry (acknowledge receives decoded strings)
entry_str = entry.decode() if isinstance(entry, bytes) else entry
lease_str = lease.decode() if isinstance(lease, bytes) else str(lease)
self._leases[entry_str] = lease_str
return entry # Return raw (bytes or str) — poll() handles .decode()
else:
return result
return result

def _try_to_reserve_lost_test(self):
if self.timeout:
Expand All @@ -168,6 +184,8 @@ def _try_to_reserve_lost_test(self):
self.worker_id,
'queue'),
self.key('owners'),
self.key('leases'),
self.key('lease-counter'),
],
args=[time.time(), self.timeout],
)
Expand All @@ -183,6 +201,8 @@ def _try_to_reserve_test(self):
self.key('owners'),
self.key('requeued-by'),
self.key('workers'),
self.key('leases'),
self.key('lease-counter'),
],
args=[
time.time(),
Expand Down
15 changes: 13 additions & 2 deletions redis/acknowledge.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,23 @@ local processed_key = KEYS[2]
local owners_key = KEYS[3]
local error_reports_key = KEYS[4]
local requeued_by_key = KEYS[5]
local leases_key = KEYS[6]

local entry = ARGV[1]
local error = ARGV[2]
local ttl = ARGV[3]
redis.call('zrem', zset_key, entry)
redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another workers
local lease_id = ARGV[4]

-- Only the current lease holder can remove the entry from the running set.
-- If the lease was transferred (e.g. via reserve_lost), the stale worker
-- must not remove the running entry — that would let the supervisor think
-- the queue is exhausted while the new lease holder is still processing.
if tostring(redis.call('hget', leases_key, entry)) == lease_id then
redis.call('zrem', zset_key, entry)
redis.call('hdel', owners_key, entry)
redis.call('hdel', leases_key, entry)
end

redis.call('hdel', requeued_by_key, entry)
local acknowledged = redis.call('sadd', processed_key, entry) == 1

Expand Down
19 changes: 9 additions & 10 deletions redis/heartbeat.lua
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
local worker_queue_key = KEYS[4]
local leases_key = KEYS[2]

local current_time = ARGV[1]
local entry = ARGV[2]
local lease_id = ARGV[3]

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, entry) == 1 then
return false
end

-- we're still the owner of the test, we can bump the timestamp
if redis.call('hget', owners_key, entry) == worker_queue_key then
-- Only the current lease holder can bump the timestamp.
-- We intentionally do NOT check the processed set. A non-owner worker's
-- acknowledge can add the entry to processed, which would poison the
-- current lease holder's heartbeat if we checked it here.
-- The lease check alone is sufficient — once the lease holder acknowledges,
-- they zrem + hdel the lease, so the heartbeat will naturally stop.
if tostring(redis.call('hget', leases_key, entry)) == lease_id then
return redis.call('zadd', zset_key, current_time, entry)
end
2 changes: 2 additions & 0 deletions redis/release.lua
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
-- Release script: immediately expire the lease on the first test that the
-- owners hash attributes to this worker.
--
-- KEYS[1]  sorted set holding the per-test timestamps
-- KEYS[2]  this worker's queue key (the owner value we look for)
-- KEYS[3]  hash mapping a test to its owner
-- KEYS[4]  hash mapping a test to its lease id
local zset_key, worker_queue_key, owners_key, leases_key = KEYS[1], KEYS[2], KEYS[3], KEYS[4]

-- HGETALL replies with a flat array:
-- {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...}
local flat_owners = redis.call('hgetall', owners_key)
for position = 1, #flat_owners do
  if flat_owners[position] == worker_queue_key then
    -- The matching element is the owner; the test it owns sits right before it.
    local released_test = flat_owners[position - 1]
    redis.call('zadd', zset_key, "0", released_test) -- A score of 0 expires the lease immediately
    redis.call('hdel', leases_key, released_test)
    return nil
  end
end
Expand Down
11 changes: 9 additions & 2 deletions redis/requeue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,20 @@ local worker_queue_key = KEYS[5]
local owners_key = KEYS[6]
local error_reports_key = KEYS[7]
local requeued_by_key = KEYS[8]
local leases_key = KEYS[9]

local max_requeues = tonumber(ARGV[1])
local global_max_requeues = tonumber(ARGV[2])
local entry = ARGV[3]
local offset = ARGV[4]
local ttl = tonumber(ARGV[5])
local lease_id = ARGV[6]

if redis.call('hget', owners_key, entry) == worker_queue_key then
redis.call('hdel', owners_key, entry)
-- Only the current lease holder can requeue a test.
-- If the lease was transferred (e.g. via reserve_lost), reject the stale
-- worker's requeue so the running entry stays intact for the new holder.
if tostring(redis.call('hget', leases_key, entry)) ~= lease_id then
return false
end

if redis.call('sismember', processed_key, entry) == 1 then
Expand Down Expand Up @@ -48,6 +53,8 @@ if ttl and ttl > 0 then
redis.call('expire', requeued_by_key, ttl)
end

redis.call('hdel', owners_key, entry)
redis.call('hdel', leases_key, entry)
redis.call('zrem', zset_key, entry)

return true
21 changes: 13 additions & 8 deletions redis/reserve.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ local worker_queue_key = KEYS[4]
local owners_key = KEYS[5]
local requeued_by_key = KEYS[6]
local workers_key = KEYS[7]
local leases_key = KEYS[8]
local lease_counter_key = KEYS[9]

local current_time = ARGV[1]
local defer_offset = tonumber(ARGV[2]) or 0
Expand All @@ -19,6 +21,15 @@ local function insert_with_offset(test)
end
end

--- Claim `test` for this worker under a freshly issued lease.
-- Increments the lease counter, stores `current_time` as the test's score in
-- the sorted set at `zset_key`, pushes the test onto this worker's queue, and
-- records both the owner and the lease id for the test.
-- @param test the test entry being claimed
-- @return a two-element array: the test and its lease id as a string
local function claim_test(test)
  local lease_id = redis.call('incr', lease_counter_key)

  redis.call('zadd', zset_key, current_time, test)
  redis.call('lpush', worker_queue_key, test)
  redis.call('hset', owners_key, test, worker_queue_key)
  redis.call('hset', leases_key, test, lease_id)

  local reply = { test, tostring(lease_id) }
  return reply
end

for attempt = 1, max_skip_attempts do
local test = redis.call('rpop', queue_key)
if not test then
Expand All @@ -30,10 +41,7 @@ for attempt = 1, max_skip_attempts do
-- If this build only has one worker, allow immediate self-pickup.
if redis.call('scard', workers_key) <= 1 then
redis.call('hdel', requeued_by_key, test)
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key)
return test
return claim_test(test)
end

insert_with_offset(test)
Expand All @@ -46,10 +54,7 @@ for attempt = 1, max_skip_attempts do
end
else
redis.call('hdel', requeued_by_key, test)
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key)
return test
return claim_test(test)
end
end

Expand Down
15 changes: 13 additions & 2 deletions redis/reserve_lost.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@ local zset_key = KEYS[1]
local processed_key = KEYS[2]
local worker_queue_key = KEYS[3]
local owners_key = KEYS[4]
local leases_key = KEYS[5]
local lease_counter_key = KEYS[6]

local current_time = ARGV[1]
local timeout = ARGV[2]

local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
for _, test in ipairs(lost_tests) do
if redis.call('sismember', processed_key, test) == 0 then
local lease = redis.call('incr', lease_counter_key)
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
return test
redis.call('hset', owners_key, test, worker_queue_key)
redis.call('hset', leases_key, test, lease)
return {test, tostring(lease)}
else
-- Test is already processed but still in running (stale). This can happen when
-- a non-owner worker acknowledged the test (marking it processed) but could not
-- remove it from running due to the lease guard. Clean it up.
redis.call('zrem', zset_key, test)
redis.call('hdel', owners_key, test)
redis.call('hdel', leases_key, test)
end
end

Expand Down
30 changes: 12 additions & 18 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ def reconnect_attempts
[0, 0, 0.1, 0.5, 1, 3, 5]
end

def with_heartbeat(id)
def with_heartbeat(id, lease: nil)
if heartbeat_enabled?
ensure_heartbeat_thread_alive!
heartbeat_state.set(:tick, id)
heartbeat_state.set(:tick, id, lease)
end

yield
Expand Down Expand Up @@ -264,12 +264,11 @@ def resolve_lua_includes(script, root)
end

class HeartbeatProcess
def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key)
def initialize(redis_url, zset_key, owners_key, leases_key)
@redis_url = redis_url
@zset_key = zset_key
@processed_key = processed_key
@owners_key = owners_key
@worker_queue_key = worker_queue_key
@leases_key = leases_key
end

def boot!
Expand All @@ -281,9 +280,8 @@ def boot!
::File.join(__dir__, "monitor.rb"),
@redis_url,
@zset_key,
@processed_key,
@owners_key,
@worker_queue_key,
@leases_key,
in: child_read,
out: child_write,
)
Expand Down Expand Up @@ -313,8 +311,8 @@ def shutdown!
end
end

def tick!(id)
send_message(:tick!, id: id)
def tick!(id, lease)
send_message(:tick!, id: id, lease: lease.to_s)
end

private
Expand Down Expand Up @@ -355,9 +353,8 @@ def heartbeat_process
@heartbeat_process ||= HeartbeatProcess.new(
@redis_url,
key('running'),
key('processed'),
key('owners'),
key('worker', worker_id, 'queue'),
key('leases'),
)
end

Expand All @@ -369,19 +366,16 @@ def heartbeat
Thread.current.name = "CI::Queue#heartbeat"
Thread.current.abort_on_exception = true

timeout = config.timeout.to_i
loop do
command = nil
command = heartbeat_state.wait(1) # waits for max 1 second but wakes up immediately if we receive a command

case command&.first
when :tick
if timeout > 0
heartbeat_process.tick!(command.last)
timeout -= 1
end
# command = [:tick, entry_id, lease_id]
heartbeat_process.tick!(command[1], command[2])
when :reset
timeout = config.timeout.to_i
# Test finished, stop ticking until next test starts
nil
when :stop
break
end
Expand Down
Loading
Loading