Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .zed/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
"label": "Run agent",
"command": "cargo run --bin agent -- dev/config-agent.toml ",
},
{
"label": "Run agent2",
"command": "cargo run --bin agent -- dev/config-agent2.toml ",
},
{
"label": "Run agent3",
"command": "cargo run --bin agent -- dev/config-agent3.toml ",
},
{
"label": "Run api",
"command": "cargo run --bin api -- dev/config-api.toml ",
Expand Down
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ Pony is a lightweight control plane and orchestration platform for modern proxy

Contains parts

- agent — manages Xray/Wireguard(in progress) connections/users/metrics
- agent — manages Xray/Wireguard/Hysteria2 connections/users/metrics
- api — manages cluster of servers, gets API calls and send commands to servers
- auth — handles auth for Hysteri2 cleints
- auth — handles auth for Hysteri2 cleints
- utils — helper to work/debug Bin messages

### As dependencies the platfrom has

- ZeroMQ — communicating bus
- PostgreSQL — user and node data storage
- Clickhouse — metrics storage
- Carbon — metrics relay
- Xray Core
- Hysteria2
- MTproto
Expand Down
1 change: 0 additions & 1 deletion config-agent-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ xray_config_path = "/etc/xray/config.json"
enabled = true
port = 51820
interface = "wg0"
# privkey и address обычно лучше через env, но можно и тут

[h2]
enabled = false
Expand Down
6 changes: 4 additions & 2 deletions config-api-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
address = "0.0.0.0"
port = 3005
token = "your-super-secret-api-token"
metrics_enabled = true
metrics_interval = 60
db_sync_interval_sec = 300
subscription_restore_interval = 60
subscription_expire_interval = 60
max_points = 100000000
retention_seconds = 604800 # 7 days

[metrics]
reciever = "tcp://0.0.0.0:3001"
topic = ["metrics"]

[node]
env = "experimental"
label = "🏴‍☠️ DarkMachine"
Expand Down
10 changes: 10 additions & 0 deletions dev/dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,13 @@ DROP TABLE keys;

DROP TYPE node_status;
DROP TYPE proto;




===

CREATE TYPE node_type AS ENUM ('common', 'premium');

ALTER TABLE nodes
ADD COLUMN node_type node_type NOT NULL DEFAULT 'common';
3 changes: 3 additions & 0 deletions src/bin/agent/core/http.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use pony::memory::node::Type;
use reqwest::Client as HttpClient;
use reqwest::StatusCode;
use reqwest::Url;
Expand Down Expand Up @@ -35,6 +36,7 @@ pub struct NodeRequest {
pub cores: usize,
pub max_bandwidth_bps: i64,
pub country: String,
pub r#type: Type,
}

#[async_trait]
Expand Down Expand Up @@ -150,6 +152,7 @@ where
cores: node.cores,
max_bandwidth_bps: node.max_bandwidth_bps,
country: node.country,
r#type: node.r#type,
};

let res = HttpClient::new()
Expand Down
6 changes: 0 additions & 6 deletions src/bin/api/core/http/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ where
warp::any().map(move || mem_sync.clone())
}

pub fn with_param_string(
param: String,
) -> impl Filter<Extract = (String,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || param.clone())
}

pub fn with_param_vec(
param: Vec<u8>,
) -> impl Filter<Extract = (Vec<u8>,), Error = std::convert::Infallible> + Clone {
Expand Down
80 changes: 67 additions & 13 deletions src/bin/api/core/http/handlers/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use chrono::Utc;
use futures::{SinkExt, StreamExt};
use std::collections::BTreeMap;
use std::sync::Arc;

use pony::metrics::storage::MetricStorage;

pub async fn handle_ws_client(
socket: warp::ws::WebSocket,
node_id: uuid::Uuid,
metric: String,
series_hash: u64,
storage: Arc<MetricStorage>,
) {
use futures::{SinkExt, StreamExt};
let (mut ws_tx, _) = socket.split();
let mut ticker = tokio::time::interval(std::time::Duration::from_millis(1000));

Expand All @@ -18,19 +20,71 @@ pub async fn handle_ws_client(
let now_ms = chrono::Utc::now().timestamp_millis();
let ten_min_ago_ms = now_ms - (10 * 60 * 1000);

let points = storage.get_range(&node_id, &metric, ten_min_ago_ms, now_ms);
let points = storage.get_range(&node_id, series_hash, ten_min_ago_ms, now_ms);

if !points.is_empty() {
let chart_points: Vec<serde_json::Value> = points
.into_iter()
.map(|p| serde_json::json!({ "x": p.timestamp, "y": p.value }))
.collect();

if let Ok(msg) = serde_json::to_string(&chart_points) {
if let Err(e) = ws_tx.send(warp::ws::Message::text(msg)).await {
log::error!("WS send error: {}", e);
break;
}
let msg = serde_json::json!({
"type": "update",
"node_id": node_id,
"hash": series_hash,
"data": points.iter().map(|p| (p.timestamp, p.value)).collect::<Vec<_>>()
});

if ws_tx
.send(warp::ws::Message::text(msg.to_string()))
.await
.is_err()
{
break;
}
}
}
}

pub async fn handle_aggregated_ws(
socket: warp::ws::WebSocket,
tag_key: String,
tag_value: String,
metric_name: String,
storage: Arc<MetricStorage>,
) {
let (mut ws_tx, _) = socket.split();
let mut ticker = tokio::time::interval(std::time::Duration::from_millis(1000));

loop {
ticker.tick().await;

let now = Utc::now().timestamp_millis();
let aggregated_data =
storage.get_aggregated_range(&tag_key, &tag_value, &metric_name, now - 600_000, now);

if !aggregated_data.is_empty() {
let response = aggregated_data
.iter()
.map(|(id, points)| {
(
id,
points
.iter()
.map(|p| (p.timestamp, p.value))
.collect::<Vec<_>>(),
)
})
.collect::<BTreeMap<_, _>>();

let msg = serde_json::json!({
"type": "aggregated_update",
"tag": format!("{}:{}", tag_key, tag_value),
"metric": metric_name,
"data": response
});

if ws_tx
.send(warp::ws::Message::text(msg.to_string()))
.await
.is_err()
{
break;
}
}
}
Expand Down
28 changes: 11 additions & 17 deletions src/bin/api/core/http/handlers/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,33 +136,24 @@ where
node_metrics_map
.iter()
.filter_map(|entry| {
let series_key = entry.key();
let series_hash = entry.key();
let points = entry.value();

if points.is_empty() {
return None;
}

if !(series_key.starts_with("sys.")
|| series_key.starts_with("net."))
{
let (name, tags) = metrics.metadata.get(series_hash).map(|m| {
let val = m.value();
(val.0.clone(), val.1.clone())
})?;

if !(name.starts_with("sys.") || name.starts_with("net.")) {
return None;
}

let tags = metrics
.metadata
.get(series_key)
.map(|m| m.value().clone())
.unwrap_or_default();

let name = series_key
.split(':')
.next()
.unwrap_or(series_key)
.to_string();

Some(NodeMetricInfo {
key: series_key.clone(),
key: series_hash.to_string(),
name,
tags,
})
Expand All @@ -189,9 +180,12 @@ where
None => {
let response = ResponseMessage::<Option<Vec<NodeResponse>>> {
status: StatusCode::NOT_FOUND.as_u16(),

message: "List of nodes".to_string(),

response: vec![].into(),
};

Ok(warp::reply::with_status(
warp::reply::json(&response),
StatusCode::NOT_FOUND,
Expand Down
Loading
Loading