From f45b50811f1e9230ae9a3031d59d746f50bcc971 Mon Sep 17 00:00:00 2001 From: Patrik Ragnarsson Date: Fri, 12 Jun 2026 00:39:55 +0000 Subject: [PATCH] Add only: filter to restrict interception to matching fibers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In a process with background fibers (housekeeping loops, tickers), intercepting every sleep is not practical: any fiber that re-arms a periodic sleep while time is controlled freezes and leaks a pending timer, failing the control block with `PendingTimersError`. `TimeControl.control(only: /^worker/)` restricts interception to fibers whose name matches; sleeps and timeouts of all other fibers stay on the real event loop. Note that real timer deadlines are still measured against the virtual clock while it is controlled, so a non-matching fiber's timer registered inside the block fires late by up to the advanced amount — harmless for periodic housekeeping, but not full isolation. `PendingTimersError` now also names the fibers whose timers were left pending, which makes the leak source obvious in a process with many fibers. --- AGENTS.md | 2 +- README.md | 27 ++++++++++++- spec/time_control_spec.cr | 40 +++++++++++++++++++ src/time_control.cr | 30 ++++++++++---- src/time_control/context.cr | 17 ++++++-- .../core_ext/crystal/event_loop.cr | 8 ++-- .../core_ext/crystal/event_loop/polling.cr | 2 +- src/time_control/core_ext/fiber.cr | 12 ++++-- src/time_control/errors.cr | 11 +++-- 9 files changed, 125 insertions(+), 24 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index dcf9bb5..24bf051 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -35,7 +35,7 @@ When enabled: ## Public API -- `TimeControl.control` — the main entry point +- `TimeControl.control` — the main entry point; accepts an optional `only:` regex restricting interception to fibers whose name matches (all other fibers keep real timers) - `Controller#advance(duration)` — advances virtual time by a fixed amount - `Controller#advance` — advances virtual time to the next pending timer - `TimeControl::Error`, `TimeControl::PendingTimersError` — error classes diff --git a/README.md b/README.md index 66dfa82..ccbef4b 100644 --- a/README.md +++ b/README.md @@ -206,7 +206,32 @@ end If the `control` block exits while virtual timers are still pending (i.e. fibers are sleeping beyond the last `advance`), a `TimeControl::PendingTimersError` -is raised. This catches specs that forget to advance past all scheduled work. +is raised, naming the fibers whose timers were left pending. This catches +specs that forget to advance past all scheduled work. + +### Controlling only specific fibers + +In a process with background fibers (housekeeping loops, tickers, stats +collectors), intercepting every sleep is not practical: any background fiber +that re-arms its periodic sleep while time is controlled freezes and leaks a +pending timer. Pass `only:` with a regex to restrict interception to fibers +whose name matches — all other fibers keep their timers on the real event +loop. + +```crystal +it "reconnects after the backoff" do + TimeControl.control(only: /^Reconnect worker/) do |controller| + spawn(name: "Reconnect worker") { sleep 30.seconds; reconnect } + controller.advance(30.seconds) + end +end +``` + +Note that while time is controlled the real event loop also measures its +timer deadlines against the virtual clock, so a non-matching fiber's timer +registered inside the block fires once virtual time advances past it or +after the block exits — late by up to the advanced amount. For periodic +housekeeping work this is harmless, but it is not full isolation. ## How it works diff --git a/spec/time_control_spec.cr b/spec/time_control_spec.cr index 88692c3..83a5266 100644 --- a/spec/time_control_spec.cr +++ b/spec/time_control_spec.cr @@ -215,6 +215,46 @@ describe TimeControl do ex.count.should eq(1) end + it "includes the names of fibers with pending timers in the error" do + ex = expect_raises(TimeControl::PendingTimersError, /housekeeping/) do + TimeControl.control do |_controller| + spawn(name: "housekeeping") { sleep 1.second } + Fiber.yield + end + end + ex.fiber_names.should eq(["housekeeping"]) + end + + describe "only filter" do + it "advances timers of fibers whose name matches" do + done = Channel(Nil).new + TimeControl.control(only: /^worker/) do |controller| + spawn(name: "worker 1") do + sleep 5.minutes + done.send(nil) + end + controller.advance(5.minutes) + done.receive + end + end + + it "leaves timers of non-matching fibers on the real event loop" do + TimeControl.control(only: /^worker/) do |_controller| + spawn(name: "housekeeping") { sleep 1.second } + Fiber.yield + # Exits cleanly: the housekeeping timer is real, not a leaked + # virtual timer. + end + end + + it "does not control unnamed fibers" do + TimeControl.control(only: /^worker/) do |_controller| + spawn { sleep 1.second } + Fiber.yield + end + end + end + describe "IO timeouts" do it "fires read_timeout when virtual time advances past it" do r, w = IO.pipe diff --git a/src/time_control.cr b/src/time_control.cr index 78540cf..f98512d 100644 --- a/src/time_control.cr +++ b/src/time_control.cr @@ -37,6 +37,15 @@ module TimeControl # An optional *start_time* sets the initial value of `Time.utc` inside the # block. Without it, virtual UTC starts at the real wall-clock time. # + # An optional *only* filter restricts which fibers get virtual timers: only + # fibers whose name matches the regex are controlled; sleeps and timeouts + # of all other fibers stay on the real event loop. Use this when the + # process runs background fibers (housekeeping loops, tickers) that should + # keep running normally — without a filter their re-armed timers freeze and + # leak, raising `PendingTimersError` at block exit. Note that real timers + # are still measured against the virtual clock while it is controlled, so + # an unfiltered fiber's timer can fire late by up to the advanced amount. + # # ``` # TimeControl.control do |controller| # spawn { sleep 5.minutes; puts "done" } @@ -50,17 +59,22 @@ module TimeControl # TimeControl.control("2030-01-01T09:00:00Z") do |controller| # Time.utc.hour # => 9 # end + # + # TimeControl.control(only: /^worker/) do |controller| + # spawn(name: "worker 1") { sleep 5.minutes; puts "done" } + # controller.advance(5.minutes) + # end # ``` - def self.control(& : Controller ->) : Nil - control(Context.new) { |controller| yield controller } + def self.control(only : Regex? = nil, & : Controller ->) : Nil + control(Context.new(only: only)) { |controller| yield controller } end - def self.control(start_time : Time, & : Controller ->) : Nil - control(Context.new(start_time)) { |controller| yield controller } + def self.control(start_time : Time, only : Regex? = nil, & : Controller ->) : Nil + control(Context.new(start_time, only: only)) { |controller| yield controller } end - def self.control(start_time : String, & : Controller ->) : Nil - control(parse_start_time(start_time)) { |controller| yield controller } + def self.control(start_time : String, only : Regex? = nil, & : Controller ->) : Nil + control(parse_start_time(start_time), only: only) { |controller| yield controller } end private def self.parse_start_time(str : String) : Time @@ -101,8 +115,8 @@ module TimeControl @@context = nil ctx.try &.stop isolated.try &.wait - if ctx && ctx.leaked_timer_count > 0 - raise PendingTimersError.new(ctx.leaked_timer_count) + if ctx && !ctx.leaked_timer_names.empty? + raise PendingTimersError.new(ctx.leaked_timer_names) end end end diff --git a/src/time_control/context.cr b/src/time_control/context.cr index f65da73..1722008 100644 --- a/src/time_control/context.cr +++ b/src/time_control/context.cr @@ -14,7 +14,7 @@ module TimeControl getter virtual_now : Time::Instant property timer_loop_thread : Thread? - getter leaked_timer_count : Int32 = 0 + getter leaked_timer_names = [] of String @advance_ch : Channel(Time::Span) @done_ch : Channel(Nil) @@ -28,7 +28,7 @@ module TimeControl @timers : Array(TimerEntry) @timers_mutex : Mutex - def initialize(start_time : Time? = nil) + def initialize(start_time : Time? = nil, @only : Regex? = nil) @advance_ch = Channel(Time::Span).new @done_ch = Channel(Nil).new @timer_inserted_ch = Channel(Nil).new(1) @@ -61,6 +61,17 @@ module TimeControl {@control_start_utc_s + total_ns // 1_000_000_000_i64, (total_ns % 1_000_000_000_i64).to_i32} end + # Whether timers for *fiber* should be virtualized. True for every fiber + # unless an `only` filter is set, in which case only fibers whose name + # matches the filter are controlled — the rest keep real timers. + def controls?(fiber : Fiber) : Bool + only = @only + return true unless only + name = fiber.name + return false unless name + name.matches?(only) + end + def add_sleep(fiber : Fiber, duration : Time::Span) : Nil notify = @timers_mutex.synchronize do insert_timer(TimerEntry.new(fiber, @virtual_now + duration, TimerKind::Sleep)) @@ -139,7 +150,7 @@ module TimeControl entry = @timers_mutex.synchronize { @timers.shift? } break unless entry next if entry.kind.io_timeout_wakeup? # not a stuck fiber; just an interrupt trigger for the event loop - @leaked_timer_count += 1 + @leaked_timer_names << (entry.fiber.name || "unnamed fiber") enqueue_entry(entry) end end diff --git a/src/time_control/core_ext/crystal/event_loop.cr b/src/time_control/core_ext/crystal/event_loop.cr index b02e299..10a8673 100644 --- a/src/time_control/core_ext/crystal/event_loop.cr +++ b/src/time_control/core_ext/crystal/event_loop.cr @@ -5,9 +5,11 @@ def sleep(duration : ::Time::Span) : Nil if duration.total_nanoseconds > 0 TimeControl.when_controlling do |ctx| - ctx.add_sleep(Fiber.current, duration) - Fiber.suspend - return + if ctx.controls?(Fiber.current) + ctx.add_sleep(Fiber.current, duration) + Fiber.suspend + return + end end end previous_def diff --git a/src/time_control/core_ext/crystal/event_loop/polling.cr b/src/time_control/core_ext/crystal/event_loop/polling.cr index bd238d3..ad9b661 100644 --- a/src/time_control/core_ext/crystal/event_loop/polling.cr +++ b/src/time_control/core_ext/crystal/event_loop/polling.cr @@ -5,7 +5,7 @@ if wake_at = event.value.wake_at? if event.value.type.io_read? || event.value.type.io_write? TimeControl.when_controlling do |ctx| - ctx.add_io_timeout(wake_at) + ctx.add_io_timeout(wake_at) if ctx.controls?(Fiber.current) end end end diff --git a/src/time_control/core_ext/fiber.cr b/src/time_control/core_ext/fiber.cr index ef9223c..692d9cf 100644 --- a/src/time_control/core_ext/fiber.cr +++ b/src/time_control/core_ext/fiber.cr @@ -3,8 +3,10 @@ class Fiber def timeout(timeout : Time::Span, select_action : Channel::TimeoutAction) : Nil @timeout_select_action = select_action TimeControl.when_controlling do |ctx| - ctx.add_select_timeout(self, timeout) - return + if ctx.controls?(self) + ctx.add_select_timeout(self, timeout) + return + end end timeout_event.add(timeout) end @@ -14,8 +16,10 @@ class Fiber return unless @timeout_select_action @timeout_select_action = nil TimeControl.when_controlling do |ctx| - ctx.cancel_select_timeout(self) - return + if ctx.controls?(self) + ctx.cancel_select_timeout(self) + return + end end @timeout_event.try &.delete end diff --git a/src/time_control/errors.cr b/src/time_control/errors.cr index b41bd7a..596ef20 100644 --- a/src/time_control/errors.cr +++ b/src/time_control/errors.cr @@ -7,13 +7,18 @@ module TimeControl # still pending, indicating that not all scheduled sleeps or timeouts # were advanced past. # - # The number of pending timers is available via `#count`. + # The number of pending timers is available via `#count` and the names of + # the fibers that owned them via `#fiber_names`. class PendingTimersError < Error # Returns the number of timers that were still pending. getter count : Int32 - def initialize(@count : Int32) - super("#{@count} timer(s) were still pending when the control block exited") + # Returns the names of the fibers whose timers were still pending. + getter fiber_names : Array(String) + + def initialize(@fiber_names : Array(String)) + @count = @fiber_names.size + super("#{@count} timer(s) were still pending when the control block exited: #{@fiber_names.join(", ")}") end end end