Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## [Unreleased]
### Added
- [Deferred] Propagate `ActiveSupport::CurrentAttributes` across fiber boundaries so deferred tasks see the same `Current.*` values as the enqueueing request by [@jsxs0](https://github.com/jsxs0) (#TBD).
- [Deferred] Add tests for log context capture and backward-compatible restore by [@jsxs0](https://github.com/jsxs0) (#274).

## [1.23.0] - 2026-04-15
Expand Down
32 changes: 31 additions & 1 deletion lib/rage/deferred/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ def self.build(task, args, kwargs)
nil,
Fiber[:__rage_logger_tags],
Fiber[:__rage_logger_context],
nil
nil,
# Index 7: ActiveSupport::CurrentAttributes snapshot.
# Why captured here rather than at restore time: the enqueueing fiber
# owns the truthful `Current.*` state. By the time the task fiber runs,
# the request fiber may have already reset its attributes.
capture_current_attributes
]
end

Expand Down Expand Up @@ -72,4 +77,29 @@ def self.get_user_context(context)
def self.get_or_create_user_context(context)
context[6] ||= {}
end

# @return [Array<Array(Class, Hash)>, nil] snapshot of each ActiveSupport::CurrentAttributes
# subclass and its per-attribute values at enqueue time; nil when AS is unavailable
# or no subclasses exist.
def self.get_current_attributes(context)
context[7]
end

# Why a dedicated method: keeps the `build` array literal readable and makes this
# path trivial to stub in specs that run without ActiveSupport loaded.
#
# Why we snapshot the values (not the classes) here: `Current.reset` in the parent
# fiber after enqueue would otherwise mutate what we captured. We need value copies.
def self.capture_current_attributes
return nil unless defined?(ActiveSupport::CurrentAttributes)

subclasses = ActiveSupport::CurrentAttributes.descendants
return nil if subclasses.empty?

subclasses.filter_map do |klass|
attrs = klass.attributes
next if attrs.empty?
[klass, attrs.dup]
end.then { |snapshots| snapshots.empty? ? nil : snapshots }
end
end
86 changes: 38 additions & 48 deletions lib/rage/deferred/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,14 @@ def perform

# Access metadata for the current task execution.
# @return [Rage::Deferred::Metadata] the metadata object for the current task execution
# @example
# class MyTask
# include Rage::Deferred::Task
#
# def perform
# puts meta.retries
# end
# end
def meta
Rage::Deferred::Metadata
end

# @private
def __perform(context)
restore_log_info(context)
ca_snapshots = restore_current_attributes(context)

attempts = Rage::Deferred::Context.get_attempts(context)
task_log_context = { task: self.class.name }
Expand All @@ -86,6 +79,12 @@ def __perform(context)
end
end
e
ensure
# Why reset only what we restored: task fibers are reused by Iodine's worker pool,
# so leftover Current.* values would poison the next task on the same fiber.
# But we only need to clean up subclasses whose values we actually set, not every
# descendant in the app, which would pointlessly fire before_reset/after_reset hooks.
reset_current_attributes(ca_snapshots) if ca_snapshots
end

private def restore_log_info(context)
Expand All @@ -100,23 +99,42 @@ def __perform(context)
end
end

# Why direct attribute assignment and not `CurrentAttributes.set { }`:
# `set` with a block restores previous values after the block exits. We need
# the restored values to persist through `perform`, not revert. Rails' own
# ActiveJob::CurrentAttributes takes the same direct-assign approach.
#
# @return [Array, nil] the snapshots we restored, so the ensure block knows what to reset.
# Returns nil when there was nothing to do, telling the caller "no cleanup needed."
private def restore_current_attributes(context)
snapshots = Rage::Deferred::Context.get_current_attributes(context)
return nil unless snapshots && !snapshots.empty?

snapshots.each do |klass, attrs|
attrs.each { |name, value| klass.public_send("#{name}=", value) }
rescue => e
# Why rescue-and-continue: one broken CurrentAttributes subclass must not
# take down the whole task. Logged so the failure stays visible.
Rage.logger.warn("Rage::Deferred: failed to restore #{klass}: #{e.class} (#{e.message})")
end

snapshots
end

# Resets only the subclasses we restored. See the ensure block for why.
private def reset_current_attributes(snapshots)
snapshots.each do |klass, _|
klass.reset
rescue => e
Rage.logger.warn("Rage::Deferred: failed to reset #{klass}: #{e.class} (#{e.message})")
end
end

def self.included(klass)
klass.extend(ClassMethods)
end

module ClassMethods
# Set the maximum number of retry attempts for this task.
#
# @param count [Integer] the maximum number of retry attempts
# @example
# class SendWelcomeEmail
# include Rage::Deferred::Task
# max_retries 10
#
# def perform(email)
# # ...
# end
# end
def max_retries(count)
value = Integer(count)

Expand All @@ -129,34 +147,6 @@ def max_retries(count)
raise ArgumentError, "max_retries should be a valid non-negative integer"
end

# Override this method to customize retry behavior per exception.
#
# Return an Integer to retry in that many seconds.
# Return `super` to use the default exponential backoff.
# Return `false` or `nil` to abort retries.
#
# @param exception [Exception] the exception that caused the failure
# @param attempt [Integer] the current attempt number (1-indexed)
# @return [Integer, false, nil] the retry interval in seconds, or false/nil to abort
# @example
# class ProcessPayment
# include Rage::Deferred::Task
#
# def self.retry_interval(exception, attempt:)
# case exception
# when TemporaryNetworkError
# 10 # Retry in 10 seconds
# when InvalidDataError
# false # Do not retry
# else
# super # Default backoff strategy
# end
# end
#
# def perform(payment_id)
# # ...
# end
# end
def retry_interval(exception, attempt:)
__default_backoff(attempt)
end
Expand Down
90 changes: 90 additions & 0 deletions spec/deferred/context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,94 @@
end
end
end

describe ".capture_current_attributes" do
context "when ActiveSupport is not loaded" do
before { hide_const("ActiveSupport::CurrentAttributes") if defined?(ActiveSupport::CurrentAttributes) }

it "returns nil" do
expect(described_class.capture_current_attributes).to be_nil
end
end

context "when ActiveSupport::CurrentAttributes exists but has no subclasses" do
before do
stub_const("ActiveSupport::CurrentAttributes", Class.new {
def self.descendants
[]
end
})
end

it "returns nil" do
expect(described_class.capture_current_attributes).to be_nil
end
end

context "when there are subclasses with attributes" do
let(:subclass) do
Class.new do
def self.name
"Current"
end

def self.attributes
{ user_id: 42, tenant: "acme" }
end
end
end

before do
stub_const("ActiveSupport::CurrentAttributes", Class.new)
allow(ActiveSupport::CurrentAttributes).to receive(:descendants).and_return([subclass])
end

it "captures the subclass and a duplicated attribute hash" do
snapshots = described_class.capture_current_attributes
expect(snapshots).to eq([[subclass, { user_id: 42, tenant: "acme" }]])
end

it "returns a duplicated hash so later mutation does not leak into the snapshot" do
snapshots = described_class.capture_current_attributes
# Why: if we captured the live hash reference, a `Current.reset` in the
# parent fiber after enqueue would wipe the snapshot before the task runs.
expect(snapshots.first.last).not_to equal(subclass.attributes)
end
end

context "when a subclass has no attributes set" do
let(:empty_subclass) do
Class.new do
def self.name
"EmptyCurrent"
end

def self.attributes
{}
end
end
end

before do
stub_const("ActiveSupport::CurrentAttributes", Class.new)
allow(ActiveSupport::CurrentAttributes).to receive(:descendants).and_return([empty_subclass])
end

it "excludes the empty subclass and returns nil when nothing worth capturing" do
expect(described_class.capture_current_attributes).to be_nil
end
end
end

describe ".get_current_attributes" do
it "returns index 7 of the context" do
context = [nil, nil, nil, nil, nil, nil, nil, :ca_snapshot]
expect(described_class.get_current_attributes(context)).to eq(:ca_snapshot)
end

it "returns nil when the context array has no index 7 (backward compatible with pre-CA contexts)" do
context = [nil, nil, nil, nil, nil, nil, nil]
expect(described_class.get_current_attributes(context)).to be_nil
end
end
end
81 changes: 81 additions & 0 deletions spec/deferred/task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def perform(arg, kwarg:)
before do
allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(["request-id"])
allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return({})
allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil)
allow(task).to receive(:perform)
end

