diff --git a/lib/async/cable/socket.rb b/lib/async/cable/socket.rb index b281446..765e748 100644 --- a/lib/async/cable/socket.rb +++ b/lib/async/cable/socket.rb @@ -65,12 +65,16 @@ def run(parent: Async::Task.current) # @parameter data [Object] The data to transmit, which will be encoded by the coder. def transmit(data) # Console.info(self, "Transmitting data:", data, task: Async::Task.current?) + return if @output.closed? @output.push(@coder.encode(data)) + rescue ClosedQueueError + # Closed concurrently between the check and the push. end # Close the outbound queue, causing the drain task to terminate once all pending messages have been sent. def close - @output.close + # Console.info(self, "Closing socket.", task: Async::Task.current?) + @output.close unless @output.closed? end # This can be called from the work pool, off the event loop. diff --git a/test/async/cable/socket.rb b/test/async/cable/socket.rb new file mode 100644 index 0000000..4d25a3a --- /dev/null +++ b/test/async/cable/socket.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +require "async/cable/socket" +require "action_cable" + +describe Async::Cable::Socket do + let(:socket) {subject.new({}, nil, ::ActionCable::Server::Base.new)} + + it "transmit returns nil after close" do + socket.close + expect(socket.transmit({type: "ping"})).to be_nil + end + + it "close is idempotent" do + socket.close + socket.close + end +end