diff --git a/lib/async/cable/middleware.rb b/lib/async/cable/middleware.rb index ccf187d..03d0899 100644 --- a/lib/async/cable/middleware.rb +++ b/lib/async/cable/middleware.rb @@ -60,7 +60,12 @@ def handle_incoming_websocket(env, websocket) while message = websocket.read # Console.debug(self, "Received cable data:", message.buffer) - connection.handle_incoming(@coder.decode(message.buffer)) + begin + connection.handle_incoming(@coder.decode(message.buffer)) + rescue ActionCable::Connection::Subscriptions::Error => error + # Subscription-level errors (e.g. `AlreadySubscribedError` raised when a client re-sends a `subscribe` command, which happens during Turbo morph/refresh cycles) should not tear down the entire WebSocket connection. Log and continue so the connection (and any underlying pubsub subscriptions, like PostgreSQL `LISTEN`) stays alive. + Console.warn(self, "Subscription error (ignored):", error) + end end rescue Protocol::WebSocket::ClosedError, EOFError # This is a normal disconnection. diff --git a/releases.md b/releases.md index fe6c5db..743ffd3 100644 --- a/releases.md +++ b/releases.md @@ -3,6 +3,7 @@ ## Unreleased - Add {ruby Async::Cable::Socket#raw_transmit} for pushing pre-encoded payloads to the client without re-encoding. Enables "fastlane" broadcasts that encode the message once and share it across many connections. + - Rescue `ActionCable::Connection::Subscriptions::Error` (e.g. `AlreadySubscribedError` raised during Turbo morph/refresh cycles) instead of tearing down the WebSocket connection. Keeps underlying pubsub subscriptions (e.g. PostgreSQL `LISTEN`) alive across rapid resubscribe attempts. ## v0.3.0 diff --git a/test/async/cable/middleware.rb b/test/async/cable/middleware.rb index 54d6a9d..27cc3e4 100644 --- a/test/async/cable/middleware.rb +++ b/test/async/cable/middleware.rb @@ -207,6 +207,50 @@ def close_write(_ = nil); end connection.close end + it "survives subscription errors without tearing down the WebSocket" do + # Subscribe twice with the same identifier. ActionCable raises + # `Subscriptions::AlreadySubscribedError` on the second one; the middleware + # should log a warning and keep the connection alive so subsequent + # commands still work. + subscribe_message = ::Protocol::WebSocket::TextMessage.generate({ + command: "subscribe", + identifier: identifier, + }) + + subscribe_message.send(connection) + + while message = connection.read + break if message.parse[:type] == "confirm_subscription" + end + + # Duplicate subscribe — would otherwise raise AlreadySubscribedError and + # kill the connection. + subscribe_message.send(connection) + connection.flush + + # Connection is still alive: send a broadcast and verify it round-trips. + broadcast_message = ::Protocol::WebSocket::TextMessage.generate( + command: "message", + identifier: identifier, + data: {action: "broadcast", payload: "after-duplicate-subscribe"}.to_json, + ) + broadcast_message.send(connection) + connection.flush + + while message = connection.read + parsed = message.parse + if parsed[:identifier] == identifier && parsed[:message] + break + end + end + + expect(parsed[:message]).to have_keys(payload: be == "after-duplicate-subscribe") + + connection.shutdown + ensure + connection.close + end + it "handles server restart cleanly when a channel transmits during unsubscribed" do # Subscribe to the channel so TestChannel#unsubscribed will be triggered on close. subscribe_message = ::Protocol::WebSocket::TextMessage.generate({