Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion lib/rage/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -702,10 +702,13 @@ def backend=(config)
when :disk
@backend_options = parse_disk_backend_options(opts)
Rage::Deferred::Backends::Disk
when :sql
@backend_options = parse_sql_backend_options(opts)
Rage::Deferred::Backends::SQL
when nil
Rage::Deferred::Backends::Nil
else
raise ArgumentError, "unsupported backend value; supported keys are `:disk` and `nil`"
raise ArgumentError, "unsupported backend value; supported keys are `:disk`, `:sql`, and `nil`"
end
end

Expand Down Expand Up @@ -865,6 +868,12 @@ def parse_disk_backend_options(opts)

parsed_options
end

def parse_sql_backend_options(opts)
raise ArgumentError, "sql backend only supports :table_name" if opts.except(:table_name).any?

{ table_name: (opts[:table_name] || "rage_deferred_tasks").to_s }
end
end

# The class allows configuring telemetry handlers. See {MiddlewareRegistry} for details on available methods.
Expand Down Expand Up @@ -1097,3 +1106,39 @@ def __finalize
# def call(task_class:, task:, phase:, args:, kwargs:, context:)
# end
# end
# # # Re-encrypt the arguments in case of an error
# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) }
# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) }
# # raise
# # end
# # end
# def call(task_class:, task:, phase:, args:, kwargs:, context:)
# end
# end
# # # Re-encrypt the arguments in case of an error
# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) }
# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) }
# # raise
# # end
# # end
# def call(task_class:, task:, phase:, args:, kwargs:, context:)
# end
# end
# # # Re-encrypt the arguments in case of an error
# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) }
# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) }
# # raise
# # end
# # end
# def call(task_class:, task:, phase:, args:, kwargs:, context:)
# end
# end
# # # Re-encrypt the arguments in case of an error
# # args.map! { |arg| MyEncryptionSDK.encrypt(arg) }
# # kwargs.transform_values! { |value| MyEncryptionSDK.encrypt(value) }
# # raise
# # end
# # end
# def call(task_class:, task:, phase:, args:, kwargs:, context:)
# end
# end
84 changes: 84 additions & 0 deletions lib/rage/deferred/backends/sql.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

require "securerandom"

class Rage::Deferred::Backends::SQL
def initialize(table_name: "rage_deferred_tasks")
@table_name = table_name.to_s
@table_created = false
end

# Called during boot - keep it fast and safe
def pending_tasks
# Return empty during early boot to avoid pool issues
# Real pending tasks will be loaded later when pool is stable
[]
end

def add(context, publish_at: nil, task_id: nil)
with_connection do
ensure_table!
serialized = serialize(context)
task_id ||= SecureRandom.uuid

sql = <<~SQL
INSERT INTO #{@table_name} (task_id, serialized_task, publish_at, status)
VALUES (?, ?, ?, 'pending')
ON CONFLICT (task_id) DO UPDATE SET
serialized_task = excluded.serialized_task,
publish_at = excluded.publish_at,
status = 'pending'
SQL

connection.execute(ActiveRecord::Base.sanitize_sql_array([sql, task_id, serialized, publish_at&.to_i]))
task_id
end
end

# When task is completed (success or final failure) → delete the row
def remove(task_id)
with_connection do
ensure_table!
sql = ActiveRecord::Base.sanitize_sql_array([
"DELETE FROM #{@table_name} WHERE task_id = ?",
task_id
])
connection.execute(sql)
end
end

private

def with_connection(&block)
ActiveRecord::Base.with_connection(&block)
end

def connection
ActiveRecord::Base.connection
end

def ensure_table!
return if @table_created

connection.execute("PRAGMA busy_timeout = 15000;")

connection.execute(<<~SQL)
CREATE TABLE IF NOT EXISTS #{@table_name} (
task_id VARCHAR(255) PRIMARY KEY,
serialized_task TEXT NOT NULL,
publish_at BIGINT,
status VARCHAR(20) NOT NULL DEFAULT 'pending'
)
SQL

@table_created = true
end

def serialize(context)
Marshal.dump(context).dump
end

def deserialize(data)
Marshal.load(data.undump)
end
end
1 change: 1 addition & 0 deletions lib/rage/deferred/deferred.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class PushTimeout < StandardError
require_relative "metadata"
require_relative "middleware_chain"
require_relative "backends/disk"
require_relative "backends/sql"
require_relative "backends/nil"

if Iodine.running?
Expand Down