diff --git a/ruby/lib/ci/queue/redis/retry.rb b/ruby/lib/ci/queue/redis/retry.rb index 85bcc27e..be79e26c 100644 --- a/ruby/lib/ci/queue/redis/retry.rb +++ b/ruby/lib/ci/queue/redis/retry.rb @@ -12,6 +12,22 @@ def build @build ||= CI::Queue::Redis::BuildRecord.new(self, redis, config) end + # Retry queue is pre-populated with failed test entries from the previous run. + # Don't replace them with the full preresolved/lazy test list. + # QueuePopulationStrategy#configure_lazy_queue will still set entry_resolver, + # so poll uses LazyEntryResolver to lazily load test files on demand. + # The random/batch_size params are intentionally ignored since we keep + # the existing queue contents as-is. + # + # Note: populate (non-stream) is intentionally NOT overridden here. + # RSpec and non-lazy Minitest retries call populate to build the + # @index mapping test IDs to runnable objects, which poll needs to + # yield proper test/example instances. In those paths, @queue contains + # bare test IDs that match @index keys, so populate works correctly. + def stream_populate(tests, random: nil, batch_size: nil) + self + end + private attr_reader :redis diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 38fa4483..afd9d8ef 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -148,9 +148,10 @@ def retrying? def retry_queue failures = build.failed_tests.to_set log = redis.lrange(key('worker', worker_id, 'queue'), 0, -1) - log = log.map { |entry| CI::Queue::QueueEntry.test_id(entry) } - log.select! { |test_id| failures.include?(test_id) } - log.uniq! + # Keep full entries (test_id + file_path) so lazy loading can resolve them. + # Filter by test_id against failures without stripping file paths. + log.select! { |entry| failures.include?(CI::Queue::QueueEntry.test_id(entry)) } + log.uniq! { |entry| CI::Queue::QueueEntry.test_id(entry) } log.reverse! 
Retry.new(log, config, redis: redis) end diff --git a/ruby/lib/ci/queue/static.rb b/ruby/lib/ci/queue/static.rb index 2dfc6bc3..c58d44a0 100644 --- a/ruby/lib/ci/queue/static.rb +++ b/ruby/lib/ci/queue/static.rb @@ -16,6 +16,7 @@ def from_uri(uri, config) TEN_MINUTES = 60 * 10 attr_reader :progress, :total + attr_accessor :entry_resolver def initialize(tests, config) @queue = tests @@ -50,6 +51,16 @@ def populate(tests, random: nil) self end + # Support lazy loading mode: accept an enumerator of entries and + # store them in queue order (no shuffling). This preserves the + # exact order from the input file for local reproduction. + def stream_populate(tests, random: nil, batch_size: nil) + @queue = [] + tests.each { |entry| @queue << entry } + @total = @queue.size + self + end + def with_heartbeat(id, lease: nil) yield end @@ -79,11 +90,15 @@ def expired? end def populated? - !!defined?(@index) + !!defined?(@index) || @queue.any? end def to_a - @queue.map { |i| index.fetch(i) } + if defined?(@index) && @index + @queue.map { |i| index.fetch(i) } + else + @queue.dup + end end def size @@ -101,9 +116,28 @@ def running def poll while !@shutdown && config.circuit_breakers.none?(&:open?) && !max_test_failed? && reserved_test = @queue.shift reserved_tests << reserved_test - yield index.fetch(reserved_test) + if entry_resolver + resolved = entry_resolver.call(reserved_test) + # Track the original queue entry so requeue can push it back + # with its full payload (file path, load-error data, etc.). + reserved_entries[resolved.id] = reserved_test if resolved.respond_to?(:id) + yield resolved + elsif defined?(@index) && @index + # Queue entries may be JSON-formatted (with test_id + file_path) while + # the index is keyed by bare test_id from populate. Extract the test_id + # first; if that raises JSON::ParserError, use the raw entry as the key. 
+ test_id = begin + CI::Queue::QueueEntry.test_id(reserved_test) + rescue JSON::ParserError + reserved_test + end + yield index.fetch(test_id) + else + yield reserved_test + end end reserved_tests.clear + reserved_entries.clear end def exhausted? @@ -134,7 +168,10 @@ def requeue(entry) return false unless should_requeue?(test_id) requeues[test_id] += 1 - @queue.unshift(test_id) + # Push back the original queue entry (with file path / load-error payload) + # so entry_resolver can fully resolve it on the next poll iteration. + original_entry = reserved_entries.delete(test_id) || test_id + @queue.unshift(original_entry) true end @@ -150,6 +187,10 @@ def requeues @requeues ||= Hash.new(0) end + def reserved_entries + @reserved_entries ||= {} + end + def reserved_tests @reserved_tests ||= Concurrent::Set.new end diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index 1de567db..3693bd93 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -61,6 +61,149 @@ def test_retry_queue_with_all_tests_passing_2 assert_equal retry_test_order, retry_test_order end + def test_retry_queue_preserves_full_entries_with_file_paths + # Use stream_populate with file-path entries (as in preresolved mode), + # then verify retry_queue preserves the full entry including the file path. 
+ @redis.flushdb + build_id = 'retry-file-paths' + leader = worker(1, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: build_id) + consumer = worker(2, populate: false, lazy_load_streaming_timeout: 2, queue_init_timeout: 2, build_id: build_id) + consumer.entry_resolver = ->(entry) { entry } + + tests = [ + EntryTest.new('ATest#test_foo', CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')), + EntryTest.new('ATest#test_bar', CI::Queue::QueueEntry.format('ATest#test_bar', '/tmp/a_test.rb')), + ] + + leader_thread = Thread.new do + leader.stream_populate(tests, random: Random.new(0), batch_size: 10) + end + + timeout_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 2 + loop do + status = @redis.get(leader.send(:key, 'master-status')) + break if status == 'ready' + raise "streaming status not set" if Process.clock_gettime(Process::CLOCK_MONOTONIC) > timeout_at + sleep 0.01 + end + + # Consumer polls all tests, failing the first one + failed_entry = nil + consumer.poll do |entry| + if failed_entry.nil? + failed_entry = entry + consumer.report_failure! + # record_error calls acknowledge internally + consumer.build.record_error(entry, 'Failed') + else + consumer.report_success! + consumer.acknowledge(entry) + end + end + + leader_thread.join(2) + + retry_queue = consumer.retry_queue + refute_predicate retry_queue, :exhausted? + + retry_entries = retry_queue.instance_variable_get(:@queue).dup + assert_equal 1, retry_entries.size + # The critical assertion: retry entry must be a JSON entry with file_path, + # not just the bare test ID. A regression in retry_queue would strip this. 
+ parsed = CI::Queue::QueueEntry.parse(retry_entries.first) + assert parsed[:file_path], "Retry entry should preserve the full entry with file path" + failed_test_id = CI::Queue::QueueEntry.test_id(failed_entry) + assert_equal failed_test_id, CI::Queue::QueueEntry.test_id(retry_entries.first) + ensure + leader_thread&.kill + end + + def test_retry_queue_stream_populate_is_noop + target = shuffled_test_list.first + @queue.poll do |test| + if test == target + @queue.report_failure! + # record_error calls acknowledge internally + @queue.build.record_error(test.queue_entry, 'Failed') + else + @queue.report_success! + @queue.acknowledge(test.queue_entry) + end + end + + retry_queue = @queue.retry_queue + original_queue_contents = retry_queue.instance_variable_get(:@queue).dup + refute_empty original_queue_contents + + # stream_populate should NOT replace the retry queue's contents + dummy_entries = Enumerator.new do |yielder| + yielder << CI::Queue::QueueEntry.format("ZTest#test_zzz", "/tmp/z_test.rb") + end + retry_queue.stream_populate(dummy_entries, random: Random.new(0)) + + assert_equal original_queue_contents, retry_queue.instance_variable_get(:@queue), + "stream_populate should not replace retry queue contents" + end + + def test_retry_queue_works_with_entry_resolver + # Fail a test, then verify retry queue works with entry_resolver (lazy loading) + target = shuffled_test_list.first + @queue.poll do |test| + if test == target + @queue.report_failure! + # record_error calls acknowledge internally + @queue.build.record_error(test.queue_entry, 'Failed') + else + @queue.report_success! 
+ @queue.acknowledge(test.queue_entry) + end + end + + retry_queue = @queue.retry_queue + + # Set up entry_resolver (as configure_lazy_queue would do) + resolved_entries = [] + retry_queue.entry_resolver = ->(entry) { + resolved_entries << entry + entry + } + + # stream_populate is a no-op, preserving the retry entries + retry_queue.stream_populate(Enumerator.new { |y| }, random: Random.new(0)) + + # Poll should use entry_resolver, not index.fetch — no KeyError crash + polled = [] + retry_queue.poll do |test| + polled << test + retry_queue.acknowledge(test) + end + + assert_equal retry_queue.total, polled.size + assert_equal polled.size, resolved_entries.size, + "All polled entries should have gone through entry_resolver" + end + + def test_retry_queue_with_multiple_failures_deduplicates + # Fail multiple tests, verify retry queue deduplicates by test_id + failed_ids = [] + @queue.poll do |test| + @queue.report_failure! + @queue.build.record_error(test.queue_entry, 'Failed') + failed_ids << test.id + end + + assert_operator failed_ids.size, :>=, 2, "Need multiple failures for this test" + + retry_queue = @queue.retry_queue + retry_entries = retry_queue.instance_variable_get(:@queue).dup + + # Each failed test should appear exactly once (no duplicates from requeues) + retry_test_ids = retry_entries.map { |e| CI::Queue::QueueEntry.test_id(e) } + assert_equal retry_test_ids.uniq, retry_test_ids, + "Retry queue should not contain duplicate test IDs" + assert_equal failed_ids.uniq.sort, retry_test_ids.sort + end + def test_shutdown poll(@queue) do @queue.shutdown! diff --git a/ruby/test/ci/queue/static_test.rb b/ruby/test/ci/queue/static_test.rb index 9c11fc41..22c7ba91 100644 --- a/ruby/test/ci/queue/static_test.rb +++ b/ruby/test/ci/queue/static_test.rb @@ -12,15 +12,145 @@ def test_expired refute queue.expired? 
end - private - - def build_queue - CI::Queue::Static.new(TEST_LIST.map(&:id), config) - end - def test_from_uri queue = CI::Queue.from_uri('list:foo:bar:plop%3Ffizz', config) assert_instance_of CI::Queue::Static, queue assert_equal %w(foo bar plop?fizz), queue.to_a end + + def test_retry_stream_populate_is_noop + failed_entries = ["ATest#test_foo\t/tmp/a_test.rb", "BTest#test_bar\t/tmp/b_test.rb"] + retry_queue = CI::Queue::Redis::Retry.new( + failed_entries, + config, + redis: nil, # not needed for this unit test + ) + + # stream_populate should preserve the existing queue + replacement = Enumerator.new do |y| + y << "ZTest#test_zzz\t/tmp/z_test.rb" + end + retry_queue.stream_populate(replacement, random: Random.new(0)) + + assert_equal failed_entries, retry_queue.instance_variable_get(:@queue) + assert_equal failed_entries.size, retry_queue.total + end + + def test_retry_queue_poll_with_entry_resolver + entry = "ATest#test_foo\t/tmp/a_test.rb" + retry_queue = CI::Queue::Redis::Retry.new( + [entry], + config, + redis: nil, + ) + + resolved = [] + retry_queue.entry_resolver = ->(e) { + resolved << e + e + } + + polled = [] + retry_queue.poll do |test| + polled << test + retry_queue.acknowledge(test) + end + + assert_equal 1, polled.size + assert_equal entry, polled.first + assert_equal 1, resolved.size, "entry_resolver should be called for each entry" + end + + def test_retry_populate_builds_index_for_eager_mode + # populate must still work on Retry for RSpec and non-lazy Minitest retries, + # which call populate to build @index for yielding runnable test objects. + retry_queue = CI::Queue::Redis::Retry.new( + TEST_LIST.map(&:id), + config, + redis: nil, + ) + + retry_queue.populate(TEST_LIST, random: Random.new(0)) + + polled = [] + retry_queue.poll do |test| + polled << test + retry_queue.acknowledge(test.id) + end + + assert_equal TEST_LIST.size, polled.size + assert polled.all? 
{ |t| t.respond_to?(:id) }, "populate should build index so poll yields test objects" + end + + def test_retry_queue_poll_with_bare_test_ids + # Entries without file paths (non-preresolved / eager mode) + entries = ["ATest#test_foo", "BTest#test_bar"] + retry_queue = CI::Queue::Redis::Retry.new( + entries.dup, + config, + redis: nil, + ) + + polled = [] + retry_queue.poll do |test| + polled << test + retry_queue.acknowledge(test) + end + + assert_equal 2, polled.size + assert_equal entries, polled + end + + def test_retry_queue_requeue_preserves_full_entry + # Verify that requeue pushes back the original entry (with file path), + # not just test.id, so entry_resolver can resolve it on the next attempt. + entry = CI::Queue::QueueEntry.format("ATest#test_foo", "/tmp/a_test.rb") + requeue_config = CI::Queue::Configuration.new( + timeout: 0.2, + build_id: '42', + worker_id: '1', + max_requeues: 1, + requeue_tolerance: 1.0, + max_consecutive_failures: 10, + ) + retry_queue = CI::Queue::Redis::Retry.new( + [entry.dup], + requeue_config, + redis: nil, + ) + + resolved_entries = [] + retry_queue.entry_resolver = ->(e) { + resolved_entries << e + # Return a mock object with .id that returns the test_id + Struct.new(:id, :queue_entry).new(CI::Queue::QueueEntry.test_id(e), e) + } + + poll_count = 0 + retry_queue.poll do |test| + poll_count += 1 + if poll_count == 1 + retry_queue.report_failure! + retry_queue.requeue(test.queue_entry) || retry_queue.acknowledge(test.queue_entry) + else + retry_queue.report_success! 
+ retry_queue.acknowledge(test.queue_entry) + end + end + + assert_equal 2, poll_count, "Test should be polled twice (original + requeue)" + # Both resolutions should receive the full JSON entry with file_path + assert_equal 2, resolved_entries.size + resolved_entries.each do |e| + parsed = CI::Queue::QueueEntry.parse(e) + assert parsed[:file_path], + "Requeued entry should preserve file path" + end + end + + private + + def build_queue + CI::Queue::Static.new(TEST_LIST.map(&:id), config) + end end