diff --git a/CHANGELOG.md b/CHANGELOG.md index 419de4f4..a04e1143 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [OpenAPI] Add support for per-endpoint OAuth2/OpenID scopes via `@auth_scope` tag by [@Piyush-Goenka](https://github.com/Piyush-Goenka) (#272). - Reuse `define_dynamic_method` and `define_maybe_yield` methods in `RageController::API` from `Rage::Internal` by [@numice](https://github.com/numice) (#273). - Add the `form_actions` router configuration (#278). +- [Deferred] Add native periodic task scheduling with multi-process leader election via `File#flock` by [@Abishekcs](https://github.com/Abishekcs) (#233). ### Fixed diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index 702d5416..69c759dd 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -715,6 +715,39 @@ class Deferred # @private def initialize @configured = false + @schedule_blocks = [] + end + + # Stores the scheduling block for later execution + def schedule(&block) + @schedule_blocks << block + end + + # Evaluates all stored schedule blocks and returns the collected tasks. + # Called at boot time after all app constants are loaded. + def scheduled_tasks + @schedule_blocks.flat_map do |block| + dsl = ScheduleDSL.new + dsl.instance_eval(&block) + dsl.tasks + end + end + + # @private + class ScheduleDSL + attr_reader :tasks + + def initialize + @tasks = [] + end + + # Registers a task to run on a fixed interval (in seconds) + def every(interval, task:) + unless task.is_a?(Class) && task.include?(Rage::Deferred::Task) + raise ArgumentError, "#{task} must be a class that includes Rage::Deferred::Task" + end + @tasks << { interval:, task: } + end end # Returns the backend instance used by `Rage::Deferred`. diff --git a/lib/rage/deferred/deferred.rb b/lib/rage/deferred/deferred.rb index 470a5b90..274eba34 100644 --- a/lib/rage/deferred/deferred.rb +++ b/lib/rage/deferred/deferred.rb @@ -82,10 +82,16 @@ def self.__middleware_chain ) end + # @private + def self.__start_scheduler + Rage::Deferred::Scheduler.start(Rage.config.deferred.scheduled_tasks) + end + # @private def self.__initialize __middleware_chain __load_tasks + __start_scheduler end module Backends @@ -96,6 +102,7 @@ class PushTimeout < StandardError end require_relative "task" +require_relative "scheduler" require_relative "queue" require_relative "proxy" require_relative "context" diff --git a/lib/rage/deferred/scheduler.rb b/lib/rage/deferred/scheduler.rb new file mode 100644 index 00000000..a055ccab --- /dev/null +++ b/lib/rage/deferred/scheduler.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class Rage::Deferred::Scheduler + LOCK_PATH = "/tmp/rage_deferred_scheduler.lock" + + def self.start(tasks) + return if tasks.empty? + + Rage::Internal.pick_a_worker(lock_path: LOCK_PATH) do + Rage.logger.info " Worker PID #{Process.pid} is managing scheduled tasks" + register_timers tasks + end + end + + def self.register_timers(tasks) + tasks.each do |entry| + Iodine.run_every((entry[:interval] * 1000).to_i) do + entry[:task].enqueue + end + end + end +end diff --git a/lib/rage/internal.rb b/lib/rage/internal.rb index dc15383b..b0d09430 100644 --- a/lib/rage/internal.rb +++ b/lib/rage/internal.rb @@ -67,17 +67,18 @@ def stream_name_for(streamables) # Pick a worker process to execute a block of code. # This is useful for ensuring that certain code is only executed by a single worker in a multi-worker setup, e.g. for broadcasting messages to known streams or for running periodic tasks. # @yield The block of code to be executed by the picked worker - def pick_a_worker(&block) - @lock_file, lock_path = Tempfile.new.yield_self { |file| [file, file.path] } - - Iodine.on_state(:on_start) do - worker_lock = File.new(lock_path) + def pick_a_worker(lock_path: nil, &block) + @lock_file, lock_path = Tempfile.new.yield_self { |f| [f, f.path] } unless lock_path + attempt = proc do + worker_lock = File.open(lock_path, File::CREAT | File::WRONLY) if worker_lock.flock(File::LOCK_EX | File::LOCK_NB) @worker_lock = worker_lock block.call end end + + Iodine.running? ? attempt.call : Iodine.on_state(:on_start) { attempt.call } end private diff --git a/spec/deferred/scheduler_spec.rb b/spec/deferred/scheduler_spec.rb new file mode 100644 index 00000000..1448d985 --- /dev/null +++ b/spec/deferred/scheduler_spec.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +RSpec.describe Rage::Deferred::Scheduler do + let(:task) { double("Rage::Deferred::Task") } + let(:tasks) { [{ interval: 60, task: task }] } + let(:logger) { double("Logger", info: nil) } + + before do + allow(Rage).to receive(:logger).and_return(logger) + allow(Iodine).to receive(:run_every) + allow(Rage::Internal).to receive(:pick_a_worker) { |&block| block.call } + allow(task).to receive(:enqueue) + end + + describe ".start" do + it "does not start when no tasks are configured" do + described_class.start([]) + expect(Iodine).not_to have_received(:run_every) + end + + it "registers timers when leader is elected" do + described_class.start(tasks) + expect(Iodine).to have_received(:run_every).with(60_000) + end + + it "does not register task timers when lock is not acquired" do + allow(Rage::Internal).to receive(:pick_a_worker) + described_class.start(tasks) + expect(Iodine).not_to have_received(:run_every) + end + + it "registers a timer for each task" do + tasks = [ + { interval: 60, task: double(enqueue: true) }, + { interval: 120, task: double(enqueue: true) } + ] + described_class.start(tasks) + expect(Iodine).to have_received(:run_every).with(60_000) + expect(Iodine).to have_received(:run_every).with(120_000) + end + + it "calls enqueue on the task when timer fires" do + allow(Iodine).to receive(:run_every).with(60_000) { |&block| block.call } + described_class.start(tasks) + expect(task).to have_received(:enqueue) + end + + it "passes the correct lock path to pick_a_worker" do + described_class.start(tasks) + expect(Rage::Internal).to have_received(:pick_a_worker).with(lock_path: Rage::Deferred::Scheduler::LOCK_PATH) + end + end +end diff --git a/spec/internal_spec.rb b/spec/internal_spec.rb index 0761bbc4..bc46ba09 100644 --- a/spec/internal_spec.rb +++ b/spec/internal_spec.rb @@ -99,6 +99,7 @@ let(:on_start_callbacks) { [] } before do + allow(Iodine).to receive(:running?).and_return(false) allow(Iodine).to receive(:on_state).with(:on_start) do |&block| on_start_callbacks << block end @@ -116,6 +117,13 @@ expect(on_start_callbacks.size).to eq(1) end + it "executes the block immediately when Iodine is already running" do + allow(Iodine).to receive(:running?).and_return(true) + executed = false + described_class.pick_a_worker { executed = true } + expect(executed).to be(true) + end + it "creates a lock file" do described_class.pick_a_worker { "work" } @@ -123,6 +131,13 @@ expect(File.exist?(lock_file.path)).to be(true) end + it "uses provided lock_path instead of creating a new tempfile" do + lock_path = Tempfile.new.path + expect(Tempfile).not_to receive(:new) + described_class.pick_a_worker(lock_path: lock_path) { "work" } + on_start_callbacks.first.call + end + it "executes the block when lock is acquired" do executed = false described_class.pick_a_worker { executed = true }