Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/66435.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed ``TypeError: argument must be an int, or have a fileno() method.`` raised from ``salt.transport.tcp.PublishClient.recv`` when the IOStream's underlying socket was torn down between the stream-not-None check and the selector peek. The non-blocking recv path now treats a missing socket as "no events pending" and returns ``None`` so the caller can reconnect cleanly instead of crashing.
14 changes: 12 additions & 2 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,21 @@ async def recv(self, timeout=None):
for msg in self.unpacker:
return msg[b"body"]

# The stream may be present but its underlying socket can be
# torn down concurrently (Tornado's IOStream sets ``socket``
# to ``None`` once closed). Passing that to selectors would
# raise ``TypeError: argument must be an int, or have a
# fileno() method.`` -- treat it as "no events pending" and
# let the caller try again. See issue #66435.
sock = self._stream.socket
if sock is None:

@rvesselinov rvesselinov Jun 23, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:
Will self._stream still point at the dead IOStream here?

I'm afraid that a loop such as `while True: msg = await PublishClient.recv(timeout=0)` will start returning None forever without ever reconnecting.

return None

with selectors.DefaultSelector() as sel:
sel.register(self._stream.socket, selectors.EVENT_READ)
sel.register(sock, selectors.EVENT_READ)
ready = sel.select(timeout=0)
events = [key.fileobj for key, _ in ready]
sel.unregister(self._stream.socket)
sel.unregister(sock)

if events:
while not self._closing:
Expand Down
33 changes: 33 additions & 0 deletions tests/pytests/unit/transport/test_publish_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,36 @@ async def test_recv_timeout_zero():
mock_selector_instance.unregister.assert_called_once_with(mock_socket)
mock_selector_instance.__enter__.assert_called_once()
mock_selector_instance.__exit__.assert_called_once()


async def test_recv_timeout_zero_stream_socket_none():
    """
    Regression test for #66435: ``recv(timeout=0)`` on a socket-less stream.

    Tornado's ``IOStream`` sets its ``socket`` attribute to ``None`` once
    closed, so the client can still hold a stream object whose socket is
    already gone. Handing that ``None`` to the selector used to fail with::

        TypeError: argument must be an int, or have a fileno() method.

    The non-blocking peek is expected to treat the missing socket as
    "no events pending" and return ``None`` instead of raising.
    """
    # An unpacker that yields nothing, so recv() falls through to the
    # selector-based peek rather than returning a buffered message.
    empty_unpacker = MagicMock()
    empty_unpacker.__iter__.return_value = []

    # A stream that is still referenced but whose socket was torn down.
    closed_stream = MagicMock()
    closed_stream.socket = None

    ioloop = MagicMock()

    with patch("salt.utils.msgpack.Unpacker", return_value=empty_unpacker):
        client = salt.transport.tcp.PublishClient(
            {}, ioloop, host="127.0.0.1", port=11122
        )
        client._stream = closed_stream
        # Must not raise.
        received = await client.recv(timeout=0)

    assert received is None
8 changes: 4 additions & 4 deletions tools/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from typing import TYPE_CHECKING, Any, Literal

import yaml
from rich.markup import escape
from ptscripts import Context, command_group
from rich.markup import escape

import tools.utils
import tools.utils.gh
Expand Down Expand Up @@ -855,7 +855,7 @@ def workflow_config(
config["testrun"] = _define_testrun(ctx, changed_files, labels, full)

ctx.info(f"{'==== testrun ====':^80s}")
ctx.info(escape(pprint.pformat(config['testrun'])))
ctx.info(escape(pprint.pformat(config["testrun"])))
ctx.info(f"{'==== testrun ====':^80s}")

jobs = {
Expand Down Expand Up @@ -887,15 +887,15 @@ def workflow_config(
for platform in platforms
}
ctx.info(f"{'==== build matrix ====':^80s}")
ctx.info(escape(pprint.pformat(config['build-matrix'])))
ctx.info(escape(pprint.pformat(config["build-matrix"])))
ctx.info(f"{'==== end build matrix ====':^80s}")
config["artifact-matrix"] = []
for platform in platforms:
config["artifact-matrix"] += [
dict({"platform": platform}, **_) for _ in config["build-matrix"][platform]
]
ctx.info(f"{'==== artifact matrix ====':^80s}")
ctx.info(escape(pprint.pformat(config['artifact-matrix'])))
ctx.info(escape(pprint.pformat(config["artifact-matrix"])))
ctx.info(f"{'==== end artifact matrix ====':^80s}")

# Get salt releases.
Expand Down
Loading