From 67b140df177ee729dc9d8f73495608a314dcd626 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 12 Jun 2026 12:54:46 +0200 Subject: [PATCH 1/2] Re-attach pending timers to the real event loop on control exit Instead of raising PendingTimersError when the control block exits with virtual timers still pending, track each pending timer together with the virtual time remaining until it would have fired (wake_at - virtual_now) and re-attach it to the real event loop with that remaining duration. Parked sleeping fibers and select timeouts then wake up later in real time; control returns immediately. Removes PendingTimersError (the abstract Error base remains). Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 4 ++-- README.md | 7 +++++-- spec/time_control_spec.cr | 39 ++++++++++++++++++++++++++++------ src/time_control.cr | 4 +--- src/time_control/context.cr | 42 ++++++++++++++++++++++++++++++------- src/time_control/errors.cr | 14 ------------- 6 files changed, 76 insertions(+), 34 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index dcf9bb5..b7700c6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -31,13 +31,13 @@ When enabled: - `Crystal::System::Time.clock_gettime` is monkey-patched to return virtual monotonic time; `Crystal::System::Time.compute_utc_seconds_and_nanoseconds` is patched to return virtual UTC time. - A `Fiber::ExecutionContext::Isolated` runs a dedicated timer thread. When `advance(N)` is called, the timer thread processes all virtual timers with `wake_at <= virtual_now + N` in order, enqueuing sleeping fibers back into their original execution contexts. - After each batch of woken fibers, the timer thread waits 1ms (real sleep — the timer loop thread is tracked on `Context` and excluded from interception via `TimeControl.when_controlling`) to allow chained sleeps to register before rechecking. -- If the control block exits with timers still pending, `PendingTimersError` is raised. +- If the control block exits with timers still pending, each is re-attached to the real event loop with the virtual time that remained until it would have fired (`wake_at - virtual_now`), so the parked fibers wake up later in real time. `control` returns immediately. ## Public API - `TimeControl.control` — the main entry point - `Controller#advance(duration)` — advances virtual time by a fixed amount - `Controller#advance` — advances virtual time to the next pending timer -- `TimeControl::Error`, `TimeControl::PendingTimersError` — error classes +- `TimeControl::Error` — base error class Everything else is marked `# :nodoc:` or `private`. Do not add doc comments to internal methods, patch methods, or instance variables. diff --git a/README.md b/README.md index 66dfa82..2aea13f 100644 --- a/README.md +++ b/README.md @@ -205,8 +205,11 @@ end ### Pending timers If the `control` block exits while virtual timers are still pending (i.e. -fibers are sleeping beyond the last `advance`), a `TimeControl::PendingTimersError` -is raised. This catches specs that forget to advance past all scheduled work. +fibers are sleeping or waiting on a `select` timeout beyond the last `advance`), +each pending timer is re-attached to the real event loop with the virtual time +that remained until it would have fired (`wake_at - virtual_now`). The parked +fibers then wake up later in real time rather than being abandoned. `control` +returns immediately and does not wait for them. ## How it works diff --git a/spec/time_control_spec.cr b/spec/time_control_spec.cr index 88692c3..7fed20c 100644 --- a/spec/time_control_spec.cr +++ b/spec/time_control_spec.cr @@ -205,14 +205,41 @@ describe TimeControl do end end - it "raises if timers are still pending when the control block exits" do - ex = expect_raises(TimeControl::PendingTimersError, /1 timer\(s\) were still pending/) do - TimeControl.control do |_controller| - spawn { sleep 1.second } - Fiber.yield + it "re-attaches a pending sleep to the real event loop when the control block exits" do + woke = Channel(Nil).new + + TimeControl.control do |_controller| + spawn { sleep 10.milliseconds; woke.send(nil) } + Fiber.yield + end + + select + when woke.receive + when timeout(2.seconds) + fail "pending sleep was not re-attached to the real event loop" + end + end + + it "re-attaches a pending select timeout to the real event loop when the control block exits" do + fired = Channel(Symbol).new + + TimeControl.control do |_controller| + spawn do + select + when fired.receive + when timeout(10.milliseconds) + fired.send(:timed_out) + end end + Fiber.yield + end + + select + when result = fired.receive + result.should eq(:timed_out) + when timeout(2.seconds) + fail "pending select timeout was not re-attached to the real event loop" end - ex.count.should eq(1) end describe "IO timeouts" do diff --git a/src/time_control.cr b/src/time_control.cr index 78540cf..01ab408 100644 --- a/src/time_control.cr +++ b/src/time_control.cr @@ -101,8 +101,6 @@ module TimeControl @@context = nil ctx.try &.stop isolated.try &.wait - if ctx && ctx.leaked_timer_count > 0 - raise PendingTimersError.new(ctx.leaked_timer_count) - end + ctx.try &.reschedule_pending_timers end end diff --git a/src/time_control/context.cr b/src/time_control/context.cr index f65da73..6a05469 100644 --- a/src/time_control/context.cr +++ b/src/time_control/context.cr @@ -12,9 +12,14 @@ module TimeControl private record TimerEntry, fiber : Fiber, wake_at : Time::Instant, kind : TimerKind + # A timer that was still pending when control stopped, paired with the + # virtual time remaining until it would have fired. Used to re-attach the + # parked fiber to the real event loop with that remaining duration. + private record PendingTimer, fiber : Fiber, time_left : Time::Span, kind : TimerKind + getter virtual_now : Time::Instant property timer_loop_thread : Thread? - getter leaked_timer_count : Int32 = 0 + getter pending_timers = [] of PendingTimer @advance_ch : Channel(Time::Span) @done_ch : Channel(Nil) @@ -139,8 +144,27 @@ module TimeControl entry = @timers_mutex.synchronize { @timers.shift? } break unless entry next if entry.kind.io_timeout_wakeup? # not a stuck fiber; just an interrupt trigger for the event loop - @leaked_timer_count += 1 - enqueue_entry(entry) + @pending_timers << PendingTimer.new(entry.fiber, entry.wake_at - @virtual_now, entry.kind) + end + end + + # Re-attaches any timers that were still pending when control stopped to the + # real event loop, each with the virtual time that remained until it would + # have fired. Must be called after control has stopped (i.e. while time is no + # longer being intercepted) so that `sleep` uses the real event loop. + def reschedule_pending_timers : Nil + @pending_timers.each do |pending| + fiber = pending.fiber + kind = pending.kind + time_left = pending.time_left + spawn do + sleep time_left if time_left > Time::Span.zero + case kind + in .sleep? then fiber.enqueue + in .select_timeout? then fire_select_timeout(fiber) + in .io_timeout_wakeup? # never collected into @pending_timers + end + end end end @@ -164,10 +188,7 @@ module TimeControl in .sleep? entry.fiber.enqueue in .select_timeout? - if select_action = entry.fiber.timeout_select_action - entry.fiber.timeout_select_action = nil - entry.fiber.enqueue if select_action.time_expired? - end + fire_select_timeout(entry.fiber) in .io_timeout_wakeup? # Wake the blocking kqueue/epoll wait in the fiber's event loop. This # causes it to call process_timers, which checks deadlines against the @@ -177,6 +198,13 @@ module TimeControl end end + private def fire_select_timeout(fiber : Fiber) : Nil + if select_action = fiber.timeout_select_action + fiber.timeout_select_action = nil + fiber.enqueue if select_action.time_expired? + end + end + # Inserts the entry into the sorted @timers array and returns true if the # run loop should be notified (i.e. an advance is active and this timer # falls within its target window). diff --git a/src/time_control/errors.cr b/src/time_control/errors.cr index b41bd7a..434f13e 100644 --- a/src/time_control/errors.cr +++ b/src/time_control/errors.cr @@ -2,18 +2,4 @@ module TimeControl # Base class for all `TimeControl` errors. abstract class Error < ::Exception end - - # Raised when the `TimeControl.control` block exits with virtual timers - # still pending, indicating that not all scheduled sleeps or timeouts - # were advanced past. - # - # The number of pending timers is available via `#count`. - class PendingTimersError < Error - # Returns the number of timers that were still pending. - getter count : Int32 - - def initialize(@count : Int32) - super("#{@count} timer(s) were still pending when the control block exited") - end - end end From d9c8c8520cbdf9839368a9479aee0f8d98d165ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 12 Jun 2026 13:09:32 +0200 Subject: [PATCH 2/2] Adopt pre-existing real-loop timers into virtual time on control start Fibers already sleeping or waiting on a select timeout on the real event loop when control begins are now adopted into virtual time, the mirror image of the exit-side re-attach. At control start every execution context's event loop is scanned (Fiber::ExecutionContext.each); sleep and select-timeout events are removed from the real Crystal::EventLoop::Polling timer heap and re-registered on the virtual clock, so they wake when virtual time is advanced past their deadline instead of in real time. Adopted sleeps carry an on_wake proc that marks the real (stack-allocated) sleep event timed out before enqueuing, avoiding the event loop's "manually resumed before the timer expired" guard; this proc is honored on both the advance path and the exit-side re-attach path. Polling builds only (kqueue/epoll). IO-operation timeouts are left on the real loop since they are tied to a live IO wait. Co-Authored-By: Claude Opus 4.8 (1M context) --- AGENTS.md | 1 + README.md | 12 ++++ spec/time_control_mt_spec.cr | 24 ++++++++ spec/time_control_spec.cr | 58 +++++++++++++++++++ src/time_control.cr | 5 ++ src/time_control/context.cr | 51 +++++++++++++--- .../core_ext/crystal/event_loop/polling.cr | 47 +++++++++++++++ .../core_ext/crystal/event_loop/timers.cr | 28 +++++++++ 8 files changed, 219 insertions(+), 7 deletions(-) create mode 100644 src/time_control/core_ext/crystal/event_loop/timers.cr diff --git a/AGENTS.md b/AGENTS.md index b7700c6..384d42d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -31,6 +31,7 @@ When enabled: - `Crystal::System::Time.clock_gettime` is monkey-patched to return virtual monotonic time; `Crystal::System::Time.compute_utc_seconds_and_nanoseconds` is patched to return virtual UTC time. - A `Fiber::ExecutionContext::Isolated` runs a dedicated timer thread. When `advance(N)` is called, the timer thread processes all virtual timers with `wake_at <= virtual_now + N` in order, enqueuing sleeping fibers back into their original execution contexts. - After each batch of woken fibers, the timer thread waits 1ms (real sleep — the timer loop thread is tracked on `Context` and excluded from interception via `TimeControl.when_controlling`) to allow chained sleeps to register before rechecking. +- At control start, fibers already sleeping or waiting on a select timeout on the real event loop are adopted into virtual time: every execution context's event loop is scanned (`Fiber::ExecutionContext.each`) and its sleep/select-timeout events are removed from the real `Crystal::EventLoop::Polling` timer heap and re-registered on the virtual clock. Polling builds only; IO-operation timeouts are left in place. - If the control block exits with timers still pending, each is re-attached to the real event loop with the virtual time that remained until it would have fired (`wake_at - virtual_now`), so the parked fibers wake up later in real time. `control` returns immediately. ## Public API diff --git a/README.md b/README.md index 2aea13f..08a90e2 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,18 @@ it "data arrives before the deadline, no timeout" do end ``` +### Adopting pre-existing timers + +Fibers that were already sleeping or waiting on a `select` timeout when the +`control` block begins are adopted into virtual time: at control start every +execution context's event loop is scanned, and its sleep and select-timeout +timers are moved onto the virtual clock so they wake when virtual time is +advanced past their deadline, instead of waking in real wall-clock time. + +This is only done on builds using the Polling event loop (kqueue/epoll). Only +sleeps and select timeouts are adopted; pending IO-operation timeouts stay on +the real event loop, since they are tied to a live IO wait. + ### Pending timers If the `control` block exits while virtual timers are still pending (i.e. diff --git a/spec/time_control_mt_spec.cr b/spec/time_control_mt_spec.cr index 9a48e5c..807d8db 100644 --- a/spec/time_control_mt_spec.cr +++ b/spec/time_control_mt_spec.cr @@ -1,6 +1,30 @@ require "./spec_helper" describe "TimeControl multi-threaded" do + it "adopts a fiber sleeping in a separate isolated context before control starts" do + ready = Channel(Nil).new + woke = Channel(Nil).new + + sleeper = Fiber::ExecutionContext::Isolated.new("pre-sleeper") do + ready.send(nil) + sleep 10.seconds + woke.send(nil) + end + + ready.receive + sleep 5.milliseconds # let the isolated fiber register its real sleep timer + + TimeControl.control do |controller| + controller.advance(10.seconds) + select + when woke.receive + when timeout(2.seconds) + fail "adopted sleep in isolated context did not wake when virtual time advanced" + end + end + + sleeper.wait + end it "fires read timeouts across isolated contexts in virtual time order" do order = Channel(Int32).new(3) pipes = Array.new(3) { IO.pipe } diff --git a/spec/time_control_spec.cr b/spec/time_control_spec.cr index 7fed20c..95ae7f3 100644 --- a/spec/time_control_spec.cr +++ b/spec/time_control_spec.cr @@ -242,6 +242,64 @@ describe TimeControl do end end + describe "adopting pre-existing real-loop timers" do + it "adopts a fiber already sleeping on the real event loop" do + ready = Channel(Nil).new + woke = Channel(Time::Instant).new + + spawn do + ready.send(nil) + sleep 10.seconds + woke.send(Time.instant) + end + + ready.receive + sleep 5.milliseconds # let the fiber register its real sleep timer and park + + TimeControl.control do |controller| + t0 = Time.instant + controller.advance(10.seconds) + select + when woke_at = woke.receive + # The fiber had already been asleep for a few real ms before control + # started, so slightly under 10s of virtual time remains. + (woke_at - t0).should be_close(10.seconds, 1.second) + when timeout(2.seconds) + fail "adopted sleep did not wake when virtual time advanced" + end + end + end + + it "adopts a fiber already waiting on a select timeout on the real event loop" do + ready = Channel(Nil).new + result = Channel(Symbol).new + trigger = Channel(Nil).new + + spawn do + ready.send(nil) + select + when trigger.receive + result.send(:received) + when timeout(10.seconds) + result.send(:timed_out) + end + end + + ready.receive + sleep 5.milliseconds # let the fiber register its real select timeout and park + + TimeControl.control do |controller| + controller.advance(10.seconds) + select + when r = result.receive + r.should eq(:timed_out) + when timeout(2.seconds) + fail "adopted select timeout did not fire when virtual time advanced" + end + end + end + end + describe "IO timeouts" do it "fires read_timeout when virtual time advances past it" do r, w = IO.pipe diff --git a/src/time_control.cr b/src/time_control.cr index 01ab408..b15b73f 100644 --- a/src/time_control.cr +++ b/src/time_control.cr @@ -8,6 +8,7 @@ require "./time_control/context" require "./time_control/controller" require "./time_control/core_ext/crystal/system/time" require "./time_control/core_ext/crystal/event_loop" +require "./time_control/core_ext/crystal/event_loop/timers" require "./time_control/core_ext/crystal/event_loop/polling" require "./time_control/core_ext/fiber" @@ -91,6 +92,10 @@ module TimeControl private def self.control(ctx : Context, & : Controller ->) : Nil @@context = ctx + {% if Crystal::EventLoop.all_subclasses.any? { |subclass| subclass.name == "Crystal::EventLoop::Polling" } %} + adopt_pending_timers(ctx) + {% end %} + isolated = Fiber::ExecutionContext::Isolated.new("time-control") do ctx.timer_loop_thread = Thread.current ctx.run diff --git a/src/time_control/context.cr b/src/time_control/context.cr index 6a05469..17a2c9a 100644 --- a/src/time_control/context.cr +++ b/src/time_control/context.cr @@ -10,12 +10,15 @@ module TimeControl IoTimeoutWakeup end - private record TimerEntry, fiber : Fiber, wake_at : Time::Instant, kind : TimerKind + # *on_wake*, when set, fully owns waking the fiber (used by adopted timers + # whose fiber is suspended in the real event loop's `sleep` and must have + # its stack-allocated event marked timed out before being enqueued). + private record TimerEntry, fiber : Fiber, wake_at : Time::Instant, kind : TimerKind, on_wake : Proc(Nil)? = nil # A timer that was still pending when control stopped, paired with the # virtual time remaining until it would have fired. Used to re-attach the # parked fiber to the real event loop with that remaining duration. - private record PendingTimer, fiber : Fiber, time_left : Time::Span, kind : TimerKind + private record PendingTimer, fiber : Fiber, time_left : Time::Span, kind : TimerKind, on_wake : Proc(Nil)? = nil getter virtual_now : Time::Instant property timer_loop_thread : Thread? @@ -87,6 +90,30 @@ module TimeControl notify_run_loop if notify end + # Adopts a fiber that was already sleeping on the real event loop when + # control started. *wake_at* is the event's real monotonic deadline, which + # equals the virtual deadline because both clocks share their origin at + # control start. *on_wake* marks the real (stack-allocated) sleep event + # timed out before enqueuing the fiber. + def adopt_sleep(fiber : Fiber, wake_at : Time::Instant, on_wake : Proc(Nil)) : Nil + notify = @timers_mutex.synchronize do + at = wake_at < @virtual_now ? @virtual_now : wake_at + insert_timer(TimerEntry.new(fiber, at, TimerKind::Sleep, on_wake)) + end + notify_run_loop if notify + end + + # Adopts a fiber that was already waiting on a `select` timeout on the real + # event loop when control started. Reuses the native select-timeout wake and + # cancel paths. + def adopt_select_timeout(fiber : Fiber, wake_at : Time::Instant) : Nil + notify = @timers_mutex.synchronize do + at = wake_at < @virtual_now ? @virtual_now : wake_at + insert_timer(TimerEntry.new(fiber, at, TimerKind::SelectTimeout)) + end + notify_run_loop if notify + end + def cancel_select_timeout(fiber : Fiber) : Nil @timers_mutex.synchronize do @timers.reject! { |e| e.fiber.same?(fiber) && e.kind.select_timeout? } @@ -144,7 +171,7 @@ module TimeControl entry = @timers_mutex.synchronize { @timers.shift? } break unless entry next if entry.kind.io_timeout_wakeup? # not a stuck fiber; just an interrupt trigger for the event loop - @pending_timers << PendingTimer.new(entry.fiber, entry.wake_at - @virtual_now, entry.kind) + @pending_timers << PendingTimer.new(entry.fiber, entry.wake_at - @virtual_now, entry.kind, entry.on_wake) end end @@ -157,12 +184,17 @@ module TimeControl fiber = pending.fiber kind = pending.kind time_left = pending.time_left + on_wake = pending.on_wake spawn do sleep time_left if time_left > Time::Span.zero - case kind - in .sleep? then fiber.enqueue - in .select_timeout? then fire_select_timeout(fiber) - in .io_timeout_wakeup? # never collected into @pending_timers + if on_wake + on_wake.call + else + case kind + in .sleep? then fiber.enqueue + in .select_timeout? then fire_select_timeout(fiber) + in .io_timeout_wakeup? # never collected into @pending_timers + end end end end @@ -184,6 +216,11 @@ module TimeControl end private def enqueue_entry(entry : TimerEntry) : Nil + if on_wake = entry.on_wake + on_wake.call + return + end + case entry.kind in .sleep? entry.fiber.enqueue diff --git a/src/time_control/core_ext/crystal/event_loop/polling.cr b/src/time_control/core_ext/crystal/event_loop/polling.cr index bd238d3..3fb4287 100644 --- a/src/time_control/core_ext/crystal/event_loop/polling.cr +++ b/src/time_control/core_ext/crystal/event_loop/polling.cr @@ -11,5 +11,52 @@ end previous_def end + + # :nodoc: + # + # Removes every sleep and select-timeout timer currently registered on this + # loop and returns them so they can be re-registered on the virtual clock. + # IO-operation timeouts are left in place (they're tied to a live IO wait). + def time_control_extract_pending_timers + extracted = [] of {Fiber, Time::Instant, Crystal::EventLoop::Polling::Event*, Crystal::EventLoop::Polling::Event::Type} + @timers_lock.sync do + @timers.time_control_each do |event| + type = event.value.type + if (type.sleep? || type.select_timeout?) && (wake_at = event.value.wake_at?) + extracted << {event.value.fiber, wake_at, event, type} + end + end + # Raw delete + re-arm under the lock: delete_timer would re-take the + # non-reentrant @timers_lock. + extracted.each { |entry| @timers.delete(entry[2]) } + system_set_timer(@timers.next_ready?) + end + extracted + end + end + + module TimeControl + # :nodoc: + # + # Adopts fibers already sleeping or waiting on a select timeout on the real + # event loop when control starts, across all execution contexts, so they're + # governed by the virtual clock instead of waking in real time. + def self.adopt_pending_timers(ctx : Context) : Nil + seen = Set(UInt64).new + Fiber::ExecutionContext.each do |execution_context| + event_loop = execution_context.event_loop + next unless event_loop.is_a?(Crystal::EventLoop::Polling) + next unless seen.add?(event_loop.object_id) + + event_loop.time_control_extract_pending_timers.each do |(fiber, wake_at, event, type)| + case type + when .sleep? + ctx.adopt_sleep(fiber, wake_at, -> { event.value.timed_out!; fiber.enqueue }) + when .select_timeout? + ctx.adopt_select_timeout(fiber, wake_at) + end + end + end + end end {% end %} diff --git a/src/time_control/core_ext/crystal/event_loop/timers.cr b/src/time_control/core_ext/crystal/event_loop/timers.cr new file mode 100644 index 0000000..6eae963 --- /dev/null +++ b/src/time_control/core_ext/crystal/event_loop/timers.cr @@ -0,0 +1,28 @@ +{% if Crystal::EventLoop.all_subclasses.any? { |subclass| subclass.name == "Crystal::EventLoop::Polling" } %} + class Crystal::PointerPairingHeap(T) + # :nodoc: + # + # Read-only traversal of every node in the heap, in unspecified order. + def time_control_each(& : Pointer(T) ->) : Nil + node = @head + return if node.null? + + stack = [node] + until stack.empty? + n = stack.pop + yield n + child = n.value.heap_child? + stack << child unless child.null? + sibling = n.value.heap_next? + stack << sibling unless sibling.null? + end + end + end + + struct Crystal::EventLoop::Timers(T) + # :nodoc: + def time_control_each(& : Pointer(T) ->) : Nil + @heap.time_control_each { |event| yield event } + end + end +{% end %}