Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod node;
pub mod project_control_plane;
mod repo;
mod state;
pub mod tunnel_activity;
pub mod tunnels;
pub mod update;

Expand All @@ -20,6 +21,7 @@ pub use node::*;
pub use project_control_plane::ProjectControlPlaneClient;
pub use repo::Repo;
pub use state::*;
pub use tunnel_activity::{TunnelActivityEntry, TunnelActivityTracker};
pub use tunnels::{TunnelDeleteOutcome, TunnelService, TunnelSummary};
pub use update::{UpdateChannel, UpdateChecker, UpdateInfo, UpdateSettings};

Expand Down
224 changes: 224 additions & 0 deletions lib/src/tunnel_activity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

use iroh_proxy_utils::upstream::UpstreamMetrics;

use crate::TunnelSummary;

const HOUR: Duration = Duration::from_secs(3600);

/// Snapshot of a tunnel's recent traffic for tray menu display.
#[derive(Debug, Clone)]
pub struct TunnelActivityEntry {
pub tunnel_id: String,
pub label: String,
pub online: bool,
pub bytes_last_hour: u64,
pub rate_per_s: u64,
pub last_activity_at: Instant,
}

#[derive(Debug)]
struct TunnelSampleState {
label: String,
online: bool,
last_send: u64,
last_recv: u64,
last_sample_at: Instant,
last_activity_at: Option<Instant>,
hourly_deltas: VecDeque<(Instant, u64)>,
rate_per_s: u64,
}

#[derive(Debug, Default)]
pub struct TunnelActivityTracker {
tunnels: HashMap<String, TunnelSampleState>,
}

impl TunnelActivityTracker {
pub fn new() -> Self {
Self::default()
}

/// Sample cumulative proxy metrics against the current tunnel list.
///
/// Metrics are keyed by local endpoint authority; two tunnels targeting the same
/// host:port share counters.
pub fn tick(&mut self, tunnels: &[TunnelSummary], metrics: &UpstreamMetrics) {
let now = Instant::now();
let active_ids: std::collections::HashSet<&str> =
tunnels.iter().map(|t| t.id.as_str()).collect();
self.tunnels
.retain(|id, _| active_ids.contains(id.as_str()));

for tunnel in tunnels {
let Some(_authority) = tunnel.origin_authority() else {
continue;
};
let (send, recv) = metrics_bytes_for_tunnel(metrics, tunnel);

let entry =
self.tunnels
.entry(tunnel.id.clone())
.or_insert_with(|| TunnelSampleState {
label: tunnel.label.clone(),
online: tunnel_is_online(tunnel),
last_send: send,
last_recv: recv,
last_sample_at: now,
last_activity_at: None,
hourly_deltas: VecDeque::new(),
rate_per_s: 0,
});

entry.label = tunnel.label.clone();
entry.online = tunnel_is_online(tunnel);

if send < entry.last_send || recv < entry.last_recv {
entry.last_send = send;
entry.last_recv = recv;
entry.last_sample_at = now;
entry.rate_per_s = 0;
continue;
}

let delta_send = send.saturating_sub(entry.last_send);
let delta_recv = recv.saturating_sub(entry.last_recv);
let delta_total = delta_send + delta_recv;

if delta_total > 0 {
entry.hourly_deltas.push_back((now, delta_total));
entry.last_activity_at = Some(now);
}

while let Some(front) = entry.hourly_deltas.front() {
if now.duration_since(front.0) > HOUR {
entry.hourly_deltas.pop_front();
} else {
break;
}
}

let dt = now
.duration_since(entry.last_sample_at)
.as_secs_f64()
.max(0.001);
entry.rate_per_s = if delta_total > 0 {
(delta_total as f64 / dt) as u64
} else {
0
};

entry.last_send = send;
entry.last_recv = recv;
entry.last_sample_at = now;
}
}

/// Active tunnels for the tray menu, most recently used first (up to `limit`).
/// Includes tunnels with no recent traffic.
pub fn recent_active(&self, limit: usize) -> Vec<TunnelActivityEntry> {
let mut entries: Vec<TunnelActivityEntry> = self
.tunnels
.iter()
.map(|(id, state)| {
let bytes_last_hour: u64 = state.hourly_deltas.iter().map(|(_, b)| *b).sum();
TunnelActivityEntry {
tunnel_id: id.clone(),
label: state.label.clone(),
online: state.online,
bytes_last_hour,
rate_per_s: state.rate_per_s,
last_activity_at: state.last_activity_at.unwrap_or(state.last_sample_at),
}
})
.collect();
entries.sort_by(|a, b| {
let a_active = a.bytes_last_hour > 0 || a.rate_per_s > 0;
let b_active = b.bytes_last_hour > 0 || b.rate_per_s > 0;
match (a_active, b_active) {
(true, false) => std::cmp::Ordering::Less,
(false, true) => std::cmp::Ordering::Greater,
_ => b.last_activity_at.cmp(&a.last_activity_at),
}
});
entries.truncate(limit);
entries
}
}

