Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/async/cable/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,16 @@ def run(parent: Async::Task.current)
# @parameter data [Object] The data to transmit, which will be encoded by the coder.
def transmit(data)
# Console.info(self, "Transmitting data:", data, task: Async::Task.current?)
return if @output.closed?
@output.push(@coder.encode(data))
rescue ClosedQueueError
# Closed concurrently between the check and the push.
end

# Close the outbound queue, causing the drain task to terminate once all pending messages have been sent.
def close
@output.close
# Console.info(self, "Closing socket.", task: Async::Task.current?)
@output.close unless @output.closed?
end

# This can be called from the work pool, off the event loop.
Expand Down
18 changes: 18 additions & 0 deletions test/async/cable/socket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

require "async/cable/socket"
require "action_cable"

describe Async::Cable::Socket do
let(:socket) {subject.new({}, nil, ::ActionCable::Server::Base.new)}

it "transmit returns nil after close" do
socket.close
expect(socket.transmit({type: "ping"})).to be_nil
end

it "close is idempotent" do
socket.close
socket.close
end
end
Loading