From b4b27edb11826ca46f8318c900bfc8d2b95e10a0 Mon Sep 17 00:00:00 2001 From: jsxs0 <44266373+jsxs0@users.noreply.github.com> Date: Fri, 17 Apr 2026 16:31:56 +0900 Subject: [PATCH] feat(deferred): propagate Current.* across fibers --- CHANGELOG.md | 1 + lib/rage/deferred/context.rb | 32 ++++++++++++- lib/rage/deferred/task.rb | 86 +++++++++++++++------------------ spec/deferred/context_spec.rb | 90 +++++++++++++++++++++++++++++++++++ spec/deferred/task_spec.rb | 81 +++++++++++++++++++++++++++++++ 5 files changed, 241 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a48f7c04..78db8eed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## [Unreleased] ### Added +- [Deferred] Propagate `ActiveSupport::CurrentAttributes` across fiber boundaries so deferred tasks see the same `Current.*` values as the enqueueing request by [@jsxs0](https://github.com/jsxs0) (#TBD). - [Deferred] Add tests for log context capture and backward-compatible restore by [@jsxs0](https://github.com/jsxs0) (#274). ## [1.23.0] - 2026-04-15 diff --git a/lib/rage/deferred/context.rb b/lib/rage/deferred/context.rb index bb739560..22d102a3 100644 --- a/lib/rage/deferred/context.rb +++ b/lib/rage/deferred/context.rb @@ -14,7 +14,12 @@ def self.build(task, args, kwargs) nil, Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context], - nil + nil, + # Index 7: ActiveSupport::CurrentAttributes snapshot. + # Why captured here rather than at restore time: the enqueueing fiber + # owns the truthful `Current.*` state. By the time the task fiber runs, + # the request fiber may have already reset its attributes. + capture_current_attributes ] end @@ -72,4 +77,29 @@ def self.get_user_context(context) def self.get_or_create_user_context(context) context[6] ||= {} end + + # @return [Array, nil] snapshot of each ActiveSupport::CurrentAttributes + # subclass and its per-attribute values at enqueue time; nil when AS is unavailable + # or no subclasses exist. + def self.get_current_attributes(context) + context[7] + end + + # Why a dedicated method: keeps the `build` array literal readable and makes this + # path trivial to stub in specs that run without ActiveSupport loaded. + # + # Why we snapshot the values (not the classes) here: `Current.reset` in the parent + # fiber after enqueue would otherwise mutate what we captured. We need value copies. + def self.capture_current_attributes + return nil unless defined?(ActiveSupport::CurrentAttributes) + + subclasses = ActiveSupport::CurrentAttributes.descendants + return nil if subclasses.empty? + + subclasses.filter_map do |klass| + attrs = klass.attributes + next if attrs.empty? + [klass, attrs.dup] + end.then { |snapshots| snapshots.empty? ? nil : snapshots } + end end diff --git a/lib/rage/deferred/task.rb b/lib/rage/deferred/task.rb index 820b98cd..18e7a98e 100644 --- a/lib/rage/deferred/task.rb +++ b/lib/rage/deferred/task.rb @@ -45,14 +45,6 @@ def perform # Access metadata for the current task execution. # @return [Rage::Deferred::Metadata] the metadata object for the current task execution - # @example - # class MyTask - # include Rage::Deferred::Task - # - # def perform - # puts meta.retries - # end - # end def meta Rage::Deferred::Metadata end @@ -60,6 +52,7 @@ def meta # @private def __perform(context) restore_log_info(context) + ca_snapshots = restore_current_attributes(context) attempts = Rage::Deferred::Context.get_attempts(context) task_log_context = { task: self.class.name } @@ -86,6 +79,12 @@ def __perform(context) end end e + ensure + # Why reset only what we restored: task fibers are reused by Iodine's worker pool, + # so leftover Current.* values would poison the next task on the same fiber. + # But we only need to clean up subclasses whose values we actually set, not every + # descendant in the app, which would pointlessly fire before_reset/after_reset hooks. + reset_current_attributes(ca_snapshots) if ca_snapshots end private def restore_log_info(context) @@ -100,23 +99,42 @@ def __perform(context) end end + # Why direct attribute assignment and not `CurrentAttributes.set { }`: + # `set` with a block restores previous values after the block exits. We need + # the restored values to persist through `perform`, not revert. Rails' own + # ActiveJob::CurrentAttributes takes the same direct-assign approach. + # + # @return [Array, nil] the snapshots we restored, so the ensure block knows what to reset. + # Returns nil when there was nothing to do, telling the caller "no cleanup needed." + private def restore_current_attributes(context) + snapshots = Rage::Deferred::Context.get_current_attributes(context) + return nil unless snapshots && !snapshots.empty? + + snapshots.each do |klass, attrs| + attrs.each { |name, value| klass.public_send("#{name}=", value) } + rescue => e + # Why rescue-and-continue: one broken CurrentAttributes subclass must not + # take down the whole task. Logged so the failure stays visible. + Rage.logger.warn("Rage::Deferred: failed to restore #{klass}: #{e.class} (#{e.message})") + end + + snapshots + end + + # Resets only the subclasses we restored. See the ensure block for why. + private def reset_current_attributes(snapshots) + snapshots.each do |klass, _| + klass.reset + rescue => e + Rage.logger.warn("Rage::Deferred: failed to reset #{klass}: #{e.class} (#{e.message})") + end + end + def self.included(klass) klass.extend(ClassMethods) end module ClassMethods - # Set the maximum number of retry attempts for this task. - # - # @param count [Integer] the maximum number of retry attempts - # @example - # class SendWelcomeEmail - # include Rage::Deferred::Task - # max_retries 10 - # - # def perform(email) - # # ... - # end - # end def max_retries(count) value = Integer(count) @@ -129,34 +147,6 @@ def max_retries(count) raise ArgumentError, "max_retries should be a valid non-negative integer" end - # Override this method to customize retry behavior per exception. - # - # Return an Integer to retry in that many seconds. - # Return `super` to use the default exponential backoff. - # Return `false` or `nil` to abort retries. - # - # @param exception [Exception] the exception that caused the failure - # @param attempt [Integer] the current attempt number (1-indexed) - # @return [Integer, false, nil] the retry interval in seconds, or false/nil to abort - # @example - # class ProcessPayment - # include Rage::Deferred::Task - # - # def self.retry_interval(exception, attempt:) - # case exception - # when TemporaryNetworkError - # 10 # Retry in 10 seconds - # when InvalidDataError - # false # Do not retry - # else - # super # Default backoff strategy - # end - # end - # - # def perform(payment_id) - # # ... - # end - # end def retry_interval(exception, attempt:) __default_backoff(attempt) end diff --git a/spec/deferred/context_spec.rb b/spec/deferred/context_spec.rb index 7b01a74e..cfd8735f 100644 --- a/spec/deferred/context_spec.rb +++ b/spec/deferred/context_spec.rb @@ -143,4 +143,94 @@ end end end + + describe ".capture_current_attributes" do + context "when ActiveSupport is not loaded" do + before { hide_const("ActiveSupport::CurrentAttributes") if defined?(ActiveSupport::CurrentAttributes) } + + it "returns nil" do + expect(described_class.capture_current_attributes).to be_nil + end + end + + context "when ActiveSupport::CurrentAttributes exists but has no subclasses" do + before do + stub_const("ActiveSupport::CurrentAttributes", Class.new { + def self.descendants + [] + end + }) + end + + it "returns nil" do + expect(described_class.capture_current_attributes).to be_nil + end + end + + context "when there are subclasses with attributes" do + let(:subclass) do + Class.new do + def self.name + "Current" + end + + def self.attributes + { user_id: 42, tenant: "acme" } + end + end + end + + before do + stub_const("ActiveSupport::CurrentAttributes", Class.new) + allow(ActiveSupport::CurrentAttributes).to receive(:descendants).and_return([subclass]) + end + + it "captures the subclass and a duplicated attribute hash" do + snapshots = described_class.capture_current_attributes + expect(snapshots).to eq([[subclass, { user_id: 42, tenant: "acme" }]]) + end + + it "returns a duplicated hash so later mutation does not leak into the snapshot" do + snapshots = described_class.capture_current_attributes + # Why: if we captured the live hash reference, a `Current.reset` in the + # parent fiber after enqueue would wipe the snapshot before the task runs. + expect(snapshots.first.last).not_to equal(subclass.attributes) + end + end + + context "when a subclass has no attributes set" do + let(:empty_subclass) do + Class.new do + def self.name + "EmptyCurrent" + end + + def self.attributes + {} + end + end + end + + before do + stub_const("ActiveSupport::CurrentAttributes", Class.new) + allow(ActiveSupport::CurrentAttributes).to receive(:descendants).and_return([empty_subclass]) + end + + it "excludes the empty subclass and returns nil when nothing worth capturing" do + expect(described_class.capture_current_attributes).to be_nil + end + end + end + + describe ".get_current_attributes" do + it "returns index 7 of the context" do + context = [nil, nil, nil, nil, nil, nil, nil, :ca_snapshot] + expect(described_class.get_current_attributes(context)).to eq(:ca_snapshot) + end + + it "returns nil when the context array has no index 7 (backward compatible with pre-CA contexts)" do + context = [nil, nil, nil, nil, nil, nil, nil] + expect(described_class.get_current_attributes(context)).to be_nil + end + end end diff --git a/spec/deferred/task_spec.rb b/spec/deferred/task_spec.rb index 2fb82176..7aa9efe6 100644 --- a/spec/deferred/task_spec.rb +++ b/spec/deferred/task_spec.rb @@ -249,6 +249,7 @@ def perform(arg, kwarg:) before do allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(["request-id"]) allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return({}) + allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil) allow(task).to receive(:perform) end @@ -288,6 +289,7 @@ def perform(arg, kwarg:) before do allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return("old-request-id") allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return(nil) + allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil) allow(task).to receive(:perform) end @@ -311,6 +313,7 @@ def perform(arg, kwarg:) before do allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil) allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return({}) + allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil) allow(task).to receive(:perform) end @@ -320,12 +323,90 @@ def perform(arg, kwarg:) end end + context "when context carries CurrentAttributes snapshots" do + let(:current_class) do + Class.new do + @store = {} + class << self + attr_reader :store + def user_id=(v) + @store[:user_id] = v + end + + def tenant=(v) + @store[:tenant] = v + end + + def reset + @store.clear + end + + def attributes + @store + end + end + end + end + + before do + stub_const("TestCurrent", current_class) + stub_const("ActiveSupport::CurrentAttributes", Class.new) + allow(ActiveSupport::CurrentAttributes).to receive(:descendants).and_return([current_class]) + + allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil) + allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return(nil) + allow(Rage::Deferred::Context).to receive(:get_current_attributes). + with(context).and_return([[current_class, { user_id: 42, tenant: "acme" }]]) + allow(task).to receive(:perform) + end + + it "restores CurrentAttributes values before perform runs" do + seen = nil + allow(task).to receive(:perform) { seen = current_class.attributes.dup } + task.__perform(context) + expect(seen).to eq({ user_id: 42, tenant: "acme" }) + end + + it "resets CurrentAttributes after perform to prevent cross-task leakage" do + task.__perform(context) + # Why this matters: task fibers are reused by Iodine's worker pool. A + # leaked Current.user_id = 42 would poison the NEXT task on the same fiber. + expect(current_class.attributes).to be_empty + end + + it "resets CurrentAttributes even when perform raises" do + allow(task).to receive(:perform).and_raise(StandardError, "boom") + task.__perform(context) + expect(current_class.attributes).to be_empty + end + + it "continues gracefully when an individual subclass restore fails" do + allow(current_class).to receive(:user_id=).and_raise(RuntimeError, "bad attribute") + expect { task.__perform(context) }.not_to raise_error + end + end + + context "when ActiveSupport is not loaded" do + before do + hide_const("ActiveSupport::CurrentAttributes") if defined?(ActiveSupport::CurrentAttributes) + allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil) + allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return(nil) + allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil) + allow(task).to receive(:perform) + end + + it "runs the task without error" do + expect { task.__perform(context) }.not_to raise_error + end + end + context "when task fails" do let(:error) { StandardError.new("Something went wrong") } before do allow(Rage::Deferred::Context).to receive(:get_log_tags).with(context).and_return(nil) allow(Rage::Deferred::Context).to receive(:get_log_context).with(context).and_return({}) + allow(Rage::Deferred::Context).to receive(:get_current_attributes).with(context).and_return(nil) allow(task).to receive(:perform).and_raise(error) allow(error).to receive(:backtrace).and_return(["line 1", "line 2"]) end