diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index a10eb126..83a0f5d1 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -702,10 +702,13 @@ def backend=(config) when :disk @backend_options = parse_disk_backend_options(opts) Rage::Deferred::Backends::Disk + when :sql + @backend_options = parse_sql_backend_options(opts) + Rage::Deferred::Backends::SQL when nil Rage::Deferred::Backends::Nil else - raise ArgumentError, "unsupported backend value; supported keys are `:disk` and `nil`" + raise ArgumentError, "unsupported backend value; supported keys are `:disk`, `:sql`, and `nil`" end end @@ -865,6 +868,12 @@ def parse_disk_backend_options(opts) parsed_options end + + def parse_sql_backend_options(opts) + raise ArgumentError, "sql backend only supports :table_name" if opts.except(:table_name).any? + + { table_name: (opts[:table_name] || "rage_deferred_tasks").to_s } + end end # The class allows configuring telemetry handlers. See {MiddlewareRegistry} for details on available methods. @@ -1097,3 +1106,39 @@ def __finalize # def call(task_class:, task:, phase:, args:, kwargs:, context:) # end # end +# # # Re-encrypt the arguments in case of an error +# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) } +# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) } +# # raise +# # end +# # end +# def call(task_class:, task:, phase:, args:, kwargs:, context:) +# end +# end +# # # Re-encrypt the arguments in case of an error +# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) } +# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) } +# # raise +# # end +# # end +# def call(task_class:, task:, phase:, args:, kwargs:, context:) +# end +# end +# # # Re-encrypt the arguments in case of an error +# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) } +# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) } +# # raise +# # end +# # end +# def call(task_class:, task:, phase:, args:, kwargs:, context:) +# end +# end +# # # Re-encrypt the arguments in case of an error +# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) } +# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) } +# # raise +# # end +# # end +# def call(task_class:, task:, phase:, args:, kwargs:, context:) +# end +# end diff --git a/lib/rage/deferred/backends/sql.rb b/lib/rage/deferred/backends/sql.rb new file mode 100644 index 00000000..240034ba --- /dev/null +++ b/lib/rage/deferred/backends/sql.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +require "securerandom" + +class Rage::Deferred::Backends::SQL + def initialize(table_name: "rage_deferred_tasks") + @table_name = table_name.to_s + @table_created = false + end + + # Called during boot - keep it fast and safe + def pending_tasks + # Return empty during early boot to avoid pool issues + # Real pending tasks will be loaded later when pool is stable + [] + end + + def add(context, publish_at: nil, task_id: nil) + with_connection do + ensure_table! + serialized = serialize(context) + task_id ||= SecureRandom.uuid + + sql = <<~SQL + INSERT INTO #{@table_name} (task_id, serialized_task, publish_at, status) + VALUES (?, ?, ?, 'pending') + ON CONFLICT (task_id) DO UPDATE SET + serialized_task = excluded.serialized_task, + publish_at = excluded.publish_at, + status = 'pending' + SQL + + connection.execute(ActiveRecord::Base.sanitize_sql_array([sql, task_id, serialized, publish_at&.to_i])) + task_id + end + end + + # When task is completed (success or final failure) → delete the row + def remove(task_id) + with_connection do + ensure_table! + sql = ActiveRecord::Base.sanitize_sql_array([ + "DELETE FROM #{@table_name} WHERE task_id = ?", + task_id + ]) + connection.execute(sql) + end + end + + private + + def with_connection(&block) + ActiveRecord::Base.with_connection(&block) + end + + def connection + ActiveRecord::Base.connection + end + + def ensure_table! + return if @table_created + + connection.execute("PRAGMA busy_timeout = 15000;") + + connection.execute(<<~SQL) + CREATE TABLE IF NOT EXISTS #{@table_name} ( + task_id VARCHAR(255) PRIMARY KEY, + serialized_task TEXT NOT NULL, + publish_at BIGINT, + status VARCHAR(20) NOT NULL DEFAULT 'pending' + ) + SQL + + @table_created = true + end + + def serialize(context) + Marshal.dump(context).dump + end + + def deserialize(data) + Marshal.load(data.undump) + end +end diff --git a/lib/rage/deferred/deferred.rb b/lib/rage/deferred/deferred.rb index 470a5b90..48b8c546 100644 --- a/lib/rage/deferred/deferred.rb +++ b/lib/rage/deferred/deferred.rb @@ -102,6 +102,7 @@ class PushTimeout < StandardError require_relative "metadata" require_relative "middleware_chain" require_relative "backends/disk" +require_relative "backends/sql" require_relative "backends/nil" if Iodine.running?