From 290b723d33dc694569eb8ec9d8d911cbd735006f Mon Sep 17 00:00:00 2001 From: Fyodor Sotnikov Date: Wed, 8 Apr 2026 13:02:46 +0300 Subject: [PATCH 1/3] Rework metrics storage --- .zed/tasks.json | 8 + dev/dev.sql | 10 + src/bin/agent/core/http.rs | 3 + src/bin/api/core/http/filters.rs | 6 - src/bin/api/core/http/handlers/metrics.rs | 80 +++- src/bin/api/core/http/handlers/node.rs | 28 +- src/bin/api/core/http/handlers/sub.rs | 532 ++++++---------------- src/bin/api/core/http/request.rs | 9 + src/bin/api/core/http/routes.rs | 39 +- src/bin/api/core/postgres/node.rs | 9 +- src/config/settings.rs | 6 +- src/h2_op/mod.rs | 4 +- src/memory/node.rs | 37 +- src/metrics/impls.rs | 10 +- src/metrics/storage.rs | 153 +++++-- src/xray_op/vless.rs | 10 +- 16 files changed, 445 insertions(+), 499 deletions(-) diff --git a/.zed/tasks.json b/.zed/tasks.json index 3764ddec..6e438ecf 100644 --- a/.zed/tasks.json +++ b/.zed/tasks.json @@ -10,6 +10,14 @@ "label": "Run agent", "command": "cargo run --bin agent -- dev/config-agent.toml ", }, + { + "label": "Run agent2", + "command": "cargo run --bin agent -- dev/config-agent2.toml ", + }, + { + "label": "Run agent3", + "command": "cargo run --bin agent -- dev/config-agent3.toml ", + }, { "label": "Run api", "command": "cargo run --bin api -- dev/config-api.toml ", diff --git a/dev/dev.sql b/dev/dev.sql index a48ef9ea..6189e80b 100644 --- a/dev/dev.sql +++ b/dev/dev.sql @@ -124,3 +124,13 @@ DROP TABLE keys; DROP TYPE node_status; DROP TYPE proto; + + + + +=== + +CREATE TYPE node_type AS ENUM ('common', 'premium'); + +ALTER TABLE nodes +ADD COLUMN node_type node_type NOT NULL DEFAULT 'common'; diff --git a/src/bin/agent/core/http.rs b/src/bin/agent/core/http.rs index 5cc17729..07d4a6bd 100644 --- a/src/bin/agent/core/http.rs +++ b/src/bin/agent/core/http.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use pony::memory::node::Type; use reqwest::Client as HttpClient; use reqwest::StatusCode; use reqwest::Url; @@ -35,6 +36,7 @@ pub struct NodeRequest { pub cores: usize, pub max_bandwidth_bps: i64, pub country: String, + pub r#type: Type, } #[async_trait] @@ -150,6 +152,7 @@ where cores: node.cores, max_bandwidth_bps: node.max_bandwidth_bps, country: node.country, + r#type: node.r#type, }; let res = HttpClient::new() diff --git a/src/bin/api/core/http/filters.rs b/src/bin/api/core/http/filters.rs index 8b79a33e..8446ab7c 100644 --- a/src/bin/api/core/http/filters.rs +++ b/src/bin/api/core/http/filters.rs @@ -22,12 +22,6 @@ where warp::any().map(move || mem_sync.clone()) } -pub fn with_param_string( - param: String, -) -> impl Filter + Clone { - warp::any().map(move || param.clone()) -} - pub fn with_param_vec( param: Vec, ) -> impl Filter,), Error = std::convert::Infallible> + Clone { diff --git a/src/bin/api/core/http/handlers/metrics.rs b/src/bin/api/core/http/handlers/metrics.rs index a2be0a9b..3e80d8f8 100644 --- a/src/bin/api/core/http/handlers/metrics.rs +++ b/src/bin/api/core/http/handlers/metrics.rs @@ -1,3 +1,6 @@ +use chrono::Utc; +use futures::{SinkExt, StreamExt}; +use std::collections::BTreeMap; use std::sync::Arc; use pony::metrics::storage::MetricStorage; @@ -5,10 +8,9 @@ use pony::metrics::storage::MetricStorage; pub async fn handle_ws_client( socket: warp::ws::WebSocket, node_id: uuid::Uuid, - metric: String, + series_hash: u64, storage: Arc, ) { - use futures::{SinkExt, StreamExt}; let (mut ws_tx, _) = socket.split(); let mut ticker = tokio::time::interval(std::time::Duration::from_millis(1000)); @@ -18,19 +20,71 @@ pub async fn handle_ws_client( let now_ms = chrono::Utc::now().timestamp_millis(); let ten_min_ago_ms = now_ms - (10 * 60 * 1000); - let points = storage.get_range(&node_id, &metric, ten_min_ago_ms, now_ms); + let points = storage.get_range(&node_id, series_hash, ten_min_ago_ms, now_ms); if !points.is_empty() { - let chart_points: Vec = points - .into_iter() - .map(|p| serde_json::json!({ "x": p.timestamp, "y": p.value })) - .collect(); - - if let Ok(msg) = serde_json::to_string(&chart_points) { - if let Err(e) = ws_tx.send(warp::ws::Message::text(msg)).await { - log::error!("WS send error: {}", e); - break; - } + let msg = serde_json::json!({ + "type": "update", + "node_id": node_id, + "hash": series_hash, + "data": points.iter().map(|p| (p.timestamp, p.value)).collect::>() + }); + + if ws_tx + .send(warp::ws::Message::text(msg.to_string())) + .await + .is_err() + { + break; + } + } + } +} + +pub async fn handle_aggregated_ws( + socket: warp::ws::WebSocket, + tag_key: String, + tag_value: String, + metric_name: String, + storage: Arc, +) { + let (mut ws_tx, _) = socket.split(); + let mut ticker = tokio::time::interval(std::time::Duration::from_millis(1000)); + + loop { + ticker.tick().await; + + let now = Utc::now().timestamp_millis(); + let aggregated_data = + storage.get_aggregated_range(&tag_key, &tag_value, &metric_name, now - 600_000, now); + + if !aggregated_data.is_empty() { + let response = aggregated_data + .iter() + .map(|(id, points)| { + ( + id, + points + .iter() + .map(|p| (p.timestamp, p.value)) + .collect::>(), + ) + }) + .collect::>(); + + let msg = serde_json::json!({ + "type": "aggregated_update", + "tag": format!("{}:{}", tag_key, tag_value), + "metric": metric_name, + "data": response + }); + + if ws_tx + .send(warp::ws::Message::text(msg.to_string())) + .await + .is_err() + { + break; } } } diff --git a/src/bin/api/core/http/handlers/node.rs b/src/bin/api/core/http/handlers/node.rs index efb78d4e..6a982438 100644 --- a/src/bin/api/core/http/handlers/node.rs +++ b/src/bin/api/core/http/handlers/node.rs @@ -136,33 +136,24 @@ where node_metrics_map .iter() .filter_map(|entry| { - let series_key = entry.key(); + let series_hash = entry.key(); let points = entry.value(); if points.is_empty() { return None; } - if !(series_key.starts_with("sys.") - || series_key.starts_with("net.")) - { + let (name, tags) = metrics.metadata.get(series_hash).map(|m| { + let val = m.value(); + (val.0.clone(), val.1.clone()) + })?; + + if !(name.starts_with("sys.") || name.starts_with("net.")) { return None; } - let tags = metrics - .metadata - .get(series_key) - .map(|m| m.value().clone()) - .unwrap_or_default(); - - let name = series_key - .split(':') - .next() - .unwrap_or(series_key) - .to_string(); - Some(NodeMetricInfo { - key: series_key.clone(), + key: series_hash.to_string(), name, tags, }) @@ -189,9 +180,12 @@ where None => { let response = ResponseMessage::>> { status: StatusCode::NOT_FOUND.as_u16(), + message: "List of nodes".to_string(), + response: vec![].into(), }; + Ok(warp::reply::with_status( warp::reply::json(&response), StatusCode::NOT_FOUND, diff --git a/src/bin/api/core/http/handlers/sub.rs b/src/bin/api/core/http/handlers/sub.rs index 24819bbb..eab11e6c 100644 --- a/src/bin/api/core/http/handlers/sub.rs +++ b/src/bin/api/core/http/handlers/sub.rs @@ -1,8 +1,12 @@ +use std::collections::HashSet; + +use url::Url; + use base64::Engine; use chrono::DateTime; use chrono::Utc; -use url::Url; +use pony::metrics::storage::MetricStorage; use warp::http::Response; use warp::http::StatusCode; @@ -182,8 +186,6 @@ where pub async fn subscription_info_handler( sub_param: SubQueryParam, memory: MemSync, - web_host: String, - api_web_host: String, ) -> Result, warp::Rejection> where N: NodeStorageOp + Sync + Send + Clone + 'static, @@ -247,234 +249,31 @@ where "Подписка на Рилзопровод" }; - let sub_link = format!("{}/sub/info?id={}", api_web_host, id); + let sub_link = format!("https://api.frkn.org/sub/info?id={}", id); let ru_link = format!("{}&env={}", sub_link, "ru"); let wl_link = format!("{}&env={}", sub_link, "wl"); let main_link = format!("{}&env={}", sub_link, "dev"); - let ru_block = if is_ru || is_wl { - format!(r#"Иностранные сервера"#) - } else { - format!( - r#"Российские сервера  "# - ) - }; - - let wl_block = if !is_wl { - format!(r#"Обход Белых Списков"#) - } else { - "".to_string() - }; - - let xray_node_exists = mem - .nodes - .get_by_env(env) - .map(|nodes| { - nodes.iter().any(|node| { - node.inbounds.values().any(|inb| { - matches!( - inb.tag, - Tag::VlessGrpcReality | Tag::VlessTcpReality | Tag::VlessXhttpReality - ) - }) - }) - }) - .unwrap_or(false); - - let hysteria_node_exists = mem - .nodes - .get_by_env(env) - .map(|nodes| { - nodes - .iter() - .any(|node| node.inbounds.values().any(|inb| inb.tag == Tag::Hysteria2)) - }) - .unwrap_or(false); - - let (has_xray, has_h2) = if let Some(conns) = mem.connections.get_by_subscription_id(id) { - let xray_tags = [ - Tag::VlessGrpcReality, - Tag::VlessTcpReality, - Tag::VlessXhttpReality, - ]; - - let mut is_xray = false; - let mut is_h2 = false; - - for (_id, conn) in conns { - let proto = conn.get_proto().proto(); - let is_deleted = conn.get_deleted(); - - if !is_deleted && env == &conn.get_env() { - if xray_node_exists && xray_tags.contains(&proto) { - is_xray = true; - } - if hysteria_node_exists && proto == Tag::Hysteria2 { - is_h2 = true; - } - } - } - - (is_xray, is_h2) - } else { - (false, false) - }; - - let base_link = format!("{}/sub?id={}&env={}", api_web_host, id, env); - - let xray_block = if has_xray && is_active { - format!( - r#" -
    -
  • -
    Универсальная
    -
    Скопировать
    -
  • -
  • -
    TXT
    -
    Скопировать
    -
  • -
  • -
    Clash
    -
    Скопировать
    -
  • -
- -
- -
Отсканируйте в приложении
-
-

- -
-

Поддерживаемые приложения

-
    -
  • Happ, Hiddify, v2rayNG, Shadowrocket, Streisand, Clash Verge, Nekobox
  • -
-
- "# - ) - } else { - format!( - r#"
Нет доступных Xray подключений для {}. Обратитесь в поддержку.
"#, - env - ) - }; - - let hysteria_block = if has_h2 && is_active { - format!( - r#" -
    -
  • -
    Универсальная
    -
    Скопировать
    -
  • -
  • -
    TXT
    -
    Скопировать
    -
  • -
- -
- -
Отсканируйте в приложении
-
- -

-
-

Поддерживаемые приложения

-
    -
  • Shadowrocket, hiddify, v2rayN
  • -
-
- "# - ) - } else { - format!( - r#"
Нет доступных Hysteria2 подключений для {}. Обратитесь в поддержку.
"#, - env - ) - }; - - fn proxy_label_from_url(url_str: &str) -> String { - if let Ok(url) = Url::parse(url_str) { - if let Some(fragment) = url.fragment() { - return percent_encoding::percent_decode_str(fragment) - .decode_utf8_lossy() - .to_string(); - } - } - "Telegram Proxy".into() - } - - let mtproto_block = if is_active { - match mem.nodes.get_by_env(env) { - Some(nodes) if !nodes.is_empty() => { - let links_html: String = nodes - .iter() - .filter_map(|node| { - node.inbounds - .values() - .find(|inb| inb.tag == Tag::Mtproto) - .and_then(|inb| mtproto_conn(node.address, inb, &node.label).ok()) - }) - .map(|link| { - let label = proxy_label_from_url(&link); - format!( - r#"
  • -
    {label}
    - Connect -
  • "#, - href = link, - label = label - ) - }) - .collect::>() - .join("\n"); - - if links_html.is_empty() { - format!( - r#"
    Нет доступных Mtproto подключений для {}. Обратитесь в поддержку.
    "#, - env - ) - } else { - links_html - } - } - _ => format!( - r#"
    Нет доступных Mtproto подключений для {}. Обратитесь в поддержку.
    "#, - env - ), - } - } else { - format!( - r#"
    Нет доступных Mtproto подключений для {}. Обратитесь в поддержку.
    "#, - env - ) - }; - - let main_link_vless = format!( - "{}/sub?id={}&format=txt&env={}&proto=Xray", - api_web_host, id, env - ); - let main_link_h2 = format!( - "{}/sub?id={}&format=txt&env={}&proto=Hysteria2", - api_web_host, id, env - ); - let html = format!( -r#"{head} + r#"{head}
    -{ru_block} -{wl_block} +

    {title}

    +
    +

    Твоя Новая Страница подписки теперь доступна по АДРЕСУ

    + +

    https://frkn.org/subscription?id={subscription_id}

    + + + https://frkn.org/subscription?id={subscription_id} +
    Статус: {status_text}
    Дата окончания: {expires}
    @@ -483,193 +282,22 @@ r#"{head}
    Id: {subscription_id}

    -
    Трафик: ↓    ↑
    -
    - -

    Ссылки для подключения

    - -
    - - - - - - -
    - -
    -{xray_block} - -
    - -
    -{hysteria_block} - -
    - -
    - {mtproto_block} -
    - -
    -
    Wireguard скоро будет доступен
    -
    - -
    -
    Amnezia Wireguard скоро будет доступен
    -
    - -
    -
    TrustTunnel скоро будет доступен
    -
    - -
    -
    -

    Докинуть дней (Активировать ключ)

    - - - -
    -
    - -
    -

    Реферальная программа

    -
    Твой реферальный код: {ref}
    - -
    -
    Вы пригласили: {invited}
    -
    Добавим по 7 дней доступа и тебе и другу
    -

    {footer}
    - - - - - -
    Скопировано
    "#, - head = HEAD, - footer = FOOTER, - logo = LOGO, - status_class = status_class, - status_text = status_text, - expires = expires, - days = days, - ref = sub.refer_code(), - invited = invited, - subscription_id = id, - title = title, - ru_block = ru_block, - wl_block = wl_block, - xray_block = xray_block, - hysteria_block = hysteria_block, - mtproto_block = mtproto_block + head = HEAD, + footer = FOOTER, + logo = LOGO, + status_class = status_class, + status_text = status_text, + expires = expires, + days = days, + subscription_id = id, + title = title, ); Ok(Box::new(warp::reply::with_status( @@ -678,6 +306,114 @@ window.onload = () => {{ ))) } +#[derive(serde::Serialize)] +pub struct EnvInfo { + pub env: String, + pub has_xray: bool, + pub has_hysteria: bool, +} + +#[derive(serde::Serialize)] +pub struct SubscriptionResponse { + pub id: uuid::Uuid, + pub expires: DateTime, + pub days: i64, + pub ref_code: String, + pub invited_count: usize, + pub locations: Vec, +} + +/// get subscription_info_json +pub async fn get_subscription_info_json( + subscription_id: uuid::Uuid, + memory: MemSync, + _metrics: std::sync::Arc, +) -> Result, warp::Rejection> +where + N: NodeStorageOp + Sync + Send + Clone + 'static, + S: SubscriptionOp + Send + Sync + Clone + 'static + PartialEq, + C: ConnectionApiOp + + ConnectionBaseOp + + Sync + + Send + + Clone + + 'static + + std::fmt::Debug + + PartialEq, +{ + let mem = memory.memory.read().await; + + let Some(sub) = mem.subscriptions.find_by_id(&subscription_id) else { + return Ok(Box::new(warp::reply::with_status( + warp::reply::json(&"Subscription not found"), + warp::http::StatusCode::NOT_FOUND, + ))); + }; + + let connections = mem.connections.get_by_subscription_id(&subscription_id); + let mut locations = Vec::new(); + + if let Some(conns) = connections.clone() { + let active_envs: HashSet = conns + .iter() + .filter(|(_, conn)| !conn.get_deleted()) + .map(|(_, conn)| conn.get_env()) + .collect(); + + for env in active_envs { + let xray_tags = [ + Tag::VlessGrpcReality, + Tag::VlessTcpReality, + Tag::VlessXhttpReality, + ]; + + let nodes = mem.nodes.get_by_env(&env); + let xray_nodes = nodes.clone(); + let xray_node_exists = xray_nodes.is_some_and(|ns| { + ns.iter() + .any(|n| n.inbounds.values().any(|i| xray_tags.contains(&i.tag))) + }); + + let hyst_node_exists = nodes.is_some_and(|ns| { + ns.iter() + .any(|n| n.inbounds.values().any(|i| i.tag == Tag::Hysteria2)) + }); + + let mut has_xray = false; + let mut has_hysteria = false; + + for (_, conn) in conns.clone() { + if !conn.get_deleted() && conn.get_env() == env { + let proto = conn.get_proto().proto(); + if xray_node_exists && xray_tags.contains(&proto) { + has_xray = true; + } + if hyst_node_exists && proto == Tag::Hysteria2 { + has_hysteria = true; + } + } + } + + locations.push(EnvInfo { + env, + has_xray, + has_hysteria, + }); + } + } + + let sub_resp = SubscriptionResponse { + id: sub.id(), + expires: sub.expires_at().unwrap_or_default(), + days: sub.days_remaining().unwrap_or(0), + ref_code: sub.refer_code(), + invited_count: mem.subscriptions.count_invited_by(&sub.refer_code()), + locations, + }; + + Ok(Box::new(warp::reply::json(&sub_resp))) +} + /// Get list of subscription connection credentials pub async fn get_subscription_connections_handler( sub_param: SubIdQueryParam, diff --git a/src/bin/api/core/http/request.rs b/src/bin/api/core/http/request.rs index ce0e0204..959031e2 100644 --- a/src/bin/api/core/http/request.rs +++ b/src/bin/api/core/http/request.rs @@ -2,6 +2,7 @@ use pony::config::xray::Inbound; use pony::memory::connection::wireguard::Param as WgParam; use pony::memory::node::Node; use pony::memory::node::Status as NodeStatus; +use pony::memory::node::Type; use pony::memory::tag::ProtoTag as Tag; use chrono::Utc; @@ -36,11 +37,18 @@ pub struct NodeRequest { pub cores: usize, pub max_bandwidth_bps: i64, pub country: String, + pub r#type: Option, } impl NodeRequest { pub fn as_node(&self) -> Node { let now = Utc::now(); + + let t = if let Some(t) = self.r#type { + t + } else { + Type::Common + }; Node { uuid: self.uuid, env: self.env.clone(), @@ -55,6 +63,7 @@ impl NodeRequest { cores: self.cores, max_bandwidth_bps: self.max_bandwidth_bps, country: self.country.clone(), + r#type: t, } } } diff --git a/src/bin/api/core/http/routes.rs b/src/bin/api/core/http/routes.rs index d751e388..008fc250 100644 --- a/src/bin/api/core/http/routes.rs +++ b/src/bin/api/core/http/routes.rs @@ -115,10 +115,15 @@ where .and(warp::path::end()) .and(warp::query::()) .and(with_sync(self.sync.clone())) - .and(with_param_string(params.web_host)) - .and(with_param_string(params.api_web_host)) .and_then(subscription_info_handler); + let get_subscription_info_route_new = warp::get() + .and(warp::path!("subscription" / Uuid)) + .and(warp::path::end()) + .and(with_sync(self.sync.clone())) + .and(with_metrics(self.metrics.clone())) + .and_then(get_subscription_info_json); + let post_subscription_route = warp::post() .and(warp::path("subscription")) .and(warp::path::end()) @@ -201,26 +206,39 @@ where .and_then(post_activate_key_handler); use uuid::Uuid; - let ws_metrics_route = warp::path!("metrics" / Uuid / String / "ws") + let ws_all_metrics_route = warp::path!("metrics" / "all" / Uuid / u64 / "ws") .and(warp::ws()) .and(with_metrics(self.metrics.clone())) .map( - |node_id, encoded_metric: String, ws: warp::ws::Ws, storage| { - let metric_name = urlencoding::decode(&encoded_metric) - .map(|s| s.into_owned()) - .unwrap_or(encoded_metric); - + |node_id: Uuid, series_hash: u64, ws: warp::ws::Ws, storage| { ws.on_upgrade(move |socket| { - handle_ws_client(socket, node_id, metric_name, storage) + handle_ws_client(socket, node_id, series_hash, storage) }) }, ); + let ws_aggregate_route = + warp::path!("metrics" / "aggregate" / String / String / String / "ws") + .and(warp::ws()) + .and(with_metrics(self.metrics.clone())) + .map( + |tag_key: String, + tag_value: String, + metric_name: String, + ws: warp::ws::Ws, + storage| { + ws.on_upgrade(move |socket| { + handle_aggregated_ws(socket, tag_key, tag_value, metric_name, storage) + }) + }, + ); + let routes = get_healthcheck_route // Subscription .or(get_subscription_connections_route) .or(get_subscription_route) .or(get_subscription_info_route) + .or(get_subscription_info_route_new) .or(post_subscription_route) .or(put_subscription_route) // Node @@ -237,7 +255,8 @@ where .or(post_key_route) .or(post_activate_key_route) // Metrics - .or(ws_metrics_route) + .or(ws_all_metrics_route) + .or(ws_aggregate_route) .recover(rejection) .with(cors); diff --git a/src/bin/api/core/postgres/node.rs b/src/bin/api/core/postgres/node.rs index 93fd7a9a..40b08c53 100644 --- a/src/bin/api/core/postgres/node.rs +++ b/src/bin/api/core/postgres/node.rs @@ -2,6 +2,7 @@ use defguard_wireguard_rs::net::IpAddrMask; use chrono::DateTime; use chrono::Utc; +use pony::memory::node::Type; use std::collections::HashMap; use std::net::IpAddr; use std::net::Ipv4Addr; @@ -165,8 +166,10 @@ impl PgNode { .query( "SELECT n.id AS node_id, n.uuid, n.env, n.hostname, n.address, n.status, - n.created_at, n.modified_at, n.label, n.interface, n.cores, n.max_bandwidth_bps, n.country, - i.id AS inbound_id, i.tag, i.port, i.stream_settings, i.uplink, i.downlink, + n.created_at, n.modified_at, n.label, n.interface, + n.cores, n.max_bandwidth_bps, n.country, n.node_type, i.id + + AS inbound_id, i.tag, i.port, i.stream_settings, i.uplink, i.downlink, i.conn_count, i.wg_pubkey, i.wg_privkey, i.wg_interface, i.wg_network, i.wg_address, i.dns, i.h2, i.mtproto_secret FROM nodes n LEFT JOIN inbounds i ON n.id = i.node_id", @@ -190,6 +193,7 @@ impl PgNode { let cores: i32 = row.get("cores"); let country: String = row.get("country"); let max_bandwidth_bps: i64 = row.get("max_bandwidth_bps"); + let r#type: Type = row.get("node_type"); let wg_network: Option = row .get::<_, Option>("wg_network") @@ -228,6 +232,7 @@ impl PgNode { cores: cores as usize, max_bandwidth_bps, country, + r#type, }); if let Some(_inbound_id) = inbound_id { diff --git a/src/config/settings.rs b/src/config/settings.rs index e369dc85..6527c394 100644 --- a/src/config/settings.rs +++ b/src/config/settings.rs @@ -1,3 +1,4 @@ +use crate::memory::node::Type; use crate::PonyError; use crate::Result; use default_net::{get_default_interface, get_interfaces}; @@ -22,8 +23,6 @@ pub struct ApiServiceConfig { pub port: u16, pub token: String, pub db_sync_interval_sec: u64, - pub web_host: String, - pub api_web_host: String, pub subscription_restore_interval: u64, pub subscription_expire_interval: u64, pub key_sign_token: Vec, @@ -95,6 +94,7 @@ pub struct NodeConfig { pub max_bandwidth_bps: i64, pub cores: usize, pub country: String, + pub r#type: Type, } #[derive(Clone, Debug, Deserialize, Default)] @@ -108,6 +108,7 @@ pub struct NodeConfigRaw { pub label: String, pub max_bandwidth_bps: i64, pub country: String, + pub r#type: String, } impl NodeConfig { @@ -196,6 +197,7 @@ impl NodeConfig { max_bandwidth_bps: raw.max_bandwidth_bps, cores: num_cpus, country: raw.country, + r#type: raw.r#type.parse().unwrap_or(Type::Common), }) } } diff --git a/src/h2_op/mod.rs b/src/h2_op/mod.rs index 13170a27..049cd6a5 100644 --- a/src/h2_op/mod.rs +++ b/src/h2_op/mod.rs @@ -1,5 +1,6 @@ use crate::config::xray::Inbound; +use crate::utils::get_uuid_last_octet_simple; use crate::{PonyError, Result}; use url::Url; @@ -37,7 +38,8 @@ pub fn hysteria2_conn( .append_pair("up-mbps", &h2.up_mbps.unwrap_or(0).to_string()) .append_pair("down-mbps", &h2.down_mbps.unwrap_or(0).to_string()); - url.set_fragment(Some(label)); + let last = get_uuid_last_octet_simple(token); + url.set_fragment(Some(&format!("{} {} H2", last, label))); Ok(url.to_string()) } else { diff --git a/src/memory/node.rs b/src/memory/node.rs index 54ef68c4..c4d437e1 100644 --- a/src/memory/node.rs +++ b/src/memory/node.rs @@ -53,11 +53,41 @@ impl FromStr for Status { } } +#[derive(Clone, Debug, Deserialize, Serialize, Copy, ToSql, FromSql, PartialEq)] +#[postgres(name = "node_type", rename_all = "snake_case")] +#[serde(rename_all = "lowercase")] +pub enum Type { + Common, + Premium, +} + +impl fmt::Display for Type { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Type::Common => write!(f, "Common"), + Type::Premium => write!(f, "Premium"), + } + } +} + +impl FromStr for Type { + type Err = (); + + fn from_str(input: &str) -> Result { + match input { + "Common" => Ok(Type::Common), + "Premium" => Ok(Type::Premium), + _ => Ok(Type::Common), + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct NodeResponse { pub uuid: uuid::Uuid, pub env: String, pub hostname: String, + pub interface: String, pub address: Ipv4Addr, pub inbounds: Vec, pub status: Status, @@ -66,6 +96,7 @@ pub struct NodeResponse { pub max_bandwidth_bps: i64, pub metrics: Vec, pub country: String, + pub r#type: Type, } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -90,6 +121,7 @@ pub struct Node { pub cores: usize, pub max_bandwidth_bps: i64, pub country: String, + pub r#type: Type, } impl Node { @@ -180,6 +212,7 @@ impl Node { cores: settings.cores, max_bandwidth_bps: settings.max_bandwidth_bps, country: settings.country, + r#type: settings.r#type, } } @@ -190,13 +223,13 @@ impl Node { tags.insert("label".to_string(), self.label.clone()); tags.insert("address".to_string(), self.address.to_string()); tags.insert("label".to_string(), self.label.clone()); - tags.insert("interface".to_string(), self.interface.clone()); tags.insert("cores".to_string(), self.cores.to_string()); tags.insert( "max_bandwidth_bps".to_string(), self.max_bandwidth_bps.to_string(), ); tags.insert("country".to_string(), self.country.clone()); + tags.insert("type".to_string(), self.r#type.to_string()); tags } @@ -206,6 +239,7 @@ impl Node { NodeResponse { env: self.env.clone(), hostname: self.hostname.clone(), + interface: self.interface.clone(), address: self.address, uuid: self.uuid, inbounds: tags, @@ -215,6 +249,7 @@ impl Node { max_bandwidth_bps: self.max_bandwidth_bps, metrics: [].to_vec(), country: self.country.clone(), + r#type: self.r#type, } } diff --git a/src/metrics/impls.rs b/src/metrics/impls.rs index f7278bc1..742bcd7a 100644 --- a/src/metrics/impls.rs +++ b/src/metrics/impls.rs @@ -79,11 +79,19 @@ where std::thread::sleep(std::time::Duration::from_secs(1)); networks.refresh(true); + let default_if = self.node_settings().interface.clone(); + let node = self.node_settings(); - let tags = node.get_base_tags(); + let mut tags = node.get_base_tags(); let node_uuid = node.uuid; for (interface, data) in networks.iter() { + if interface == &default_if { + tags.insert("default_interface".to_string(), "true".to_string()); + } else { + tags.insert("default_interface".to_string(), "false".to_string()); + } + self.metrics().write( &node_uuid, &format!("net.{interface}.total_rx_bps"), diff --git a/src/metrics/storage.rs b/src/metrics/storage.rs index 00ac0f73..7d1a6b78 100644 --- a/src/metrics/storage.rs +++ b/src/metrics/storage.rs @@ -1,5 +1,5 @@ use dashmap::DashMap; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{BTreeMap, HashSet, VecDeque}; use super::MetricEnvelope; use super::MetricPoint; @@ -79,8 +79,9 @@ impl MetricBuffer { } pub struct MetricStorage { - pub inner: DashMap>>, - pub metadata: DashMap>, + pub inner: DashMap>>, + pub metadata: DashMap)>, + pub tag_index: DashMap>>, pub max_points: usize, pub retention_seconds: i64, @@ -91,29 +92,73 @@ impl MetricStorage { Self { inner: DashMap::new(), metadata: DashMap::new(), + tag_index: DashMap::new(), max_points, retention_seconds, } } - fn make_series_key(name: &str, tags: &BTreeMap) -> String { - if tags.is_empty() { - return name.to_string(); + pub fn insert_envelope(&self, e: MetricEnvelope) { + let key = Self::make_series_key(&e.name, &e.tags); + + self.metadata.entry(key).or_insert_with(|| { + for (k, v) in &e.tags { + self.tag_index + .entry(k.clone()) + .or_default() + .entry(v.clone()) + .or_default() + .insert(key); + } + (e.name.clone(), e.tags.clone()) + }); + + let node_map = self.inner.entry(e.node_id).or_default(); + let mut entry = node_map.entry(key).or_default(); + + entry.push_back(MetricPoint { + timestamp: e.timestamp, + value: e.value, + }); + + while entry.len() > self.max_points { + entry.pop_front(); } - let tags_part: Vec = tags.iter().map(|(k, v)| format!("{}={}", k, v)).collect(); - format!("{}:{{{}}}", name, tags_part.join(",")) + + let retention_ms = self.retention_seconds * 1000; + let min_ts = e.timestamp - retention_ms; + + while let Some(front) = entry.front() { + if front.timestamp < min_ts { + entry.pop_front(); + } else { + break; + } + } + } + fn make_series_key(name: &str, tags: &BTreeMap) -> u64 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + name.hash(&mut hasher); + for (k, v) in tags { + k.hash(&mut hasher); + v.hash(&mut hasher); + } + hasher.finish() } pub fn get_range( &self, node_id: &uuid::Uuid, - series_key: &str, + series_hash: u64, from: i64, to: i64, ) -> Vec { self.inner .get(node_id) .and_then(|node_data| { - node_data.get(series_key).map(|deque| { + node_data.get(&series_hash).map(|deque| { deque .iter() .filter(|p| p.timestamp >= from && p.timestamp <= to) @@ -124,43 +169,44 @@ impl MetricStorage { .unwrap_or_default() } - pub fn insert_envelope(&self, env: MetricEnvelope) { - let series_key = Self::make_series_key(&env.name, &env.tags); - - if !self.metadata.contains_key(&series_key) { - self.metadata.insert(series_key.clone(), env.tags); - } - - let node_map = self.inner.entry(env.node_id).or_default(); - let mut entry = node_map.entry(series_key).or_default(); - - // env.timestamp уже в миллисекундах (из MetricBuffer) - entry.push_back(MetricPoint { - timestamp: env.timestamp, - value: env.value, - }); - - // 1. Очистка по количеству точек - while entry.len() > self.max_points { - entry.pop_front(); - } - - // 2. Очистка по времени (Retention) - // Переводим секунды конфига в мс для сравнения с env.timestamp - let retention_ms = self.retention_seconds * 1000; - let min_ts = env.timestamp - retention_ms; + pub fn find_series_by_tag(&self, tag_key: &str, tag_value: &str) -> HashSet { + self.tag_index + .get(tag_key) + .and_then(|tag_map| tag_map.get(tag_value).map(|v| v.clone())) + .unwrap_or_default() + } - while let Some(front) = entry.front() { - if front.timestamp < min_ts { - entry.pop_front(); - } else { - break; + pub fn get_aggregated_range( + &self, + tag_key: &str, + tag_value: &str, + metric_name: &str, + from: i64, + to: i64, + ) -> BTreeMap> { + let mut result = BTreeMap::new(); + + let hashes = self.find_series_by_tag(tag_key, tag_value); + + for node_ref in self.inner.iter() { + let node_id = node_ref.key(); + let _node_data = node_ref.value(); + + for hash in &hashes { + if let Some(meta) = self.metadata.get(hash) { + if meta.0 == metric_name { + let points = self.get_range(node_id, *hash, from, to); + if !points.is_empty() { + result.insert(*node_id, points); + } + } + } } } + result } pub fn perform_gc(&self) { - // Используем встроенный метод для мс, чтобы не множить на 1000 вручную let now_ms = chrono::Utc::now().timestamp_millis(); let retention_ms = self.retention_seconds * 1000; let min_ts = now_ms - retention_ms; @@ -178,13 +224,30 @@ impl MetricStorage { } !deque.is_empty() }); + !node_map.is_empty() }); - self.metadata.retain(|series_key, _| { - self.inner - .iter() - .any(|node_ref| node_ref.value().contains_key(series_key)) + let mut alive_series = std::collections::HashSet::new(); + for node_ref in self.inner.iter() { + for key in node_ref.value().iter().map(|entry| *entry.key()) { + alive_series.insert(key); + } + } + + self.metadata + .retain(|series_key, _| alive_series.contains(series_key)); + + self.tag_index.retain(|_tag_key, tag_map| { + tag_map.retain(|_tag_value, set| { + set.retain(|series_key| alive_series.contains(series_key)); + + !set.is_empty() + }); + + !tag_map.is_empty() }); + + log::debug!("GC completed. Alive series: {}", alive_series.len()); } } diff --git a/src/xray_op/vless.rs b/src/xray_op/vless.rs index af9c3e0b..883cacad 100644 --- a/src/xray_op/vless.rs +++ b/src/xray_op/vless.rs @@ -6,6 +6,7 @@ use super::ProtocolConn; use crate::config::xray::Inbound; use crate::error::{PonyError, Result as PonyResult}; use crate::memory::tag::ProtoTag as Tag; +use crate::utils::get_uuid_last_octet_simple; use crate::xray_api::xray::proxy::vless; use crate::xray_api::xray::{common::protocol::User, common::serial::TypedMessage}; @@ -109,7 +110,8 @@ pub fn vless_xtls_conn( .append_pair("pbk", &pbk) .append_pair("sid", sid); - url.set_fragment(Some(&format!("{} XTLS", label))); + let last = get_uuid_last_octet_simple(conn_id); + url.set_fragment(Some(&format!("{} | {} XTLS", last, label))); Ok(url.to_string()) } @@ -154,7 +156,8 @@ pub fn vless_grpc_conn( .append_pair("pbk", &pbk) .append_pair("sid", sid); - url.set_fragment(Some(&format!("{label} GRPC"))); + let last = get_uuid_last_octet_simple(conn_id); + url.set_fragment(Some(&format!("{} | {} GRPC", last, label))); Ok(url.to_string()) } @@ -199,7 +202,8 @@ pub fn vless_xhttp_conn( .append_pair("pbk", &pbk) .append_pair("sid", sid); - url.set_fragment(Some(&format!("{label} XHTTP"))); + let last = get_uuid_last_octet_simple(conn_id); + url.set_fragment(Some(&format!("{} | {} XHTTP", last, label))); Ok(url.to_string()) } From b465490455f531205387217aed6af3a73a22c1d9 Mon Sep 17 00:00:00 2001 From: Fyodor Sotnikov Date: Wed, 8 Apr 2026 19:49:59 +0300 Subject: [PATCH 2/3] Delete old sub/info page --- config-agent-example.toml | 1 - config-api-example.toml | 6 ++++-- src/bin/api/core/http/handlers/sub.rs | 22 +++++++++------------- src/bin/auth/core/email.rs | 12 ++++-------- src/bin/auth/core/handlers.rs | 27 ++++++++++----------------- src/bin/auth/core/mod.rs | 2 +- 6 files changed, 28 insertions(+), 42 deletions(-) diff --git a/config-agent-example.toml b/config-agent-example.toml index b4be0b15..c120aaa2 100644 --- a/config-agent-example.toml +++ b/config-agent-example.toml @@ -11,7 +11,6 @@ xray_config_path = "/etc/xray/config.json" enabled = true port = 51820 interface = "wg0" -# privkey и address обычно лучше через env, но можно и тут [h2] enabled = false diff --git a/config-api-example.toml b/config-api-example.toml index 630286e4..648785d0 100644 --- a/config-api-example.toml +++ b/config-api-example.toml @@ -2,14 +2,16 @@ address = "0.0.0.0" port = 3005 token = "your-super-secret-api-token" -metrics_enabled = true -metrics_interval = 60 db_sync_interval_sec = 300 subscription_restore_interval = 60 subscription_expire_interval = 60 max_points = 100000000 retention_seconds = 604800 # 7 days +[metrics] +reciever = "tcp://0.0.0.0:3001" +topic = ["metrics"] + [node] env = "experimental" label = "🏴‍☠️ DarkMachine" diff --git a/src/bin/api/core/http/handlers/sub.rs b/src/bin/api/core/http/handlers/sub.rs index eab11e6c..44f01ffb 100644 --- a/src/bin/api/core/http/handlers/sub.rs +++ b/src/bin/api/core/http/handlers/sub.rs @@ -1,7 +1,5 @@ use std::collections::HashSet; -use url::Url; - use base64::Engine; use chrono::DateTime; use chrono::Utc; @@ -13,7 +11,6 @@ use warp::http::StatusCode; use pony::http::helpers as http; use pony::http::response::Instance; use pony::http::ResponseMessage; -use pony::mtproto_op::mtproto_conn; use pony::utils; use pony::utils::get_uuid_last_octet_simple; use pony::xray_op::clash::generate_clash_config; @@ -239,8 +236,6 @@ where .map(|d| d.max(0).to_string()) .unwrap_or_else(|| "∞".into()); - let invited = mem.subscriptions.count_invited_by(&sub.refer_code()); - let title = if is_ru { "Подписка на Рилзопровод (RU)" } else if is_wl { @@ -249,11 +244,6 @@ where "Подписка на Рилзопровод" }; - let sub_link = format!("https://api.frkn.org/sub/info?id={}", id); - let ru_link = format!("{}&env={}", sub_link, "ru"); - let wl_link = format!("{}&env={}", sub_link, "wl"); - let main_link = format!("{}&env={}", sub_link, "dev"); - let html = format!( r#"{head}
    @@ -267,12 +257,18 @@ where

    {title}


    -

    Твоя Новая Страница подписки теперь доступна по АДРЕСУ

    +

    Мы постоянно работаем над улучшениями. +Переработали страницу подписки. +
    Теперь твоя страница подписки доступна по адресу: +

    + https://frkn.org/subscription?id={subscription_id} +
    +

    -

    https://frkn.org/subscription?id={subscription_id}

    +
    +
    - https://frkn.org/subscription?id={subscription_id}
    Статус: {status_text}
    Дата окончания: {expires}
    diff --git a/src/bin/auth/core/email.rs b/src/bin/auth/core/email.rs index 3fe532d5..e37586ad 100644 --- a/src/bin/auth/core/email.rs +++ b/src/bin/auth/core/email.rs @@ -112,8 +112,6 @@ impl EmailStore { &self, to: &str, sub_id: &uuid::Uuid, - api_address: &str, - web_host: &str, ) -> Result<(), Box> { let html_body = format!( r#" @@ -180,7 +178,7 @@ impl EmailStore {

    Привет!

    Твоя подписка для FRKN успешно активирована 🎉

    - Мы не храним твой email

    -

    @frkn_org


    "#, - api_address = api_address, sub_id = sub_id, - web_host = web_host, + web_host = self.web_host, ); let msg = Message::builder() .from(format!("FRKN <{}>", self.smtp.from).parse()?) .to(to.parse()?) - .subject("FRKN Рилзопровод 🚀") + .subject("FRKN Рилзопровод") .header(lettre::message::header::ContentType::TEXT_HTML) .body(html_body)?; diff --git a/src/bin/auth/core/handlers.rs b/src/bin/auth/core/handlers.rs index 57cb5419..b2ee2c9b 100644 --- a/src/bin/auth/core/handlers.rs +++ b/src/bin/auth/core/handlers.rs @@ -1,23 +1,24 @@ -use pony::memory::connection::Connections; use std::sync::Arc; use tokio::sync::RwLock; -use super::helpers::{activate_key, create_connection, create_subscription, validate_key}; +use crate::core::helpers::{activate_key, validate_key}; + +use super::helpers::{create_connection, create_subscription}; use super::request; use super::response; use super::EmailStore; use super::HttpClient; +use super::Env; +use super::DEFAULT_DAYS; +use super::PROTOS; use pony::config::settings::ApiAccessConfig; use pony::http::helpers as http; use pony::http::response::Instance; +use pony::memory::connection::Connections; use pony::ConnectionBaseOp; use pony::ConnectionStorageBaseOp; -use super::Env; -use super::DEFAULT_DAYS; -use super::PROTOS; - pub async fn activate_key_handler( req: request::Key, store: EmailStore, @@ -120,13 +121,9 @@ pub async fn activate_key_handler( } /* ================= SEND EMAIL + SAVE ================= */ - let endpoint = api.endpoint.clone(); if let Some(email) = req.email { - if let Err(e) = store - .send_email(&email, &sub.id, &endpoint, &store.web_host) - .await - { + if let Err(e) = store.send_email(&email, &sub.id).await { log::error!("email error: {}", e); return Ok(http::internal_error( "Failed to send email. Please try again later or contact support.", @@ -227,12 +224,8 @@ pub async fn trial_handler( let now = Utc::now(); let email = req.email.clone(); let ref_by = referred_by.clone(); - let endpoint = api.endpoint.clone(); - if let Err(e) = store - .send_email(&email, &sub.id, &endpoint, &store.web_host) - .await - { + if let Err(e) = store.send_email(&email, &sub.id).await { log::error!("📧 email error: {}", e); return Ok(http::internal_error( "Failed to send confirmation email. Please try again later or contact support.", @@ -258,7 +251,7 @@ pub async fn auth_handler( memory: Arc>>, ) -> Result where - C: ConnectionBaseOp + Sync + Send + Clone + 'static, + C: ConnectionBaseOp + Sync + Send + Clone + 'static + std::fmt::Display, { log::debug!("Auth req {} {} {}", req.auth, req.addr, req.tx); let mem = memory.read().await; diff --git a/src/bin/auth/core/mod.rs b/src/bin/auth/core/mod.rs index 122de773..09bcf33c 100644 --- a/src/bin/auth/core/mod.rs +++ b/src/bin/auth/core/mod.rs @@ -72,7 +72,7 @@ where impl AuthService where - C: ConnectionBaseOp + Send + Sync + Clone + 'static, + C: ConnectionBaseOp + Send + Sync + Clone + 'static + std::fmt::Display, { pub fn new( metrics: Arc, From 089feafeaa72db3eeb3bebc1b02c4428ccbe04b7 Mon Sep 17 00:00:00 2001 From: Fyodor Sotnikov Date: Wed, 8 Apr 2026 19:54:32 +0300 Subject: [PATCH 3/3] ff --- README.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 977b511d..30ef2a84 100644 --- a/README.md +++ b/README.md @@ -8,17 +8,15 @@ Pony is a lightweight control plane and orchestration platform for modern proxy Contains parts -- agent — manages Xray/Wireguard(in progress) connections/users/metrics +- agent — manages Xray/Wireguard/Hysteria2 connections/users/metrics - api — manages cluster of servers, gets API calls and send commands to servers -- auth — handles auth for Hysteri2 cleints +- auth — handles auth for Hysteri2 cleints - utils — helper to work/debug Bin messages ### As dependencies the platfrom has - ZeroMQ — communicating bus - PostgreSQL — user and node data storage -- Clickhouse — metrics storage -- Carbon — metrics relay - Xray Core - Hysteria2 - MTproto