diff --git a/lib/async/cable/socket.rb b/lib/async/cable/socket.rb index f72df54..50c62ce 100644 --- a/lib/async/cable/socket.rb +++ b/lib/async/cable/socket.rb @@ -68,6 +68,12 @@ def transmit(data) @output.push(@coder.encode(data)) end + # Enqueue an already-encoded message for asynchronous delivery to the client, bypassing the coder. Useful for "fastlane" broadcasts where the payload is encoded once and shared across many connections. + # @parameter data [String] The pre-encoded message to transmit. + def raw_transmit(data) + @output.push(data) + end + # Close the outbound queue, causing the drain task to terminate once all pending messages have been sent. def close # Console.info(self, "Closing socket.", task: Async::Task.current?) diff --git a/releases.md b/releases.md index 8947759..fe6c5db 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,9 @@ # Releases +## Unreleased + + - Add {ruby Async::Cable::Socket#raw_transmit} for pushing pre-encoded payloads to the client without re-encoding. Enables "fastlane" broadcasts that encode the message once and share it across many connections. + ## v0.3.0 - Filter requests based on path - don't eat all inbound WebSocket connections. diff --git a/test/async/cable/socket.rb b/test/async/cable/socket.rb index b896264..8759a51 100644 --- a/test/async/cable/socket.rb +++ b/test/async/cable/socket.rb @@ -56,6 +56,29 @@ def fake_application.env_config; {"rails.test" => true}; end end end + with "#transmit" do + it "encodes data using the coder before enqueueing it" do + socket.transmit({type: "ping"}) + expect(socket.instance_variable_get(:@output).pop).to be == ActiveSupport::JSON.encode({type: "ping"}) + end + end + + with "#raw_transmit" do + it "enqueues the data verbatim without encoding it" do + payload = ActiveSupport::JSON.encode({type: "ping"}) + socket.raw_transmit(payload) + expect(socket.instance_variable_get(:@output).pop).to be_equal(payload) + end + + it "cannot raw_transmit after close" do + socket.close + + expect do + socket.raw_transmit("already encoded") + end.to raise_exception(ClosedQueueError) + end + end + with "#run" do include Sus::Fixtures::Async::ReactorContext