Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ When enabled:
- `Crystal::System::Time.clock_gettime` is monkey-patched to return virtual monotonic time; `Crystal::System::Time.compute_utc_seconds_and_nanoseconds` is patched to return virtual UTC time.
- A `Fiber::ExecutionContext::Isolated` runs a dedicated timer thread. When `advance(N)` is called, the timer thread processes all virtual timers with `wake_at <= virtual_now + N` in order, enqueuing sleeping fibers back into their original execution contexts.
- After each batch of woken fibers, the timer thread waits 1ms (real sleep — the timer loop thread is tracked on `Context` and excluded from interception via `TimeControl.when_controlling`) to allow chained sleeps to register before rechecking.
- If the control block exits with timers still pending, `PendingTimersError` is raised.
- At control start, fibers already sleeping or waiting on a select timeout on the real event loop are adopted into virtual time: every execution context's event loop is scanned (`Fiber::ExecutionContext.each`) and its sleep/select-timeout events are removed from the real `Crystal::EventLoop::Polling` timer heap and re-registered on the virtual clock. Polling builds only; IO-operation timeouts are left in place.
- If the control block exits with timers still pending, each is re-attached to the real event loop with the virtual time that remained until it would have fired (`wake_at - virtual_now`), so the parked fibers wake up later in real time. `control` returns immediately.

## Public API

- `TimeControl.control` — the main entry point
- `Controller#advance(duration)` — advances virtual time by a fixed amount
- `Controller#advance` — advances virtual time to the next pending timer
- `TimeControl::Error`, `TimeControl::PendingTimersError` — error classes
- `TimeControl::Error` — base error class

