From 9b8bc2a255b5722c748c98ecf4a3e2c811032df5 Mon Sep 17 00:00:00 2001 From: Abishekcs Date: Tue, 21 Apr 2026 22:48:42 +0530 Subject: [PATCH] feat(deferred): add periodic task scheduling Adds native recurring task scheduling to Rage::Deferred via a simple DSL, without relying on external tools like cron or third-party libraries. ## Public API Rage.configure do config.deferred.schedule do every 1.hour, task: CleanupExpiredInvites every 1.minute, task: ResetCache end end ## How It Works - Uses Iodine's run_every timer primitive within the existing runtime. - Schedule blocks are stored during configuration and evaluated at boot, after all app constants are loaded. Multiple schedule calls are supported. ## Leader Election Uses Rage::Internal.pick_a_worker with a fixed shared lock path so all workers compete on the same file. The winner registers timers, others stand by. When the leader dies, the OS releases the lock and the next worker to boot takes over. Timers reset on leader change. ## Design Decisions - Overlaps allowed by default. - First run waits for the first interval. - Arguments not supported - tasks discover their own data at runtime. - task must include Rage::Deferred::Task, validated at boot with ArgumentError. - pick_a_worker updated to accept lock_path and handle Iodine.running?. ## Tests Covers timer registration, task enqueue, lock path, and empty task list. --- CHANGELOG.md | 1 + lib/rage/configuration.rb | 33 ++++++++++++++++++++ lib/rage/deferred/deferred.rb | 7 +++++ lib/rage/deferred/scheduler.rb | 22 ++++++++++++++ lib/rage/internal.rb | 11 +++---- spec/deferred/scheduler_spec.rb | 53 +++++++++++++++++++++++++++++++++ spec/internal_spec.rb | 15 ++++++++++ 7 files changed, 137 insertions(+), 5 deletions(-) create mode 100644 lib/rage/deferred/scheduler.rb create mode 100644 spec/deferred/scheduler_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 419de4f4..a04e1143 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [OpenAPI] Add support for per-endpoint OAuth2/OpenID scopes via `@auth_scope` tag by [@Piyush-Goenka](https://github.com/Piyush-Goenka) (#272). - Reuse `define_dynamic_method` and `define_maybe_yield` methods in `RageController::API` from `Rage::Internal` by [@numice](https://github.com/numice) (#273). - Add the `form_actions` router configuration (#278). +- [Deferred] Add native periodic task scheduling with multi-process leader election via `File#flock` by [@Abishekcs](https://github.com/Abishekcs) (#233). ### Fixed diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index 702d5416..69c759dd 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -715,6 +715,39 @@ class Deferred # @private def initialize @configured = false + @schedule_blocks = [] + end + + # Stores the scheduling block for later execution + def schedule(&block) + @schedule_blocks << block + end + + # Evaluates all stored schedule blocks and returns the collected tasks. + # Called at boot time after all app constants are loaded. + def scheduled_tasks + @schedule_blocks.flat_map do |block| + dsl = ScheduleDSL.new + dsl.instance_eval(&block) + dsl.tasks + end + end + + # @private + class ScheduleDSL + attr_reader :tasks + + def initialize + @tasks = [] + end + + # Registers a task to run on a fixed interval (in seconds) + def every(interval, task:) + unless task.is_a?(Class) && task.include?(Rage::Deferred::Task) + raise ArgumentError, "#{task} must be a class that includes Rage::Deferred::Task" + end + @tasks << { interval:, task: } + end end # Returns the backend instance used by `Rage::Deferred`. diff --git a/lib/rage/deferred/deferred.rb b/lib/rage/deferred/deferred.rb index 470a5b90..274eba34 100644 --- a/lib/rage/deferred/deferred.rb +++ b/lib/rage/deferred/deferred.rb @@ -82,10 +82,16 @@ def self.__middleware_chain ) end + # @private + def self.__start_scheduler + Rage::Deferred::Scheduler.start(Rage.config.deferred.scheduled_tasks) + end + # @private def self.__initialize __middleware_chain __load_tasks + __start_scheduler end module Backends @@ -96,6 +102,7 @@ class PushTimeout < StandardError end require_relative "task" +require_relative "scheduler" require_relative "queue" require_relative "proxy" require_relative "context" diff --git a/lib/rage/deferred/scheduler.rb b/lib/rage/deferred/scheduler.rb new file mode 100644 index 00000000..a055ccab --- /dev/null +++ b/lib/rage/deferred/scheduler.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class Rage::Deferred::Scheduler + LOCK_PATH = "/tmp/rage_deferred_scheduler.lock" + + def self.start(tasks) + return if tasks.empty? + + Rage::Internal.pick_a_worker(lock_path: LOCK_PATH) do + Rage.logger.info " Worker PID #{Process.pid} is managing scheduled tasks" + register_timers tasks + end + end + + def self.register_timers(tasks) + tasks.each do |entry| + Iodine.run_every((entry[:interval] * 1000).to_i) do + entry[:task].enqueue + end + end + end +end diff --git a/lib/rage/internal.rb b/lib/rage/internal.rb index dc15383b..b0d09430 100644 --- a/lib/rage/internal.rb +++ b/lib/rage/internal.rb @@ -67,17 +67,18 @@ def stream_name_for(streamables) # Pick a worker process to execute a block of code. # This is useful for ensuring that certain code is only executed by a single worker in a multi-worker setup, e.g. for broadcasting messages to known streams or for running periodic tasks. # @yield The block of code to be executed by the picked worker - def pick_a_worker(&block) - @lock_file, lock_path = Tempfile.new.yield_self { |file| [file, file.path] } - - Iodine.on_state(:on_start) do - worker_lock = File.new(lock_path) + def pick_a_worker(lock_path: nil, &block) + @lock_file, lock_path = Tempfile.new.yield_self { |f| [f, f.path] } unless lock_path + attempt = proc do + worker_lock = File.open(lock_path, File::CREAT | File::WRONLY) if worker_lock.flock(File::LOCK_EX | File::LOCK_NB) @worker_lock = worker_lock block.call end end + + Iodine.running? ? attempt.call : Iodine.on_state(:on_start) { attempt.call } end private diff --git a/spec/deferred/scheduler_spec.rb b/spec/deferred/scheduler_spec.rb new file mode 100644 index 00000000..1448d985 --- /dev/null +++ b/spec/deferred/scheduler_spec.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +RSpec.describe Rage::Deferred::Scheduler do + let(:task) { double("Rage::Deferred::Task") } + let(:tasks) { [{ interval: 60, task: task }] } + let(:logger) { double("Logger", info: nil) } + + before do + allow(Rage).to receive(:logger).and_return(logger) + allow(Iodine).to receive(:run_every) + allow(Rage::Internal).to receive(:pick_a_worker) { |&block| block.call } + allow(task).to receive(:enqueue) + end + + describe ".start" do + it "does not start when no tasks are configured" do + described_class.start([]) + expect(Iodine).not_to have_received(:run_every) + end + + it "registers timers when leader is elected" do + described_class.start(tasks) + expect(Iodine).to have_received(:run_every).with(60_000) + end + + it "does not register task timers when lock is not acquired" do + allow(Rage::Internal).to receive(:pick_a_worker) + described_class.start(tasks) + expect(Iodine).not_to have_received(:run_every) + end + + it "registers a timer for each task" do + tasks = [ + { interval: 60, task: double(enqueue: true) }, + { interval: 120, task: double(enqueue: true) } + ] + described_class.start(tasks) + expect(Iodine).to have_received(:run_every).with(60_000) + expect(Iodine).to have_received(:run_every).with(120_000) + end + + it "calls enqueue on the task when timer fires" do + allow(Iodine).to receive(:run_every).with(60_000) { |&block| block.call } + described_class.start(tasks) + expect(task).to have_received(:enqueue) + end + + it "passes the correct lock path to pick_a_worker" do + described_class.start(tasks) + expect(Rage::Internal).to have_received(:pick_a_worker).with(lock_path: Rage::Deferred::Scheduler::LOCK_PATH) + end + end +end diff --git a/spec/internal_spec.rb b/spec/internal_spec.rb index 0761bbc4..bc46ba09 100644 --- a/spec/internal_spec.rb +++ b/spec/internal_spec.rb @@ -99,6 +99,7 @@ let(:on_start_callbacks) { [] } before do + allow(Iodine).to receive(:running?).and_return(false) allow(Iodine).to receive(:on_state).with(:on_start) do |&block| on_start_callbacks << block end @@ -116,6 +117,13 @@ expect(on_start_callbacks.size).to eq(1) end + it "executes the block immediately when Iodine is already running" do + allow(Iodine).to receive(:running?).and_return(true) + executed = false + described_class.pick_a_worker { executed = true } + expect(executed).to be(true) + end + it "creates a lock file" do described_class.pick_a_worker { "work" } @@ -123,6 +131,13 @@ expect(File.exist?(lock_file.path)).to be(true) end + it "uses provided lock_path instead of creating a new tempfile" do + lock_path = Tempfile.new.path + expect(Tempfile).not_to receive(:new) + described_class.pick_a_worker(lock_path: lock_path) { "work" } + on_start_callbacks.first.call + end + it "executes the block when lock is acquired" do executed = false described_class.pick_a_worker { executed = true }