Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions tornado/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ def close(
self._state = None
self.close_fd()
self._closed = True
_discard_pending_ssl_stream(self)
self._signal_closed()

def _signal_closed(self) -> None:
Expand Down Expand Up @@ -1092,6 +1093,8 @@ def fileno(self) -> int | ioloop._Selectable:
return self.socket

def close_fd(self) -> None:
if self.socket is None:
return
self.socket.close()
self.socket = None # type: ignore

Expand Down Expand Up @@ -1240,6 +1243,11 @@ def start_tls(
socket = self.socket
self.io_loop.remove_handler(socket)
self.socket = None # type: ignore
# The IOStream no longer owns any I/O resources; mark it as
# quiesced so a subsequent ``close()`` does not try to
# dereference the now-``None`` socket. See
# https://github.com/tornadoweb/tornado/issues/3614.
self._state = None
socket = ssl_wrap_socket(
socket,
ssl_options,
Expand All @@ -1256,6 +1264,23 @@ def start_tls(
ssl_stream._ssl_connect_future = future
ssl_stream.max_buffer_size = self.max_buffer_size
ssl_stream.read_chunk_size = self.read_chunk_size
# Expose the in-progress ``SSLIOStream`` on the original stream so
# callers (notably ``TCPClient.connect`` under a deadline) can
# close it if the handshake is abandoned. ``IOStream.close``
# looks at this attribute and releases the underlying socket
# if it is still set. See
# https://github.com/tornadoweb/tornado/issues/3614.
self._pending_ssl_stream = ssl_stream
# The ``start_tls`` future resolves when the handshake finishes
# (successfully or with an error). At that point the SSLIOStream
# is no longer ``in-flight`` from the perspective of the original
# IOStream: the caller has either received it as the result, or
# it has been closed as part of the failure path. Either way the
# original IOStream should not hold a reference to it any more.
# See https://github.com/tornadoweb/tornado/issues/3614.
future.add_done_callback(
lambda f: _discard_pending_ssl_stream(self, close_it=False)
)
return future

def _handle_connect(self) -> None:
Expand Down Expand Up @@ -1602,6 +1627,29 @@ def read_from_fd(self, buf: bytearray | memoryview) -> int | None:
del buf


def _discard_pending_ssl_stream(stream: IOStream, *, close_it: bool = True) -> None:
"""Close or release any in-flight ``SSLIOStream`` created by ``start_tls``.

``IOStream.start_tls`` transfers ownership of the underlying socket
to a new ``SSLIOStream`` *before* the TLS handshake completes. If a
caller (typically ``TCPClient.connect``) times out while the
handshake is in flight, the original ``IOStream`` is the only
reference still reachable from the caller's scope, but its
``.socket`` is already ``None`` and calling ``close()`` on it is a
no-op. The new ``SSLIOStream`` (and the socket it owns) becomes
unreachable and leaks permanently.

This helper closes that in-flight ``SSLIOStream`` (if any) when the
original stream is closed, so its socket is released.
"""
ssl_stream = getattr(stream, "_pending_ssl_stream", None)
if ssl_stream is None:
return
stream._pending_ssl_stream = None
if close_it and not ssl_stream.closed():
ssl_stream.close()


def doctests() -> Any:
import doctest

Expand Down
31 changes: 22 additions & 9 deletions tornado/tcpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,17 +272,30 @@ async def connect(
# information here and re-use it on subsequent connections to
# the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
if ssl_options is not None:
# ``IOStream.start_tls`` transfers ownership of the underlying
# socket to a new ``SSLIOStream`` *before* the TLS handshake
# completes. If a timeout fires while we are waiting on that
# handshake, ``gen.with_timeout`` raises ``TimeoutError`` to
# the caller but does not cancel the inner future, so the
# ``SSLIOStream`` (and its socket) becomes unreachable from
# the caller's reference to the original ``IOStream``
# (whose ``.socket`` is already ``None``) and leaks
# permanently. ``IOStream.close`` looks at
# ``self._pending_ssl_stream`` (set by ``start_tls``) and
# releases the SSLIOStream's socket, so close the original
# stream on timeout to give it a chance to clean up.
# See https://github.com/tornadoweb/tornado/issues/3614.
ssl_future = stream.start_tls(
False, ssl_options=ssl_options, server_hostname=host
)
if timeout is not None:
stream = await gen.with_timeout(
timeout,
stream.start_tls(
False, ssl_options=ssl_options, server_hostname=host
),
)
try:
stream = await gen.with_timeout(timeout, ssl_future)
except gen.TimeoutError:
stream.close()
raise
else:
stream = await stream.start_tls(
False, ssl_options=ssl_options, server_hostname=host
)
stream = await ssl_future
return stream

def _create_stream(
Expand Down
94 changes: 93 additions & 1 deletion tornado/test/tcpclient_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
import typing
import unittest
from contextlib import closing
import ssl

from tornado import gen
from tornado.concurrent import Future
from tornado.gen import TimeoutError
from tornado.iostream import IOStream
from tornado.log import app_log
from tornado.netutil import Resolver, bind_sockets
from tornado.queues import Queue
from tornado.tcpclient import TCPClient, _Connector
from tornado.tcpserver import TCPServer
from tornado.test.util import refusing_port, skipIfNoIPv6, skipIfNonUnix
from tornado.testing import AsyncTestCase, gen_test
from tornado.testing import AsyncTestCase, ExpectLog, gen_test

# Fake address families for testing. Used in place of AF_INET
# and AF_INET6 because some installations do not have AF_INET6.
Expand Down Expand Up @@ -174,6 +177,95 @@ def resolve(self, *args, **kwargs):
)


class TCPClientSSLTimeoutTest(AsyncTestCase):
# Regression test for https://github.com/tornadoweb/tornado/issues/3614.
# When a ``TCPClient.connect`` call has both ``ssl_options`` and a
# ``timeout``, and the TLS handshake does not complete in time, the
# underlying socket must be released. The pre-fix code left the
# ``SSLIOStream`` registered on the IOLoop forever, so the file
# descriptor stayed open until the process exited.
def setUp(self):
super().setUp()
self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.listener.bind(("127.0.0.1", 0))
self.listener.listen(1)
self.port = self.listener.getsockname()[1]
# Accept and drop the first connection in a background thread so
# the client sees an established TCP socket and then tries to
# perform the TLS handshake against a peer that never speaks.
import threading

self._accept_thread = threading.Thread(
target=self._accept_loop, daemon=True
)
self._accept_thread.start()

def _accept_loop(self):
try:
while True:
conn, _addr = self.listener.accept()
# Hold the server side open but do not send any TLS
# bytes. This is enough to wedge the client's
# ``start_tls`` handshake until the timeout fires.
except OSError:
return

def tearDown(self):
try:
self.listener.close()
except OSError:
pass
super().tearDown()

def _count_open_sockets(self):
# ``/proc/net/tcp`` is the most reliable cross-platform-ish
# signal that a socket is still in the kernel, even if the
# Python file descriptor has been closed via ``close_fd``.
import os

try:
text = open("/proc/net/tcp").read()
except FileNotFoundError:
self.skipTest("/proc/net/tcp not available on this platform")
target_hex = f"{self.port:04X}"
n = 0
for line in text.splitlines()[1:]:
parts = line.split()
# state "01" is ESTABLISHED -- this is what an un-closed
# leaked client socket will look like.
if len(parts) < 4 or parts[3] != "01":
continue
local = parts[1]
if local.upper() == target_hex:
n += 1
return n

@gen_test
def test_ssl_handshake_timeout_releases_socket(self):
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE

client = TCPClient()
# Suppress the expected "Exception in Future after timeout"
# error log from ``gen.with_timeout`` (the wrapped future will
# fail when its SSLIOStream is force-closed below).
with ExpectLog(app_log, "Exception in Future .* after timeout"):
with self.assertRaises(gen.TimeoutError):
yield client.connect(
"127.0.0.1",
self.port,
ssl_options=ssl_ctx,
timeout=0.2,
)
# Give the IOLoop a moment to run any pending close callbacks
# and to tear the SSLIOStream's socket down.
yield gen.sleep(0.1)
# If the leak is back, there will be a lingering ESTABLISHED
# entry in /proc/net/tcp that points at our local port.
self.assertEqual(self._count_open_sockets(), 0)


class TestConnectorSplit(unittest.TestCase):
def test_one_family(self):
# These addresses aren't in the right format, but split doesn't care.
Expand Down