Expand Down Expand Up @@ -288,6 +289,7 @@ def perform(arg, kwarg:)
before do
allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return("old-request-id")
allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return(nil)
allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil)
allow(task).to receive(:perform)
end

Expand All @@ -311,6 +313,7 @@ def perform(arg, kwarg:)
before do
allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil)
allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return({})
allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil)
allow(task).to receive(:perform)
end

Expand All @@ -320,12 +323,90 @@ def perform(arg, kwarg:)
end
end

context "when context carries CurrentAttributes snapshots" do
let(:current_class) do
Class.new do
@store = {}
class << self
attr_reader :store
def user_id=(v)
@store[:user_id] = v
end

def tenant=(v)
@store[:tenant] = v
end

def reset
@store.clear
end

def attributes
@store
end
end
end
end

before do
stub_const("TestCurrent", current_class)
stub_const("ActiveSupport::CurrentAttributes", Class.new)
allow(ActiveSupport::CurrentAttributes).to receive(:descendants).and_return([current_class])

allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil)
allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return(nil)
allow(Rage::Deferred::Context).to receive(:get_current_attributes).
with(context).and_return([[current_class, { user_id: 42, tenant: "acme" }]])
allow(task).to receive(:perform)
end

it "restores CurrentAttributes values before perform runs" do
seen = nil
allow(task).to receive(:perform) { seen = current_class.attributes.dup }
task.__perform(context)
expect(seen).to eq({ user_id: 42, tenant: "acme" })
end

it "resets CurrentAttributes after perform to prevent cross-task leakage" do
task.__perform(context)
# Why this matters: task fibers are reused by Iodine's worker pool. A
# leaked Current.user_id = 42 would poison the NEXT task on the same fiber.
expect(current_class.attributes).to be_empty
end

it "resets CurrentAttributes even when perform raises" do
allow(task).to receive(:perform).and_raise(StandardError, "boom")
task.__perform(context)
expect(current_class.attributes).to be_empty
end

it "continues gracefully when an individual subclass restore fails" do
allow(current_class).to receive(:user_id=).and_raise(RuntimeError, "bad attribute")
expect { task.__perform(context) }.not_to raise_error
end
end

context "when ActiveSupport is not loaded" do
before do
hide_const("ActiveSupport::CurrentAttributes") if defined?(ActiveSupport::CurrentAttributes)
allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil)
allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return(nil)
allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil)
allow(task).to receive(:perform)
end

it "runs the task without error" do
expect { task.__perform(context) }.not_to raise_error
end
end

context "when task fails" do
let(:error) { StandardError.new("Something went wrong") }

before do
allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil)
allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return({})
allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil)
allow(task).to receive(:perform).and_raise(error)
allow(error).to receive(:backtrace).and_return(["line 1", "line 2"])
end
Expand Down
Loading