From 17204fdf461ea4c5e8e57188a1e5149434e0a4fd Mon Sep 17 00:00:00 2001 From: Ian Ker-Seymer Date: Wed, 25 Mar 2026 16:05:18 -0400 Subject: [PATCH] Replace tab-delimited QueueEntry format with JSON serialization --- redis/_entry_helpers.lua | 10 ++++----- redis/heartbeat.lua | 3 +-- redis/reserve_lost.lua | 3 +-- ruby/lib/ci/queue/queue_entry.rb | 12 +++------- ruby/lib/ci/queue/redis/base.rb | 5 +---- ruby/lib/ci/queue/redis/monitor.rb | 8 +++---- ruby/lib/ci/queue/redis/worker.rb | 2 +- ruby/test/ci/queue/queue_entry_test.rb | 31 ++++++++++++++++++++++---- ruby/test/ci/queue/redis_test.rb | 18 +++++++-------- 9 files changed, 50 insertions(+), 42 deletions(-) diff --git a/redis/_entry_helpers.lua b/redis/_entry_helpers.lua index c1755278..b2fd2f57 100644 --- a/redis/_entry_helpers.lua +++ b/redis/_entry_helpers.lua @@ -1,9 +1,7 @@ -local function test_id_from_entry(value, delimiter) - if delimiter then - local pos = string.find(value, delimiter, 1, true) - if pos then - return string.sub(value, 1, pos - 1) - end +local function test_id_from_entry(value) + if string.sub(value, 1, 1) == '{' then + local decoded = cjson.decode(value) + return decoded['test_id'] end return value end diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua index 6b88fe2b..2a7006c1 100644 --- a/redis/heartbeat.lua +++ b/redis/heartbeat.lua @@ -7,9 +7,8 @@ local worker_queue_key = KEYS[4] local current_time = ARGV[1] local entry = ARGV[2] -local entry_delimiter = ARGV[3] -local test_id = test_id_from_entry(entry, entry_delimiter) +local test_id = test_id_from_entry(entry) -- already processed, we do not need to bump the timestamp if redis.call('sismember', processed_key, test_id) == 1 then diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index f595dd28..a36fc067 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -7,11 +7,10 @@ local owners_key = KEYS[4] local current_time = ARGV[1] local timeout = ARGV[2] -local entry_delimiter = ARGV[3] local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout) for _, test in ipairs(lost_tests) do - local test_id = test_id_from_entry(test, entry_delimiter) + local test_id = test_id_from_entry(test) if redis.call('sismember', processed_key, test_id) == 0 then redis.call('zadd', zset_key, current_time, test) redis.call('lpush', worker_queue_key, test) diff --git a/ruby/lib/ci/queue/queue_entry.rb b/ruby/lib/ci/queue/queue_entry.rb index 81846f27..50288603 100644 --- a/ruby/lib/ci/queue/queue_entry.rb +++ b/ruby/lib/ci/queue/queue_entry.rb @@ -6,26 +6,20 @@ module CI module Queue module QueueEntry - DELIMITER = "\t" LOAD_ERROR_PREFIX = '__ciq_load_error__:'.freeze def self.test_id(entry) - pos = entry.index(DELIMITER) - pos ? entry[0, pos] : entry + JSON.parse(entry, symbolize_names: true)[:test_id] end def self.parse(entry) - return { test_id: entry, file_path: nil } unless entry.include?(DELIMITER) - - test_id, file_path = entry.split(DELIMITER, 2) - file_path = nil if file_path == "" - { test_id: test_id, file_path: file_path } + JSON.parse(entry, symbolize_names: true) end def self.format(test_id, file_path) return test_id if file_path.nil? || file_path == "" - "#{test_id}#{DELIMITER}#{file_path}" + JSON.dump({ test_id: test_id, file_path: file_path }) end def self.load_error_payload?(file_path) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 95e4d304..95693d3f 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -264,13 +264,12 @@ def resolve_lua_includes(script, root) end class HeartbeatProcess - def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:) + def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key) @redis_url = redis_url @zset_key = zset_key @processed_key = processed_key @owners_key = owners_key @worker_queue_key = worker_queue_key - @entry_delimiter = entry_delimiter end def boot! @@ -285,7 +284,6 @@ def boot! @processed_key, @owners_key, @worker_queue_key, - @entry_delimiter, in: child_read, out: child_write, ) @@ -360,7 +358,6 @@ def heartbeat_process key('processed'), key('owners'), key('worker', worker_id, 'queue'), - entry_delimiter: CI::Queue::QueueEntry::DELIMITER, ) end diff --git a/ruby/lib/ci/queue/redis/monitor.rb b/ruby/lib/ci/queue/redis/monitor.rb index b737690c..8554ab06 100755 --- a/ruby/lib/ci/queue/redis/monitor.rb +++ b/ruby/lib/ci/queue/redis/monitor.rb @@ -13,12 +13,11 @@ class Monitor DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../../redis', __FILE__) RELEASE_SCRIPTS_ROOT = ::File.expand_path('../../redis', __FILE__) - def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter) + def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) @zset_key = zset_key @processed_key = processed_key @owners_key = owners_key @worker_queue_key = worker_queue_key - @entry_delimiter = entry_delimiter @logger = logger @redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5]) @shutdown = false @@ -41,7 +40,7 @@ def process_tick!(id:) eval_script( :heartbeat, keys: [@zset_key, @processed_key, @owners_key, @worker_queue_key], - argv: [Time.now.to_f, id, @entry_delimiter] + argv: [Time.now.to_f, id] ) rescue => error @logger.info(error) @@ -155,10 +154,9 @@ def monitor processed_key = ARGV[2] owners_key = ARGV[3] worker_queue_key = ARGV[4] -entry_delimiter = ARGV[5] logger.debug("Starting monitor: #{redis_url} #{zset_key} #{processed_key}") -manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter) +manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) # Notify the parent we're ready $stdout.puts(".") diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 004b6fe3..6736bf40 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -391,7 +391,7 @@ def try_to_reserve_lost_test key('worker', worker_id, 'queue'), key('owners'), ], - argv: [CI::Queue.time_now.to_f, timeout, CI::Queue::QueueEntry::DELIMITER], + argv: [CI::Queue.time_now.to_f, timeout], ) if lost_test diff --git a/ruby/test/ci/queue/queue_entry_test.rb b/ruby/test/ci/queue/queue_entry_test.rb index 86a96aa5..5d869bbb 100644 --- a/ruby/test/ci/queue/queue_entry_test.rb +++ b/ruby/test/ci/queue/queue_entry_test.rb @@ -2,8 +2,6 @@ require 'test_helper' class CI::Queue::QueueEntryTest < Minitest::Test - DELIMITER = CI::Queue::QueueEntry::DELIMITER - def test_parse_without_file_path entry = "FooTest#test_bar" parsed = CI::Queue::QueueEntry.parse(entry) @@ -12,7 +10,7 @@ def test_parse_without_file_path end def test_parse_with_file_path - entry = "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb" + entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb") parsed = CI::Queue::QueueEntry.parse(entry) assert_equal "FooTest#test_bar", parsed[:test_id] assert_equal "/tmp/foo_test.rb", parsed[:file_path] @@ -25,7 +23,9 @@ def test_format_without_file_path def test_format_with_file_path entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb") - assert_equal "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb", entry + parsed = JSON.parse(entry, symbolize_names: true) + assert_equal "FooTest#test_bar", parsed[:test_id] + assert_equal "/tmp/foo_test.rb", parsed[:file_path] end def test_parse_with_pipe_in_test_name @@ -36,6 +36,14 @@ def test_parse_with_pipe_in_test_name assert_equal "/tmp/foo_test.rb", parsed[:file_path] end + def test_parse_with_tab_in_test_name + test_id = "FooTest#test_xss_" + entry = CI::Queue::QueueEntry.format(test_id, "/tmp/foo_test.rb") + parsed = CI::Queue::QueueEntry.parse(entry) + assert_equal test_id, parsed[:test_id] + assert_equal "/tmp/foo_test.rb", parsed[:file_path] + end + def test_round_trip_preserves_test_id test_id = "FooTest#test_bar" file_path = "/tmp/foo_test.rb" @@ -45,6 +53,21 @@ def test_round_trip_preserves_test_id assert_equal file_path, parsed[:file_path] end + def test_test_id_without_file_path + assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id("FooTest#test_bar") + end + + def test_test_id_with_file_path + entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb") + assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id(entry) + end + + def test_test_id_with_tab_in_test_name + test_id = "FooTest#test_xss_" + entry = CI::Queue::QueueEntry.format(test_id, "/tmp/foo_test.rb") + assert_equal test_id, CI::Queue::QueueEntry.test_id(entry) + end + def test_encode_decode_load_error error = StandardError.new("boom") error.set_backtrace(["/tmp/test.rb:10"]) diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index 6e89798f..b4c2f151 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -4,7 +4,6 @@ class CI::Queue::RedisTest < Minitest::Test include SharedQueueAssertions - DELIMITER = CI::Queue::QueueEntry::DELIMITER EntryTest = Struct.new(:id, :queue_entry) def setup @@ -243,8 +242,8 @@ def test_streaming_waits_for_batches consumer.entry_resolver = ->(entry) { entry } tests = [ - EntryTest.new('ATest#test_foo', "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"), - EntryTest.new('ATest#test_bar', "ATest#test_bar#{DELIMITER}/tmp/a_test.rb"), + EntryTest.new('ATest#test_foo', CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')), + EntryTest.new('ATest#test_bar', CI::Queue::QueueEntry.format('ATest#test_bar', '/tmp/a_test.rb')), ] streamed = Enumerator.new do |yielder| @@ -283,7 +282,7 @@ def test_streaming_waits_for_batches def test_reserve_lost_ignores_processed_entry_with_path queue = worker(1, populate: false) - entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb" + entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb') test_id = 'ATest#test_foo' @redis.zadd(queue.send(:key, 'running'), 0, entry) @@ -307,7 +306,7 @@ def test_streaming_timeout_raises_lost_master def test_reserve_defers_own_requeued_test_once queue = worker(1, populate: false, build_id: 'self-requeue-script') queue.send(:register) - entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb" + entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb') queue_key = queue.send(:key, 'queue') requeued_by_key = queue.send(:key, 'requeued-by') worker_queue_key = queue.send(:key, 'worker', queue.config.worker_id, 'queue') @@ -328,7 +327,7 @@ def test_reserve_defers_own_requeued_test_once def test_heartbeat_uses_test_id_for_processed_check queue = worker(1, populate: false) - entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb" + entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb') test_id = 'ATest#test_foo' @redis.sadd(queue.send(:key, 'processed'), test_id) @@ -342,7 +341,7 @@ def test_heartbeat_uses_test_id_for_processed_check queue.send(:key, 'owners'), queue.send(:key, 'worker', queue.config.worker_id, 'queue'), ], - argv: [CI::Queue.time_now.to_f, entry, DELIMITER], + argv: [CI::Queue.time_now.to_f, entry], ) assert_nil result @@ -353,9 +352,10 @@ def test_resolve_entry_falls_back_to_resolver queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok }) queue.entry_resolver = ->(entry) { "resolved:#{entry}" } - resolved = queue.send(:resolve_entry, "MissingTest#test_bar#{DELIMITER}/tmp/missing.rb") + missing_entry = CI::Queue::QueueEntry.format('MissingTest#test_bar', '/tmp/missing.rb') + resolved = queue.send(:resolve_entry, missing_entry) - assert_equal "resolved:MissingTest#test_bar#{DELIMITER}/tmp/missing.rb", resolved + assert_equal "resolved:#{missing_entry}", resolved end def test_continuously_timing_out_tests