-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathmitm.py
More file actions
99 lines (93 loc) · 4.42 KB
/
mitm.py
File metadata and controls
99 lines (93 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# mitm.py
import datetime
import ipaddress
import logging
import os
import ssl
import tempfile

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
# Module-wide logger, registered under the fixed name "MITM".
log = logging.getLogger("MITM")

# The root CA material is kept in a "ca" directory that sits next to this
# source file; the key and certificate are two PEM files inside it.
CA_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "ca",
)
CA_KEY_FILE, CA_CERT_FILE = (
    os.path.join(CA_DIR, filename) for filename in ("ca.key", "ca.crt")
)
class MITMCertManager:
    """Mint and cache TLS server contexts for intercepted hosts.

    On construction the manager loads the root CA key and certificate from
    ``CA_KEY_FILE`` / ``CA_CERT_FILE``; when either file is missing it
    generates a new self-signed CA and writes both files. Each call to
    :meth:`get_server_context` returns an ``ssl.SSLContext`` carrying a leaf
    certificate for the requested host, signed by that CA. Contexts are
    cached per host string for the lifetime of the manager.
    """

    # ``x509.NameAttribute`` rejects a commonName longer than 64 UTF-8 bytes
    # with ValueError, while host names may be considerably longer.
    _MAX_COMMON_NAME_BYTES = 64

    def __init__(self):
        """Create the scratch directory and load (or create) the root CA."""
        self._ca_key = None
        self._ca_cert = None
        self._ctx_cache: dict[str, ssl.SSLContext] = {}
        # Scratch space used only while handing PEM data to the ssl module.
        self._cert_dir = tempfile.mkdtemp(prefix="domainfront_certs_")
        self._ensure_ca()

    def _ensure_ca(self):
        """Load the CA from disk, or create it when a file is missing."""
        if os.path.exists(CA_KEY_FILE) and os.path.exists(CA_CERT_FILE):
            with open(CA_KEY_FILE, "rb") as f:
                self._ca_key = serialization.load_pem_private_key(
                    f.read(), password=None
                )
            with open(CA_CERT_FILE, "rb") as f:
                self._ca_cert = x509.load_pem_x509_certificate(f.read())
        else:
            self._create_ca()

    def _create_ca(self):
        """Generate a self-signed root CA and persist it under ``CA_DIR``."""
        os.makedirs(CA_DIR, exist_ok=True)
        self._ca_key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048
        )
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, "MasterHttpRelayVPN"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MasterHttpRelayVPN"),
        ])
        now = datetime.datetime.now(datetime.timezone.utc)
        self._ca_cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(self._ca_key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=3650))
            .add_extension(
                x509.BasicConstraints(ca=True, path_length=0), critical=True
            )
            .add_extension(
                x509.KeyUsage(
                    digital_signature=True, key_cert_sign=True, crl_sign=True,
                    content_commitment=False, key_encipherment=False,
                    data_encipherment=False, key_agreement=False,
                    encipher_only=False, decipher_only=False,
                ),
                critical=True,
            )
            .sign(self._ca_key, hashes.SHA256())
        )
        # The CA key can sign certificates for any host, so it must not be
        # created with the default (umask-dependent) permissions.
        self._write_private_file(
            CA_KEY_FILE,
            self._ca_key.private_bytes(
                serialization.Encoding.PEM,
                serialization.PrivateFormat.TraditionalOpenSSL,
                serialization.NoEncryption(),
            ),
        )
        with open(CA_CERT_FILE, "wb") as f:
            f.write(self._ca_cert.public_bytes(serialization.Encoding.PEM))

    def get_server_context(self, domain: str) -> ssl.SSLContext:
        """Return a server-side TLS context presenting a cert for *domain*.

        Args:
            domain: Host name or IP literal the client asked for.

        Returns:
            An ``ssl.SSLContext`` loaded with a leaf certificate for
            *domain* plus the CA certificate, advertising ALPN ``http/1.1``.
            Repeated calls with the same string return the same object.
        """
        ctx = self._ctx_cache.get(domain)
        if ctx is None:
            ctx = self._build_context(domain)
            self._ctx_cache[domain] = ctx
        return ctx

    def _build_context(self, domain):
        """Create a fresh ``ssl.SSLContext`` holding a new cert for *domain*."""
        key_pem, cert_pem = self._generate_domain_cert(domain)
        ca_pem = self._ca_cert.public_bytes(serialization.Encoding.PEM)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.set_alpn_protocols(["http/1.1"])
        # ``load_cert_chain`` only accepts file paths. Fixed file names inside
        # a throw-away directory are used instead of names derived from
        # *domain*: the host string comes from the client and could contain
        # path separators. The directory (and the private key in it) is
        # removed as soon as the context has read the files.
        with tempfile.TemporaryDirectory(dir=self._cert_dir) as scratch:
            cert_file = os.path.join(scratch, "leaf.crt")
            key_file = os.path.join(scratch, "leaf.key")
            with open(cert_file, "wb") as f:
                f.write(cert_pem + ca_pem)
            self._write_private_file(key_file, key_pem)
            ctx.load_cert_chain(cert_file, key_file)
        return ctx

    def _generate_domain_cert(self, domain: str):
        """Issue a one-year leaf certificate for *domain* signed by the CA.

        Args:
            domain: Host name or IP literal (IPv6 may be bracketed).

        Returns:
            A ``(key_pem, cert_pem)`` tuple of PEM-encoded ``bytes``.
        """
        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        subject = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, self._common_name(domain)),
        ])
        # IP literals must be carried in an iPAddress SAN entry; clients do
        # not match an IP target against a dNSName entry.
        address = self._parse_ip(domain)
        if address is not None:
            alt_name = x509.IPAddress(address)
        else:
            alt_name = x509.DNSName(domain)
        now = datetime.datetime.now(datetime.timezone.utc)
        cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(self._ca_cert.subject)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
            .add_extension(
                x509.SubjectAlternativeName([alt_name]), critical=False
            )
            # Mark the leaf explicitly as a non-CA TLS server certificate.
            .add_extension(
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True,
            )
            .add_extension(
                x509.ExtendedKeyUsage(
                    [x509.oid.ExtendedKeyUsageOID.SERVER_AUTH]
                ),
                critical=False,
            )
            .sign(self._ca_key, hashes.SHA256())
        )
        key_pem = key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.TraditionalOpenSSL,
            serialization.NoEncryption(),
        )
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)
        return key_pem, cert_pem

    @classmethod
    def _common_name(cls, domain):
        """Return *domain* cut down to the commonName length limit.

        The full host is still carried in the subjectAltName extension, so
        truncating the CN does not affect host-name matching.
        """
        raw = domain.encode("utf-8")[:cls._MAX_COMMON_NAME_BYTES]
        # "ignore" drops a multi-byte character split by the cut.
        return raw.decode("utf-8", "ignore")

    @staticmethod
    def _parse_ip(host):
        """Return *host* as an ``ipaddress`` object, or ``None`` if it is a name."""
        try:
            # Accept the bracketed form used for IPv6 literals ("[::1]").
            return ipaddress.ip_address(host.strip("[]"))
        except ValueError:
            return None

    @staticmethod
    def _write_private_file(path, data):
        """Write *data* to a newly created file at *path* with mode 0o600."""
        # Remove any stale file first: an existing file would keep its old
        # permissions, and O_EXCL below requires that the path not exist.
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        flags = (
            os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
        )
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(data)