diff --git a/AGENTS.md b/AGENTS.md
index dcf9bb5..384d42d 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -31,13 +31,14 @@ When enabled:
 - `Crystal::System::Time.clock_gettime` is monkey-patched to return virtual monotonic time; `Crystal::System::Time.compute_utc_seconds_and_nanoseconds` is patched to return virtual UTC time.
 - A `Fiber::ExecutionContext::Isolated` runs a dedicated timer thread. When `advance(N)` is called, the timer thread processes all virtual timers with `wake_at <= virtual_now + N` in order, enqueuing sleeping fibers back into their original execution contexts.
 - After each batch of woken fibers, the timer thread waits 1ms (real sleep — the timer loop thread is tracked on `Context` and excluded from interception via `TimeControl.when_controlling`) to allow chained sleeps to register before rechecking.
-If the control block exits with timers still pending, `PendingTimersError` is raised.
+- At control start, fibers already sleeping or waiting on a select timeout on the real event loop are adopted into virtual time: every execution context's event loop is scanned (`Fiber::ExecutionContext.each`) and its sleep/select-timeout events are removed from the real `Crystal::EventLoop::Polling` timer heap and re-registered on the virtual clock. Polling builds only; IO-operation timeouts are left in place.
+- If the control block exits with timers still pending, each is re-attached to the real event loop with the virtual time that remained until it would have fired (`wake_at - virtual_now`), so the parked fibers wake up later in real time. `control` returns immediately.
 
 ## Public API
 
 - `TimeControl.control` — the main entry point
 - `Controller#advance(duration)` — advances virtual time by a fixed amount
 - `Controller#advance` — advances virtual time to the next pending timer
-- `TimeControl::Error`, `TimeControl::PendingTimersError` — error classes
+- `TimeControl::Error` — base error class
 
 Everything else is marked `# :nodoc:` or `private`. Do not add doc comments to internal methods, patch methods, or instance variables.
