From 4e3389ec0ef0e3992fc4cbdb06b79510f938c0c2 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Fri, 3 Oct 2025 16:04:29 +0200 Subject: [PATCH 01/35] Update config_flow.py Add JSON external file lookup. --- .../network_scanner/config_flow.py | 179 +++++++++++++----- 1 file changed, 136 insertions(+), 43 deletions(-) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 13c1b19..abbb952 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -1,61 +1,154 @@ +import json +import logging import voluptuous as vol + from homeassistant import config_entries +from homeassistant.helpers.selector import selector + from .const import DOMAIN -import logging _LOGGER = logging.getLogger(__name__) +def _format_for_log(d: dict) -> dict: + return {k: str(v) for k, v in d.items()} + +def _normalise_mac_key(mac: str) -> str: + if not isinstance(mac, str): + return "" + return mac.upper() # keep separators as-is per your decision + +def _build_directory_from_legacy_yaml(yaml_cfg: dict) -> dict: + """ + Accept legacy keys like mac_mapping_1: "AA:BB:..|Name|Desc" + or "AA:BB:..=Name|Desc". Loosely parsed on |. + """ + directory = {} + for i in range(1, 999): # be generous; break when not found + key = f"mac_mapping_{i}" + if key not in yaml_cfg: + if i > 25: + break + continue + raw = str(yaml_cfg.get(key, "")).strip() + if not raw: + continue + # accepted formats: + # "AA:BB:CC:DD:EE:FF|Device Name|Description" + # "AA:BB:CC:DD:EE:FF=Device Name|Description" + if "=" in raw: + mac_part, payload = raw.split("=", 1) + else: + parts = raw.split("|") + mac_part, payload = parts[0], "|".join(parts[1:]) + mac = _normalise_mac_key(mac_part.strip()) + name, desc = "", "" + if "|" in payload: + name, desc = payload.split("|", 1) + else: + name = payload + directory[mac] = {"name": name.strip(), "desc": desc.strip()} + return directory + class NetworkScannerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): """Handle a config flow for Network Scanner.""" - async def async_step_user(self, user_input=None): - def format_dict_for_printing(d): - return {k: str(v) for k, v in d.items()} + VERSION = 1 + async def async_step_user(self, user_input=None): """Manage the configurations from the user interface.""" errors = {} - # Load data from configuration.yaml - yaml_config = self.hass.data.get(DOMAIN, {}) - _LOGGER.debug("YAML Config: %s", yaml_config) + # Load any YAML provided defaults (optional) + yaml_config = self.hass.data.get(DOMAIN, {}) or {} + _LOGGER.debug("YAML Config (raw): %s", _format_for_log(yaml_config)) + + # Build form schema with selectors (multiline text for JSON) + data_schema = vol.Schema( + { + vol.Required( + "ip_range", + description={"suggested_value": yaml_config.get("ip_range", "192.168.1.0/24")}, + ): str, + vol.Optional( + "mac_directory_json_text", + description={"suggested_value": yaml_config.get("mac_directory_json_text", "")}, + ): selector({"text": {"multiline": True}}), + vol.Optional( + "mac_directory_json_url", + description={"suggested_value": yaml_config.get("mac_directory_json_url", "")}, + ): str, + } + ) + + if user_input is None: + return self.async_show_form( + step_id="user", + data_schema=data_schema, + errors=errors, + description_placeholders={ + "description": "Enter the IP range and an optional MAC directory (JSON text or URL)." + }, + ) - if user_input is not None: - return self.async_create_entry(title="Network Scanner", data=user_input) + # --- Validate & assemble config entry data --- + ip_range = user_input.get("ip_range", "").strip() + json_text = user_input.get("mac_directory_json_text") or "" + json_url = (user_input.get("mac_directory_json_url") or "").strip() - data_schema_dict = { - vol.Required("ip_range", description={"suggested_value": yaml_config.get("ip_range", "192.168.1.0/24")}): str + mac_directory = {} + + # 1) Legacy YAML mac_mapping_* support (fold into directory) + legacy_from_yaml = _build_directory_from_legacy_yaml(yaml_config) + if legacy_from_yaml: + mac_directory.update(legacy_from_yaml) + + # 2) JSON pasted in textarea + if json_text: + try: + parsed = json.loads(json_text) + if not isinstance(parsed, dict): + errors["mac_directory_json_text"] = "invalid_json" + else: + # Support both {"AA:...":{"name":..,"desc":..}} and {"data": {...}} + data_block = parsed.get("data", parsed) + if not isinstance(data_block, dict): + errors["mac_directory_json_text"] = "invalid_json" + else: + for k, v in data_block.items(): + mac = _normalise_mac_key(k) + if not mac: + continue + if isinstance(v, dict): + name = v.get("name", "") + desc = v.get("desc", "") + else: + # allow simple string as name + name, desc = str(v), "" + mac_directory[mac] = {"name": str(name), "desc": str(desc)} + except Exception as exc: # noqa: BLE001 (we want to log anything) + _LOGGER.warning("Failed to parse mac_directory_json_text: %s", exc) + errors["mac_directory_json_text"] = "invalid_json" + + # 3) URL is optional; we just store it and let the coordinator fetch/refresh + # (Do not fetch here; config flows should avoid I/O where possible.) + entry_data = { + "ip_range": ip_range, + "mac_directory": mac_directory, # dict ready to use + "mac_directory_json_url": json_url, # optional; fetch later in update loop } - # Add mac mappings with values from YAML if available - for i in range(1, 26): # Ensure at least 25 entries - key = f"mac_mapping_{i}" - if key in yaml_config: - suggested_value = yaml_config.get(key) - _LOGGER.debug("YAML Config key: %s", suggested_value) - else: - suggested_value = None # No value in YAML config - - # Add the optional field to the schema, with or without suggested_value - data_schema_dict[vol.Optional(key, description={"suggested_value": suggested_value})] = str - - # Continue to add more mappings if available in the YAML config - i = 26 - while True: - key = f"mac_mapping_{i}" - if key in yaml_config: - suggested_value = yaml_config.get(key) - _LOGGER.debug("YAML Config key: %s", suggested_value) - data_schema_dict[vol.Optional(key, description={"suggested_value": suggested_value})] = str - i += 1 - else: - break # Exit loop when no more mappings are found in the YAML config - - _LOGGER.debug("schema: %s", format_dict_for_printing(data_schema_dict)) - data_schema = vol.Schema(data_schema_dict) - - return self.async_show_form( - step_id="user", - data_schema=data_schema, - errors=errors, - description_placeholders={"description": "Enter the IP range and MAC mappings"} + if errors: + return self.async_show_form( + step_id="user", + data_schema=data_schema, + errors=errors, + description_placeholders={"description": "Fix the errors and try again."}, + ) + + _LOGGER.debug( + "Creating entry: ip_range=%s, mac_directory_count=%d, url=%s", + ip_range, + len(mac_directory), + json_url or "-", ) + return self.async_create_entry(title="Network Scanner", data=entry_data) From b17ba0e3819e99795a01d29fb5ab4a6dce87fda5 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Fri, 3 Oct 2025 16:06:52 +0200 Subject: [PATCH 02/35] Update manifest.json Modify manifest to suit fork --- custom_components/network_scanner/manifest.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index be34964..5e96482 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -1,12 +1,12 @@ { - "domain": "network_scanner", + "domain": "network_scanner_extended", "name": "Network Scanner", - "codeowners": ["@parvez"], + "codeowners": ["@paganl"], "config_flow": true, "dependencies": [], - "documentation": "https://github.com/parvez/network_scanner", + "documentation": "https://github.com/paganl/network_scanner_extended", "iot_class": "local_polling", - "issue_tracker": "https://github.com/parvez/network_scanner/issues", + "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap"], - "version": "1.0.7" + "version": "1.1.7" } From 003d26c8364609aac10b7d4367f53212250f6205 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 16:17:54 +0100 Subject: [PATCH 03/35] Update hacs.json --- hacs.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hacs.json b/hacs.json index 41ca1d2..2b2ec13 100644 --- a/hacs.json +++ b/hacs.json @@ -1,5 +1,5 @@ { - "name": "Network Scanner", + "name": "Network Scanner Extended", "content_in_root": false, "zip_release": false, "render_readme": true, From dd13300aeb903a1fd402ddb96626e7aff0711f5b Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 16:42:27 +0100 Subject: [PATCH 04/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 5e96482..4b0375a 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -1,5 +1,5 @@ { - "domain": "network_scanner_extended", + "domain": "network_scanner", "name": "Network Scanner", "codeowners": ["@paganl"], "config_flow": true, @@ -8,5 +8,7 @@ "iot_class": "local_polling", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap"], - "version": "1.1.7" + "version": "0.4.0" + "integration_type": hub + "iot_class": "local_polling" } From 91c43197e10d0876cac4026a34d6390a7aa98a67 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 16:59:20 +0100 Subject: [PATCH 05/35] Update config_flow.py --- .../network_scanner/config_flow.py | 121 +++++++++++------- 1 file changed, 78 insertions(+), 43 deletions(-) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index abbb952..26f0629 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -1,81 +1,108 @@ import json import logging +from ipaddress import ip_network import voluptuous as vol from homeassistant import config_entries -from homeassistant.helpers.selector import selector + +# Selector fallback for older HA cores +try: + from homeassistant.helpers.selector import selector as ha_selector + + def TextSelector(): + return ha_selector({"text": {"multiline": True}}) +except Exception: # pragma: no cover + def TextSelector(): + return str from .const import DOMAIN _LOGGER = logging.getLogger(__name__) + def _format_for_log(d: dict) -> dict: return {k: str(v) for k, v in d.items()} + def _normalise_mac_key(mac: str) -> str: - if not isinstance(mac, str): - return "" - return mac.upper() # keep separators as-is per your decision + return mac.upper() if isinstance(mac, str) else "" + def _build_directory_from_legacy_yaml(yaml_cfg: dict) -> dict: """ - Accept legacy keys like mac_mapping_1: "AA:BB:..|Name|Desc" - or "AA:BB:..=Name|Desc". Loosely parsed on |. + Accept legacy keys like: + mac_mapping_1: "AA:BB:..:FF|Name|Desc" + mac_mapping_2: "AA:BB:..:FF=Name|Desc" + Loosely parsed on "|" with optional "=" between MAC and payload. """ - directory = {} - for i in range(1, 999): # be generous; break when not found + directory: dict[str, dict] = {} + for i in range(1, 999): # generous upper bound; break after a gap beyond 25 key = f"mac_mapping_{i}" if key not in yaml_cfg: if i > 25: break continue + raw = str(yaml_cfg.get(key, "")).strip() if not raw: continue - # accepted formats: - # "AA:BB:CC:DD:EE:FF|Device Name|Description" - # "AA:BB:CC:DD:EE:FF=Device Name|Description" + if "=" in raw: mac_part, payload = raw.split("=", 1) else: parts = raw.split("|") mac_part, payload = parts[0], "|".join(parts[1:]) + mac = _normalise_mac_key(mac_part.strip()) + if not mac: + continue + name, desc = "", "" if "|" in payload: name, desc = payload.split("|", 1) else: name = payload + directory[mac] = {"name": name.strip(), "desc": desc.strip()} + return directory + class NetworkScannerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): - """Handle a config flow for Network Scanner.""" + """Handle the config flow for Network Scanner (Extended).""" VERSION = 1 async def async_step_user(self, user_input=None): - """Manage the configurations from the user interface.""" - errors = {} + errors: dict[str, str] = {} - # Load any YAML provided defaults (optional) + # Pick up any YAML defaults preserved by __init__ (optional) yaml_config = self.hass.data.get(DOMAIN, {}) or {} _LOGGER.debug("YAML Config (raw): %s", _format_for_log(yaml_config)) - # Build form schema with selectors (multiline text for JSON) data_schema = vol.Schema( { vol.Required( "ip_range", - description={"suggested_value": yaml_config.get("ip_range", "192.168.1.0/24")}, + description={ + "suggested_value": yaml_config.get("ip_range", "192.168.1.0/24") + }, ): str, vol.Optional( "mac_directory_json_text", - description={"suggested_value": yaml_config.get("mac_directory_json_text", "")}, - ): selector({"text": {"multiline": True}}), + description={ + "suggested_value": yaml_config.get( + "mac_directory_json_text", "" + ) + }, + ): TextSelector(), vol.Optional( "mac_directory_json_url", - description={"suggested_value": yaml_config.get("mac_directory_json_url", "")}, + description={ + "suggested_value": yaml_config.get( + "mac_directory_json_url", "" + ) + }, ): str, } ) @@ -90,17 +117,23 @@ async def async_step_user(self, user_input=None): }, ) - # --- Validate & assemble config entry data --- - ip_range = user_input.get("ip_range", "").strip() - json_text = user_input.get("mac_directory_json_text") or "" + # --- Validate & assemble entry data --- + ip_range = (user_input.get("ip_range") or "").strip() + json_text = (user_input.get("mac_directory_json_text") or "").strip() json_url = (user_input.get("mac_directory_json_url") or "").strip() - mac_directory = {} + # Validate CIDR/IP early + try: + ip_network(ip_range, strict=False) + except Exception: + errors["ip_range"] = "invalid_ip_range" + + mac_directory: dict[str, dict] = {} - # 1) Legacy YAML mac_mapping_* support (fold into directory) - legacy_from_yaml = _build_directory_from_legacy_yaml(yaml_config) - if legacy_from_yaml: - mac_directory.update(legacy_from_yaml) + # 1) Legacy YAML mac_mapping_* support + legacy = _build_directory_from_legacy_yaml(yaml_config) + if legacy: + mac_directory.update(legacy) # 2) JSON pasted in textarea if json_text: @@ -109,32 +142,31 @@ async def async_step_user(self, user_input=None): if not isinstance(parsed, dict): errors["mac_directory_json_text"] = "invalid_json" else: - # Support both {"AA:...":{"name":..,"desc":..}} and {"data": {...}} - data_block = parsed.get("data", parsed) - if not isinstance(data_block, dict): + # Accept both { "data": {...} } and flat { "AA:BB:..": {...} } + block = parsed.get("data", parsed) + if not isinstance(block, dict): errors["mac_directory_json_text"] = "invalid_json" else: - for k, v in data_block.items(): - mac = _normalise_mac_key(k) - if not mac: + for k, v in block.items(): + mk = _normalise_mac_key(k) + if not mk: continue if isinstance(v, dict): name = v.get("name", "") desc = v.get("desc", "") else: - # allow simple string as name name, desc = str(v), "" - mac_directory[mac] = {"name": str(name), "desc": str(desc)} - except Exception as exc: # noqa: BLE001 (we want to log anything) - _LOGGER.warning("Failed to parse mac_directory_json_text: %s", exc) + mac_directory[mk] = {"name": str(name), "desc": str(desc)} + except Exception as exc: # log and surface a friendly error + _LOGGER.warning( + "Failed to parse mac_directory_json_text: %s", exc, exc_info=True + ) errors["mac_directory_json_text"] = "invalid_json" - # 3) URL is optional; we just store it and let the coordinator fetch/refresh - # (Do not fetch here; config flows should avoid I/O where possible.) entry_data = { "ip_range": ip_range, - "mac_directory": mac_directory, # dict ready to use - "mac_directory_json_url": json_url, # optional; fetch later in update loop + "mac_directory": mac_directory, # processed dict with UPPERCASE keys + "mac_directory_json_url": json_url, # optional; fetch at runtime if you support it } if errors: @@ -151,4 +183,7 @@ async def async_step_user(self, user_input=None): len(mac_directory), json_url or "-", ) - return self.async_create_entry(title="Network Scanner", data=entry_data) + return self.async_create_entry( + title="Network Scanner Extended", + data=entry_data, + ) From 9b165fa6e0525f8067a6d7efc5b468b46d55e846 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:00:45 +0100 Subject: [PATCH 06/35] Create en.json --- custom_components/network_scanner/en.json | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 custom_components/network_scanner/en.json diff --git a/custom_components/network_scanner/en.json b/custom_components/network_scanner/en.json new file mode 100644 index 0000000..d35e969 --- /dev/null +++ b/custom_components/network_scanner/en.json @@ -0,0 +1,15 @@ +{ + "title": "Network Scanner Extended", + "config": { + "step": { + "user": { + "title": "Network Scanner Extended", + "description": "Enter the IP range and an optional MAC directory (JSON text or URL)." + } + }, + "error": { + "invalid_json": "That JSON isn’t valid. Paste a JSON object (optionally with a top-level \"data\" object).", + "invalid_ip_range": "Please enter a valid CIDR range, e.g. 192.168.1.0/24." + } + } +} From 6d9cb117c19e3d6a8d5cdaaaf4d39279d9b576bf Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:02:11 +0100 Subject: [PATCH 07/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 4b0375a..2a34368 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -1,6 +1,6 @@ { "domain": "network_scanner", - "name": "Network Scanner", + "name": "Network Scanner Extended", "codeowners": ["@paganl"], "config_flow": true, "dependencies": [], From 082d00a89b5ef461ffd2ea41bf194c2dc785c01c Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:06:31 +0100 Subject: [PATCH 08/35] Update sensor.py --- custom_components/network_scanner/sensor.py | 250 ++++++++++++-------- 1 file changed, 154 insertions(+), 96 deletions(-) diff --git a/custom_components/network_scanner/sensor.py b/custom_components/network_scanner/sensor.py index 2edbbec..1499785 100644 --- a/custom_components/network_scanner/sensor.py +++ b/custom_components/network_scanner/sensor.py @@ -1,134 +1,192 @@ +import json import logging -import nmap from datetime import timedelta +from typing import Any, Dict, List, Optional + +import nmap +from homeassistant.core import HomeAssistant from homeassistant.helpers.entity import Entity +from homeassistant.config_entries import ConfigEntry +from homeassistant.helpers.aiohttp_client import async_get_clientsession + from .const import DOMAIN SCAN_INTERVAL = timedelta(minutes=15) - _LOGGER = logging.getLogger(__name__) -class NetworkScanner(Entity): - """Representation of a Network Scanner.""" - def __init__(self, hass, ip_range, mac_mapping): - """Initialize the sensor.""" - self._state = None - self.hass = hass - self.ip_range = ip_range +def _norm_mac(mac: Optional[str]) -> str: + return (mac or "").upper() + + +def _parse_dir_obj(obj: Any) -> Dict[str, Dict[str, str]]: + """ + Accept either: + { "AA:BB:...": {"name":"..","desc":".."}, ... } + or: + { "data": { "AA:BB:...": {"name":"..","desc":".."}, ... } } + Returns a dict with UPPERCASE MAC keys. + """ + out: Dict[str, Dict[str, str]] = {} + if not isinstance(obj, dict): + return out + block = obj.get("data", obj) + if not isinstance(block, dict): + return out + for k, v in block.items(): + mk = _norm_mac(k) + if not mk: + continue + if isinstance(v, dict): + out[mk] = {"name": str(v.get("name", "")), "desc": str(v.get("desc", ""))} + else: + out[mk] = {"name": str(v), "desc": ""} + return out + + +async def _fetch_dir_from_url(hass: HomeAssistant, url: str) -> Dict[str, Dict[str, str]]: + """Best-effort fetch of a JSON directory from URL.""" + if not url: + return {} + try: + session = async_get_clientsession(hass) + async with session.get(url, timeout=10) as resp: + resp.raise_for_status() + # Let aiohttp detect content-type; fall back to text->json + text = await resp.text() + data = json.loads(text) + return _parse_dir_obj(data) + except Exception as exc: # swallow & warn; keep scanning + _LOGGER.warning("Failed to fetch directory from %s: %s", url, exc) + return {} - _LOGGER.debug("mac_mapping unparsed: %s", mac_mapping) - self.mac_mapping = self.parse_mac_mapping(mac_mapping) - _LOGGER.debug("mac_mapping parsed: %s", mac_mapping) - self.nm = nmap.PortScanner() - _LOGGER.info("Network Scanner initialized") +class NetworkScanner(Entity): + """Representation of a Network Scanner (Extended).""" - @property - def should_poll(self): - """Return True as updates are needed via polling.""" - return True + _attr_name = "Network Scanner Extended" + _attr_unit_of_measurement = "Devices" + _attr_should_poll = True - @property - def unique_id(self): - """Return unique ID.""" - return f"network_scanner_{self.ip_range}" + def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: + self.hass = hass + self.entry = entry + self.ip_range: str = entry.data.get("ip_range", "") + self._state: Optional[int] = None + self._devices: List[Dict[str, Any]] = [] + self.nm = nmap.PortScanner() + _LOGGER.info("Network Scanner Extended initialised for %s", self.ip_range) @property - def name(self): - return 'Network Scanner' + def unique_id(self) -> str: + return f"{DOMAIN}_{self.ip_range}" @property - def state(self): + def state(self) -> Optional[int]: return self._state @property - def unit_of_measurement(self): - return 'Devices' + def extra_state_attributes(self) -> Dict[str, Any]: + return {"devices": self._devices} - async def async_update(self): + async def async_update(self) -> None: """Fetch new state data for the sensor.""" try: - _LOGGER.debug("Scanning network") - devices = await self.hass.async_add_executor_job(self.scan_network) + # Build effective directory: data (setup) + options text/url (optional) + directory: Dict[str, Dict[str, str]] = dict(self.entry.data.get("mac_directory", {})) + + # Merge options JSON text (highest precedence) + opts = self.entry.options or {} + opt_text = (opts.get("mac_directory_json_text") or "").strip() + if opt_text: + try: + directory.update(_parse_dir_obj(json.loads(opt_text))) + except Exception as exc: + _LOGGER.warning("Options JSON invalid; ignoring: %s", exc) + + # Merge URL(s): options URL first, then data URL as fallback + opt_url = (opts.get("mac_directory_json_url") or "").strip() + data_url = (self.entry.data.get("mac_directory_json_url") or "").strip() + url_to_use = opt_url or data_url + if url_to_use: + fetched = await _fetch_dir_from_url(self.hass, url_to_use) + if fetched: + directory.update(fetched) + + _LOGGER.debug("Effective directory size: %d", len(directory)) + + # Scan network in executor (nmap is blocking) + devices = await self.hass.async_add_executor_job(self._scan_network, directory) + self._devices = devices self._state = len(devices) - self._attr_extra_state_attributes = {"devices": devices} except Exception as e: _LOGGER.error("Error updating network scanner: %s", e) - def parse_mac_mapping(self, mapping_string): - """Parse the MAC mapping string into a dictionary.""" - mapping = {} - for line in mapping_string.split('\n'): - parts = line.split(';') - if len(parts) >= 3: - mapping[parts[0].lower()] = (parts[1], parts[2]) - return mapping + # ---------- Internal helpers ---------- - def get_device_info_from_mac(self, mac_address): - """Retrieve device name and type from the MAC mapping.""" - return self.mac_mapping.get(mac_address.lower(), ("Unknown Device", "Unknown Device")) + def _lookup_override(self, directory: Dict[str, Dict[str, str]], mac: str) -> Dict[str, str]: + return directory.get(_norm_mac(mac), {}) - def scan_network(self): + def _scan_network(self, directory: Dict[str, Dict[str, str]]) -> List[Dict[str, Any]]: """Scan the network and return device information.""" - self.nm.scan(hosts=self.ip_range, arguments='-sn') - devices = [] + self.nm.scan(hosts=self.ip_range, arguments="-sn") + devices: List[Dict[str, Any]] = [] for host in self.nm.all_hosts(): - _LOGGER.debug("Found Host: %s", host) - if 'mac' in self.nm[host]['addresses']: - _LOGGER.debug("Found Mac: %s", self.nm[host]['addresses']) - ip = self.nm[host]['addresses']['ipv4'] - mac = self.nm[host]['addresses']['mac'] + try: + addrs = self.nm[host].get("addresses", {}) + mac = addrs.get("mac") + ip = addrs.get("ipv4") or addrs.get("ipv6") or "" + + if not mac or not ip: + continue # need both to be useful + + # Vendor (case-insensitive lookup in nmap's vendor map) vendor = "Unknown" - if 'vendor' in self.nm[host] and mac in self.nm[host]['vendor']: - vendor = self.nm[host]['vendor'][mac] - hostname = self.nm[host].hostname() - device_name, device_type = self.get_device_info_from_mac(mac) - devices.append({ - "ip": ip, - "mac": mac, - "name": device_name, - "type": device_type, - "vendor": vendor, - "hostname": hostname - }) - - # Sort the devices by IP address - devices.sort(key=lambda x: [int(num) for num in x['ip'].split('.')]) + ven_map = self.nm[host].get("vendor", {}) + if isinstance(ven_map, dict) and ven_map: + # nmap keys may be upper/lower; compare upper + for k, v in ven_map.items(): + if _norm_mac(k) == _norm_mac(mac): + vendor = v + break + + hostname = self.nm[host].hostname() or "" + + override = self._lookup_override(directory, mac) + device_name = override.get("name") or "Unknown Device" + device_type = override.get("desc") or "Unknown Device" + + devices.append( + { + "ip": ip, + "mac": mac, + "name": device_name, + "type": device_type, + "vendor": vendor, + "hostname": hostname, + } + ) + except Exception as exc: + _LOGGER.debug("Skipping host %s due to parse error: %s", host, exc) + + # Sort devices by IPv4 numerically (fallback: lexicographic) + def _ip_key(ip_str: str) -> List[int]: + try: + return [int(part) for part in ip_str.split(".")] + except Exception: + return [999, 999, 999, 999] + + devices.sort(key=lambda x: _ip_key(x.get("ip", ""))) return devices -async def async_setup_entry(hass, config_entry, async_add_entities): + +async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities) -> None: """Set up the Network Scanner sensor from a config entry.""" ip_range = config_entry.data.get("ip_range") - _LOGGER.debug("ip_range: %s", config_entry.data.get("ip_range")) - - # Initialize mac_mappings list to ensure at least 25 entries - mac_mappings_list = [] - - # Ensure we have at least 25 entries, even if config is missing some - for i in range(25): - key = f"mac_mapping_{i+1}" - mac_mapping = config_entry.data.get(key, "") - mac_mappings_list.append(mac_mapping) - _LOGGER.debug("mac_mapping_%s: %s", i+1, mac_mapping) - - # Continue adding additional mac mappings if present in the config - i = 25 - while True: - key = f"mac_mapping_{i+1}" - if key in config_entry.data: - mac_mapping = config_entry.data.get(key) - mac_mappings_list.append(mac_mapping) - _LOGGER.debug("mac_mapping_%s: %s", i+1, mac_mapping) - i += 1 - else: - break - - # Combine mac mappings into a newline-separated string - mac_mappings = "\n".join(mac_mappings_list) - _LOGGER.debug("mac_mappings: %s", mac_mappings) + if not ip_range: + _LOGGER.error("No ip_range configured; aborting setup") + return - # Set up the network scanner entity - scanner = NetworkScanner(hass, ip_range, mac_mappings) - async_add_entities([scanner], True) + _LOGGER.debug("Setting up Network Scanner Extended for %s", ip_range) + async_add_entities([NetworkScanner(hass, config_entry)], True) From e2064229817c0a7cb2707aa8669f170a9eb3c06f Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:17:04 +0100 Subject: [PATCH 09/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 2a34368..4838085 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -7,8 +7,8 @@ "documentation": "https://github.com/paganl/network_scanner_extended", "iot_class": "local_polling", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", - "requirements": ["python-nmap"], - "version": "0.4.0" - "integration_type": hub + "requirements": ["python-nmap==0.7.1"], + "version": "0.4.0", + "integration_type": "hub", "iot_class": "local_polling" } From ae2c7386d9e6222fb5cddc23223a7ec5d7b19d36 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:34:19 +0100 Subject: [PATCH 10/35] Update config_flow.py --- .../network_scanner/config_flow.py | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 26f0629..3bc1b4f 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -187,3 +187,75 @@ async def async_step_user(self, user_input=None): title="Network Scanner Extended", data=entry_data, ) + +# ---- Options Flow ---- +from typing import Any, Dict + +class NetworkScannerOptionsFlow(config_entries.OptionsFlow): + """Options UI to edit settings after setup.""" + + def __init__(self, config_entry: config_entries.ConfigEntry) -> None: + self.config_entry = config_entry + + async def async_step_init(self, user_input: Dict[str, Any] | None = None): + # Entry point; show the same form as 'user' + return await self.async_step_user(user_input) + + async def async_step_user(self, user_input: Dict[str, Any] | None = None): + errors: dict[str, str] = {} + + # Defaults prefer options (if previously saved), then fall back to data + data = self.config_entry.data or {} + opts = self.config_entry.options or {} + + cur_ip_range = opts.get("ip_range", data.get("ip_range", "192.168.1.0/24")) + cur_json_text = opts.get("mac_directory_json_text", "") + cur_json_url = opts.get("mac_directory_json_url", data.get("mac_directory_json_url", "")) + + schema = vol.Schema({ + vol.Required("ip_range", description={"suggested_value": cur_ip_range}): str, + vol.Optional("mac_directory_json_text", + description={"suggested_value": cur_json_text}): TextSelector(), + vol.Optional("mac_directory_json_url", + description={"suggested_value": cur_json_url}): str, + }) + + if user_input is None: + return self.async_show_form(step_id="user", data_schema=schema, errors=errors) + + # Validate IP/CIDR + ipr = (user_input.get("ip_range") or "").strip() + from ipaddress import ip_network + try: + ip_network(ipr, strict=False) + except Exception: + errors["ip_range"] = "invalid_ip_range" + + # Lightly validate JSON text (don’t store parsed dict in options) + jtxt = (user_input.get("mac_directory_json_text") or "").strip() + if jtxt: + try: + parsed = json.loads(jtxt) + block = parsed.get("data", parsed) if isinstance(parsed, dict) else {} + if not isinstance(block, dict): + errors["mac_directory_json_text"] = "invalid_json" + except Exception: + errors["mac_directory_json_text"] = "invalid_json" + + jurl = (user_input.get("mac_directory_json_url") or "").strip() + + if errors: + return self.async_show_form(step_id="user", data_schema=schema, errors=errors) + + # Save OPTIONS only. (Your setup uses entry.data; options override at runtime.) + new_options = { + "ip_range": ipr, + "mac_directory_json_text": jtxt, + "mac_directory_json_url": jurl, + } + return self.async_create_entry(title="", data=new_options) + + +async def async_get_options_flow(config_entry: config_entries.ConfigEntry): + """Tell HA how to get the options flow.""" + return NetworkScannerOptionsFlow(config_entry) From f2da2075a4a0310fd0419e08c01ae313ef1ab65b Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:36:39 +0100 Subject: [PATCH 11/35] Update __init__.py --- custom_components/network_scanner/__init__.py | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/custom_components/network_scanner/__init__.py b/custom_components/network_scanner/__init__.py index 62290d9..6242c09 100644 --- a/custom_components/network_scanner/__init__.py +++ b/custom_components/network_scanner/__init__.py @@ -1,18 +1,35 @@ -from .sensor import NetworkScanner +from __future__ import annotations + +from homeassistant.core import HomeAssistant +from homeassistant.config_entries import ConfigEntry + from .const import DOMAIN -async def async_setup(hass, config): - """Set up the Network Scanner component.""" - # Store YAML configuration in hass.data - hass.data[DOMAIN] = config.get(DOMAIN, {}) +PLATFORMS: list[str] = ["sensor"] + + +async def async_setup(hass: HomeAssistant, config: dict) -> bool: + """Set up Network Scanner (store YAML defaults for the config flow).""" + # Keep any YAML under hass.data[DOMAIN] so config_flow can read suggested values + hass.data.setdefault(DOMAIN, config.get(DOMAIN, {}) or {}) return True -async def async_setup_entry(hass, config_entry): + +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up Network Scanner from a config entry.""" - await hass.config_entries.async_forward_entry_setups(config_entry, ["sensor"]) + # Reload automatically when options change (enables 'Configure' button behaviour) + entry.async_on_unload(entry.add_update_listener(_async_update_listener)) + + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) return True -async def async_unload_entry(hass, config_entry): + +async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: + """Handle options updates by reloading the entry.""" + await hass.config_entries.async_reload(entry.entry_id) + + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" - await hass.config_entries.async_forward_entry_unload(config_entry, "sensor") - return True + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + return unload_ok From 89deec24d198fb6c19a42b4b273d792d4e21af77 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:37:17 +0100 Subject: [PATCH 12/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 4838085..545bd86 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -8,7 +8,7 @@ "iot_class": "local_polling", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.4.0", + "version": "0.5.0", "integration_type": "hub", "iot_class": "local_polling" } From db48d78ffe44fc514c0676907d47e9ac1aec0700 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:50:27 +0100 Subject: [PATCH 13/35] Update config_flow.py --- .../network_scanner/config_flow.py | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 3bc1b4f..1b18440 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -259,3 +259,79 @@ async def async_step_user(self, user_input: Dict[str, Any] | None = None): async def async_get_options_flow(config_entry: config_entries.ConfigEntry): """Tell HA how to get the options flow.""" return NetworkScannerOptionsFlow(config_entry) + +# at top of file +from homeassistant import config_entries +import voluptuous as vol +import json + +# … your existing ConfigFlow code … + +# ---- Options Flow (add this block) ---- +from typing import Any, Dict + +class NetworkScannerOptionsFlow(config_entries.OptionsFlow): + def __init__(self, config_entry: config_entries.ConfigEntry) -> None: + self.config_entry = config_entry + + async def async_step_init(self, user_input: Dict[str, Any] | None = None): + return await self.async_step_user(user_input) + + async def async_step_user(self, user_input: Dict[str, Any] | None = None): + errors: dict[str, str] = {} + + data = self.config_entry.data or {} + opts = self.config_entry.options or {} + + cur_ip_range = opts.get("ip_range", data.get("ip_range", "192.168.1.0/24")) + cur_json_text = opts.get("mac_directory_json_text", "") + cur_json_url = opts.get("mac_directory_json_url", data.get("mac_directory_json_url","")) + + # Reuse your TextSelector() helper; if you don’t have it, just use str + try: + from homeassistant.helpers.selector import selector as ha_selector + TextSelector = lambda: ha_selector({"text": {"multiline": True}}) + except Exception: + TextSelector = lambda: str + + schema = vol.Schema({ + vol.Required("ip_range", description={"suggested_value": cur_ip_range}): str, + vol.Optional("mac_directory_json_text", description={"suggested_value": cur_json_text}): TextSelector(), + vol.Optional("mac_directory_json_url", description={"suggested_value": cur_json_url}): str, + }) + + if user_input is None: + return self.async_show_form(step_id="user", data_schema=schema, errors=errors) + + # Minimal validation (match your ConfigFlow) + from ipaddress import ip_network + ipr = (user_input.get("ip_range") or "").strip() + try: + ip_network(ipr, strict=False) + except Exception: + errors["ip_range"] = "invalid_ip_range" + + txt = (user_input.get("mac_directory_json_text") or "").strip() + if txt: + try: + parsed = json.loads(txt) + block = parsed.get("data", parsed) if isinstance(parsed, dict) else {} + if not isinstance(block, dict): + errors["mac_directory_json_text"] = "invalid_json" + except Exception: + errors["mac_directory_json_text"] = "invalid_json" + + url = (user_input.get("mac_directory_json_url") or "").strip() + + if errors: + return self.async_show_form(step_id="user", data_schema=schema, errors=errors) + + return self.async_create_entry(title="", data={ + "ip_range": ipr, + "mac_directory_json_text": txt, + "mac_directory_json_url": url, + }) + +# Tell HA how to get the options flow +async def async_get_options_flow(config_entry: config_entries.ConfigEntry): + return NetworkScannerOptionsFlow(config_entry) From b6926a0f6ef831ffd4c7ed77af6393fa715aebb3 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:52:07 +0100 Subject: [PATCH 14/35] Update __init__.py --- custom_components/network_scanner/__init__.py | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/custom_components/network_scanner/__init__.py b/custom_components/network_scanner/__init__.py index 6242c09..5a3e07f 100644 --- a/custom_components/network_scanner/__init__.py +++ b/custom_components/network_scanner/__init__.py @@ -1,35 +1,22 @@ from __future__ import annotations -from homeassistant.core import HomeAssistant from homeassistant.config_entries import ConfigEntry - +from homeassistant.core import HomeAssistant from .const import DOMAIN -PLATFORMS: list[str] = ["sensor"] - +PLATFORMS = ["sensor"] async def async_setup(hass: HomeAssistant, config: dict) -> bool: - """Set up Network Scanner (store YAML defaults for the config flow).""" - # Keep any YAML under hass.data[DOMAIN] so config_flow can read suggested values hass.data.setdefault(DOMAIN, config.get(DOMAIN, {}) or {}) return True - async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - """Set up Network Scanner from a config entry.""" - # Reload automatically when options change (enables 'Configure' button behaviour) entry.async_on_unload(entry.add_update_listener(_async_update_listener)) - await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) return True - -async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: - """Handle options updates by reloading the entry.""" +async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry): await hass.config_entries.async_reload(entry.entry_id) - async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - """Unload a config entry.""" - unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) - return unload_ok + return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) From 6242fe61a0e3739fade840b19bebaa5e907cc09a Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:53:56 +0100 Subject: [PATCH 15/35] Update __init__.py --- custom_components/network_scanner/__init__.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/custom_components/network_scanner/__init__.py b/custom_components/network_scanner/__init__.py index 5a3e07f..fe0cd73 100644 --- a/custom_components/network_scanner/__init__.py +++ b/custom_components/network_scanner/__init__.py @@ -1,22 +1,32 @@ from __future__ import annotations -from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant +from homeassistant.config_entries import ConfigEntry + from .const import DOMAIN -PLATFORMS = ["sensor"] +PLATFORMS: list[str] = ["sensor"] + async def async_setup(hass: HomeAssistant, config: dict) -> bool: + """Set up Network Scanner (store YAML defaults for the config flow).""" hass.data.setdefault(DOMAIN, config.get(DOMAIN, {}) or {}) return True + async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up Network Scanner from a config entry.""" entry.async_on_unload(entry.add_update_listener(_async_update_listener)) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) return True -async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry): + +async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: + """Handle options updates by reloading the entry.""" await hass.config_entries.async_reload(entry.entry_id) + async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + """Unload a config entry.""" + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + return unload_ok From a569d58d7a7019c30ee46f94c0d2bb069254fff1 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 17:54:37 +0100 Subject: [PATCH 16/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 545bd86..a33f87d 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -8,7 +8,7 @@ "iot_class": "local_polling", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.5.0", + "version": "0.6.0", "integration_type": "hub", "iot_class": "local_polling" } From f0b7b9afa3114021886a7e1c9653bb3d490c4491 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 18:42:49 +0100 Subject: [PATCH 17/35] Update config_flow.py --- .../network_scanner/config_flow.py | 111 ++++-------------- 1 file changed, 23 insertions(+), 88 deletions(-) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 1b18440..927fed4 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -8,7 +8,6 @@ # Selector fallback for older HA cores try: from homeassistant.helpers.selector import selector as ha_selector - def TextSelector(): return ha_selector({"text": {"multiline": True}}) except Exception: # pragma: no cover @@ -188,7 +187,8 @@ async def async_step_user(self, user_input=None): data=entry_data, ) -# ---- Options Flow ---- + +# ---- Options Flow (single, clean definition) ---- from typing import Any, Dict class NetworkScannerOptionsFlow(config_entries.OptionsFlow): @@ -210,22 +210,33 @@ async def async_step_user(self, user_input: Dict[str, Any] | None = None): cur_ip_range = opts.get("ip_range", data.get("ip_range", "192.168.1.0/24")) cur_json_text = opts.get("mac_directory_json_text", "") - cur_json_url = opts.get("mac_directory_json_url", data.get("mac_directory_json_url", "")) + cur_json_url = opts.get( + "mac_directory_json_url", data.get("mac_directory_json_url", "") + ) - schema = vol.Schema({ - vol.Required("ip_range", description={"suggested_value": cur_ip_range}): str, - vol.Optional("mac_directory_json_text", - description={"suggested_value": cur_json_text}): TextSelector(), - vol.Optional("mac_directory_json_url", - description={"suggested_value": cur_json_url}): str, - }) + schema = vol.Schema( + { + vol.Required( + "ip_range", description={"suggested_value": cur_ip_range} + ): str, + vol.Optional( + "mac_directory_json_text", + description={"suggested_value": cur_json_text}, + ): TextSelector(), + vol.Optional( + "mac_directory_json_url", + description={"suggested_value": cur_json_url}, + ): str, + } + ) if user_input is None: - return self.async_show_form(step_id="user", data_schema=schema, errors=errors) + return self.async_show_form( + step_id="user", data_schema=schema, errors=errors + ) # Validate IP/CIDR ipr = (user_input.get("ip_range") or "").strip() - from ipaddress import ip_network try: ip_network(ipr, strict=False) except Exception: @@ -259,79 +270,3 @@ async def async_step_user(self, user_input: Dict[str, Any] | None = None): async def async_get_options_flow(config_entry: config_entries.ConfigEntry): """Tell HA how to get the options flow.""" return NetworkScannerOptionsFlow(config_entry) - -# at top of file -from homeassistant import config_entries -import voluptuous as vol -import json - -# … your existing ConfigFlow code … - -# ---- Options Flow (add this block) ---- -from typing import Any, Dict - -class NetworkScannerOptionsFlow(config_entries.OptionsFlow): - def __init__(self, config_entry: config_entries.ConfigEntry) -> None: - self.config_entry = config_entry - - async def async_step_init(self, user_input: Dict[str, Any] | None = None): - return await self.async_step_user(user_input) - - async def async_step_user(self, user_input: Dict[str, Any] | None = None): - errors: dict[str, str] = {} - - data = self.config_entry.data or {} - opts = self.config_entry.options or {} - - cur_ip_range = opts.get("ip_range", data.get("ip_range", "192.168.1.0/24")) - cur_json_text = opts.get("mac_directory_json_text", "") - cur_json_url = opts.get("mac_directory_json_url", data.get("mac_directory_json_url","")) - - # Reuse your TextSelector() helper; if you don’t have it, just use str - try: - from homeassistant.helpers.selector import selector as ha_selector - TextSelector = lambda: ha_selector({"text": {"multiline": True}}) - except Exception: - TextSelector = lambda: str - - schema = vol.Schema({ - vol.Required("ip_range", description={"suggested_value": cur_ip_range}): str, - vol.Optional("mac_directory_json_text", description={"suggested_value": cur_json_text}): TextSelector(), - vol.Optional("mac_directory_json_url", description={"suggested_value": cur_json_url}): str, - }) - - if user_input is None: - return self.async_show_form(step_id="user", data_schema=schema, errors=errors) - - # Minimal validation (match your ConfigFlow) - from ipaddress import ip_network - ipr = (user_input.get("ip_range") or "").strip() - try: - ip_network(ipr, strict=False) - except Exception: - errors["ip_range"] = "invalid_ip_range" - - txt = (user_input.get("mac_directory_json_text") or "").strip() - if txt: - try: - parsed = json.loads(txt) - block = parsed.get("data", parsed) if isinstance(parsed, dict) else {} - if not isinstance(block, dict): - errors["mac_directory_json_text"] = "invalid_json" - except Exception: - errors["mac_directory_json_text"] = "invalid_json" - - url = (user_input.get("mac_directory_json_url") or "").strip() - - if errors: - return self.async_show_form(step_id="user", data_schema=schema, errors=errors) - - return self.async_create_entry(title="", data={ - "ip_range": ipr, - "mac_directory_json_text": txt, - "mac_directory_json_url": url, - }) - -# Tell HA how to get the options flow -async def async_get_options_flow(config_entry: config_entries.ConfigEntry): - return NetworkScannerOptionsFlow(config_entry) From a502c05abf898a4647cacfb728a8ff0a2ef0eaec Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 18:43:26 +0100 Subject: [PATCH 18/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index a33f87d..8e517ee 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -8,7 +8,7 @@ "iot_class": "local_polling", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.6.0", + "version": "0.6.1", "integration_type": "hub", "iot_class": "local_polling" } From 4211e0b7ac45ea52dc49342f0419729658d34604 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 19:12:49 +0100 Subject: [PATCH 19/35] Update config_flow.py --- custom_components/network_scanner/config_flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 927fed4..5cee57f 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -17,7 +17,7 @@ def TextSelector(): from .const import DOMAIN _LOGGER = logging.getLogger(__name__) - +_LOGGER.warning("network_scanner: config_flow module imported") def _format_for_log(d: dict) -> dict: return {k: str(v) for k, v in d.items()} From 9720367af0877fa332c4a861a1e9d99c5a083772 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 19:13:23 +0100 Subject: [PATCH 20/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 8e517ee..c75b920 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -5,10 +5,9 @@ "config_flow": true, "dependencies": [], "documentation": "https://github.com/paganl/network_scanner_extended", - "iot_class": "local_polling", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.6.1", + "version": "0.6.5", "integration_type": "hub", "iot_class": "local_polling" } From af4c8604726bf411cb6130adca3f5164f42d3ab3 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 19:47:28 +0100 Subject: [PATCH 21/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index c75b920..3c62766 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -7,7 +7,7 @@ "documentation": "https://github.com/paganl/network_scanner_extended", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.6.5", + "version": "0.6.6", "integration_type": "hub", "iot_class": "local_polling" } From d1a43355856c0ce355073f1503f47157c5a71e0f Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 21:43:13 +0100 Subject: [PATCH 22/35] Update config_flow.py --- custom_components/network_scanner/config_flow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 5cee57f..21159e6 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -269,4 +269,5 @@ async def async_step_user(self, user_input: Dict[str, Any] | None = None): async def async_get_options_flow(config_entry: config_entries.ConfigEntry): """Tell HA how to get the options flow.""" + _LOGGER.warning("network_scanner: options flow factory called for %s", config_entry.entry_id) return NetworkScannerOptionsFlow(config_entry) From 45fad54ed5d25c5abebfc7a9bd50e49f521065db Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 21:46:54 +0100 Subject: [PATCH 23/35] Update sensor.py --- custom_components/network_scanner/sensor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/sensor.py b/custom_components/network_scanner/sensor.py index 1499785..84c9768 100644 --- a/custom_components/network_scanner/sensor.py +++ b/custom_components/network_scanner/sensor.py @@ -189,4 +189,4 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, asyn return _LOGGER.debug("Setting up Network Scanner Extended for %s", ip_range) - async_add_entities([NetworkScanner(hass, config_entry)], True) + async_add_entities([NetworkScanner(hass, config_entry)], False) From f3e8bd60ce0eafc8b6ac6a11846d7b9bc33c6bbf Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Sat, 4 Oct 2025 21:47:56 +0100 Subject: [PATCH 24/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 3c62766..ee38379 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -7,7 +7,7 @@ "documentation": "https://github.com/paganl/network_scanner_extended", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.6.6", + "version": "0.6.7", "integration_type": "hub", "iot_class": "local_polling" } From 1579cdbc63f52a049096019e1a19c0f7d381027e Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Mon, 6 Oct 2025 14:20:07 +0100 Subject: [PATCH 25/35] Update manifest.json --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index ee38379..66a693b 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -7,7 +7,7 @@ "documentation": "https://github.com/paganl/network_scanner_extended", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.6.7", + "version": "0.6.8", "integration_type": "hub", "iot_class": "local_polling" } From 861c387a0a0ec0c00860261284ba4c4232394918 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Mon, 6 Oct 2025 14:21:59 +0100 Subject: [PATCH 26/35] Update __init__.py --- custom_components/network_scanner/__init__.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/custom_components/network_scanner/__init__.py b/custom_components/network_scanner/__init__.py index fe0cd73..0e0c20a 100644 --- a/custom_components/network_scanner/__init__.py +++ b/custom_components/network_scanner/__init__.py @@ -1,32 +1,23 @@ from __future__ import annotations - +import logging from homeassistant.core import HomeAssistant from homeassistant.config_entries import ConfigEntry - from .const import DOMAIN +_LOGGER = logging.getLogger(__name__) PLATFORMS: list[str] = ["sensor"] - async def async_setup(hass: HomeAssistant, config: dict) -> bool: - """Set up Network Scanner (store YAML defaults for the config flow).""" hass.data.setdefault(DOMAIN, config.get(DOMAIN, {}) or {}) return True - async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - """Set up Network Scanner from a config entry.""" entry.async_on_unload(entry.add_update_listener(_async_update_listener)) await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) return True - async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: - """Handle options updates by reloading the entry.""" await hass.config_entries.async_reload(entry.entry_id) - async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - """Unload a config entry.""" - unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) - return unload_ok + return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) From 85f4697ad353cc16ee6412452402b0c70ffdcdc4 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Mon, 6 Oct 2025 14:23:27 +0100 Subject: [PATCH 27/35] Update config_flow.py --- custom_components/network_scanner/config_flow.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 21159e6..905deda 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -267,7 +267,5 @@ async def async_step_user(self, user_input: Dict[str, Any] | None = None): return self.async_create_entry(title="", data=new_options) -async def async_get_options_flow(config_entry: config_entries.ConfigEntry): - """Tell HA how to get the options flow.""" - _LOGGER.warning("network_scanner: options flow factory called for %s", config_entry.entry_id) - return NetworkScannerOptionsFlow(config_entry) + async def async_get_options_flow(config_entry: config_entries.ConfigEntry): + return NetworkScannerOptionsFlow(config_entry) From d8bf3e4bcd121ce34d934afca222a4c861d0790b Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Mon, 6 Oct 2025 14:24:37 +0100 Subject: [PATCH 28/35] Update config_flow.py --- custom_components/network_scanner/config_flow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 905deda..53ba71f 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -260,12 +260,12 @@ async def async_step_user(self, user_input: Dict[str, Any] | None = None): # Save OPTIONS only. (Your setup uses entry.data; options override at runtime.) new_options = { - "ip_range": ipr, + "ip_range": ipr "mac_directory_json_text": jtxt, "mac_directory_json_url": jurl, } return self.async_create_entry(title="", data=new_options) - async def async_get_options_flow(config_entry: config_entries.ConfigEntry): - return NetworkScannerOptionsFlow(config_entry) + async def async_get_options_flow(config_entry: config_entries.ConfigEntry): + return NetworkScannerOptionsFlow(config_entry) From 405ac6866b266aab56a89ac0fb2205ec8bf1af0b Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Mon, 6 Oct 2025 14:25:30 +0100 Subject: [PATCH 29/35] Update sensor.py --- custom_components/network_scanner/sensor.py | 111 ++++++++------------ 1 file changed, 42 insertions(+), 69 deletions(-) diff --git a/custom_components/network_scanner/sensor.py b/custom_components/network_scanner/sensor.py index 84c9768..9c25d3c 100644 --- a/custom_components/network_scanner/sensor.py +++ b/custom_components/network_scanner/sensor.py @@ -1,32 +1,21 @@ -import json -import logging -from datetime import timedelta +from __future__ import annotations +import json, logging +from datetime import timedelta, datetime from typing import Any, Dict, List, Optional - import nmap from homeassistant.core import HomeAssistant from homeassistant.helpers.entity import Entity from homeassistant.config_entries import ConfigEntry from homeassistant.helpers.aiohttp_client import async_get_clientsession - from .const import DOMAIN SCAN_INTERVAL = timedelta(minutes=15) _LOGGER = logging.getLogger(__name__) - def _norm_mac(mac: Optional[str]) -> str: return (mac or "").upper() - def _parse_dir_obj(obj: Any) -> Dict[str, Dict[str, str]]: - """ - Accept either: - { "AA:BB:...": {"name":"..","desc":".."}, ... } - or: - { "data": { "AA:BB:...": {"name":"..","desc":".."}, ... } } - Returns a dict with UPPERCASE MAC keys. - """ out: Dict[str, Dict[str, str]] = {} if not isinstance(obj, dict): return out @@ -43,27 +32,20 @@ def _parse_dir_obj(obj: Any) -> Dict[str, Dict[str, str]]: out[mk] = {"name": str(v), "desc": ""} return out - async def _fetch_dir_from_url(hass: HomeAssistant, url: str) -> Dict[str, Dict[str, str]]: - """Best-effort fetch of a JSON directory from URL.""" if not url: return {} try: session = async_get_clientsession(hass) async with session.get(url, timeout=10) as resp: resp.raise_for_status() - # Let aiohttp detect content-type; fall back to text->json - text = await resp.text() - data = json.loads(text) + data = json.loads(await resp.text()) return _parse_dir_obj(data) - except Exception as exc: # swallow & warn; keep scanning + except Exception as exc: _LOGGER.warning("Failed to fetch directory from %s: %s", url, exc) return {} - class NetworkScanner(Entity): - """Representation of a Network Scanner (Extended).""" - _attr_name = "Network Scanner Extended" _attr_unit_of_measurement = "Devices" _attr_should_poll = True @@ -74,6 +56,11 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: self.ip_range: str = entry.data.get("ip_range", "") self._state: Optional[int] = None self._devices: List[Dict[str, Any]] = [] + # status diagnostics + self._status: str = "idle" # idle|scanning|ok|error + self._last_scan_started: Optional[str] = None + self._last_scan_duration_ms: Optional[int] = None + self._last_error: Optional[str] = None self.nm = nmap.PortScanner() _LOGGER.info("Network Scanner Extended initialised for %s", self.ip_range) @@ -81,21 +68,27 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: def unique_id(self) -> str: return f"{DOMAIN}_{self.ip_range}" + @property + def icon(self) -> str: + return "mdi:lan-pending" if self._status == "scanning" else "mdi:lan" + @property def state(self) -> Optional[int]: return self._state @property def extra_state_attributes(self) -> Dict[str, Any]: - return {"devices": self._devices} + return { + "devices": self._devices, + "status": self._status, + "last_scan_started": self._last_scan_started, + "last_scan_duration_ms": self._last_scan_duration_ms, + "last_error": self._last_error, + } async def async_update(self) -> None: - """Fetch new state data for the sensor.""" try: - # Build effective directory: data (setup) + options text/url (optional) directory: Dict[str, Dict[str, str]] = dict(self.entry.data.get("mac_directory", {})) - - # Merge options JSON text (highest precedence) opts = self.entry.options or {} opt_text = (opts.get("mac_directory_json_text") or "").strip() if opt_text: @@ -103,90 +96,70 @@ async def async_update(self) -> None: directory.update(_parse_dir_obj(json.loads(opt_text))) except Exception as exc: _LOGGER.warning("Options JSON invalid; ignoring: %s", exc) - - # Merge URL(s): options URL first, then data URL as fallback opt_url = (opts.get("mac_directory_json_url") or "").strip() data_url = (self.entry.data.get("mac_directory_json_url") or "").strip() url_to_use = opt_url or data_url if url_to_use: - fetched = await _fetch_dir_from_url(self.hass, url_to_use) - if fetched: - directory.update(fetched) + directory.update(await _fetch_dir_from_url(self.hass, url_to_use) or {}) - _LOGGER.debug("Effective directory size: %d", len(directory)) + # mark scanning and push to UI + self._status = "scanning" + self._last_error = None + start = datetime.utcnow() + self._last_scan_started = start.isoformat(timespec="seconds") + "Z" + self.async_write_ha_state() - # Scan network in executor (nmap is blocking) devices = await self.hass.async_add_executor_job(self._scan_network, directory) self._devices = devices self._state = len(devices) + self._last_scan_duration_ms = int((datetime.utcnow() - start).total_seconds() * 1000) + self._status = "ok" except Exception as e: _LOGGER.error("Error updating network scanner: %s", e) - - # ---------- Internal helpers ---------- + self._status = "error" + self._last_error = str(e) def _lookup_override(self, directory: Dict[str, Dict[str, str]], mac: str) -> Dict[str, str]: return directory.get(_norm_mac(mac), {}) def _scan_network(self, directory: Dict[str, Dict[str, str]]) -> List[Dict[str, Any]]: - """Scan the network and return device information.""" self.nm.scan(hosts=self.ip_range, arguments="-sn") devices: List[Dict[str, Any]] = [] - for host in self.nm.all_hosts(): try: addrs = self.nm[host].get("addresses", {}) mac = addrs.get("mac") ip = addrs.get("ipv4") or addrs.get("ipv6") or "" - if not mac or not ip: - continue # need both to be useful - - # Vendor (case-insensitive lookup in nmap's vendor map) + continue vendor = "Unknown" ven_map = self.nm[host].get("vendor", {}) - if isinstance(ven_map, dict) and ven_map: - # nmap keys may be upper/lower; compare upper + if isinstance(ven_map, dict): for k, v in ven_map.items(): if _norm_mac(k) == _norm_mac(mac): vendor = v break - hostname = self.nm[host].hostname() or "" - override = self._lookup_override(directory, mac) device_name = override.get("name") or "Unknown Device" device_type = override.get("desc") or "Unknown Device" - - devices.append( - { - "ip": ip, - "mac": mac, - "name": device_name, - "type": device_type, - "vendor": vendor, - "hostname": hostname, - } - ) + devices.append({ + "ip": ip, "mac": mac, "name": device_name, "type": device_type, + "vendor": vendor, "hostname": hostname + }) except Exception as exc: - _LOGGER.debug("Skipping host %s due to parse error: %s", host, exc) - - # Sort devices by IPv4 numerically (fallback: lexicographic) + _LOGGER.debug("Skipping host %s: %s", host, exc) def _ip_key(ip_str: str) -> List[int]: - try: - return [int(part) for part in ip_str.split(".")] - except Exception: - return [999, 999, 999, 999] - + try: return [int(part) for part in ip_str.split(".")] + except Exception: return [999, 999, 999, 999] devices.sort(key=lambda x: _ip_key(x.get("ip", ""))) return devices - async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities) -> None: - """Set up the Network Scanner sensor from a config entry.""" ip_range = config_entry.data.get("ip_range") if not ip_range: _LOGGER.error("No ip_range configured; aborting setup") return - _LOGGER.debug("Setting up Network Scanner Extended for %s", ip_range) + # IMPORTANT: don't block on first update async_add_entities([NetworkScanner(hass, config_entry)], False) From 48d784e81bdad0a8c75ac7f86357ac2947d313ed Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Tue, 7 Oct 2025 10:18:43 +0100 Subject: [PATCH 30/35] v0.6.9 just bump version --- custom_components/network_scanner/manifest.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index 66a693b..245b4db 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -7,7 +7,7 @@ "documentation": "https://github.com/paganl/network_scanner_extended", "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", "requirements": ["python-nmap==0.7.1"], - "version": "0.6.8", + "version": "0.6.9", "integration_type": "hub", "iot_class": "local_polling" } From 47b25c061c45adb0ef699b3657dae669ddd14f56 Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Tue, 7 Oct 2025 10:19:56 +0100 Subject: [PATCH 31/35] v0.6.9 Added support for status, since we're now running in the background --- custom_components/network_scanner/const.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/custom_components/network_scanner/const.py b/custom_components/network_scanner/const.py index 8a6405a..e1fbdf2 100644 --- a/custom_components/network_scanner/const.py +++ b/custom_components/network_scanner/const.py @@ -1,2 +1,10 @@ -"""Constants for the Network Scanner integration.""" DOMAIN = "network_scanner" + +# Default UI values +DEFAULT_IP_RANGE = "192.168.1.0/24" + +# Card-friendly status strings +STATUS_IDLE = "idle" +STATUS_SCANNING = "scanning" +STATUS_OK = "ok" +STATUS_ERROR = "error" From f9b795c4ca0175bd9277ec709348b0eb5a94111e Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Tue, 7 Oct 2025 10:20:32 +0100 Subject: [PATCH 32/35] v0.6.9 --- custom_components/network_scanner/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/custom_components/network_scanner/__init__.py b/custom_components/network_scanner/__init__.py index 0e0c20a..af7855c 100644 --- a/custom_components/network_scanner/__init__.py +++ b/custom_components/network_scanner/__init__.py @@ -2,12 +2,14 @@ import logging from homeassistant.core import HomeAssistant from homeassistant.config_entries import ConfigEntry + from .const import DOMAIN _LOGGER = logging.getLogger(__name__) PLATFORMS: list[str] = ["sensor"] async def async_setup(hass: HomeAssistant, config: dict) -> bool: + # Preserve YAML for defaults in config flow (optional) hass.data.setdefault(DOMAIN, config.get(DOMAIN, {}) or {}) return True @@ -20,4 +22,5 @@ async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> Non await hass.config_entries.async_reload(entry.entry_id) async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: - return await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + return unload_ok From 9e518f958af9babff10ca18be7a33c8d151f169e Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Tue, 7 Oct 2025 10:21:19 +0100 Subject: [PATCH 33/35] v0.6.9 --- .../network_scanner/config_flow.py | 270 +++++------------- 1 file changed, 67 insertions(+), 203 deletions(-) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 53ba71f..51c3a3f 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -1,248 +1,117 @@ +from __future__ import annotations import json import logging from ipaddress import ip_network import voluptuous as vol +from typing import Any, Dict from homeassistant import config_entries -# Selector fallback for older HA cores try: from homeassistant.helpers.selector import selector as ha_selector def TextSelector(): return ha_selector({"text": {"multiline": True}}) -except Exception: # pragma: no cover +except Exception: def TextSelector(): return str -from .const import DOMAIN +from .const import DOMAIN, DEFAULT_IP_RANGE _LOGGER = logging.getLogger(__name__) -_LOGGER.warning("network_scanner: config_flow module imported") - -def _format_for_log(d: dict) -> dict: - return {k: str(v) for k, v in d.items()} - def _normalise_mac_key(mac: str) -> str: return mac.upper() if isinstance(mac, str) else "" - -def _build_directory_from_legacy_yaml(yaml_cfg: dict) -> dict: - """ - Accept legacy keys like: - mac_mapping_1: "AA:BB:..:FF|Name|Desc" - mac_mapping_2: "AA:BB:..:FF=Name|Desc" - Loosely parsed on "|" with optional "=" between MAC and payload. - """ - directory: dict[str, dict] = {} - for i in range(1, 999): # generous upper bound; break after a gap beyond 25 - key = f"mac_mapping_{i}" - if key not in yaml_cfg: - if i > 25: - break - continue - - raw = str(yaml_cfg.get(key, "")).strip() - if not raw: - continue - - if "=" in raw: - mac_part, payload = raw.split("=", 1) - else: - parts = raw.split("|") - mac_part, payload = parts[0], "|".join(parts[1:]) - - mac = _normalise_mac_key(mac_part.strip()) - if not mac: - continue - - name, desc = "", "" - if "|" in payload: - name, desc = payload.split("|", 1) - else: - name = payload - - directory[mac] = {"name": name.strip(), "desc": desc.strip()} - - return directory - - class NetworkScannerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): - """Handle the config flow for Network Scanner (Extended).""" - VERSION = 1 async def async_step_user(self, user_input=None): - errors: dict[str, str] = {} - - # Pick up any YAML defaults preserved by __init__ (optional) - yaml_config = self.hass.data.get(DOMAIN, {}) or {} - _LOGGER.debug("YAML Config (raw): %s", _format_for_log(yaml_config)) - - data_schema = vol.Schema( - { - vol.Required( - "ip_range", - description={ - "suggested_value": yaml_config.get("ip_range", "192.168.1.0/24") - }, - ): str, - vol.Optional( - "mac_directory_json_text", - description={ - "suggested_value": yaml_config.get( - "mac_directory_json_text", "" - ) - }, - ): TextSelector(), - vol.Optional( - "mac_directory_json_url", - description={ - "suggested_value": yaml_config.get( - "mac_directory_json_url", "" - ) - }, - ): str, - } - ) + yaml_defaults = self.hass.data.get(DOMAIN, {}) or {} + schema = vol.Schema({ + vol.Required("ip_range", description={"suggested_value": yaml_defaults.get("ip_range", DEFAULT_IP_RANGE)}): str, + vol.Optional("mac_directory_json_text", description={"suggested_value": ""}): TextSelector(), + vol.Optional("mac_directory_json_url", description={"suggested_value": ""}): str, + }) if user_input is None: - return self.async_show_form( - step_id="user", - data_schema=data_schema, - errors=errors, - description_placeholders={ - "description": "Enter the IP range and an optional MAC directory (JSON text or URL)." - }, - ) - - # --- Validate & assemble entry data --- - ip_range = (user_input.get("ip_range") or "").strip() - json_text = (user_input.get("mac_directory_json_text") or "").strip() - json_url = (user_input.get("mac_directory_json_url") or "").strip() + return self.async_show_form(step_id="user", data_schema=schema, errors={}) - # Validate CIDR/IP early + errors: Dict[str, str] = {} + ipr = (user_input.get("ip_range") or "").strip() try: - ip_network(ip_range, strict=False) + ip_network(ipr, strict=False) except Exception: errors["ip_range"] = "invalid_ip_range" - mac_directory: dict[str, dict] = {} - - # 1) Legacy YAML mac_mapping_* support - legacy = _build_directory_from_legacy_yaml(yaml_config) - if legacy: - mac_directory.update(legacy) - - # 2) JSON pasted in textarea - if json_text: + # Light validation of JSON text + jtxt = (user_input.get("mac_directory_json_text") or "").strip() + if jtxt: try: - parsed = json.loads(json_text) + parsed = json.loads(jtxt) if not isinstance(parsed, dict): errors["mac_directory_json_text"] = "invalid_json" - else: - # Accept both { "data": {...} } and flat { "AA:BB:..": {...} } - block = parsed.get("data", parsed) - if not isinstance(block, dict): - errors["mac_directory_json_text"] = "invalid_json" - else: - for k, v in block.items(): - mk = _normalise_mac_key(k) - if not mk: - continue - if isinstance(v, dict): - name = v.get("name", "") - desc = v.get("desc", "") - else: - name, desc = str(v), "" - mac_directory[mk] = {"name": str(name), "desc": str(desc)} - except Exception as exc: # log and surface a friendly error - _LOGGER.warning( - "Failed to parse mac_directory_json_text: %s", exc, exc_info=True - ) + except Exception: errors["mac_directory_json_text"] = "invalid_json" - entry_data = { - "ip_range": ip_range, - "mac_directory": mac_directory, # processed dict with UPPERCASE keys - "mac_directory_json_url": json_url, # optional; fetch at runtime if you support it - } - if errors: - return self.async_show_form( - step_id="user", - data_schema=data_schema, - errors=errors, - description_placeholders={"description": "Fix the errors and try again."}, - ) - - _LOGGER.debug( - "Creating entry: ip_range=%s, mac_directory_count=%d, url=%s", - ip_range, - len(mac_directory), - json_url or "-", - ) - return self.async_create_entry( - title="Network Scanner Extended", - data=entry_data, - ) - + return self.async_show_form(step_id="user", data_schema=schema, errors=errors) -# ---- Options Flow (single, clean definition) ---- -from typing import Any, Dict + # Normalise directory now so the sensor can use it directly + directory = {} + if jtxt: + raw = json.loads(jtxt) + block = raw.get("data", raw) if isinstance(raw, dict) else {} + if isinstance(block, dict): + for k, v in block.items(): + mk = _normalise_mac_key(k) + if not mk: + continue + if isinstance(v, dict): + directory[mk] = {"name": str(v.get("name", "")), "desc": str(v.get("desc", ""))} + else: + directory[mk] = {"name": str(v), "desc": ""} -class NetworkScannerOptionsFlow(config_entries.OptionsFlow): - """Options UI to edit settings after setup.""" + data = { + "ip_range": ipr, + "mac_directory": directory, + "mac_directory_json_url": (user_input.get("mac_directory_json_url") or "").strip(), + } + return self.async_create_entry(title="Network Scanner Extended", data=data) - def __init__(self, config_entry: config_entries.ConfigEntry) -> None: - self.config_entry = config_entry + # Options flow mirrors the same fields + async def async_step_init(self, user_input=None): + return await self.async_step_user(user_input) - async def async_step_init(self, user_input: Dict[str, Any] | None = None): - # Entry point; show the same form as 'user' + async def async_step_options(self, user_input=None): return await self.async_step_user(user_input) - async def async_step_user(self, user_input: Dict[str, Any] | None = None): - errors: dict[str, str] = {} +class NetworkScannerOptionsFlow(config_entries.OptionsFlow): + def __init__(self, entry: config_entries.ConfigEntry) -> None: + self.entry = entry - # Defaults prefer options (if previously saved), then fall back to data - data = self.config_entry.data or {} - opts = self.config_entry.options or {} + async def async_step_init(self, user_input=None): + return await self.async_step_user(user_input) - cur_ip_range = opts.get("ip_range", data.get("ip_range", "192.168.1.0/24")) - cur_json_text = opts.get("mac_directory_json_text", "") - cur_json_url = opts.get( - "mac_directory_json_url", data.get("mac_directory_json_url", "") - ) + async def async_step_user(self, user_input=None): + data = self.entry.data or {} + opts = self.entry.options or {} - schema = vol.Schema( - { - vol.Required( - "ip_range", description={"suggested_value": cur_ip_range} - ): str, - vol.Optional( - "mac_directory_json_text", - description={"suggested_value": cur_json_text}, - ): TextSelector(), - vol.Optional( - "mac_directory_json_url", - description={"suggested_value": cur_json_url}, - ): str, - } - ) + schema = vol.Schema({ + vol.Required("ip_range", description={"suggested_value": opts.get("ip_range", data.get("ip_range", DEFAULT_IP_RANGE))}): str, + vol.Optional("mac_directory_json_text", description={"suggested_value": opts.get("mac_directory_json_text", "")}): TextSelector(), + vol.Optional("mac_directory_json_url", description={"suggested_value": opts.get("mac_directory_json_url", data.get("mac_directory_json_url",""))}): str, + }) if user_input is None: - return self.async_show_form( - step_id="user", data_schema=schema, errors=errors - ) + return self.async_show_form(step_id="user", data_schema=schema, errors={}) - # Validate IP/CIDR - ipr = (user_input.get("ip_range") or "").strip() + # Validate similarly + errors = {} try: - ip_network(ipr, strict=False) + ip_network((user_input.get("ip_range") or "").strip(), strict=False) except Exception: errors["ip_range"] = "invalid_ip_range" - # Lightly validate JSON text (don’t store parsed dict in options) jtxt = (user_input.get("mac_directory_json_text") or "").strip() if jtxt: try: @@ -253,19 +122,14 @@ async def async_step_user(self, user_input: Dict[str, Any] | None = None): except Exception: errors["mac_directory_json_text"] = "invalid_json" - jurl = (user_input.get("mac_directory_json_url") or "").strip() - if errors: return self.async_show_form(step_id="user", data_schema=schema, errors=errors) - # Save OPTIONS only. (Your setup uses entry.data; options override at runtime.) - new_options = { - "ip_range": ipr + return self.async_create_entry(title="", data={ + "ip_range": (user_input.get("ip_range") or "").strip(), "mac_directory_json_text": jtxt, - "mac_directory_json_url": jurl, - } - return self.async_create_entry(title="", data=new_options) - + "mac_directory_json_url": (user_input.get("mac_directory_json_url") or "").strip(), + }) - async def async_get_options_flow(config_entry: config_entries.ConfigEntry): - return NetworkScannerOptionsFlow(config_entry) +async def async_get_options_flow(config_entry: config_entries.ConfigEntry): + return NetworkScannerOptionsFlow(config_entry) From 64def4563b3b1c75cc11a14d3baf30759a0eff1c Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Tue, 7 Oct 2025 10:22:00 +0100 Subject: [PATCH 34/35] v0.6.9 --- custom_components/network_scanner/sensor.py | 145 +++++++++----------- 1 file changed, 66 insertions(+), 79 deletions(-) diff --git a/custom_components/network_scanner/sensor.py b/custom_components/network_scanner/sensor.py index 9c25d3c..e664f79 100644 --- a/custom_components/network_scanner/sensor.py +++ b/custom_components/network_scanner/sensor.py @@ -1,15 +1,17 @@ from __future__ import annotations -import json, logging -from datetime import timedelta, datetime +import json +import logging from typing import Any, Dict, List, Optional + import nmap -from homeassistant.core import HomeAssistant -from homeassistant.helpers.entity import Entity +from aiohttp import ClientError +from homeassistant.components.sensor import SensorEntity from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession -from .const import DOMAIN -SCAN_INTERVAL = timedelta(minutes=15) +from .const import DOMAIN, STATUS_SCANNING, STATUS_OK, STATUS_ERROR + _LOGGER = logging.getLogger(__name__) def _norm_mac(mac: Optional[str]) -> str: @@ -32,134 +34,119 @@ def _parse_dir_obj(obj: Any) -> Dict[str, Dict[str, str]]: out[mk] = {"name": str(v), "desc": ""} return out -async def _fetch_dir_from_url(hass: HomeAssistant, url: str) -> Dict[str, Dict[str, str]]: - if not url: - return {} - try: - session = async_get_clientsession(hass) - async with session.get(url, timeout=10) as resp: - resp.raise_for_status() - data = json.loads(await resp.text()) - return _parse_dir_obj(data) - except Exception as exc: - _LOGGER.warning("Failed to fetch directory from %s: %s", url, exc) - return {} - -class NetworkScanner(Entity): +class NetworkScannerExtended(SensorEntity): _attr_name = "Network Scanner Extended" - _attr_unit_of_measurement = "Devices" + _attr_native_unit_of_measurement = "Devices" _attr_should_poll = True def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: self.hass = hass self.entry = entry - self.ip_range: str = entry.data.get("ip_range", "") + self.ip_range: str = entry.options.get("ip_range", entry.data.get("ip_range", "")) self._state: Optional[int] = None self._devices: List[Dict[str, Any]] = [] - # status diagnostics - self._status: str = "idle" # idle|scanning|ok|error - self._last_scan_started: Optional[str] = None - self._last_scan_duration_ms: Optional[int] = None - self._last_error: Optional[str] = None + self._status: str = STATUS_OK self.nm = nmap.PortScanner() - _LOGGER.info("Network Scanner Extended initialised for %s", self.ip_range) @property def unique_id(self) -> str: return f"{DOMAIN}_{self.ip_range}" @property - def icon(self) -> str: - return "mdi:lan-pending" if self._status == "scanning" else "mdi:lan" - - @property - def state(self) -> Optional[int]: + def native_value(self) -> Optional[int]: return self._state @property def extra_state_attributes(self) -> Dict[str, Any]: return { - "devices": self._devices, "status": self._status, - "last_scan_started": self._last_scan_started, - "last_scan_duration_ms": self._last_scan_duration_ms, - "last_error": self._last_error, + "ip_range": self.ip_range, + "devices": self._devices, } async def async_update(self) -> None: + # Build effective directory (entry.data base + options text/url) try: + self._status = STATUS_SCANNING directory: Dict[str, Dict[str, str]] = dict(self.entry.data.get("mac_directory", {})) + opts = self.entry.options or {} - opt_text = (opts.get("mac_directory_json_text") or "").strip() - if opt_text: + # JSON text in options (highest precedence) + jtxt = (opts.get("mac_directory_json_text") or "").strip() + if jtxt: try: - directory.update(_parse_dir_obj(json.loads(opt_text))) + directory.update(_parse_dir_obj(json.loads(jtxt))) except Exception as exc: - _LOGGER.warning("Options JSON invalid; ignoring: %s", exc) - opt_url = (opts.get("mac_directory_json_url") or "").strip() - data_url = (self.entry.data.get("mac_directory_json_url") or "").strip() - url_to_use = opt_url or data_url - if url_to_use: - directory.update(await _fetch_dir_from_url(self.hass, url_to_use) or {}) - - # mark scanning and push to UI - self._status = "scanning" - self._last_error = None - start = datetime.utcnow() - self._last_scan_started = start.isoformat(timespec="seconds") + "Z" - self.async_write_ha_state() + _LOGGER.warning("Invalid options JSON: %s", exc) + # Optional URL + url = (opts.get("mac_directory_json_url") or self.entry.data.get("mac_directory_json_url") or "").strip() + if url: + try: + session = async_get_clientsession(self.hass) + async with session.get(url, timeout=10) as resp: + resp.raise_for_status() + directory.update(_parse_dir_obj(json.loads(await resp.text()))) + except (ClientError, Exception) as exc: + _LOGGER.warning("Failed to fetch directory URL %s: %s", url, exc) + + # Scan (blocking) in executor devices = await self.hass.async_add_executor_job(self._scan_network, directory) self._devices = devices self._state = len(devices) - self._last_scan_duration_ms = int((datetime.utcnow() - start).total_seconds() * 1000) - self._status = "ok" - except Exception as e: - _LOGGER.error("Error updating network scanner: %s", e) - self._status = "error" - self._last_error = str(e) - - def _lookup_override(self, directory: Dict[str, Dict[str, str]], mac: str) -> Dict[str, str]: - return directory.get(_norm_mac(mac), {}) + self._status = STATUS_OK + except Exception as exc: + self._status = STATUS_ERROR + _LOGGER.error("Network scan failed: %s", exc) def _scan_network(self, directory: Dict[str, Dict[str, str]]) -> List[Dict[str, Any]]: self.nm.scan(hosts=self.ip_range, arguments="-sn") devices: List[Dict[str, Any]] = [] for host in self.nm.all_hosts(): try: - addrs = self.nm[host].get("addresses", {}) + node = self.nm[host] + addrs = node.get("addresses", {}) mac = addrs.get("mac") ip = addrs.get("ipv4") or addrs.get("ipv6") or "" if not mac or not ip: continue + vendor = "Unknown" - ven_map = self.nm[host].get("vendor", {}) + ven_map = node.get("vendor", {}) if isinstance(ven_map, dict): for k, v in ven_map.items(): if _norm_mac(k) == _norm_mac(mac): vendor = v break - hostname = self.nm[host].hostname() or "" - override = self._lookup_override(directory, mac) - device_name = override.get("name") or "Unknown Device" - device_type = override.get("desc") or "Unknown Device" + + hostname = node.hostname() or "" + override = directory.get(_norm_mac(mac), {}) + name = override.get("name") or "Unknown Device" + desc = override.get("desc") or "Unknown Device" + devices.append({ - "ip": ip, "mac": mac, "name": device_name, "type": device_type, - "vendor": vendor, "hostname": hostname + "ip": ip, + "mac": mac, + "name": name, + "type": desc, + "vendor": vendor, + "hostname": hostname, }) except Exception as exc: _LOGGER.debug("Skipping host %s: %s", host, exc) + def _ip_key(ip_str: str) -> List[int]: - try: return [int(part) for part in ip_str.split(".")] - except Exception: return [999, 999, 999, 999] - devices.sort(key=lambda x: _ip_key(x.get("ip", ""))) + try: + return [int(p) for p in ip_str.split(".")] + except Exception: + return [999, 999, 999, 999] + + devices.sort(key=lambda d: _ip_key(d.get("ip", ""))) return devices -async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities) -> None: - ip_range = config_entry.data.get("ip_range") +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities) -> None: + ip_range = entry.options.get("ip_range", entry.data.get("ip_range")) if not ip_range: - _LOGGER.error("No ip_range configured; aborting setup") + _LOGGER.error("network_scanner: ip_range missing; not creating entity") return - _LOGGER.debug("Setting up Network Scanner Extended for %s", ip_range) - # IMPORTANT: don't block on first update - async_add_entities([NetworkScanner(hass, config_entry)], False) + async_add_entities([NetworkScannerExtended(hass, entry)], False) From 5901d9247a58ad0e957a1df76f959b6d93ae745c Mon Sep 17 00:00:00 2001 From: Paul Gale Date: Tue, 7 Oct 2025 10:45:09 +0100 Subject: [PATCH 35/35] v0.6.10 --- custom_components/network_scanner/config_flow.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 51c3a3f..6d9587c 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -19,6 +19,8 @@ def TextSelector(): _LOGGER = logging.getLogger(__name__) +DEFAULT_NMAP_ARGS = "-sn -PE -PS22,80,443 -PA80,443 -PU53 -T4" + def _normalise_mac_key(mac: str) -> str: return mac.upper() if isinstance(mac, str) else "" @@ -29,6 +31,7 @@ async def async_step_user(self, user_input=None): yaml_defaults = self.hass.data.get(DOMAIN, {}) or {} schema = vol.Schema({ vol.Required("ip_range", description={"suggested_value": yaml_defaults.get("ip_range", DEFAULT_IP_RANGE)}): str, + vol.Optional("nmap_args", description={"suggested_value": DEFAULT_NMAP_ARGS}): str, vol.Optional("mac_directory_json_text", description={"suggested_value": ""}): TextSelector(), vol.Optional("mac_directory_json_url", description={"suggested_value": ""}): str, }) @@ -98,6 +101,7 @@ async def async_step_user(self, user_input=None): schema = vol.Schema({ vol.Required("ip_range", description={"suggested_value": opts.get("ip_range", data.get("ip_range", DEFAULT_IP_RANGE))}): str, + vol.Optional("nmap_args", description={"suggested_value": DEFAULT_NMAP_ARGS}): str, vol.Optional("mac_directory_json_text", description={"suggested_value": opts.get("mac_directory_json_text", "")}): TextSelector(), vol.Optional("mac_directory_json_url", description={"suggested_value": opts.get("mac_directory_json_url", data.get("mac_directory_json_url",""))}): str, })