-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprobe.py
More file actions
123 lines (105 loc) · 3.82 KB
/
Copy pathprobe.py
File metadata and controls
123 lines (105 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
Network probe helpers for OpenSentry Command.
Contains lightweight reachability checks and fallback subnet scanning utilities
used by the web UI for discovery when mDNS is unavailable.
"""
from __future__ import annotations
import ipaddress
import socket
import subprocess
import concurrent.futures
from typing import Any, Dict, Iterable, List, Optional
# ---------- Fallback subnet probe (/health) ----------
DEFAULT_PROBE_PORT = 5000
DEFAULT_TCP_TIMEOUT = 0.6
DEFAULT_HTTP_TIMEOUT = 1.2
DEFAULT_CONCURRENCY = 256
def _is_private_ipv4(addr: str) -> bool:
try:
ip = ipaddress.ip_address(addr)
return isinstance(ip, ipaddress.IPv4Address) and ip.is_private
except ValueError:
return False
def detect_local_networks(limit_to_class_c: bool = True) -> List[ipaddress.IPv4Network]:
try:
out = subprocess.check_output(["ip", "-o", "-f", "inet", "addr", "show"], text=True)
except Exception:
return []
skip_prefixes = ("lo", "docker", "br-", "veth", "vmnet", "virbr", "kube", "cni", "tailscale", "zt", "wg")
found: set[str] = set()
networks: List[ipaddress.IPv4Network] = []
for line in out.splitlines():
parts = line.split()
if len(parts) < 4:
continue
iface = parts[1]
if any(iface.startswith(p) for p in skip_prefixes):
continue
if parts[2] != "inet":
continue
addr_cidr = parts[3]
ip_str, _, prefix_str = addr_cidr.partition("/")
if not _is_private_ipv4(ip_str):
continue
try:
prefix = int(prefix_str)
except ValueError:
continue
if limit_to_class_c and prefix < 24:
try:
net = ipaddress.ip_network(f"{ip_str}/24", strict=False)
except Exception:
continue
else:
try:
net = ipaddress.ip_interface(addr_cidr).network
except Exception:
continue
key = net.with_prefixlen
if key not in found:
found.add(key)
if net.network_address.is_link_local or net.network_address.is_multicast:
continue
networks.append(net)
return networks
def iter_hosts(networks: Iterable[ipaddress.IPv4Network]) -> Iterable[str]:
emitted: set[str] = set()
for net in networks:
for host in net.hosts():
s = str(host)
if s not in emitted:
emitted.add(s)
yield s
def port_open(ip: str, port: int, timeout: float) -> bool:
try:
with socket.create_connection((ip, port), timeout=timeout):
return True
except Exception:
return False
def verify_opensentry(ip: str, port: int, timeout: float) -> bool:
import requests
url = f"http://{ip}:{port}/health"
try:
r = requests.get(url, timeout=timeout)
return (r.status_code == 200 and (r.text or "").strip().lower() == "ok")
except Exception:
return False
def scan_ips(ips: List[str], port: int, tcp_timeout: float, http_timeout: float, concurrency: int) -> List[Dict[str, Any]]:
results: List[Dict[str, Any]] = []
def probe(ip: str) -> Optional[Dict[str, Any]]:
if not port_open(ip, port, tcp_timeout):
return None
if verify_opensentry(ip, port, http_timeout):
return {"ip": ip, "port": port, "url": f"http://{ip}:{port}/"}
return None
workers = min(max(8, concurrency), 1024)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as ex:
futs = {ex.submit(probe, ip): ip for ip in ips}
for fut in concurrent.futures.as_completed(futs):
try:
res = fut.result()
if res:
results.append(res)
except Exception:
pass
return results