diff --git a/salt/modules/win_ip.py b/salt/modules/win_ip.py index cb688d0912a8..ad5fa36ac1eb 100644 --- a/salt/modules/win_ip.py +++ b/salt/modules/win_ip.py @@ -2,12 +2,14 @@ The networking module for Windows based systems """ +import ipaddress import logging -import time +import textwrap import salt.utils.network import salt.utils.platform import salt.utils.validate.net +import salt.utils.win_pwsh from salt.exceptions import CommandExecutionError, SaltInvocationError # Set up logging @@ -21,70 +23,107 @@ def __virtual__(): """ Confine this module to Windows systems """ - if salt.utils.platform.is_windows(): - return __virtualname__ - return (False, "Module win_ip: module only works on Windows systems") + if not salt.utils.platform.is_windows(): + return False, "Module win_ip: Only available on Windows" + if not salt.utils.win_pwsh.HAS_CLR: + return False, "Module win_ip: Requires pythonnet (pip install pythonnet)" + if not salt.utils.win_pwsh.HAS_PWSH_SDK: + return ( + False, + "Module win_ip: Requires the PowerShell SDK (System.Management.Automation)", + ) + return __virtualname__ -def _interface_configs(): - """ - Return all interface configs +def _normalize_gateway_fields(data): """ - cmd = ["netsh", "interface", "ip", "show", "config"] - lines = __salt__["cmd.run"](cmd, python_shell=False).splitlines() - ret = {} - current_iface = None - current_ip_list = None - - for line in lines: - - line = line.strip() - if not line: - current_iface = None - current_ip_list = None - continue - - if "Configuration for interface" in line: - _, iface = line.rstrip('"').split('"', 1) # get iface name - current_iface = {} - ret[iface] = current_iface - continue - - if ":" not in line: - if current_ip_list: - current_ip_list.append(line) - else: - log.warning('Cannot parse "%s"', line) - continue - - key, val = line.split(":", 1) - key = key.strip() - val = val.strip() + Ensure ``ipv4_gateways`` and ``ipv6_gateways`` are always lists. - lkey = key.lower() - if ("dns servers" in lkey) or ("wins servers" in lkey): - current_ip_list = [] - current_iface[key] = current_ip_list - current_ip_list.append(val) + PowerShell 5.1's ``ConvertTo-Json`` unwraps single-element arrays to plain + objects, so a single gateway arrives as a ``dict`` rather than a + ``list[dict]``. This normalizes the parsed data in-place so callers always + receive a consistent list type regardless of how many gateways are present. + """ + for key in ("ipv4_gateways", "ipv6_gateways"): + val = data.get(key) + if isinstance(val, dict): + data[key] = [val] + return data - elif "ip address" in lkey: - current_iface.setdefault("ip_addrs", []).append({key: val}) - elif "subnet prefix" in lkey: - subnet, _, netmask = val.split(" ", 2) - last_ip = current_iface["ip_addrs"][-1] - last_ip["Subnet"] = subnet.strip() - last_ip["Netmask"] = netmask.lstrip().rstrip(")") +def _get_interfaces_legacy_format(name=None): + """ + Returns interface data using the legacy netsh-style key names to avoid + breaking existing scripts. The data is sourced from PowerShell objects, + not netsh, so it is locale-independent. + """ + if name: + interfaces = get_interface_new(name) + else: + interfaces = list_interfaces(full=True) + + legacy = {} + for name, data in interfaces.items(): + is_dhcp = data["ipv4_dhcp"] + + # Re-map to the specific netsh labels you had before + legacy[name] = { + "DHCP enabled": "Yes" if data["ipv4_dhcp"] else "No", + "InterfaceMetric": data["ipv4_metric"], + "Register with which suffix": ( + "Primary only" if data["dns_register"] else "None" + ), + } + + legacy[name]["ip_addrs"] = [] + if isinstance(data["ipv4_address"], str): + data["ipv4_address"] = [data["ipv4_address"]] + for addr in data["ipv4_address"]: + ip_info = ipaddress.IPv4Interface(addr) + legacy[name]["ip_addrs"].append( + { + "IP Address": ip_info._string_from_ip_int(ip_info._ip), + "Netmask": str(ip_info.netmask), + "Subnet": str(ip_info.network), + } + ) + # Handle the dynamic Labeling for DNS/WINS + if data["ipv4_dns"]: + if isinstance(data["ipv4_dns"], str): + dns_value = [data["ipv4_dns"]] + else: + dns_value = data["ipv4_dns"] else: - current_iface[key] = val + dns_value = ["None"] + if data["ipv4_wins"]: + if isinstance(data["ipv4_wins"], str): + wins_value = [data["ipv4_wins"]] + else: + wins_value = data["ipv4_wins"] + else: + wins_value = ["None"] + if is_dhcp: + legacy[name]["DNS servers configured through DHCP"] = dns_value + legacy[name]["WINS servers configured through DHCP"] = wins_value + else: + legacy[name]["Statically Configured DNS Servers"] = dns_value + legacy[name]["Statically Configured WINS Servers"] = wins_value - return ret + # Add Gateway if it exists (ipv4_gateways is always a list after normalization) + gws = [g for g in data["ipv4_gateways"] if g.get("ip")] + if gws: + legacy[name]["Default Gateway"] = gws[0]["ip"] + legacy[name]["Gateway Metric"] = gws[0]["metric"] + + return legacy def raw_interface_configs(): """ - Return raw configs for all interfaces + Return raw configs for all interfaces as returned by netsh. This command is + localized and will return different text depending on the locality of the + operating system. CLI Example: @@ -98,7 +137,18 @@ def raw_interface_configs(): def get_all_interfaces(): """ - Return configs for all interfaces + Return IP configuration for all network interfaces using the legacy + ``netsh``-compatible data format. + + Each interface is keyed by its alias and contains ``ip_addrs``, + ``ipv4_gateway``, ``dns_servers``, and related fields mirroring the + structure formerly produced by ``netsh interface ip show config``. + Prefer :func:`list_interfaces` with ``full=True`` for richer, always- + English output. + + Returns: + dict: A dictionary keyed by interface name. Each value is a + dict with the legacy ``netsh``-style fields. CLI Example: @@ -106,16 +156,16 @@ def get_all_interfaces(): salt -G 'os_family:Windows' ip.get_all_interfaces """ - return _interface_configs() + return _get_interfaces_legacy_format() def get_interface(iface): """ - Return the configuration of a network interface + Return the IP configuration of a single network interface Args: - iface (str): The name of the interface to manage + iface (str): The name of the interface CLI Example: @@ -123,7 +173,7 @@ def get_interface(iface): salt -G 'os_family:Windows' ip.get_interface 'Local Area Connection' """ - return _interface_configs().get(iface, {}) + return _get_interfaces_legacy_format(iface) def is_enabled(iface): @@ -140,15 +190,25 @@ def is_enabled(iface): salt -G 'os_family:Windows' ip.is_enabled 'Local Area Connection #2' """ - cmd = ["netsh", "interface", "show", "interface", f"name={iface}"] - iface_found = False - for line in __salt__["cmd.run"](cmd, python_shell=False).splitlines(): - if "Connect state:" in line: - iface_found = True - return line.split()[-1] == "Connected" - if not iface_found: + with salt.utils.win_pwsh.PowerShellSession() as session: + # Using SilentlyContinue ensures we get None/Empty if 'junk' is passed + cmd = f""" + [int](Get-NetAdapter -Name '{iface}' ` + -ErrorAction SilentlyContinue).AdminStatus + """ + status = session.run(cmd) + + try: + # Use 0 as a fallback for None to trigger your "not found" check + status = int(status if status is not None else 0) + except (ValueError, TypeError): + msg = f"Interface '{iface}' not found or invalid response." + raise CommandExecutionError(msg) + + if status == 0: raise CommandExecutionError(f"Interface '{iface}' not found") - return False + + return status == 1 # 1 is enabled def is_disabled(iface): @@ -165,7 +225,25 @@ def is_disabled(iface): salt -G 'os_family:Windows' ip.is_disabled 'Local Area Connection #2' """ - return not is_enabled(iface) + with salt.utils.win_pwsh.PowerShellSession() as session: + # Using SilentlyContinue ensures we get None/Empty if 'junk' is passed + cmd = f""" + [int](Get-NetAdapter -Name '{iface}' ` + -ErrorAction SilentlyContinue).AdminStatus + """ + status = session.run(cmd) + + try: + # Use 0 as a fallback for None to trigger your "not found" check + status = int(status if status is not None else 0) + except (ValueError, TypeError): + msg = f"Interface '{iface}' not found or invalid response." + raise CommandExecutionError(msg) + + if status == 0: + raise CommandExecutionError(f"Interface '{iface}' not found") + + return status == 2 # 2 is disabled def enable(iface): @@ -182,18 +260,7 @@ def enable(iface): salt -G 'os_family:Windows' ip.enable 'Local Area Connection #2' """ - if is_enabled(iface): - return True - cmd = [ - "netsh", - "interface", - "set", - "interface", - f"name={iface}", - "admin=ENABLED", - ] - __salt__["cmd.run"](cmd, python_shell=False) - return is_enabled(iface) + set_interface(iface, enabled=True) def disable(iface): @@ -210,18 +277,7 @@ def disable(iface): salt -G 'os_family:Windows' ip.disable 'Local Area Connection #2' """ - if is_disabled(iface): - return True - cmd = [ - "netsh", - "interface", - "set", - "interface", - f"name={iface}", - "admin=DISABLED", - ] - __salt__["cmd.run"](cmd, python_shell=False) - return is_disabled(iface) + set_interface(iface, enabled=False) def get_subnet_length(mask): @@ -262,10 +318,22 @@ def set_static_ip(iface, addr, gateway=None, append=False): Default is ``None``. append (:obj:`bool`, optional): - If ``True``, this IP address will be added to the interface. Default is - ``False``, which overrides any existing configuration for the interface - and sets ``addr`` as the only address on the interface. - Default is ``False``. + If ``True``, the address will be added as a secondary IP to the + interface. If ``False``, all existing IPv4 addresses are cleared + first. Defaults to ``False``. + + Returns: + dict: A dictionary with the applied settings, e.g.:: + + {"Address Info": ["192.168.1.5/24"], "Default Gateway": "192.168.1.1"} + + ``Default Gateway`` is only present when the ``gateway`` argument is + provided. + + Raises: + SaltInvocationError: If ``addr`` or ``gateway`` is not a valid IPv4 + address. + CommandExecutionError: If the address already exists on the interface. CLI Example: @@ -274,56 +342,55 @@ def set_static_ip(iface, addr, gateway=None, append=False): salt -G 'os_family:Windows' ip.set_static_ip 'Local Area Connection' 10.1.2.3/24 gateway=10.1.2.1 salt -G 'os_family:Windows' ip.set_static_ip 'Local Area Connection' 10.1.2.4/24 append=True """ - - def _find_addr(iface, addr, timeout=1): - ip, cidr = addr.rsplit("/", 1) - netmask = salt.utils.network.cidr_to_ipv4_netmask(cidr) - for idx in range(timeout): - for addrinfo in get_interface(iface).get("ip_addrs", []): - if addrinfo["IP Address"] == ip and addrinfo["Netmask"] == netmask: - return addrinfo - time.sleep(1) - return {} - if not salt.utils.validate.net.ipv4_addr(addr): raise SaltInvocationError(f"Invalid address '{addr}'") - if gateway and not salt.utils.validate.net.ipv4_addr(addr): + if gateway and not salt.utils.validate.net.ipv4_addr(gateway): raise SaltInvocationError(f"Invalid default gateway '{gateway}'") if "/" not in addr: - addr += "/32" + addr += "/24" - if append and _find_addr(iface, addr): - raise CommandExecutionError( - f"Address '{addr}' already exists on interface '{iface}'" - ) + ip, _ = addr.split("/") if "/" in addr else (addr, "24") - cmd = ["netsh", "interface", "ip"] - if append: - cmd.append("add") - else: - cmd.append("set") - cmd.extend(["address", f"name={iface}"]) - if not append: - cmd.append("source=static") - cmd.append(f"address={addr}") - if gateway: - cmd.append(f"gateway={gateway}") + with salt.utils.win_pwsh.PowerShellSession() as session: - result = __salt__["cmd.run_all"](cmd, python_shell=False) - if result["retcode"] != 0: - raise CommandExecutionError( - "Unable to set IP address: {}".format(result["stderr"]) - ) + # Get interface Index + index = get_interface_index(iface, session) - new_addr = _find_addr(iface, addr, timeout=10) - if not new_addr: - return {} + cmd = f""" + (Get-NetIPAddress -InterfaceIndex {index} ` + -AddressFamily IPv4 ` + -ErrorAction SilentlyContinue).IPAddress + """ + result = session.run(cmd) - ret = {"Address Info": new_addr} + if result is None: + exists = False + elif isinstance(result, list): + exists = ip in result + else: + exists = ip == result.strip() + + if exists: + msg = f"Address '{ip}' already exists on '{iface}'" + raise CommandExecutionError(msg) + + set_interface( + iface=iface, + ipv4_address=addr, + ipv4_gateways=gateway if gateway else None, + append=append, + ) + # Verify new settings + new_settings = get_interface_new(iface)[iface] + + ret = {"Address Info": new_settings["ipv4_address"]} if gateway: - ret["Default Gateway"] = gateway + gws = [g for g in new_settings["ipv4_gateways"] if g.get("ip")] + if gws: + ret["Default Gateway"] = gws[0]["ip"] + return ret @@ -336,15 +403,50 @@ def set_dhcp_ip(iface): iface (str): The name of the interface to manage + Returns: + dict: ``{}`` if DHCP was already enabled, otherwise + ``{"Interface": , "DHCP enabled": "Yes"}``. + + Raises: + CommandExecutionError: If DHCP cannot be enabled. + CLI Example: .. code-block:: bash salt -G 'os_family:Windows' ip.set_dhcp_ip 'Local Area Connection' """ - cmd = ["netsh", "interface", "ip", "set", "address", iface, "dhcp"] - __salt__["cmd.run"](cmd, python_shell=False) - return {"Interface": iface, "DHCP enabled": "Yes"} + with salt.utils.win_pwsh.PowerShellSession() as session: + index = get_interface_index(iface, session) + + # Check if dhcp is already enabled + cmd = f""" + [int](Get-NetIPInterface -InterfaceIndex {index} ` + -AddressFamily IPv4 ` + -ErrorAction SilentlyContinue).Dhcp + """ + dhcp_enabled = session.run(cmd) + + if dhcp_enabled == 1: + return {} + + # Enable DHCP — set_interface manages its own session + set_interface(iface, ipv4_dhcp=True) + + with salt.utils.win_pwsh.PowerShellSession() as session: + index = get_interface_index(iface, session) + + # Verify that dhcp is enabled + cmd = f""" + (Get-NetIPInterface -InterfaceIndex {index} ` + -AddressFamily IPv4 ` + -ErrorAction SilentlyContinue).Dhcp + """ + dhcp_enabled = session.run_json(cmd) + if dhcp_enabled == 1: + return {"Interface": iface, "DHCP enabled": "Yes"} + else: + raise CommandExecutionError("Failed to enable DHCP") def set_static_dns(iface, *addrs): @@ -370,53 +472,74 @@ def set_static_dns(iface, *addrs): salt -G 'os_family:Windows' ip.set_static_dns 'Local Area Connection' '192.168.1.1' salt -G 'os_family:Windows' ip.set_static_dns 'Local Area Connection' '192.168.1.252' '192.168.1.253' """ + # Addrs is undefined or None, No Changes if not addrs or str(addrs[0]).lower() == "none": return {"Interface": iface, "DNS Server": "No Changes"} + # Clear the list of DNS servers if [] is passed if str(addrs[0]).lower() == "[]": - log.debug("Clearing list of DNS servers") - cmd = [ - "netsh", - "interface", - "ip", - "set", - "dns", - f"name={iface}", - "source=static", - "address=none", - ] - __salt__["cmd.run"](cmd, python_shell=False) - return {"Interface": iface, "DNS Server": []} - addr_index = 1 - for addr in addrs: - if addr_index == 1: - cmd = [ - "netsh", - "interface", - "ip", - "set", - "dns", - f"name={iface}", - "source=static", - f"address={addr}", - "register=primary", + # Use set_dhcp_dns to reset the interface + return set_dhcp_dns(iface) + + # Get interface Index + index = get_interface_index(iface) + + with salt.utils.win_pwsh.PowerShellSession() as session: + # 1. Fetch current DNS to see if work is actually needed + cmd = f""" + (Get-DnsClientServerAddress -InterfaceIndex {index} ` + -AddressFamily IPv4 ` + -ErrorAction SilentlyContinue).ServerAddresses + """ + current_dns = session.run(cmd) + + # If current_dns is None (empty stack), make it an empty list + if current_dns is None: + current_dns = [] + # If it's a string (single IP), put it in a list + elif isinstance(current_dns, str): + # Split by newlines/commas and strip whitespace + current_dns = [ + d.strip() + for d in current_dns.replace(",", "\n").splitlines() + if d.strip() ] - __salt__["cmd.run"](cmd, python_shell=False) - addr_index = addr_index + 1 - else: - cmd = [ - "netsh", - "interface", - "ip", - "add", - "dns", - f"name={iface}", - f"address={addr}", - f"index={addr_index}", + + # 2. Check if there's anything to set + requested_dns = list(addrs) + if set(current_dns) == set(requested_dns): + return {} + + # 3. Set static list (comma-separated for PowerShell) + dns_str = ",".join([f"'{a}'" for a in requested_dns]) + cmd = f""" + Set-DnsClientServerAddress -InterfaceIndex {index} ` + -ServerAddresses ({dns_str}) + """ + session.run(cmd) + + # 4. Verify successful changes + cmd = f""" + (Get-DnsClientServerAddress -InterfaceIndex {index} ` + -AddressFamily IPv4 ` + -ErrorAction SilentlyContinue).ServerAddresses + """ + current_dns = session.run(cmd) + # If current_dns is None (empty stack), make it an empty list + if current_dns is None: + current_dns = [] + # If it's a string (single IP), put it in a list + elif isinstance(current_dns, str): + # Split by newlines/commas and strip whitespace + current_dns = [ + d.strip() + for d in current_dns.replace(",", "\n").splitlines() + if d.strip() ] - __salt__["cmd.run"](cmd, python_shell=False) - addr_index = addr_index + 1 - return {"Interface": iface, "DNS Server": addrs} + if set(current_dns) == set(requested_dns): + return {"Interface": iface, "DNS Server": current_dns} + + raise CommandExecutionError("Failed to set DNS.") def set_dhcp_dns(iface): @@ -427,15 +550,56 @@ def set_dhcp_dns(iface): iface (str): The name of the interface to manage + Returns: + dict: ``{}`` if DNS was already set to automatic (no static servers + configured), otherwise ``{"Interface": , "DNS Server": "DHCP (Empty)"}``. + + Raises: + CommandExecutionError: If the DNS reset cannot be verified. + CLI Example: .. code-block:: bash salt -G 'os_family:Windows' ip.set_dhcp_dns 'Local Area Connection' """ - cmd = ["netsh", "interface", "ip", "set", "dns", iface, "dhcp"] - __salt__["cmd.run"](cmd, python_shell=False) - return {"Interface": iface, "DNS Server": "DHCP"} + with salt.utils.win_pwsh.PowerShellSession() as session: + # 1. Get the interface index + index = get_interface_index(iface, session) + + # 2. Fetch current DNS to see if work is actually needed + cmd = f""" + (Get-DnsClientServerAddress -InterfaceIndex {index} ` + -AddressFamily IPv4 ` + -ErrorAction SilentlyContinue).ServerAddresses + """ + current_dns = session.run(cmd) + + # If current_dns is None (empty stack), it's already set + if not current_dns: + return {} + + # 3. Apply the reset. + # This is the PowerShell equivalent of clicking 'Obtain automatically' + cmd = f""" + Set-DnsClientServerAddress -InterfaceIndex {index} ` + -ResetServerAddresses + """ + session.run(cmd) + + # 4. Verify by checking if ServerAddresses is now empty (or managed by DHCP) + # On many interfaces (like Loopback), a reset results in an empty list. + cmd = f""" + (Get-DnsClientServerAddress -InterfaceIndex {index} ` + -AddressFamily IPv4).ServerAddresses + """ + res = session.run(cmd) + + # If it returns None or an empty string, it's successfully reset/empty + if not res: + return {"Interface": iface, "DNS Server": "DHCP (Empty)"} + + raise CommandExecutionError("Failed to set DNS source to DHCP.") def set_dhcp_all(iface): @@ -446,6 +610,12 @@ def set_dhcp_all(iface): iface (str): The name of the interface to manage + Returns: + dict: ``{"Interface": , "DNS Server": "DHCP", "DHCP enabled": "Yes"}`` + + Raises: + CommandExecutionError: If either the IP or DNS cannot be switched to DHCP. + CLI Example: .. code-block:: bash @@ -457,25 +627,899 @@ def set_dhcp_all(iface): return {"Interface": iface, "DNS Server": "DHCP", "DHCP enabled": "Yes"} -def get_default_gateway(): +def get_default_gateway(iface=None): """ - Set DNS source to DHCP on Windows + Get the Default Gateway on Windows. + + Args: + + iface (str, optional): + The name or alias of the interface to query (e.g., 'Ethernet'). + If provided, the function returns the default gateway specific + to that interface. If ``None`` (default), it returns the + system-wide primary default gateway based on the lowest route + metric. + + Returns: + str: The IP address of the default gateway. + + Raises: + CommandExecutionError: If no default gateway is found. CLI Example: .. code-block:: bash - salt -G 'os_family:Windows' ip.get_default_gateway + # Get the system's primary default gateway + salt 'minion' ip.get_default_gateway + + # Get the gateway for a specific interface + salt 'minion' ip.get_default_gateway iface='Local Area Connection' """ - try: - return next( - iter( - x.split()[-1] - for x in __salt__["cmd.run"]( - ["netsh", "interface", "ip", "show", "config"], python_shell=False - ).splitlines() - if "Default Gateway:" in x + with salt.utils.win_pwsh.PowerShellSession() as session: + + if iface: + index = get_interface_index(iface) + cmd = f""" + Get-NetRoute -DestinationPrefix '0.0.0.0/0' ` + -InterfaceIndex {index} ` + -ErrorAction SilentlyContinue | + Sort-Object RouteMetric | + Select-Object -First 1 -ExpandProperty NextHop + """ + else: + cmd = """ + Get-NetRoute -DestinationPrefix '0.0.0.0/0' ` + -ErrorAction SilentlyContinue | + Sort-Object RouteMetric | + Select-Object -First 1 -ExpandProperty NextHop + """ + + gateway = session.run(cmd) + + if not gateway: + raise CommandExecutionError("Unable to find default gateway") + + return gateway + + +def get_interface_index(iface, session=None): + """ + Return the integer ``ifIndex`` for the named interface. + + Args: + + iface (str): + The interface name or alias (e.g., ``'Ethernet'``). + + session (:class:`~salt.utils.win_pwsh.PowerShellSession`, optional): + An existing ``PowerShellSession`` to reuse. If ``None`` (default), + a new session is opened for this call only. + + Returns: + int: The interface index. + + Raises: + CommandExecutionError: If the interface cannot be found. + + CLI Example: + + .. code-block:: bash + + salt -G 'os_family:Windows' ip.get_interface_index 'Ethernet' + """ + cmd = f""" + $adapter = Get-NetAdapter -Name "{iface}" ` + -ErrorAction SilentlyContinue + if (-not $adapter) {{ + $adapter = Get-NetIPInterface -InterfaceAlias "{iface}" ` + -ErrorAction SilentlyContinue | + Select-Object -First 1 + }} + if ($adapter) {{ $adapter.ifIndex }} else {{ "0" }} + """ + + def _run(s): + raw_index = s.run(cmd) + try: + index = int(raw_index) + except (ValueError, TypeError): + raise CommandExecutionError( + f"Interface not found or not initialized: {iface}" + ) + if index == 0: + raise CommandExecutionError(f"Interface not found: {iface}") + return index + + if session is not None: + return _run(session) + with salt.utils.win_pwsh.PowerShellSession() as s: + return _run(s) + + +def set_interface( + iface, + alias=None, + enabled=None, + ipv4_enabled=None, + ipv4_address=None, + ipv4_dhcp=None, + ipv4_dns=None, + ipv4_forwarding=None, + ipv4_gateways=None, + ipv4_metric=None, + ipv4_wins=None, + ipv4_netbios=None, + ipv6_enabled=None, + ipv6_address=None, + ipv6_dhcp=None, + ipv6_dns=None, + ipv6_forwarding=None, + ipv6_gateways=None, + ipv6_metric=None, + dns_register=None, + dns_suffix=None, + mtu=None, + append=False, +): + """ + Configures a network interface on Windows. + + This function provides a context-aware interface for managing adapter + properties, protocol bindings, and IP configurations. It utilizes the + InterfaceIndex to ensure stability during renames. + + **Understanding Metrics on Windows:** + Windows calculates route priority by summing the **Interface Metric** + (protocol level) and the **Route Metric** (gateway level). + + * **Interface Metric:** Set via ``ipvX_metric``. A value of ``0`` + enables 'Automatic Metric', where Windows assigns priority based on link + speed. + * **Route Metric:** Set within the ``ipvX_gateways`` objects. + * **Total Cost:** Interface Metric + Route Metric. The lowest total + cost determines the primary route. + + **Context-Aware Behavior:** + The function identifies the current state of protocol bindings (IPv4/IPv6) + before applying settings. + + * If a protocol is disabled and ``ipvX_enabled`` is not passed as ``True``, + configuration for that stack (IPs, DNS, etc.) is skipped to prevent + errors. + * If ``ipvX_dhcp`` is enabled, static IP and gateway configurations are + ignored by the OS; therefore, this function skips applying static + addresses unless DHCP is ``False``. + + Args: + + iface (str): + The current name or alias of the interface (e.g., 'Ethernet'). + + alias (str, optional): + A new name for the interface. + + enabled (bool, optional): + Administrative status of the adapter. + + ipv4_enabled (bool, optional): + Whether the IPv4 protocol is bound to this adapter. If ``None``, the + function discovers current state. + + ipv4_address (list, optional): + An IPv4 address or list of addresses in CIDR notation + (e.g., ``['192.168.1.5/24']``). + + .. note:: + If a CIDR prefix is not provided, it will default to ``/24``. + + ipv4_dhcp (bool, optional): + Set to ``True`` to enable DHCP, ``False`` for static. + + ipv4_dns (list, optional): + A list of IPv4 DNS server addresses. Passing an empty list ``[]`` + resets DNS to automatic. + + ipv4_forwarding (bool, optional): + Enables or disables **IP Forwarding** for the IPv4 stack. When + ``True``, this allows the Windows machine to act as a router, + passing traffic between this interface and others. + Default is ``None`` (no change). + + ipv4_gateways (list, str, dict): + The default gateway(s). Accepts multiple formats: + + * **String:** A single IP address (e.g., ``'192.168.1.1'``). + * **List of Strings:** Multiple gateways (e.g., ``['1.1.1.1', '1.0.0.1']``). + * **Dictionary:** A single gateway with a specific route metric + (e.g., ``{'ip': '192.168.1.1', 'metric': 5}``). + * **List of Dictionaries:** Multiple gateways with individual metrics + (e.g., ``[{'ip': '1.1.1.1', 'metric': 2}, {'ip': '1.0.0.1', 'metric': 10}]``). + + If a metric is not specified within a dictionary, or if a string/list + of strings is provided, the function falls back to ``ipv4_metric``. + + ipv4_metric (int, optional): + The IPv4 interface metric. Use ``0`` for Automatic. + + ipv4_wins (list, optional): + A list of up to two WINS server addresses. + + ipv4_netbios (str, optional): + Configures NetBIOS over TCP/IP behavior via the MSFT_NetIPInterface + CIM class. + + * ``Default`` (0): Defer to DHCP server settings. + * ``Enable`` (1): Explicitly enable NetBIOS. + * ``Disable`` (2): Explicitly disable NetBIOS. + + ipv6_enabled (bool, optional): + Whether the IPv6 protocol is bound. + + ipv6_address (list, optional): + An IPv6 address or list of addresses in CIDR notation + (e.g., ``['2001:db8::1/64']``). + + .. note:: + If a CIDR prefix is not provided, it will default to ``/64``. + + ipv6_dhcp (bool, optional): + Set to ``True`` for IPv6 stateful configuration. + + ipv6_dns (list, optional): + A list of IPv6 DNS server addresses. + + ipv6_forwarding (bool, optional): + Enables or disables **IP Forwarding** for the IPv6 stack. When + ``True``, this allows the Windows machine to act as a router, + passing traffic between this interface and others. + Default is ``None`` (no change). + + ipv6_gateways (list, optional): + The default gateway(s) for IPv6. Accepts multiple formats: + + * **String:** A single IPv6 address (e.g., ``'2001:db8::1'``). + * **List of Strings:** Multiple gateways (e.g., ``['2001:db8::1', 'fe80::1']``). + * **Dictionary:** A single gateway with a specific route metric + (e.g., ``{'ip': '2001:db8::1', 'metric': 5}``). + * **List of Dictionaries:** Multiple gateways with individual metrics + (e.g., ``[{'ip': '2001:db8::1', 'metric': 2}, {'ip': 'fe80::1', 'metric': 10}]``). + + If a metric is not specified within a dictionary, or if a string/list + of strings is provided, the function falls back to ``ipv6_metric``. + Note that link-local gateways (starting with ``fe80::``) are fully + supported as Windows scopes them via the interface index. + + ipv6_metric (int, optional): + The IPv6 interface metric. Use ``0`` for Automatic. + + dns_register (bool, optional): + Controls whether the interface's IP addresses are registered in DNS + using the computer's primary DNS suffix. + + * ``True`` (Default in Windows): Corresponds to "Primary only" in + legacy output. The computer will attempt to dynamically update + its DNS record (A/AAAA) on the DNS server. + * ``False``: Corresponds to "None". The computer will not attempt + to register its name for this specific connection. + + dns_suffix (str, optional): + Sets the **Connection-Specific DNS Suffix** (e.g., + ``corp.example.com``). This value is used in DNS registration and + resolution but does not change the global primary DNS suffix of the + computer. + + mtu (int, optional): + Configures the **Maximum Transmission Unit** size for the physical + adapter. Accepts values between ``576`` and ``9000``. Common uses + include setting ``9000`` for Jumbo Frames in iSCSI/Storage networks + or lower values for VPN compatibility. + + append (bool): + If ``True``, the provided IPv4 and IPv6 addresses will be added + to the interface without removing existing ones. If ``False`` (default), + all existing IPv4/IPv6 addresses on the interface will be removed + before applying the new configuration. + + .. note:: + This flag only applies to IP addresses. Gateways (Default Routes) + are always replaced if a new gateway is provided to ensure + routing stability. + + Returns: + bool: ``True`` if successful, otherwise raises an exception. + + CLI Examples: + + .. code-block:: bash + + # Set static IPv4 with a specific metric + salt-call --local win_ip.set_interface "Ethernet" ipv4_dhcp=False \\ + ipv4_address="['192.168.1.10/24']" ipv4_metric=10 + + # Set multiple gateways with different route priorities + salt-call --local win_ip.set_interface "Test-Iface" \\ + ipv4_gateways="[{'ip': '10.0.0.1', 'metric': 2}, {'ip': '10.0.0.2', 'metric': 100}]" + + # Reset DNS to automatic/DHCP + salt '*' win_ip.set_interface "Wi-Fi" ipv4_dns="[]" + + # Rename an interface and enable IPv6 + salt-call --local win_ip.set_interface "Ethernet 2" alias="Production" \\ + ipv6_enabled=True ipv6_dhcp=True + """ + # 1. Input Validation + if ipv4_netbios and ipv4_netbios.lower() not in ["default", "enable", "disable"]: + raise SaltInvocationError(f"Invalid NetBIOS setting: {ipv4_netbios}") + + if mtu is not None and (mtu < 576 or mtu > 9000): + raise SaltInvocationError("MTU must be between 576 and 9000.") + + ipv4_addrs = ( + ipv4_address + if isinstance(ipv4_address, list) + else ([ipv4_address] if ipv4_address else []) + ) + ipv6_addrs = ( + ipv6_address + if isinstance(ipv6_address, list) + else ([ipv6_address] if ipv6_address else []) + ) + + # 1. Identity & Setup + index = get_interface_index(iface) + + ps_script = textwrap.dedent( + f""" + $idx = {index} + $ErrorActionPreference = 'Stop' + # Internal Sanitizer for DHCP transitions + function Sanitize-Stack {{ + param([int]$i, [string]$family) + $stack = if($family -eq 'IPv4') {{ 'Tcpip' }} else {{ 'Tcpip6' }} + $guid = (Get-NetAdapter -InterfaceIndex $i).DeviceGuid + $reg = "HKLM:\\\\SYSTEM\\\\CurrentControlSet\\\\Services\\\\$stack\\\\Parameters\\\\Interfaces\\\\$guid" + if (Test-Path $reg) {{ + Set-ItemProperty $reg 'NameServer' '' -ErrorAction SilentlyContinue + Set-ItemProperty $reg 'DefaultGateway' ([string[]]@()) -ErrorAction SilentlyContinue + if ($family -eq 'IPv4') {{ + Set-ItemProperty $reg 'WINSPrimaryServer' '' -ErrorAction SilentlyContinue + Set-ItemProperty $reg 'WINSSecondaryServer' '' -ErrorAction SilentlyContinue + $bt = "HKLM:\\\\SYSTEM\\\\CurrentControlSet\\\\Services\\\\NetBT\\\\Parameters\\\\Interfaces\\\\TcpIp_$guid" + if (Test-Path $bt) {{ Set-ItemProperty $bt 'NameServerList' ([string[]]@()) -ErrorAction SilentlyContinue }} + # THE NUCLEAR ADDITION: Flush the WMI Cache specifically + $wmi = Get-CimInstance Win32_NetworkAdapterConfiguration -Filter "InterfaceIndex = $i" -ErrorAction SilentlyContinue + if ($wmi) {{ + $wmi | Invoke-CimMethod -MethodName SetWINSServer -Arguments @{{WINSPrimaryServer=''; WINSSecondaryServer=''}} -ErrorAction SilentlyContinue + }} + }} + }} + $prefix = if($family -eq 'IPv4') {{ '0.0.0.0/0' }} else {{ '::/0' }} + $r = try {{ Get-NetRoute -InterfaceIndex $idx -DestinationPrefix $prefix -ErrorAction SilentlyContinue }} catch {{ $null }} + if ($r) {{ try {{ $r | Remove-NetRoute -Confirm:$false }} catch {{ }} }} + $a = try {{ Get-NetIPAddress -InterfaceIndex $idx -AddressFamily $family -ErrorAction SilentlyContinue | Where-Object {{ $_.PrefixOrigin -eq 'Manual' }} }} catch {{ $null }} + if ($a) {{ try {{ $a | Remove-NetIPAddress -Confirm:$false }} catch {{ }} }} + Set-DnsClientServerAddress -InterfaceIndex $idx -ResetServerAddresses -ErrorAction SilentlyContinue + }} + """ + ) + + # 2. Administrative State & MTU + if enabled is not None: + action = "Enable-NetAdapter" if enabled else "Disable-NetAdapter" + ps_script += textwrap.dedent( + f""" + Get-NetAdapter -InterfaceIndex $idx | {action} -Confirm:$false""" + ) + + if mtu is not None: + ps_script += textwrap.dedent( + f""" + Set-NetIPInterface -InterfaceIndex $idx ` + -AddressFamily IPv4 ` + -NlMtuBytes {mtu} ` + -Confirm:$false""" + ) + # Applying to IPv6 as well for a consistent MTU across the interface + ps_script += textwrap.dedent( + f""" + Set-NetIPInterface -InterfaceIndex $idx ` + -AddressFamily IPv6 ` + -NlMtuBytes {mtu} ` + -Confirm:$false ` + -ErrorAction SilentlyContinue""" + ) + + # 3. Protocol Binding & Basic Interface Settings + for family, active, dhcp, forward, metric in [ + ("IPv4", ipv4_enabled, ipv4_dhcp, ipv4_forwarding, ipv4_metric), + ("IPv6", ipv6_enabled, ipv6_dhcp, ipv6_forwarding, ipv6_metric), + ]: + if active is not None: + comp = "ms_tcpip" if family == "IPv4" else "ms_tcpip6" + action = ( + "Enable-NetAdapterBinding" if active else "Disable-NetAdapterBinding" + ) + # SilentlyContinue: the binding toggle is idempotent and some adapter + # types (e.g. loopback) emit non-fatal errors for already-set states. + ps_script += textwrap.dedent( + f""" + Get-NetAdapter -InterfaceIndex $idx | {action} -ComponentID '{comp}' -ErrorAction SilentlyContinue""" + ) + + # IP Interface Properties (DHCP, Forwarding, Metric) + if any(x is not None for x in [dhcp, forward, metric]): + cmd = f"Set-NetIPInterface -InterfaceIndex $idx -AddressFamily {family}" + if dhcp is not None: + cmd += f" -Dhcp {'Enabled' if dhcp else 'Disabled'}" + if forward is not None: + cmd += f" -Forwarding {'Enabled' if forward else 'Disabled'}" + if metric is not None: + if metric == 0: + cmd += " -AutomaticMetric Enabled" + else: + cmd += f" -AutomaticMetric Disabled -InterfaceMetric {metric}" + ps_script += textwrap.dedent( + f""" + {cmd}""" ) + + # Trigger Sanitizer if enabling DHCP + if dhcp: + ps_script += textwrap.dedent( + f""" + Sanitize-Stack -i $idx -family '{family}'""" + ) + + # 4. IP Addresses (Static) + for family, addrs, dhcp in [ + ("IPv4", ipv4_addrs, ipv4_dhcp), + ("IPv6", ipv6_addrs, ipv6_dhcp), + ]: + if addrs and not dhcp: + if not append: + ps_script += textwrap.dedent( + f""" + $currentIPs = try {{ + Get-NetIPAddress -InterfaceIndex $idx ` + -AddressFamily {family} ` + -ErrorAction SilentlyContinue | + Where-Object {{ $_.IPAddress -notlike 'fe80*' -and $_.PrefixOrigin -eq 'Manual' }} + }} catch {{ $null }} + if ($currentIPs) {{ + try {{ $currentIPs | Remove-NetIPAddress -Confirm:$false }} catch {{ }} + }} + """ + ) + + # 'addrs' is now GUARANTEED to be a list of strings + for a in addrs: + # Split logic + parts = a.split("/") + ip = parts[0] + pref = ( + parts[1] if len(parts) > 1 else ("24" if family == "IPv4" else "64") + ) + + ps_script += textwrap.dedent( + f""" + New-NetIPAddress -InterfaceIndex $idx ` + -IPAddress '{ip}' ` + -PrefixLength {pref} ` + -AddressFamily {family} + """ + ) + + # 5. Gateways (Default Routes) + for family, gateways, metric_fallback in [ + ("IPv4", ipv4_gateways, ipv4_metric), + ("IPv6", ipv6_gateways, ipv6_metric), + ]: + if gateways is not None: + prefix = "0.0.0.0/0" if family == "IPv4" else "::/0" + # Clear existing default routes first + ps_script += textwrap.dedent( + f""" + $routes = try {{ + Get-NetRoute -InterfaceIndex $idx ` + -DestinationPrefix '{prefix}' ` + -ErrorAction SilentlyContinue + }} catch {{ $null }} + if ($routes) {{ + try {{ $routes | Remove-NetRoute -Confirm:$false }} catch {{ }} + }}""" + ) + + gw_list = gateways if isinstance(gateways, list) else [gateways] + for gw in gw_list: + if isinstance(gw, dict): + ip, met = gw.get("ip"), gw.get( + "metric", metric_fallback if metric_fallback else 0 + ) + else: + ip, met = gw, (metric_fallback if metric_fallback else 0) + if ip: + ps_script += textwrap.dedent( + f""" + New-NetRoute -InterfaceIndex $idx ` + -DestinationPrefix '{prefix}' ` + -NextHop '{ip}' ` + -RouteMetric {met} ` + -Confirm:$false""" + ) + + # 6. DNS, WINS, and NetBIOS + if dns_suffix: + ps_script += textwrap.dedent( + f""" + Set-DnsClient -InterfaceIndex $idx ` + -ConnectionSpecificSuffix '{dns_suffix}'""" ) - except StopIteration: - raise CommandExecutionError("Unable to find default gateway") + if dns_register is not None: + ps_script += textwrap.dedent( + f""" + Set-DnsClient -InterfaceIndex $idx ` + -RegisterThisConnectionsAddress {'$true' if dns_register else '$false'}""" + ) + + for family, dns_servers in [("IPv4", ipv4_dns), ("IPv6", ipv6_dns)]: + if dns_servers is not None: + if not dns_servers: + ps_script += textwrap.dedent( + f""" + Set-DnsClientServerAddress -InterfaceIndex $idx ` + -AddressFamily {family} ` + -ResetServerAddresses""" + ) + else: + s_list = dns_servers if isinstance(dns_servers, list) else [dns_servers] + # 1. Format the PowerShell array string first + # Result: "@('8.8.8.8','1.1.1.1')" + dns_array_str = "@(" + ",".join([f"'{s}'" for s in s_list]) + ")" + ps_script += textwrap.dedent( + f""" + Set-DnsClientServerAddress -InterfaceIndex $idx ` + -ServerAddresses {dns_array_str}""" + ) + + if ipv4_wins is not None: + w = ipv4_wins if isinstance(ipv4_wins, list) else [ipv4_wins] + p, s = (w[0] if len(w) > 0 else ""), (w[1] if len(w) > 1 else "") + ps_script += textwrap.dedent( + f""" + $w = Get-CimInstance Win32_NetworkAdapterConfiguration -Filter "InterfaceIndex = $idx" + if($w) {{ + $w | Invoke-CimMethod -MethodName SetWINSServer ` + -Arguments @{{WINSPrimaryServer='{p}'; WINSSecondaryServer='{s}'}} + }}""" + ) + + if ipv4_netbios: + nb_val = {"default": 0, "enable": 1, "disable": 2}[ipv4_netbios.lower()] + ps_script += textwrap.dedent( + f""" + Set-NetIPInterface -InterfaceIndex $idx ` + -AddressFamily IPv4 ` + -NetbiosSetting {nb_val}""" + ) + + # 7. Rename & Finalize + if alias and alias != iface: + ps_script += textwrap.dedent( + f""" + Get-NetAdapter -InterfaceIndex $idx | + Rename-NetAdapter -NewName '{alias}' ` + -Confirm:$false""" + ) + + ps_script += textwrap.dedent( + """ + $iface = Get-NetAdapter -InterfaceIndex $idx ` + -ErrorAction SilentlyContinue + if ($iface.AdminStatus -eq 1 -and $iface.MediaConnectionState -eq 1) { + $timeout = [System.Diagnostics.Stopwatch]::StartNew() + while ($timeout.Elapsed.TotalSeconds -lt 10) { + # Check for any addresses still in the 'Tentative' state. + # Wrap in try/catch: Get-NetIPAddress throws a terminating CIM error + # when the IP stack is disabled, which -ErrorAction cannot suppress. + $tentative = try { + Get-NetIPAddress -InterfaceIndex $idx -ErrorAction SilentlyContinue | + Where-Object { $_.AddressState -eq 'Tentative' } + } catch { $null } + if (-not $tentative) { + break + } + Start-Sleep -Milliseconds 200 + } + } + """ + ) + + # 8. Execution + with salt.utils.win_pwsh.PowerShellSession() as session: + session.import_modules(["NetAdapter", "NetTCPIP", "DnsClient"]) + session.run_strict(ps_script) + + return True + + +def get_interface_new(iface): + """ + Retrieves the current configuration of a network interface on Windows. + + This function gathers comprehensive data about a network adapter, including + administrative status, protocol bindings, IP addresses, DNS servers, + gateways, and WINS configuration. The returned dictionary is structured + to be directly compatible with the parameters of ``set_interface``. + + **Data Structures and Round-trip Logic:** + + * **Addresses:** IPs are returned in CIDR notation (e.g., ``10.0.0.5/24``) + to ensure the setter can accurately recreate the subnet mask. + * **Gateways:** Returned as a list of dictionaries containing both the + IP (``ip``) and the route-specific metric (``metric``). + * **Metrics:** A value of ``0`` indicates that the interface is + configured for 'Automatic Metric' calculation by Windows. + + Args: + + iface (str): + The name or alias of the interface (e.g., 'Ethernet'). This is used + for the initial hardware lookup to find the permanent ``InterfaceIndex``. + + Returns: + dict: A dictionary keyed by the interface name containing: + + - **alias** (str): The friendly name of the adapter. + - **description** (str): Human-readable adapter description + (e.g., ``"Microsoft Loopback Adapter"``). + - **enabled** (bool): Administrative status (``True`` = Up). + - **index** (int): The stable ``InterfaceIndex`` used internally + by all CIM/PowerShell cmdlets. + - **link_status** (str): Current link state reported by the driver + (e.g., ``"Up"``, ``"Disconnected"``). + - **mac_address** (str): The physical (MAC) address of the adapter. + - **mtu** (int): MTU in bytes; defaults to ``1500`` when the + adapter reports no value (``4294967295`` or ``None``). + - **dns_register** (bool): ``True`` if the adapter registers its + addresses in DNS using the computer's primary DNS suffix. + - **dns_suffix** (str): Connection-specific DNS suffix, or + ``None`` if not set. + - **ipv4_enabled** (bool): Status of the IPv4 protocol binding. + - **ipv4_dhcp** (bool): ``True`` if IPv4 DHCP is enabled. + - **ipv4_metric** (int): The IPv4 interface metric (``0`` = Auto). + - **ipv4_address** (list): List of IPv4 addresses in CIDR format. + - **ipv4_gateways** (list): List of dicts, each with ``ip`` and + ``metric`` keys, for IPv4 default routes. Always a list — even + when only one gateway is configured — because + :func:`_normalize_gateway_fields` corrects the PowerShell 5.1 + behavior of unwrapping single-element arrays. + - **ipv4_dns** (list): List of IPv4 DNS server addresses. + - **ipv4_forwarding** (bool | None): ``True`` if IPv4 forwarding + is enabled, ``None`` if the IPv4 stack is not bound. + - **ipv4_wins** (list): Primary and secondary WINS server + addresses (empty list if none configured). + - **ipv4_netbios** (str): NetBIOS configuration — + ``"Default"``, ``"Enable"``, or ``"Disable"``. + - **ipv6_enabled** (bool): Status of the IPv6 protocol binding. + - **ipv6_dhcp** (bool): ``True`` if IPv6 stateful config is + enabled. + - **ipv6_metric** (int): The IPv6 interface metric (``0`` = Auto). + - **ipv6_address** (list): List of IPv6 addresses in CIDR format. + - **ipv6_gateways** (list): List of dicts, each with ``ip`` and + ``metric`` keys, for IPv6 default routes. Always a list for the + same reason as ``ipv4_gateways``. + - **ipv6_dns** (list): List of IPv6 DNS server addresses. + - **ipv6_forwarding** (bool | None): ``True`` if IPv6 forwarding + is enabled, ``None`` if the IPv6 stack is not bound. + + Raises: + CommandExecutionError: If the specified interface cannot be found. + + CLI Examples: + + .. code-block:: bash + + # Get details for a specific interface + salt-call --local win_ip.get_interface_new "Ethernet" + + # Get details for the loopback test adapter + salt '*' win_ip.get_interface_new "Test-Iface" + """ + index = get_interface_index(iface) + + # ONE script to rule them all + cmd = rf""" + $idx = {index} + $adapter = Get-NetAdapter -InterfaceIndex $idx -ErrorAction SilentlyContinue + if (-not $adapter) {{ return @{{}} | ConvertTo-Json }} + + $ipv4 = Get-NetIPInterface -InterfaceIndex $idx -AddressFamily IPv4 -ErrorAction SilentlyContinue + $ipv6 = Get-NetIPInterface -InterfaceIndex $idx -AddressFamily IPv6 -ErrorAction SilentlyContinue + $dns = Get-DnsClientServerAddress -InterfaceIndex $idx -ErrorAction SilentlyContinue + $reg = Get-DnsClient -InterfaceIndex $idx -ErrorAction SilentlyContinue + $wmi = Get-CimInstance Win32_NetworkAdapterConfiguration -Filter "InterfaceIndex = $idx" -ErrorAction SilentlyContinue + + $rawMtu = if ($ipv4) {{ $ipv4.NlMtu }} elseif ($ipv6) {{ $ipv6.NlMtu }} else {{ $adapter.MtuSize }} + $finalMtu = if ($rawMtu -eq 4294967295 -or $null -eq $rawMtu) {{ 1500 }} else {{ $rawMtu }} + + # Helper to format IPs into CIDR + function Get-CIDR {{ + param($family) + $ips = Get-NetIPAddress -InterfaceIndex $idx -AddressFamily $family -ErrorAction SilentlyContinue | + Where-Object {{ $_.AddressState -eq 'Preferred' -and $_.IPAddress -notlike 'fe80*' }} + if ($ips) {{ @($ips | ForEach-Object {{ "$($_.IPAddress)/$($_.PrefixLength)" }}) }} else {{ @() }} + }} + + # Helper to format Gateways + function Get-Gateways {{ + param($family) + $prefix = if($family -eq 'IPv4') {{ "0.0.0.0/0" }} else {{ "::/0" }} + $routes = Get-NetRoute -InterfaceIndex $idx -DestinationPrefix $prefix -ErrorAction SilentlyContinue | + Where-Object {{ $_.NextHop -and $_.NextHop -notmatch '^0\.0\.0\.0$|^::$' }} + if ($routes) {{ + @($routes | Select-Object @{{Name='ip';Expression={{$_.NextHop}}}}, @{{Name='metric';Expression={{$_.RouteMetric}}}}) + }} else {{ @() }} + }} + + $result = [PSCustomObject]@{{ + alias = $adapter.Name + description = $adapter.InterfaceDescription + dns_register = $reg.RegisterThisConnectionsAddress + dns_suffix = $reg.ConnectionSpecificSuffix + enabled = $adapter.AdminStatus -eq 1 + index = $idx + link_status = $adapter.Status + mac_address = $adapter.MacAddress + mtu = [int]$finalMtu + + # IPv4 Stack + ipv4_address = Get-CIDR -family IPv4 + ipv4_dhcp = if($ipv4) {{ $ipv4.Dhcp -eq 1 }} else {{ $false }} + ipv4_dns = @($dns | Where-Object {{$_.AddressFamily -eq 2}} | Select-Object -ExpandProperty ServerAddresses) + ipv4_enabled = $null -ne $ipv4 + ipv4_forwarding = if ($ipv4) {{ $ipv4.Forwarding -eq 1 }} else {{ $null }} + ipv4_gateways = Get-Gateways -family IPv4 + ipv4_metric = if($ipv4) {{ $ipv4.InterfaceMetric }} else {{ 0 }} + ipv4_netbios = switch($ipv4.NetbiosSetting) {{ 1 {{"Enable"}} 2 {{"Disable"}} Default {{"Default"}} }} + ipv4_wins = @($wmi.WINSPrimaryServer, $wmi.WINSSecondaryServer).Where({{$_}}) + + # IPv6 Stack + ipv6_address = Get-CIDR -family IPv6 + ipv6_dhcp = if($ipv6) {{ $ipv6.Dhcp -eq 1 }} else {{ $false }} + ipv6_dns = @($dns | Where-Object {{$_.AddressFamily -eq 23}} | Select-Object -ExpandProperty ServerAddresses) + ipv6_enabled = $null -ne $ipv6 + ipv6_forwarding = if ($ipv6) {{ $ipv6.Forwarding -eq 1 }} else {{ $null }} + ipv6_gateways = Get-Gateways -family IPv6 + ipv6_metric = if($ipv6) {{ $ipv6.InterfaceMetric }} else {{ 0 }} + }} + + $result | ConvertTo-Json -Depth 5 -Compress + """ + + with salt.utils.win_pwsh.PowerShellSession() as session: + # Load modules once per session for efficiency + session.import_modules(["NetAdapter", "NetTCPIP", "DnsClient"]) + data = session.run_json(cmd) + + if data: + _normalize_gateway_fields(data) + + return {iface: data} + + +def list_interfaces(full=False): + """ + Lists the available network interfaces on the system. + + Args: + + full (bool, optional): If ``True``, returns a dictionary keyed by + interface names with detailed configuration for each adapter. + If ``False``, returns a list of interface names only. + Defaults to ``False``. + + Returns: + list: When ``full=False``, a list of interface name strings. + + dict: When ``full=True``, a dictionary keyed by interface name. + Each value has the same structure as the per-interface dict + returned by :func:`get_interface_new` — see that function for + the full field reference (``alias``, ``description``, ``enabled``, + ``index``, ``link_status``, ``mac_address``, ``mtu``, + ``dns_register``, ``dns_suffix``, ``ipv4_*``, ``ipv6_*``). + + The entire dataset is collected in a **single PowerShell + invocation** rather than one session per adapter, so it is + significantly faster than calling :func:`get_interface_new` + in a loop. + + CLI Example: + + .. code-block:: bash + + salt -G 'os_family:Windows' ip.list_interfaces + salt -G 'os_family:Windows' ip.list_interfaces full=True + """ + with salt.utils.win_pwsh.PowerShellSession() as session: + session.import_modules(["NetAdapter", "NetTCPIP", "DnsClient"]) + + if not full: + data = session.run_json("(Get-NetAdapter).Name") + if not data: + return [] + return data if isinstance(data, list) else [data] + + # Gather all adapter details in a single PowerShell invocation to avoid + # opening N separate sessions (one per adapter). + cmd = r""" + function Get-CIDR { + param([int]$idx, [string]$family) + $ips = Get-NetIPAddress -InterfaceIndex $idx -AddressFamily $family -ErrorAction SilentlyContinue | + Where-Object { $_.AddressState -eq 'Preferred' -and $_.IPAddress -notlike 'fe80*' } + if ($ips) { @($ips | ForEach-Object { "$($_.IPAddress)/$($_.PrefixLength)" }) } else { @() } + } + + function Get-Gateways { + param([int]$idx, [string]$family) + $prefix = if ($family -eq 'IPv4') { "0.0.0.0/0" } else { "::/0" } + $routes = Get-NetRoute -InterfaceIndex $idx -DestinationPrefix $prefix -ErrorAction SilentlyContinue | + Where-Object { $_.NextHop -and $_.NextHop -notmatch '^0\.0\.0\.0$|^::$' } + if ($routes) { + @($routes | Select-Object @{Name='ip';Expression={$_.NextHop}}, @{Name='metric';Expression={$_.RouteMetric}}) + } else { @() } + } + + $out = @{} + Get-NetAdapter | ForEach-Object { + $adapter = $_ + $idx = [int]$adapter.ifIndex + + $ipv4 = Get-NetIPInterface -InterfaceIndex $idx -AddressFamily IPv4 -ErrorAction SilentlyContinue + $ipv6 = Get-NetIPInterface -InterfaceIndex $idx -AddressFamily IPv6 -ErrorAction SilentlyContinue + $dns = Get-DnsClientServerAddress -InterfaceIndex $idx -ErrorAction SilentlyContinue + $reg = Get-DnsClient -InterfaceIndex $idx -ErrorAction SilentlyContinue + $wmi = Get-CimInstance Win32_NetworkAdapterConfiguration -Filter "InterfaceIndex = $idx" -ErrorAction SilentlyContinue + + $rawMtu = if ($ipv4) { $ipv4.NlMtu } elseif ($ipv6) { $ipv6.NlMtu } else { $adapter.MtuSize } + $finalMtu = if ($rawMtu -eq 4294967295 -or $null -eq $rawMtu) { 1500 } else { $rawMtu } + + $out[$adapter.Name] = [PSCustomObject]@{ + alias = $adapter.Name + description = $adapter.InterfaceDescription + dns_register = $reg.RegisterThisConnectionsAddress + dns_suffix = $reg.ConnectionSpecificSuffix + enabled = $adapter.AdminStatus -eq 1 + index = $idx + link_status = $adapter.Status + mac_address = $adapter.MacAddress + mtu = [int]$finalMtu + + ipv4_address = Get-CIDR -idx $idx -family IPv4 + ipv4_dhcp = if ($ipv4) { $ipv4.Dhcp -eq 1 } else { $false } + ipv4_dns = @($dns | Where-Object { $_.AddressFamily -eq 2 } | Select-Object -ExpandProperty ServerAddresses) + ipv4_enabled = $null -ne $ipv4 + ipv4_forwarding = if ($ipv4) { $ipv4.Forwarding -eq 1 } else { $null } + ipv4_gateways = Get-Gateways -idx $idx -family IPv4 + ipv4_metric = if ($ipv4) { $ipv4.InterfaceMetric } else { 0 } + ipv4_netbios = switch ($ipv4.NetbiosSetting) { 1 { "Enable" } 2 { "Disable" } Default { "Default" } } + ipv4_wins = @($wmi.WINSPrimaryServer, $wmi.WINSSecondaryServer).Where({ $_ }) + + ipv6_address = Get-CIDR -idx $idx -family IPv6 + ipv6_dhcp = if ($ipv6) { $ipv6.Dhcp -eq 1 } else { $false } + ipv6_dns = @($dns | Where-Object { $_.AddressFamily -eq 23 } | Select-Object -ExpandProperty ServerAddresses) + ipv6_enabled = $null -ne $ipv6 + ipv6_forwarding = if ($ipv6) { $ipv6.Forwarding -eq 1 } else { $null } + ipv6_gateways = Get-Gateways -idx $idx -family IPv6 + ipv6_metric = if ($ipv6) { $ipv6.InterfaceMetric } else { 0 } + } + } + $out | ConvertTo-Json -Depth 5 -Compress + """ + data = session.run_json(cmd) + if not data: + return {} + for adapter_data in data.values(): + if isinstance(adapter_data, dict): + _normalize_gateway_fields(adapter_data) + return data diff --git a/salt/utils/validate/net.py b/salt/utils/validate/net.py index c098281b6542..0957edc73f53 100644 --- a/salt/utils/validate/net.py +++ b/salt/utils/validate/net.py @@ -2,6 +2,7 @@ Various network validation utilities """ +import ipaddress import re import socket @@ -92,8 +93,8 @@ def netmask(mask): if not isinstance(mask, str): return False - octets = mask.split(".") - if not len(octets) == 4: + try: + ipaddress.IPv4Network(f"0.0.0.0/{mask}") + return True + except ipaddress.NetmaskValueError: return False - - return ipv4_addr(mask) and octets == sorted(octets, reverse=True) diff --git a/salt/utils/win_pwsh.py b/salt/utils/win_pwsh.py index ff5fae429680..4a075234a2cf 100644 --- a/salt/utils/win_pwsh.py +++ b/salt/utils/win_pwsh.py @@ -1,23 +1,46 @@ +import logging +import re + import salt.modules.cmdmod import salt.utils.json -import salt.utils.platform from salt.exceptions import CommandExecutionError +# Standard Salt logging +log = logging.getLogger(__name__) + +HAS_CLR = False +try: + import clr + + HAS_CLR = True +except ImportError: + log.debug("could not import clr, pythonnet is not available") + +HAS_PWSH_SDK = False +try: + PWSH_GAC_NAME = "System.Management.Automation, Version=3.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" + clr.AddReference(PWSH_GAC_NAME) + from System.Management.Automation import PowerShell + + HAS_PWSH_SDK = True +except Exception as exc: # pylint: disable=broad-exception-caught + log.debug("win_pwsh could not load PowerShell SDK: %s", exc) + def run_dict(cmd, cwd=None): """ - Execute the powershell command and return the data as a dictionary + Execute the PowerShell command and return the data as a dictionary .. versionadded:: 3006.9 Args: - cmd (str,list): The powershell command to run + cmd (str,list): The PowerShell command to run cwd (str): The current working directory Returns: - dict: A dictionary containing the output of the powershell command + dict: A dictionary containing the output of the PowerShell command Raises: CommandExecutionError: @@ -53,3 +76,289 @@ def run_dict(cmd, cwd=None): raise CommandExecutionError("No JSON results from PowerShell", info=ret) return ret + + +class PowerShellSession: + """ + A stateful PowerShell runspace backed by the .NET PowerShell SDK + (``System.Management.Automation``), loaded via ``pythonnet``. + + Unlike :func:`run_dict`, which shells out to a ``powershell.exe`` + subprocess, this class opens an **in-process** runspace and keeps it alive + across multiple calls. The same runspace is reused for every ``run*`` + invocation, so imported modules, loaded functions, and session variables + persist for the lifetime of the object. + + **Requirements:** both :data:`HAS_CLR` and :data:`HAS_PWSH_SDK` must be + ``True`` before instantiating this class. + + **Usage — always use as a context manager** so the underlying runspace is + disposed of deterministically:: + + with PowerShellSession() as session: + result = session.run_json("Get-NetAdapter | Select-Object Name, Status") + + **Methods:** + + * :meth:`run` — run a script and return raw Python scalars/lists. + * :meth:`run_json` — run a script and return a parsed Python object via + ``ConvertTo-Json``. + * :meth:`run_strict` — like :meth:`run` but raises + :exc:`~salt.exceptions.CommandExecutionError` if the script did not run + to completion (i.e., an uncaught error occurred). + + **Session defaults** set in :meth:`__init__`: + + * ``$ErrorActionPreference = 'Stop'`` — all non-terminating cmdlet errors + are promoted to terminating exceptions so they cannot be silently ignored. + * ``$ProgressPreference = 'SilentlyContinue'`` — suppresses progress bars + that would otherwise pollute the output stream. + * ``$WarningPreference = 'SilentlyContinue'`` — suppresses advisory + warnings that are not actionable in an automation context. + """ + + def __init__(self): + """ + Create the PowerShell runspace and apply session-wide preferences. + + Sets ``$ErrorActionPreference = 'Stop'`` so every cmdlet uses + terminating-error semantics by default. Individual commands that are + expected to return nothing (e.g., ``Get-NetRoute`` when no route + exists) must use ``-ErrorAction SilentlyContinue`` or be wrapped in a + PowerShell ``try/catch`` block. + """ + # Create PowerShell instance + self.ps = PowerShell.Create() + + # Suppress anything that might be displayed + self.ps.AddScript("$ProgressPreference = 'SilentlyContinue'").AddStatement() + self.ps.AddScript("$ErrorActionPreference = 'Stop'").AddStatement() + self.ps.AddScript("$WarningPreference = 'SilentlyContinue'").AddStatement() + self.ps.Invoke() + + def __enter__(self): + return self + + def version(self): + return self.run("$PSVersionTable.PSVersion.ToString()") + + def import_modules(self, modules=None): + """ + Load modules into the existing session + """ + if not modules: + return + + self.ps.Commands.Clear() + self.ps.Streams.ClearStreams() + for module in modules if isinstance(modules, list) else [modules]: + self.ps.AddScript(f"Import-Module {module}").AddStatement() + self.ps.Invoke() + + def run(self, cmd): + """ + Run a PowerShell command and return the result without attempting to + parse it convert the PowerShell objects. + + Args: + + cmd (str): The command to run. + + Returns: + str: A string with the results of the command + list: If there is more than one return, it will be a list of strings + """ + # Clear previous commands and any accumulated stream state + self.ps.Commands.Clear() + self.ps.Streams.ClearStreams() + self.ps.AddScript(cmd) + results = self.ps.Invoke() + + if self.ps.HadErrors: + error_msg = "Unknown PowerShell Error" + + if self.ps.Streams.Error.Count > 0: + error_msg = self.ps.Streams.Error[0].ToString() + elif self.ps.InvocationStateInfo.Reason: + error_msg = str(self.ps.InvocationStateInfo.Reason.Message) + + # We don't raise here so the session stays alive, but we log it + log.debug("PowerShell Session Warning/Error: %s", error_msg) + return error_msg + + if not results or len(results) == 0: + return None + + if len(results) == 1: + value = results[0] + if value is None: + return None + + # Access the underlying .NET BaseObject (e.g., System.String -> str) + base_value = value.BaseObject + + # Basic type mapping + if isinstance(base_value, (str, int, bool, float)): + return base_value + + return str(base_value) + + # If there are multiple results, return them as a list of strings + return [str(result) for result in results] + + def run_json(self, cmd, depth=4): + """ + Run a PowerShell command and return the result as a parsed Python object. + + Unless the command already contains ``ConvertTo-Json``, the method + automatically pipes the output through ``ConvertTo-Json -Compress + -Depth `` and then ``Out-String`` before parsing. This ensures + that PowerShell objects are always serialized to English-language JSON + regardless of the system locale. + + Args: + + cmd (str): The PowerShell command or script to run. + + depth (int): The JSON serialization depth passed to + ``ConvertTo-Json``. Defaults to ``4``. + + Returns: + Any: The deserialized Python object (dict, list, str, etc.), or + ``None`` if PowerShell produced no output. + + Raises: + CommandExecutionError: If PowerShell enters a ``Failed`` state. + """ + # Clear previous commands and any accumulated stream state + self.ps.Commands.Clear() + self.ps.Streams.ClearStreams() + + self.ps.AddScript(cmd) + + # Only add ConvertTo-Json if not already present in the command. + # Match both piped (| ConvertTo-Json) and standalone (ConvertTo-Json + # -InputObject ...) forms so we never double-encode the output. + if not re.search(r"ConvertTo-Json", cmd, re.IGNORECASE): + self.ps.AddCommand("ConvertTo-Json") + self.ps.AddParameter("Compress", True) + self.ps.AddParameter("Depth", depth) + + # Use Out-String for reliable string extraction from the pipeline + if not cmd.lower().endswith("out-string"): + self.ps.AddCommand("Out-String") + + results = self.ps.Invoke() + + if self.ps.HadErrors: + if self.ps.InvocationStateInfo.State.ToString() == "Failed": + err_msgs = [str(err) for err in self.ps.Streams.Error] + raise CommandExecutionError(f"PowerShell Error: {err_msgs}") + + if results and len(results) > 0: + return salt.utils.json.loads(str(results[0])) + return None + + def run_strict(self, cmd): + """ + Run a PowerShell command and raise on any error. + + Identical to :meth:`run` except that a ``CommandExecutionError`` is + raised when PowerShell reports ``HadErrors``, rather than logging and + returning the error string. Use this when the caller must not silently + continue after a failure (e.g., when executing a large configuration + script where a mid-script error would leave the system in an unknown + state). + + .. note:: + PowerShell sets ``HadErrors = True`` even for errors caught by + ``try/catch`` inside the script. To distinguish "script ran to + completion with some suppressed errors" from "script terminated + mid-way", we bracket the caller's script with a sentinel variable. + If the sentinel is ``$true`` at the end, all errors were caught + intentionally and we do not raise. + + Args: + + cmd (str): The PowerShell command or script to run. + + Returns: + str | list | None: Same return types as :meth:`run`. + + Raises: + CommandExecutionError: If PowerShell reports any error. + """ + self.ps.Commands.Clear() + self.ps.Streams.ClearStreams() + # Sentinel: set $false before the script, $true after. If the script + # terminates mid-way (uncaught error) the sentinel stays $false. + wrapped = "$__salt_ok__ = $false\n" + cmd + "\n$__salt_ok__ = $true" + self.ps.AddScript(wrapped) + results = self.ps.Invoke() + + if self.ps.HadErrors: + # Capture error info BEFORE the sentinel query overwrites state. + streams_err = ( + self.ps.Streams.Error[0].ToString() + if self.ps.Streams.Error.Count > 0 + else None + ) + inv_reason = self.ps.InvocationStateInfo.Reason + + # Did the script run to completion? If yes, every error was caught + # by an explicit try/catch inside the script — do not raise. + self.ps.Commands.Clear() + self.ps.Streams.ClearStreams() + self.ps.AddScript("$__salt_ok__ -eq $true") + chk = self.ps.Invoke() + ran_to_end = chk and chk.Count > 0 and str(chk[0]).lower() == "true" + if ran_to_end: + # All errors were intentionally suppressed; script completed. + pass + else: + # Script terminated before the sentinel — uncaught error. + error_msg = "Unknown PowerShell Error" + if streams_err: + error_msg = streams_err + elif inv_reason: + error_msg = str(inv_reason.Message) + else: + # Terminating errors from $ErrorActionPreference = 'Stop' + # do not always populate Streams.Error in PS 5.1 SDK. + # Query $Error[0] from the runspace as a last-resort. + self.ps.Commands.Clear() + self.ps.AddScript( + "if ($Error.Count -gt 0)" + " { $Error[0].Exception.Message } else { '' }" + ) + err_results = self.ps.Invoke() + if err_results and err_results.Count > 0: + msg = str(err_results[0]) + if msg.strip(): + error_msg = msg + raise CommandExecutionError(error_msg) + + if not results or len(results) == 0: + return None + + if len(results) == 1: + value = results[0] + if value is None: + return None + base_value = value.BaseObject + if isinstance(base_value, (str, int, bool, float)): + return base_value + return str(base_value) + + return [str(result) for result in results] + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + exc_type: The class of the exception (e.g., CommandExecutionError) + exc_val: The instance of the exception + exc_tb: The traceback object + """ + if exc_type: + log.debug("PowerShellSession exiting due to error: %s", exc_val) + log.debug(exc_tb) + self.ps.Dispose() diff --git a/tests/integration/modules/test_win_ip.py b/tests/integration/modules/test_win_ip.py deleted file mode 100644 index dd8626605342..000000000000 --- a/tests/integration/modules/test_win_ip.py +++ /dev/null @@ -1,27 +0,0 @@ -import re - -import pytest - -from tests.support.case import ModuleCase - - -@pytest.mark.skip_unless_on_windows -class WinIPTest(ModuleCase): - """ - Tests for salt.modules.win_ip - """ - - def test_get_default_gateway(self): - """ - Test getting default gateway - """ - ip = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$") - ret = self.run_function("ip.get_default_gateway") - assert ip.match(ret) - - def test_ip_is_enabled(self): - """ - Test ip.is_enabled - """ - assert self.run_function("ip.is_enabled", ["Ethernet"]) - assert "not found" in self.run_function("ip.is_enabled", ["doesnotexist"]) diff --git a/tests/pytests/functional/modules/test_win_ip.py b/tests/pytests/functional/modules/test_win_ip.py new file mode 100644 index 000000000000..6ffc491859f1 --- /dev/null +++ b/tests/pytests/functional/modules/test_win_ip.py @@ -0,0 +1,1013 @@ +import ipaddress + +import pytest + +import salt.utils.json +import salt.utils.validate.net +import salt.utils.win_pwsh +from salt.exceptions import CommandExecutionError, SaltInvocationError +from tests.support.mock import patch + +pytestmark = [ + pytest.mark.skip_unless_on_windows, + pytest.mark.slow_test, + pytest.mark.windows_whitelisted, +] + +INSTALL_LOOPBACK_ADAPTER = r""" +$InstallSource = @" +using System; +using System.Runtime.InteropServices; +using System.Text; + +public class LoopbackInstaller { + [DllImport("setupapi.dll", SetLastError = true, CharSet = CharSet.Auto)] + public static extern IntPtr SetupDiCreateDeviceInfoList(ref Guid ClassGuid, IntPtr hwndParent); + + [DllImport("setupapi.dll", SetLastError = true, CharSet = CharSet.Auto)] + public static extern bool SetupDiCreateDeviceInfo(IntPtr DeviceInfoSet, string DeviceName, ref Guid ClassGuid, string DeviceDescription, IntPtr hwndParent, uint CreationFlags, ref SP_DEVINFO_DATA DeviceInfoData); + + [DllImport("setupapi.dll", SetLastError = true, CharSet = CharSet.Auto)] + public static extern bool SetupDiSetDeviceRegistryProperty(IntPtr DeviceInfoSet, ref SP_DEVINFO_DATA DeviceInfoData, uint Property, byte[] PropertyBuffer, uint PropertyBufferSize); + + [DllImport("setupapi.dll", SetLastError = true, CharSet = CharSet.Auto)] + public static extern bool SetupDiCallClassInstaller(uint InstallFunction, IntPtr DeviceInfoSet, ref SP_DEVINFO_DATA DeviceInfoData); + + [StructLayout(LayoutKind.Sequential)] + public struct SP_DEVINFO_DATA { + public uint cbSize; + public Guid ClassGuid; + public uint DevInst; + public IntPtr Reserved; + } + + public static void Install() { + Guid netClassGuid = new Guid("{4d36e972-e325-11ce-bfc1-08002be10318}"); // Network Adapter Class GUID + IntPtr deviceInfoSet = SetupDiCreateDeviceInfoList(ref netClassGuid, IntPtr.Zero); + + SP_DEVINFO_DATA deviceInfoData = new SP_DEVINFO_DATA(); + deviceInfoData.cbSize = (uint)Marshal.SizeOf(deviceInfoData); + + // Create the device node + SetupDiCreateDeviceInfo(deviceInfoSet, "MSLOOP", ref netClassGuid, null, IntPtr.Zero, 1, ref deviceInfoData); + + // Set the HardwareID property so Windows knows to use the Loopback driver + byte[] hwid = Encoding.Unicode.GetBytes("*MSLOOP\0\0"); + SetupDiSetDeviceRegistryProperty(deviceInfoSet, ref deviceInfoData, 1, hwid, (uint)hwid.Length); + + // DIF_INSTALLDEVICE = 0x19 (Tells Windows to actually install the driver for this node) + SetupDiCallClassInstaller(0x19, deviceInfoSet, ref deviceInfoData); + } +} +"@ + +# Add the installer code to the session +if (-not ([System.Management.Automation.PSTypeName]"LoopbackInstaller").Type) { + Add-Type -TypeDefinition $InstallSource | Out-Null +} + +# Execute the installation +[LoopbackInstaller]::Install() | Out-Null + +# 1. Force the driver installation +# This adds the driver to the store and attempts to start the hardware node +pnputil /add-driver $env:windir\inf\netloop.inf /install +""" + +REMOVE_LOOPBACK_ADAPTER = r""" +$UninstallSource = @" +using System; +using System.Runtime.InteropServices; +using System.Text; + +public class LoopbackUninstaller { + [DllImport("setupapi.dll", SetLastError = true, CharSet = CharSet.Auto)] + public static extern IntPtr SetupDiGetClassDevs(ref Guid ClassGuid, string Enumerator, IntPtr hwndParent, uint Flags); + + [DllImport("setupapi.dll", SetLastError = true)] + public static extern bool SetupDiEnumDeviceInfo(IntPtr DeviceInfoSet, uint MemberIndex, ref SP_DEVINFO_DATA DeviceInfoData); + + [DllImport("setupapi.dll", SetLastError = true, CharSet = CharSet.Auto)] + public static extern bool SetupDiGetDeviceRegistryProperty(IntPtr DeviceInfoSet, ref SP_DEVINFO_DATA DeviceInfoData, uint Property, out uint PropertyRegDataType, byte[] PropertyBuffer, uint PropertyBufferSize, out uint RequiredSize); + + [DllImport("setupapi.dll", SetLastError = true, CharSet = CharSet.Auto)] + public static extern bool SetupDiCallClassInstaller(uint InstallFunction, IntPtr DeviceInfoSet, ref SP_DEVINFO_DATA DeviceInfoData); + + [DllImport("setupapi.dll", SetLastError = true)] + public static extern bool SetupDiDestroyDeviceInfoList(IntPtr DeviceInfoSet); + + [StructLayout(LayoutKind.Sequential)] + public struct SP_DEVINFO_DATA { + public uint cbSize; + public Guid ClassGuid; + public uint DevInst; + public IntPtr Reserved; + } + + public static void UninstallAll() { + Guid netClassGuid = new Guid("{4d36e972-e325-11ce-bfc1-08002be10318}"); + // DIGCF_PRESENT = 0x2 + IntPtr deviceInfoSet = SetupDiGetClassDevs(ref netClassGuid, null, IntPtr.Zero, 0x2); + + SP_DEVINFO_DATA deviceInfoData = new SP_DEVINFO_DATA(); + deviceInfoData.cbSize = (uint)Marshal.SizeOf(deviceInfoData); + + uint i = 0; + while (SetupDiEnumDeviceInfo(deviceInfoSet, i, ref deviceInfoData)) { + uint propType; + uint requiredSize; + byte[] buffer = new byte[1024]; + + // SPDRP_HARDWAREID = 0x1 + if (SetupDiGetDeviceRegistryProperty(deviceInfoSet, ref deviceInfoData, 1, out propType, buffer, (uint)buffer.Length, out requiredSize)) { + string hwid = Encoding.Unicode.GetString(buffer).TrimEnd('\0'); + if (hwid.Contains("*MSLOOP")) { + // DIF_REMOVE = 0x05 + SetupDiCallClassInstaller(0x05, deviceInfoSet, ref deviceInfoData); + // Don't increment 'i' because the list shifted after removal + continue; + } + } + i++; + } + SetupDiDestroyDeviceInfoList(deviceInfoSet); + } +} +"@ + +# Add the uninstaller to the session +Add-Type -TypeDefinition $UninstallSource +if (-not ([System.Management.Automation.PSTypeName]"LoopbackUninstaller").Type) { + Add-Type -TypeDefinition $UninstallSource | Out-Null +} + +# Execute the removal +[LoopbackUninstaller]::UninstallAll() +""" + + +@pytest.fixture(scope="module") +def ip(modules): + return modules.ip + + +@pytest.fixture(scope="module") +def dummy_interface(request): + """ + We need to create a dummy interface for messing with since we're gonna be + enabling, disabling, changing dns, dhcp, gateways, etc + """ + with salt.utils.win_pwsh.PowerShellSession() as session: + session.run(INSTALL_LOOPBACK_ADAPTER) + + # Let's make sure cleanup happens + def cleanup(): + with salt.utils.win_pwsh.PowerShellSession() as cleanup_session: + cleanup_session.run(REMOVE_LOOPBACK_ADAPTER) + + request.addfinalizer(cleanup) + + cmd = """ + $dummy = Get-NetAdapter | Where-Object { $_.InterfaceDescription -match "KM-TEST Loopback" } + Rename-NetAdapter -Name $dummy.Name -NewName "SaltTestLoopback" -Confirm:$false + ConvertTo-Json -InputObject @($dummy.ifIndex, "SaltTestLoopback") -Compress + """ + index, name = session.run_json(cmd) + yield index, name + + +@pytest.fixture(scope="function") +def default_dhcp(ip, dummy_interface): + index, name = dummy_interface + + # Ensure the interface is named correctly before configuring + with salt.utils.win_pwsh.PowerShellSession() as session: + cmd = f""" + Get-NetAdapter -InterfaceIndex {index} | Rename-NetAdapter -NewName "{name}" + """ + session.run(cmd) + + settings = { + "enabled": True, + "ipv4_enabled": True, + "ipv4_dhcp": True, + "ipv6_enabled": True, + "ipv6_dhcp": True, + } + + # 2. Apply settings + ip.set_interface(iface=name, **settings) + + # 3. Verify settings + new_settings = ip.get_interface_new(iface=name)[name] + assert new_settings["enabled"] == settings["enabled"] + assert new_settings["ipv4_enabled"] == settings["ipv4_enabled"] + assert new_settings["ipv4_dhcp"] == settings["ipv4_dhcp"] + assert new_settings["ipv6_enabled"] == settings["ipv6_enabled"] + assert new_settings["ipv6_dhcp"] == settings["ipv6_dhcp"] + + return name, settings + + +@pytest.fixture(scope="function") +def default_static(ip, dummy_interface): + index, name = dummy_interface + + # 1. Ensure the interface is named correctly before configuring + with salt.utils.win_pwsh.PowerShellSession() as session: + cmd = f""" + Get-NetAdapter -InterfaceIndex {index} | Rename-NetAdapter -NewName "{name}" + """ + session.run(cmd) + + settings = { + "enabled": True, + "ipv4_enabled": True, + "ipv4_address": "192.168.1.105/24", + "ipv4_dhcp": False, + "ipv4_dns": ["192.168.1.10"], + "ipv4_gateways": [{"ip": "192.168.1.1", "metric": 5}], + "ipv4_metric": 10, + "ipv4_wins": ["192.168.1.11", "192.168.1.12"], + "ipv6_enabled": True, + "ipv6_address": "2001:db8::1/64", + "ipv6_dhcp": False, + "ipv6_dns": ["fd00:1234:5678:1::10"], + # Use a ULA address rather than a link-local (fe80::) next hop. + # Link-local addresses require a zone ID and are not valid default-route + # next hops on loopback adapters. + "ipv6_gateways": [{"ip": "fd00::1", "metric": 50}], + "ipv6_metric": 100, + } + + # 2. Apply settings + ip.set_interface(iface=name, **settings) + + # 3. Verify settings + new_settings = ip.get_interface_new(iface=name)[name] + assert new_settings["enabled"] == settings["enabled"] + assert new_settings["ipv4_enabled"] == settings["ipv4_enabled"] + assert new_settings["ipv4_address"] == settings["ipv4_address"] + assert new_settings["ipv4_dhcp"] == settings["ipv4_dhcp"] + assert new_settings["ipv4_dns"] == settings["ipv4_dns"] + assert new_settings["ipv4_gateways"] == settings["ipv4_gateways"] + assert new_settings["ipv4_metric"] == settings["ipv4_metric"] + assert new_settings["ipv4_wins"] == settings["ipv4_wins"] + assert new_settings["ipv6_enabled"] == settings["ipv6_enabled"] + assert new_settings["ipv6_address"] == "2001:db8::1/64" + assert new_settings["ipv6_dhcp"] == settings["ipv6_dhcp"] + assert new_settings["ipv6_dns"] == settings["ipv6_dns"] + assert new_settings["ipv6_gateways"] == settings["ipv6_gateways"] + assert new_settings["ipv6_metric"] == settings["ipv6_metric"] + + return name, settings + + +@pytest.fixture(scope="function") +def enabled(ip, dummy_interface): + """ + We'll make sure the device is enabled and make sure it's still enabled + when we're done + """ + index, name = dummy_interface + ip.enable(name) + assert ip.is_enabled(name) + return name + + +@pytest.fixture(scope="function") +def disabled(ip, dummy_interface): + index, name = dummy_interface + ip.disable(name) + assert ip.is_disabled(name) + return name + + +def test_is_enabled(ip, enabled): + """ + Test that is_enabled returns True for an administratively up interface. + """ + assert ip.is_enabled(enabled) + + +def test_is_enabled_invalid_interface(ip): + """ + Test that is_enabled raises CommandExecutionError for non-existent interfaces. + """ + with pytest.raises(CommandExecutionError) as excinfo: + ip.is_enabled("does-not-exist") + assert ( + str(excinfo.value) + == "Interface 'does-not-exist' not found or invalid response." + ) + + +def test_is_disabled(ip, disabled): + """ + Test that is_disabled returns True for an administratively down interface. + """ + assert ip.is_disabled(disabled) + + +def test_is_disabled_invalid_interface(ip): + """ + Test that is_disabled raises CommandExecutionError for non-existent interfaces. + """ + with pytest.raises(CommandExecutionError) as excinfo: + ip.is_disabled("does-not-exist") + assert ( + str(excinfo.value) + == "Interface 'does-not-exist' not found or invalid response." + ) + + +def test_enable(ip, disabled): + """ + Test that enable() brings a disabled interface back to administrative up state. + """ + ip.enable(disabled) + assert ip.is_enabled(disabled) + + +def test_disable(ip, enabled): + """ + Test that disable() takes an enabled interface to administrative down state. + """ + ip.disable(enabled) + assert ip.is_disabled(enabled) + + +def test_get_subnet_length(ip): + """ + Test converting dotted-decimal netmasks to CIDR lengths. + """ + # Standard Class C + assert ip.get_subnet_length("255.255.255.0") == 24 + + # Standard Class B + assert ip.get_subnet_length("255.255.0.0") == 16 + + # Standard Class A + assert ip.get_subnet_length("255.0.0.0") == 8 + + # Subnetted masks + assert ip.get_subnet_length("255.255.255.192") == 26 + assert ip.get_subnet_length("255.255.252.0") == 22 + + # Edge cases + assert ip.get_subnet_length("255.255.255.255") == 32 + assert ip.get_subnet_length("0.0.0.0") == 0 + + +def test_get_subnet_length_errors(ip): + """ + Test that invalid netmasks raise SaltInvocationError. + """ + invalid_masks = [ + "255.255.255.256", # Out of range + "192.168.1.1", # An IP, not a mask + "255.255.0.255", # Discontiguous mask + "not-a-mask", # Garbage string + ] + + for mask in invalid_masks: + with pytest.raises(SaltInvocationError) as excinfo: + ip.get_subnet_length(mask) + assert f"'{mask}' is not a valid netmask" in str(excinfo.value) + + +def test_set_static_ip_basic(ip, default_dhcp): + """ + Test setting a basic static IP and gateway (overwriting existing). + """ + name, settings = default_dhcp + address = "10.1.2.3/24" + gateway = "10.1.2.1" + + # Set the IP + ret = ip.set_static_ip(iface=name, addr=address, gateway=gateway) + + # Verify return message + assert ret["Address Info"] == "10.1.2.3/24" + assert ret["Default Gateway"] == "10.1.2.1" + + result = ip.get_interface_new(name)[name] + assert "10.1.2.3/24" == result["ipv4_address"] + assert "10.1.2.1" == result["ipv4_gateways"][0]["ip"] + + +def test_set_static_ip_append(ip, default_dhcp): + """ + Test appending an IP to an existing configuration. + """ + name, settings = default_dhcp + ip.set_static_ip(iface=name, addr="10.1.2.3/24") + + # Append a second IP + ip.set_static_ip(iface=name, addr="10.1.2.4/24", append=True) + + result = ip.get_interface_new(name)[name] + assert "10.1.2.3/24" in result["ipv4_address"] + assert "10.1.2.4/24" in result["ipv4_address"] + + +def test_set_static_ip_append_duplicate_error(ip, default_dhcp): + """ + Test that appending an existing IP raises CommandExecutionError. + """ + name, settings = default_dhcp + addr = "10.1.2.5/24" + ip.set_static_ip(iface=name, addr=addr) + assert ip.get_interface_new(name)[name]["ipv4_address"] == addr + + with pytest.raises(CommandExecutionError) as excinfo: + ip.set_static_ip(iface=name, addr=addr, append=True) + + address, _ = addr.split("/") if "/" in addr else (addr, "24") + msg = f"Address '{address}' already exists on '{name}'" + assert excinfo.value.args[0] == msg + + +def test_set_static_ip_no_cidr_default(ip, default_dhcp): + """ + Test that an IP without a CIDR defaults to /24. + """ + name, settings = default_dhcp + # The code specifically adds /24 if "/" is missing + ip.set_static_ip(iface=name, addr="10.10.10.10") + + result = ip.get_interface_new(name)[name] + assert "10.10.10.10/24" in result["ipv4_address"] + + +def test_set_static_ip_invalid_inputs(ip, dummy_interface): + """ + Test validation for bad IP and Gateway strings. + """ + index, name = dummy_interface + + with pytest.raises(SaltInvocationError): + ip.set_static_ip(iface=name, addr="999.999.999.999") + + with pytest.raises(SaltInvocationError): + ip.set_static_ip(iface=name, addr="10.1.1.1/24", gateway="not.an.ip") + + +def test_set_dhcp_ip_success(ip, default_static): + """ + Test transitioning from a static IP to DHCP. + """ + name, settings = default_static + + # 1. Start with a static IP to ensure we aren't already in DHCP mode + ip.set_static_ip(iface=name, addr="10.1.2.3/24") + assert ip.get_interface_new(name)[name]["ipv4_dhcp"] is False + + # 2. Run the DHCP setter + ret = ip.set_dhcp_ip(iface=name) + + # 3. Verify return and actual state + assert ret["DHCP enabled"] == "Yes" + assert ip.get_interface_new(name)[name]["ipv4_dhcp"] is True + + +def test_set_dhcp_ip_already_enabled(ip, default_dhcp): + """ + Test that calling set_dhcp_ip on an interface that already has DHCP + enabled is a no-op and returns an empty dict. + """ + name, settings = default_dhcp + + # Try to turn dhcp on (it's already on) + ret = ip.set_dhcp_ip(iface=name) + assert ret == {} + + +def test_set_dhcp_ip_invalid_interface(ip): + """ + Test that set_dhcp_ip raises an exception when the interface does not exist. + """ + with pytest.raises(Exception): + ip.set_dhcp_ip(iface="ThisInterfaceDoesNotExist") + + +def test_set_static_dns_single(ip, dummy_interface): + """ + Test setting a single static DNS server. + """ + index, name = dummy_interface + dns_target = ["1.1.1.1"] + + ret = ip.set_static_dns(name, *dns_target) + + assert ret["Interface"] == name + # Ensure the DNS is set in the stack + result = ip.get_interface_new(name)[name] + assert "1.1.1.1" in result["ipv4_dns"] + + +def test_set_static_dns_multiple(ip, dummy_interface): + """ + Test setting multiple DNS servers in specific order. + """ + index, name = dummy_interface + dns_targets = ["8.8.8.8", "8.8.4.4"] + + ret = ip.set_static_dns(name, *dns_targets) + + assert set(ret["DNS Server"]) == set(dns_targets) + result = ip.get_interface_new(name)[name] + assert all(dns in result["ipv4_dns"] for dns in dns_targets) + + +def test_set_static_dns_no_changes(ip, dummy_interface): + """ + Test that passing 'None' or no addresses returns 'No Changes'. + """ + index, name = dummy_interface + ret = ip.set_static_dns(name, "None") + assert ret["DNS Server"] == "No Changes" + + +def test_set_static_dns_already_set(ip, dummy_interface): + """ + Test that setting the same DNS again returns an empty dict (idempotency). + """ + index, name = dummy_interface + dns = "9.9.9.9" + + # First set + ip.set_static_dns(name, dns) + + # Ensure the DNS is set in the stack + result = ip.get_interface_new(name)[name] + assert "9.9.9.9" in result["ipv4_dns"] + + # Second set + ret = ip.set_static_dns(name, dns) + + assert ret == {} + + +def test_set_static_dns_clear_via_list_string(ip, dummy_interface): + """ + Test that passing '[]' calls set_dhcp_dns. + """ + index, name = dummy_interface + + # 1. Set it to something static first + ip.set_static_dns(name, "1.1.1.1") + + # 2. Call the clear logic (the real deal, no mocks) + ret = ip.set_static_dns(name, "[]") + + # 3. Verify the real-world result + assert ret["DNS Server"] == "DHCP (Empty)" + result = ip.get_interface_new(name)[name] + # Check that the static IP we set is gone + assert "1.1.1.1" not in result.get("ipv4_dns", []) + + +def test_set_dhcp_dns_success(ip, dummy_interface): + """ + Test transitioning from static DNS back to automatic (DHCP-sourced) DNS. + + Verifies that after the reset the previously configured static server is no + longer present in the interface's DNS server list. + """ + index, name = dummy_interface + + # 1. Start by setting a static DNS to ensure we aren't already in DHCP mode + ip.set_static_dns(name, "1.1.1.1") + + result = ip.get_interface_new(name)[name] + assert "1.1.1.1" in result["ipv4_dns"] + + # 2. Reset DNS to automatic + ret = ip.set_dhcp_dns(iface=name) + + # 3. Verify return message and actual state + assert ret["DNS Server"] == "DHCP (Empty)" + assert ret["Interface"] == name + + result = ip.get_interface_new(name)[name] + assert "1.1.1.1" not in result.get("ipv4_dns", []) + + +def test_set_dhcp_dns_already_enabled(ip, dummy_interface): + """ + Test that calling set_dhcp_dns on an interface already using DHCP + returns an empty dict (idempotency). + """ + index, name = dummy_interface + + # Ensure it's in DHCP mode first + ip.set_dhcp_dns(iface=name) + + # Call it again + ret = ip.set_dhcp_dns(iface=name) + assert ret == {} + + +def test_set_dhcp_dns_invalid_interface(ip): + """ + Test that an invalid interface raises an error. + """ + with pytest.raises(Exception): + ip.set_dhcp_dns(iface="NonExistentInterface") + + +def test_set_dhcp_all(ip, dummy_interface): + """ + Integration test to ensure both IP and DNS are reset to DHCP. + """ + index, name = dummy_interface + + # 1. Manually "dirty" the interface with static settings + ip.set_static_ip(iface=name, addr="10.1.2.3/24", gateway="10.1.2.1") + ip.set_static_dns(name, "1.1.1.1") + + # 2. Run the "All" reset + ret = ip.set_dhcp_all(iface=name) + + # 3. Verify the return structure + assert ret["Interface"] == name + assert ret["DHCP enabled"] == "Yes" + assert ret["DNS Server"] == "DHCP" + + # 4. Verify the actual stack state via our getter + result = ip.get_interface_new(name)[name] + assert result["ipv4_dhcp"] is True + # If your getter tracks DNS source, check that too. + # Otherwise, ensure the static IP is gone. + assert "10.1.2.3/24" not in result["ipv4_address"] + + +def test_get_default_gateway_success(ip): + """ + Test that we can retrieve a gateway on a live system. + Note: This assumes the test runner has internet access/a gateway. + """ + try: + gateway = ip.get_default_gateway() + # Verify it looks like an IP address + assert salt.utils.validate.net.ipv4_addr(gateway) + except CommandExecutionError: + pytest.skip("No default gateway found on this system.") + + +def test_get_default_gateway_selection(ip, dummy_interface): + """ + Test that the function selects the gateway with the lowest metric. + """ + index, name = dummy_interface + + # 1. Set a gateway with a high metric + ip.set_interface( + iface=name, + ipv4_address="10.99.99.2/24", + ipv4_gateways={"ip": "10.99.99.1", "metric": 1}, + ) + + # 2. Check the gateway. On a test machine with no other internet, + # this should return our dummy gateway. + gateway = ip.get_default_gateway(iface=name) + assert gateway == "10.99.99.1" + + +def test_get_default_gateway_no_route(ip): + """ + Test that the function raises CommandExecutionError when no route is found, + using unittest.mock to simulate an empty PowerShell return. + """ + # We patch the 'run' method of the PowerShellSession class + # located within the win_pwsh utility module. + with patch("salt.utils.win_pwsh.PowerShellSession.run") as mock_run: + # Simulate PowerShell returning nothing (empty string or None) + mock_run.return_value = "" + + with pytest.raises(CommandExecutionError) as excinfo: + ip.get_default_gateway() + + assert "Unable to find default gateway" in str(excinfo.value) + + # Optional: Verify that the correct PowerShell command was attempted + # We check the first argument of the last call to mock_run + called_cmd = mock_run.call_args[0][0] + assert "Get-NetRoute" in called_cmd + assert "0.0.0.0/0" in called_cmd + + +def test_get_interface_static(ip, default_static): + """ + Test that get_interface returns the correct legacy-format keys for a + statically configured interface (DHCP disabled, static IP/DNS/WINS/gateway). + """ + name, settings = default_static + result = ip.get_interface(name)[name] + assert result["DHCP enabled"] == "No" + assert result["Default Gateway"] == settings["ipv4_gateways"][0]["ip"] + assert result["InterfaceMetric"] == settings["ipv4_metric"] + assert result["Register with which suffix"] == "Primary only" + assert result["Statically Configured DNS Servers"] == settings["ipv4_dns"] + assert result["Statically Configured WINS Servers"] == settings["ipv4_wins"] + ip_info = ipaddress.IPv4Interface(settings["ipv4_address"]) + expected_ip_addrs = [ + { + "IP Address": ip_info._string_from_ip_int(ip_info._ip), + "Netmask": str(ip_info.netmask), + "Subnet": str(ip_info.network), + }, + ] + assert result["ip_addrs"] == expected_ip_addrs + + +def test_get_interface_dhcp(ip, default_dhcp): + """ + Test that get_interface returns the correct legacy-format keys for an + interface configured to obtain its IP and DNS from DHCP. + """ + name, settings = default_dhcp + result = ip.get_interface(name)[name] + assert result["DHCP enabled"] == "Yes" + assert result["Register with which suffix"] == "Primary only" + assert result["DNS servers configured through DHCP"] == ["None"] + assert result["WINS servers configured through DHCP"] == ["None"] + + +def test_get_interface_new_static(ip, default_static): + """ + Test that get_interface_new returns the full structured configuration for a + statically configured interface, including both IPv4 and IPv6 addresses, + gateways, DNS, WINS, and metric values. + """ + name, settings = default_static + result = ip.get_interface_new(name)[name] + assert result["alias"] == name + assert result["description"] == "Microsoft KM-TEST Loopback Adapter" + for setting in settings: + assert result[setting] == settings[setting] + + +def test_get_interface_new_dhcp(ip, default_dhcp): + """ + Test that get_interface_new returns the correct structured configuration for + an interface configured to use DHCP for both IPv4 and IPv6. + """ + name, settings = default_dhcp + result = ip.get_interface_new(name)[name] + assert result["alias"] == name + assert result["description"] == "Microsoft KM-TEST Loopback Adapter" + for setting in settings: + assert result[setting] == settings[setting] + + +@pytest.mark.parametrize( + "gateways_input, expected_gateways", + [ + # Case 1: Single string gateway — metric falls back to ipv4_metric (10) + ("192.168.1.1", [{"ip": "192.168.1.1", "metric": 10}]), + # Case 2: List of string gateways — metric falls back to ipv4_metric (10) + (["192.168.1.1"], [{"ip": "192.168.1.1", "metric": 10}]), + # Case 3: Single dict with custom metric + ({"ip": "192.168.1.1", "metric": 5}, [{"ip": "192.168.1.1", "metric": 5}]), + # Case 4: List of dicts + ([{"ip": "192.168.1.1", "metric": 99}], [{"ip": "192.168.1.1", "metric": 99}]), + ], +) +def test_set_interface_flexible_gateways( + ip, dummy_interface, gateways_input, expected_gateways +): + """ + Test that set_interface accepts gateways in all supported input formats + (plain string, list of strings, dict, list of dicts) and that + get_interface_new always returns them as a normalized list of + ``{"ip": ..., "metric": ...}`` dicts. When no per-gateway metric is + supplied the interface-level ``ipv4_metric`` is used as the fallback. + """ + index, name = dummy_interface + + settings = { + "ipv4_gateways": gateways_input, + "ipv4_metric": 10, # The fallback metric + "ipv4_dhcp": False, + "ipv4_address": "192.168.1.200/24", + } + + ip.set_interface(iface=name, **settings) + result = ip.get_interface_new(name)[name] + + # Verify the gateway matches the expected dictionary format + # Note: result["ipv4_gateways"] will be a list of dicts based on our getter + assert result["ipv4_gateways"] == expected_gateways + + +def test_ipv4_cidr_defaulting(ip, dummy_interface): + """ + Test that IPv4 addresses supplied without a prefix length default to /24, + while addresses that already include a prefix keep their original length. + """ + index, name = dummy_interface + + # Pass a list of naked IPs and one with CIDR + ips = ["172.16.0.5", "172.16.0.6/16"] + ip.set_interface(iface=name, ipv4_address=ips, ipv4_dhcp=False) + + result = ip.get_interface_new(name)[name] + + # Check that the first one defaulted to /24 and the second kept its /16 + assert "172.16.0.5/24" in result["ipv4_address"] + assert "172.16.0.6/16" in result["ipv4_address"] + + +def test_dhcp_with_manual_metric(ip, dummy_interface): + """ + Test that an explicit interface metric can be set alongside DHCP, + confirming that metric and DHCP mode are independent settings. + """ + index, name = dummy_interface + + # Enable DHCP but force a very high metric + ip.set_interface(iface=name, ipv4_dhcp=True, ipv4_metric=500) + + result = ip.get_interface_new(name)[name] + assert result["ipv4_dhcp"] is True + assert result["ipv4_metric"] == 500 + + +def test_interface_hardware_and_dns_suffix(ip, dummy_interface): + """ + Test that MTU and the connection-specific DNS suffix can be set and are + accurately reflected by get_interface_new. + """ + index, name = dummy_interface + + ip.set_interface(iface=name, mtu=1450, dns_suffix="test.saltstack.com") + + res = ip.get_interface_new(iface=name)[name] + assert res["mtu"] == 1450 + assert res["dns_suffix"] == "test.saltstack.com" + + +def test_interface_forwarding_toggles(ip, dummy_interface): + """ + Test that IP forwarding can be independently enabled and disabled for both + the IPv4 and IPv6 stacks, and that the change is reflected by get_interface_new. + """ + index, name = dummy_interface + + # 1. Enable forwarding on both stacks + ip.set_interface(iface=name, ipv4_forwarding=True, ipv6_forwarding=True) + + res = ip.get_interface_new(iface=name)[name] + assert res["ipv4_forwarding"] is True + assert res["ipv6_forwarding"] is True + + # 2. Disable and verify + ip.set_interface(iface=name, ipv4_forwarding=False, ipv6_forwarding=False) + res = ip.get_interface_new(iface=name)[name] + assert res["ipv4_forwarding"] is False + assert res["ipv6_forwarding"] is False + + +def test_interface_rename_persistence(ip, dummy_interface): + """ + Test that renaming an interface via set_interface (alias parameter) is + persisted: the new name is discoverable by get_interface_new and the + InterfaceIndex remains unchanged. The original name is restored in the + finally block so subsequent tests are not affected. + """ + index, name = dummy_interface + new_name = "Salt-Rename-Test" + + try: + # 1. Rename via the setter + ip.set_interface(iface=name, alias=new_name) + + # 2. Verify the getter can find it by the NEW name + res = ip.get_interface_new(iface=new_name)[new_name] + assert res["alias"] == new_name + assert res["index"] == index + + finally: + # 3. Clean up: Rename it back to the original fixture name + ip.set_interface(iface=new_name, alias=name) + + +def test_interface_ip_append(ip, dummy_interface): + """ + Test that passing append=True to set_interface adds an additional IP + address without removing the previously configured one. + """ + index, name = dummy_interface + primary_ip = "10.10.10.10/24" + secondary_ip = "10.10.10.11/24" + + # 1. Set first IP + ip.set_interface(iface=name, ipv4_address=primary_ip, ipv4_dhcp=False) + + # 2. Append second IP + ip.set_interface(iface=name, ipv4_address=secondary_ip, append=True) + + res = ip.get_interface_new(iface=name)[name] + # Verify BOTH IPs exist in the list + assert primary_ip in res["ipv4_address"] + assert secondary_ip in res["ipv4_address"] + + +def test_interface_invalid_mtu_raises(ip, dummy_interface): + """ + Test that set_interface raises SaltInvocationError when an MTU value + outside the supported range (576–9000) is provided. + """ + index, name = dummy_interface + + with pytest.raises(SaltInvocationError): + ip.set_interface(iface=name, mtu=9999) + + +def test_set_interface_atomic_multi_change(ip, dummy_interface): + """ + Test that set_interface applies multiple unrelated settings (MTU, IPv6 + forwarding, DNS suffix, interface metric) atomically in a single call, and + that all changes are reflected correctly by get_interface_new. + """ + index, name = dummy_interface + + # Mix hardware, protocol, and client settings + ip.set_interface( + iface=name, + mtu=1400, + ipv6_forwarding=True, + dns_suffix="atomic.test", + ipv4_metric=42, + ) + + res = ip.get_interface_new(iface=name)[name] + assert res["mtu"] == 1400 + assert res["ipv6_forwarding"] is True + assert res["dns_suffix"] == "atomic.test" + assert res["ipv4_metric"] == 42 + + +def test_set_interface_protocol_binding(ip, dummy_interface): + """ + Test that the IPv6 protocol binding (ms_tcpip6) can be disabled and + re-enabled via set_interface, and that the change is visible through + the ipv6_enabled field in get_interface_new. + """ + index, name = dummy_interface + + # Disable IPv6 binding + ip.set_interface(iface=name, ipv6_enabled=False) + res = ip.get_interface_new(iface=name)[name] + assert res["ipv6_enabled"] is False + + # Re-enable + ip.set_interface(iface=name, ipv6_enabled=True) + res = ip.get_interface_new(iface=name)[name] + assert res["ipv6_enabled"] is True + + +def test_set_interface_dns_registration(ip, dummy_interface): + """ + Test that the dns_register flag can be toggled via set_interface, controlling + whether the interface registers its addresses in DNS with the computer's + primary DNS suffix. + """ + index, name = dummy_interface + + # Disable registration + ip.set_interface(iface=name, dns_register=False) + res = ip.get_interface_new(iface=name)[name] + assert res["dns_register"] is False + + # Enable registration + ip.set_interface(iface=name, dns_register=True) + res = ip.get_interface_new(iface=name)[name] + assert res["dns_register"] is True + + +def test_interface_protocol_binding_toggles(ip, dummy_interface): + """ + Test that the IPv4 and IPv6 protocol bindings can each be disabled + independently and then re-enabled together, verifying the state after each + change via get_interface_new. + """ + index, name = dummy_interface + + # 1. Disable IPv4 Stack + ip.set_interface(iface=name, ipv4_enabled=False) + res = ip.get_interface_new(iface=name)[name] + assert res["ipv4_enabled"] is False + + # 2. Disable IPv6 Stack + ip.set_interface(iface=name, ipv6_enabled=False) + res = ip.get_interface_new(iface=name)[name] + assert res["ipv6_enabled"] is False + + # 3. Re-enable both for cleanup + ip.set_interface(iface=name, ipv4_enabled=True, ipv6_enabled=True) + res = ip.get_interface_new(iface=name)[name] + assert res["ipv4_enabled"] is True + assert res["ipv6_enabled"] is True diff --git a/tests/pytests/unit/modules/test_win_ip.py b/tests/pytests/unit/modules/test_win_ip.py index 94a3fe7ca938..296c55aa2c13 100644 --- a/tests/pytests/unit/modules/test_win_ip.py +++ b/tests/pytests/unit/modules/test_win_ip.py @@ -7,390 +7,12 @@ import pytest import salt.modules.win_ip as win_ip -from salt.exceptions import CommandExecutionError, SaltInvocationError -from tests.support.mock import MagicMock, call, patch - - -@pytest.fixture -def configure_loader_modules(): - return {win_ip: {}} - - -@pytest.fixture -def ethernet_config(): - return ( - 'Configuration for interface "Ethernet"\n' - "DHCP enabled: Yes\n" - "IP Address: 1.2.3.74\n" - "Subnet Prefix: 1.2.3.0/24 (mask 255.255.255.0)\n" - "Default Gateway: 1.2.3.1\n" - "Gateway Metric: 0\n" - "InterfaceMetric: 20\n" - "DNS servers configured through DHCP: 1.2.3.4\n" - "Register with which suffix: Primary only\n" - "WINS servers configured through DHCP: None\n" - ) - - -@pytest.fixture -def ethernet_enable(): - return "Ethernet\nType: Dedicated\nAdministrative state: Enabled\nConnect state: Connected" - - -# 'raw_interface_configs' function tests: 1 - - -def test_raw_interface_configs(ethernet_config): - """ - Test if it return raw configs for all interfaces. - """ - mock_cmd = MagicMock(return_value=ethernet_config) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.raw_interface_configs() == ethernet_config - - -# 'get_all_interfaces' function tests: 1 - - -def test_get_all_interfaces(ethernet_config): - """ - Test if it return configs for all interfaces. - """ - ret = { - "Ethernet": { - "DHCP enabled": "Yes", - "DNS servers configured through DHCP": ["1.2.3.4"], - "Default Gateway": "1.2.3.1", - "Gateway Metric": "0", - "InterfaceMetric": "20", - "Register with which suffix": "Primary only", - "WINS servers configured through DHCP": ["None"], - "ip_addrs": [ - { - "IP Address": "1.2.3.74", - "Netmask": "255.255.255.0", - "Subnet": "1.2.3.0/24", - } - ], - } - } - - mock_cmd = MagicMock(return_value=ethernet_config) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.get_all_interfaces() == ret - - -# 'get_interface' function tests: 1 - - -def test_get_interface(ethernet_config): - """ - Test if it return the configuration of a network interface. - """ - ret = { - "DHCP enabled": "Yes", - "DNS servers configured through DHCP": ["1.2.3.4"], - "Default Gateway": "1.2.3.1", - "Gateway Metric": "0", - "InterfaceMetric": "20", - "Register with which suffix": "Primary only", - "WINS servers configured through DHCP": ["None"], - "ip_addrs": [ - { - "IP Address": "1.2.3.74", - "Netmask": "255.255.255.0", - "Subnet": "1.2.3.0/24", - } - ], - } - - mock_cmd = MagicMock(return_value=ethernet_config) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.get_interface("Ethernet") == ret - - -# 'is_enabled' function tests: 1 - - -def test_is_enabled(ethernet_enable): - """ - Test if it returns `True` if interface is enabled, otherwise `False`. - """ - mock_cmd = MagicMock(side_effect=[ethernet_enable, ""]) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.is_enabled("Ethernet") - pytest.raises(CommandExecutionError, win_ip.is_enabled, "Ethernet") - - -# 'is_disabled' function tests: 1 - - -def test_is_disabled(ethernet_enable): - """ - Test if it returns `True` if interface is disabled, otherwise `False`. - """ - mock_cmd = MagicMock(return_value=ethernet_enable) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert not win_ip.is_disabled("Ethernet") - - -# 'enable' function tests: 1 - - -def test_enable(): - """ - Test if it enable an interface. - """ - # Test with enabled interface - with patch.object(win_ip, "is_enabled", return_value=True): - assert win_ip.enable("Ethernet") - - mock_cmd = MagicMock() - with patch.object(win_ip, "is_enabled", side_effect=[False, True]), patch.dict( - win_ip.__salt__, {"cmd.run": mock_cmd} - ): - assert win_ip.enable("Ethernet") - - mock_cmd.assert_called_once_with( - [ - "netsh", - "interface", - "set", - "interface", - "name=Ethernet", - "admin=ENABLED", - ], - python_shell=False, - ) - - -# 'disable' function tests: 1 - - -def test_disable(): - """ - Test if it disable an interface. - """ - with patch.object(win_ip, "is_disabled", return_value=True): - assert win_ip.disable("Ethernet") - - mock_cmd = MagicMock() - with patch.object(win_ip, "is_disabled", side_effect=[False, True]), patch.dict( - win_ip.__salt__, {"cmd.run": mock_cmd} - ): - assert win_ip.disable("Ethernet") - - mock_cmd.assert_called_once_with( - [ - "netsh", - "interface", - "set", - "interface", - "name=Ethernet", - "admin=DISABLED", - ], - python_shell=False, - ) - - -# 'get_subnet_length' function tests: 1 +from salt.exceptions import SaltInvocationError def test_get_subnet_length(): """ - Test if it disable an interface. + Test get subnet length is correct. """ assert win_ip.get_subnet_length("255.255.255.0") == 24 pytest.raises(SaltInvocationError, win_ip.get_subnet_length, "255.255.0") - - -# 'set_static_ip' function tests: 1 - - -@pytest.mark.slow_test -def test_set_static_ip(ethernet_config): - """ - Test if it set static IP configuration on a Windows NIC. - """ - pytest.raises( - SaltInvocationError, - win_ip.set_static_ip, - "Local Area Connection", - "10.1.2/24", - ) - - mock_cmd = MagicMock(return_value=ethernet_config) - mock_all = MagicMock(return_value={"retcode": 1, "stderr": "Error"}) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd, "cmd.run_all": mock_all}): - pytest.raises( - CommandExecutionError, - win_ip.set_static_ip, - "Ethernet", - "1.2.3.74/24", - append=True, - ) - pytest.raises( - CommandExecutionError, win_ip.set_static_ip, "Ethernet", "1.2.3.74/24" - ) - - mock_all = MagicMock(return_value={"retcode": 0}) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd, "cmd.run_all": mock_all}): - assert win_ip.set_static_ip("Local Area Connection", "1.2.3.74/24") == {} - assert win_ip.set_static_ip("Ethernet", "1.2.3.74/24") == { - "Address Info": { - "IP Address": "1.2.3.74", - "Netmask": "255.255.255.0", - "Subnet": "1.2.3.0/24", - } - } - - -# 'set_dhcp_ip' function tests: 1 - - -def test_set_dhcp_ip(ethernet_config): - """ - Test if it set Windows NIC to get IP from DHCP. - """ - mock_cmd = MagicMock(return_value=ethernet_config) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.set_dhcp_ip("Ethernet") == { - "DHCP enabled": "Yes", - "Interface": "Ethernet", - } - - -# 'set_static_dns' function tests: 1 - - -def test_set_static_dns(): - """ - Test if it set static DNS configuration on a Windows NIC. - """ - mock_cmd = MagicMock() - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.set_static_dns("Ethernet", "192.168.1.252", "192.168.1.253") == { - "DNS Server": ("192.168.1.252", "192.168.1.253"), - "Interface": "Ethernet", - } - mock_cmd.assert_has_calls( - [ - call( - [ - "netsh", - "interface", - "ip", - "set", - "dns", - "name=Ethernet", - "source=static", - "address=192.168.1.252", - "register=primary", - ], - python_shell=False, - ), - call( - [ - "netsh", - "interface", - "ip", - "add", - "dns", - "name=Ethernet", - "address=192.168.1.253", - "index=2", - ], - python_shell=False, - ), - ] - ) - - -def test_set_static_dns_clear(): - """ - Test if it set static DNS configuration on a Windows NIC. - """ - mock_cmd = MagicMock() - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.set_static_dns("Ethernet", []) == { - "DNS Server": [], - "Interface": "Ethernet", - } - mock_cmd.assert_called_once_with( - [ - "netsh", - "interface", - "ip", - "set", - "dns", - "name=Ethernet", - "source=static", - "address=none", - ], - python_shell=False, - ) - - -def test_set_static_dns_no_action(): - """ - Test if it set static DNS configuration on a Windows NIC. - """ - # Test passing nothing - assert win_ip.set_static_dns("Ethernet") == { - "DNS Server": "No Changes", - "Interface": "Ethernet", - } - # Test passing None - assert win_ip.set_static_dns("Ethernet", None) == { - "DNS Server": "No Changes", - "Interface": "Ethernet", - } - - # Test passing string None - assert win_ip.set_static_dns("Ethernet", "None") == { - "DNS Server": "No Changes", - "Interface": "Ethernet", - } - - -# 'set_dhcp_dns' function tests: 1 - - -def test_set_dhcp_dns(ethernet_config): - """ - Test if it set DNS source to DHCP on Windows. - """ - mock_cmd = MagicMock(return_value=ethernet_config) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.set_dhcp_dns("Ethernet") == { - "DNS Server": "DHCP", - "Interface": "Ethernet", - } - - -# 'set_dhcp_all' function tests: 1 - - -def test_set_dhcp_all(ethernet_config): - """ - Test if it set both IP Address and DNS to DHCP. - """ - mock_cmd = MagicMock(return_value=ethernet_config) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.set_dhcp_all("Ethernet") == { - "Interface": "Ethernet", - "DNS Server": "DHCP", - "DHCP enabled": "Yes", - } - - -# 'get_default_gateway' function tests: 1 - - -def test_get_default_gateway(ethernet_config): - """ - Test if it set DNS source to DHCP on Windows. - """ - mock_cmd = MagicMock(return_value=ethernet_config) - with patch.dict(win_ip.__salt__, {"cmd.run": mock_cmd}): - assert win_ip.get_default_gateway() == "1.2.3.1"