diff --git a/custom_components/network_scanner/__init__.py b/custom_components/network_scanner/__init__.py index 62290d9..af7855c 100644 --- a/custom_components/network_scanner/__init__.py +++ b/custom_components/network_scanner/__init__.py @@ -1,18 +1,26 @@ -from .sensor import NetworkScanner +from __future__ import annotations +import logging +from homeassistant.core import HomeAssistant +from homeassistant.config_entries import ConfigEntry + from .const import DOMAIN -async def async_setup(hass, config): - """Set up the Network Scanner component.""" - # Store YAML configuration in hass.data - hass.data[DOMAIN] = config.get(DOMAIN, {}) - return True +_LOGGER = logging.getLogger(__name__) +PLATFORMS: list[str] = ["sensor"] -async def async_setup_entry(hass, config_entry): - """Set up Network Scanner from a config entry.""" - await hass.config_entries.async_forward_entry_setups(config_entry, ["sensor"]) +async def async_setup(hass: HomeAssistant, config: dict) -> bool: + # Preserve YAML for defaults in config flow (optional) + hass.data.setdefault(DOMAIN, config.get(DOMAIN, {}) or {}) return True -async def async_unload_entry(hass, config_entry): - """Unload a config entry.""" - await hass.config_entries.async_forward_entry_unload(config_entry, "sensor") +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + entry.async_on_unload(entry.add_update_listener(_async_update_listener)) + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) return True + +async def _async_update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None: + await hass.config_entries.async_reload(entry.entry_id) + +async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + return unload_ok diff --git a/custom_components/network_scanner/config_flow.py b/custom_components/network_scanner/config_flow.py index 13c1b19..6d9587c 100644 --- a/custom_components/network_scanner/config_flow.py +++ b/custom_components/network_scanner/config_flow.py @@ -1,61 +1,139 @@ +from __future__ import annotations +import json +import logging +from ipaddress import ip_network import voluptuous as vol +from typing import Any, Dict + from homeassistant import config_entries -from .const import DOMAIN -import logging + +try: + from homeassistant.helpers.selector import selector as ha_selector + def TextSelector(): + return ha_selector({"text": {"multiline": True}}) +except Exception: + def TextSelector(): + return str + +from .const import DOMAIN, DEFAULT_IP_RANGE _LOGGER = logging.getLogger(__name__) +DEFAULT_NMAP_ARGS = "-sn -PE -PS22,80,443 -PA80,443 -PU53 -T4" + +def _normalise_mac_key(mac: str) -> str: + return mac.upper() if isinstance(mac, str) else "" + class NetworkScannerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): - """Handle a config flow for Network Scanner.""" + VERSION = 1 async def async_step_user(self, user_input=None): - def format_dict_for_printing(d): - return {k: str(v) for k, v in d.items()} + yaml_defaults = self.hass.data.get(DOMAIN, {}) or {} + schema = vol.Schema({ + vol.Required("ip_range", description={"suggested_value": yaml_defaults.get("ip_range", DEFAULT_IP_RANGE)}): str, + vol.Optional("nmap_args", description={"suggested_value": DEFAULT_NMAP_ARGS}): str, + vol.Optional("mac_directory_json_text", description={"suggested_value": ""}): TextSelector(), + vol.Optional("mac_directory_json_url", description={"suggested_value": ""}): str, + }) - """Manage the configurations from the user interface.""" - errors = {} + if user_input is None: + return self.async_show_form(step_id="user", data_schema=schema, errors={}) + + errors: Dict[str, str] = {} + ipr = (user_input.get("ip_range") or "").strip() + try: + ip_network(ipr, strict=False) + except Exception: + errors["ip_range"] = "invalid_ip_range" - # Load data from configuration.yaml - yaml_config = self.hass.data.get(DOMAIN, {}) - _LOGGER.debug("YAML Config: %s", yaml_config) + # Light validation of JSON text + jtxt = (user_input.get("mac_directory_json_text") or "").strip() + if jtxt: + try: + parsed = json.loads(jtxt) + if not isinstance(parsed, dict): + errors["mac_directory_json_text"] = "invalid_json" + except Exception: + errors["mac_directory_json_text"] = "invalid_json" - if user_input is not None: - return self.async_create_entry(title="Network Scanner", data=user_input) + if errors: + return self.async_show_form(step_id="user", data_schema=schema, errors=errors) - data_schema_dict = { - vol.Required("ip_range", description={"suggested_value": yaml_config.get("ip_range", "192.168.1.0/24")}): str + # Normalise directory now so the sensor can use it directly + directory = {} + if jtxt: + raw = json.loads(jtxt) + block = raw.get("data", raw) if isinstance(raw, dict) else {} + if isinstance(block, dict): + for k, v in block.items(): + mk = _normalise_mac_key(k) + if not mk: + continue + if isinstance(v, dict): + directory[mk] = {"name": str(v.get("name", "")), "desc": str(v.get("desc", ""))} + else: + directory[mk] = {"name": str(v), "desc": ""} + + data = { + "ip_range": ipr, + "mac_directory": directory, + "mac_directory_json_url": (user_input.get("mac_directory_json_url") or "").strip(), } + return self.async_create_entry(title="Network Scanner Extended", data=data) + + # Options flow mirrors the same fields + async def async_step_init(self, user_input=None): + return await self.async_step_user(user_input) + + async def async_step_options(self, user_input=None): + return await self.async_step_user(user_input) + +class NetworkScannerOptionsFlow(config_entries.OptionsFlow): + def __init__(self, entry: config_entries.ConfigEntry) -> None: + self.entry = entry + + async def async_step_init(self, user_input=None): + return await self.async_step_user(user_input) + + async def async_step_user(self, user_input=None): + data = self.entry.data or {} + opts = self.entry.options or {} + + schema = vol.Schema({ + vol.Required("ip_range", description={"suggested_value": opts.get("ip_range", data.get("ip_range", DEFAULT_IP_RANGE))}): str, + vol.Optional("nmap_args", description={"suggested_value": DEFAULT_NMAP_ARGS}): str, + vol.Optional("mac_directory_json_text", description={"suggested_value": opts.get("mac_directory_json_text", "")}): TextSelector(), + vol.Optional("mac_directory_json_url", description={"suggested_value": opts.get("mac_directory_json_url", data.get("mac_directory_json_url",""))}): str, + }) + + if user_input is None: + return self.async_show_form(step_id="user", data_schema=schema, errors={}) + + # Validate similarly + errors = {} + try: + ip_network((user_input.get("ip_range") or "").strip(), strict=False) + except Exception: + errors["ip_range"] = "invalid_ip_range" + + jtxt = (user_input.get("mac_directory_json_text") or "").strip() + if jtxt: + try: + parsed = json.loads(jtxt) + block = parsed.get("data", parsed) if isinstance(parsed, dict) else {} + if not isinstance(block, dict): + errors["mac_directory_json_text"] = "invalid_json" + except Exception: + errors["mac_directory_json_text"] = "invalid_json" + + if errors: + return self.async_show_form(step_id="user", data_schema=schema, errors=errors) + + return self.async_create_entry(title="", data={ + "ip_range": (user_input.get("ip_range") or "").strip(), + "mac_directory_json_text": jtxt, + "mac_directory_json_url": (user_input.get("mac_directory_json_url") or "").strip(), + }) - # Add mac mappings with values from YAML if available - for i in range(1, 26): # Ensure at least 25 entries - key = f"mac_mapping_{i}" - if key in yaml_config: - suggested_value = yaml_config.get(key) - _LOGGER.debug("YAML Config key: %s", suggested_value) - else: - suggested_value = None # No value in YAML config - - # Add the optional field to the schema, with or without suggested_value - data_schema_dict[vol.Optional(key, description={"suggested_value": suggested_value})] = str - - # Continue to add more mappings if available in the YAML config - i = 26 - while True: - key = f"mac_mapping_{i}" - if key in yaml_config: - suggested_value = yaml_config.get(key) - _LOGGER.debug("YAML Config key: %s", suggested_value) - data_schema_dict[vol.Optional(key, description={"suggested_value": suggested_value})] = str - i += 1 - else: - break # Exit loop when no more mappings are found in the YAML config - - _LOGGER.debug("schema: %s", format_dict_for_printing(data_schema_dict)) - data_schema = vol.Schema(data_schema_dict) - - return self.async_show_form( - step_id="user", - data_schema=data_schema, - errors=errors, - description_placeholders={"description": "Enter the IP range and MAC mappings"} - ) +async def async_get_options_flow(config_entry: config_entries.ConfigEntry): + return NetworkScannerOptionsFlow(config_entry) diff --git a/custom_components/network_scanner/const.py b/custom_components/network_scanner/const.py index 8a6405a..e1fbdf2 100644 --- a/custom_components/network_scanner/const.py +++ b/custom_components/network_scanner/const.py @@ -1,2 +1,10 @@ -"""Constants for the Network Scanner integration.""" DOMAIN = "network_scanner" + +# Default UI values +DEFAULT_IP_RANGE = "192.168.1.0/24" + +# Card-friendly status strings +STATUS_IDLE = "idle" +STATUS_SCANNING = "scanning" +STATUS_OK = "ok" +STATUS_ERROR = "error" diff --git a/custom_components/network_scanner/en.json b/custom_components/network_scanner/en.json new file mode 100644 index 0000000..d35e969 --- /dev/null +++ b/custom_components/network_scanner/en.json @@ -0,0 +1,15 @@ +{ + "title": "Network Scanner Extended", + "config": { + "step": { + "user": { + "title": "Network Scanner Extended", + "description": "Enter the IP range and an optional MAC directory (JSON text or URL)." + } + }, + "error": { + "invalid_json": "That JSON isn’t valid. Paste a JSON object (optionally with a top-level \"data\" object).", + "invalid_ip_range": "Please enter a valid CIDR range, e.g. 192.168.1.0/24." + } + } +} diff --git a/custom_components/network_scanner/manifest.json b/custom_components/network_scanner/manifest.json index be34964..245b4db 100644 --- a/custom_components/network_scanner/manifest.json +++ b/custom_components/network_scanner/manifest.json @@ -1,12 +1,13 @@ { "domain": "network_scanner", - "name": "Network Scanner", - "codeowners": ["@parvez"], + "name": "Network Scanner Extended", + "codeowners": ["@paganl"], "config_flow": true, "dependencies": [], - "documentation": "https://github.com/parvez/network_scanner", - "iot_class": "local_polling", - "issue_tracker": "https://github.com/parvez/network_scanner/issues", - "requirements": ["python-nmap"], - "version": "1.0.7" + "documentation": "https://github.com/paganl/network_scanner_extended", + "issue_tracker": "https://github.com/paganl/network_scanner_extended/issues", + "requirements": ["python-nmap==0.7.1"], + "version": "0.6.9", + "integration_type": "hub", + "iot_class": "local_polling" } diff --git a/custom_components/network_scanner/sensor.py b/custom_components/network_scanner/sensor.py index 2edbbec..e664f79 100644 --- a/custom_components/network_scanner/sensor.py +++ b/custom_components/network_scanner/sensor.py @@ -1,134 +1,152 @@ +from __future__ import annotations +import json import logging +from typing import Any, Dict, List, Optional + import nmap -from datetime import timedelta -from homeassistant.helpers.entity import Entity -from .const import DOMAIN +from aiohttp import ClientError +from homeassistant.components.sensor import SensorEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant +from homeassistant.helpers.aiohttp_client import async_get_clientsession -SCAN_INTERVAL = timedelta(minutes=15) +from .const import DOMAIN, STATUS_SCANNING, STATUS_OK, STATUS_ERROR _LOGGER = logging.getLogger(__name__) -class NetworkScanner(Entity): - """Representation of a Network Scanner.""" - - def __init__(self, hass, ip_range, mac_mapping): - """Initialize the sensor.""" - self._state = None - self.hass = hass - self.ip_range = ip_range +def _norm_mac(mac: Optional[str]) -> str: + return (mac or "").upper() + +def _parse_dir_obj(obj: Any) -> Dict[str, Dict[str, str]]: + out: Dict[str, Dict[str, str]] = {} + if not isinstance(obj, dict): + return out + block = obj.get("data", obj) + if not isinstance(block, dict): + return out + for k, v in block.items(): + mk = _norm_mac(k) + if not mk: + continue + if isinstance(v, dict): + out[mk] = {"name": str(v.get("name", "")), "desc": str(v.get("desc", ""))} + else: + out[mk] = {"name": str(v), "desc": ""} + return out - _LOGGER.debug("mac_mapping unparsed: %s", mac_mapping) - self.mac_mapping = self.parse_mac_mapping(mac_mapping) - _LOGGER.debug("mac_mapping parsed: %s", mac_mapping) +class NetworkScannerExtended(SensorEntity): + _attr_name = "Network Scanner Extended" + _attr_native_unit_of_measurement = "Devices" + _attr_should_poll = True + def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: + self.hass = hass + self.entry = entry + self.ip_range: str = entry.options.get("ip_range", entry.data.get("ip_range", "")) + self._state: Optional[int] = None + self._devices: List[Dict[str, Any]] = [] + self._status: str = STATUS_OK self.nm = nmap.PortScanner() - _LOGGER.info("Network Scanner initialized") - - @property - def should_poll(self): - """Return True as updates are needed via polling.""" - return True - - @property - def unique_id(self): - """Return unique ID.""" - return f"network_scanner_{self.ip_range}" @property - def name(self): - return 'Network Scanner' + def unique_id(self) -> str: + return f"{DOMAIN}_{self.ip_range}" @property - def state(self): + def native_value(self) -> Optional[int]: return self._state @property - def unit_of_measurement(self): - return 'Devices' - - async def async_update(self): - """Fetch new state data for the sensor.""" + def extra_state_attributes(self) -> Dict[str, Any]: + return { + "status": self._status, + "ip_range": self.ip_range, + "devices": self._devices, + } + + async def async_update(self) -> None: + # Build effective directory (entry.data base + options text/url) try: - _LOGGER.debug("Scanning network") - devices = await self.hass.async_add_executor_job(self.scan_network) + self._status = STATUS_SCANNING + directory: Dict[str, Dict[str, str]] = dict(self.entry.data.get("mac_directory", {})) + + opts = self.entry.options or {} + # JSON text in options (highest precedence) + jtxt = (opts.get("mac_directory_json_text") or "").strip() + if jtxt: + try: + directory.update(_parse_dir_obj(json.loads(jtxt))) + except Exception as exc: + _LOGGER.warning("Invalid options JSON: %s", exc) + + # Optional URL + url = (opts.get("mac_directory_json_url") or self.entry.data.get("mac_directory_json_url") or "").strip() + if url: + try: + session = async_get_clientsession(self.hass) + async with session.get(url, timeout=10) as resp: + resp.raise_for_status() + directory.update(_parse_dir_obj(json.loads(await resp.text()))) + except (ClientError, Exception) as exc: + _LOGGER.warning("Failed to fetch directory URL %s: %s", url, exc) + + # Scan (blocking) in executor + devices = await self.hass.async_add_executor_job(self._scan_network, directory) + self._devices = devices self._state = len(devices) - self._attr_extra_state_attributes = {"devices": devices} - except Exception as e: - _LOGGER.error("Error updating network scanner: %s", e) - - def parse_mac_mapping(self, mapping_string): - """Parse the MAC mapping string into a dictionary.""" - mapping = {} - for line in mapping_string.split('\n'): - parts = line.split(';') - if len(parts) >= 3: - mapping[parts[0].lower()] = (parts[1], parts[2]) - return mapping - - def get_device_info_from_mac(self, mac_address): - """Retrieve device name and type from the MAC mapping.""" - return self.mac_mapping.get(mac_address.lower(), ("Unknown Device", "Unknown Device")) - - def scan_network(self): - """Scan the network and return device information.""" - self.nm.scan(hosts=self.ip_range, arguments='-sn') - devices = [] - + self._status = STATUS_OK + except Exception as exc: + self._status = STATUS_ERROR + _LOGGER.error("Network scan failed: %s", exc) + + def _scan_network(self, directory: Dict[str, Dict[str, str]]) -> List[Dict[str, Any]]: + self.nm.scan(hosts=self.ip_range, arguments="-sn") + devices: List[Dict[str, Any]] = [] for host in self.nm.all_hosts(): - _LOGGER.debug("Found Host: %s", host) - if 'mac' in self.nm[host]['addresses']: - _LOGGER.debug("Found Mac: %s", self.nm[host]['addresses']) - ip = self.nm[host]['addresses']['ipv4'] - mac = self.nm[host]['addresses']['mac'] + try: + node = self.nm[host] + addrs = node.get("addresses", {}) + mac = addrs.get("mac") + ip = addrs.get("ipv4") or addrs.get("ipv6") or "" + if not mac or not ip: + continue + vendor = "Unknown" - if 'vendor' in self.nm[host] and mac in self.nm[host]['vendor']: - vendor = self.nm[host]['vendor'][mac] - hostname = self.nm[host].hostname() - device_name, device_type = self.get_device_info_from_mac(mac) + ven_map = node.get("vendor", {}) + if isinstance(ven_map, dict): + for k, v in ven_map.items(): + if _norm_mac(k) == _norm_mac(mac): + vendor = v + break + + hostname = node.hostname() or "" + override = directory.get(_norm_mac(mac), {}) + name = override.get("name") or "Unknown Device" + desc = override.get("desc") or "Unknown Device" + devices.append({ "ip": ip, "mac": mac, - "name": device_name, - "type": device_type, + "name": name, + "type": desc, "vendor": vendor, - "hostname": hostname + "hostname": hostname, }) + except Exception as exc: + _LOGGER.debug("Skipping host %s: %s", host, exc) - # Sort the devices by IP address - devices.sort(key=lambda x: [int(num) for num in x['ip'].split('.')]) - return devices - -async def async_setup_entry(hass, config_entry, async_add_entities): - """Set up the Network Scanner sensor from a config entry.""" - ip_range = config_entry.data.get("ip_range") - _LOGGER.debug("ip_range: %s", config_entry.data.get("ip_range")) - - # Initialize mac_mappings list to ensure at least 25 entries - mac_mappings_list = [] - - # Ensure we have at least 25 entries, even if config is missing some - for i in range(25): - key = f"mac_mapping_{i+1}" - mac_mapping = config_entry.data.get(key, "") - mac_mappings_list.append(mac_mapping) - _LOGGER.debug("mac_mapping_%s: %s", i+1, mac_mapping) - - # Continue adding additional mac mappings if present in the config - i = 25 - while True: - key = f"mac_mapping_{i+1}" - if key in config_entry.data: - mac_mapping = config_entry.data.get(key) - mac_mappings_list.append(mac_mapping) - _LOGGER.debug("mac_mapping_%s: %s", i+1, mac_mapping) - i += 1 - else: - break + def _ip_key(ip_str: str) -> List[int]: + try: + return [int(p) for p in ip_str.split(".")] + except Exception: + return [999, 999, 999, 999] - # Combine mac mappings into a newline-separated string - mac_mappings = "\n".join(mac_mappings_list) - _LOGGER.debug("mac_mappings: %s", mac_mappings) + devices.sort(key=lambda d: _ip_key(d.get("ip", ""))) + return devices - # Set up the network scanner entity - scanner = NetworkScanner(hass, ip_range, mac_mappings) - async_add_entities([scanner], True) +async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities) -> None: + ip_range = entry.options.get("ip_range", entry.data.get("ip_range")) + if not ip_range: + _LOGGER.error("network_scanner: ip_range missing; not creating entity") + return + async_add_entities([NetworkScannerExtended(hass, entry)], False) diff --git a/hacs.json b/hacs.json index 41ca1d2..2b2ec13 100644 --- a/hacs.json +++ b/hacs.json @@ -1,5 +1,5 @@ { - "name": "Network Scanner", + "name": "Network Scanner Extended", "content_in_root": false, "zip_release": false, "render_readme": true,