fn metrics_bytes_for_tunnel(metrics: &UpstreamMetrics, tunnel: &TunnelSummary) -> (u64, u64) {
let Some(authority) = tunnel.origin_authority() else {
return (0, 0);
};
for auth in authority_lookup_variants(&authority) {
if let Some(m) = metrics.get(&auth) {
return (m.bytes_from_origin(), m.bytes_to_origin());
}
}
(0, 0)
}

fn tunnel_is_online(tunnel: &TunnelSummary) -> bool {
tunnel.enabled && tunnel.accepted && tunnel.programmed
}

fn authority_lookup_variants(
authority: &iroh_proxy_utils::Authority,
) -> Vec<iroh_proxy_utils::Authority> {
use iroh_proxy_utils::Authority;
let mut variants = vec![authority.clone()];
if authority.host == "localhost" {
variants.push(Authority {
host: "127.0.0.1".to_string(),
port: authority.port,
});
} else if authority.host == "127.0.0.1" {
variants.push(Authority {
host: "localhost".to_string(),
port: authority.port,
});
}
variants
}

#[cfg(test)]
mod tests {
use super::*;
use iroh_proxy_utils::Authority;
use std::str::FromStr;

fn tunnel(id: &str, label: &str, endpoint: &str) -> TunnelSummary {
TunnelSummary {
id: id.to_string(),
label: label.to_string(),
endpoint: endpoint.to_string(),
hostnames: vec![],
enabled: true,
accepted: true,
programmed: true,
}
}

#[test]
fn recent_active_includes_idle_tunnels() {
let mut tracker = TunnelActivityTracker::new();
let tunnels = vec![tunnel("t1", "app", "http://127.0.0.1:8080")];
let metrics = UpstreamMetrics::default();
tracker.tick(&tunnels, &metrics);
let recent = tracker.recent_active(5);
assert_eq!(recent.len(), 1);
assert_eq!(recent[0].tunnel_id, "t1");
assert_eq!(recent[0].label, "app");
assert!(recent[0].online);
assert_eq!(recent[0].rate_per_s, 0);
}

#[test]
fn origin_authority_parses_endpoint() {
let t = tunnel("t1", "app", "http://127.0.0.1:8080");
assert!(t.origin_authority().is_some());
let auth = t.origin_authority().unwrap();
assert_eq!(Authority::from_str("127.0.0.1:8080").ok(), Some(auth));
}
}
12 changes: 7 additions & 5 deletions lib/src/tunnels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ impl TunnelService {
debug!(
project_id = %project_id_owned,
proxy = %proxy_name,
"skipping TrafficProtectionPolicy creation (env disabled)"
"skipping TrafficProtectionPolicy creation (disabled via env)"
);
}

Expand Down Expand Up @@ -1213,12 +1213,14 @@ fn publish_tickets_enabled() -> bool {
}

