Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lib/async/cable/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ def handle_incoming_websocket(env, websocket)

while message = websocket.read
# Console.debug(self, "Received cable data:", message.buffer)
connection.handle_incoming(@coder.decode(message.buffer))
begin
connection.handle_incoming(@coder.decode(message.buffer))
rescue ActionCable::Connection::Subscriptions::Error => error
# Subscription-level errors (e.g. `AlreadySubscribedError` raised when a client re-sends a `subscribe` command, which happens during Turbo morph/refresh cycles) should not tear down the entire WebSocket connection. Log and continue so the connection (and any underlying pubsub subscriptions, like PostgreSQL `LISTEN`) stays alive.
Console.warn(self, "Subscription error (ignored):", error)
end
end
rescue Protocol::WebSocket::ClosedError, EOFError
# This is a normal disconnection.
Expand Down
1 change: 1 addition & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- Add {ruby Async::Cable::Socket#raw_transmit} for pushing pre-encoded payloads to the client without re-encoding. Enables "fastlane" broadcasts that encode the message once and share it across many connections.
- Rescue `ActionCable::Connection::Subscriptions::Error` (e.g. `AlreadySubscribedError` raised during Turbo morph/refresh cycles) instead of tearing down the WebSocket connection. Keeps underlying pubsub subscriptions (e.g. PostgreSQL `LISTEN`) alive across rapid resubscribe attempts.

## v0.3.0

Expand Down
44 changes: 44 additions & 0 deletions test/async/cable/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,50 @@ def close_write(_ = nil); end
connection.close
end

it "survives subscription errors without tearing down the WebSocket" do
# Subscribe twice with the same identifier. ActionCable raises
# `Subscriptions::AlreadySubscribedError` on the second one; the middleware
# should log a warning and keep the connection alive so subsequent
# commands still work.
subscribe_message = ::Protocol::WebSocket::TextMessage.generate({
command: "subscribe",
identifier: identifier,
})

subscribe_message.send(connection)

while message = connection.read
break if message.parse[:type] == "confirm_subscription"
end

# Duplicate subscribe — would otherwise raise AlreadySubscribedError and
# kill the connection.
subscribe_message.send(connection)
connection.flush

# Connection is still alive: send a broadcast and verify it round-trips.
broadcast_message = ::Protocol::WebSocket::TextMessage.generate(
command: "message",
identifier: identifier,
data: {action: "broadcast", payload: "after-duplicate-subscribe"}.to_json,
)
broadcast_message.send(connection)
connection.flush

while message = connection.read
parsed = message.parse
if parsed[:identifier] == identifier && parsed[:message]
break
end
end

expect(parsed[:message]).to have_keys(payload: be == "after-duplicate-subscribe")

connection.shutdown
ensure
connection.close
end

it "handles server restart cleanly when a channel transmits during unsubscribed" do
# Subscribe to the channel so TestChannel#unsubscribed will be triggered on close.
subscribe_message = ::Protocol::WebSocket::TextMessage.generate({
Expand Down