From 27e5c17991625bd1263d5e4486ccc7113d4f7775 Mon Sep 17 00:00:00 2001 From: Tom Date: Fri, 29 May 2026 21:31:24 +0900 Subject: [PATCH] feat(adapter): add async_redis Action Cable adapter. A fiber-based Redis pub/sub adapter for Action Cable, built on async-redis. Maintains a single dedicated thread that hosts an Async reactor and owns both Redis clients (publisher + subscriber); all adapter operations are routed onto that reactor via a thread-safe inbox queue. Mirrors Rails' built-in `redis` adapter semantics (dynamic subscribe/unsubscribe, automatic reconnect + resubscribe). Co-authored-by: darkamenosa Co-authored-by: Samuel Williams --- .github/workflows/test-coverage.yaml | 14 + .github/workflows/test-external.yaml | 11 + .github/workflows/test.yaml | 14 + async-cable.gemspec | 1 + .../subscription_adapter/async_redis.rb | 241 ++++++++++++++++++ readme.md | 23 ++ test/async/cable/async_redis_adapter.rb | 105 ++++++++ 7 files changed, 409 insertions(+) create mode 100644 lib/action_cable/subscription_adapter/async_redis.rb create mode 100644 test/async/cable/async_redis_adapter.rb diff --git a/.github/workflows/test-coverage.yaml b/.github/workflows/test-coverage.yaml index 5798e84..556e4e2 100644 --- a/.github/workflows/test-coverage.yaml +++ b/.github/workflows/test-coverage.yaml @@ -13,6 +13,20 @@ jobs: name: ${{matrix.ruby}} on ${{matrix.os}} runs-on: ${{matrix.os}}-latest + # GitHub Actions service containers run only on Linux runners; on + # macOS this block is ignored and the Redis adapter tests skip + # themselves via a TCP probe. + services: + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + strategy: matrix: os: diff --git a/.github/workflows/test-external.yaml b/.github/workflows/test-external.yaml index 9ec63e2..e5a67c4 100644 --- a/.github/workflows/test-external.yaml +++ b/.github/workflows/test-external.yaml @@ -10,6 +10,17 @@ jobs: name: ${{matrix.ruby}} on ${{matrix.os}} runs-on: ${{matrix.os}}-latest + services: + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + strategy: matrix: os: diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index e91d97c..e06ee68 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -11,6 +11,20 @@ jobs: runs-on: ${{matrix.os}}-latest continue-on-error: ${{matrix.experimental}} + # GitHub Actions service containers run only on Linux runners; on + # macOS this block is ignored and the Redis adapter tests skip + # themselves via a TCP probe. + services: + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + strategy: matrix: os: diff --git a/async-cable.gemspec b/async-cable.gemspec index 78a3209..9f4ca23 100644 --- a/async-cable.gemspec +++ b/async-cable.gemspec @@ -29,4 +29,5 @@ Gem::Specification.new do |spec| spec.add_dependency "actioncable", ">= 8.1.0.alpha" spec.add_dependency "async", "~> 2.9" spec.add_dependency "async-websocket" + spec.add_dependency "async-redis", "~> 0.13" end diff --git a/lib/action_cable/subscription_adapter/async_redis.rb b/lib/action_cable/subscription_adapter/async_redis.rb new file mode 100644 index 0000000..9cc47c7 --- /dev/null +++ b/lib/action_cable/subscription_adapter/async_redis.rb @@ -0,0 +1,241 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Tom and contributors. +# Copyright, 2026, by Samuel Williams. + +require "async" +require "async/redis" + +module ActionCable + module SubscriptionAdapter + # Fiber-based Redis pub/sub adapter for Action Cable, built on + # {Async::Redis::Client}. + # + # Action Cable adapters are process-global and called from arbitrary + # threads (HTTP request threads, the Action Cable executor pool, + # background jobs, etc.). To keep all Redis I/O on a single reactor + # without putting any constraints on callers, the adapter owns one + # dedicated thread that hosts an event loop; cross-thread requests + # arrive via a {Thread::Queue} and are dispatched as fibers on that + # reactor. Redis clients are never shared across threads. + # + # Configuration (in `config/cable.yml`): + # + # production: + # adapter: async_redis + # url: redis://localhost:6379/1 + # reconnect_attempts: 5 # or [0, 1, 2] for backoff + # channel_prefix: my_app + class AsyncRedis < Base + prepend ChannelPrefix + + # Sentinel pushed into the inbox to terminate the reactor thread. + SHUTDOWN = :shutdown + + # Create a new adapter instance. + def initialize(*) + super + @mutex = ::Mutex.new + @inbox = nil + @thread = nil + end + + # Publish a payload to a Redis channel. Safe to call from any + # thread or fiber; the work is queued onto the adapter's dedicated + # reactor thread. + # @parameter channel [String] The Redis channel name. + # @parameter payload [String] The encoded message payload. + def broadcast(channel, payload) + inbox.push([:broadcast, channel, payload]) + end + + # Subscribe to a Redis channel. The `success_callback` (if given) + # is invoked once the subscription has been issued. + # @parameter channel [String] The Redis channel name. + # @parameter callback [Proc] Invoked with each received payload. + # @parameter success_callback [Proc, nil] Invoked after subscribe. + def subscribe(channel, callback, success_callback = nil) + inbox.push([:subscribe, channel, callback, success_callback]) + end + + # Remove a previously-registered subscription. + # @parameter channel [String] The Redis channel name. + # @parameter callback [Proc] The callback originally passed to `#subscribe`. + def unsubscribe(channel, callback) + inbox.push([:unsubscribe, channel, callback]) + end + + # Shut down the adapter, closing both Redis clients and stopping + # the reactor thread. + def shutdown + @mutex.synchronize do + return unless @thread + @inbox.push(SHUTDOWN) + @thread.join + @thread = nil + @inbox = nil + end + end + + private + + def inbox + @inbox || @mutex.synchronize{@inbox ||= start_reactor_thread} + end + + def start_reactor_thread + inbox = ::Thread::Queue.new + @thread = ::Thread.new do + ::Thread.current.name = "async-cable redis adapter" + Sync do + Worker.new(inbox, endpoint, executor, logger: logger, reconnect_attempts: reconnect_attempts).run + end + end + inbox + end + + def endpoint + @endpoint ||= if url = config_options[:url] + ::Async::Redis::Endpoint.parse(url) + else + ::Async::Redis::Endpoint.local + end + end + + def reconnect_attempts + value = config_options.fetch(:reconnect_attempts, 1) + value.is_a?(Integer) ? Array.new(value, 0) : Array(value) + end + + def config_options + @config_options ||= config.cable.deep_symbolize_keys.merge(id: identifier) + end + + # Lives entirely on the adapter's reactor thread. Owns one + # {Async::Redis::Client} that is shared between publishing and + # subscribing: PUBLISH acquires a pooled connection per call, + # while a single long-lived {Context::Subscription} multiplexes + # every channel the adapter has subscribed to. + # + # All access to the client happens on this thread's reactor, so + # the pool's internal `Async::Semaphore` (which is not + # thread-safe) is only ever touched from one thread. + class Worker + def initialize(inbox, endpoint, executor, logger: nil, reconnect_attempts: [0]) + @inbox = inbox + @endpoint = endpoint + @executor = executor + @logger = logger + @reconnect_attempts = reconnect_attempts + + @client = nil + @subscribers = ::Hash.new{|hash, key| hash[key] = []} + @subscriber_context = nil + @pending_subscribes = [] + end + + # Main reactor-thread entry point. + def run + @client = ::Async::Redis::Client.new(@endpoint) + task = ::Async::Task.current + + listener_task = task.async{run_listener} + + while (command = @inbox.pop) + break if command == SHUTDOWN + + # Dispatch as a fiber so a slow PUBLISH (network stall) + # can't block subsequent commands or message delivery: + task.async{dispatch(command)} + end + ensure + listener_task&.stop + @client&.close + end + + private + + def dispatch(command) + case command.first + when :broadcast + _, channel, payload = command + @client.call("PUBLISH", channel, payload) + when :subscribe + _, channel, callback, success_callback = command + local_subscribe(channel, callback) + @executor.post(&success_callback) if success_callback + when :unsubscribe + _, channel, callback = command + local_unsubscribe(channel, callback) + end + rescue => error + @logger&.error("AsyncRedis dispatch (#{command.first}): #{error.class}: #{error.message}") + end + + def local_subscribe(channel, callback) + new_channel = @subscribers[channel].empty? + @subscribers[channel] << callback + return unless new_channel + + if @subscriber_context + @subscriber_context.subscribe([channel]) + else + # Listener still connecting; it will pick this up on + # connect: + @pending_subscribes << channel + end + end + + def local_unsubscribe(channel, callback) + return unless @subscribers.key?(channel) + @subscribers[channel].delete(callback) + return unless @subscribers[channel].empty? + + @subscribers.delete(channel) + @subscriber_context&.unsubscribe([channel]) + end + + # Long-lived task that maintains the SUBSCRIBE connection and + # routes incoming messages back to subscribers via the Action + # Cable executor. + def run_listener + attempts = 0 + + loop do + begin + @client.subscribe("_async_cable_internal") do |context| + @subscriber_context = context + + # Resubscribe everything we already know about, + # plus anything queued while disconnected: + initial = @subscribers.keys | @pending_subscribes + @pending_subscribes.clear + context.subscribe(initial) unless initial.empty? + + context.each do |_type, channel, data| + next unless data + @subscribers[channel]&.each do |callback| + @executor.post{callback.call(data)} + end + end + end + + attempts = 0 + rescue => error + @subscriber_context = nil + raise if attempts >= @reconnect_attempts.size + + @logger&.error("AsyncRedis listener: #{error.class}: #{error.message}") + delay = @reconnect_attempts[attempts] + ::Async::Task.current.sleep(delay) if delay && delay > 0 + attempts += 1 + ensure + @subscriber_context = nil + end + end + end + end + end + end +end diff --git a/readme.md b/readme.md index a5c8884..2afd421 100644 --- a/readme.md +++ b/readme.md @@ -12,6 +12,29 @@ Please see the [project documentation](https://socketry.github.io/async-cable/) - [Getting Started](https://socketry.github.io/async-cable/guides/getting-started/index) - This guide shows you how to add `async-cable` to your project to enable real-time communication between clients and servers using Falcon and Action Cable. +### Async Redis Adapter + +`async-cable` ships with a fiber-based Redis subscription adapter: `async_redis`. It mirrors Rails' built-in `redis` adapter (dynamic subscribe/unsubscribe, reconnect + resubscribe) but uses [`async-redis`](https://github.com/socketry/async-redis) so all I/O runs cooperatively on the fiber scheduler instead of blocking a thread. + +Configure Action Cable to use it by setting `adapter: async_redis` in `config/cable.yml`: + +```yaml +production: + adapter: async_redis + url: <%= ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379/0") %> + # Optional: + channel_prefix: <%= "#{Rails.application.class.module_parent_name.underscore}_production" %> + reconnect_attempts: [0, 1, 2, 5] +``` + +`reconnect_attempts` accepts either an integer (retry that many times with no delay) or an array of per-attempt delays in seconds. + +Notes: + +- Broadcasting (`#broadcast`) is safe to call from any thread or fiber. When the caller is already inside a reactor it runs cooperatively; otherwise a transient reactor is opened via `Sync { }`. +- The subscription listener runs on a single dedicated thread hosting its own reactor and shared across all channels. +- `success_callback` is invoked immediately after issuing `SUBSCRIBE` (async-redis does not expose subscribe ACKs). + ## Releases Please see the [project releases](https://socketry.github.io/async-cable/releases/index) for all releases. diff --git a/test/async/cable/async_redis_adapter.rb b/test/async/cable/async_redis_adapter.rb new file mode 100644 index 0000000..f4188a0 --- /dev/null +++ b/test/async/cable/async_redis_adapter.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Tom and contributors. +# Copyright, 2026, by Samuel Williams. + +require "action_cable" +require "active_support/core_ext/hash" +require "concurrent" +require "async/cable" +require "action_cable/subscription_adapter/async_redis" +require "sus/fixtures/async" + +REDIS_URL = ENV.fetch("REDIS_URL", "redis://127.0.0.1:6379") + +# Cheap TCP-level probe so the suite cleanly skips when Redis isn't running +# (e.g. local development). CI provides a Redis service container. +REDIS_AVAILABLE = begin + require "socket" + uri = URI.parse(REDIS_URL) + Socket.tcp(uri.host, uri.port, connect_timeout: 1).close + true +rescue StandardError + false +end + +describe ActionCable::SubscriptionAdapter::AsyncRedis do + include Sus::Fixtures::Async::ReactorContext + + let(:cable_server) {::ActionCable::Server::Base.new} + + before do + cable_server.config.logger = Console + cable_server.config.cable = { + "adapter" => "async_redis", + "url" => REDIS_URL, + } + end + + let(:adapter) {subject.new(cable_server)} + + it "is resolvable via Action Cable configuration" do + expect(cable_server.config.pubsub_adapter).to be == subject + end + + with "a running Redis", if: REDIS_AVAILABLE do + after do + adapter.shutdown + end + + it "round-trips a message from broadcast to subscriber" do + received = ::Thread::Queue.new + subscribed = ::Thread::Queue.new + + adapter.subscribe("test-channel", ->(message){received.push(message)}, ->{subscribed.push(true)}) + + # Wait for the subscription to be issued before broadcasting, + # otherwise the message can be published into the void: + subscribed.pop + + adapter.broadcast("test-channel", "hello world") + + expect(received.pop).to be == "hello world" + end + + it "delivers to multiple subscribers on the same channel" do + received_a = ::Thread::Queue.new + received_b = ::Thread::Queue.new + ready = ::Thread::Queue.new + + adapter.subscribe("multi-channel", ->(m){received_a.push(m)}, ->{ready.push(true)}) + adapter.subscribe("multi-channel", ->(m){received_b.push(m)}, ->{ready.push(true)}) + + 2.times{ready.pop} + + adapter.broadcast("multi-channel", "fanout") + + expect(received_a.pop).to be == "fanout" + expect(received_b.pop).to be == "fanout" + end + + it "stops delivering after unsubscribe" do + received = ::Thread::Queue.new + subscribed = ::Thread::Queue.new + callback = ->(message){received.push(message)} + + adapter.subscribe("toggle-channel", callback, ->{subscribed.push(true)}) + subscribed.pop + + adapter.unsubscribe("toggle-channel", callback) + + # Round-trip a sentinel message via a second subscriber to confirm + # the UNSUBSCRIBE has been processed by the server: + sentinel = ::Thread::Queue.new + adapter.subscribe("toggle-channel-sentinel", ->(m){sentinel.push(m)}, ->{sentinel.push(:ready)}) + sentinel.pop # :ready + + adapter.broadcast("toggle-channel", "should not arrive") + adapter.broadcast("toggle-channel-sentinel", "sync") + + expect(sentinel.pop).to be == "sync" + expect(received).to be(:empty?) + end + end +end