From fb25768b8ddde401aa0a90c70a28318d494952a1 Mon Sep 17 00:00:00 2001 From: Michal Franczel Date: Thu, 15 Jan 2026 16:20:12 +0100 Subject: [PATCH 1/7] feat(resource-usage): Add lightweight resource metrics server --- .../resources/scripts/resource_usage.py | 255 ++++++++++++++++++ installer/__main__.py | 6 + 2 files changed, 261 insertions(+) create mode 100644 deepnote_core/resources/scripts/resource_usage.py diff --git a/deepnote_core/resources/scripts/resource_usage.py b/deepnote_core/resources/scripts/resource_usage.py new file mode 100644 index 0000000..3df5d88 --- /dev/null +++ b/deepnote_core/resources/scripts/resource_usage.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +"""Lightweight resource metrics server.""" + +import json +import os +import threading +import time +from functools import partial +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any, Optional + +try: + import psutil + + HAS_PSUTIL = True +except ImportError: + HAS_PSUTIL = False + + +class ResourceMonitor: + """Monitors system resources via cgroups or psutil.""" + + def __init__(self, root_path: str = "/sys/fs/cgroup") -> None: + self.root = Path(root_path) + self.backend = self._detect_backend() + self._last_cpu_sec: Optional[float] = None + self._last_time: Optional[float] = None + self._lock = threading.Lock() + print(f"ResourceMonitor initialized with backend: {self.backend}") + + def _detect_backend(self) -> str: + if (self.root / "cgroup.controllers").exists(): + return "cgroupv2" + if (self.root / "memory/memory.limit_in_bytes").exists(): + return "cgroupv1" + if HAS_PSUTIL: + return "psutil" + return "none" + + def _read_file(self, path: Path) -> Optional[str]: + try: + return path.read_text().strip() + except (FileNotFoundError, PermissionError, OSError): + return None + + def get_memory(self) -> tuple[int, Optional[int]]: + """Returns (used_bytes, limit_bytes). Limit is None if unlimited.""" + if self.backend == "cgroupv2": + used = self._parse_int(self.root / "memory.current", 0) + limit_str = self._read_file(self.root / "memory.max") + limit = self._parse_limit(limit_str, "max") + return used, limit + + if self.backend == "cgroupv1": + used = self._parse_int(self.root / "memory/memory.usage_in_bytes", 0) + limit_str = self._read_file(self.root / "memory/memory.limit_in_bytes") + limit = self._parse_limit(limit_str, threshold=1_000_000_000_000_000) + return used, limit + + if self.backend == "psutil": + proc = psutil.Process() + return proc.memory_info().rss, psutil.virtual_memory().total + + return 0, None + + def get_cpu(self) -> tuple[Optional[float], Optional[float]]: + """Returns (usage_percent, limit_cores).""" + usage_sec = self._get_cpu_seconds() + percent = self._calc_percent(usage_sec) + limit = self._get_cpu_limit() + return percent, limit + + def _parse_int(self, path: Path, default: int = 0) -> int: + content = self._read_file(path) + if content: + try: + return int(content) + except ValueError: + pass + return default + + def _parse_limit( + self, + value: Optional[str], + unlimited_marker: Optional[str] = None, + threshold: Optional[int] = None, + ) -> Optional[int]: + if not value: + return None + if unlimited_marker and value == unlimited_marker: + return None + try: + parsed = int(value) + if threshold and parsed >= threshold: + return None + return parsed + except ValueError: + return None + + def _get_cpu_seconds(self) -> float: + if self.backend == "cgroupv2": + content = self._read_file(self.root / "cpu.stat") + if content: + for line in content.splitlines(): + if line.startswith("usage_usec"): + parts = line.split() + if len(parts) >= 2: + return int(parts[1]) / 1_000_000.0 + return 0.0 + + if self.backend == "cgroupv1": + content = self._read_file(self.root / "cpuacct/cpuacct.usage") + if content: + return int(content) / 1_000_000_000.0 + return 0.0 + + if self.backend == "psutil": + times = psutil.Process().cpu_times() + return times.user + times.system + + return 0.0 + + def _get_cpu_limit(self) -> Optional[float]: + if self.backend == "cgroupv2": + content = self._read_file(self.root / "cpu.max") + if not content: + return None + parts = content.split() + if len(parts) < 2 or parts[0] == "max": + return None + try: + return int(parts[0]) / int(parts[1]) + except (ValueError, ZeroDivisionError): + return None + + if self.backend == "cgroupv1": + quota = self._parse_int(self.root / "cpu/cpu.cfs_quota_us", -1) + period = self._parse_int(self.root / "cpu/cpu.cfs_period_us", 0) + if quota > 0 and period > 0: + return quota / period + return None + + if self.backend == "psutil": + return float(psutil.cpu_count(logical=True)) + + return None + + def _calc_percent(self, current_sec: float) -> Optional[float]: + now = time.monotonic() + with self._lock: + if self._last_cpu_sec is None or self._last_time is None: + self._last_cpu_sec = current_sec + self._last_time = now + return None + + time_delta = now - self._last_time + cpu_delta = current_sec - self._last_cpu_sec + self._last_cpu_sec = current_sec + self._last_time = now + + if time_delta <= 0: + return 0.0 + return (cpu_delta / time_delta) * 100.0 + + +class MetricsHandler(BaseHTTPRequestHandler): + """HTTP handler for resource metrics.""" + + def __init__( + self, monitor: ResourceMonitor, *args: Any, **kwargs: Any + ) -> None: + self.monitor = monitor + super().__init__(*args, **kwargs) + + def do_GET(self) -> None: + if self.path in ("/", "/resource-usage"): + self._send_metrics() + elif self.path == "/health": + self._send_text(200, "ok") + else: + self.send_error(404) + + def _send_metrics(self) -> None: + mem_used, mem_limit = self.monitor.get_memory() + cpu_percent, cpu_limit = self.monitor.get_cpu() + + env_limit = os.environ.get("MEM_LIMIT") + if env_limit: + try: + mem_limit = int(env_limit) + except ValueError: + pass + + mem_util = None + if mem_limit and mem_limit > 0: + mem_util = round((mem_used / mem_limit) * 100, 2) + + cpu_sat = None + if cpu_percent is not None and cpu_limit: + cpu_sat = round(cpu_percent / cpu_limit, 2) + + data = { + "meta": {"backend": self.monitor.backend, "timestamp": time.time()}, + "memory": { + "used_bytes": mem_used, + "limit_bytes": mem_limit, + "usage_percent": mem_util, + }, + "cpu": { + "usage_percent": round(cpu_percent, 2) if cpu_percent else None, + "limit_cores": cpu_limit, + "saturation_percent": cpu_sat, + }, + } + self._send_json(200, data) + + def _send_json(self, code: int, data: dict[str, Any]) -> None: + body = json.dumps(data, indent=2).encode() + self._send_response(code, body, "application/json") + + def _send_text(self, code: int, text: str) -> None: + self._send_response(code, text.encode(), "text/plain") + + def _send_response(self, code: int, body: bytes, content_type: str) -> None: + self.send_response(code) + self.send_header("Content-Type", content_type) + self.send_header("Content-Length", str(len(body))) + self.send_header("X-Content-Type-Options", "nosniff") + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args: Any) -> None: + print(f"{self.address_string()} - {format % args}") + + +def main() -> None: + port = int(os.environ.get("RESOURCE_USAGE_METRICS_PORT", 9104)) + monitor = ResourceMonitor() + monitor.get_cpu() # Initialize CPU baseline + + handler = partial(MetricsHandler, monitor) + server = ThreadingHTTPServer(("0.0.0.0", port), handler) + + print(f"Starting server on port {port} (backend: {monitor.backend})") + + try: + server.serve_forever() + except KeyboardInterrupt: + print("Shutting down...") + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/installer/__main__.py b/installer/__main__.py index fae2947..08c4b0d 100644 --- a/installer/__main__.py +++ b/installer/__main__.py @@ -382,6 +382,12 @@ def bootstrap(): ) all_actions.append(ExtraServerSpec(command=["python", prometheus_script])) + # Add resource usage server + resource_usage_script = os.path.join( + config_directory_path, "scripts", "resource_usage.py" + ) + all_actions.append(ExtraServerSpec(command=["python", resource_usage_script])) + # Execute all actions via the unified registry from .module.executor import run_actions_in_installer_env From 67859fa55d6f19f40c01b17969b4b34fc99b36e0 Mon Sep 17 00:00:00 2001 From: Michal Franczel Date: Thu, 15 Jan 2026 16:25:04 +0100 Subject: [PATCH 2/7] fix(resource-usage): Handle None values for CPU count and usage percent --- deepnote_core/resources/scripts/resource_usage.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/deepnote_core/resources/scripts/resource_usage.py b/deepnote_core/resources/scripts/resource_usage.py index 3df5d88..8c306cd 100644 --- a/deepnote_core/resources/scripts/resource_usage.py +++ b/deepnote_core/resources/scripts/resource_usage.py @@ -142,7 +142,8 @@ def _get_cpu_limit(self) -> Optional[float]: return None if self.backend == "psutil": - return float(psutil.cpu_count(logical=True)) + cpu_count = psutil.cpu_count(logical=True) + return float(cpu_count) if cpu_count is not None else None return None @@ -208,7 +209,7 @@ def _send_metrics(self) -> None: "usage_percent": mem_util, }, "cpu": { - "usage_percent": round(cpu_percent, 2) if cpu_percent else None, + "usage_percent": round(cpu_percent, 2) if cpu_percent is not None else None, "limit_cores": cpu_limit, "saturation_percent": cpu_sat, }, From 95b873a501fced1c9acfb35573181e8ef0d99b8d Mon Sep 17 00:00:00 2001 From: Michal Franczel Date: Thu, 15 Jan 2026 16:33:57 +0100 Subject: [PATCH 3/7] fix: formatting issues in resource_usage --- deepnote_core/resources/scripts/resource_usage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deepnote_core/resources/scripts/resource_usage.py b/deepnote_core/resources/scripts/resource_usage.py index 8c306cd..1273283 100644 --- a/deepnote_core/resources/scripts/resource_usage.py +++ b/deepnote_core/resources/scripts/resource_usage.py @@ -168,9 +168,7 @@ def _calc_percent(self, current_sec: float) -> Optional[float]: class MetricsHandler(BaseHTTPRequestHandler): """HTTP handler for resource metrics.""" - def __init__( - self, monitor: ResourceMonitor, *args: Any, **kwargs: Any - ) -> None: + def __init__(self, monitor: ResourceMonitor, *args: Any, **kwargs: Any) -> None: self.monitor = monitor super().__init__(*args, **kwargs) @@ -209,7 +207,9 @@ def _send_metrics(self) -> None: "usage_percent": mem_util, }, "cpu": { - "usage_percent": round(cpu_percent, 2) if cpu_percent is not None else None, + "usage_percent": ( + round(cpu_percent, 2) if cpu_percent is not None else None + ), "limit_cores": cpu_limit, "saturation_percent": cpu_sat, }, From 5ba463819964f97a1ff7d51275426c56a417312a Mon Sep 17 00:00:00 2001 From: Michal Franczel Date: Thu, 15 Jan 2026 17:00:38 +0100 Subject: [PATCH 4/7] fix: dont shadow builtin format --- deepnote_core/resources/scripts/resource_usage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deepnote_core/resources/scripts/resource_usage.py b/deepnote_core/resources/scripts/resource_usage.py index 1273283..ebbbf58 100644 --- a/deepnote_core/resources/scripts/resource_usage.py +++ b/deepnote_core/resources/scripts/resource_usage.py @@ -231,8 +231,8 @@ def _send_response(self, code: int, body: bytes, content_type: str) -> None: self.end_headers() self.wfile.write(body) - def log_message(self, format: str, *args: Any) -> None: - print(f"{self.address_string()} - {format % args}") + def log_message(self, msg_format: str, *args: Any) -> None: + print(f"{self.address_string()} - {msg_format % args}") def main() -> None: From d0397d4c44436706867eea73799ede226957b325 Mon Sep 17 00:00:00 2001 From: Michal Franczel Date: Thu, 15 Jan 2026 17:26:33 +0100 Subject: [PATCH 5/7] fix: subtract inactive fie from current memory --- .../resources/scripts/resource_usage.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/deepnote_core/resources/scripts/resource_usage.py b/deepnote_core/resources/scripts/resource_usage.py index ebbbf58..94bed41 100644 --- a/deepnote_core/resources/scripts/resource_usage.py +++ b/deepnote_core/resources/scripts/resource_usage.py @@ -47,7 +47,9 @@ def _read_file(self, path: Path) -> Optional[str]: def get_memory(self) -> tuple[int, Optional[int]]: """Returns (used_bytes, limit_bytes). Limit is None if unlimited.""" if self.backend == "cgroupv2": - used = self._parse_int(self.root / "memory.current", 0) + current = self._parse_int(self.root / "memory.current", 0) + inactive_file = self._parse_memory_stat("inactive_file") + used = current - inactive_file limit_str = self._read_file(self.root / "memory.max") limit = self._parse_limit(limit_str, "max") return used, limit @@ -98,6 +100,21 @@ def _parse_limit( except ValueError: return None + def _parse_memory_stat(self, key: str) -> int: + """Parse a value from memory.stat file.""" + content = self._read_file(self.root / "memory.stat") + if not content: + return 0 + for line in content.splitlines(): + if line.startswith(key): + parts = line.split() + if len(parts) >= 2: + try: + return int(parts[1]) + except ValueError: + pass + return 0 + def _get_cpu_seconds(self) -> float: if self.backend == "cgroupv2": content = self._read_file(self.root / "cpu.stat") From 0a619fffac701a2f86c8f08b602a18c774da4104 Mon Sep 17 00:00:00 2001 From: Michal Franczel Date: Fri, 16 Jan 2026 11:27:11 +0100 Subject: [PATCH 6/7] refactor(resource-usage): Replace print statements with logging for better monitoring --- deepnote_core/resources/scripts/resource_usage.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/deepnote_core/resources/scripts/resource_usage.py b/deepnote_core/resources/scripts/resource_usage.py index 94bed41..d1990a4 100644 --- a/deepnote_core/resources/scripts/resource_usage.py +++ b/deepnote_core/resources/scripts/resource_usage.py @@ -2,6 +2,7 @@ """Lightweight resource metrics server.""" import json +import logging import os import threading import time @@ -27,7 +28,6 @@ def __init__(self, root_path: str = "/sys/fs/cgroup") -> None: self._last_cpu_sec: Optional[float] = None self._last_time: Optional[float] = None self._lock = threading.Lock() - print(f"ResourceMonitor initialized with backend: {self.backend}") def _detect_backend(self) -> str: if (self.root / "cgroup.controllers").exists(): @@ -249,10 +249,15 @@ def _send_response(self, code: int, body: bytes, content_type: str) -> None: self.wfile.write(body) def log_message(self, msg_format: str, *args: Any) -> None: - print(f"{self.address_string()} - {msg_format % args}") + logging.info(f"{self.address_string()} - {msg_format % args}") def main() -> None: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + port = int(os.environ.get("RESOURCE_USAGE_METRICS_PORT", 9104)) monitor = ResourceMonitor() monitor.get_cpu() # Initialize CPU baseline @@ -260,12 +265,12 @@ def main() -> None: handler = partial(MetricsHandler, monitor) server = ThreadingHTTPServer(("0.0.0.0", port), handler) - print(f"Starting server on port {port} (backend: {monitor.backend})") + logging.info(f"Starting server on port {port} (backend: {monitor.backend})") try: server.serve_forever() except KeyboardInterrupt: - print("Shutting down...") + logging.info("Shutting down...") server.server_close() From 9e9d988a98783b3aa61f134c238cedbd065664fc Mon Sep 17 00:00:00 2001 From: Michal Franczel Date: Fri, 16 Jan 2026 11:29:24 +0100 Subject: [PATCH 7/7] refactor(resource-usage): Simplify parsing logic and improve early returns in resource usage methods --- .../resources/scripts/resource_usage.py | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/deepnote_core/resources/scripts/resource_usage.py b/deepnote_core/resources/scripts/resource_usage.py index d1990a4..beaa98a 100644 --- a/deepnote_core/resources/scripts/resource_usage.py +++ b/deepnote_core/resources/scripts/resource_usage.py @@ -75,12 +75,12 @@ def get_cpu(self) -> tuple[Optional[float], Optional[float]]: def _parse_int(self, path: Path, default: int = 0) -> int: content = self._read_file(path) - if content: - try: - return int(content) - except ValueError: - pass - return default + if not content: + return default + try: + return int(content) + except ValueError: + return default def _parse_limit( self, @@ -106,31 +106,35 @@ def _parse_memory_stat(self, key: str) -> int: if not content: return 0 for line in content.splitlines(): - if line.startswith(key): - parts = line.split() - if len(parts) >= 2: - try: - return int(parts[1]) - except ValueError: - pass + if not line.startswith(key): + continue + parts = line.split() + if len(parts) < 2: + continue + try: + return int(parts[1]) + except ValueError: + continue return 0 def _get_cpu_seconds(self) -> float: if self.backend == "cgroupv2": content = self._read_file(self.root / "cpu.stat") - if content: - for line in content.splitlines(): - if line.startswith("usage_usec"): - parts = line.split() - if len(parts) >= 2: - return int(parts[1]) / 1_000_000.0 + if not content: + return 0.0 + for line in content.splitlines(): + if not line.startswith("usage_usec"): + continue + parts = line.split() + if len(parts) >= 2: + return int(parts[1]) / 1_000_000.0 return 0.0 if self.backend == "cgroupv1": content = self._read_file(self.root / "cpuacct/cpuacct.usage") - if content: - return int(content) / 1_000_000_000.0 - return 0.0 + if not content: + return 0.0 + return int(content) / 1_000_000_000.0 if self.backend == "psutil": times = psutil.Process().cpu_times() @@ -154,9 +158,9 @@ def _get_cpu_limit(self) -> Optional[float]: if self.backend == "cgroupv1": quota = self._parse_int(self.root / "cpu/cpu.cfs_quota_us", -1) period = self._parse_int(self.root / "cpu/cpu.cfs_period_us", 0) - if quota > 0 and period > 0: - return quota / period - return None + if quota <= 0 or period <= 0: + return None + return quota / period if self.backend == "psutil": cpu_count = psutil.cpu_count(logical=True)