Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 282 additions & 0 deletions deepnote_core/resources/scripts/resource_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""Lightweight resource metrics server."""

import json
import logging
import os
import threading
import time
from functools import partial
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, Optional

try:
import psutil

HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False


class ResourceMonitor:
    """Monitors system resources via cgroups or psutil.

    The backend is chosen once at construction time by probing ``root_path``:
    cgroup v2 if ``cgroup.controllers`` exists, cgroup v1 if
    ``memory/memory.limit_in_bytes`` exists, otherwise psutil when it is
    importable, otherwise ``"none"`` (all readings degrade to 0 / None).
    """

    def __init__(self, root_path: str = "/sys/fs/cgroup") -> None:
        """Create a monitor rooted at *root_path* and detect the backend."""
        self.root = Path(root_path)
        self.backend = self._detect_backend()
        # Previous CPU sample, used to turn a cumulative counter into a rate.
        self._last_cpu_sec: Optional[float] = None
        self._last_time: Optional[float] = None
        # Guards the two ``_last_*`` fields against concurrent callers.
        self._lock = threading.Lock()

    def _detect_backend(self) -> str:
        """Return ``"cgroupv2"``, ``"cgroupv1"``, ``"psutil"`` or ``"none"``."""
        if (self.root / "cgroup.controllers").exists():
            return "cgroupv2"
        if (self.root / "memory/memory.limit_in_bytes").exists():
            return "cgroupv1"
        if HAS_PSUTIL:
            return "psutil"
        return "none"

    def _read_file(self, path: Path) -> Optional[str]:
        """Return the stripped text of *path*, or None if it cannot be read."""
        try:
            return path.read_text().strip()
        except OSError:
            # FileNotFoundError and PermissionError are OSError subclasses.
            return None

    def get_memory(self) -> tuple[int, Optional[int]]:
        """Returns (used_bytes, limit_bytes). Limit is None if unlimited."""
        if self.backend == "cgroupv2":
            current = self._parse_int(self.root / "memory.current", 0)
            inactive_file = self._parse_memory_stat("inactive_file")
            # Clamp: a failed/racy read of memory.current must not produce a
            # negative "used" value.
            used = max(0, current - inactive_file)
            limit_str = self._read_file(self.root / "memory.max")
            limit = self._parse_limit(limit_str, "max")
            return used, limit

        if self.backend == "cgroupv1":
            used = self._parse_int(self.root / "memory/memory.usage_in_bytes", 0)
            limit_str = self._read_file(self.root / "memory/memory.limit_in_bytes")
            # Values at or above the threshold are reported as "no limit".
            limit = self._parse_limit(limit_str, threshold=1_000_000_000_000_000)
            return used, limit

        if self.backend == "psutil":
            proc = psutil.Process()
            return proc.memory_info().rss, psutil.virtual_memory().total

        return 0, None

    def get_cpu(self) -> tuple[Optional[float], Optional[float]]:
        """Returns (usage_percent, limit_cores).

        ``usage_percent`` is None on the first call, because a rate needs two
        samples; that call only records the baseline.
        """
        usage_sec = self._get_cpu_seconds()
        percent = self._calc_percent(usage_sec)
        limit = self._get_cpu_limit()
        return percent, limit

    def _parse_int(self, path: Path, default: int = 0) -> int:
        """Read *path* as an integer; return *default* if missing or invalid."""
        content = self._read_file(path)
        if not content:
            return default
        try:
            return int(content)
        except ValueError:
            return default

    def _parse_limit(
        self,
        value: Optional[str],
        unlimited_marker: Optional[str] = None,
        threshold: Optional[int] = None,
    ) -> Optional[int]:
        """Convert a raw limit string to an int, or None when unlimited.

        None is returned when *value* is empty, equals *unlimited_marker*,
        is not an integer, or is at or above *threshold* (when one is given).
        """
        if not value:
            return None
        if unlimited_marker and value == unlimited_marker:
            return None
        try:
            parsed = int(value)
        except ValueError:
            return None
        if threshold and parsed >= threshold:
            return None
        return parsed

    def _read_stat_value(self, path: Path, key: str) -> Optional[int]:
        """Return the integer stored under exactly *key* in a ``key value`` file.

        The key is compared for equality with the first whitespace-separated
        field, so a longer key that merely shares the prefix is never picked
        up by mistake. Returns None when the file, key or number is missing.
        """
        content = self._read_file(path)
        if not content:
            return None
        for line in content.splitlines():
            parts = line.split()
            if len(parts) < 2 or parts[0] != key:
                continue
            try:
                return int(parts[1])
            except ValueError:
                continue
        return None

    def _parse_memory_stat(self, key: str) -> int:
        """Parse a value from memory.stat file; 0 if it is unavailable."""
        value = self._read_stat_value(self.root / "memory.stat", key)
        return 0 if value is None else value

    def _get_cpu_seconds(self) -> float:
        """Return cumulative CPU time in seconds (0.0 if it cannot be read)."""
        if self.backend == "cgroupv2":
            usage_usec = self._read_stat_value(self.root / "cpu.stat", "usage_usec")
            if usage_usec is None:
                return 0.0
            return usage_usec / 1_000_000.0

        if self.backend == "cgroupv1":
            # _parse_int swallows malformed content instead of raising.
            usage = self._parse_int(self.root / "cpuacct/cpuacct.usage", 0)
            return usage / 1_000_000_000.0

        if self.backend == "psutil":
            times = psutil.Process().cpu_times()
            return times.user + times.system

        return 0.0

    def _get_cpu_limit(self) -> Optional[float]:
        """Return the CPU limit in cores, or None when there is no limit."""
        if self.backend == "cgroupv2":
            content = self._read_file(self.root / "cpu.max")
            if not content:
                return None
            parts = content.split()
            # cpu.max is "<quota> <period>"; a quota of "max" means unlimited.
            if len(parts) < 2 or parts[0] == "max":
                return None
            try:
                return int(parts[0]) / int(parts[1])
            except (ValueError, ZeroDivisionError):
                return None

        if self.backend == "cgroupv1":
            quota = self._parse_int(self.root / "cpu/cpu.cfs_quota_us", -1)
            period = self._parse_int(self.root / "cpu/cpu.cfs_period_us", 0)
            if quota <= 0 or period <= 0:
                return None
            return quota / period

        if self.backend == "psutil":
            cpu_count = psutil.cpu_count(logical=True)
            return float(cpu_count) if cpu_count is not None else None

        return None

    def _calc_percent(self, current_sec: float) -> Optional[float]:
        """Return CPU usage since the previous call, as percent of one core.

        Returns None on the first call (no baseline yet). The stored baseline
        is always replaced by the current sample. A counter that moved
        backwards is reported as 0.0 rather than as a negative percentage.
        """
        now = time.monotonic()
        with self._lock:
            previous_sec = self._last_cpu_sec
            previous_time = self._last_time
            self._last_cpu_sec = current_sec
            self._last_time = now

        if previous_sec is None or previous_time is None:
            return None

        time_delta = now - previous_time
        if time_delta <= 0:
            return 0.0
        cpu_delta = current_sec - previous_sec
        return max(0.0, (cpu_delta / time_delta) * 100.0)


class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler for resource metrics.

    Routes:
        ``/`` and ``/resource-usage``: JSON document with memory and CPU data.
        ``/health``: plain-text ``ok``.
        anything else: 404.

    Query strings and fragments are ignored when routing.
    """

    def __init__(self, monitor: "ResourceMonitor", *args: Any, **kwargs: Any) -> None:
        """Store *monitor*, then let the base class handle the request.

        ``monitor`` must be set before ``super().__init__`` because the base
        constructor processes the request (and therefore calls ``do_GET``).
        """
        self.monitor = monitor
        super().__init__(*args, **kwargs)

    def do_GET(self) -> None:
        """Dispatch a GET request based on the path component only."""
        # Drop "?query" and "#fragment" so e.g. "/health?probe=1" still matches.
        route = self.path.split("?", 1)[0].split("#", 1)[0]
        if route in ("/", "/resource-usage"):
            self._send_metrics()
        elif route == "/health":
            self._send_text(200, "ok")
        else:
            self.send_error(404)

    def _send_metrics(self) -> None:
        """Collect a sample from the monitor and send it as JSON."""
        mem_used, mem_limit = self.monitor.get_memory()
        cpu_percent, cpu_limit = self.monitor.get_cpu()

        # An integer MEM_LIMIT environment variable overrides the detected
        # limit; a non-integer value is ignored.
        env_limit = os.environ.get("MEM_LIMIT")
        if env_limit:
            try:
                mem_limit = int(env_limit)
            except ValueError:
                pass

        mem_util = None
        if mem_limit and mem_limit > 0:
            mem_util = round((mem_used / mem_limit) * 100, 2)

        # cpu_percent is relative to one core, so dividing by the core limit
        # yields the percentage of the allowed CPU that is in use.
        cpu_sat = None
        if cpu_percent is not None and cpu_limit:
            cpu_sat = round(cpu_percent / cpu_limit, 2)

        data = {
            "meta": {"backend": self.monitor.backend, "timestamp": time.time()},
            "memory": {
                "used_bytes": mem_used,
                "limit_bytes": mem_limit,
                "usage_percent": mem_util,
            },
            "cpu": {
                "usage_percent": (
                    round(cpu_percent, 2) if cpu_percent is not None else None
                ),
                "limit_cores": cpu_limit,
                "saturation_percent": cpu_sat,
            },
        }
        self._send_json(200, data)

    def _send_json(self, code: int, data: dict[str, Any]) -> None:
        """Serialize *data* as indented JSON and send it with status *code*."""
        body = json.dumps(data, indent=2).encode()
        self._send_response(code, body, "application/json")

    def _send_text(self, code: int, text: str) -> None:
        """Send *text* as a ``text/plain`` response with status *code*."""
        self._send_response(code, text.encode(), "text/plain")

    def _send_response(self, code: int, body: bytes, content_type: str) -> None:
        """Write status line, headers and *body* to the client."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.send_header("X-Content-Type-Options", "nosniff")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, msg_format: str, *args: Any) -> None:
        """Route the base class's access/error log lines through ``logging``."""
        logging.info("%s - %s", self.address_string(), msg_format % args)


