2 changes: 1 addition & 1 deletion AGENTS.md
@@ -35,7 +35,7 @@ When enabled:

## Public API

-- `TimeControl.control` — the main entry point
+- `TimeControl.control` — the main entry point; accepts an optional `only:` regex restricting interception to fibers whose name matches (all other fibers keep real timers)
- `Controller#advance(duration)` — advances virtual time by a fixed amount
- `Controller#advance` — advances virtual time to the next pending timer
- `TimeControl::Error`, `TimeControl::PendingTimersError` — error classes
27 changes: 26 additions & 1 deletion README.md
@@ -206,7 +206,32 @@ end

If the `control` block exits while virtual timers are still pending (i.e.
fibers are sleeping beyond the last `advance`), a `TimeControl::PendingTimersError`
-is raised. This catches specs that forget to advance past all scheduled work.
+is raised, naming the fibers whose timers were left pending. This catches
+specs that forget to advance past all scheduled work.
+
+### Controlling only specific fibers
+
+In a process with background fibers (housekeeping loops, tickers, stats
+collectors), intercepting every sleep is not practical: any background fiber
+that re-arms its periodic sleep while time is controlled freezes and leaks a
+pending timer. Pass `only:` with a regex to restrict interception to fibers
+whose name matches — all other fibers keep their timers on the real event
+loop.
+
+```crystal
+it "reconnects after the backoff" do
+  TimeControl.control(only: /^Reconnect worker/) do |controller|
+    spawn(name: "Reconnect worker") { sleep 30.seconds; reconnect }
+    controller.advance(30.seconds)
+  end
+end
+```
+
+Note that while time is controlled the real event loop also measures its
+timer deadlines against the virtual clock, so a non-matching fiber's timer
+registered inside the block fires once virtual time advances past it or
+after the block exits — late by up to the advanced amount. For periodic
+housekeeping work this is harmless, but it is not full isolation.
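
The name-matching rule described in this README section can be sketched as a standalone predicate. This is only an illustration of the rule as the diff describes it: `controlled?` is a hypothetical helper name, not part of the library, and it takes a plain `String?` instead of a `Fiber` so that it runs without the shard installed.

```crystal
# Hypothetical standalone helper (an assumption for illustration, not library
# API): decides whether a fiber with the given name would get virtual timers.
def controlled?(name : String?, only : Regex?) : Bool
  return true unless only  # no filter set: every fiber is controlled
  return false unless name # a filter is set: unnamed fibers never match
  name.matches?(only)
end

puts controlled?("Reconnect worker", /^Reconnect worker/) # => true
puts controlled?("housekeeping", /^Reconnect worker/)     # => false
puts controlled?(nil, /^Reconnect worker/)                # => false
puts controlled?(nil, nil)                                # => true
```

One consequence worth noting: once a filter is set, a fiber spawned without `name:` is never controlled, so fibers under test need an explicit name that the regex matches.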

## How it works

40 changes: 40 additions & 0 deletions spec/time_control_spec.cr
@@ -215,6 +215,46 @@ describe TimeControl do
ex.count.should eq(1)
end

+it "includes the names of fibers with pending timers in the error" do
+  ex = expect_raises(TimeControl::PendingTimersError, /housekeeping/) do
+    TimeControl.control do |_controller|
+      spawn(name: "housekeeping") { sleep 1.second }
+      Fiber.yield
+    end
+  end
+  ex.fiber_names.should eq(["housekeeping"])
+end
+
+describe "only filter" do
+  it "advances timers of fibers whose name matches" do
+    done = Channel(Nil).new
+    TimeControl.control(only: /^worker/) do |controller|
+      spawn(name: "worker 1") do
+        sleep 5.minutes
+        done.send(nil)
+      end
+      controller.advance(5.minutes)
+      done.receive
+    end
+  end
+
+  it "leaves timers of non-matching fibers on the real event loop" do
+    TimeControl.control(only: /^worker/) do |_controller|
+      spawn(name: "housekeeping") { sleep 1.second }
+      Fiber.yield
+      # Exits cleanly: the housekeeping timer is real, not a leaked
+      # virtual timer.
+    end
+  end
+
+  it "does not control unnamed fibers" do
+    TimeControl.control(only: /^worker/) do |_controller|
+      spawn { sleep 1.second }
+      Fiber.yield
+    end
+  end
+end
+
describe "IO timeouts" do
it "fires read_timeout when virtual time advances past it" do
r, w = IO.pipe
30 changes: 22 additions & 8 deletions src/time_control.cr
@@ -37,6 +37,15 @@ module TimeControl
# An optional *start_time* sets the initial value of `Time.utc` inside the
# block. Without it, virtual UTC starts at the real wall-clock time.
#
+# An optional *only* filter restricts which fibers get virtual timers: only
+# fibers whose name matches the regex are controlled; sleeps and timeouts
+# of all other fibers stay on the real event loop. Use this when the
+# process runs background fibers (housekeeping loops, tickers) that should
+# keep running normally — without a filter their re-armed timers freeze and
+# leak, raising `PendingTimersError` at block exit. Note that real timers
+# are still measured against the virtual clock while it is controlled, so
+# an unfiltered fiber's timer can fire late by up to the advanced amount.
+#
# ```
# TimeControl.control do |controller|
# spawn { sleep 5.minutes; puts "done" }
@@ -50,17 +59,22 @@ module TimeControl
# TimeControl.control("2030-01-01T09:00:00Z") do |controller|
# Time.utc.hour # => 9
# end
+#
+# TimeControl.control(only: /^worker/) do |controller|
+#   spawn(name: "worker 1") { sleep 5.minutes; puts "done" }
+#   controller.advance(5.minutes)
+# end
# ```
-def self.control(& : Controller ->) : Nil
-  control(Context.new) { |controller| yield controller }
+def self.control(only : Regex? = nil, & : Controller ->) : Nil
+  control(Context.new(only: only)) { |controller| yield controller }
end

-def self.control(start_time : Time, & : Controller ->) : Nil
-  control(Context.new(start_time)) { |controller| yield controller }
+def self.control(start_time : Time, only : Regex? = nil, & : Controller ->) : Nil
+  control(Context.new(start_time, only: only)) { |controller| yield controller }
end

-def self.control(start_time : String, & : Controller ->) : Nil
-  control(parse_start_time(start_time)) { |controller| yield controller }
+def self.control(start_time : String, only : Regex? = nil, & : Controller ->) : Nil
+  control(parse_start_time(start_time), only: only) { |controller| yield controller }
end

private def self.parse_start_time(str : String) : Time
@@ -101,8 +115,8 @@ module TimeControl
@@context = nil
ctx.try &.stop
isolated.try &.wait
-if ctx && ctx.leaked_timer_count > 0
-  raise PendingTimersError.new(ctx.leaked_timer_count)
+if ctx && !ctx.leaked_timer_names.empty?
+  raise PendingTimersError.new(ctx.leaked_timer_names)
end
end
end
17 changes: 14 additions & 3 deletions src/time_control/context.cr
@@ -14,7 +14,7 @@ module TimeControl

getter virtual_now : Time::Instant
property timer_loop_thread : Thread?
-getter leaked_timer_count : Int32 = 0
+getter leaked_timer_names = [] of String

@advance_ch : Channel(Time::Span)
@done_ch : Channel(Nil)
@@ -28,7 +28,7 @@ module TimeControl
@timers : Array(TimerEntry)
@timers_mutex : Mutex

-def initialize(start_time : Time? = nil)
+def initialize(start_time : Time? = nil, @only : Regex? = nil)
@advance_ch = Channel(Time::Span).new
@done_ch = Channel(Nil).new
@timer_inserted_ch = Channel(Nil).new(1)
@@ -61,6 +61,17 @@ module TimeControl
{@control_start_utc_s + total_ns // 1_000_000_000_i64, (total_ns % 1_000_000_000_i64).to_i32}
end

+# Whether timers for *fiber* should be virtualized. True for every fiber
+# unless an `only` filter is set, in which case only fibers whose name
+# matches the filter are controlled — the rest keep real timers.
+def controls?(fiber : Fiber) : Bool
+  only = @only
+  return true unless only
+  name = fiber.name
+  return false unless name
+  name.matches?(only)
+end
+
def add_sleep(fiber : Fiber, duration : Time::Span) : Nil
notify = @timers_mutex.synchronize do
insert_timer(TimerEntry.new(fiber, @virtual_now + duration, TimerKind::Sleep))
@@ -139,7 +150,7 @@ module TimeControl
entry = @timers_mutex.synchronize { @timers.shift? }
break unless entry
next if entry.kind.io_timeout_wakeup? # not a stuck fiber; just an interrupt trigger for the event loop
-@leaked_timer_count += 1
+@leaked_timer_names << (entry.fiber.name || "unnamed fiber")
enqueue_entry(entry)
end
end
8 changes: 5 additions & 3 deletions src/time_control/core_ext/crystal/event_loop.cr
@@ -5,9 +5,11 @@
def sleep(duration : ::Time::Span) : Nil
if duration.total_nanoseconds > 0
TimeControl.when_controlling do |ctx|
-ctx.add_sleep(Fiber.current, duration)
-Fiber.suspend
-return
+if ctx.controls?(Fiber.current)
+  ctx.add_sleep(Fiber.current, duration)
+  Fiber.suspend
+  return
+end
end
end
previous_def
2 changes: 1 addition & 1 deletion src/time_control/core_ext/crystal/event_loop/polling.cr
@@ -5,7 +5,7 @@
if wake_at = event.value.wake_at?
if event.value.type.io_read? || event.value.type.io_write?
TimeControl.when_controlling do |ctx|
-ctx.add_io_timeout(wake_at)
+ctx.add_io_timeout(wake_at) if ctx.controls?(Fiber.current)
end
end
end
12 changes: 8 additions & 4 deletions src/time_control/core_ext/fiber.cr
@@ -3,8 +3,10 @@ class Fiber
def timeout(timeout : Time::Span, select_action : Channel::TimeoutAction) : Nil
@timeout_select_action = select_action
TimeControl.when_controlling do |ctx|
-ctx.add_select_timeout(self, timeout)
-return
+if ctx.controls?(self)
+  ctx.add_select_timeout(self, timeout)
+  return
+end
end
timeout_event.add(timeout)
end
@@ -14,8 +16,10 @@ class Fiber
return unless @timeout_select_action
@timeout_select_action = nil
TimeControl.when_controlling do |ctx|
-ctx.cancel_select_timeout(self)
-return
+if ctx.controls?(self)
+  ctx.cancel_select_timeout(self)
+  return
+end
end
@timeout_event.try &.delete
end
11 changes: 8 additions & 3 deletions src/time_control/errors.cr
@@ -7,13 +7,18 @@ module TimeControl
# still pending, indicating that not all scheduled sleeps or timeouts
# were advanced past.
#
-# The number of pending timers is available via `#count`.
+# The number of pending timers is available via `#count` and the names of
+# the fibers that owned them via `#fiber_names`.
class PendingTimersError < Error
# Returns the number of timers that were still pending.
getter count : Int32

-def initialize(@count : Int32)
-  super("#{@count} timer(s) were still pending when the control block exited")
+# Returns the names of the fibers whose timers were still pending.
+getter fiber_names : Array(String)
+
+def initialize(@fiber_names : Array(String))
+  @count = @fiber_names.size
+  super("#{@count} timer(s) were still pending when the control block exited: #{@fiber_names.join(", ")}")
end
end
end