Everything else is marked `# :nodoc:` or `private`. Do not add doc comments to internal methods, patch methods, or instance variables.
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,26 @@ it "data arrives before the deadline, no timeout" do
end
```

### Adopting pre-existing timers

Fibers that were already sleeping or waiting on a `select` timeout when the
`control` block begins are adopted into virtual time: at control start every
execution context's event loop is scanned, and its sleep and select-timeout
timers are moved onto the virtual clock so they wake when virtual time is
advanced past their deadline, instead of waking in real wall-clock time.

This is only done on builds using the Polling event loop (kqueue/epoll). Only
sleeps and select timeouts are adopted; pending IO-operation timeouts stay on
the real event loop, since they are tied to a live IO wait.

### Pending timers

If the `control` block exits while virtual timers are still pending (i.e.
fibers are sleeping beyond the last `advance`), a `TimeControl::PendingTimersError`
is raised. This catches specs that forget to advance past all scheduled work.
fibers are sleeping or waiting on a `select` timeout beyond the last `advance`),
each pending timer is re-attached to the real event loop with the virtual time
that remained until it would have fired (`wake_at - virtual_now`). The parked
fibers then wake up later in real time rather than being abandoned. `control`
returns immediately and does not wait for them.

## How it works

Expand Down
24 changes: 24 additions & 0 deletions spec/time_control_mt_spec.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
require "./spec_helper"

describe "TimeControl multi-threaded" do
it "adopts a fiber sleeping in a separate isolated context before control starts" do
ready = Channel(Nil).new
woke = Channel(Nil).new

sleeper = Fiber::ExecutionContext::Isolated.new("pre-sleeper") do
ready.send(nil)
sleep 10.seconds
woke.send(nil)
end

ready.receive
sleep 5.milliseconds # let the isolated fiber register its real sleep timer

TimeControl.control do |controller|
controller.advance(10.seconds)
select
when woke.receive
when timeout(2.seconds)
fail "adopted sleep in isolated context did not wake when virtual time advanced"
end
end

sleeper.wait
end
it "fires read timeouts across isolated contexts in virtual time order" do
order = Channel(Int32).new(3)
pipes = Array.new(3) { IO.pipe }
Expand Down
97 changes: 91 additions & 6 deletions spec/time_control_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,99 @@ describe TimeControl do
end
end

it "raises if timers are still pending when the control block exits" do
ex = expect_raises(TimeControl::PendingTimersError, /1 timer\(s\) were still pending/) do
TimeControl.control do |_controller|
spawn { sleep 1.second }
Fiber.yield
it "re-attaches a pending sleep to the real event loop when the control block exits" do
woke = Channel(Nil).new

TimeControl.control do |_controller|
spawn { sleep 10.milliseconds; woke.send(nil) }
Fiber.yield
end

select
when woke.receive
when timeout(2.seconds)
fail "pending sleep was not re-attached to the real event loop"
end
end

it "re-attaches a pending select timeout to the real event loop when the control block exits" do
fired = Channel(Symbol).new

TimeControl.control do |_controller|
spawn do
select
when fired.receive
when timeout(10.milliseconds)
fired.send(:timed_out)
end
end
Fiber.yield
end

select
when result = fired.receive
result.should eq(:timed_out)
when timeout(2.seconds)
fail "pending select timeout was not re-attached to the real event loop"
end
end

describe "adopting pre-existing real-loop timers" do
it "adopts a fiber already sleeping on the real event loop" do
ready = Channel(Nil).new
woke = Channel(Time::Instant).new

spawn do
ready.send(nil)
sleep 10.seconds
woke.send(Time.instant)
end

ready.receive
sleep 5.milliseconds # let the fiber register its real sleep timer and park

TimeControl.control do |controller|
t0 = Time.instant
controller.advance(10.seconds)
select
when woke_at = woke.receive
# The fiber had already been asleep for a few real ms before control
# started, so slightly under 10s of virtual time remains.
(woke_at - t0).should be_close(10.seconds, 1.second)
when timeout(2.seconds)
fail "adopted sleep did not wake when virtual time advanced"
end
end
end

it "adopts a fiber already waiting on a select timeout on the real event loop" do
ready = Channel(Nil).new
result = Channel(Symbol).new
trigger = Channel(Nil).new

spawn do
ready.send(nil)
select
when trigger.receive
result.send(:received)
when timeout(10.seconds)
result.send(:timed_out)
end
end

ready.receive
sleep 5.milliseconds # let the fiber register its real select timeout and park

TimeControl.control do |controller|
controller.advance(10.seconds)
select
when r = result.receive
r.should eq(:timed_out)
when timeout(2.seconds)
fail "adopted select timeout did not fire when virtual time advanced"
end
end
end
ex.count.should eq(1)
end

describe "IO timeouts" do
Expand Down
9 changes: 6 additions & 3 deletions src/time_control.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require "./time_control/context"
require "./time_control/controller"
require "./time_control/core_ext/crystal/system/time"
require "./time_control/core_ext/crystal/event_loop"
require "./time_control/core_ext/crystal/event_loop/timers"
require "./time_control/core_ext/crystal/event_loop/polling"
require "./time_control/core_ext/fiber"

Expand Down Expand Up @@ -91,6 +92,10 @@ module TimeControl
private def self.control(ctx : Context, & : Controller ->) : Nil
@@context = ctx

{% if Crystal::EventLoop.all_subclasses.any? { |subclass| subclass.name == "Crystal::EventLoop::Polling" } %}
adopt_pending_timers(ctx)
{% end %}

isolated = Fiber::ExecutionContext::Isolated.new("time-control") do
ctx.timer_loop_thread = Thread.current
ctx.run
Expand All @@ -101,8 +106,6 @@ module TimeControl
@@context = nil
ctx.try &.stop
isolated.try &.wait
if ctx && ctx.leaked_timer_count > 0
raise PendingTimersError.new(ctx.leaked_timer_count)
end
ctx.try &.reschedule_pending_timers
end
end
81 changes: 73 additions & 8 deletions src/time_control/context.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@ module TimeControl
IoTimeoutWakeup
end

private record TimerEntry, fiber : Fiber, wake_at : Time::Instant, kind : TimerKind
# *on_wake*, when set, fully owns waking the fiber (used by adopted timers
# whose fiber is suspended in the real event loop's `sleep` and must have
# its stack-allocated event marked timed out before being enqueued).
private record TimerEntry, fiber : Fiber, wake_at : Time::Instant, kind : TimerKind, on_wake : Proc(Nil)? = nil

# A timer that was still pending when control stopped, paired with the
# virtual time remaining until it would have fired. Used to re-attach the
# parked fiber to the real event loop with that remaining duration.
private record PendingTimer, fiber : Fiber, time_left : Time::Span, kind : TimerKind, on_wake : Proc(Nil)? = nil

getter virtual_now : Time::Instant
property timer_loop_thread : Thread?
getter leaked_timer_count : Int32 = 0
getter pending_timers = [] of PendingTimer

@advance_ch : Channel(Time::Span)
@done_ch : Channel(Nil)
Expand Down Expand Up @@ -82,6 +90,30 @@ module TimeControl
notify_run_loop if notify
end

# Adopts a fiber that was already sleeping on the real event loop when
# control started. *wake_at* is the event's real monotonic deadline, which
# equals the virtual deadline because both clocks share their origin at
# control start. *on_wake* marks the real (stack-allocated) sleep event
# timed out before enqueuing the fiber.
def adopt_sleep(fiber : Fiber, wake_at : Time::Instant, on_wake : Proc(Nil)) : Nil
Comment on lines +93 to +98
notify = @timers_mutex.synchronize do
at = wake_at < @virtual_now ? @virtual_now : wake_at
insert_timer(TimerEntry.new(fiber, at, TimerKind::Sleep, on_wake))
end
notify_run_loop if notify
end

# Adopts a fiber that was already waiting on a `select` timeout on the real
# event loop when control started. Reuses the native select-timeout wake and
# cancel paths.
def adopt_select_timeout(fiber : Fiber, wake_at : Time::Instant) : Nil
notify = @timers_mutex.synchronize do
Comment on lines +106 to +110
at = wake_at < @virtual_now ? @virtual_now : wake_at
insert_timer(TimerEntry.new(fiber, at, TimerKind::SelectTimeout))
end
notify_run_loop if notify
end

def cancel_select_timeout(fiber : Fiber) : Nil
@timers_mutex.synchronize do
@timers.reject! { |e| e.fiber.same?(fiber) && e.kind.select_timeout? }
Expand Down Expand Up @@ -139,8 +171,32 @@ module TimeControl
entry = @timers_mutex.synchronize { @timers.shift? }
break unless entry
next if entry.kind.io_timeout_wakeup? # not a stuck fiber; just an interrupt trigger for the event loop
@leaked_timer_count += 1
enqueue_entry(entry)
@pending_timers << PendingTimer.new(entry.fiber, entry.wake_at - @virtual_now, entry.kind, entry.on_wake)
end
end

# Re-attaches any timers that were still pending when control stopped to the
# real event loop, each with the virtual time that remained until it would
# have fired. Must be called after control has stopped (i.e. while time is no
# longer being intercepted) so that `sleep` uses the real event loop.
def reschedule_pending_timers : Nil
Comment on lines +178 to +182
@pending_timers.each do |pending|
fiber = pending.fiber
kind = pending.kind
time_left = pending.time_left
on_wake = pending.on_wake
spawn do
sleep time_left if time_left > Time::Span.zero
if on_wake
on_wake.call
else
case kind
in .sleep? then fiber.enqueue
in .select_timeout? then fire_select_timeout(fiber)
in .io_timeout_wakeup? # never collected into @pending_timers
end
end
end
end
end

Expand All @@ -160,14 +216,16 @@ module TimeControl
end

private def enqueue_entry(entry : TimerEntry) : Nil
if on_wake = entry.on_wake
on_wake.call
return
end

case entry.kind
in .sleep?
entry.fiber.enqueue
in .select_timeout?
if select_action = entry.fiber.timeout_select_action
entry.fiber.timeout_select_action = nil
entry.fiber.enqueue if select_action.time_expired?
end
fire_select_timeout(entry.fiber)
in .io_timeout_wakeup?
# Wake the blocking kqueue/epoll wait in the fiber's event loop. This
# causes it to call process_timers, which checks deadlines against the
Expand All @@ -177,6 +235,13 @@ module TimeControl
end
end

private def fire_select_timeout(fiber : Fiber) : Nil
if select_action = fiber.timeout_select_action
fiber.timeout_select_action = nil
fiber.enqueue if select_action.time_expired?
end
end

# Inserts the entry into the sorted @timers array and returns true if the
# run loop should be notified (i.e. an advance is active and this timer
# falls within its target window).
Expand Down
47 changes: 47 additions & 0 deletions src/time_control/core_ext/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,52 @@
end
previous_def
end

# :nodoc:
#
# Removes every sleep and select-timeout timer currently registered on this
# loop and returns them so they can be re-registered on the virtual clock.
# IO-operation timeouts are left in place (they're tied to a live IO wait).
def time_control_extract_pending_timers
extracted = [] of {Fiber, Time::Instant, Crystal::EventLoop::Polling::Event*, Crystal::EventLoop::Polling::Event::Type}
@timers_lock.sync do
@timers.time_control_each do |event|
type = event.value.type
if (type.sleep? || type.select_timeout?) && (wake_at = event.value.wake_at?)
extracted << {event.value.fiber, wake_at, event, type}
end
end
# Raw delete + re-arm under the lock: delete_timer would re-take the
# non-reentrant @timers_lock.
extracted.each { |entry| @timers.delete(entry[2]) }
system_set_timer(@timers.next_ready?)
end
extracted
end
end

module TimeControl
# :nodoc:
#
# Adopts fibers already sleeping or waiting on a select timeout on the real
# event loop when control starts, across all execution contexts, so they're
# governed by the virtual clock instead of waking in real time.
def self.adopt_pending_timers(ctx : Context) : Nil
seen = Set(UInt64).new
Fiber::ExecutionContext.each do |execution_context|
event_loop = execution_context.event_loop
next unless event_loop.is_a?(Crystal::EventLoop::Polling)
next unless seen.add?(event_loop.object_id)

event_loop.time_control_extract_pending_timers.each do |(fiber, wake_at, event, type)|
case type
when .sleep?
ctx.adopt_sleep(fiber, wake_at, -> { event.value.timed_out!; fiber.enqueue })
when .select_timeout?
ctx.adopt_select_timeout(fiber, wake_at)
end
end
end
end
end
{% end %}
Loading