Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions mtprotoproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ def init_config():
# use middle proxy, necessary to show ad
conf_dict.setdefault("USE_MIDDLE_PROXY", len(conf_dict["AD_TAG"]) == 16)

# if IPv6 avaliable, use it by default
conf_dict.setdefault("PREFER_IPV6", socket.has_ipv6)
# if IPv6 available, use it by default, IPv6 with middle proxies is unstable now
conf_dict.setdefault("PREFER_IPV6", socket.has_ipv6 and not conf_dict["USE_MIDDLE_PROXY"])

# disables tg->client trafic reencryption, faster but less secure
# disables tg->client traffic reencryption, faster but less secure
conf_dict.setdefault("FAST_MODE", True)

# enables some working modes
Expand Down Expand Up @@ -289,6 +289,9 @@ def init_config():
# telegram servers connect timeout in seconds
conf_dict.setdefault("TG_CONNECT_TIMEOUT", 10)

# drop connection if no data from telegram server for this many seconds
conf_dict.setdefault("TG_READ_TIMEOUT", 60)

# listen address for IPv4
conf_dict.setdefault("LISTEN_ADDR_IPV4", "0.0.0.0")

Expand Down Expand Up @@ -517,7 +520,7 @@ def getrandbytes(self, n):


class TgConnectionPool:
MAX_CONNS_IN_POOL = 64
MAX_CONNS_IN_POOL = 16

def __init__(self):
self.pools = {}
Expand All @@ -542,6 +545,16 @@ async def open_tg_connection(self, host, port, init_func=None):
)
return reader_tgt, writer_tgt

def is_conn_dead(self, reader, writer):
if writer.transport.is_closing():
return True
raw_reader = reader
while hasattr(raw_reader, 'upstream'):
raw_reader = raw_reader.upstream
if raw_reader.at_eof():
return True
return False

def register_host_port(self, host, port, init_func):
if (host, port, init_func) not in self.pools:
self.pools[(host, port, init_func)] = []
Expand Down Expand Up @@ -708,7 +721,7 @@ async def read(self, n):

needed_till_full_block = -len(data) % self.block_size
if needed_till_full_block > 0:
data += self.upstream.readexactly(needed_till_full_block)
data += await self.upstream.readexactly(needed_till_full_block)
return self.decryptor.decrypt(data)

async def readexactly(self, n):
Expand Down Expand Up @@ -832,8 +845,8 @@ class MTProtoCompactFrameStreamWriter(LayeredStreamWriterBase):
__slots__ = ()

def write(self, data, extra={}):
SMALL_PKT_BORDER = 0x7F
LARGE_PKT_BORGER = 256**3
SMALL_PKT_BORDER = 0x7f
LARGE_PKT_BORDER = 256 ** 3

if len(data) % 4 != 0:
print_err(
Expand All @@ -849,10 +862,8 @@ def write(self, data, extra={}):

if len_div_four < SMALL_PKT_BORDER:
return self.upstream.write(bytes([len_div_four]) + data)
elif len_div_four < LARGE_PKT_BORGER:
return self.upstream.write(
b"\x7f" + int.to_bytes(len_div_four, 3, "little") + data
)
elif len_div_four < LARGE_PKT_BORDER:
return self.upstream.write(b'\x7f' + int.to_bytes(len_div_four, 3, 'little') + data)
else:
print_err("Attempted to send too large pkt len =", len(data))
return 0
Expand Down Expand Up @@ -1457,10 +1468,7 @@ async def do_direct_handshake(proto_tag, dc_idx, dec_key_and_iv=None):
)
return False
except ConnectionAbortedError as E:
print_err(
"The Telegram server connection is bad: %d (%s %s) %s"
% (dc_idx, addr, port, E)
)
print_err("The Telegram server connection is bad: %d (%s %s) %s" % (dc_idx, dc, TG_DATACENTER_PORT, E))
return False
except (OSError, asyncio.TimeoutError) as E:
print_err("Unable to connect to", dc, TG_DATACENTER_PORT)
Expand Down Expand Up @@ -1725,7 +1733,11 @@ async def do_middleproxy_handshake(proto_tag, dc_idx, cl_ip, cl_port):
async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):
try:
while True:
data = await rd.read(rd_buf_size)
if not is_upstream:
data = await asyncio.wait_for(rd.read(rd_buf_size),
timeout=config.TG_READ_TIMEOUT)
else:
data = await rd.read(rd_buf_size)
if isinstance(data, tuple):
data, extra = data
else:
Expand All @@ -1750,7 +1762,7 @@ async def tg_connect_reader_to_writer(rd, wr, user, rd_buf_size, is_upstream):

wr.write(data, extra)
await wr.drain()
except (OSError, asyncio.IncompleteReadError) as e:
except (OSError, asyncio.IncompleteReadError, asyncio.TimeoutError) as e:
# print_err(e)
pass

Expand Down Expand Up @@ -2227,13 +2239,10 @@ async def get_srv_time():
continue
line = line[len("Date: ") :].decode()
srv_time = datetime.datetime.strptime(line, "%a, %d %b %Y %H:%M:%S %Z")
now_time = datetime.datetime.utcnow()
is_time_skewed = (now_time - srv_time).total_seconds() > MAX_TIME_SKEW
if (
is_time_skewed
and config.USE_MIDDLE_PROXY
and not disable_middle_proxy
):
srv_time = srv_time.replace(tzinfo=datetime.timezone.utc)
now_time = datetime.datetime.now(datetime.timezone.utc)
is_time_skewed = (now_time-srv_time).total_seconds() > MAX_TIME_SKEW
if is_time_skewed and config.USE_MIDDLE_PROXY and not disable_middle_proxy:
print_err("Time skew detected, please set the clock")
print_err("Server time:", srv_time, "your time:", now_time)
print_err("Disabling advertising to continue serving")
Expand Down Expand Up @@ -2379,15 +2388,15 @@ def print_tg_info():
for ip in ip_addrs:
if config.MODES["classic"]:
params = {"server": ip, "port": config.PORT, "secret": secret}
params_encodeded = urllib.parse.urlencode(params, safe=":")
classic_link = "tg://proxy?{}".format(params_encodeded)
params_encoded = urllib.parse.urlencode(params, safe=':')
classic_link = "tg://proxy?{}".format(params_encoded)
proxy_links.append({"user": user, "link": classic_link})
print("{}: {}".format(user, classic_link), flush=True)

if config.MODES["secure"]:
params = {"server": ip, "port": config.PORT, "secret": "dd" + secret}
params_encodeded = urllib.parse.urlencode(params, safe=":")
dd_link = "tg://proxy?{}".format(params_encodeded)
params_encoded = urllib.parse.urlencode(params, safe=':')
dd_link = "tg://proxy?{}".format(params_encoded)
proxy_links.append({"user": user, "link": dd_link})
print("{}: {}".format(user, dd_link), flush=True)

Expand All @@ -2397,8 +2406,8 @@ def print_tg_info():
# tls_secret = bytes.fromhex("ee" + secret) + config.TLS_DOMAIN.encode()
# tls_secret_base64 = base64.urlsafe_b64encode(tls_secret)
params = {"server": ip, "port": config.PORT, "secret": tls_secret}
params_encodeded = urllib.parse.urlencode(params, safe=":")
tls_link = "tg://proxy?{}".format(params_encodeded)
params_encoded = urllib.parse.urlencode(params, safe=':')
tls_link = "tg://proxy?{}".format(params_encoded)
proxy_links.append({"user": user, "link": tls_link})
print("{}: {}".format(user, tls_link), flush=True)

Expand Down
Loading