diff --git a/redis/_entry_helpers.lua b/redis/_entry_helpers.lua
index c1755278..b2fd2f57 100644
--- a/redis/_entry_helpers.lua
+++ b/redis/_entry_helpers.lua
@@ -1,9 +1,7 @@
-local function test_id_from_entry(value, delimiter)
- if delimiter then
- local pos = string.find(value, delimiter, 1, true)
- if pos then
- return string.sub(value, 1, pos - 1)
- end
+local function test_id_from_entry(value)
+ if string.sub(value, 1, 1) == '{' then
+ local decoded = cjson.decode(value)
+ return decoded['test_id']
end
return value
end
diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua
index 6b88fe2b..2a7006c1 100644
--- a/redis/heartbeat.lua
+++ b/redis/heartbeat.lua
@@ -7,9 +7,8 @@ local worker_queue_key = KEYS[4]
local current_time = ARGV[1]
local entry = ARGV[2]
-local entry_delimiter = ARGV[3]
-local test_id = test_id_from_entry(entry, entry_delimiter)
+local test_id = test_id_from_entry(entry)
-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test_id) == 1 then
diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua
index f595dd28..a36fc067 100644
--- a/redis/reserve_lost.lua
+++ b/redis/reserve_lost.lua
@@ -7,11 +7,10 @@ local owners_key = KEYS[4]
local current_time = ARGV[1]
local timeout = ARGV[2]
-local entry_delimiter = ARGV[3]
local lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
for _, test in ipairs(lost_tests) do
- local test_id = test_id_from_entry(test, entry_delimiter)
+ local test_id = test_id_from_entry(test)
if redis.call('sismember', processed_key, test_id) == 0 then
redis.call('zadd', zset_key, current_time, test)
redis.call('lpush', worker_queue_key, test)
diff --git a/ruby/lib/ci/queue/queue_entry.rb b/ruby/lib/ci/queue/queue_entry.rb
index 81846f27..50288603 100644
--- a/ruby/lib/ci/queue/queue_entry.rb
+++ b/ruby/lib/ci/queue/queue_entry.rb
@@ -6,26 +6,20 @@
module CI
module Queue
module QueueEntry
- DELIMITER = "\t"
LOAD_ERROR_PREFIX = '__ciq_load_error__:'.freeze
def self.test_id(entry)
- pos = entry.index(DELIMITER)
- pos ? entry[0, pos] : entry
+ JSON.parse(entry, symbolize_names: true)[:test_id]
end
def self.parse(entry)
- return { test_id: entry, file_path: nil } unless entry.include?(DELIMITER)
-
- test_id, file_path = entry.split(DELIMITER, 2)
- file_path = nil if file_path == ""
- { test_id: test_id, file_path: file_path }
+ JSON.parse(entry, symbolize_names: true)
end
def self.format(test_id, file_path)
return test_id if file_path.nil? || file_path == ""
- "#{test_id}#{DELIMITER}#{file_path}"
+ JSON.dump({ test_id: test_id, file_path: file_path })
end
def self.load_error_payload?(file_path)
diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb
index 95e4d304..95693d3f 100644
--- a/ruby/lib/ci/queue/redis/base.rb
+++ b/ruby/lib/ci/queue/redis/base.rb
@@ -264,13 +264,12 @@ def resolve_lua_includes(script, root)
end
class HeartbeatProcess
- def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter:)
+ def initialize(redis_url, zset_key, processed_key, owners_key, worker_queue_key)
@redis_url = redis_url
@zset_key = zset_key
@processed_key = processed_key
@owners_key = owners_key
@worker_queue_key = worker_queue_key
- @entry_delimiter = entry_delimiter
end
def boot!
@@ -285,7 +284,6 @@ def boot!
@processed_key,
@owners_key,
@worker_queue_key,
- @entry_delimiter,
in: child_read,
out: child_write,
)
@@ -360,7 +358,6 @@ def heartbeat_process
key('processed'),
key('owners'),
key('worker', worker_id, 'queue'),
- entry_delimiter: CI::Queue::QueueEntry::DELIMITER,
)
end
diff --git a/ruby/lib/ci/queue/redis/monitor.rb b/ruby/lib/ci/queue/redis/monitor.rb
index b737690c..8554ab06 100755
--- a/ruby/lib/ci/queue/redis/monitor.rb
+++ b/ruby/lib/ci/queue/redis/monitor.rb
@@ -13,12 +13,11 @@ class Monitor
DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../../redis', __FILE__)
RELEASE_SCRIPTS_ROOT = ::File.expand_path('../../redis', __FILE__)
- def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter)
+ def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key)
@zset_key = zset_key
@processed_key = processed_key
@owners_key = owners_key
@worker_queue_key = worker_queue_key
- @entry_delimiter = entry_delimiter
@logger = logger
@redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5])
@shutdown = false
@@ -41,7 +40,7 @@ def process_tick!(id:)
eval_script(
:heartbeat,
keys: [@zset_key, @processed_key, @owners_key, @worker_queue_key],
- argv: [Time.now.to_f, id, @entry_delimiter]
+ argv: [Time.now.to_f, id]
)
rescue => error
@logger.info(error)
@@ -155,10 +154,9 @@ def monitor
processed_key = ARGV[2]
owners_key = ARGV[3]
worker_queue_key = ARGV[4]
-entry_delimiter = ARGV[5]
logger.debug("Starting monitor: #{redis_url} #{zset_key} #{processed_key}")
-manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key, entry_delimiter)
+manager = CI::Queue::Redis::Monitor.new($stdin, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key)
# Notify the parent we're ready
$stdout.puts(".")
diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb
index 004b6fe3..6736bf40 100644
--- a/ruby/lib/ci/queue/redis/worker.rb
+++ b/ruby/lib/ci/queue/redis/worker.rb
@@ -391,7 +391,7 @@ def try_to_reserve_lost_test
key('worker', worker_id, 'queue'),
key('owners'),
],
- argv: [CI::Queue.time_now.to_f, timeout, CI::Queue::QueueEntry::DELIMITER],
+ argv: [CI::Queue.time_now.to_f, timeout],
)
if lost_test
diff --git a/ruby/test/ci/queue/queue_entry_test.rb b/ruby/test/ci/queue/queue_entry_test.rb
index 86a96aa5..5d869bbb 100644
--- a/ruby/test/ci/queue/queue_entry_test.rb
+++ b/ruby/test/ci/queue/queue_entry_test.rb
@@ -2,8 +2,6 @@
require 'test_helper'
class CI::Queue::QueueEntryTest < Minitest::Test
- DELIMITER = CI::Queue::QueueEntry::DELIMITER
-
def test_parse_without_file_path
entry = "FooTest#test_bar"
parsed = CI::Queue::QueueEntry.parse(entry)
@@ -12,7 +10,7 @@ def test_parse_without_file_path
end
def test_parse_with_file_path
- entry = "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb"
+ entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
parsed = CI::Queue::QueueEntry.parse(entry)
assert_equal "FooTest#test_bar", parsed[:test_id]
assert_equal "/tmp/foo_test.rb", parsed[:file_path]
@@ -25,7 +23,9 @@ def test_format_without_file_path
def test_format_with_file_path
entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
- assert_equal "FooTest#test_bar#{DELIMITER}/tmp/foo_test.rb", entry
+ parsed = JSON.parse(entry, symbolize_names: true)
+ assert_equal "FooTest#test_bar", parsed[:test_id]
+ assert_equal "/tmp/foo_test.rb", parsed[:file_path]
end
def test_parse_with_pipe_in_test_name
@@ -36,6 +36,14 @@ def test_parse_with_pipe_in_test_name
assert_equal "/tmp/foo_test.rb", parsed[:file_path]
end
+ def test_parse_with_tab_in_test_name
+ test_id = "FooTest#test_xss_
"
+ entry = CI::Queue::QueueEntry.format(test_id, "/tmp/foo_test.rb")
+ parsed = CI::Queue::QueueEntry.parse(entry)
+ assert_equal test_id, parsed[:test_id]
+ assert_equal "/tmp/foo_test.rb", parsed[:file_path]
+ end
+
def test_round_trip_preserves_test_id
test_id = "FooTest#test_bar"
file_path = "/tmp/foo_test.rb"
@@ -45,6 +53,21 @@ def test_round_trip_preserves_test_id
assert_equal file_path, parsed[:file_path]
end
+ def test_test_id_without_file_path
+ assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id("FooTest#test_bar")
+ end
+
+ def test_test_id_with_file_path
+ entry = CI::Queue::QueueEntry.format("FooTest#test_bar", "/tmp/foo_test.rb")
+ assert_equal "FooTest#test_bar", CI::Queue::QueueEntry.test_id(entry)
+ end
+
+ def test_test_id_with_tab_in_test_name
+ test_id = "FooTest#test_xss_
"
+ entry = CI::Queue::QueueEntry.format(test_id, "/tmp/foo_test.rb")
+ assert_equal test_id, CI::Queue::QueueEntry.test_id(entry)
+ end
+
def test_encode_decode_load_error
error = StandardError.new("boom")
error.set_backtrace(["/tmp/test.rb:10"])
diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb
index 6e89798f..b4c2f151 100644
--- a/ruby/test/ci/queue/redis_test.rb
+++ b/ruby/test/ci/queue/redis_test.rb
@@ -4,7 +4,6 @@
class CI::Queue::RedisTest < Minitest::Test
include SharedQueueAssertions
- DELIMITER = CI::Queue::QueueEntry::DELIMITER
EntryTest = Struct.new(:id, :queue_entry)
def setup
@@ -243,8 +242,8 @@ def test_streaming_waits_for_batches
consumer.entry_resolver = ->(entry) { entry }
tests = [
- EntryTest.new('ATest#test_foo', "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"),
- EntryTest.new('ATest#test_bar', "ATest#test_bar#{DELIMITER}/tmp/a_test.rb"),
+ EntryTest.new('ATest#test_foo', CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')),
+ EntryTest.new('ATest#test_bar', CI::Queue::QueueEntry.format('ATest#test_bar', '/tmp/a_test.rb')),
]
streamed = Enumerator.new do |yielder|
@@ -283,7 +282,7 @@ def test_streaming_waits_for_batches
def test_reserve_lost_ignores_processed_entry_with_path
queue = worker(1, populate: false)
- entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
+ entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')
test_id = 'ATest#test_foo'
@redis.zadd(queue.send(:key, 'running'), 0, entry)
@@ -307,7 +306,7 @@ def test_streaming_timeout_raises_lost_master
def test_reserve_defers_own_requeued_test_once
queue = worker(1, populate: false, build_id: 'self-requeue-script')
queue.send(:register)
- entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
+ entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')
queue_key = queue.send(:key, 'queue')
requeued_by_key = queue.send(:key, 'requeued-by')
worker_queue_key = queue.send(:key, 'worker', queue.config.worker_id, 'queue')
@@ -328,7 +327,7 @@ def test_reserve_defers_own_requeued_test_once
def test_heartbeat_uses_test_id_for_processed_check
queue = worker(1, populate: false)
- entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
+ entry = CI::Queue::QueueEntry.format('ATest#test_foo', '/tmp/a_test.rb')
test_id = 'ATest#test_foo'
@redis.sadd(queue.send(:key, 'processed'), test_id)
@@ -342,7 +341,7 @@ def test_heartbeat_uses_test_id_for_processed_check
queue.send(:key, 'owners'),
queue.send(:key, 'worker', queue.config.worker_id, 'queue'),
],
- argv: [CI::Queue.time_now.to_f, entry, DELIMITER],
+ argv: [CI::Queue.time_now.to_f, entry],
)
assert_nil result
@@ -353,9 +352,10 @@ def test_resolve_entry_falls_back_to_resolver
queue.instance_variable_set(:@index, { 'ATest#test_foo' => :ok })
queue.entry_resolver = ->(entry) { "resolved:#{entry}" }
- resolved = queue.send(:resolve_entry, "MissingTest#test_bar#{DELIMITER}/tmp/missing.rb")
+ missing_entry = CI::Queue::QueueEntry.format('MissingTest#test_bar', '/tmp/missing.rb')
+ resolved = queue.send(:resolve_entry, missing_entry)
- assert_equal "resolved:MissingTest#test_bar#{DELIMITER}/tmp/missing.rb", resolved
+ assert_equal "resolved:#{missing_entry}", resolved
end
def test_continuously_timing_out_tests