diff --git a/apps_script/vps_exit_node.py b/apps_script/vps_exit_node.py index 027bbab..7b48fe7 100644 --- a/apps_script/vps_exit_node.py +++ b/apps_script/vps_exit_node.py @@ -34,6 +34,7 @@ import logging import os import re +import socket import socketserver import sys import urllib.error @@ -209,6 +210,23 @@ def _relay_request( } + + +def _relay_udp_packet(host: str, port: int, payload: bytes) -> dict: + """Send one UDP packet and return one response packet (best effort).""" + if not host or port <= 0 or port > 65535: + return {"e": "bad_udp_target"} + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.settimeout(2.0) + sock.sendto(payload, (host, port)) + data, _ = sock.recvfrom(65535) + return {"ok": True, "payload": base64.b64encode(data).decode()} + except Exception as exc: + return {"e": str(exc) or type(exc).__name__} + finally: + sock.close() + # --------------------------------------------------------------------------- # HTTP request handler # --------------------------------------------------------------------------- @@ -265,6 +283,7 @@ def do_POST(self): # noqa: N802 m = str(body.get("m") or "GET").upper() h = _sanitize_headers(body.get("h")) b64 = body.get("b") + udp_mode = bool(body.get("udp")) if not _PSK: self._send_json(500, {"e": "server_psk_missing"}) @@ -275,6 +294,25 @@ def do_POST(self): # noqa: N802 self._send_json(401, {"e": "unauthorized"}) return + + if udp_mode: + host = str(body.get("host") or "") + try: + port = int(body.get("port") or 0) + except Exception: + port = 0 + payload = b"" + pb64 = body.get("payload") + if isinstance(pb64, str) and pb64: + try: + payload = base64.b64decode(pb64) + except Exception: + self._send_json(400, {"e": "bad_udp_payload"}) + return + result = _relay_udp_packet(host, port, payload) + self._send_json(200, result) + return + if not _safe_url(u): self._send_json(400, {"e": "bad_url"}) return diff --git a/docs/GETTING_STARTED.md b/docs/GETTING_STARTED.md index 4a26f85..309620e 100644 --- a/docs/GETTING_STARTED.md +++ b/docs/GETTING_STARTED.md @@ -1,156 +1,218 @@ -# Getting Started +# Getting Started (Complete Guide) -This guide keeps only the setup path most users need. Follow it once, then come back to the root README only when you need another topic. +This is the **full, practical setup guide** for first-time users. +If you follow this page in order, you can go from zero to a working proxy. -## What You Need +--- -- Python 3.10 or newer -- A free Google account -- Git, or the ZIP download from GitHub -- A browser where you can set an HTTP proxy +## Quick Outcome -## 1. Get The Project +When done correctly: +- Local HTTP proxy works at `127.0.0.1:8085` +- Local SOCKS5 proxy works at `127.0.0.1:1080` +- HTTPS websites open without certificate warnings +- You can run route checks with `--scan` and stability-first checks with `--adaptive-scan` -Choose whichever option works on your network. +--- -**Option A: ZIP** +## Prerequisites -[Click to Download](https://github.com/masterking32/MasterHttpRelayVPN/archive/refs/heads/python_testing.zip) +- Python `3.10+` +- A Google account (for Apps Script relay) +- Git (optional, ZIP download also works) +- A browser where you can set manual proxy +--- -**Option B: Git** +## 1) Download The Project + +### Option A — ZIP + +Download and extract: + +- + +Then open terminal in extracted folder. + +### Option B — Git ```bash git clone https://github.com/masterking32/MasterHttpRelayVPN.git cd MasterHttpRelayVPN ``` -## 2. Deploy The Google Relay +--- -The relay is the small Apps Script program that fetches websites for you. +## 2) Deploy The Google Apps Script Relay -1. Open [Google Apps Script](https://script.google.com/) and sign in. +1. Open and sign in. 2. Click **New project**. -3. Delete the default editor content. -4. Open [apps_script/Code.gs](../apps_script/Code.gs), copy everything, and paste it into Apps Script. -5. Change this line to a long secret: +3. Delete default code. +4. Open local file [`apps_script/Code.gs`](../apps_script/Code.gs), copy all code, paste into Apps Script. +5. Change: ```javascript const AUTH_KEY = "your-secret-password-here"; ``` -6. Click **Deploy** -> **New deployment**. + to your own long random secret. + +6. Click **Deploy** → **New deployment**. 7. Select **Web app**. -8. Set **Execute as** to **Me**. -9. Set **Who has access** to **Anyone**. -10. Click **Deploy**, authorize the app, and copy the **Deployment ID**. +8. Set **Execute as** = **Me**. +9. Set **Who has access** = **Anyone**. +10. Deploy, authorize, copy **Deployment ID**. -Keep the `AUTH_KEY` and Deployment ID nearby. You need both locally. +You now need **both** values locally: +- `Deployment ID` +- `AUTH_KEY` -## 3. Run The One-Click Launcher +--- -**Windows** +## 3) Start The App (Recommended) + +### Windows ```cmd start.bat ``` -**Linux / macOS** +### Linux / macOS ```bash chmod +x start.sh ./start.sh ``` -The launcher creates `.venv`, installs dependencies, runs `setup.py` if `config.json` is missing, and starts the proxy. - -If dependency installation fails through PyPI, the launcher retries through the runflare mirror automatically. +What launcher does: +- creates `.venv` +- installs dependencies +- runs setup wizard if `config.json` is missing +- starts proxy -## 4. Answer The Setup Wizard +--- -When the wizard opens: +## 4) Fill Setup Wizard Correctly -1. Enter the same `auth_key` you placed inside [apps_script/Code.gs](../apps_script/Code.gs). -2. Paste the Apps Script Deployment ID. -3. Keep the default HTTP proxy port `8085` unless you already use that port. -4. Keep LAN sharing off unless other devices must use this proxy. +When prompted: +1. `auth_key` = exactly same as `AUTH_KEY` in Apps Script +2. `script_id` = your Deployment ID +3. Keep HTTP port `8085` unless busy +4. Keep LAN sharing disabled unless you need other devices -The wizard writes `config.json` for you. +The wizard creates `config.json`. -## 5. Configure Your Browser +--- -Use the HTTP proxy for normal browsing: +## 5) Configure Browser Proxy | Field | Value | -|-------|-------| +|---|---| | Proxy type | HTTP | | Address | `127.0.0.1` | | Port | `8085` | -Firefox path: **Settings** -> **General** -> **Network Settings** -> **Manual proxy**. Enter `127.0.0.1` and `8085`, then enable the option to also use it for HTTPS. +For Firefox: Settings → General → Network Settings → Manual proxy. +Enable proxy for HTTPS too. -Chrome and Edge use the system proxy on Windows. You can also use extensions such as FoxyProxy or SwitchyOmega for easier switching. +--- -## 6. Install The CA Certificate +## 6) Install Local CA (HTTPS Required) -HTTPS browsing needs the local CA certificate generated by the proxy. The file is created at `ca/ca.crt` after first run. +The proxy generates `ca/ca.crt`. +If auto-install fails, install manually. -**The app tries to install it automatically. If it cannot, install it manually:** +### Windows +1. Open `ca/ca.crt` +2. Install Certificate +3. Current User +4. Place in **Trusted Root Certification Authorities** +5. Restart browser fully -**Windows** +### macOS +1. Open `ca/ca.crt` in Keychain Access +2. Open certificate → Trust +3. Set **Always Trust** +4. Restart browser -1. Double-click `ca/ca.crt`. -2. Choose **Install Certificate**. -3. Choose **Current User**. -4. Choose **Place all certificates in the following store**. -5. Select **Trusted Root Certification Authorities**. -6. Finish, then fully restart your browser. - -**macOS** - -1. Open `ca/ca.crt` in Keychain Access. -2. Find the certificate and open it. -3. Expand **Trust**. -4. Set **When using this certificate** to **Always Trust**. -5. Close the dialog, enter your password, and restart your browser. - -**Linux Ubuntu / Debian** +### Ubuntu / Debian ```bash sudo cp ca/ca.crt /usr/local/share/ca-certificates/masterhttp-relay.crt sudo update-ca-certificates ``` -Restart your browser after installing. +Restart browser. + +### Firefox (if needed) +Firefox can use a separate trust store: +- Settings → Privacy & Security → Certificates → View Certificates → Authorities → Import `ca/ca.crt` +- Enable trust for website identification + +--- + +## 7) Verify It Works + +- Open normal websites through browser proxy. +- If `unauthorized` appears: `AUTH_KEY` mismatch between Apps Script and `config.json`. +- If HTTPS certificate errors appear: CA not trusted correctly. -**Firefox** +--- -Firefox may use a separate certificate store: +## 8) Route Quality Commands -1. Open **Settings** -> **Privacy & Security** -> **Certificates** -> **View Certificates**. -2. Go to **Authorities**. -3. Click **Import** and select `ca/ca.crt`. -4. Enable **Trust this CA to identify websites**. +### Fast reachability scan -## Manual Run Commands +```bash +python main.py --scan +``` -Use these only if you are not using the launcher: +Use suggested `google_ip` in `config.json`. + +### Stability-first adaptive scan (recommended for unstable networks) ```bash +python main.py --adaptive-scan +``` + +This ranking is based on route stability metrics (not only minimum ping). + +--- + +## 9) Manual Start (Without launcher) + +### Windows + +```cmd python -m venv .venv .venv\Scripts\python -m pip install -r requirements.txt .venv\Scripts\python setup.py .venv\Scripts\python main.py ``` -On Linux / macOS, replace `.venv\Scripts\python` with `.venv/bin/python`. +### Linux / macOS + +```bash +python3 -m venv .venv +.venv/bin/python -m pip install -r requirements.txt +.venv/bin/python setup.py +.venv/bin/python main.py +``` + +--- + +## 10) Common Problems (Short) -## Done +- `unauthorized`: auth key mismatch +- proxy connects but sites fail: wrong Deployment ID or script deployment not public +- HTTPS warnings: CA not installed/trusted +- some services block Google egress: use Exit Node guide -When everything is working, the terminal shows the HTTP proxy on `127.0.0.1:8085` and SOCKS5 on `127.0.0.1:1080`. +--- -Next useful pages: +## Next Docs - [Troubleshooting](TROUBLESHOOTING.md) -- [Configuration Reference](CONFIGURATION.md) -- [Exit Node Guide](exit-node/EXIT_NODE_DEPLOYMENT.md) +- [Configuration](CONFIGURATION.md) +- [Exit Node](exit-node/EXIT_NODE_DEPLOYMENT.md) +- [Architecture](ARCHITECTURE.md) diff --git a/docs/fa/GETTING_STARTED.md b/docs/fa/GETTING_STARTED.md index b0102c6..c10d2b3 100644 --- a/docs/fa/GETTING_STARTED.md +++ b/docs/fa/GETTING_STARTED.md @@ -1,70 +1,217 @@ -# شروع سریع +# شروع سریع (راهنمای کامل) -این راهنما مسیر ساده راه‌اندازی را نشان می‌دهد: یک رله Google Apps Script، یک فایل `config.json`، و پراکسی محلی روی سیستم شما. +این صفحه یک **راهنمای کامل و ساده** برای شروع است. +اگر مرحله‌به‌مرحله جلو بروید، در پایان پراکسی شما کاملا کار خواهد کرد. -## 1. دریافت پروژه +--- -**با Git:** +## نتیجه نهایی + +اگر همه چیز درست انجام شود: +- پراکسی HTTP روی `127.0.0.1:8085` فعال می‌شود +- پراکسی SOCKS5 روی `127.0.0.1:1080` فعال می‌شود +- سایت‌های HTTPS بدون خطای گواهی باز می‌شوند +- می‌توانید با `--scan` و `--adaptive-scan` کیفیت مسیر را بررسی کنید + +--- + +## پیش‌نیازها + +- Python نسخه `3.10+` +- یک اکانت Google (برای Apps Script) +- Git (اختیاری؛ دانلود ZIP هم کافی است) +- مرورگری که تنظیم پراکسی دستی داشته باشد + +--- + +## 1) دریافت پروژه + +### روش A — ZIP + +دانلود و extract: +- + +بعد داخل پوشه پروژه terminal باز کنید. + +### روش B — Git ```bash git clone https://github.com/masterking32/MasterHttpRelayVPN.git cd MasterHttpRelayVPN ``` -**با ZIP:** +--- + +## 2) ساخت رله Google Apps Script -- صفحه [GitHub پروژه](https://github.com/masterking32/MasterHttpRelayVPN) را باز کنید. -- روی **Code** -> **Download ZIP** کلیک کنید. -- فایل ZIP را extract کنید. -- داخل پوشه `MasterHttpRelayVPN` یک terminal باز کنید. +1. وارد شوید. +2. روی **New project** کلیک کنید. +3. کد پیش‌فرض را پاک کنید. +4. فایل [`apps_script/Code.gs`](../../apps_script/Code.gs) را باز کنید، کل محتوا را کپی و در Apps Script paste کنید. +5. این مقدار را تغییر دهید: -## 2. ساخت رله Google + ```javascript + const AUTH_KEY = "your-secret-password-here"; + ``` -- به [Google Apps Script](https://script.google.com/) بروید و یک پروژه جدید بسازید. -- محتوای [apps_script/Code.gs](../../apps_script/Code.gs) را داخل فایل `Code.gs` کپی کنید. -- مقدار `AUTH_KEY` را به یک رمز طولانی و تصادفی تغییر دهید. -- از مسیر **Deploy** -> **New deployment** نوع **Web app** را انتخاب کنید. -- گزینه **Execute as** را روی **Me** و گزینه دسترسی را روی **Anyone** بگذارید. -- Deploy کنید و `Deployment ID` را نگه دارید. + و یک رمز طولانی و تصادفی خودتان بگذارید. -بعد از هر تغییر در `Code.gs` باید deployment جدید بسازید. +6. مسیر **Deploy** → **New deployment** را بزنید. +7. نوع **Web app** را انتخاب کنید. +8. **Execute as** را روی **Me** بگذارید. +9. **Who has access** را روی **Anyone** بگذارید. +10. Deploy کنید، دسترسی را تایید کنید، و **Deployment ID** را کپی کنید. -## 3. اجرای لانچر +دو مقدار مهم برای سیستم محلی: +- `Deployment ID` +- `AUTH_KEY` -**Windows:** +--- + +## 3) اجرای برنامه (روش پیشنهادی) + +### Windows ```cmd start.bat ``` -**Linux / macOS:** +### Linux / macOS ```bash chmod +x start.sh ./start.sh ``` -لانچر محیط مجازی می‌سازد، وابستگی‌ها را نصب می‌کند، اگر `config.json` وجود نداشته باشد setup wizard را اجرا می‌کند، و سپس پراکسی را بالا می‌آورد. +لانچر کارهای زیر را انجام می‌دهد: +- ساخت `.venv` +- نصب وابستگی‌ها +- اجرای setup wizard اگر `config.json` موجود نباشد +- اجرای پراکسی + +--- -## 4. تنظیم مرورگر +## 4) تکمیل Setup Wizard -مرورگر را روی پراکسی زیر تنظیم کنید: +در wizard: +1. `auth_key` دقیقا برابر `AUTH_KEY` در Apps Script باشد +2. `script_id` همان Deployment ID شما باشد +3. پورت HTTP را `8085` بگذارید (مگر اینکه اشغال باشد) +4. LAN sharing را فقط وقتی لازم دارید روشن کنید + +در پایان فایل `config.json` ساخته می‌شود. + +--- + +## 5) تنظیم پراکسی در مرورگر | گزینه | مقدار | -|-------|-------| +|---|---| | نوع پراکسی | HTTP | | آدرس | `127.0.0.1` | | پورت | `8085` | -| SOCKS5، اختیاری | `127.0.0.1:1080` | -برای HTTPS اگر مرورگر خطای گواهی داد، فایل `ca/ca.crt` را به عنوان trusted root نصب کنید و مرورگر را کامل ببندید و دوباره باز کنید. +در Firefox: Settings → General → Network Settings → Manual proxy +و برای HTTPS هم فعال کنید. + +--- + +## 6) نصب CA محلی (برای HTTPS ضروری) + +فایل گواهی در `ca/ca.crt` ساخته می‌شود. +اگر نصب خودکار انجام نشد، دستی نصب کنید. + +### Windows +1. فایل `ca/ca.crt` را باز کنید +2. Install Certificate +3. Current User +4. ذخیره در **Trusted Root Certification Authorities** +5. مرورگر را کامل ببندید و دوباره باز کنید + +### macOS +1. `ca/ca.crt` را در Keychain Access باز کنید +2. بخش Trust را باز کنید +3. روی **Always Trust** بگذارید +4. مرورگر را ری‌استارت کنید + +### Ubuntu / Debian + +```bash +sudo cp ca/ca.crt /usr/local/share/ca-certificates/masterhttp-relay.crt +sudo update-ca-certificates +``` + +مرورگر را ری‌استارت کنید. + +### Firefox (در صورت نیاز) +ممکن است Firefox store جداگانه داشته باشد: +- Settings → Privacy & Security → Certificates → View Certificates → Authorities → Import `ca/ca.crt` +- گزینه اعتماد برای شناسایی وب‌سایت را فعال کنید + +--- + +## 7) تست عملکرد + +- چند سایت عادی باز کنید. +- اگر `unauthorized` دیدید: `AUTH_KEY` ناهماهنگ است. +- اگر خطای گواهی HTTPS دیدید: CA درست نصب نشده. + +--- + +## 8) دستورهای بررسی کیفیت مسیر + +### اسکن سریع دسترسی IP + +```bash +python main.py --scan +``` + +IP پیشنهادی را در `config.json` قرار دهید. + +### اسکن تطبیقیِ پایداری‌محور (پیشنهادی برای شبکه ناپایدار) + +```bash +python main.py --adaptive-scan +``` + +این اسکن فقط روی کمترین پینگ تصمیم نمی‌گیرد و پایداری را هم در نظر می‌گیرد. + +--- + +## 9) اجرای دستی (بدون لانچر) + +### Windows + +```cmd +python -m venv .venv +.venv\Scripts\python -m pip install -r requirements.txt +.venv\Scripts\python setup.py +.venv\Scripts\python main.py +``` + +### Linux / macOS + +```bash +python3 -m venv .venv +.venv/bin/python -m pip install -r requirements.txt +.venv/bin/python setup.py +.venv/bin/python main.py +``` + +--- + +## 10) مشکلات رایج (خلاصه) -## 5. بررسی سریع +- `unauthorized`: عدم تطابق auth key +- اتصال پراکسی برقرار است ولی سایت‌ها باز نمی‌شوند: Deployment ID اشتباه یا دسترسی Web app درست نیست +- خطای HTTPS: گواهی CA نصب/Trusted نشده +- برخی سرویس‌ها خروجی Google را می‌بندند: از Exit Node استفاده کنید -- اگر `unauthorized` دیدید، مقدار `AUTH_KEY` در Apps Script باید دقیقا با `auth_key` در `config.json` یکی باشد. -- اگر صفحه‌ها باز نمی‌شوند، [رفع مشکل](TROUBLESHOOTING.md) را ببینید. -- اگر سرعت پایین است، دستور `python main.py --scan` را اجرا کنید و IP پیشنهادی را در `config.json` بگذارید. +--- -## قدم بعدی +## صفحه‌های بعدی -برای همه گزینه‌های تنظیمات، [مرجع تنظیمات](CONFIGURATION.md) را بخوانید. برای مسیرهای خاص مثل ChatGPT یا Turnstile، [راهنمای Exit Node](../exit-node/EXIT_NODE_DEPLOYMENT_FA.md) را ببینید. +- [رفع مشکل](TROUBLESHOOTING.md) +- [مرجع تنظیمات](CONFIGURATION.md) +- [راهنمای Exit Node](../exit-node/EXIT_NODE_DEPLOYMENT_FA.md) +- [معماری](ARCHITECTURE.md) diff --git a/main.py b/main.py index 76e7370..268c16f 100644 --- a/main.py +++ b/main.py @@ -24,6 +24,7 @@ from core.constants import __version__ from core.lan_utils import log_lan_access from core.google_ip_scanner import scan_sync +from core.adaptive_transport import AdaptiveRouteEngine, ProbeTarget from core.logging_utils import configure as configure_logging, print_banner from proxy.mitm import CA_CERT_FILE from proxy.proxy_server import ProxyServer @@ -99,6 +100,11 @@ def parse_args(): action="store_true", help="Scan Google IPs to find the fastest reachable one and exit.", ) + parser.add_argument( + "--adaptive-scan", + action="store_true", + help="Run adaptive transport scan (jitter/loss/stability-first) and exit.", + ) return parser.parse_args() @@ -224,6 +230,24 @@ def main(): print("Deploy the Apps Script from Code.gs and paste the Deployment ID.") sys.exit(1) + + if args.adaptive_scan: + configure_logging("INFO") + from core.constants import CANDIDATE_IPS + engine = AdaptiveRouteEngine(config.get("route_db", "route_intel.sqlite3")) + targets = [ + ProbeTarget(ip=ip, port=443, sni=config.get("front_domain", "www.google.com"), transport_profile="vless_reality") + for ip in CANDIDATE_IPS + ] + ranked = asyncio.run(engine.evaluate(targets)) + if not ranked: + print("No viable adaptive routes found") + sys.exit(1) + print("Adaptive route ranking (stability-first):") + for i, route in enumerate(ranked[:5], 1): + print(f"{i}. {route.target.ip} score={route.score:.4f} median={route.median_rtt_ms:.1f}ms jitter={route.jitter_ms:.1f} loss={route.packet_loss:.2%} handshake={route.handshake_success_rate:.2%}") + sys.exit(0) + # ── Google IP Scanner ────────────────────────────────────────────────── if args.scan: configure_logging("INFO") diff --git a/src/core/adaptive_transport/__init__.py b/src/core/adaptive_transport/__init__.py new file mode 100644 index 0000000..2e7b603 --- /dev/null +++ b/src/core/adaptive_transport/__init__.py @@ -0,0 +1,11 @@ +from .engine import AdaptiveRouteEngine, AdaptiveRouteConfig +from .models import ProbeTarget, RouteScore +from .storage import RouteIntelligenceStore + +__all__ = [ + "AdaptiveRouteEngine", + "AdaptiveRouteConfig", + "ProbeTarget", + "RouteScore", + "RouteIntelligenceStore", +] diff --git a/src/core/adaptive_transport/engine.py b/src/core/adaptive_transport/engine.py new file mode 100644 index 0000000..0e6e74b --- /dev/null +++ b/src/core/adaptive_transport/engine.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +import asyncio +import logging +import time +from dataclasses import dataclass + +from .hygiene import validate_public_ip +from .models import ProbeTarget, RouteScore, RuntimeMetrics, SessionState +from .probe import AsyncRouteProbe, summarize +from .storage import RouteIntelligenceStore + +logger = logging.getLogger(__name__) + + +@dataclass(slots=True) +class AdaptiveRouteConfig: + min_concurrency: int = 1 + max_concurrency: int = 16 + sticky_session_s: float = 180.0 + switch_guard_s: float = 30.0 + circuit_breaker_failures: int = 4 + + +class AdaptiveRouteEngine: + def __init__(self, db_path: str, cfg: AdaptiveRouteConfig | None = None): + self.cfg = cfg or AdaptiveRouteConfig() + self.store = RouteIntelligenceStore(db_path) + self.probe = AsyncRouteProbe() + self._active_route: RouteScore | None = None + self._active_until = 0.0 + self._failure_counts: dict[str, int] = {} + self._session = SessionState() + self._scan_tasks: set[asyncio.Task] = set() + self._stopped = False + + async def evaluate(self, targets: list[ProbeTarget], cancel_event: asyncio.Event | None = None) -> list[RouteScore]: + if self._stopped: + return [] + ordered_targets = sorted(targets, key=self._route_key) + concurrency = max(1, min(self.cfg.max_concurrency, max(self.cfg.min_concurrency, len(ordered_targets)))) + sem = asyncio.Semaphore(concurrency) + results: list[RouteScore] = [] + + async def worker(t: ProbeTarget): + validate_public_ip(t.ip) + if self._stopped or (cancel_event and cancel_event.is_set()): + return + async with sem: + if self._stopped or (cancel_event and cancel_event.is_set()): + return + samples = await self.probe.probe(t, include_quic=t.transport_profile == "quic") + med, jit, loss, hs, stable = summarize(samples) + score_value = (stable * 0.45) + ((1.0 - min(1.0, loss)) * 0.25) + (hs * 0.20) + (1.0 / (1.0 + jit + med / 100.0) * 0.10) + score = RouteScore(t, med, jit, loss, hs, stable, score_value) + results.append(score) + logger.info( + "route_score_breakdown", + extra={ + "route": self._route_key(t), + "median_rtt_ms": med, + "jitter_ms": jit, + "packet_loss": loss, + "handshake_success_rate": hs, + "session_stability": stable, + "score": score_value, + }, + ) + await self.store.record_score(score) + + self._scan_tasks = {asyncio.create_task(worker(t)) for t in ordered_targets} + try: + await asyncio.gather(*self._scan_tasks, return_exceptions=False) + finally: + self._scan_tasks.clear() + return sorted(results, key=lambda r: (r.score, -r.packet_loss, -r.jitter_ms, -r.median_rtt_ms, self._route_key(r.target)), reverse=True) + + async def select_route(self, candidates: list[RouteScore], gameplay_active: bool) -> RouteScore | None: + if self._stopped: + return None + now = time.time() + if not candidates: + logger.info("route_rejected", extra={"reason": "no_candidates", "selected": self._route_key(self._active_route.target) if self._active_route else None}) + return self._active_route + best = candidates[0] + if self._active_route and gameplay_active and not self._hard_failure(self._active_route.target): + self._session.state = "session_active" + logger.info("session_transition", extra={"transition": "session_active", "route": self._route_key(self._active_route.target)}) + logger.info("route_rejected", extra={"reason": "session_active_locked", "selected": self._route_key(self._active_route.target), "candidate": self._route_key(best.target)}) + return self._active_route + if self._active_route and now - self._active_route.sampled_at < self.cfg.switch_guard_s and not self._hard_failure(self._active_route.target): + logger.info("route_rejected", extra={"reason": "switch_guard", "selected": self._route_key(self._active_route.target), "candidate": self._route_key(best.target)}) + return self._active_route + if self._active_route and not self._hard_failure(self._active_route.target): + logger.info("route_rejected", extra={"reason": "active_route_not_failed", "selected": self._route_key(self._active_route.target), "candidate": self._route_key(best.target)}) + return self._active_route + self._active_route = best + self._active_until = now + self.cfg.sticky_session_s + self._session.state = "session_stable_window" if gameplay_active else "session_start" + self._session.stable_since = now + logger.info("session_transition", extra={"transition": self._session.state, "route": self._route_key(best.target)}) + logger.info("route_selected", extra={"route": self._route_key(best.target), "score": best.score, "median_rtt_ms": best.median_rtt_ms, "jitter_ms": best.jitter_ms, "packet_loss": best.packet_loss, "handshake": best.handshake_success_rate, "stability": best.session_stability}) + return best + + async def record_runtime_metrics(self, target: ProbeTarget, metrics: RuntimeMetrics) -> None: + await self.store.record_runtime_metrics(target, metrics) + + def register_route_failure(self, target: ProbeTarget) -> bool: + key = self._route_key(target) + self._failure_counts[key] = self._failure_counts.get(key, 0) + 1 + logger.info("session_transition", extra={"transition": "session_fail", "route": key, "failure_count": self._failure_counts[key]}) + return self._hard_failure(target) + + def bound_transport_route(self) -> ProbeTarget | None: + return self._active_route.target if self._active_route else None + + async def shutdown(self) -> None: + self._stopped = True + for task in list(self._scan_tasks): + task.cancel() + if self._scan_tasks: + await asyncio.gather(*self._scan_tasks, return_exceptions=True) + self._scan_tasks.clear() + + def _route_key(self, target: ProbeTarget) -> str: + return f"{target.ip}:{target.port}:{target.sni}:{target.transport_profile}" + + def _hard_failure(self, target: ProbeTarget) -> bool: + return self._failure_counts.get(self._route_key(target), 0) >= self.cfg.circuit_breaker_failures diff --git a/src/core/adaptive_transport/hygiene.py b/src/core/adaptive_transport/hygiene.py new file mode 100644 index 0000000..5182efc --- /dev/null +++ b/src/core/adaptive_transport/hygiene.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import ipaddress + + +def validate_public_ip(ip: str) -> str: + try: + addr = ipaddress.ip_address(ip) + except ValueError as exc: + raise ValueError(f"invalid ip: {ip}") from exc + + if ( + addr.is_private + or addr.is_multicast + or addr.is_loopback + or addr.is_reserved + or addr.is_unspecified + or addr.is_link_local + ): + raise ValueError(f"non-public ip rejected: {ip}") + return ip diff --git a/src/core/adaptive_transport/models.py b/src/core/adaptive_transport/models.py new file mode 100644 index 0000000..56bc159 --- /dev/null +++ b/src/core/adaptive_transport/models.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Literal +import time + +ProbeKind = Literal["tcp_syn", "tls", "h2_preface", "quic"] + + +@dataclass(slots=True, frozen=True) +class ProbeTarget: + ip: str + port: int + sni: str + alpn: tuple[str, ...] = ("h2", "http/1.1") + transport_profile: str = "vless_reality" + + +@dataclass(slots=True) +class ProbeObservation: + kind: ProbeKind + ok: bool + latency_ms: float + packet_loss: float = 0.0 + + +@dataclass(slots=True) +class RouteScore: + target: ProbeTarget + median_rtt_ms: float + jitter_ms: float + packet_loss: float + handshake_success_rate: float + session_stability: float + score: float + sampled_at: float = field(default_factory=time.time) + + +@dataclass(slots=True) +class RuntimeMetrics: + disconnects: int = 0 + retransmissions: int = 0 + latency_spikes: int = 0 + packet_delay_variance: float = 0.0 + + +@dataclass(slots=True) +class SessionState: + state: Literal["session_start", "session_active", "session_stable_window"] = "session_start" + started_at: float = field(default_factory=time.time) + stable_since: float | None = None diff --git a/src/core/adaptive_transport/probe.py b/src/core/adaptive_transport/probe.py new file mode 100644 index 0000000..3cfb5b6 --- /dev/null +++ b/src/core/adaptive_transport/probe.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +import asyncio +import socket +import ssl +import statistics +import time +from dataclasses import dataclass + +from .models import ProbeObservation, ProbeTarget + + +@dataclass(slots=True) +class ProbeConfig: + timeout_s: float = 2.5 + retries: int = 3 + + +class AsyncRouteProbe: + def __init__(self, cfg: ProbeConfig | None = None): + self.cfg = cfg or ProbeConfig() + + async def probe(self, target: ProbeTarget, include_quic: bool = False) -> list[ProbeObservation]: + samples: list[ProbeObservation] = [] + for _ in range(self.cfg.retries): + samples.append(await self._tcp_syn_probe(target)) + samples.append(await self._tls_probe(target)) + samples.append(await self._h2_preface_probe(target)) + if include_quic: + samples.append(await self._quic_probe(target)) + return samples + + async def _tcp_syn_probe(self, target: ProbeTarget) -> ProbeObservation: + start = time.perf_counter() + ok = False + writer = None + try: + fut = asyncio.open_connection(target.ip, target.port) + _, writer = await asyncio.wait_for(fut, timeout=self.cfg.timeout_s) + ok = True + except Exception: + ok = False + finally: + if writer is not None: + writer.close() + await writer.wait_closed() + return ProbeObservation("tcp_syn", ok, (time.perf_counter() - start) * 1000) + + async def _tls_probe(self, target: ProbeTarget) -> ProbeObservation: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_alpn_protocols(list(target.alpn)) + start = time.perf_counter() + writer = None + ok = False + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(target.ip, target.port, ssl=ctx, server_hostname=target.sni), + timeout=self.cfg.timeout_s, + ) + ssl_obj = writer.get_extra_info("ssl_object") + ok = bool(reader and ssl_obj and ssl_obj.version()) + except Exception: + ok = False + finally: + if writer is not None: + writer.close() + await writer.wait_closed() + return ProbeObservation("tls", ok, (time.perf_counter() - start) * 1000) + + async def _h2_preface_probe(self, target: ProbeTarget) -> ProbeObservation: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.set_alpn_protocols(["h2"]) + start = time.perf_counter() + writer = None + reader = None + ok = False + try: + reader, writer = await asyncio.wait_for( + asyncio.open_connection(target.ip, target.port, ssl=ctx, server_hostname=target.sni), + timeout=self.cfg.timeout_s, + ) + ssl_obj = writer.get_extra_info("ssl_object") + if not ssl_obj or ssl_obj.selected_alpn_protocol() != "h2": + raise RuntimeError("h2 alpn not negotiated") + writer.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + await writer.drain() + frame_header = await asyncio.wait_for(reader.readexactly(9), timeout=0.4) + ok = len(frame_header) == 9 + except Exception: + ok = False + finally: + if writer is not None: + writer.close() + await writer.wait_closed() + return ProbeObservation("h2_preface", ok, (time.perf_counter() - start) * 1000) + + async def _quic_probe(self, target: ProbeTarget) -> ProbeObservation: + loop = asyncio.get_running_loop() + start = time.perf_counter() + udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_socket.setblocking(False) + ok = False + try: + udp_socket.connect((target.ip, target.port)) + initial = bytes.fromhex("c300000001088394c8f03e5157080000449e00000002") + await loop.sock_sendall(udp_socket, initial) + response = await asyncio.wait_for(loop.sock_recv(udp_socket, 1200), timeout=min(self.cfg.timeout_s, 0.5)) + ok = bool(response) + except Exception: + ok = False + finally: + udp_socket.close() + return ProbeObservation("quic", ok, (time.perf_counter() - start) * 1000) + + +def summarize(samples: list[ProbeObservation]) -> tuple[float, float, float, float, float]: + lat = [s.latency_ms for s in samples if s.ok] + if not lat: + return 9_999.0, 5_000.0, 1.0, 0.0, 0.0 + median = statistics.median(lat) + jitter = statistics.pstdev(lat) if len(lat) > 1 else 0.0 + loss = 1.0 - (len(lat) / max(1, len(samples))) + handshake_success = len([s for s in samples if s.kind == "tls" and s.ok]) / max(1, len([s for s in samples if s.kind == "tls"])) + stability = 1.0 / (1.0 + jitter + (loss * 100.0)) + return median, jitter, loss, handshake_success, stability diff --git a/src/core/adaptive_transport/storage.py b/src/core/adaptive_transport/storage.py new file mode 100644 index 0000000..7e14426 --- /dev/null +++ b/src/core/adaptive_transport/storage.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import asyncio +import sqlite3 +import time +from pathlib import Path + +from .models import ProbeTarget, RouteScore, RuntimeMetrics + + +class RouteIntelligenceStore: + def __init__(self, path: str): + self.path = Path(path) + self._lock = asyncio.Lock() + self._init_db() + + def _init_db(self) -> None: + with sqlite3.connect(self.path) as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS route_scores ( + ip TEXT NOT NULL, + port INTEGER NOT NULL, + sni TEXT NOT NULL, + profile TEXT NOT NULL, + sampled_at REAL NOT NULL, + median_rtt_ms REAL NOT NULL, + jitter_ms REAL NOT NULL, + packet_loss REAL NOT NULL, + handshake_success REAL NOT NULL, + session_stability REAL NOT NULL, + score REAL NOT NULL, + success_count INTEGER NOT NULL DEFAULT 0, + failure_count INTEGER NOT NULL DEFAULT 0, + cooldown_until REAL NOT NULL DEFAULT 0, + retired INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS route_runtime_metrics ( + ip TEXT NOT NULL, + port INTEGER NOT NULL, + sni TEXT NOT NULL, + profile TEXT NOT NULL, + observed_at REAL NOT NULL, + disconnects INTEGER NOT NULL, + retransmissions INTEGER NOT NULL, + latency_spikes INTEGER NOT NULL, + packet_delay_variance REAL NOT NULL + ) + """ + ) + + async def record_score(self, score: RouteScore) -> None: + async with self._lock: + await asyncio.to_thread(self._record_score_sync, score) + + def _record_score_sync(self, score: RouteScore) -> None: + with sqlite3.connect(self.path, timeout=30) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + """INSERT INTO route_scores + (ip,port,sni,profile,sampled_at,median_rtt_ms,jitter_ms,packet_loss,handshake_success,session_stability,score,success_count,failure_count) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)""", + ( + score.target.ip, + score.target.port, + score.target.sni, + score.target.transport_profile, + score.sampled_at, + score.median_rtt_ms, + score.jitter_ms, + score.packet_loss, + score.handshake_success_rate, + score.session_stability, + score.score, + 1 if score.score > 0 else 0, + 0 if score.score > 0 else 1, + ), + ) + + async def record_runtime_metrics(self, target: ProbeTarget, metrics: RuntimeMetrics, observed_at: float | None = None) -> None: + async with self._lock: + await asyncio.to_thread(self._record_runtime_metrics_sync, target, metrics, observed_at or time.time()) + + def _record_runtime_metrics_sync(self, target: ProbeTarget, metrics: RuntimeMetrics, observed_at: float) -> None: + with sqlite3.connect(self.path, timeout=30) as conn: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + """INSERT INTO route_runtime_metrics + (ip,port,sni,profile,observed_at,disconnects,retransmissions,latency_spikes,packet_delay_variance) + VALUES (?,?,?,?,?,?,?,?,?)""", + (target.ip, target.port, target.sni, target.transport_profile, observed_at, metrics.disconnects, metrics.retransmissions, metrics.latency_spikes, metrics.packet_delay_variance), + ) + + async def top_routes(self, limit: int = 5, decay_window_s: float = 900.0) -> list[tuple[str, int, str, str, float]]: + now = time.time() + async with self._lock: + return await asyncio.to_thread(self._top_routes_sync, limit, now, decay_window_s) + + def _top_routes_sync(self, limit: int, now: float, decay_window_s: float): + with sqlite3.connect(self.path) as conn: + rows = conn.execute( + """SELECT s.ip,s.port,s.sni,s.profile, + AVG( + s.score * CASE + WHEN (? - s.sampled_at) >= ? THEN 0.05 + ELSE EXP(-((? - s.sampled_at) / ?)) + END + ) + - COALESCE(AVG(ABS(s.score - ss.mean_score)), 0.0) * 0.35 + - COALESCE(SUM(m.disconnects + m.retransmissions + m.latency_spikes) * 0.02, 0.0) + - COALESCE(AVG(m.packet_delay_variance) * 0.01, 0.0) AS decayed + FROM route_scores s + LEFT JOIN ( + SELECT ip,port,sni,profile,AVG(score) AS mean_score FROM route_scores GROUP BY ip,port,sni,profile + ) ss ON ss.ip=s.ip AND ss.port=s.port AND ss.sni=s.sni AND ss.profile=s.profile + LEFT JOIN route_runtime_metrics m ON m.ip=s.ip AND m.port=s.port AND m.sni=s.sni AND m.profile=s.profile + WHERE s.retired = 0 AND s.cooldown_until < ? + GROUP BY s.ip,s.port,s.sni,s.profile + ORDER BY decayed DESC + LIMIT ?""", + (now, max(1.0, decay_window_s), now, max(1.0, decay_window_s), now, max(1.0, decay_window_s), now, limit), + ).fetchall() + return rows diff --git a/src/core/adaptive_transport/transport_profiles.py b/src/core/adaptive_transport/transport_profiles.py new file mode 100644 index 0000000..2e4eabb --- /dev/null +++ b/src/core/adaptive_transport/transport_profiles.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class TransportProfile: + name: str + protocol: str + fallback: tuple[str, ...] + browser_fingerprint: str + + +PROFILES = { + "vless_reality": TransportProfile("vless_reality", "tcp+tls", ("h2", "h3", "udp_over_tcp"), "chrome_124"), + "http2_fallback": TransportProfile("http2_fallback", "h2", ("h1",), "firefox_125"), + "http3_fallback": TransportProfile("http3_fallback", "h3", ("h2",), "chrome_124"), + "udp_over_tcp": TransportProfile("udp_over_tcp", "tcp", ("h2",), "safari_17"), + "quic": TransportProfile("quic", "udp", ("h3", "h2"), "chrome_124"), +} diff --git a/src/proxy/proxy_server.py b/src/proxy/proxy_server.py index c6c2fe9..b390818 100644 --- a/src/proxy/proxy_server.py +++ b/src/proxy/proxy_server.py @@ -520,7 +520,11 @@ async def _on_socks_client(self, reader: asyncio.StreamReader, result = await negotiate_socks5(reader, writer) if result is None: return - host, port = result + cmd, host, port = result + if cmd == "udp_associate": + log.info("SOCKS5 UDP ASSOCIATE requested by %s", addr) + await self._handle_socks5_udp_associate(reader, writer) + return log.info("SOCKS5 CONNECT → %s:%d", host, port) await self._handle_target_tunnel(host, port, reader, writer) except asyncio.IncompleteReadError: @@ -539,6 +543,29 @@ async def _on_socks_client(self, reader: asyncio.StreamReader, except Exception: pass + + + async def _handle_socks5_udp_associate(self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): + """Handle SOCKS5 UDP ASSOCIATE for low-latency UDP apps (e.g., games).""" + loop = asyncio.get_running_loop() + transport, protocol = await loop.create_datagram_endpoint( + lambda: _Socks5UdpRelayProtocol(self.fronter), + local_addr=(self.socks_host, 0), + ) + sockname = transport.get_extra_info("sockname") + bind_ip = sockname[0] if sockname else self.socks_host + bind_port = sockname[1] if sockname else 0 + + try: + writer.write(b"\x05\x00\x00\x01" + socket.inet_aton(bind_ip) + bind_port.to_bytes(2, "big")) + await writer.drain() + await reader.read() + finally: + transport.close() + + # ── CONNECT (HTTPS tunnelling) ──────────────────────────────── async def _do_connect(self, target: str, reader, writer): @@ -1408,3 +1435,72 @@ async def _do_http(self, header_block: bytes, reader, writer): writer.write(response) await writer.drain() + + +class _Socks5UdpRelayProtocol(asyncio.DatagramProtocol): + """Best-effort SOCKS5 UDP relay (single-hop direct UDP forwarding).""" + + def __init__(self, fronter): + self.transport: asyncio.DatagramTransport | None = None + self._client_addr = None + self._fronter = fronter + self._loop = asyncio.get_running_loop() + + def connection_made(self, transport): + self.transport = transport + + + async def _forward_udp(self, dst: str, dport: int, payload: bytes): + if self.transport is None or self._client_addr is None: + return + relayed = await self._fronter.relay_udp_packet(dst, dport, payload) + if relayed is not None: + header = self._build_socks_header(dst, dport) + if header is not None: + self.transport.sendto(header + relayed, self._client_addr) + return + self.transport.sendto(payload, (dst, dport)) + + @staticmethod + def _build_socks_header(host: str, port: int) -> bytes | None: + try: + host_raw = socket.inet_aton(host) + return b"\x00\x00\x00\x01" + host_raw + port.to_bytes(2, "big") + except OSError: + return None + def datagram_received(self, data, addr): + if self.transport is None: + return + + if self._client_addr is None or addr == self._client_addr: + self._client_addr = addr + if len(data) < 10 or data[2] != 0x00: + return + atyp = data[3] + pos = 4 + if atyp == 0x01: # IPv4 + dst = socket.inet_ntoa(data[pos:pos + 4]) + pos += 4 + elif atyp == 0x03: # Domain + ln = data[pos] + pos += 1 + dst = data[pos:pos + ln].decode(errors="replace") + pos += ln + elif atyp == 0x04: # IPv6 + dst = socket.inet_ntop(socket.AF_INET6, data[pos:pos + 16]) + pos += 16 + else: + return + dport = int.from_bytes(data[pos:pos + 2], "big") + payload = data[pos + 2:] + self._loop.create_task(self._forward_udp(dst, dport, payload)) + return + + # response from remote server -> encapsulate back to client + host, port = addr[0], addr[1] + try: + host_raw = socket.inet_aton(host) + header = b"\x00\x00\x00\x01" + host_raw + port.to_bytes(2, "big") + except OSError: + return + self.transport.sendto(header + data, self._client_addr) diff --git a/src/proxy/socks5.py b/src/proxy/socks5.py index 6af0375..97fd920 100644 --- a/src/proxy/socks5.py +++ b/src/proxy/socks5.py @@ -22,8 +22,8 @@ async def negotiate_socks5( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, -) -> tuple[str, int] | None: - """Perform a SOCKS5 handshake and return the requested (host, port). + ) -> tuple[str, str, int] | None: + """Perform a SOCKS5 handshake and return the requested SOCKS command and destination tuple. Sends protocol-level replies directly to *writer*. Returns ``None`` and leaves the connection in a closed state if negotiation fails at @@ -55,8 +55,8 @@ async def negotiate_socks5( # ── Request ─────────────────────────────────────────────────── req = await asyncio.wait_for(reader.readexactly(4), timeout=15) ver, cmd, _rsv, atyp = req - if ver != 5 or cmd != 0x01: - # Only CONNECT (0x01) is supported + if ver != 5 or cmd not in (0x01, 0x03): + # Support CONNECT (0x01) and UDP ASSOCIATE (0x03) only. writer.write(b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00") await writer.drain() return None @@ -85,4 +85,5 @@ async def negotiate_socks5( writer.write(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00") await writer.drain() - return host, port + cmd_name = "connect" if cmd == 0x01 else "udp_associate" + return cmd_name, host, port diff --git a/src/relay/domain_fronter.py b/src/relay/domain_fronter.py index 842eeab..d9e264f 100644 --- a/src/relay/domain_fronter.py +++ b/src/relay/domain_fronter.py @@ -1562,6 +1562,44 @@ async def relay(self, method: str, url: str, latency_ns = int((time.perf_counter() - t0) * 1e9) self._record_site(url, len(result), latency_ns, errored) + async def relay_udp_packet(self, host: str, port: int, payload: bytes) -> bytes | None: + """Relay a single UDP packet via exit node over Apps Script. + + This encapsulates UDP into the existing HTTPS relay chain: + client UDP -> SOCKS5 UDP ASSOCIATE -> Apps Script -> exit node -> UDP target + """ + if not self._exit_node_enabled or not self._exit_node_url: + return None + + inner = { + "k": self._exit_node_psk, + "udp": 1, + "host": host, + "port": int(port), + "payload": base64.b64encode(payload).decode(), + } + inner_json = json.dumps(inner).encode() + outer = self._build_payload( + "POST", + self._exit_node_url, + {"Content-Type": "application/json"}, + inner_json, + ) + outer["ct"] = "application/json" + + raw = await self._batch_submit(outer) + _, _, relay_bytes = split_raw_response(raw) + obj = load_relay_json(relay_bytes) + if not isinstance(obj, dict) or obj.get("e"): + return None + b64 = obj.get("payload") + if not isinstance(b64, str) or not b64: + return b"" + try: + return base64.b64decode(b64) + except Exception: + return None + async def _coalesced_submit(self, key: str, payload: dict) -> bytes: """Dedup concurrent requests for the same URL (no Range header). @@ -2851,4 +2889,3 @@ def _parse_batch_body(self, resp_body: bytes, for item in items: results.append(parse_relay_json(item, self._max_response_body_bytes)) return results - diff --git a/tests/test_adaptive_transport.py b/tests/test_adaptive_transport.py new file mode 100644 index 0000000..a607724 --- /dev/null +++ b/tests/test_adaptive_transport.py @@ -0,0 +1,46 @@ +import pathlib +import sys +import tempfile +import unittest + +ROOT = pathlib.Path(__file__).resolve().parents[1] +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from core.adaptive_transport.engine import AdaptiveRouteEngine +from core.adaptive_transport.hygiene import validate_public_ip +from core.adaptive_transport.models import ProbeTarget, RouteScore + + +class HygieneTests(unittest.TestCase): + def test_rejects_private(self): + with self.assertRaises(ValueError): + validate_public_ip("10.0.0.1") + + def test_accepts_public(self): + self.assertEqual(validate_public_ip("8.8.8.8"), "8.8.8.8") + + +class EngineTests(unittest.IsolatedAsyncioTestCase): + async def test_circuit_breaker(self): + with tempfile.TemporaryDirectory() as td: + engine = AdaptiveRouteEngine(f"{td}/intel.db") + t = ProbeTarget(ip="8.8.8.8", port=443, sni="www.google.com") + for _ in range(engine.cfg.circuit_breaker_failures - 1): + self.assertFalse(engine.register_route_failure(t)) + self.assertTrue(engine.register_route_failure(t)) + + async def test_select_route_sticky(self): + with tempfile.TemporaryDirectory() as td: + engine = AdaptiveRouteEngine(f"{td}/intel.db") + t = ProbeTarget(ip="8.8.8.8", port=443, sni="www.google.com") + r1 = RouteScore(t, 100, 1, 0.0, 1.0, 0.9, 0.8) + r2 = RouteScore(t, 90, 1, 0.0, 1.0, 0.9, 0.81) + chosen = await engine.select_route([r1], gameplay_active=True) + chosen2 = await engine.select_route([r2], gameplay_active=True) + self.assertIs(chosen, chosen2) + + +if __name__ == "__main__": + unittest.main()