diff --git a/README.md b/README.md
index 66dfa82..08a90e2 100644
--- a/README.md
+++ b/README.md
@@ -202,11 +202,26 @@ it "data arrives before the deadline, no timeout" do
 end
 ```
 
+### Adopting pre-existing timers
+
+Fibers that were already sleeping or waiting on a `select` timeout when the
+`control` block begins are adopted into virtual time: at control start every
+execution context's event loop is scanned, and its sleep and select-timeout
+timers are moved onto the virtual clock so they wake when virtual time is
+advanced past their deadline, instead of waking in real wall-clock time.
+
+This is only done on builds using the Polling event loop (kqueue/epoll). Only
+sleeps and select timeouts are adopted; pending IO-operation timeouts stay on
+the real event loop, since they are tied to a live IO wait.
+
 ### Pending timers
 
 If the `control` block exits while virtual timers are still pending (i.e.
-fibers are sleeping beyond the last `advance`), a `TimeControl::PendingTimersError`
-is raised. This catches specs that forget to advance past all scheduled work.
+fibers are sleeping or waiting on a `select` timeout beyond the last `advance`),
+each pending timer is re-attached to the real event loop with the virtual time
+that remained until it would have fired (`wake_at - virtual_now`). The parked
+fibers then wake up later in real time rather than being abandoned. `control`
+returns immediately and does not wait for them.
 
 ## How it works
 
diff --git a/spec/time_control_mt_spec.cr b/spec/time_control_mt_spec.cr
index 9a48e5c..807d8db 100644
--- a/spec/time_control_mt_spec.cr
+++ b/spec/time_control_mt_spec.cr
@@ -1,6 +1,30 @@
 require "./spec_helper"
 
 describe "TimeControl multi-threaded" do
+  it "adopts a fiber sleeping in a separate isolated context before control starts" do
+    ready = Channel(Nil).new
+    woke = Channel(Nil).new
+
+    sleeper = Fiber::ExecutionContext::Isolated.new("pre-sleeper") do
+      ready.send(nil)
+      sleep 10.seconds
+      woke.send(nil)
+    end
+
+    ready.receive
+    sleep 5.milliseconds # let the isolated fiber register its real sleep timer
+
+    TimeControl.control do |controller|
+      controller.advance(10.seconds)
+      select
+      when woke.receive
+      when timeout(2.seconds)
+        fail "adopted sleep in isolated context did not wake when virtual time advanced"
+      end
+    end
+
+    sleeper.wait
+  end
   it "fires read timeouts across isolated contexts in virtual time order" do
     order = Channel(Int32).new(3)
     pipes = Array.new(3) { IO.pipe }
diff --git a/spec/time_control_spec.cr b/spec/time_control_spec.cr
index 88692c3..95ae7f3 100644
--- a/spec/time_control_spec.cr
+++ b/spec/time_control_spec.cr
@@ -205,14 +205,99 @@ describe TimeControl do
     end
   end
 
-  it "raises if timers are still pending when the control block exits" do
-    ex = expect_raises(TimeControl::PendingTimersError, /1 timer\(s\) were still pending/) do
-      TimeControl.control do |_controller|
-        spawn { sleep 1.second }
-        Fiber.yield
+  it "re-attaches a pending sleep to the real event loop when the control block exits" do
+    woke = Channel(Nil).new
+
+    TimeControl.control do |_controller|
+      spawn { sleep 10.milliseconds; woke.send(nil) }
+      Fiber.yield
+    end
+
+    select
+    when woke.receive
+    when timeout(2.seconds)
+      fail "pending sleep was not re-attached to the real event loop"
+    end
+  end
+
+  it "re-attaches a pending select timeout to the real event loop when the control block exits" do
+    fired = Channel(Symbol).new
+
+    TimeControl.control do |_controller|
+      spawn do
+        select
+        when fired.receive
+        when timeout(10.milliseconds)
+          fired.send(:timed_out)
+        end
+      end
+      Fiber.yield
+    end
+
+    select
+    when result = fired.receive
+      result.should eq(:timed_out)
+    when timeout(2.seconds)
+      fail "pending select timeout was not re-attached to the real event loop"
+    end
+  end
+
+  describe "adopting pre-existing real-loop timers" do
+    it "adopts a fiber already sleeping on the real event loop" do
+      ready = Channel(Nil).new
+      woke = Channel(Time::Instant).new
+
+      spawn do
+        ready.send(nil)
+        sleep 10.seconds
+        woke.send(Time.instant)
+      end
+
+      ready.receive
+      sleep 5.milliseconds # let the fiber register its real sleep timer and park
+
+      TimeControl.control do |controller|
+        t0 = Time.instant
+        controller.advance(10.seconds)
+        select
+        when woke_at = woke.receive
+          # The fiber had already been asleep for a few real ms before control
+          # started, so slightly under 10s of virtual time remains.
+          (woke_at - t0).should be_close(10.seconds, 1.second)
+        when timeout(2.seconds)
+          fail "adopted sleep did not wake when virtual time advanced"
+        end
+      end
+    end
+
+    it "adopts a fiber already waiting on a select timeout on the real event loop" do
+      ready = Channel(Nil).new
+      result = Channel(Symbol).new
+      trigger = Channel(Nil).new
+
+      spawn do
+        ready.send(nil)
+        select
+        when trigger.receive
+          result.send(:received)
+        when timeout(10.seconds)
+          result.send(:timed_out)
+        end
+      end
+
+      ready.receive
+      sleep 5.milliseconds # let the fiber register its real select timeout and park
+
+      TimeControl.control do |controller|
+        controller.advance(10.seconds)
+        select
+        when r = result.receive
+          r.should eq(:timed_out)
+        when timeout(2.seconds)
+          fail "adopted select timeout did not fire when virtual time advanced"
+        end
       end
     end
-    ex.count.should eq(1)
   end
 
   describe "IO timeouts" do
diff --git a/src/time_control.cr b/src/time_control.cr
index 78540cf..b15b73f 100644
--- a/src/time_control.cr
+++ b/src/time_control.cr
@@ -8,6 +8,7 @@ require "./time_control/context"
 require "./time_control/controller"
 require "./time_control/core_ext/crystal/system/time"
 require "./time_control/core_ext/crystal/event_loop"
+require "./time_control/core_ext/crystal/event_loop/timers"
 require "./time_control/core_ext/crystal/event_loop/polling"
 require "./time_control/core_ext/fiber"
 
@@ -91,6 +92,10 @@ module TimeControl
   private def self.control(ctx : Context, & : Controller ->) : Nil
     @@context = ctx
 
+    {% if Crystal::EventLoop.all_subclasses.any? { |subclass| subclass.name == "Crystal::EventLoop::Polling" } %}
+      adopt_pending_timers(ctx)
+    {% end %}
+
     isolated = Fiber::ExecutionContext::Isolated.new("time-control") do
       ctx.timer_loop_thread = Thread.current
       ctx.run
@@ -101,8 +106,6 @@ module TimeControl
     @@context = nil
     ctx.try &.stop
     isolated.try &.wait
-    if ctx && ctx.leaked_timer_count > 0
-      raise PendingTimersError.new(ctx.leaked_timer_count)
-    end
+    ctx.try &.reschedule_pending_timers
   end
 end
diff --git a/src/time_control/context.cr b/src/time_control/context.cr
index f65da73..17a2c9a 100644
--- a/src/time_control/context.cr
+++ b/src/time_control/context.cr
@@ -10,11 +10,19 @@ module TimeControl
       IoTimeoutWakeup
     end
 
-    private record TimerEntry, fiber : Fiber, wake_at : Time::Instant, kind : TimerKind
+    # *on_wake*, when set, fully owns waking the fiber (used by adopted timers
+    # whose fiber is suspended in the real event loop's `sleep` and must have
+    # its stack-allocated event marked timed out before being enqueued).
+    private record TimerEntry, fiber : Fiber, wake_at : Time::Instant, kind : TimerKind, on_wake : Proc(Nil)? = nil
+
+    # A timer that was still pending when control stopped, paired with the
+    # virtual time remaining until it would have fired. Used to re-attach the
+    # parked fiber to the real event loop with that remaining duration.
+    private record PendingTimer, fiber : Fiber, time_left : Time::Span, kind : TimerKind, on_wake : Proc(Nil)? = nil
 
     getter virtual_now : Time::Instant
     property timer_loop_thread : Thread?
-    getter leaked_timer_count : Int32 = 0
+    getter pending_timers = [] of PendingTimer
 
     @advance_ch : Channel(Time::Span)
     @done_ch : Channel(Nil)
@@ -82,6 +90,30 @@ module TimeControl
       notify_run_loop if notify
     end
 
+    # Adopts a fiber that was already sleeping on the real event loop when
+    # control started. *wake_at* is the event's real monotonic deadline, which
+    # equals the virtual deadline because both clocks share their origin at
+    # control start. *on_wake* marks the real (stack-allocated) sleep event
+    # timed out before enqueuing the fiber.
+    def adopt_sleep(fiber : Fiber, wake_at : Time::Instant, on_wake : Proc(Nil)) : Nil
+      notify = @timers_mutex.synchronize do
+        at = wake_at < @virtual_now ? @virtual_now : wake_at
+        insert_timer(TimerEntry.new(fiber, at, TimerKind::Sleep, on_wake))
+      end
+      notify_run_loop if notify
+    end
+
+    # Adopts a fiber that was already waiting on a `select` timeout on the real
+    # event loop when control started. Reuses the native select-timeout wake and
+    # cancel paths.
+    def adopt_select_timeout(fiber : Fiber, wake_at : Time::Instant) : Nil
+      notify = @timers_mutex.synchronize do
+        at = wake_at < @virtual_now ? @virtual_now : wake_at
+        insert_timer(TimerEntry.new(fiber, at, TimerKind::SelectTimeout))
+      end
+      notify_run_loop if notify
+    end
+
     def cancel_select_timeout(fiber : Fiber) : Nil
       @timers_mutex.synchronize do
         @timers.reject! { |e| e.fiber.same?(fiber) && e.kind.select_timeout? }
@@ -139,8 +171,32 @@ module TimeControl
         entry = @timers_mutex.synchronize { @timers.shift? }
         break unless entry
         next if entry.kind.io_timeout_wakeup? # not a stuck fiber; just an interrupt trigger for the event loop
-        @leaked_timer_count += 1
-        enqueue_entry(entry)
+        @pending_timers << PendingTimer.new(entry.fiber, entry.wake_at - @virtual_now, entry.kind, entry.on_wake)
+      end
+    end
+
+    # Re-attaches any timers that were still pending when control stopped to the
+    # real event loop, each with the virtual time that remained until it would
+    # have fired. Must be called after control has stopped (i.e. while time is no
+    # longer being intercepted) so that `sleep` uses the real event loop.
+    def reschedule_pending_timers : Nil
+      @pending_timers.each do |pending|
+        fiber = pending.fiber
+        kind = pending.kind
+        time_left = pending.time_left
+        on_wake = pending.on_wake
+        spawn do
+          sleep time_left if time_left > Time::Span.zero
+          if on_wake
+            on_wake.call
+          else
+            case kind
+            in .sleep? then fiber.enqueue
+            in .select_timeout? then fire_select_timeout(fiber)
+            in .io_timeout_wakeup? # never collected into @pending_timers
+            end
+          end
+        end
       end
     end
 
@@ -160,14 +216,16 @@ module TimeControl
     end
 
     private def enqueue_entry(entry : TimerEntry) : Nil
+      if on_wake = entry.on_wake
+        on_wake.call
+        return
+      end
+
       case entry.kind
       in .sleep?
         entry.fiber.enqueue
       in .select_timeout?
-        if select_action = entry.fiber.timeout_select_action
-          entry.fiber.timeout_select_action = nil
-          entry.fiber.enqueue if select_action.time_expired?
-        end
+        fire_select_timeout(entry.fiber)
       in .io_timeout_wakeup?
         # Wake the blocking kqueue/epoll wait in the fiber's event loop. This
         # causes it to call process_timers, which checks deadlines against the
@@ -177,6 +235,13 @@ module TimeControl
       end
     end
 
+    private def fire_select_timeout(fiber : Fiber) : Nil
+      if select_action = fiber.timeout_select_action
+        fiber.timeout_select_action = nil
+        fiber.enqueue if select_action.time_expired?
+      end
+    end
+
     # Inserts the entry into the sorted @timers array and returns true if the
     # run loop should be notified (i.e. an advance is active and this timer
     # falls within its target window).
diff --git a/src/time_control/core_ext/crystal/event_loop/polling.cr b/src/time_control/core_ext/crystal/event_loop/polling.cr
index bd238d3..3fb4287 100644
--- a/src/time_control/core_ext/crystal/event_loop/polling.cr
+++ b/src/time_control/core_ext/crystal/event_loop/polling.cr
@@ -11,5 +11,52 @@
       end
       previous_def
     end
+
+    # :nodoc:
+    #
+    # Removes every sleep and select-timeout timer currently registered on this
+    # loop and returns them so they can be re-registered on the virtual clock.
+    # IO-operation timeouts are left in place (they're tied to a live IO wait).
+    def time_control_extract_pending_timers
+      extracted = [] of {Fiber, Time::Instant, Crystal::EventLoop::Polling::Event*, Crystal::EventLoop::Polling::Event::Type}
+      @timers_lock.sync do
+        @timers.time_control_each do |event|
+          type = event.value.type
+          if (type.sleep? || type.select_timeout?) && (wake_at = event.value.wake_at?)
+            extracted << {event.value.fiber, wake_at, event, type}
+          end
+        end
+        # Raw delete + re-arm under the lock: delete_timer would re-take the
+        # non-reentrant @timers_lock.
+        extracted.each { |entry| @timers.delete(entry[2]) }
+        system_set_timer(@timers.next_ready?)
+      end
+      extracted
+    end
+  end
+
+  module TimeControl
+    # :nodoc:
+    #
+    # Adopts fibers already sleeping or waiting on a select timeout on the real
+    # event loop when control starts, across all execution contexts, so they're
+    # governed by the virtual clock instead of waking in real time.
+    def self.adopt_pending_timers(ctx : Context) : Nil
+      seen = Set(UInt64).new
+      Fiber::ExecutionContext.each do |execution_context|
+        event_loop = execution_context.event_loop
+        next unless event_loop.is_a?(Crystal::EventLoop::Polling)
+        next unless seen.add?(event_loop.object_id)
+
+        event_loop.time_control_extract_pending_timers.each do |(fiber, wake_at, event, type)|
+          case type
+          when .sleep?
+            ctx.adopt_sleep(fiber, wake_at, -> { event.value.timed_out!; fiber.enqueue })
+          when .select_timeout?
+            ctx.adopt_select_timeout(fiber, wake_at)
+          end
+        end
+      end
+    end
   end
 {% end %}
diff --git a/src/time_control/core_ext/crystal/event_loop/timers.cr b/src/time_control/core_ext/crystal/event_loop/timers.cr
new file mode 100644
index 0000000..6eae963
--- /dev/null
+++ b/src/time_control/core_ext/crystal/event_loop/timers.cr
@@ -0,0 +1,28 @@
+{% if Crystal::EventLoop.all_subclasses.any? { |subclass| subclass.name == "Crystal::EventLoop::Polling" } %}
+  class Crystal::PointerPairingHeap(T)
+    # :nodoc:
+    #
+    # Read-only traversal of every node in the heap, in unspecified order.
+    def time_control_each(& : Pointer(T) ->) : Nil
+      node = @head
+      return if node.null?
+
+      stack = [node]
+      until stack.empty?
+        n = stack.pop
+        yield n
+        child = n.value.heap_child?
+        stack << child unless child.null?
+        sibling = n.value.heap_next?
+        stack << sibling unless sibling.null?
+      end
+    end
+  end
+
+  struct Crystal::EventLoop::Timers(T)
+    # :nodoc:
+    def time_control_each(& : Pointer(T) ->) : Nil
+      @heap.time_control_each { |event| yield event }
+    end
+  end
+{% end %}
diff --git a/src/time_control/errors.cr b/src/time_control/errors.cr
index b41bd7a..434f13e 100644
--- a/src/time_control/errors.cr
+++ b/src/time_control/errors.cr
@@ -2,18 +2,4 @@ module TimeControl
   # Base class for all `TimeControl` errors.
   abstract class Error < ::Exception
   end
-
-  # Raised when the `TimeControl.control` block exits with virtual timers
-  # still pending, indicating that not all scheduled sleeps or timeouts
-  # were advanced past.
-  #
-  # The number of pending timers is available via `#count`.
-  class PendingTimersError < Error
-    # Returns the number of timers that were still pending.
-    getter count : Int32
-
-    def initialize(@count : Int32)
-      super("#{@count} timer(s) were still pending when the control block exited")
-    end
-  end
 end