-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate-ip.py
More file actions
executable file
·107 lines (81 loc) · 3.03 KB
/
update-ip.py
File metadata and controls
executable file
·107 lines (81 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/python3
import enum
import ipaddress
import logging
import socket
import typing
import modules.API.Yandex
import modules.web
import yaml
# URL fetched by detect_nat_address() by default; main() parses the response body as an IP address.
URL_GET_EXTERNAL_IPv4 = "http://ipecho.net/plain"
# Default destination host that detect_local_address() connects its datagram socket to.
DST_GET_EXTERNAL_IP = "a.root-servers.net"
# YAML configuration file opened by main().
CONFIG_PATH = "/etc/update-ip.yml"
# getLogger() without a name returns the root logger; it is set to INFO and given one
# StreamHandler created with default arguments.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
@enum.unique
class DnsRecordType(enum.Enum):
    """DNS record types this script manages.

    Each member's value is the record type string that is compared against
    the ``type`` field of existing records and passed on as ``record_type``.
    """

    A = "A"  # chosen for addresses that are not IPv6
    AAAA = "AAAA"  # chosen for ipaddress.IPv6Address values
def detect_nat_address(url=URL_GET_EXTERNAL_IPv4) -> str:
    """Return the body served at *url*, used as the externally visible address.

    Args:
        url: Address-echo endpoint to query; defaults to URL_GET_EXTERNAL_IPv4.

    Returns:
        Whatever ``modules.web.get_url_body`` yields for *url*, unmodified.
        NOTE(review): presumably the address as text -- confirm against
        ``modules.web``.
    """
    response_body = modules.web.get_url_body(url)
    return response_body
def detect_local_address(addr_type=socket.AF_INET6, dst_addr=DST_GET_EXTERNAL_IP) -> str:
    """Return the local address the OS selects for traffic towards *dst_addr*.

    A datagram socket of family *addr_type* is connected to port 80 of
    *dst_addr* and the local end of that socket is read back with
    ``getsockname()``. This function writes no data to the socket.

    Args:
        addr_type: Socket address family; defaults to ``socket.AF_INET6``.
        dst_addr: Host used as the connection target.

    Returns:
        The first element of ``getsockname()``, i.e. the local address.

    Raises:
        ValueError: If the socket reports an empty local address.
        OSError: Propagated from socket creation, name resolution or
            ``connect()``.
    """
    # The context manager closes the socket on every path, including when
    # connect() raises; previously the descriptor was never released.
    with socket.socket(addr_type, socket.SOCK_DGRAM) as sock:
        sock.connect((dst_addr, 80))
        src_ip = sock.getsockname()[0]
    if not src_ip:
        # Report the family that was actually requested rather than a
        # hard-coded AF_INET6; plain ints have no .name, so fall back to them.
        family = getattr(addr_type, "name", addr_type)
        raise ValueError(f"cannot detect {family} address")
    return src_ip
def _addresses_match(
    recorded: typing.Any,
    detected: typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address],
) -> bool:
    """Tell whether the address stored in a DNS record equals *detected*.

    An exact string match with the compressed form wins first. Otherwise the
    recorded text is parsed so that equivalent spellings (uppercase hex,
    uncompressed IPv6) are still recognised as the same address.
    """
    if recorded == detected.compressed:
        return True
    if not isinstance(recorded, str):
        return False
    try:
        return ipaddress.ip_address(recorded) == detected
    except ValueError:
        # Unparseable record content cannot be the detected address.
        return False


def main():
    """Synchronise the configured subdomain's A/AAAA records with detected addresses.

    Steps:
      1. Load the YAML configuration from CONFIG_PATH.
      2. List the domain's records through the Yandex 360 API client.
      3. Collect addresses from detect_nat_address() and
         detect_local_address(); a failing detector is logged and skipped.
      4. For every detected address, look up the first record with the
         matching type and the configured subdomain name: leave it alone if it
         already holds the address, edit it if it differs, or add a new
         record if none exists.

    Raises:
        ValueError: If the domain listing has no "records" entry.
        Other exceptions from file access, YAML parsing, missing config keys
        or the API client propagate unchanged.
    """
    config: typing.Dict[str, typing.Any]
    with open(CONFIG_PATH, "r", encoding="utf-8") as fh:
        # BaseLoader constructs only strings, lists and dicts, so every scalar
        # (including "ttl") reaches the API calls as a str.
        config = yaml.load(fh, Loader=yaml.BaseLoader)

    dns_api = modules.API.Yandex.API360(config["organization"], config["domain"], config["token"])
    domains_desc = dns_api.list_domain()
    if "records" not in domains_desc:
        raise ValueError("cannot get domain description")

    detected_addresses: typing.Set[
        typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
    ] = set()
    for func_detect_address in (detect_nat_address, detect_local_address):
        # Best effort by design: one detector failing (e.g. no IPv6 route)
        # must not prevent the other address from being published.
        try:
            detected_addresses.add(ipaddress.ip_address(func_detect_address()))
        except Exception as err:
            logger.warning("func detect source address exception: %s", err)

    for addr in detected_addresses:
        ip_addr: str = addr.compressed
        record_type: str = (
            DnsRecordType.AAAA.value
            if isinstance(addr, ipaddress.IPv6Address)
            else DnsRecordType.A.value
        )

        record_id: typing.Optional[int] = None
        record_is_current = False
        for r_desc in domains_desc["records"]:
            if r_desc["type"] == record_type and r_desc["name"] == config["subdomain"]:
                # Only the first record with this type and name is considered.
                if _addresses_match(r_desc["address"], addr):
                    record_is_current = True
                else:
                    record_id = r_desc["recordId"]
                break

        if record_is_current:
            continue

        if record_id is not None:
            message = dns_api.edit_domain(
                record_id=record_id,
                address=ip_addr,
                name=config["subdomain"],
                record_type=record_type,
                ttl=config["ttl"],
            )
        else:
            message = dns_api.add_domain(
                address=ip_addr,
                name=config["subdomain"],
                record_type=record_type,
                ttl=config["ttl"],
            )
        logger.debug("message: %s", message)
# Run the update only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()