fn create_traffic_protection_policies_enabled() -> bool {
std::env::var("DATUM_CONNECT_CREATE_TRAFFIC_PROTECTION_POLICIES")
let raw = std::env::var("DATUM_CONNECT_CREATE_TRAFFIC_PROTECTION_POLICIES")
.ok()
.or_else(|| {
option_env!("BUILD_DATUM_CONNECT_CREATE_TRAFFIC_PROTECTION_POLICIES")
.map(str::to_string)
})
.map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(false)
});
match raw {
Some(value) if matches!(value.as_str(), "0" | "false" | "FALSE" | "no" | "NO") => false,
_ => true,
}
}
71 changes: 71 additions & 0 deletions scripts/traffic-test-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""Minimal HTTP server for exercising Datum tunnel traffic metrics."""

from http.server import BaseHTTPRequestHandler, HTTPServer

HOST = "127.0.0.1"
PORT = 3001
PAYLOAD_KB = 64


class Handler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
if self.path in ("/", "/index.html"):
body = f"""<!DOCTYPE html>
<html>
<head><title>Datum traffic test</title></head>
<body style="font-family: system-ui, sans-serif; padding: 2rem;">
<h1>Traffic test server</h1>
<p>Listening on <code>http://{HOST}:{PORT}</code></p>
<ul>
<li><a href="/ping">/ping</a> — small response</li>
<li><a href="/data">/data</a> — {PAYLOAD_KB} KB payload</li>
</ul>
<button id="burst">Generate burst (10× /data)</button>
<pre id="log"></pre>
<script>
const log = document.getElementById("log");
document.getElementById("burst").onclick = async () => {{
log.textContent = "Fetching...";
await Promise.all(Array.from({{length: 10}}, () => fetch("/data")));
log.textContent = "Done — sent 10 requests";
}};
</script>
</body>
</html>
""".encode()
self._respond(200, "text/html; charset=utf-8", body)
return

if self.path == "/ping":
self._respond(200, "text/plain; charset=utf-8", b"ok\n")
return

if self.path == "/data":
body = b"x" * (PAYLOAD_KB * 1024)
self._respond(200, "application/octet-stream", body)
return

self._respond(404, "text/plain; charset=utf-8", b"not found\n")

def _respond(self, status: int, content_type: str, body: bytes) -> None:
self.send_response(status)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)

def log_message(self, format: str, *args) -> None:
print(f"[{self.log_date_time_string()}] {format % args}")


def main() -> None:
server = HTTPServer((HOST, PORT), Handler)
print(f"Traffic test server running at http://{HOST}:{PORT}")
print(" /ping — small response")
print(f" /data — {PAYLOAD_KB} KB download")
server.serve_forever()


if __name__ == "__main__":
main()
12 changes: 12 additions & 0 deletions ui/assets/tailwind.css
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,11 @@
outline-style: none;
}
}
.disabled\:cursor-not-allowed {
&:disabled {
cursor: not-allowed;
}
}
.disabled\:bg-content-background {
&:disabled {
background-color: var(--content-background);
Expand Down Expand Up @@ -1635,7 +1640,14 @@
font-family: "Alliance No1", ui-sans-serif, system-ui, sans-serif, "Apple Color Emoji",
"Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji";
}
button:not(:disabled) {
cursor: pointer;
}
button:disabled {
cursor: not-allowed;
}
a {
cursor: pointer;
&:focus {
outline-style: var(--tw-outline-style);
outline-width: 1px;
Expand Down
Binary file added ui/assets/tray/status-offline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added ui/assets/tray/status-online.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added ui/assets/tray/tray-icon-template.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 4 additions & 4 deletions ui/src/components/button.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ pub struct ButtonProps {
fn class_for(kind: ButtonKind) -> &'static str {
// [transform:translateZ(0)] keeps the button on its own compositing layer so opacity hover doesn't cause subpixel shift
match kind {
ButtonKind::Primary => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-button-primary-background/90 text-button-primary-foreground hover:opacity-80 transition-opacity duration-300 border border-1 border-button-primary-background [transform:translateZ(0)] text-xs focus:outline-2 focus:outline-button-primary-background/50",
ButtonKind::Secondary => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-button-secondary-background/90 text-button-secondary-foreground border border-1 border-button-secondary-background hover:opacity-80 transition-opacity duration-300 text-xs [transform:translateZ(0)] focus:outline-2 focus:outline-button-secondary-background/50",
ButtonKind::Ghost => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-transparent text-button-outline-foreground border border-1 border-button-outline-background hover:opacity-80 transition-opacity duration-300 [transform:translateZ(0)] text-xs focus:outline-2 focus:outline-button-outline-background/50",
ButtonKind::Outline => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-transparent text-foreground border border-1 border-foreground hover:opacity-80 transition-opacity duration-300 [transform:translateZ(0)] text-xs focus:outline-2 focus:outline-foreground/50",
ButtonKind::Primary => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-button-primary-background/90 text-button-primary-foreground hover:opacity-80 transition-opacity duration-300 border border-1 border-button-primary-background [transform:translateZ(0)] text-xs focus:outline-2 focus:outline-button-primary-background/50 cursor-pointer disabled:cursor-not-allowed",
ButtonKind::Secondary => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-button-secondary-background/90 text-button-secondary-foreground border border-1 border-button-secondary-background hover:opacity-80 transition-opacity duration-300 text-xs [transform:translateZ(0)] focus:outline-2 focus:outline-button-secondary-background/50 cursor-pointer disabled:cursor-not-allowed",
ButtonKind::Ghost => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-transparent text-button-outline-foreground border border-1 border-button-outline-background hover:opacity-80 transition-opacity duration-300 [transform:translateZ(0)] text-xs focus:outline-2 focus:outline-button-outline-background/50 cursor-pointer disabled:cursor-not-allowed",
ButtonKind::Outline => "inline-flex items-center justify-center gap-2 rounded-md px-3.5 py-2.5 bg-transparent text-foreground border border-1 border-foreground hover:opacity-80 transition-opacity duration-300 [transform:translateZ(0)] text-xs focus:outline-2 focus:outline-foreground/50 cursor-pointer disabled:cursor-not-allowed",
}
}

Expand Down
Loading
Loading