From 4e295d4f747cc8ee34a6cceead913b1ba9300539 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Thu, 19 Jun 2025 16:28:17 +0200 Subject: [PATCH 01/10] added: * worker to check fiber timeouts * timeout_after implementation for fiber scheduler --- lib/rage/fiber_scheduler.rb | 78 +++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 5bec50c0..b53e5bd0 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -4,16 +4,29 @@ class Rage::FiberScheduler MAX_READ = 65536 + TIMEOUT_WORKER_INTERVAL = 100 # miliseconds def initialize @root_fiber = Fiber.current @dns_cache = {} + + @alive_fibers = {} + @timeout_mutex = Mutex.new + + start_timeout_worker end def io_wait(io, events, timeout = nil) f = Fiber.current ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } + timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout + @alive_fibers[f.__get_id] = { + fiber: f, + timeout_deadline: timeout_deadline, + exception_class: RageTimeout, + } + err = Fiber.defer(io.fileno) if err == false || (err && err < 0) err @@ -79,6 +92,30 @@ def kernel_sleep(duration = nil) # result # end + def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) + fiber = Fiber.current + timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration + + p "duration #{duration}" + p "fiber id #{fiber.__get_id}" + + @timeout_mutex.synchronize do + @alive_fibers[fiber.__get_id] = { + fiber: fiber, + timeout_deadline: timeout_deadline, + exception_class: exception_class, + exception_arguments: exception_arguments, + } + end + + begin + block.call + ensure + @timeout_mutex.synchronize do + @alive_fibers.delete(fiber.__get_id) + end + end + end def address_resolve(hostname) @dns_cache[hostname] ||= begin @@ -146,4 +183,45 @@ def fiber(&block) def close ::Iodine::Scheduler.close end + + private + + def start_timeout_worker + return unless ::Iodine.running? + + ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do + @timeout_mutex.synchronize do + check_timeouts + end + end + end + + def check_timeouts + p @alive_fibers.count + + @alive_fibers.delete_if do |_, fiber_hash| + current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + p current_time + p "deadline #{fiber_hash[:timeout_deadline]}" + p "fiber id #{fiber_hash[:fiber].__get_id}" + + return false if current_time < fiber_hash[:timeout_deadline] + + p 'after' + + fiber = fiber_hash[:fiber] + # unblock(nil, fiber) + + # if fiber.alive? + fiber.raise(RageTimeout) + # else + # fiber.kill + # end + + true + end + end end + +class RageTimeout < StandardError; end \ No newline at end of file From f7cf4e442d23bc987483cf24680a0d56d1a6a8ff Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Wed, 9 Jul 2025 23:20:48 +0200 Subject: [PATCH 02/10] correctly handle nested timeouts --- lib/rage/fiber_scheduler.rb | 82 +++++++++++-------------------------- 1 file changed, 24 insertions(+), 58 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index b53e5bd0..1babeefb 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -10,8 +10,7 @@ def initialize @root_fiber = Fiber.current @dns_cache = {} - @alive_fibers = {} - @timeout_mutex = Mutex.new + @alive_fibers = Hash.new { |h, k| h[k] = {} } start_timeout_worker end @@ -20,13 +19,6 @@ def io_wait(io, events, timeout = nil) f = Fiber.current ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } - timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout - @alive_fibers[f.__get_id] = { - fiber: f, - timeout_deadline: timeout_deadline, - exception_class: RageTimeout, - } - err = Fiber.defer(io.fileno) if err == false || (err && err < 0) err @@ -79,41 +71,21 @@ def kernel_sleep(duration = nil) Fiber.pause if duration.nil? || duration < 1 end - # TODO: GC works a little strange with this closure; - # - # def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) - # fiber, block_status = Fiber.current, :running - # ::Iodine.run_after((duration * 1000).to_i) do - # fiber.raise(exception_class, exception_arguments) if block_status == :running - # end - - # result = block.call - # block_status = :finished - - # result - # end def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) fiber = Fiber.current timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - p "duration #{duration}" - p "fiber id #{fiber.__get_id}" - - @timeout_mutex.synchronize do - @alive_fibers[fiber.__get_id] = { - fiber: fiber, - timeout_deadline: timeout_deadline, - exception_class: exception_class, - exception_arguments: exception_arguments, - } - end + @alive_fibers[fiber.__get_id][timeout_deadline] = { + fiber: fiber, + timeout_deadline: timeout_deadline, + exception_class: exception_class, + exception_arguments: exception_arguments + } begin block.call ensure - @timeout_mutex.synchronize do - @alive_fibers.delete(fiber.__get_id) - end + @alive_fibers[fiber.__get_id].delete(timeout_deadline) end end @@ -190,38 +162,32 @@ def start_timeout_worker return unless ::Iodine.running? ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do - @timeout_mutex.synchronize do - check_timeouts - end + check_timeouts end end def check_timeouts - p @alive_fibers.count - - @alive_fibers.delete_if do |_, fiber_hash| - current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @alive_fibers.delete_if do |fiber_id, timeouts| + timeouts.delete_if do |timeout_key, fiber_hash| + current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - p current_time - p "deadline #{fiber_hash[:timeout_deadline]}" - p "fiber id #{fiber_hash[:fiber].__get_id}" + return false if current_time < fiber_hash[:timeout_deadline] - return false if current_time < fiber_hash[:timeout_deadline] + fiber = fiber_hash[:fiber] + unblock(nil, fiber) - p 'after' - - fiber = fiber_hash[:fiber] - # unblock(nil, fiber) + if fiber.alive? + fiber.raise(RageTimeout) + else + timeouts.delete(timeout_key) + end - # if fiber.alive? - fiber.raise(RageTimeout) - # else - # fiber.kill - # end + true + end - true + timeouts.length == 0 end end end -class RageTimeout < StandardError; end \ No newline at end of file +class RageTimeout < StandardError; end From 4efffde0d30040275153df922419f53c97bb27e8 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Sun, 13 Jul 2025 21:51:22 +0200 Subject: [PATCH 03/10] - mutex to correctly access hash with fiber timeouts (concurrency happens between fiber itself and iodine worker) - kill fiber forcefully if it still alive after error was raised --- lib/rage/fiber_scheduler.rb | 51 ++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 1babeefb..8eb48157 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -11,6 +11,7 @@ def initialize @dns_cache = {} @alive_fibers = Hash.new { |h, k| h[k] = {} } + @fibers_mutex = Mutex.new start_timeout_worker end @@ -75,17 +76,21 @@ def timeout_after(duration, exception_class = Timeout::Error, *exception_argumen fiber = Fiber.current timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - @alive_fibers[fiber.__get_id][timeout_deadline] = { - fiber: fiber, - timeout_deadline: timeout_deadline, - exception_class: exception_class, - exception_arguments: exception_arguments - } + @fibers_mutex.synchronize do + @alive_fibers[fiber.__get_id][timeout_deadline] = { + fiber: fiber, + timeout_deadline: timeout_deadline, + exception_class: exception_class, + exception_arguments: exception_arguments + } + end begin block.call ensure - @alive_fibers[fiber.__get_id].delete(timeout_deadline) + @fibers_mutex.synchronize do + @alive_fibers[fiber.__get_id].delete(timeout_deadline) + end end end @@ -167,25 +172,31 @@ def start_timeout_worker end def check_timeouts - @alive_fibers.delete_if do |fiber_id, timeouts| - timeouts.delete_if do |timeout_key, fiber_hash| - current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @fibers_mutex.synchronize do + @alive_fibers.delete_if do |fiber_id, timeouts| + timeouts.delete_if do |timeout_key, fiber_hash| + current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + next false if current_time < fiber_hash[:timeout_deadline] - return false if current_time < fiber_hash[:timeout_deadline] + fiber = fiber_hash[:fiber] + unblock(nil, fiber) - fiber = fiber_hash[:fiber] - unblock(nil, fiber) + if fiber.alive? + fiber.raise(RageTimeout) - if fiber.alive? - fiber.raise(RageTimeout) - else - timeouts.delete(timeout_key) + ::Iodine.run_after(1000) do + fiber.kill if fiber.alive? + end + else + timeouts.delete(timeout_key) + end + + true end - true + timeouts.length == 0 end - - timeouts.length == 0 end end end From 05b2bc14673ca6751dfc3300acd90aab9afff68c Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Sun, 13 Jul 2025 22:38:53 +0200 Subject: [PATCH 04/10] refactorings --- lib/rage/fiber_scheduler.rb | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 8eb48157..478cd7d6 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -5,6 +5,7 @@ class Rage::FiberScheduler MAX_READ = 65536 TIMEOUT_WORKER_INTERVAL = 100 # miliseconds + FIBER_KILL_DELAY = 500 # miliseconds def initialize @root_fiber = Fiber.current @@ -173,32 +174,30 @@ def start_timeout_worker def check_timeouts @fibers_mutex.synchronize do - @alive_fibers.delete_if do |fiber_id, timeouts| - timeouts.delete_if do |timeout_key, fiber_hash| + @alive_fibers.delete_if do |fiber_id, fiber_timeouts| + fiber_timeouts.delete_if do |timeout_key, fiber_context| current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - next false if current_time < fiber_hash[:timeout_deadline] + next false if current_time < fiber_context[:timeout_deadline] - fiber = fiber_hash[:fiber] + fiber = fiber_context[:fiber] unblock(nil, fiber) if fiber.alive? - fiber.raise(RageTimeout) + fiber.raise(fiber_context[:exception_class], *fiber_context[:exception_arguments]) - ::Iodine.run_after(1000) do + ::Iodine.run_after(FIBER_KILL_DELAY) do fiber.kill if fiber.alive? end else - timeouts.delete(timeout_key) + fiber_timeouts.delete(timeout_key) end true end - timeouts.length == 0 + fiber_timeouts.length == 0 end end end end - -class RageTimeout < StandardError; end From 8449798abf8f90ad08cebe623433aa413db8f96f Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Mon, 14 Jul 2025 19:14:03 +0200 Subject: [PATCH 05/10] refactored: - variable names - removed mutex, as was added by mistake - `check_timeouts` now is thinner after unnecessary code deletion --- lib/rage/fiber_scheduler.rb | 54 ++++++++++--------------------------- 1 file changed, 14 insertions(+), 40 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 478cd7d6..44b41990 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -5,14 +5,12 @@ class Rage::FiberScheduler MAX_READ = 65536 TIMEOUT_WORKER_INTERVAL = 100 # miliseconds - FIBER_KILL_DELAY = 500 # miliseconds def initialize @root_fiber = Fiber.current @dns_cache = {} - @alive_fibers = Hash.new { |h, k| h[k] = {} } - @fibers_mutex = Mutex.new + @fiber_timeouts = Hash.new { |h, k| h[k] = {} } start_timeout_worker end @@ -75,23 +73,18 @@ def kernel_sleep(duration = nil) def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) fiber = Fiber.current - timeout_deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - - @fibers_mutex.synchronize do - @alive_fibers[fiber.__get_id][timeout_deadline] = { - fiber: fiber, - timeout_deadline: timeout_deadline, - exception_class: exception_class, - exception_arguments: exception_arguments - } - end + timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration + + @fiber_timeouts[fiber][timeout] = { + exception_class: exception_class, + exception_arguments: exception_arguments + } begin block.call ensure - @fibers_mutex.synchronize do - @alive_fibers[fiber.__get_id].delete(timeout_deadline) - end + @fiber_timeouts[fiber].delete(timeout) + @fiber_timeouts.delete(fiber) if @fiber_timeouts[fiber].empty? end end @@ -165,38 +158,19 @@ def close private def start_timeout_worker - return unless ::Iodine.running? - ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do check_timeouts end end def check_timeouts - @fibers_mutex.synchronize do - @alive_fibers.delete_if do |fiber_id, fiber_timeouts| - fiber_timeouts.delete_if do |timeout_key, fiber_context| - current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - - next false if current_time < fiber_context[:timeout_deadline] + @fiber_timeouts.each_pair do |fiber, timeouts| + timeouts.delete_if do |timeout, context| + next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout - fiber = fiber_context[:fiber] - unblock(nil, fiber) - - if fiber.alive? - fiber.raise(fiber_context[:exception_class], *fiber_context[:exception_arguments]) - - ::Iodine.run_after(FIBER_KILL_DELAY) do - fiber.kill if fiber.alive? - end - else - fiber_timeouts.delete(timeout_key) - end - - true - end + fiber.raise(context[:exception_class], *context[:exception_arguments]) if fiber.alive? - fiber_timeouts.length == 0 + true end end end From a1a486978af0d130e7c2d75d8c8c108a1c1ae7a6 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Mon, 14 Jul 2025 20:33:48 +0200 Subject: [PATCH 06/10] (fix) do not try to resume fiber if it's cannot be resumed --- lib/rage/fiber_scheduler.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 44b41990..d6618897 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -17,7 +17,7 @@ def initialize def io_wait(io, events, timeout = nil) f = Fiber.current - ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) } + ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) { |err| f.resume(err) if f.alive? } err = Fiber.defer(io.fileno) if err == false || (err && err < 0) From e778bcace1a1308ed446bce149d9e083f4cde4d3 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Tue, 15 Jul 2025 17:33:03 +0200 Subject: [PATCH 07/10] more refactoring: * simplified `check_timeouts` method * renamed variables --- lib/rage/fiber_scheduler.rb | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index d6618897..e1d0e7bf 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -72,10 +72,10 @@ def kernel_sleep(duration = nil) end def timeout_after(duration, exception_class = Timeout::Error, *exception_arguments, &block) - fiber = Fiber.current + f = Fiber.current timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration - @fiber_timeouts[fiber][timeout] = { + @fiber_timeouts[f][timeout] = { exception_class: exception_class, exception_arguments: exception_arguments } @@ -83,8 +83,8 @@ def timeout_after(duration, exception_class = Timeout::Error, *exception_argumen begin block.call ensure - @fiber_timeouts[fiber].delete(timeout) - @fiber_timeouts.delete(fiber) if @fiber_timeouts[fiber].empty? + @fiber_timeouts[f].delete(timeout) + @fiber_timeouts.delete(f) if @fiber_timeouts[f].empty? end end @@ -164,13 +164,11 @@ def start_timeout_worker end def check_timeouts - @fiber_timeouts.each_pair do |fiber, timeouts| - timeouts.delete_if do |timeout, context| + @fiber_timeouts.each do |fiber, timeouts| + timeouts.each do |timeout, context| next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout - fiber.raise(context[:exception_class], *context[:exception_arguments]) if fiber.alive? - - true + fiber.raise(context[:exception_class], *context[:exception_arguments]) end end end From 296aa2ef4aee658f9f64413fdc936c7364468aeb Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Tue, 14 Oct 2025 19:45:59 +0200 Subject: [PATCH 08/10] * unsubscribe from channel after raising exception for fiber * names for channels moved into a separate method --- lib/rage/fiber_scheduler.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index e1d0e7bf..0a4babf6 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -169,6 +169,9 @@ def check_timeouts next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout fiber.raise(context[:exception_class], *context[:exception_arguments]) + + Iodine.unsubscribe(fiber.__block_channel) + Iodine.unsubscribe(fiber.__await_channel) end end end From bb355114d5098aa62675d396b9bb46a92ddc8712 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Wed, 15 Oct 2025 13:34:31 +0200 Subject: [PATCH 09/10] fix for "RuntimeError (can't add a new key into hash during iteration)" --- lib/rage/fiber_scheduler.rb | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 0a4babf6..bfd057cb 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -164,15 +164,23 @@ def start_timeout_worker end def check_timeouts + fibers_to_raise = [] + @fiber_timeouts.each do |fiber, timeouts| timeouts.each do |timeout, context| next false if Process.clock_gettime(Process::CLOCK_MONOTONIC) < timeout - fiber.raise(context[:exception_class], *context[:exception_arguments]) + fibers_to_raise << -> do + fiber.raise(context[:exception_class], *context[:exception_arguments]) - Iodine.unsubscribe(fiber.__block_channel) - Iodine.unsubscribe(fiber.__await_channel) + Iodine.unsubscribe(fiber.__block_channel) + Iodine.unsubscribe(fiber.__await_channel) + end end end + + fibers_to_raise.each(&:call) + + fibers_to_raise.clear end end From d4cc3bad7c3de4c743306791d0b21212df706700 Mon Sep 17 00:00:00 2001 From: Serhii Sadovskyi Date: Fri, 5 Dec 2025 16:31:27 +0100 Subject: [PATCH 10/10] code's refactored in favor of `run_after` method instead of `run_every` --- lib/rage/fiber_scheduler.rb | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index bfd057cb..ce26c41a 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -4,15 +4,12 @@ class Rage::FiberScheduler MAX_READ = 65536 - TIMEOUT_WORKER_INTERVAL = 100 # miliseconds def initialize @root_fiber = Fiber.current @dns_cache = {} @fiber_timeouts = Hash.new { |h, k| h[k] = {} } - - start_timeout_worker end def io_wait(io, events, timeout = nil) @@ -80,6 +77,8 @@ def timeout_after(duration, exception_class = Timeout::Error, *exception_argumen exception_arguments: exception_arguments } + schedule_timeout_check + begin block.call ensure @@ -157,9 +156,25 @@ def close private - def start_timeout_worker - ::Iodine.run_every(Rage::FiberScheduler::TIMEOUT_WORKER_INTERVAL) do + def schedule_timeout_check + return if @fiber_timeouts.empty? + + closest_timeout = nil + @fiber_timeouts.each_value do |timeouts| + timeouts.each_key do |timeout| + closest_timeout = timeout if closest_timeout.nil? || timeout < closest_timeout + end + end + + return unless closest_timeout + + now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + delay_ms = ((closest_timeout - now) * 1000).ceil + delay_ms = 0 if delay_ms < 0 + + ::Iodine.run_after(delay_ms) do check_timeouts + schedule_timeout_check end end