def main() -> None:
    """Start the metrics HTTP server and serve until interrupted.

    The listening port is read from ``RESOURCE_USAGE_METRICS_PORT``
    (default 9104). The server binds to all interfaces. The listening socket
    is closed on every exit path, not only after Ctrl-C.

    Raises:
        ValueError: if ``RESOURCE_USAGE_METRICS_PORT`` is not an integer.
        OSError: if the port cannot be bound.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    port = int(os.environ.get("RESOURCE_USAGE_METRICS_PORT", 9104))
    monitor = ResourceMonitor()
    # The first sample only records a baseline, so take it now; the first
    # real request can then report a CPU percentage.
    monitor.get_cpu()

    # BaseHTTPRequestHandler subclasses are instantiated per request with
    # (request, client_address, server); partial() prepends the monitor.
    handler = partial(MetricsHandler, monitor)
    server = ThreadingHTTPServer(("0.0.0.0", port), handler)

    logging.info(f"Starting server on port {port} (backend: {monitor.backend})")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        logging.info("Shutting down...")
    finally:
        # Release the listening socket however serve_forever() ended.
        server.server_close()


if __name__ == "__main__":
    main()
6 changes: 6 additions & 0 deletions installer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ def bootstrap():
)
all_actions.append(ExtraServerSpec(command=["python", prometheus_script]))

# Add resource usage server
resource_usage_script = os.path.join(
config_directory_path, "scripts", "resource_usage.py"
)
all_actions.append(ExtraServerSpec(command=["python", resource_usage_script]))

# Execute all actions via the unified registry
from .module.executor import run_actions_in_installer_env

Expand Down