Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [OpenAPI] Add support for per-endpoint OAuth2/OpenID scopes via `@auth_scope` tag by [@Piyush-Goenka](https://github.com/Piyush-Goenka) (#272).
- Reuse `define_dynamic_method` and `define_maybe_yield` methods in `RageController::API` from `Rage::Internal` by [@numice](https://github.com/numice) (#273).
- Add the `form_actions` router configuration (#278).
- [Deferred] Add native periodic task scheduling with multi-process leader election via `File#flock` by [@Abishekcs](https://github.com/Abishekcs) (#233).

### Fixed

Expand Down
33 changes: 33 additions & 0 deletions lib/rage/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,39 @@ class Deferred
# @private
def initialize
@configured = false
@schedule_blocks = []
end

# Stores the scheduling block for later execution
def schedule(&block)
@schedule_blocks << block
end

# Evaluates all stored schedule blocks and returns the collected tasks.
# Called at boot time after all app constants are loaded.
def scheduled_tasks
@schedule_blocks.flat_map do |block|
dsl = ScheduleDSL.new
dsl.instance_eval(&block)
dsl.tasks
end
end

# @private
class ScheduleDSL
attr_reader :tasks

def initialize
@tasks = []
end

# Registers a task to run on a fixed interval (in seconds)
def every(interval, task:)
unless task.is_a?(Class) && task.include?(Rage::Deferred::Task)
raise ArgumentError, "#{task} must be a class that includes Rage::Deferred::Task"
end
@tasks << { interval:, task: }
end
end

# Returns the backend instance used by `Rage::Deferred`.
Expand Down
7 changes: 7 additions & 0 deletions lib/rage/deferred/deferred.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ def self.__middleware_chain
)
end

# @private
def self.__start_scheduler
Rage::Deferred::Scheduler.start(Rage.config.deferred.scheduled_tasks)
end

# @private
def self.__initialize
__middleware_chain
__load_tasks
__start_scheduler
end

module Backends
Expand All @@ -96,6 +102,7 @@ class PushTimeout < StandardError
end

require_relative "task"
require_relative "scheduler"
require_relative "queue"
require_relative "proxy"
require_relative "context"
Expand Down
22 changes: 22 additions & 0 deletions lib/rage/deferred/scheduler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

class Rage::Deferred::Scheduler
LOCK_PATH = "/tmp/rage_deferred_scheduler.lock"

def self.start(tasks)
return if tasks.empty?

Rage::Internal.pick_a_worker(lock_path: LOCK_PATH) do
Rage.logger.info " Worker PID #{Process.pid} is managing scheduled tasks"
register_timers tasks
end
end

def self.register_timers(tasks)
tasks.each do |entry|
Iodine.run_every((entry[:interval] * 1000).to_i) do
entry[:task].enqueue
end
end
end
end
11 changes: 6 additions & 5 deletions lib/rage/internal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,18 @@ def stream_name_for(streamables)
# Pick a worker process to execute a block of code.
# This is useful for ensuring that certain code is only executed by a single worker in a multi-worker setup, e.g. for broadcasting messages to known streams or for running periodic tasks.
# @yield The block of code to be executed by the picked worker
def pick_a_worker(&block)
@lock_file, lock_path = Tempfile.new.yield_self { |file| [file, file.path] }

Iodine.on_state(:on_start) do
worker_lock = File.new(lock_path)
def pick_a_worker(lock_path: nil, &block)
@lock_file, lock_path = Tempfile.new.yield_self { |f| [f, f.path] } unless lock_path

attempt = proc do
worker_lock = File.open(lock_path, File::CREAT | File::WRONLY)
if worker_lock.flock(File::LOCK_EX | File::LOCK_NB)
@worker_lock = worker_lock
block.call
end
end

Iodine.running? ? attempt.call : Iodine.on_state(:on_start) { attempt.call }
end

private
Expand Down
53 changes: 53 additions & 0 deletions spec/deferred/scheduler_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

RSpec.describe Rage::Deferred::Scheduler do
let(:task) { double("Rage::Deferred::Task") }
let(:tasks) { [{ interval: 60, task: task }] }
let(:logger) { double("Logger", info: nil) }

before do
allow(Rage).to receive(:logger).and_return(logger)
allow(Iodine).to receive(:run_every)
allow(Rage::Internal).to receive(:pick_a_worker) { |&block| block.call }
allow(task).to receive(:enqueue)
end

describe ".start" do
it "does not start when no tasks are configured" do
described_class.start([])
expect(Iodine).not_to have_received(:run_every)
end

it "registers timers when leader is elected" do
described_class.start(tasks)
expect(Iodine).to have_received(:run_every).with(60_000)
end

it "does not register task timers when lock is not acquired" do
allow(Rage::Internal).to receive(:pick_a_worker)
described_class.start(tasks)
expect(Iodine).not_to have_received(:run_every)
end

it "registers a timer for each task" do
tasks = [
{ interval: 60, task: double(enqueue: true) },
{ interval: 120, task: double(enqueue: true) }
]
described_class.start(tasks)
expect(Iodine).to have_received(:run_every).with(60_000)
expect(Iodine).to have_received(:run_every).with(120_000)
end

it "calls enqueue on the task when timer fires" do
allow(Iodine).to receive(:run_every).with(60_000) { |&block| block.call }
described_class.start(tasks)
expect(task).to have_received(:enqueue)
end

it "passes the correct lock path to pick_a_worker" do
described_class.start(tasks)
expect(Rage::Internal).to have_received(:pick_a_worker).with(lock_path: Rage::Deferred::Scheduler::LOCK_PATH)
end
end
end
15 changes: 15 additions & 0 deletions spec/internal_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
let(:on_start_callbacks) { [] }

before do
allow(Iodine).to receive(:running?).and_return(false)
allow(Iodine).to receive(:on_state).with(:on_start) do |&block|
on_start_callbacks << block
end
Expand All @@ -116,13 +117,27 @@
expect(on_start_callbacks.size).to eq(1)
end

it "executes the block immediately when Iodine is already running" do
allow(Iodine).to receive(:running?).and_return(true)
executed = false
described_class.pick_a_worker { executed = true }
expect(executed).to be(true)
end

it "creates a lock file" do
described_class.pick_a_worker { "work" }

lock_file = described_class.instance_variable_get(:@lock_file)
expect(File.exist?(lock_file.path)).to be(true)
end

it "uses provided lock_path instead of creating a new tempfile" do
lock_path = Tempfile.new.path
expect(Tempfile).not_to receive(:new)
described_class.pick_a_worker(lock_path: lock_path) { "work" }
on_start_callbacks.first.call
end

it "executes the block when lock is acquired" do
executed = false
described_class.pick_a_worker { executed = true }
Expand Down
Loading