From 2ad91d9e1d1e08dab8985f1621ca333b0cef85c7 Mon Sep 17 00:00:00 2001 From: SAI KISHAN A <222060771+kishansaaai@users.noreply.github.com> Date: Sat, 20 Jun 2026 16:42:41 +0530 Subject: [PATCH 1/4] Add fast path for net_io_counters on Windows to reduce idle CPU usage (#9161) --- distributed/_windows_net_io.py | 222 +++++++++++++++++++++++ distributed/system_monitor.py | 10 +- distributed/tests/test_system_monitor.py | 48 +++++ 3 files changed, 278 insertions(+), 2 deletions(-) create mode 100644 distributed/_windows_net_io.py diff --git a/distributed/_windows_net_io.py b/distributed/_windows_net_io.py new file mode 100644 index 00000000000..50ea3a47d97 --- /dev/null +++ b/distributed/_windows_net_io.py @@ -0,0 +1,222 @@ +# Windows-only fast path for retrieving net_io_counters. +# Part of fix for Issue #9161 (high CPU when idle, caused by expensive psutil.net_io_counters on Windows). + +import ctypes +import logging +from collections import namedtuple +from ctypes import wintypes + +logger = logging.getLogger("distributed.system_monitor") + +# Define GAA flags to skip unnecessary data and speed up the call +GAA_FLAG_SKIP_UNICAST = 0x0001 +GAA_FLAG_SKIP_ANYCAST = 0x0002 +GAA_FLAG_SKIP_MULTICAST = 0x0004 +GAA_FLAG_SKIP_DNS_SERVER = 0x0008 + +# Define compatible namedtuple for psutil's snetio +win_net_io_counters = namedtuple( + "win_net_io_counters", + [ + "bytes_sent", + "bytes_recv", + "packets_sent", + "packets_recv", + "errin", + "errout", + "dropin", + "dropout", + ], +) + + +class IP_ADAPTER_ADDRESSES(ctypes.Structure): + pass + + +# We only define fields up to FriendlyName. Since we only read the memory +# returned by GetAdaptersAddresses, this is safe and avoids mapping all fields. +IP_ADAPTER_ADDRESSES._fields_ = [ + ("Length", ctypes.c_ulong), + ("IfIndex", ctypes.c_ulong), + ("Next", ctypes.POINTER(IP_ADAPTER_ADDRESSES)), + ("AdapterName", ctypes.c_char_p), + ("FirstUnicastAddress", ctypes.c_void_p), + ("FirstAnycastAddress", ctypes.c_void_p), + ("FirstMulticastAddress", ctypes.c_void_p), + ("FirstDnsServerAddress", ctypes.c_void_p), + ("DnsSuffix", ctypes.c_wchar_p), + ("Description", ctypes.c_wchar_p), + ("FriendlyName", ctypes.c_wchar_p), +] + + +class MIB_IF_ROW2(ctypes.Structure): + _fields_ = [ + ("InterfaceLuid", ctypes.c_uint64), + ("InterfaceIndex", ctypes.c_ulong), + ("InterfaceGuid", ctypes.c_byte * 16), + ("Alias", ctypes.c_wchar * 257), + ("Description", ctypes.c_wchar * 257), + ("PhysicalAddressLength", ctypes.c_ulong), + ("PhysicalAddress", ctypes.c_byte * 32), + ("PermanentPhysicalAddress", ctypes.c_byte * 32), + ("Mtu", ctypes.c_ulong), + ("Type", ctypes.c_ulong), + ("TunnelType", ctypes.c_ulong), + ("MediaType", ctypes.c_ulong), + ("PhysicalMediumType", ctypes.c_ulong), + ("AccessType", ctypes.c_ulong), + ("DirectionType", ctypes.c_ulong), + ("InterfaceAndOperStatusFlags", ctypes.c_byte), + # 3 bytes padding (added automatically by ctypes alignment) + ("OperStatus", ctypes.c_ulong), + ("AdminStatus", ctypes.c_ulong), + ("MediaConnectState", ctypes.c_ulong), + ("NetworkGuid", ctypes.c_byte * 16), + ("ConnectionType", ctypes.c_ulong), + # 4 bytes padding (added automatically by ctypes alignment) + ("TransmitLinkSpeed", ctypes.c_uint64), + ("ReceiveLinkSpeed", ctypes.c_uint64), + ("InOctets", ctypes.c_uint64), + ("InUcastPkts", ctypes.c_uint64), + ("InNUcastPkts", ctypes.c_uint64), + ("InDiscards", ctypes.c_uint64), + ("InErrors", ctypes.c_uint64), + ("InUnknownProtos", ctypes.c_uint64), + ("InUcastOctets", ctypes.c_uint64), + ("InMulticastOctets", ctypes.c_uint64), + ("InBroadcastOctets", ctypes.c_uint64), + ("OutOctets", ctypes.c_uint64), + ("OutUcastPkts", ctypes.c_uint64), + ("OutNUcastPkts", ctypes.c_uint64), + ("OutDiscards", ctypes.c_uint64), + ("OutErrors", ctypes.c_uint64), + ("OutUcastOctets", ctypes.c_uint64), + ("OutMulticastOctets", ctypes.c_uint64), + ("OutBroadcastOctets", ctypes.c_uint64), + ("OutQLen", ctypes.c_uint64), + ] + + +import sys + +# Setup Windows API calls +if sys.platform == "win32": + iphlpapi = ctypes.windll.iphlpapi + + GetAdaptersAddresses = iphlpapi.GetAdaptersAddresses + GetAdaptersAddresses.argtypes = [ + wintypes.ULONG, + wintypes.ULONG, + ctypes.c_void_p, + ctypes.POINTER(IP_ADAPTER_ADDRESSES), + ctypes.POINTER(wintypes.ULONG), + ] + GetAdaptersAddresses.restype = wintypes.ULONG + + GetIfEntry2 = iphlpapi.GetIfEntry2 + GetIfEntry2.argtypes = [ctypes.POINTER(MIB_IF_ROW2)] + GetIfEntry2.restype = wintypes.ULONG +else: + iphlpapi = None + GetAdaptersAddresses = None + GetIfEntry2 = None + + +# Cached buffer size for GetAdaptersAddresses +_ADAPTER_ADDRESSES_BUF_SIZE = 16384 + + +def _fast_net_io_counters() -> win_net_io_counters: + """Low-overhead Windows-only network I/O stats querying using Win32 API.""" + global _ADAPTER_ADDRESSES_BUF_SIZE + size = wintypes.ULONG(_ADAPTER_ADDRESSES_BUF_SIZE) + buf = ctypes.create_string_buffer(size.value) + + flags = ( + GAA_FLAG_SKIP_UNICAST + | GAA_FLAG_SKIP_ANYCAST + | GAA_FLAG_SKIP_MULTICAST + | GAA_FLAG_SKIP_DNS_SERVER + ) + + ret = GetAdaptersAddresses( + 0, # AF_UNSPEC + flags, + None, + ctypes.cast(buf, ctypes.POINTER(IP_ADAPTER_ADDRESSES)), + ctypes.byref(size), + ) + + # If the buffer was too small, update the cached size, allocate, and call again + if ret == 111: # ERROR_BUFFER_OVERFLOW + _ADAPTER_ADDRESSES_BUF_SIZE = size.value + buf = ctypes.create_string_buffer(size.value) + ret = GetAdaptersAddresses( + 0, + flags, + None, + ctypes.cast(buf, ctypes.POINTER(IP_ADAPTER_ADDRESSES)), + ctypes.byref(size), + ) + + if ret != 0: + raise OSError(f"GetAdaptersAddresses failed with error {ret}") + + curr_ptr = ctypes.cast(buf, ctypes.POINTER(IP_ADAPTER_ADDRESSES)) + + bytes_recv = 0 + bytes_sent = 0 + packets_recv = 0 + packets_sent = 0 + errin = 0 + errout = 0 + dropin = 0 + dropout = 0 + + while curr_ptr: + curr = curr_ptr.contents + ifIndex = curr.IfIndex + + row = MIB_IF_ROW2() + row.InterfaceIndex = ifIndex + + status = GetIfEntry2(ctypes.byref(row)) + if status == 0: + bytes_recv += row.InOctets + bytes_sent += row.OutOctets + packets_recv += row.InUcastPkts + row.InNUcastPkts + packets_sent += row.OutUcastPkts + row.OutNUcastPkts + errin += row.InErrors + errout += row.OutErrors + dropin += row.InDiscards + dropout += row.OutDiscards + + curr_ptr = curr.Next + + return win_net_io_counters( + bytes_sent=bytes_sent, + bytes_recv=bytes_recv, + packets_sent=packets_sent, + packets_recv=packets_recv, + errin=errin, + errout=errout, + dropin=dropin, + dropout=dropout, + ) + + +def fast_net_io_counters(): + """Wrapper that falls back to psutil.net_io_counters on error.""" + try: + return _fast_net_io_counters() + except Exception as e: + logger.debug( + "Windows fast path net_io_counters failed, falling back to psutil: %r", + e, + exc_info=True, + ) + import psutil + + return psutil.net_io_counters() diff --git a/distributed/system_monitor.py b/distributed/system_monitor.py index 9bdbfb8717a..29228a345d6 100644 --- a/distributed/system_monitor.py +++ b/distributed/system_monitor.py @@ -14,6 +14,12 @@ from distributed.diagnostics import nvml from distributed.metrics import monotonic, time +if sys.platform == "win32": + # Win32 fast path for Issue #9161 + from distributed._windows_net_io import fast_net_io_counters +else: + fast_net_io_counters = psutil.net_io_counters + class SystemMonitor: proc: psutil.Process @@ -63,7 +69,7 @@ def __init__( } try: - self._last_net_io_counters = psutil.net_io_counters() + self._last_net_io_counters = fast_net_io_counters() except Exception: # FIXME is this possible? self.monitor_net_io = False # pragma: nocover @@ -165,7 +171,7 @@ def update(self) -> dict[str, Any]: } if self.monitor_net_io: - net_ioc = psutil.net_io_counters() + net_ioc = fast_net_io_counters() last = self._last_net_io_counters result["host_net_io.read_bps"] = ( net_ioc.bytes_recv - last.bytes_recv diff --git a/distributed/tests/test_system_monitor.py b/distributed/tests/test_system_monitor.py index 62d0b1f0358..92d2964bb6d 100644 --- a/distributed/tests/test_system_monitor.py +++ b/distributed/tests/test_system_monitor.py @@ -129,3 +129,51 @@ def test_gil_contention(): sm = SystemMonitor() a = sm.update() assert "gil_contention" not in a + + +def test_windows_fast_net_io_counters(): + import sys + + if sys.platform != "win32": + pytest.skip("Windows only test") + + from distributed._windows_net_io import _fast_net_io_counters + + res1 = _fast_net_io_counters() + assert hasattr(res1, "bytes_recv") + assert hasattr(res1, "bytes_sent") + assert isinstance(res1.bytes_recv, int) + assert isinstance(res1.bytes_sent, int) + assert res1.bytes_recv >= 0 + assert res1.bytes_sent >= 0 + + res2 = _fast_net_io_counters() + assert res2.bytes_recv >= res1.bytes_recv + assert res2.bytes_sent >= res1.bytes_sent + + +def test_windows_fast_net_io_counters_fallback(monkeypatch): + import sys + + if sys.platform != "win32": + pytest.skip("Windows only test") + + import distributed._windows_net_io + from distributed._windows_net_io import fast_net_io_counters + + def mock_fast_net_io_counters(): + raise RuntimeError("Simulated ctypes error") + + monkeypatch.setattr( + distributed._windows_net_io, "_fast_net_io_counters", mock_fast_net_io_counters + ) + + # Calling fast_net_io_counters should fall back to psutil without raising + import psutil + + expected = psutil.net_io_counters() + res = fast_net_io_counters() + + # Check that it returns a valid net_io_counters namedtuple or similar from psutil + assert hasattr(res, "bytes_recv") + assert hasattr(res, "bytes_sent") From 6b6a81c18d69397d84fb2e79f0c207712dd7106f Mon Sep 17 00:00:00 2001 From: SAI KISHAN A <222060771+kishansaaai@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:41:19 +0530 Subject: [PATCH 2/4] Fix: Move wintypes import inside win32 platform check to fix Linux CI --- distributed/_windows_net_io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/_windows_net_io.py b/distributed/_windows_net_io.py index 50ea3a47d97..9f0193d9fed 100644 --- a/distributed/_windows_net_io.py +++ b/distributed/_windows_net_io.py @@ -4,7 +4,6 @@ import ctypes import logging from collections import namedtuple -from ctypes import wintypes logger = logging.getLogger("distributed.system_monitor") @@ -103,6 +102,8 @@ class MIB_IF_ROW2(ctypes.Structure): # Setup Windows API calls if sys.platform == "win32": + from ctypes import wintypes + iphlpapi = ctypes.windll.iphlpapi GetAdaptersAddresses = iphlpapi.GetAdaptersAddresses From 4991db3c0457f47dad5b003911268f40ea49f75e Mon Sep 17 00:00:00 2001 From: SAI KISHAN A <222060771+kishansaaai@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:58:28 +0530 Subject: [PATCH 3/4] Fix: format code to resolve ruff linting errors --- distributed/_windows_net_io.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/_windows_net_io.py b/distributed/_windows_net_io.py index 9f0193d9fed..23f458b75a4 100644 --- a/distributed/_windows_net_io.py +++ b/distributed/_windows_net_io.py @@ -3,8 +3,12 @@ import ctypes import logging +import sys from collections import namedtuple +if sys.platform == "win32": + from ctypes import wintypes + logger = logging.getLogger("distributed.system_monitor") # Define GAA flags to skip unnecessary data and speed up the call @@ -98,12 +102,8 @@ class MIB_IF_ROW2(ctypes.Structure): ] -import sys - # Setup Windows API calls if sys.platform == "win32": - from ctypes import wintypes - iphlpapi = ctypes.windll.iphlpapi GetAdaptersAddresses = iphlpapi.GetAdaptersAddresses From 8bd071e8c6d7ac682e15bd7343cbf30ff54d2687 Mon Sep 17 00:00:00 2001 From: SAI KISHAN A <222060771+kishansaaai@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:01:20 +0530 Subject: [PATCH 4/4] Fix: add mypy ignore comments for win32-only types/functions --- distributed/_windows_net_io.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/_windows_net_io.py b/distributed/_windows_net_io.py index 23f458b75a4..ac112625bc2 100644 --- a/distributed/_windows_net_io.py +++ b/distributed/_windows_net_io.py @@ -132,7 +132,7 @@ class MIB_IF_ROW2(ctypes.Structure): def _fast_net_io_counters() -> win_net_io_counters: """Low-overhead Windows-only network I/O stats querying using Win32 API.""" global _ADAPTER_ADDRESSES_BUF_SIZE - size = wintypes.ULONG(_ADAPTER_ADDRESSES_BUF_SIZE) + size = wintypes.ULONG(_ADAPTER_ADDRESSES_BUF_SIZE) # type: ignore[name-defined] buf = ctypes.create_string_buffer(size.value) flags = ( @@ -142,7 +142,7 @@ def _fast_net_io_counters() -> win_net_io_counters: | GAA_FLAG_SKIP_DNS_SERVER ) - ret = GetAdaptersAddresses( + ret = GetAdaptersAddresses( # type: ignore[misc] 0, # AF_UNSPEC flags, None, @@ -154,7 +154,7 @@ def _fast_net_io_counters() -> win_net_io_counters: if ret == 111: # ERROR_BUFFER_OVERFLOW _ADAPTER_ADDRESSES_BUF_SIZE = size.value buf = ctypes.create_string_buffer(size.value) - ret = GetAdaptersAddresses( + ret = GetAdaptersAddresses( # type: ignore[misc] 0, flags, None, @@ -183,7 +183,7 @@ def _fast_net_io_counters() -> win_net_io_counters: row = MIB_IF_ROW2() row.InterfaceIndex = ifIndex - status = GetIfEntry2(ctypes.byref(row)) + status = GetIfEntry2(ctypes.byref(row)) # type: ignore[misc] if status == 0: bytes_recv += row.InOctets bytes_sent += row.OutOctets