Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ruby/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
ci-queue (0.88.0)
ci-queue (0.89.0)
logger

GEM
Expand Down
4 changes: 3 additions & 1 deletion ruby/lib/ci/queue/queue_entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ def self.parse(entry)
end

def self.format(test_id, file_path)
JSON.dump({ test_id: test_id, file_path: file_path })
raise ArgumentError, "file_path is required for '#{test_id}' — the test file path must be resolvable" if file_path.nil? || file_path.empty?
canonical = load_error_payload?(file_path) ? file_path : ::File.expand_path(file_path)
JSON.dump({ test_id: test_id, file_path: canonical })
end

def self.load_error_payload?(file_path)
Expand Down
14 changes: 14 additions & 0 deletions ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ def error_reports
redis.hgetall(key('error-reports')).transform_keys { |entry| CI::Queue::QueueEntry.test_id(entry) }
end

def failed_test_entries
redis.hkeys(key('error-reports'))
end

def flaky_reports
redis.smembers(key('flaky-reports')).map { |entry| CI::Queue::QueueEntry.test_id(entry) }
end
Expand Down Expand Up @@ -177,6 +181,16 @@ def reset_stats(stat_names)
pipeline.hdel(key(stat_name), config.worker_id)
end
end
# Purge any error-report-deltas that reference this worker so that
# apply_error_report_delta_correction cannot double-subtract from
# the now-zeroed counters on a subsequent successful retry.
deltas = redis.hgetall(key('error-report-deltas'))
to_delete = deltas.filter_map do |entry, delta_json|
entry if JSON.parse(delta_json)['worker_id'].to_s == config.worker_id.to_s
rescue JSON::ParserError
nil
end
redis.hdel(key('error-report-deltas'), *to_delete) unless to_delete.empty?
end

private
Expand Down
15 changes: 15 additions & 0 deletions ruby/lib/ci/queue/redis/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,24 @@ def stream_populate(tests, random: nil, batch_size: nil)
self
end

# Queue a Redis SADD so that BuildRecord#record_success can include this
# in its multi-exec transaction. Without this, Static#acknowledge returns
# a Ruby value (not a Redis future), shifting the result indices and
# breaking the stats delta correction.
def acknowledge(entry, error: nil, pipeline: redis)
@progress += 1
return @progress unless pipeline
test_id = CI::Queue::QueueEntry.test_id(entry)
pipeline.sadd(key('processed'), test_id)
end

private

attr_reader :redis

def key(*args)
['build', config.build_id, *args].join(':')
end
end
end
end
Expand Down
9 changes: 9 additions & 0 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ def retry_queue
log.select! { |entry| failures.include?(CI::Queue::QueueEntry.test_id(entry)) }
log.uniq! { |entry| CI::Queue::QueueEntry.test_id(entry) }
log.reverse!

if log.empty?
# Per-worker log has no matching failures — this worker didn't run
# the failing tests (e.g. Buildkite rebuild with new worker IDs,
# or a different parallel slot). Fall back to ALL unresolved
# failures from error-reports so any worker can retry them.
log = redis.hkeys(key('error-reports'))
end

Retry.new(log, config, redis: redis)
end

Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module CI
module Queue
VERSION = '0.88.0'
VERSION = '0.89.0'
DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../redis', __FILE__)
RELEASE_SCRIPTS_ROOT = ::File.expand_path('../redis', __FILE__)
end
Expand Down
11 changes: 10 additions & 1 deletion ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,16 @@ def id
end

def queue_entry
@queue_entry ||= CI::Queue::QueueEntry.format(id, nil)
@queue_entry ||= begin
unless runnable.is_a?(Module)
raise ArgumentError, "runnable must be a Module (got #{runnable.class}). " \
"Do not create SingleExample with string class names."
end
file_path = runnable.instance_method(method_name).source_location&.first
raise ArgumentError, "Cannot resolve source file for #{id} — " \
"ensure the test method is defined in a Ruby source file" if file_path.nil?
CI::Queue::QueueEntry.format(id, file_path)
end
end

def <=>(other)
Expand Down
5 changes: 4 additions & 1 deletion ruby/lib/rspec/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ def id
end

def queue_entry
@queue_entry ||= CI::Queue::QueueEntry.format(id, nil)
@queue_entry ||= begin
file_path = example.metadata[:absolute_file_path] || example.file_path
CI::Queue::QueueEntry.format(id, file_path)
end
end

def <=>(other)
Expand Down
4 changes: 2 additions & 2 deletions ruby/lib/rspec/queue/build_status_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ def initialize(*)

def example_passed(notification)
example = notification.example
entry = CI::Queue::QueueEntry.format(example.id, nil)
entry = CI::Queue::QueueEntry.format(example.id, example.file_path)
build.record_success(entry)
end

def example_failed(notification)
example = notification.example
entry = CI::Queue::QueueEntry.format(example.id, nil)
entry = CI::Queue::QueueEntry.format(example.id, example.file_path)
build.record_error(entry, dump(notification))
end

Expand Down
25 changes: 3 additions & 22 deletions ruby/test/ci/queue/queue_entry_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
require 'test_helper'

class CI::Queue::QueueEntryTest < Minitest::Test
def test_parse_without_file_path
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", nil)
parsed = CI::Queue::QueueEntry.parse(entry)
assert_equal "FooTest#test_bar", parsed[:test_id]
assert_nil parsed[:file_path]
def test_format_raises_without_file_path
assert_raises(ArgumentError) { CI::Queue::QueueEntry.format("FooTest#test_bar", nil) }
assert_raises(ArgumentError) { CI::Queue::QueueEntry.format("FooTest#test_bar", "") }
end

def test_parse_with_file_path
Expand All @@ -16,18 +14,6 @@ def test_parse_with_file_path
assert_equal "/tmp/foo_test.rb", parsed[:file_path]
end

def test_format_without_file_path
entry_nil = CI::Queue::QueueEntry.format("FooTest#test_bar", nil)
parsed_nil = JSON.parse(entry_nil, symbolize_names: true)
assert_equal "FooTest#test_bar", parsed_nil[:test_id]
assert_nil parsed_nil[:file_path]

entry_empty = CI::Queue::QueueEntry.format("FooTest#test_bar", "")
parsed_empty = JSON.parse(entry_empty, symbolize_names: true)
assert_equal "FooTest#test_bar", parsed_empty[:test_id]
assert_equal "", parsed_empty[:file_path]
end

def test_format_with_file_path
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
parsed = JSON.parse(entry, symbolize_names: true)
Expand Down Expand Up @@ -60,11 +46,6 @@ def test_round_trip_preserves_test_id
assert_equal file_path, parsed[:file_path]
end

def test_test_id_without_file_path
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", nil)
assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id(entry)
end

def test_test_id_with_file_path
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id(entry)
Expand Down
Loading
Loading