From 47ef2ab17d2ccda24ea75586b701e58e3d0ca386 Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 09:41:08 -0700 Subject: [PATCH 01/13] nodemon: add JVM heap metrics via hsperfdata (no hooking) --- cmd/zxporter-nodemon/main.go | 8 +- docs/jvm-metrics.md | 51 ++++ .../zxporter-nodemon/templates/daemonset.yaml | 9 + helm-chart/zxporter-nodemon/values.yaml | 5 + internal/nodemon/jvm_collector.go | 180 +++++++++++++ internal/nodemon/jvm_discovery.go | 216 +++++++++++++++ internal/nodemon/jvm_discovery_test.go | 246 ++++++++++++++++++ internal/nodemon/jvm_handler.go | 86 ++++++ internal/nodemon/jvm_hsperfdata.go | 146 +++++++++++ internal/nodemon/jvm_types.go | 37 +++ internal/nodemon/server.go | 10 +- 11 files changed, 989 insertions(+), 5 deletions(-) create mode 100644 docs/jvm-metrics.md create mode 100644 internal/nodemon/jvm_collector.go create mode 100644 internal/nodemon/jvm_discovery.go create mode 100644 internal/nodemon/jvm_discovery_test.go create mode 100644 internal/nodemon/jvm_handler.go create mode 100644 internal/nodemon/jvm_hsperfdata.go create mode 100644 internal/nodemon/jvm_types.go diff --git a/cmd/zxporter-nodemon/main.go b/cmd/zxporter-nodemon/main.go index 40f63ec6..1f840e1b 100644 --- a/cmd/zxporter-nodemon/main.go +++ b/cmd/zxporter-nodemon/main.go @@ -79,9 +79,13 @@ func main() { // Create exporter exporter := nodemon.NewExporter(cfg, dynClient, scraper, mapper, logger) - // Create HTTP handler and server + // Create JVM collector + jvmCollector := nodemon.NewJVMCollector(cfg.NodeName, dynClient, logger) + + // Create HTTP handlers and server containerMetricsHandler := nodemon.NewContainerMetricsHandler(exporter, logger) - mux := nodemon.NewServerMux(containerMetricsHandler) + jvmMetricsHandler := nodemon.NewJVMMetricsHandler(jvmCollector, logger) + mux := nodemon.NewServerMux(containerMetricsHandler, jvmMetricsHandler) server := &http.Server{ Addr: fmt.Sprintf(":%d", cfg.HTTPListenPort), diff --git a/docs/jvm-metrics.md b/docs/jvm-metrics.md new file mode 100644 index 00000000..fb581c55 --- /dev/null +++ b/docs/jvm-metrics.md @@ -0,0 +1,51 @@ +# JVM metrics (zxporter-nodemon) — hsperfdata (no hooking) + +This spike adds an **optional** endpoint to zxporter-nodemon: + +- `GET /container/jvm-metrics` + +It discovers Java processes on the node via `/proc` and reads HotSpot's `hsperfdata` file via: + +- `/proc//root/tmp/hsperfdata_*/` + +This requires: +- `hostPID: true` +- running zxporter-nodemon as **UID 0** (root) to read those paths + +No attach/JMX/javaagent/async-profiler is used. + +## Enable via Helm (chart: zxporter-nodemon) + +Set: + +```yaml +jvmMetrics: + enabled: true +``` + +When enabled, the DaemonSet will: +- set `spec.hostPID: true` +- set `runAsUser: 0` and `runAsNonRoot: false` for the `zxporter-nodemon` container + +## Local validation (kind) + +Assumes you have a kind cluster and a Java workload running. + +If using the `kind-jvm-spike` cluster from this spike: + +```bash +kubectl config use-context kind-jvm-spike +kubectl -n jvm-spike get pods +``` + +Then port-forward zxporter-nodemon (namespace depends on your install) and hit: + +```bash +curl -sS localhost:6061/container/jvm-metrics | jq '.[0]' +``` + +Expected fields include: +- `heap_used_bytes`, `heap_size_bytes`, `heap_max_size_bytes` +- `gc_time_seconds_total` (map) +- `safepoint_time_seconds_total`, `safepoint_sync_time_seconds_total` +- `flags_extracted` (best-effort from `/proc//cmdline`) diff --git a/helm-chart/zxporter-nodemon/templates/daemonset.yaml b/helm-chart/zxporter-nodemon/templates/daemonset.yaml index a3193d79..e6dd2891 100644 --- a/helm-chart/zxporter-nodemon/templates/daemonset.yaml +++ b/helm-chart/zxporter-nodemon/templates/daemonset.yaml @@ -25,6 +25,9 @@ spec: labels: {{- include "zxporter-nodemon.selectorLabels" . | nindent 8 }} spec: + {{- if .Values.jvmMetrics.enabled }} + hostPID: true + {{- end }} {{- with .Values.imagePullSecrets }} imagePullSecrets: {{- toYaml . | nindent 8 }} @@ -69,7 +72,13 @@ spec: containers: - name: zxporter-nodemon securityContext: + {{- if .Values.jvmMetrics.enabled }} + runAsNonRoot: false + runAsUser: 0 + readOnlyRootFilesystem: true + {{- else }} {{- toYaml .Values.gpuMetricsExporter.securityContext | nindent 12 }} + {{- end }} image: "{{ .Values.gpuMetricsExporter.image.repository }}:{{ .Values.gpuMetricsExporter.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.gpuMetricsExporter.image.pullPolicy }} ports: diff --git a/helm-chart/zxporter-nodemon/values.yaml b/helm-chart/zxporter-nodemon/values.yaml index 88b3cd66..7abe0aa0 100644 --- a/helm-chart/zxporter-nodemon/values.yaml +++ b/helm-chart/zxporter-nodemon/values.yaml @@ -42,6 +42,11 @@ gpuMetricsExporter: rbac: clusterWide: true +# jvmMetrics -- Enable JVM metrics collection via hsperfdata. +# NOTE: requires hostPID + running nodemon as UID 0 to read /proc//root/tmp/hsperfdata_*/ +jvmMetrics: + enabled: false + dcgmExporter: enabled: true image: diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go new file mode 100644 index 00000000..95078e4e --- /dev/null +++ b/internal/nodemon/jvm_collector.go @@ -0,0 +1,180 @@ +package nodemon + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" +) + +type containerInfo struct { + Pod string + Namespace string + Container string +} + +// JVMCollector collects JVM metrics from hsperfdata files via /proc. +// Requires the pod to run with hostPID: true and as UID 0 to read +// /proc//root/tmp/hsperfdata_*/ for other containers. +type JVMCollector struct { + nodeName string + dynClient dynamic.Interface + procRoot string + log logr.Logger +} + +// NewJVMCollector creates a JVMCollector. procRoot defaults to "/proc". +func NewJVMCollector(nodeName string, dynClient dynamic.Interface, log logr.Logger) *JVMCollector { + return &JVMCollector{ + nodeName: nodeName, + dynClient: dynClient, + procRoot: "/proc", + log: log.WithName("jvm-collector"), + } +} + +// QueryJVMMetrics returns JVM metrics for all discovered Java containers on this node. +func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) { + containerMap, err := c.buildContainerMap(ctx) + if err != nil { + // Non-fatal: continue with empty map; pod/namespace/container fields will be blank. + c.log.Error(err, "Failed to build container map; pod metadata will be missing") + containerMap = map[string]containerInfo{} + } + + procs, err := discoverJavaProcesses(c.procRoot) + if err != nil { + return nil, fmt.Errorf("discovering java processes: %w", err) + } + + metrics := make([]JVMMetric, 0, len(procs)) + for _, proc := range procs { + counters, err := readHsperfdata(proc.HsperfDataPath) + if err != nil { + c.log.Error(err, "Failed to read hsperfdata", "pid", proc.PidHost, "path", proc.HsperfDataPath) + continue + } + info := containerMap[proc.ContainerID] + metrics = append(metrics, buildJVMMetric(counters, proc, info, c.nodeName)) + } + + return metrics, nil +} + +// buildContainerMap lists running pods on this node and returns a map of +// containerID (hex, no scheme prefix) → containerInfo. +func (c *JVMCollector) buildContainerMap(ctx context.Context) (map[string]containerInfo, error) { + fieldSelector := "status.phase=Running" + if c.nodeName != "" { + fieldSelector = fmt.Sprintf("%s,spec.nodeName=%s", fieldSelector, c.nodeName) + } + + podList, err := c.dynClient.Resource(podGVR). + Namespace(""). + List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) + if err != nil { + return nil, fmt.Errorf("listing pods: %w", err) + } + + result := make(map[string]containerInfo, len(podList.Items)) + for _, item := range podList.Items { + var pod corev1.Pod + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, &pod); err != nil { + continue + } + for _, cs := range pod.Status.ContainerStatuses { + id := stripContainerIDScheme(cs.ContainerID) + if id == "" { + continue + } + result[id] = containerInfo{ + Pod: pod.Name, + Namespace: pod.Namespace, + Container: cs.Name, + } + } + } + return result, nil +} + +// stripContainerIDScheme strips the URL scheme (e.g., "containerd://") from a container ID. +func stripContainerIDScheme(raw string) string { + if i := strings.LastIndex(raw, "://"); i >= 0 { + return raw[i+3:] + } + return raw +} + +// buildJVMMetric assembles a JVMMetric from parsed hsperfdata counters and process metadata. +func buildJVMMetric(counters map[string]any, proc JavaProcess, info containerInfo, nodeName string) JVMMetric { + m := JVMMetric{ + NodeName: nodeName, + Pod: info.Pod, + Namespace: info.Namespace, + Container: info.Container, + ContainerID: proc.ContainerID, + PidHost: proc.PidHost, + PidNS: proc.PidNS, + RawCmdline: proc.CmdLine, + Timestamp: time.Now().UTC(), + } + + m.JavaCommand, _ = hsStr(counters, "sun.rt.javaCommand") + m.JavaVersion, _ = hsStr(counters, "java.property.java.version") + + // Heap capacity: try aggregate heap counters first, then sum generation counters. + if cap, ok := hsInt(counters, "sun.gc.heap.capacity"); ok { + m.HeapSizeBytes = cap + } else { + gen0, _ := hsInt(counters, "sun.gc.generation.0.capacity") + gen1, _ := hsInt(counters, "sun.gc.generation.1.capacity") + m.HeapSizeBytes = gen0 + gen1 + } + + if used, ok := hsInt(counters, "sun.gc.heap.used"); ok { + m.HeapUsedBytes = used + } else { + gen0, _ := hsInt(counters, "sun.gc.generation.0.used") + gen1, _ := hsInt(counters, "sun.gc.generation.1.used") + m.HeapUsedBytes = gen0 + gen1 + } + + if maxCap, ok := hsInt(counters, "sun.gc.heap.maxCapacity"); ok { + m.HeapMaxSizeBytes = maxCap + } else { + gen0, _ := hsInt(counters, "sun.gc.generation.0.maxCapacity") + gen1, _ := hsInt(counters, "sun.gc.generation.1.maxCapacity") + m.HeapMaxSizeBytes = gen0 + gen1 + } + + // Convert GC time ticks to seconds using the JVM's high-resolution timer frequency. + freq, _ := hsInt(counters, "sun.os.hrt.frequency") + if freq <= 0 { + freq = 1_000_000_000 // nanosecond fallback + } + + m.GCTimeSecondsTotal = make(map[string]float64) + for i := 0; i < 8; i++ { + name, ok := hsStr(counters, fmt.Sprintf("sun.gc.collector.%d.name", i)) + if !ok || name == "" { + break + } + ticks, _ := hsInt(counters, fmt.Sprintf("sun.gc.collector.%d.time", i)) + m.GCTimeSecondsTotal[name] = float64(ticks) / float64(freq) + } + + safeTicks, _ := hsInt(counters, "sun.rt.safepointTime") + syncTicks, _ := hsInt(counters, "sun.rt.safepointSyncTime") + m.SafepointTimeSecondsTotal = float64(safeTicks) / float64(freq) + m.SafepointSyncTimeSecondsTotal = float64(syncTicks) / float64(freq) + + m.FlagsExtracted = ParseJVMFlags(proc.CmdLine) + + return m +} diff --git a/internal/nodemon/jvm_discovery.go b/internal/nodemon/jvm_discovery.go new file mode 100644 index 00000000..da2a4cde --- /dev/null +++ b/internal/nodemon/jvm_discovery.go @@ -0,0 +1,216 @@ +package nodemon + +import ( + "bufio" + "bytes" + "fmt" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" +) + +var ( + // containerIDRe matches 64-char container IDs in cgroup paths for containerd, docker, and bare containerd. + containerIDRe = regexp.MustCompile(`(?:cri-containerd|docker|containerd)-([a-f0-9]{64})\.scope`) + // crioRe matches CRI-O container IDs. + crioRe = regexp.MustCompile(`crio-([a-f0-9]{64})\.scope`) +) + +// JavaProcess holds info about a discovered Java process running inside a Kubernetes container. +type JavaProcess struct { + PidHost int + PidNS int + ContainerID string + CmdLine string + HsperfDataPath string +} + +// discoverJavaProcesses scans procRoot (usually "/proc") for Java processes +// running inside Kubernetes container cgroups. +// Returns nil, nil if procRoot does not exist (non-Linux hosts). +func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { + entries, err := os.ReadDir(procRoot) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, fmt.Errorf("reading %s: %w", procRoot, err) + } + + var procs []JavaProcess + for _, e := range entries { + if !e.IsDir() { + continue + } + pid, err := strconv.Atoi(e.Name()) + if err != nil { + continue + } + + pidDir := filepath.Join(procRoot, e.Name()) + + // Read null-separated cmdline and convert to space-separated for matching. + rawCmdline := readProcFile(filepath.Join(pidDir, "cmdline")) + cmdline := string(bytes.ReplaceAll([]byte(rawCmdline), []byte{0}, []byte{' '})) + comm := strings.TrimSpace(readProcFile(filepath.Join(pidDir, "comm"))) + + if !isJavaProcess(comm, cmdline) { + continue + } + + cgroupContent := readProcFile(filepath.Join(pidDir, "cgroup")) + containerID, ok := parseCgroupContainerID(cgroupContent) + if !ok { + continue + } + + statusContent := readProcFile(filepath.Join(pidDir, "status")) + nsPid, ok := parseNSpid(statusContent) + if !ok { + continue + } + + hsperfPath := findHsperfdata(pidDir, nsPid) + if hsperfPath == "" { + continue + } + + procs = append(procs, JavaProcess{ + PidHost: pid, + PidNS: nsPid, + ContainerID: containerID, + CmdLine: strings.TrimSpace(cmdline), + HsperfDataPath: hsperfPath, + }) + } + + return procs, nil +} + +// isJavaProcess returns true if the process comm is "java" or its first cmdline +// argument is a binary named "java". +func isJavaProcess(comm, cmdline string) bool { + if comm == "java" { + return true + } + parts := strings.Fields(cmdline) + if len(parts) == 0 { + return false + } + return filepath.Base(parts[0]) == "java" +} + +// parseCgroupContainerID extracts a 64-char hex container ID from cgroup file content. +func parseCgroupContainerID(content string) (string, bool) { + scanner := bufio.NewScanner(strings.NewReader(content)) + for scanner.Scan() { + line := scanner.Text() + if m := containerIDRe.FindStringSubmatch(line); len(m) == 2 { + return m[1], true + } + if m := crioRe.FindStringSubmatch(line); len(m) == 2 { + return m[1], true + } + } + return "", false +} + +// parseNSpid extracts the innermost (last) NSpid value from /proc//status content. +// The last value is the PID as seen from inside the container's pid namespace. +func parseNSpid(content string) (int, bool) { + scanner := bufio.NewScanner(strings.NewReader(content)) + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, "NSpid:") { + continue + } + fields := strings.Fields(line) + if len(fields) < 2 { + return 0, false + } + v, err := strconv.Atoi(fields[len(fields)-1]) + if err != nil { + return 0, false + } + return v, true + } + return 0, false +} + +// findHsperfdata returns the hsperfdata path for a Java process via its /proc//root +// overlay: /proc//root/tmp/hsperfdata_*/ +// Returns "" if no file is found. +func findHsperfdata(pidDir string, nsPid int) string { + pattern := filepath.Join(pidDir, "root", "tmp", "hsperfdata_*", strconv.Itoa(nsPid)) + matches, err := filepath.Glob(pattern) + if err != nil || len(matches) == 0 { + return "" + } + return matches[0] +} + +// readProcFile reads a /proc pseudo-file, returning "" on any error. +// Errors are expected and normal (process may disappear between readdir and read). +func readProcFile(path string) string { + b, err := os.ReadFile(path) + if err != nil { + return "" + } + return string(b) +} + +// ParseJVMFlags parses JVM memory and container-awareness flags from a process cmdline string. +func ParseJVMFlags(cmdline string) JVMFlagsExtracted { + var flags JVMFlagsExtracted + for _, token := range strings.Fields(cmdline) { + switch { + case strings.HasPrefix(token, "-Xms"): + if v, err := parseMemSize(token[4:]); err == nil { + flags.XmsBytes = &v + } + case strings.HasPrefix(token, "-Xmx"): + if v, err := parseMemSize(token[4:]); err == nil { + flags.XmxBytes = &v + } + case strings.HasPrefix(token, "-XX:MaxRAMPercentage="): + s := token[len("-XX:MaxRAMPercentage="):] + if v, err := strconv.ParseFloat(s, 64); err == nil { + flags.MaxRamPercentage = &v + } + case token == "-XX:+UseContainerSupport": + t := true + flags.UseContainerSupport = &t + case token == "-XX:-UseContainerSupport": + f := false + flags.UseContainerSupport = &f + } + } + return flags +} + +// parseMemSize parses JVM memory size strings: "256m", "4g", "512k", or bare bytes. +func parseMemSize(s string) (int64, error) { + if s == "" { + return 0, fmt.Errorf("empty size string") + } + lower := strings.ToLower(strings.TrimSpace(s)) + var multiplier int64 = 1 + switch lower[len(lower)-1] { + case 'k': + multiplier = 1024 + lower = lower[:len(lower)-1] + case 'm': + multiplier = 1024 * 1024 + lower = lower[:len(lower)-1] + case 'g': + multiplier = 1024 * 1024 * 1024 + lower = lower[:len(lower)-1] + } + v, err := strconv.ParseInt(lower, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid memory size %q: %w", s, err) + } + return v * multiplier, nil +} diff --git a/internal/nodemon/jvm_discovery_test.go b/internal/nodemon/jvm_discovery_test.go new file mode 100644 index 00000000..a04f10b5 --- /dev/null +++ b/internal/nodemon/jvm_discovery_test.go @@ -0,0 +1,246 @@ +package nodemon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseCgroupContainerID(t *testing.T) { + const id64 = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + tests := []struct { + name string + content string + wantID string + wantOK bool + }{ + { + name: "containerd cgroupv1", + content: "12:cpuset:/kubepods/besteffort/pod1234/cri-containerd-" + id64 + ".scope\n" + + "0::/kubepods/besteffort/pod1234/cri-containerd-" + id64 + ".scope\n", + wantID: id64, + wantOK: true, + }, + { + name: "docker scope", + content: "12:cpuset:/kubepods/docker-" + id64 + ".scope\n", + wantID: id64, + wantOK: true, + }, + { + name: "bare containerd scope", + content: "12:cpuset:/kubepods/containerd-" + id64 + ".scope\n", + wantID: id64, + wantOK: true, + }, + { + name: "crio scope", + content: "12:cpuset:/kubepods/crio-" + id64 + ".scope\n", + wantID: id64, + wantOK: true, + }, + { + name: "non-k8s container", + content: "12:cpuset:/user.slice\n0::/user.slice/user-1000.slice\n", + wantID: "", + wantOK: false, + }, + { + name: "empty content", + content: "", + wantID: "", + wantOK: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + id, ok := parseCgroupContainerID(tc.content) + assert.Equal(t, tc.wantOK, ok) + if tc.wantOK { + assert.Equal(t, tc.wantID, id) + } + }) + } +} + +func TestParseNSpid(t *testing.T) { + tests := []struct { + name string + content string + wantPid int + wantOK bool + }{ + { + name: "nested namespace - takes last value", + content: "Name:\tjava\nPid:\t12345\nNSpid:\t12345\t67\nTgid:\t12345\n", + wantPid: 67, + wantOK: true, + }, + { + name: "single namespace", + content: "Name:\tjava\nNSpid:\t42\nTgid:\t42\n", + wantPid: 42, + wantOK: true, + }, + { + name: "three levels of nesting", + content: "NSpid:\t1000\t500\t1\n", + wantPid: 1, + wantOK: true, + }, + { + name: "missing NSpid line", + content: "Name:\tjava\nPid:\t12345\n", + wantPid: 0, + wantOK: false, + }, + { + name: "empty content", + content: "", + wantPid: 0, + wantOK: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pid, ok := parseNSpid(tc.content) + assert.Equal(t, tc.wantOK, ok) + if tc.wantOK { + assert.Equal(t, tc.wantPid, pid) + } + }) + } +} + +func TestParseJVMFlags(t *testing.T) { + tests := []struct { + name string + cmdline string + xms *int64 + xmx *int64 + maxRam *float64 + useCS *bool + }{ + { + name: "basic heap flags megabytes and gigabytes", + cmdline: "java -Xms256m -Xmx2g -jar app.jar", + xms: jvmIntPtr(256 * 1024 * 1024), + xmx: jvmIntPtr(2 * 1024 * 1024 * 1024), + }, + { + name: "container support enabled with MaxRAMPercentage", + cmdline: "java -XX:MaxRAMPercentage=75.0 -XX:+UseContainerSupport -jar app.jar", + maxRam: jvmF64Ptr(75.0), + useCS: jvmBoolPtr(true), + }, + { + name: "container support disabled", + cmdline: "java -XX:-UseContainerSupport -jar app.jar", + useCS: jvmBoolPtr(false), + }, + { + name: "kilobytes", + cmdline: "java -Xms512k -Xmx4g", + xms: jvmIntPtr(512 * 1024), + xmx: jvmIntPtr(4 * 1024 * 1024 * 1024), + }, + { + name: "uppercase suffix", + cmdline: "java -Xms128M -Xmx1G", + xms: jvmIntPtr(128 * 1024 * 1024), + xmx: jvmIntPtr(1 * 1024 * 1024 * 1024), + }, + { + name: "no JVM flags", + cmdline: "java -jar app.jar", + }, + { + name: "full path java binary", + cmdline: "/usr/lib/jvm/java-21/bin/java -Xmx512m -jar app.jar", + xmx: jvmIntPtr(512 * 1024 * 1024), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + flags := ParseJVMFlags(tc.cmdline) + assert.Equal(t, tc.xms, flags.XmsBytes, "XmsBytes") + assert.Equal(t, tc.xmx, flags.XmxBytes, "XmxBytes") + assert.Equal(t, tc.maxRam, flags.MaxRamPercentage, "MaxRamPercentage") + assert.Equal(t, tc.useCS, flags.UseContainerSupport, "UseContainerSupport") + }) + } +} + +func TestParseMemSize(t *testing.T) { + tests := []struct { + input string + want int64 + wantErr bool + }{ + {"256m", 256 * 1024 * 1024, false}, + {"2g", 2 * 1024 * 1024 * 1024, false}, + {"512k", 512 * 1024, false}, + {"1024", 1024, false}, + {"4G", 4 * 1024 * 1024 * 1024, false}, + {"128M", 128 * 1024 * 1024, false}, + {"1K", 1024, false}, + {"", 0, true}, + {"abc", 0, true}, + {"1.5g", 0, true}, + } + + for _, tc := range tests { + t.Run(tc.input, func(t *testing.T) { + v, err := parseMemSize(tc.input) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tc.want, v) + } + }) + } +} + +func TestIsJavaProcess(t *testing.T) { + tests := []struct { + comm string + cmdline string + want bool + }{ + {"java", "", true}, + {"java", "/usr/bin/java -jar app.jar", true}, + {"", "/usr/bin/java -jar app.jar", true}, + {"", "/usr/lib/jvm/java-21/bin/java -Xmx512m", true}, + {"python3", "/usr/bin/python3 app.py", false}, + {"", "", false}, + {"javac", "/usr/bin/javac Main.java", false}, + } + + for _, tc := range tests { + t.Run(tc.comm+"|"+tc.cmdline, func(t *testing.T) { + assert.Equal(t, tc.want, isJavaProcess(tc.comm, tc.cmdline)) + }) + } +} + +func TestStripContainerIDScheme(t *testing.T) { + assert.Equal(t, "abc123", stripContainerIDScheme("containerd://abc123")) + assert.Equal(t, "abc123", stripContainerIDScheme("docker://abc123")) + assert.Equal(t, "abc123", stripContainerIDScheme("abc123")) + assert.Equal(t, "", stripContainerIDScheme("")) + assert.Equal(t, "abc123", stripContainerIDScheme("cri-o://abc123")) +} + +// jvmIntPtr returns a pointer to the given int64, for use in test assertions. +func jvmIntPtr(v int64) *int64 { return &v } + +// jvmF64Ptr returns a pointer to the given float64, for use in test assertions. +func jvmF64Ptr(v float64) *float64 { return &v } + +// jvmBoolPtr returns a pointer to the given bool, for use in test assertions. +func jvmBoolPtr(v bool) *bool { return &v } diff --git a/internal/nodemon/jvm_handler.go b/internal/nodemon/jvm_handler.go new file mode 100644 index 00000000..c24aca04 --- /dev/null +++ b/internal/nodemon/jvm_handler.go @@ -0,0 +1,86 @@ +package nodemon + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/go-logr/logr" +) + +// JVMMetricsQuerier provides on-demand JVM metrics. +type JVMMetricsQuerier interface { + QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) +} + +type jvmMetricsFilter struct { + Container string + Pod string + Namespace string + Node string +} + +func (f jvmMetricsFilter) matches(m *JVMMetric) bool { + if f.Container != "" && m.Container != f.Container { + return false + } + if f.Pod != "" && m.Pod != f.Pod { + return false + } + if f.Namespace != "" && m.Namespace != f.Namespace { + return false + } + if f.Node != "" && m.NodeName != f.Node { + return false + } + return true +} + +type jvmMetricsHandler struct { + querier JVMMetricsQuerier + log logr.Logger +} + +// NewJVMMetricsHandler creates an HTTP handler for GET /container/jvm-metrics. +// Supports ?container=, ?pod=, ?namespace=, ?node= query filters. +func NewJVMMetricsHandler(querier JVMMetricsQuerier, log logr.Logger) http.Handler { + return &jvmMetricsHandler{ + querier: querier, + log: log.WithName("jvm-metrics-handler"), + } +} + +func (h *jvmMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + filter := jvmMetricsFilter{ + Container: r.URL.Query().Get("container"), + Pod: r.URL.Query().Get("pod"), + Namespace: r.URL.Query().Get("namespace"), + Node: r.URL.Query().Get("node"), + } + + metrics, err := h.querier.QueryJVMMetrics(r.Context()) + if err != nil { + h.log.Error(err, "Failed to query JVM metrics") + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + + result := make([]JVMMetric, 0, len(metrics)) + for i := range metrics { + if filter.matches(&metrics[i]) { + result = append(result, metrics[i]) + } + } + + w.Header().Set("Content-Type", "application/json") + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + if err := enc.Encode(result); err != nil { + h.log.Error(err, "Failed to encode JVM metrics response") + } +} diff --git a/internal/nodemon/jvm_hsperfdata.go b/internal/nodemon/jvm_hsperfdata.go new file mode 100644 index 00000000..5b8f17d3 --- /dev/null +++ b/internal/nodemon/jvm_hsperfdata.go @@ -0,0 +1,146 @@ +package nodemon + +import ( + "encoding/binary" + "fmt" + "os" +) + +const perfMagic uint32 = 0xcafec0c0 + +// readHsperfdata reads a JVM hsperfdata binary file and returns counters as a +// map of name → value (int64 for numeric, string for byte-array counters). +func readHsperfdata(path string) (map[string]any, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + return parseHsperfdata(data) +} + +// parseHsperfdata parses the raw bytes of an hsperfdata file. +// +// Format reference: hotspot/share/services/perfMemory.hpp (OpenJDK). +// Prologue (32 bytes): +// +// [0:4] magic (jint, native byte order encodes endianness) +// [4] byte_order +// [5] major_version +// [6] minor_version +// [7] accessible +// [8:12] used (jint) +// [12:16] overflow (jint) +// [16:24] mod_time_stamp (jlong) +// [24:28] entry_offset (jint) +// [28:32] num_entries (jint) +// +// Each entry header (20 bytes, relative to entry start): +// +// [0:4] entry_length +// [4:8] name_offset (from entry start) +// [8:12] vector_length (0 = scalar) +// [12] data_type ('J'=long, 'B'=byte/string, 'I'=int) +// [13] flags +// [14] data_units +// [15] data_variability +// [16:20] data_offset (from entry start) +func parseHsperfdata(data []byte) (map[string]any, error) { + if len(data) < 32 { + return nil, fmt.Errorf("hsperfdata: file too short (%d bytes)", len(data)) + } + + // Detect byte order from magic. + var order binary.ByteOrder + if binary.BigEndian.Uint32(data[0:4]) == perfMagic { + order = binary.BigEndian + } else if binary.LittleEndian.Uint32(data[0:4]) == perfMagic { + order = binary.LittleEndian + } else { + return nil, fmt.Errorf("hsperfdata: invalid magic bytes") + } + + entryOffset := int(order.Uint32(data[24:28])) + numEntries := int(order.Uint32(data[28:32])) + + result := make(map[string]any, numEntries) + + offset := entryOffset + for i := 0; i < numEntries; i++ { + if offset+20 > len(data) { + break + } + entryStart := offset + entryLength := int(order.Uint32(data[offset : offset+4])) + if entryLength <= 0 || entryStart+entryLength > len(data) { + break + } + + nameOffset := int(order.Uint32(data[offset+4 : offset+8])) + vectorLength := int(order.Uint32(data[offset+8 : offset+12])) + dataType := data[offset+12] + dataOffset := int(order.Uint32(data[offset+16 : offset+20])) + + // Read null-terminated name. + nameStart := entryStart + nameOffset + nameEnd := nameStart + for nameEnd < len(data) && data[nameEnd] != 0 { + nameEnd++ + } + if nameEnd <= nameStart { + offset = entryStart + entryLength + continue + } + name := string(data[nameStart:nameEnd]) + dataStart := entryStart + dataOffset + + switch dataType { + case 'J': // long (int64) + if dataStart+8 <= len(data) { + result[name] = int64(order.Uint64(data[dataStart : dataStart+8])) + } + case 'B': // byte scalar or byte-array string + if vectorLength > 0 { + // String stored as a null-terminated byte array. + maxEnd := dataStart + vectorLength + if maxEnd > len(data) { + maxEnd = len(data) + } + end := dataStart + for end < maxEnd && data[end] != 0 { + end++ + } + result[name] = string(data[dataStart:end]) + } else if dataStart < len(data) { + result[name] = int64(data[dataStart]) + } + case 'I': // int (int32) + if dataStart+4 <= len(data) { + result[name] = int64(int32(order.Uint32(data[dataStart : dataStart+4]))) + } + } + + offset = entryStart + entryLength + } + + return result, nil +} + +// hsInt looks up an int64 counter. +func hsInt(m map[string]any, key string) (int64, bool) { + v, ok := m[key] + if !ok { + return 0, false + } + i, ok := v.(int64) + return i, ok +} + +// hsStr looks up a string counter. +func hsStr(m map[string]any, key string) (string, bool) { + v, ok := m[key] + if !ok { + return "", false + } + s, ok := v.(string) + return s, ok +} diff --git a/internal/nodemon/jvm_types.go b/internal/nodemon/jvm_types.go new file mode 100644 index 00000000..62fd388b --- /dev/null +++ b/internal/nodemon/jvm_types.go @@ -0,0 +1,37 @@ +package nodemon + +import "time" + +// JVMMetric holds per-container JVM metrics extracted from hsperfdata. +type JVMMetric struct { + NodeName string `json:"node_name"` + Pod string `json:"pod"` + Namespace string `json:"namespace"` + Container string `json:"container"` + ContainerID string `json:"container_id"` + PidHost int `json:"pid_host"` + PidNS int `json:"pid_ns"` + + JavaCommand string `json:"java_command,omitempty"` + JavaVersion string `json:"java_version,omitempty"` + + HeapSizeBytes int64 `json:"heap_size_bytes"` + HeapUsedBytes int64 `json:"heap_used_bytes"` + HeapMaxSizeBytes int64 `json:"heap_max_size_bytes"` + + GCTimeSecondsTotal map[string]float64 `json:"gc_time_seconds_total"` + SafepointTimeSecondsTotal float64 `json:"safepoint_time_seconds_total"` + SafepointSyncTimeSecondsTotal float64 `json:"safepoint_sync_time_seconds_total"` + + FlagsExtracted JVMFlagsExtracted `json:"flags_extracted"` + RawCmdline string `json:"raw_cmdline,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + +// JVMFlagsExtracted holds JVM flags parsed from the process cmdline. +type JVMFlagsExtracted struct { + XmsBytes *int64 `json:"xms_bytes,omitempty"` + XmxBytes *int64 `json:"xmx_bytes,omitempty"` + MaxRamPercentage *float64 `json:"max_ram_percentage,omitempty"` + UseContainerSupport *bool `json:"use_container_support,omitempty"` +} diff --git a/internal/nodemon/server.go b/internal/nodemon/server.go index 14a38cfc..02326f11 100644 --- a/internal/nodemon/server.go +++ b/internal/nodemon/server.go @@ -50,7 +50,7 @@ func NewExporter( } } -var dcgmPodGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} +var podGVR = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} // QueryMetrics scrapes DCGM exporters on demand and returns mapped GPU metrics. func (e *Exporter) QueryMetrics(ctx context.Context) ([]GPUMetric, error) { @@ -92,7 +92,7 @@ func (e *Exporter) getDCGMUrls(ctx context.Context) ([]string, error) { fieldSelector = fmt.Sprintf("%s,spec.nodeName=%s", fieldSelector, e.cfg.NodeName) } - dcgmExporterList, err := e.dynamic.Resource(dcgmPodGVR). + dcgmExporterList, err := e.dynamic.Resource(podGVR). Namespace(""). List(ctx, metav1.ListOptions{ LabelSelector: e.cfg.DCGMLabels, @@ -120,7 +120,7 @@ func (e *Exporter) getDCGMUrls(ctx context.Context) ([]string, error) { } // NewServerMux creates the HTTP mux for the nodemon. -func NewServerMux(containerMetricsHandler http.Handler) *http.ServeMux { +func NewServerMux(containerMetricsHandler http.Handler, jvmMetricsHandler http.Handler) *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("/debug/pprof/", pprof.Index) @@ -138,5 +138,9 @@ func NewServerMux(containerMetricsHandler http.Handler) *http.ServeMux { mux.Handle("/container/metrics", containerMetricsHandler) } + if jvmMetricsHandler != nil { + mux.Handle("/container/jvm-metrics", jvmMetricsHandler) + } + return mux } From dc30b770cf07f785361233ad1b63e145d7dd5853 Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 14:06:00 -0700 Subject: [PATCH 02/13] nodemon: bound jvm metrics scrape to avoid OOM --- internal/nodemon/jvm_collector.go | 6 +++++- internal/nodemon/jvm_discovery.go | 24 +++++++++++++++++------- internal/nodemon/jvm_handler.go | 14 +++++++++++++- internal/nodemon/jvm_hsperfdata.go | 11 ++++++++++- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index 95078e4e..0cd94f74 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -41,7 +41,11 @@ func NewJVMCollector(nodeName string, dynClient dynamic.Interface, log logr.Logg // QueryJVMMetrics returns JVM metrics for all discovered Java containers on this node. func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) { - containerMap, err := c.buildContainerMap(ctx) + // Pod listing can be slow on some clusters/APIServer conditions. Bound it so + // JVM metrics scraping doesn't wedge the HTTP server. + ctxPods, cancelPods := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancelPods() + containerMap, err := c.buildContainerMap(ctxPods) if err != nil { // Non-fatal: continue with empty map; pod/namespace/container fields will be blank. c.log.Error(err, "Failed to build container map; pod metadata will be missing") diff --git a/internal/nodemon/jvm_discovery.go b/internal/nodemon/jvm_discovery.go index da2a4cde..532d4a06 100644 --- a/internal/nodemon/jvm_discovery.go +++ b/internal/nodemon/jvm_discovery.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io" "os" "path/filepath" "regexp" @@ -51,15 +52,16 @@ func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { pidDir := filepath.Join(procRoot, e.Name()) - // Read null-separated cmdline and convert to space-separated for matching. - rawCmdline := readProcFile(filepath.Join(pidDir, "cmdline")) - cmdline := string(bytes.ReplaceAll([]byte(rawCmdline), []byte{0}, []byte{' '})) comm := strings.TrimSpace(readProcFile(filepath.Join(pidDir, "comm"))) - - if !isJavaProcess(comm, cmdline) { + // Fast path: most JVMs have comm == "java". Avoid reading cmdline for every PID. + if comm != "java" { continue } + // Read null-separated cmdline and convert to space-separated for parsing flags. + rawCmdline := readProcFile(filepath.Join(pidDir, "cmdline")) + cmdline := string(bytes.ReplaceAll([]byte(rawCmdline), []byte{0}, []byte{' '})) + cgroupContent := readProcFile(filepath.Join(pidDir, "cgroup")) containerID, ok := parseCgroupContainerID(cgroupContent) if !ok { @@ -151,10 +153,18 @@ func findHsperfdata(pidDir string, nsPid int) string { return matches[0] } -// readProcFile reads a /proc pseudo-file, returning "" on any error. +// readProcFile reads a /proc pseudo-file with a hard cap, returning "" on any error. // Errors are expected and normal (process may disappear between readdir and read). func readProcFile(path string) string { - b, err := os.ReadFile(path) + const maxProcBytes = 64 << 10 // 64KiB safety cap + + f, err := os.Open(path) + if err != nil { + return "" + } + defer f.Close() + + b, err := io.ReadAll(io.LimitReader(f, maxProcBytes)) if err != nil { return "" } diff --git a/internal/nodemon/jvm_handler.go b/internal/nodemon/jvm_handler.go index c24aca04..3c64233b 100644 --- a/internal/nodemon/jvm_handler.go +++ b/internal/nodemon/jvm_handler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "net/http" + "time" "github.com/go-logr/logr" ) @@ -63,8 +64,19 @@ func (h *jvmMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Node: r.URL.Query().Get("node"), } - metrics, err := h.querier.QueryJVMMetrics(r.Context()) + // Hard cap to avoid stalling the HTTP server / probes. We'll make this smarter + // (cached snapshots) once we identify the slow path. + ctx, cancel := context.WithTimeout(r.Context(), 2500*time.Millisecond) + defer cancel() + + metrics, err := h.querier.QueryJVMMetrics(ctx) if err != nil { + if ctx.Err() != nil { + h.log.Error(ctx.Err(), "Timed out querying JVM metrics") + http.Error(w, "jvm metrics query timed out", http.StatusGatewayTimeout) + return + } + h.log.Error(err, "Failed to query JVM metrics") http.Error(w, "internal server error", http.StatusInternalServerError) return diff --git a/internal/nodemon/jvm_hsperfdata.go b/internal/nodemon/jvm_hsperfdata.go index 5b8f17d3..7b88b12d 100644 --- a/internal/nodemon/jvm_hsperfdata.go +++ b/internal/nodemon/jvm_hsperfdata.go @@ -3,6 +3,7 @@ package nodemon import ( "encoding/binary" "fmt" + "io" "os" ) @@ -11,7 +12,15 @@ const perfMagic uint32 = 0xcafec0c0 // readHsperfdata reads a JVM hsperfdata binary file and returns counters as a // map of name → value (int64 for numeric, string for byte-array counters). func readHsperfdata(path string) (map[string]any, error) { - data, err := os.ReadFile(path) + const maxHsperfBytes = 4 << 20 // 4MiB safety cap + + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + data, err := io.ReadAll(io.LimitReader(f, maxHsperfBytes)) if err != nil { return nil, err } From a8dc65ad9c242fd36fb95c3297e62fa96b9ae362 Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 14:09:32 -0700 Subject: [PATCH 03/13] nodemon: add stage timing logs for jvm metrics query --- internal/nodemon/jvm_collector.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index 0cd94f74..130062c0 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -45,28 +45,39 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) // JVM metrics scraping doesn't wedge the HTTP server. ctxPods, cancelPods := context.WithTimeout(ctx, 500*time.Millisecond) defer cancelPods() + start := time.Now() + containerMap, err := c.buildContainerMap(ctxPods) if err != nil { // Non-fatal: continue with empty map; pod/namespace/container fields will be blank. c.log.Error(err, "Failed to build container map; pod metadata will be missing") containerMap = map[string]containerInfo{} } + c.log.V(1).Info("Built container map", "count", len(containerMap), "took", time.Since(start).String()) + start = time.Now() procs, err := discoverJavaProcesses(c.procRoot) if err != nil { return nil, fmt.Errorf("discovering java processes: %w", err) } + c.log.V(1).Info("Discovered java processes", "count", len(procs), "took", time.Since(start).String()) + start = time.Now() metrics := make([]JVMMetric, 0, len(procs)) for _, proc := range procs { + // Bound each read so a single bad proc file cant wedge the request. + readStart := time.Now() counters, err := readHsperfdata(proc.HsperfDataPath) if err != nil { c.log.Error(err, "Failed to read hsperfdata", "pid", proc.PidHost, "path", proc.HsperfDataPath) continue } + c.log.V(1).Info("Read hsperfdata", "pid", proc.PidHost, "counters", len(counters), "took", time.Since(readStart).String()) + info := containerMap[proc.ContainerID] metrics = append(metrics, buildJVMMetric(counters, proc, info, c.nodeName)) } + c.log.V(1).Info("Built JVM metrics", "count", len(metrics), "took", time.Since(start).String()) return metrics, nil } From da98d1c208ded13d6b5ab614412e90a564aced63 Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 14:12:45 -0700 Subject: [PATCH 04/13] nodemon: log JVM scrape stages at info level --- internal/nodemon/jvm_collector.go | 8 ++++---- internal/nodemon/jvm_handler.go | 6 ++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index 130062c0..af986aca 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -53,14 +53,14 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) c.log.Error(err, "Failed to build container map; pod metadata will be missing") containerMap = map[string]containerInfo{} } - c.log.V(1).Info("Built container map", "count", len(containerMap), "took", time.Since(start).String()) + c.log.Info("Built container map", "count", len(containerMap), "took", time.Since(start).String()) start = time.Now() procs, err := discoverJavaProcesses(c.procRoot) if err != nil { return nil, fmt.Errorf("discovering java processes: %w", err) } - c.log.V(1).Info("Discovered java processes", "count", len(procs), "took", time.Since(start).String()) + c.log.Info("Discovered java processes", "count", len(procs), "took", time.Since(start).String()) start = time.Now() metrics := make([]JVMMetric, 0, len(procs)) @@ -72,12 +72,12 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) c.log.Error(err, "Failed to read hsperfdata", "pid", proc.PidHost, "path", proc.HsperfDataPath) continue } - c.log.V(1).Info("Read hsperfdata", "pid", proc.PidHost, "counters", len(counters), "took", time.Since(readStart).String()) + c.log.Info("Read hsperfdata", "pid", proc.PidHost, "counters", len(counters), "took", time.Since(readStart).String()) info := containerMap[proc.ContainerID] metrics = append(metrics, buildJVMMetric(counters, proc, info, c.nodeName)) } - c.log.V(1).Info("Built JVM metrics", "count", len(metrics), "took", time.Since(start).String()) + c.log.Info("Built JVM metrics", "count", len(metrics), "took", time.Since(start).String()) return metrics, nil } diff --git a/internal/nodemon/jvm_handler.go b/internal/nodemon/jvm_handler.go index 3c64233b..b8c5f229 100644 --- a/internal/nodemon/jvm_handler.go +++ b/internal/nodemon/jvm_handler.go @@ -52,10 +52,16 @@ func NewJVMMetricsHandler(querier JVMMetricsQuerier, log logr.Logger) http.Handl } func (h *jvmMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := time.Now() if r.Method != http.MethodGet { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } + // Non-verbose log so we can see where requests stall. + h.log.Info("JVMMetrics request start", "path", r.URL.Path, "rawQuery", r.URL.RawQuery) + defer func() { + h.log.Info("JVMMetrics request end", "took", time.Since(start).String()) + }() filter := jvmMetricsFilter{ Container: r.URL.Query().Get("container"), From 0443ebc0ba6a04ca7ab77a3935c31f8944c90660 Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 23:28:06 -0700 Subject: [PATCH 05/13] nodemon: log hsperfdata path+size before parse --- internal/nodemon/jvm_collector.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index af986aca..efaaeb1d 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -3,6 +3,7 @@ package nodemon import ( "context" "fmt" + "os" "strings" "time" @@ -65,7 +66,13 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) start = time.Now() metrics := make([]JVMMetric, 0, len(procs)) for _, proc := range procs { - // Bound each read so a single bad proc file cant wedge the request. + // Bound each read so a single bad proc file can’t wedge the request. + c.log.Info("Reading hsperfdata", "pid", proc.PidHost, "path", proc.HsperfDataPath) + if st, err := os.Stat(proc.HsperfDataPath); err == nil { + c.log.Info("hsperfdata stat", "pid", proc.PidHost, "sizeBytes", st.Size()) + } else { + c.log.Error(err, "hsperfdata stat failed", "pid", proc.PidHost, "path", proc.HsperfDataPath) + } readStart := time.Now() counters, err := readHsperfdata(proc.HsperfDataPath) if err != nil { From 6e7848e8ee959b81b14a17e4a80907cddd5ecd1a Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 23:36:22 -0700 Subject: [PATCH 06/13] nodemon: harden hsperfdata parser bounds --- internal/nodemon/jvm_hsperfdata.go | 66 ++++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/internal/nodemon/jvm_hsperfdata.go b/internal/nodemon/jvm_hsperfdata.go index 7b88b12d..973d2f17 100644 --- a/internal/nodemon/jvm_hsperfdata.go +++ b/internal/nodemon/jvm_hsperfdata.go @@ -70,8 +70,15 @@ func parseHsperfdata(data []byte) (map[string]any, error) { entryOffset := int(order.Uint32(data[24:28])) numEntries := int(order.Uint32(data[28:32])) + if entryOffset < 32 || entryOffset >= len(data) { + return nil, fmt.Errorf("hsperfdata: invalid entry_offset=%d len=%d", entryOffset, len(data)) + } + // Guard against corrupted headers. + if numEntries < 0 || numEntries > 100_000 { + return nil, fmt.Errorf("hsperfdata: unreasonable num_entries=%d", numEntries) + } - result := make(map[string]any, numEntries) + result := make(map[string]any, minInt(numEntries, 2048)) offset := entryOffset for i := 0; i < numEntries; i++ { @@ -80,8 +87,12 @@ func parseHsperfdata(data []byte) (map[string]any, error) { } entryStart := offset entryLength := int(order.Uint32(data[offset : offset+4])) - if entryLength <= 0 || entryStart+entryLength > len(data) { - break + if entryLength <= 0 { + return nil, fmt.Errorf("hsperfdata: invalid entry_length=%d at entry=%d", entryLength, i) + } + entryEnd := entryStart + entryLength + if entryEnd > len(data) { + return nil, fmt.Errorf("hsperfdata: entry beyond eof entry=%d end=%d len=%d", i, entryEnd, len(data)) } nameOffset := int(order.Uint32(data[offset+4 : offset+8])) @@ -89,51 +100,74 @@ func parseHsperfdata(data []byte) (map[string]any, error) { dataType := data[offset+12] dataOffset := int(order.Uint32(data[offset+16 : offset+20])) - // Read null-terminated name. + // Validate offsets are within the entry. + if nameOffset < 0 || nameOffset >= entryLength { + offset = entryEnd + continue + } + if dataOffset < 0 || dataOffset >= entryLength { + offset = entryEnd + continue + } + + // Read null-terminated name (bounded to this entry). nameStart := entryStart + nameOffset nameEnd := nameStart - for nameEnd < len(data) && data[nameEnd] != 0 { + for nameEnd < entryEnd && data[nameEnd] != 0 { nameEnd++ } if nameEnd <= nameStart { - offset = entryStart + entryLength + offset = entryEnd continue } name := string(data[nameStart:nameEnd]) + dataStart := entryStart + dataOffset switch dataType { case 'J': // long (int64) - if dataStart+8 <= len(data) { + if dataStart+8 <= entryEnd { result[name] = int64(order.Uint64(data[dataStart : dataStart+8])) } + case 'I': // int (int32) + if dataStart+4 <= entryEnd { + result[name] = int64(int32(order.Uint32(data[dataStart : dataStart+4]))) + } case 'B': // byte scalar or byte-array string if vectorLength > 0 { - // String stored as a null-terminated byte array. + // Cap vector length to avoid pathological allocations on corrupt data. + if vectorLength > 256<<10 { + vectorLength = 256 << 10 + } maxEnd := dataStart + vectorLength - if maxEnd > len(data) { - maxEnd = len(data) + if maxEnd > entryEnd { + maxEnd = entryEnd } end := dataStart for end < maxEnd && data[end] != 0 { end++ } result[name] = string(data[dataStart:end]) - } else if dataStart < len(data) { + } else if dataStart < entryEnd { result[name] = int64(data[dataStart]) } - case 'I': // int (int32) - if dataStart+4 <= len(data) { - result[name] = int64(int32(order.Uint32(data[dataStart : dataStart+4]))) - } + default: + // Unknown type: skip. } - offset = entryStart + entryLength + offset = entryEnd } return result, nil } +func minInt(a, b int) int { + if a < b { + return a + } + return b +} + // hsInt looks up an int64 counter. func hsInt(m map[string]any, key string) (int64, bool) { v, ok := m[key] From bf26cf4bbf6ceb19d64ff37de9ae5e79fde5e95f Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 23:51:28 -0700 Subject: [PATCH 07/13] nodemon: parse hsperfdata header fields as big-endian --- internal/nodemon/jvm_hsperfdata.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/nodemon/jvm_hsperfdata.go b/internal/nodemon/jvm_hsperfdata.go index 973d2f17..65780700 100644 --- a/internal/nodemon/jvm_hsperfdata.go +++ b/internal/nodemon/jvm_hsperfdata.go @@ -68,8 +68,12 @@ func parseHsperfdata(data []byte) (map[string]any, error) { return nil, fmt.Errorf("hsperfdata: invalid magic bytes") } - entryOffset := int(order.Uint32(data[24:28])) - numEntries := int(order.Uint32(data[28:32])) + // Note: hsperfdata stores its header fields in *Java byte order* (big-endian), + // even though the magic can be used to infer the platform endianness. + // In practice (and per coroot's implementation), entry_offset/num_entries are + // read as big-endian. + entryOffset := int(binary.BigEndian.Uint32(data[24:28])) + numEntries := int(binary.BigEndian.Uint32(data[28:32])) if entryOffset < 32 || entryOffset >= len(data) { return nil, fmt.Errorf("hsperfdata: invalid entry_offset=%d len=%d", entryOffset, len(data)) } From 89709878b5a50f8e4fcaa55b4fafbe6abcb1620c Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Sun, 24 May 2026 23:53:03 -0700 Subject: [PATCH 08/13] nodemon: parse hsperfdata header fields using detected endianness --- internal/nodemon/jvm_hsperfdata.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/internal/nodemon/jvm_hsperfdata.go b/internal/nodemon/jvm_hsperfdata.go index 65780700..973d2f17 100644 --- a/internal/nodemon/jvm_hsperfdata.go +++ b/internal/nodemon/jvm_hsperfdata.go @@ -68,12 +68,8 @@ func parseHsperfdata(data []byte) (map[string]any, error) { return nil, fmt.Errorf("hsperfdata: invalid magic bytes") } - // Note: hsperfdata stores its header fields in *Java byte order* (big-endian), - // even though the magic can be used to infer the platform endianness. - // In practice (and per coroot's implementation), entry_offset/num_entries are - // read as big-endian. - entryOffset := int(binary.BigEndian.Uint32(data[24:28])) - numEntries := int(binary.BigEndian.Uint32(data[28:32])) + entryOffset := int(order.Uint32(data[24:28])) + numEntries := int(order.Uint32(data[28:32])) if entryOffset < 32 || entryOffset >= len(data) { return nil, fmt.Errorf("hsperfdata: invalid entry_offset=%d len=%d", entryOffset, len(data)) } From a61319009f19d89c35d10fb04332ad5a9f4233b4 Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Mon, 25 May 2026 02:55:03 -0700 Subject: [PATCH 09/13] feat: add JVM metrics + flag sources via nodemon (no hooking) - Add nodemon /container/jvm-metrics endpoint backed by hsperfdata - Extract heap used/size/max + GC/safepoint counters - Extract flags from cmdline + JAVA_TOOL_OPTIONS/JDK_JAVA_OPTIONS/JAVA_OPTS with source attribution - Guard permissions behind jvmMetrics.enabled (hostPID + runAsUser=0 + SYS_PTRACE) - Integrate JVM metrics into container resource collector (mirrors GPU nodemon flow) - Add Kind workflow test covering JVM app matrix --- .github/workflows/jvm-metrics-kind-test.yml | 116 ++++++++++++ cmd/zxporter-nodemon/main.go | 4 +- .../zxporter-nodemon/templates/daemonset.yaml | 7 + helm-chart/zxporter-nodemon/values.yaml | 3 + internal/collector/container_metrics.go | 11 ++ .../collector/container_resource_collector.go | 72 ++++++- internal/collector/nodemon_jvm_client.go | 117 ++++++++++++ internal/nodemon/jvm_collector.go | 6 +- internal/nodemon/jvm_discovery.go | 178 +++++++++++++++++- internal/nodemon/jvm_discovery_test.go | 28 +++ internal/nodemon/jvm_hsperfdata.go | 66 +++++-- internal/nodemon/jvm_types.go | 18 +- test/fixtures/jvm-apps.yaml | 109 +++++++++++ 13 files changed, 707 insertions(+), 28 deletions(-) create mode 100644 .github/workflows/jvm-metrics-kind-test.yml create mode 100644 internal/collector/nodemon_jvm_client.go create mode 100644 test/fixtures/jvm-apps.yaml diff --git a/.github/workflows/jvm-metrics-kind-test.yml b/.github/workflows/jvm-metrics-kind-test.yml new file mode 100644 index 00000000..92a2a5e0 --- /dev/null +++ b/.github/workflows/jvm-metrics-kind-test.yml @@ -0,0 +1,116 @@ +permissions: + contents: read +name: JVM Metrics (nodemon) Kind Test + +on: + pull_request: + workflow_dispatch: + +jobs: + jvm-metrics-kind: + runs-on: ubuntu-xl + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: "go.mod" + cache: true + + - name: Create kind cluster + uses: helm/kind-action@v1 + with: + version: v0.27.0 + node_image: kindest/node:v1.32.3 + cluster_name: kind-jvm-metrics + wait: 120s + + - name: Install metrics-server + run: | + helm repo add metrics-server https://kubernetes-sigs.github.io/metrics-server/ + helm repo update + helm upgrade --install --set args={--kubelet-insecure-tls} metrics-server metrics-server/metrics-server --namespace kube-system + kubectl wait --for=condition=available --timeout=300s deployment/metrics-server -n kube-system + + - name: Build nodemon image + run: | + docker build -f Dockerfile.nodemon -t zxporter-nodemon:jvm-ci . + + - name: Load nodemon image into kind + run: | + kind load docker-image zxporter-nodemon:jvm-ci --name kind-jvm-metrics + + - name: Deploy zxporter-nodemon (JVM metrics enabled) + run: | + kubectl create ns devzero-system || true + helm upgrade --install zxporter-nodemon ./helm-chart/zxporter-nodemon \ + -n devzero-system \ + --set provider=eks \ + --set dcgmExporter.enabled=false \ + --set jvmMetrics.enabled=true \ + --set jvmMetrics.buildMarker="ci-${GITHUB_RUN_ID}" \ + --set gpuMetricsExporter.image.repository=zxporter-nodemon \ + --set gpuMetricsExporter.image.tag=jvm-ci \ + --set gpuMetricsExporter.image.pullPolicy=IfNotPresent + kubectl -n devzero-system rollout status ds/zxporter-nodemon --timeout=180s + + - name: Deploy JVM test workloads + run: | + kubectl apply -f test/fixtures/jvm-apps.yaml + kubectl -n jvm-spike rollout status deploy/java8-xmx-only --timeout=300s + kubectl -n jvm-spike rollout status deploy/java11-tool-options --timeout=300s + kubectl -n jvm-spike rollout status deploy/java17-maxram-only --timeout=300s + kubectl -n jvm-spike rollout status deploy/not-java --timeout=300s + + - name: Validate nodemon JVM metrics output + run: | + POD_IP=$(kubectl -n devzero-system get pod -l app.kubernetes.io/name=zxporter-nodemon -o jsonpath='{.items[0].status.podIP}') + echo "nodemon podIP=$POD_IP" + + # Assertions (python; no jq dependency). Fetch JSON inside the python container. + kubectl -n devzero-system run py-assert --image=python:3.12-slim -i --rm --restart=Never --command -- \ + sh -lc "python - <<'PY' +import json,sys,urllib.request +pod_ip = '${POD_IP}' +url = f'http://{pod_ip}:6061/container/jvm-metrics?namespace=jvm-spike' +with urllib.request.urlopen(url, timeout=30) as r: + data = json.loads(r.read().decode('utf-8')) + +by_pod={m.get('pod'):m for m in data} + +def req_prefix(prefix): + for k,v in by_pod.items(): + if k and k.startswith(prefix): + return v + raise SystemExit(f'missing pod prefix={prefix}; got pods={list(by_pod.keys())}') + +# Ensure not-java absent +if any((m.get('pod','') or '').startswith('not-java') for m in data): + raise SystemExit('not-java unexpectedly present in JVM metrics') + +# java8: cmdline -Xmx192m => 201326592 +m8=req_prefix('java8-xmx-only') +flags=m8.get('flags_extracted',{}) +if flags.get('xmx_bytes')!=201326592: + raise SystemExit(f'java8-xmx-only expected xmx_bytes=201326592 got {flags.get("xmx_bytes")}') + +# java11: env JAVA_TOOL_OPTIONS provides xmx/xms/maxram; sources should mention JAVA_TOOL_OPTIONS +m11=req_prefix('java11-tool-options') +flags=m11.get('flags_extracted',{}) +if flags.get('xmx_bytes')!=167772160 or flags.get('xms_bytes')!=50331648 or flags.get('max_ram_percentage')!=65: + raise SystemExit(f'java11-tool-options flags mismatch: {flags}') +src=m11.get('flag_sources',{}) +xmx_src=str(src.get('xmx_bytes','')) +if 'TOOL_OPTIONS' not in xmx_src: + raise SystemExit(f'java11-tool-options expected xmx source to mention JAVA_TOOL_OPTIONS; got {src}') + +# java17: maxram only should be 40 +m17=req_prefix('java17-maxram-only') +flags=m17.get('flags_extracted',{}) +if flags.get('max_ram_percentage')!=40: + raise SystemExit(f'java17-maxram-only expected max_ram_percentage=40 got {flags.get("max_ram_percentage")}') + +print('OK') +PY" \ No newline at end of file diff --git a/cmd/zxporter-nodemon/main.go b/cmd/zxporter-nodemon/main.go index 1f840e1b..ee5237ea 100644 --- a/cmd/zxporter-nodemon/main.go +++ b/cmd/zxporter-nodemon/main.go @@ -29,9 +29,11 @@ func main() { logger := zapr.NewLogger(zapLog) versionInfo := version.Get() + marker := os.Getenv("JVM_METRICS_BUILD_MARKER") logger.Info("Starting zxporter-nodemon", "version", versionInfo.String(), - "commit", versionInfo.GitCommit) + "commit", versionInfo.GitCommit, + "jvmMetricsBuildMarker", marker) cfg := nodemon.ExporterConfig{ HTTPListenPort: envInt("HTTP_LISTEN_PORT", 6061), diff --git a/helm-chart/zxporter-nodemon/templates/daemonset.yaml b/helm-chart/zxporter-nodemon/templates/daemonset.yaml index e6dd2891..1d83ab72 100644 --- a/helm-chart/zxporter-nodemon/templates/daemonset.yaml +++ b/helm-chart/zxporter-nodemon/templates/daemonset.yaml @@ -76,6 +76,9 @@ spec: runAsNonRoot: false runAsUser: 0 readOnlyRootFilesystem: true + capabilities: + add: + - SYS_PTRACE {{- else }} {{- toYaml .Values.gpuMetricsExporter.securityContext | nindent 12 }} {{- end }} @@ -101,6 +104,10 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName + {{- if .Values.jvmMetrics.buildMarker }} + - name: "JVM_METRICS_BUILD_MARKER" + value: {{ .Values.jvmMetrics.buildMarker | quote }} + {{- end }} {{- if .Values.dcgmExporter.enabled }} - name: "DCGM_HOST" value: "localhost" diff --git a/helm-chart/zxporter-nodemon/values.yaml b/helm-chart/zxporter-nodemon/values.yaml index 7abe0aa0..78c5db86 100644 --- a/helm-chart/zxporter-nodemon/values.yaml +++ b/helm-chart/zxporter-nodemon/values.yaml @@ -46,6 +46,9 @@ gpuMetricsExporter: # NOTE: requires hostPID + running nodemon as UID 0 to read /proc//root/tmp/hsperfdata_*/ jvmMetrics: enabled: false + # Optional: forces a recognizable marker into the nodemon env + startup logs to + # prove which image/chart revision is running. + buildMarker: "" dcgmExporter: enabled: true diff --git a/internal/collector/container_metrics.go b/internal/collector/container_metrics.go index 7a35e0df..732a4c03 100644 --- a/internal/collector/container_metrics.go +++ b/internal/collector/container_metrics.go @@ -76,8 +76,19 @@ type ContainerMetricsSnapshot struct { GpuLimitCount interface{} `json:"gpuLimitCount,omitempty"` GpuTotalMemoryMb interface{} `json:"gpuTotalMemoryMb,omitempty"` IndividualGPUMetrics string `json:"individualGPUMetrics,omitempty"` // JSON string + + // JVM metrics (from zxporter-nodemon /container/jvm-metrics) + JvmJavaCommand string `json:"jvmJavaCommand,omitempty"` + JvmJavaVersion string `json:"jvmJavaVersion,omitempty"` + JvmHeapSizeBytes int64 `json:"jvmHeapSizeBytes,omitempty"` + JvmHeapUsedBytes int64 `json:"jvmHeapUsedBytes,omitempty"` + JvmHeapMaxSizeBytes int64 `json:"jvmHeapMaxSizeBytes,omitempty"` + JvmRawCmdline string `json:"jvmRawCmdline,omitempty"` + JvmFlagsExtractedJSON string `json:"jvmFlagsExtractedJson,omitempty"` + JvmFlagSourcesJSON string `json:"jvmFlagSourcesJson,omitempty"` } + // BuildOOMSnapshot constructs a ContainerMetricsSnapshot for an OOM event. // Used by both the PodCollector (informer fast path) and OOMReconciler (sweep path) // to ensure consistent snapshot construction. diff --git a/internal/collector/container_resource_collector.go b/internal/collector/container_resource_collector.go index 04c2ebb0..0148706a 100644 --- a/internal/collector/container_resource_collector.go +++ b/internal/collector/container_resource_collector.go @@ -50,12 +50,42 @@ type ContainerResourceCollectorConfig struct { // DisableGPUMetrics determines whether to disable GPU metrics collection // Default is false, so metrics are collected by default DisableGPUMetrics bool + + // DisableJVMMetrics determines whether to disable JVM metrics collection (via zxporter-nodemon). + // Default is false. + DisableJVMMetrics bool } type gpuQueryState struct { lastFailed bool } +func strFromMap(m map[string]interface{}, key string) string { + v, ok := m[key] + if !ok || v == nil { + return "" + } + s, _ := v.(string) + return s +} + +func i64FromMap(m map[string]interface{}, key string) int64 { + v, ok := m[key] + if !ok || v == nil { + return 0 + } + switch x := v.(type) { + case int64: + return x + case int: + return int64(x) + case float64: + return int64(x) + default: + return 0 + } +} + // throttleTracker tracks last emission time for CPU throttle events per container to avoid duplicates. type throttleTracker struct { lastEmitted map[string]time.Time // key: "ns/pod/container" → last emit time @@ -167,7 +197,8 @@ func (c *ContainerResourceCollector) Start(ctx context.Context) error { "namespaces", c.namespaces, "updateInterval", c.config.UpdateInterval, "disableNetworkIOMetrics", c.config.DisableNetworkIOMetrics, - "disableGPUMetrics", c.config.DisableGPUMetrics) + "disableGPUMetrics", c.config.DisableGPUMetrics, + "disableJVMMetrics", c.config.DisableJVMMetrics) // Check if metrics client is available if c.metricsClient == nil { @@ -176,7 +207,7 @@ func (c *ContainerResourceCollector) Start(ctx context.Context) error { // Initialize nodemon client for auto-discovery // It discovers DaemonSet pods by well-known label — no config needed. - if !c.config.DisableGPUMetrics { + if !c.config.DisableGPUMetrics || !c.config.DisableJVMMetrics { ns := os.Getenv("POD_NAMESPACE") if ns == "" { ns = "devzero-system" @@ -352,6 +383,17 @@ func (c *ContainerResourceCollector) collectAllContainerResources(ctx context.Co } } + // Pre-fetch JVM metrics from the nodemon (one HTTP call for the entire cycle) + var jvmIndex map[gpuContainerKey]NodemonJVMMetrics + if c.nodemonClient != nil && !c.config.DisableJVMMetrics { + allJVMMetrics, err := c.nodemonClient.FetchAllJVMMetrics(ctx) + if err != nil { + c.logger.Error(err, "Failed to fetch JVM metrics from nodemon") + } else { + jvmIndex = IndexJVMMetricsByContainer(allJVMMetrics) + } + } + // Process each pod's metrics for _, podMetrics := range podMetricsList.Items { // Skip excluded pods @@ -496,6 +538,19 @@ func (c *ContainerResourceCollector) collectAllContainerResources(ctx context.Co } } + // JVM metrics lookup (optional) + jvmMetrics := make(map[string]interface{}) + if jvmIndex != nil { + key := gpuContainerKey{ + Pod: podMetrics.Name, + Container: containerMetrics.Name, + Namespace: podMetrics.Namespace, + } + if jm, ok := jvmIndex[key]; ok { + jvmMetrics = JVMMetricsFromNodemon(jm) + } + } + // Process the container metrics with optional network/IO data c.processContainerMetrics( pod, @@ -503,6 +558,7 @@ func (c *ContainerResourceCollector) collectAllContainerResources(ctx context.Co networkMetrics, ioMetrics, gpuMetrics, + jvmMetrics, throttleFraction, ) } @@ -516,6 +572,7 @@ func (c *ContainerResourceCollector) processContainerMetrics( networkMetrics map[string]float64, ioMetrics map[string]float64, gpuMetrics map[string]interface{}, + jvmMetrics map[string]interface{}, throttleFraction float64, ) { // Find the container spec in the pod @@ -685,6 +742,17 @@ func (c *ContainerResourceCollector) processContainerMetrics( } } + if len(jvmMetrics) > 0 { + metricsSnapshot.JvmJavaCommand = strFromMap(jvmMetrics, "JavaCommand") + metricsSnapshot.JvmJavaVersion = strFromMap(jvmMetrics, "JavaVersion") + metricsSnapshot.JvmHeapSizeBytes = i64FromMap(jvmMetrics, "HeapSizeBytes") + metricsSnapshot.JvmHeapUsedBytes = i64FromMap(jvmMetrics, "HeapUsedBytes") + metricsSnapshot.JvmHeapMaxSizeBytes = i64FromMap(jvmMetrics, "HeapMaxSizeBytes") + metricsSnapshot.JvmRawCmdline = strFromMap(jvmMetrics, "RawCmdline") + metricsSnapshot.JvmFlagsExtractedJSON = strFromMap(jvmMetrics, "FlagsExtractedJSON") + metricsSnapshot.JvmFlagSourcesJSON = strFromMap(jvmMetrics, "FlagSourcesJSON") + } + // Send the resource usage data to the batch channel c.batchChan <- CollectedResource{ ResourceType: ContainerResource, diff --git a/internal/collector/nodemon_jvm_client.go b/internal/collector/nodemon_jvm_client.go new file mode 100644 index 00000000..3e9a13d1 --- /dev/null +++ b/internal/collector/nodemon_jvm_client.go @@ -0,0 +1,117 @@ +package collector + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "time" +) + +// NodemonJVMMetrics represents a single JVM metric entry returned by the nodemon +// GET /container/jvm-metrics endpoint. +// Mirrors nodemon.JVMMetric but is defined here to avoid importing the nodemon package. +// +// NOTE: We intentionally keep FlagsExtracted / FlagSources as generic maps to +// preserve forward-compatibility with nodemon output. +type NodemonJVMMetrics struct { + NodeName string `json:"node_name"` + Pod string `json:"pod"` + Namespace string `json:"namespace"` + Container string `json:"container"` + ContainerID string `json:"container_id"` + PidHost int `json:"pid_host"` + PidNS int `json:"pid_ns"` + + JavaCommand string `json:"java_command,omitempty"` + JavaVersion string `json:"java_version,omitempty"` + + HeapSizeBytes int64 `json:"heap_size_bytes"` + HeapUsedBytes int64 `json:"heap_used_bytes"` + HeapMaxSizeBytes int64 `json:"heap_max_size_bytes"` + + FlagsExtracted map[string]any `json:"flags_extracted"` + FlagSources map[string]any `json:"flag_sources,omitempty"` + RawCmdline string `json:"raw_cmdline,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + +// FetchAllJVMMetrics discovers all nodemon pods and fetches JVM metrics from each, +// merging the results into a single slice. +func (c *NodemonClient) FetchAllJVMMetrics(ctx context.Context) ([]NodemonJVMMetrics, error) { + nodeToIP, err := c.refreshCache(ctx) + if err != nil { + return nil, err + } + if len(nodeToIP) == 0 { + return nil, nil + } + + var all []NodemonJVMMetrics + for nodeName, podIP := range nodeToIP { + url := fmt.Sprintf("http://%s:%d/container/jvm-metrics", podIP, c.port) + metrics, fetchErr := c.fetchJVMMetrics(ctx, url) + if fetchErr != nil { + c.log.Error(fetchErr, "Failed to fetch JVM metrics from exporter pod", "node", nodeName, "podIP", podIP) + continue + } + all = append(all, metrics...) + } + return all, nil +} + +func (c *NodemonClient) fetchJVMMetrics(ctx context.Context, url string) ([]NodemonJVMMetrics, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("HTTP request to nodemon failed: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("nodemon returned status %d", resp.StatusCode) + } + + var metrics []NodemonJVMMetrics + if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil { + return nil, fmt.Errorf("decoding nodemon jvm response: %w", err) + } + return metrics, nil +} + +// IndexJVMMetricsByContainer indexes JVM metrics by (pod, container, namespace) for O(1) lookup. +func IndexJVMMetricsByContainer(metrics []NodemonJVMMetrics) map[gpuContainerKey]NodemonJVMMetrics { + index := make(map[gpuContainerKey]NodemonJVMMetrics) + for _, m := range metrics { + key := gpuContainerKey{Pod: m.Pod, Container: m.Container, Namespace: m.Namespace} + // If multiple JVMs in one container, keep the first for now. + if _, exists := index[key]; !exists { + index[key] = m + } + } + return index +} + +// JVMMetricsFromNodemon converts a JVM metric entry into a flat map[string]interface{} +// for attachment to ContainerMetricsSnapshot. +func JVMMetricsFromNodemon(m NodemonJVMMetrics) map[string]interface{} { + out := map[string]interface{}{ + "JavaCommand": m.JavaCommand, + "JavaVersion": m.JavaVersion, + "HeapSizeBytes": m.HeapSizeBytes, + "HeapUsedBytes": m.HeapUsedBytes, + "HeapMaxSizeBytes": m.HeapMaxSizeBytes, + "RawCmdline": m.RawCmdline, + } + if b, err := json.Marshal(m.FlagsExtracted); err == nil { + out["FlagsExtractedJSON"] = string(b) + } + if b, err := json.Marshal(m.FlagSources); err == nil { + out["FlagSourcesJSON"] = string(b) + } + return out +} diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index efaaeb1d..6444d9b2 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -144,6 +144,7 @@ func buildJVMMetric(counters map[string]any, proc JavaProcess, info containerInf PidHost: proc.PidHost, PidNS: proc.PidNS, RawCmdline: proc.CmdLine, + // FlagsExtracted / FlagSources are filled after hsperf parsing. Timestamp: time.Now().UTC(), } @@ -196,7 +197,10 @@ func buildJVMMetric(counters map[string]any, proc JavaProcess, info containerInf m.SafepointTimeSecondsTotal = float64(safeTicks) / float64(freq) m.SafepointSyncTimeSecondsTotal = float64(syncTicks) / float64(freq) - m.FlagsExtracted = ParseJVMFlags(proc.CmdLine) + flags, sources, effectiveCmd := ParseJVMFlagsWithSources(proc.CmdLine, proc.EnvJavaOpts) + m.FlagsExtracted = flags + m.FlagSources = sources + m.RawCmdline = effectiveCmd return m } diff --git a/internal/nodemon/jvm_discovery.go b/internal/nodemon/jvm_discovery.go index 532d4a06..8f8cb36f 100644 --- a/internal/nodemon/jvm_discovery.go +++ b/internal/nodemon/jvm_discovery.go @@ -12,6 +12,7 @@ import ( "strings" ) + var ( // containerIDRe matches 64-char container IDs in cgroup paths for containerd, docker, and bare containerd. containerIDRe = regexp.MustCompile(`(?:cri-containerd|docker|containerd)-([a-f0-9]{64})\.scope`) @@ -21,10 +22,15 @@ var ( // JavaProcess holds info about a discovered Java process running inside a Kubernetes container. type JavaProcess struct { - PidHost int - PidNS int - ContainerID string - CmdLine string + PidHost int + PidNS int + ContainerID string + + CmdLine string + // EnvJavaOpts includes any env-injected Java options found in /proc//environ. + // Keys are env var names (JAVA_TOOL_OPTIONS, JDK_JAVA_OPTIONS, JAVA_OPTS). + EnvJavaOpts map[string]string + HsperfDataPath string } @@ -58,10 +64,14 @@ func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { continue } - // Read null-separated cmdline and convert to space-separated for parsing flags. + // Read null-separated cmdline and convert to space-separated for display / parsing. rawCmdline := readProcFile(filepath.Join(pidDir, "cmdline")) cmdline := string(bytes.ReplaceAll([]byte(rawCmdline), []byte{0}, []byte{' '})) + // Also capture env-injected JVM options (common in k8s via JAVA_TOOL_OPTIONS, + // JDK_JAVA_OPTIONS, JAVA_OPTS). These do NOT appear in /proc//cmdline. + envJavaOpts := readJavaOptsFromProcEnviron(filepath.Join(pidDir, "environ")) + cgroupContent := readProcFile(filepath.Join(pidDir, "cgroup")) containerID, ok := parseCgroupContainerID(cgroupContent) if !ok { @@ -84,6 +94,7 @@ func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { PidNS: nsPid, ContainerID: containerID, CmdLine: strings.TrimSpace(cmdline), + EnvJavaOpts: envJavaOpts, HsperfDataPath: hsperfPath, }) } @@ -172,6 +183,8 @@ func readProcFile(path string) string { } // ParseJVMFlags parses JVM memory and container-awareness flags from a process cmdline string. +// +// NOTE: This is *best-effort* parsing for the few flags we care about for sizing. func ParseJVMFlags(cmdline string) JVMFlagsExtracted { var flags JVMFlagsExtracted for _, token := range strings.Fields(cmdline) { @@ -200,6 +213,161 @@ func ParseJVMFlags(cmdline string) JVMFlagsExtracted { return flags } +// ParseJVMFlagsWithSources extracts sizing-related JVM flags and also returns +// where each value came from. +// +// We cannot observe the final JVM argument list directly (env-injected options +// don’t appear in /proc//cmdline), so this is best-effort: +// - cmdline is treated as highest precedence +// - env vars are applied in the following precedence order: +// JAVA_TOOL_OPTIONS, JDK_JAVA_OPTIONS, JAVA_OPTS +// +// The returned effectiveCmdline is the cmdline plus the env options appended as +// tokens for observability. +func ParseJVMFlagsWithSources(cmdline string, envJavaOpts map[string]string) (JVMFlagsExtracted, JVMFlagSources, string) { + flags := JVMFlagsExtracted{} + src := JVMFlagSources{} + + applyTokens := func(tokens []string, source string) { + for _, token := range tokens { + switch { + case strings.HasPrefix(token, "-Xms"): + if flags.XmsBytes == nil { + if v, err := parseMemSize(token[4:]); err == nil { + flags.XmsBytes = &v + src.XmsBytes = source + } + } + case strings.HasPrefix(token, "-Xmx"): + if flags.XmxBytes == nil { + if v, err := parseMemSize(token[4:]); err == nil { + flags.XmxBytes = &v + src.XmxBytes = source + } + } + case strings.HasPrefix(token, "-XX:MaxRAMPercentage="): + if flags.MaxRamPercentage == nil { + s := token[len("-XX:MaxRAMPercentage="):] + if v, err := strconv.ParseFloat(s, 64); err == nil { + flags.MaxRamPercentage = &v + src.MaxRamPercentage = source + } + } + case token == "-XX:+UseContainerSupport": + if flags.UseContainerSupport == nil { + t := true + flags.UseContainerSupport = &t + src.UseContainerSupport = source + } + case token == "-XX:-UseContainerSupport": + if flags.UseContainerSupport == nil { + f := false + flags.UseContainerSupport = &f + src.UseContainerSupport = source + } + } + } + } + + applyTokens(strings.Fields(cmdline), "cmdline") + + effective := strings.TrimSpace(cmdline) + for _, k := range []string{"JAVA_TOOL_OPTIONS", "JDK_JAVA_OPTIONS", "JAVA_OPTS"} { + v := "" + if envJavaOpts != nil { + v = envJavaOpts[k] + } + if strings.TrimSpace(v) == "" { + continue + } + toks := splitJavaOpts(v) + if len(toks) > 0 { + effective = strings.TrimSpace(effective + " " + strings.Join(toks, " ")) + } + applyTokens(toks, k) + } + + return flags, src, effective +} + +func readJavaOptsFromProcEnviron(environPath string) map[string]string { + raw := readProcFile(environPath) + if raw == "" { + return nil + } + + // /proc//environ is NUL-separated key=value pairs. + parts := strings.Split(raw, "\x00") + out := map[string]string{} + for _, kv := range parts { + if kv == "" { + continue + } + k, v, ok := strings.Cut(kv, "=") + if !ok { + continue + } + switch k { + case "JAVA_TOOL_OPTIONS", "JDK_JAVA_OPTIONS", "JAVA_OPTS": + if strings.TrimSpace(v) != "" { + out[k] = v + } + } + } + if len(out) == 0 { + return nil + } + return out +} + +func splitJavaOpts(s string) []string { + var out []string + var cur strings.Builder + inSingle := false + inDouble := false + escape := false + + flush := func() { + if cur.Len() > 0 { + out = append(out, cur.String()) + cur.Reset() + } + } + + for _, r := range s { + if escape { + cur.WriteRune(r) + escape = false + continue + } + if r == '\\' && !inSingle { + escape = true + continue + } + switch r { + case '\'': + if !inDouble { + inSingle = !inSingle + continue + } + case '"': + if !inSingle { + inDouble = !inDouble + continue + } + } + if !inSingle && !inDouble { + if r == ' ' || r == '\t' || r == '\n' || r == '\r' { + flush() + continue + } + } + cur.WriteRune(r) + } + flush() + return out +} + // parseMemSize parses JVM memory size strings: "256m", "4g", "512k", or bare bytes. func parseMemSize(s string) (int64, error) { if s == "" { diff --git a/internal/nodemon/jvm_discovery_test.go b/internal/nodemon/jvm_discovery_test.go index a04f10b5..cf5aad05 100644 --- a/internal/nodemon/jvm_discovery_test.go +++ b/internal/nodemon/jvm_discovery_test.go @@ -1,12 +1,40 @@ package nodemon import ( + "os" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestSplitJavaOpts(t *testing.T) { + assert.Equal(t, []string{"-Xms48m", "-Xmx160m", "-XX:MaxRAMPercentage=65", "-Dfrom=tooloptions"}, splitJavaOpts("-Xms48m -Xmx160m -XX:MaxRAMPercentage=65 -Dfrom=tooloptions")) + assert.Equal(t, []string{"-Dfoo=bar baz", "-Xmx256m"}, splitJavaOpts("-Dfoo='bar baz' -Xmx256m")) + assert.Equal(t, []string{"-Dfoo=bar baz", "-Xmx256m"}, splitJavaOpts("-Dfoo=\"bar baz\" -Xmx256m")) +} + +func TestReadJVMFlagsFromProcEnviron(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "environ") + require.NoError(t, err) + defer f.Close() + + // NUL-separated key=value pairs. + _, err = f.WriteString("PATH=/usr/bin\x00JAVA_TOOL_OPTIONS=-Xms48m -Xmx160m -XX:MaxRAMPercentage=65 -Dfrom=tooloptions\x00OTHER=x\x00") + require.NoError(t, err) + require.NoError(t, f.Sync()) + + env := readJavaOptsFromProcEnviron(f.Name()) + flags, _, _ := ParseJVMFlagsWithSources("java", env) + parsed := flags + require.NotNil(t, parsed.XmsBytes) + require.NotNil(t, parsed.XmxBytes) + require.NotNil(t, parsed.MaxRamPercentage) + assert.Equal(t, int64(48*1024*1024), *parsed.XmsBytes) + assert.Equal(t, int64(160*1024*1024), *parsed.XmxBytes) + assert.Equal(t, 65.0, *parsed.MaxRamPercentage) +} + func TestParseCgroupContainerID(t *testing.T) { const id64 = "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" tests := []struct { diff --git a/internal/nodemon/jvm_hsperfdata.go b/internal/nodemon/jvm_hsperfdata.go index 973d2f17..e8061538 100644 --- a/internal/nodemon/jvm_hsperfdata.go +++ b/internal/nodemon/jvm_hsperfdata.go @@ -58,24 +58,19 @@ func parseHsperfdata(data []byte) (map[string]any, error) { return nil, fmt.Errorf("hsperfdata: file too short (%d bytes)", len(data)) } - // Detect byte order from magic. - var order binary.ByteOrder - if binary.BigEndian.Uint32(data[0:4]) == perfMagic { - order = binary.BigEndian - } else if binary.LittleEndian.Uint32(data[0:4]) == perfMagic { - order = binary.LittleEndian - } else { + // Detect byte order. + // + // In practice, we’ve observed hsperfdata files where the raw magic bytes match + // 0xca fe c0 c0 but the numeric header fields (entry_offset/num_entries/etc.) + // must be interpreted little-endian to be sane. So: validate both orders and + // pick the one that yields a consistent header. + if binary.BigEndian.Uint32(data[0:4]) != perfMagic && binary.LittleEndian.Uint32(data[0:4]) != perfMagic { return nil, fmt.Errorf("hsperfdata: invalid magic bytes") } - entryOffset := int(order.Uint32(data[24:28])) - numEntries := int(order.Uint32(data[28:32])) - if entryOffset < 32 || entryOffset >= len(data) { - return nil, fmt.Errorf("hsperfdata: invalid entry_offset=%d len=%d", entryOffset, len(data)) - } - // Guard against corrupted headers. - if numEntries < 0 || numEntries > 100_000 { - return nil, fmt.Errorf("hsperfdata: unreasonable num_entries=%d", numEntries) + order, entryOffset, numEntries, err := pickHsperfByteOrder(data) + if err != nil { + return nil, err } result := make(map[string]any, minInt(numEntries, 2048)) @@ -161,6 +156,47 @@ func parseHsperfdata(data []byte) (map[string]any, error) { return result, nil } +func pickHsperfByteOrder(data []byte) (binary.ByteOrder, int, int, error) { + // Try both endian interpretations and pick the one that produces a sane header. + try := func(order binary.ByteOrder) (entryOffset int, numEntries int, ok bool) { + if len(data) < 32 { + return 0, 0, false + } + entryOffset = int(order.Uint32(data[24:28])) + numEntries = int(order.Uint32(data[28:32])) + if entryOffset < 32 || entryOffset+20 > len(data) { + return 0, 0, false + } + if numEntries < 0 || numEntries > 100_000 { + return 0, 0, false + } + // Validate first entry has a plausible length. + firstLen := int(order.Uint32(data[entryOffset : entryOffset+4])) + if firstLen <= 0 || entryOffset+firstLen > len(data) { + return 0, 0, false + } + return entryOffset, numEntries, true + } + + beOff, beN, beOK := try(binary.BigEndian) + leOff, leN, leOK := try(binary.LittleEndian) + + switch { + case beOK && !leOK: + return binary.BigEndian, beOff, beN, nil + case leOK && !beOK: + return binary.LittleEndian, leOff, leN, nil + case beOK && leOK: + // Tie-breaker: prefer the order matching the magic interpretation if possible. + if binary.BigEndian.Uint32(data[0:4]) == perfMagic { + return binary.BigEndian, beOff, beN, nil + } + return binary.LittleEndian, leOff, leN, nil + default: + return nil, 0, 0, fmt.Errorf("hsperfdata: could not determine byte order (len=%d)", len(data)) + } +} + func minInt(a, b int) int { if a < b { return a diff --git a/internal/nodemon/jvm_types.go b/internal/nodemon/jvm_types.go index 62fd388b..6b716e59 100644 --- a/internal/nodemon/jvm_types.go +++ b/internal/nodemon/jvm_types.go @@ -23,12 +23,22 @@ type JVMMetric struct { SafepointTimeSecondsTotal float64 `json:"safepoint_time_seconds_total"` SafepointSyncTimeSecondsTotal float64 `json:"safepoint_sync_time_seconds_total"` - FlagsExtracted JVMFlagsExtracted `json:"flags_extracted"` - RawCmdline string `json:"raw_cmdline,omitempty"` - Timestamp time.Time `json:"timestamp"` + FlagsExtracted JVMFlagsExtracted `json:"flags_extracted"` + FlagSources JVMFlagSources `json:"flag_sources,omitempty"` + RawCmdline string `json:"raw_cmdline,omitempty"` + Timestamp time.Time `json:"timestamp"` } -// JVMFlagsExtracted holds JVM flags parsed from the process cmdline. +// JVMFlagSources describes where each extracted JVM flag value came from. +// This is best-effort; precedence rules are implemented in ParseJVMFlagsWithSources. +type JVMFlagSources struct { + XmsBytes string `json:"xms_bytes,omitempty"` + XmxBytes string `json:"xmx_bytes,omitempty"` + MaxRamPercentage string `json:"max_ram_percentage,omitempty"` + UseContainerSupport string `json:"use_container_support,omitempty"` +} + +// JVMFlagsExtracted holds JVM flags parsed from the process cmdline and/or env. type JVMFlagsExtracted struct { XmsBytes *int64 `json:"xms_bytes,omitempty"` XmxBytes *int64 `json:"xmx_bytes,omitempty"` diff --git a/test/fixtures/jvm-apps.yaml b/test/fixtures/jvm-apps.yaml new file mode 100644 index 00000000..f481511f --- /dev/null +++ b/test/fixtures/jvm-apps.yaml @@ -0,0 +1,109 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: jvm-spike +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: java8-xmx-only + namespace: jvm-spike +spec: + replicas: 1 + selector: + matchLabels: + app: java8-xmx-only + template: + metadata: + labels: + app: java8-xmx-only + spec: + containers: + - name: app + image: eclipse-temurin:8-jre + command: ["sh","-lc"] + args: + - | + cat >/tmp/DummyMain.java <<'EOF' + public class DummyMain { public static void main(String[] args) throws Exception { for(;;) Thread.sleep(60000); } } + EOF + javac /tmp/DummyMain.java + exec java -Xmx192m -cp /tmp DummyMain +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: java11-tool-options + namespace: jvm-spike +spec: + replicas: 1 + selector: + matchLabels: + app: java11-tool-options + template: + metadata: + labels: + app: java11-tool-options + spec: + containers: + - name: app + image: eclipse-temurin:11-jre + env: + - name: JAVA_TOOL_OPTIONS + value: "-Xms48m -Xmx160m -XX:MaxRAMPercentage=65 -Dfrom=tooloptions" + command: ["sh","-lc"] + args: + - | + cat >/tmp/DummyMain.java <<'EOF' + public class DummyMain { public static void main(String[] args) throws Exception { for(;;) Thread.sleep(60000); } } + EOF + javac /tmp/DummyMain.java + exec java -cp /tmp DummyMain +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: java17-maxram-only + namespace: jvm-spike +spec: + replicas: 1 + selector: + matchLabels: + app: java17-maxram-only + template: + metadata: + labels: + app: java17-maxram-only + spec: + containers: + - name: app + image: eclipse-temurin:17-jre + command: ["sh","-lc"] + args: + - | + cat >/tmp/DummyMain.java <<'EOF' + public class DummyMain { public static void main(String[] args) throws Exception { for(;;) Thread.sleep(60000); } } + EOF + javac /tmp/DummyMain.java + exec java -XX:MaxRAMPercentage=40 -cp /tmp DummyMain +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: not-java + namespace: jvm-spike +spec: + replicas: 1 + selector: + matchLabels: + app: not-java + template: + metadata: + labels: + app: not-java + spec: + containers: + - name: app + image: busybox:1.36 + command: ["sh","-lc"] + args: ["sleep 3600"] From f076b1efbbd12a877f994c9a1e35e7a293c794de Mon Sep 17 00:00:00 2001 From: Debot MacMini1 Date: Mon, 25 May 2026 10:27:04 -0700 Subject: [PATCH 10/13] nodemon: fix JVM safepoint units, cgroupv2 IDs, CI lint; drop build marker --- .github/workflows/jvm-metrics-kind-test.yml | 1 - cmd/zxporter-nodemon/main.go | 4 +-- .../zxporter-nodemon/templates/daemonset.yaml | 5 +-- helm-chart/zxporter-nodemon/values.yaml | 3 -- internal/collector/container_metrics.go | 1 - internal/nodemon/jvm_collector.go | 8 +++-- internal/nodemon/jvm_discovery.go | 33 ++++++++++++------- internal/nodemon/jvm_discovery_test.go | 4 +-- internal/nodemon/jvm_hsperfdata.go | 2 +- internal/nodemon/jvm_types.go | 8 ++--- 10 files changed, 36 insertions(+), 33 deletions(-) diff --git a/.github/workflows/jvm-metrics-kind-test.yml b/.github/workflows/jvm-metrics-kind-test.yml index 92a2a5e0..979d4274 100644 --- a/.github/workflows/jvm-metrics-kind-test.yml +++ b/.github/workflows/jvm-metrics-kind-test.yml @@ -50,7 +50,6 @@ jobs: --set provider=eks \ --set dcgmExporter.enabled=false \ --set jvmMetrics.enabled=true \ - --set jvmMetrics.buildMarker="ci-${GITHUB_RUN_ID}" \ --set gpuMetricsExporter.image.repository=zxporter-nodemon \ --set gpuMetricsExporter.image.tag=jvm-ci \ --set gpuMetricsExporter.image.pullPolicy=IfNotPresent diff --git a/cmd/zxporter-nodemon/main.go b/cmd/zxporter-nodemon/main.go index ee5237ea..1f840e1b 100644 --- a/cmd/zxporter-nodemon/main.go +++ b/cmd/zxporter-nodemon/main.go @@ -29,11 +29,9 @@ func main() { logger := zapr.NewLogger(zapLog) versionInfo := version.Get() - marker := os.Getenv("JVM_METRICS_BUILD_MARKER") logger.Info("Starting zxporter-nodemon", "version", versionInfo.String(), - "commit", versionInfo.GitCommit, - "jvmMetricsBuildMarker", marker) + "commit", versionInfo.GitCommit) cfg := nodemon.ExporterConfig{ HTTPListenPort: envInt("HTTP_LISTEN_PORT", 6061), diff --git a/helm-chart/zxporter-nodemon/templates/daemonset.yaml b/helm-chart/zxporter-nodemon/templates/daemonset.yaml index 1d83ab72..4855a401 100644 --- a/helm-chart/zxporter-nodemon/templates/daemonset.yaml +++ b/helm-chart/zxporter-nodemon/templates/daemonset.yaml @@ -104,10 +104,7 @@ spec: valueFrom: fieldRef: fieldPath: spec.nodeName - {{- if .Values.jvmMetrics.buildMarker }} - - name: "JVM_METRICS_BUILD_MARKER" - value: {{ .Values.jvmMetrics.buildMarker | quote }} - {{- end }} + {{- if .Values.dcgmExporter.enabled }} - name: "DCGM_HOST" value: "localhost" diff --git a/helm-chart/zxporter-nodemon/values.yaml b/helm-chart/zxporter-nodemon/values.yaml index 78c5db86..7abe0aa0 100644 --- a/helm-chart/zxporter-nodemon/values.yaml +++ b/helm-chart/zxporter-nodemon/values.yaml @@ -46,9 +46,6 @@ gpuMetricsExporter: # NOTE: requires hostPID + running nodemon as UID 0 to read /proc//root/tmp/hsperfdata_*/ jvmMetrics: enabled: false - # Optional: forces a recognizable marker into the nodemon env + startup logs to - # prove which image/chart revision is running. - buildMarker: "" dcgmExporter: enabled: true diff --git a/internal/collector/container_metrics.go b/internal/collector/container_metrics.go index 732a4c03..edbc45d2 100644 --- a/internal/collector/container_metrics.go +++ b/internal/collector/container_metrics.go @@ -88,7 +88,6 @@ type ContainerMetricsSnapshot struct { JvmFlagSourcesJSON string `json:"jvmFlagSourcesJson,omitempty"` } - // BuildOOMSnapshot constructs a ContainerMetricsSnapshot for an OOM event. // Used by both the PodCollector (informer fast path) and OOMReconciler (sweep path) // to ensure consistent snapshot construction. diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index 6444d9b2..16e7a8aa 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -145,7 +145,7 @@ func buildJVMMetric(counters map[string]any, proc JavaProcess, info containerInf PidNS: proc.PidNS, RawCmdline: proc.CmdLine, // FlagsExtracted / FlagSources are filled after hsperf parsing. - Timestamp: time.Now().UTC(), + Timestamp: time.Now().UTC(), } m.JavaCommand, _ = hsStr(counters, "sun.rt.javaCommand") @@ -194,8 +194,10 @@ func buildJVMMetric(counters map[string]any, proc JavaProcess, info containerInf safeTicks, _ := hsInt(counters, "sun.rt.safepointTime") syncTicks, _ := hsInt(counters, "sun.rt.safepointSyncTime") - m.SafepointTimeSecondsTotal = float64(safeTicks) / float64(freq) - m.SafepointSyncTimeSecondsTotal = float64(syncTicks) / float64(freq) + // HotSpot stores safepoint counters in milliseconds (not HRT ticks like GC collector time). + // See: sun.rt.safepointTime, sun.rt.safepointSyncTime + m.SafepointTimeSecondsTotal = float64(safeTicks) / 1000.0 + m.SafepointSyncTimeSecondsTotal = float64(syncTicks) / 1000.0 flags, sources, effectiveCmd := ParseJVMFlagsWithSources(proc.CmdLine, proc.EnvJavaOpts) m.FlagsExtracted = flags diff --git a/internal/nodemon/jvm_discovery.go b/internal/nodemon/jvm_discovery.go index 8f8cb36f..6ce30619 100644 --- a/internal/nodemon/jvm_discovery.go +++ b/internal/nodemon/jvm_discovery.go @@ -12,14 +12,19 @@ import ( "strings" ) - var ( - // containerIDRe matches 64-char container IDs in cgroup paths for containerd, docker, and bare containerd. + // containerIDRe matches 64-char container IDs in cgroup paths for containerd and docker. containerIDRe = regexp.MustCompile(`(?:cri-containerd|docker|containerd)-([a-f0-9]{64})\.scope`) // crioRe matches CRI-O container IDs. crioRe = regexp.MustCompile(`crio-([a-f0-9]{64})\.scope`) + // bareCgroupIDRe matches cgroupv2 paths where the last segment is a bare 64-char container ID. + // Example: .../kubepods-burstable-pod.slice/ + bareCgroupIDRe = regexp.MustCompile(`/([a-f0-9]{64})$`) ) +const javaComm = "java" +const javaBinName = "java" + // JavaProcess holds info about a discovered Java process running inside a Kubernetes container. type JavaProcess struct { PidHost int @@ -46,7 +51,7 @@ func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { return nil, fmt.Errorf("reading %s: %w", procRoot, err) } - var procs []JavaProcess + procs := make([]JavaProcess, 0, len(entries)) for _, e := range entries { if !e.IsDir() { continue @@ -59,14 +64,14 @@ func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { pidDir := filepath.Join(procRoot, e.Name()) comm := strings.TrimSpace(readProcFile(filepath.Join(pidDir, "comm"))) - // Fast path: most JVMs have comm == "java". Avoid reading cmdline for every PID. - if comm != "java" { - continue - } // Read null-separated cmdline and convert to space-separated for display / parsing. rawCmdline := readProcFile(filepath.Join(pidDir, "cmdline")) cmdline := string(bytes.ReplaceAll([]byte(rawCmdline), []byte{0}, []byte{' '})) + // Fast path: most JVMs have comm == "java"; but on some systems comm may differ. + if !isJavaProcess(comm, cmdline) { + continue + } // Also capture env-injected JVM options (common in k8s via JAVA_TOOL_OPTIONS, // JDK_JAVA_OPTIONS, JAVA_OPTS). These do NOT appear in /proc//cmdline. @@ -105,14 +110,14 @@ func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { // isJavaProcess returns true if the process comm is "java" or its first cmdline // argument is a binary named "java". func isJavaProcess(comm, cmdline string) bool { - if comm == "java" { + if comm == javaComm { return true } parts := strings.Fields(cmdline) if len(parts) == 0 { return false } - return filepath.Base(parts[0]) == "java" + return filepath.Base(parts[0]) == javaBinName } // parseCgroupContainerID extracts a 64-char hex container ID from cgroup file content. @@ -126,6 +131,9 @@ func parseCgroupContainerID(content string) (string, bool) { if m := crioRe.FindStringSubmatch(line); len(m) == 2 { return m[1], true } + if m := bareCgroupIDRe.FindStringSubmatch(line); len(m) == 2 { + return m[1], true + } } return "", false } @@ -173,7 +181,10 @@ func readProcFile(path string) string { if err != nil { return "" } - defer f.Close() + defer func() { + // procfs close errors are not actionable. + _ = f.Close() + }() b, err := io.ReadAll(io.LimitReader(f, maxProcBytes)) if err != nil { @@ -220,7 +231,7 @@ func ParseJVMFlags(cmdline string) JVMFlagsExtracted { // don’t appear in /proc//cmdline), so this is best-effort: // - cmdline is treated as highest precedence // - env vars are applied in the following precedence order: -// JAVA_TOOL_OPTIONS, JDK_JAVA_OPTIONS, JAVA_OPTS +// JAVA_TOOL_OPTIONS, JDK_JAVA_OPTIONS, JAVA_OPTS // // The returned effectiveCmdline is the cmdline plus the env options appended as // tokens for observability. diff --git a/internal/nodemon/jvm_discovery_test.go b/internal/nodemon/jvm_discovery_test.go index cf5aad05..31026722 100644 --- a/internal/nodemon/jvm_discovery_test.go +++ b/internal/nodemon/jvm_discovery_test.go @@ -17,7 +17,7 @@ func TestSplitJavaOpts(t *testing.T) { func TestReadJVMFlagsFromProcEnviron(t *testing.T) { f, err := os.CreateTemp(t.TempDir(), "environ") require.NoError(t, err) - defer f.Close() + defer func() { _ = f.Close() }() // NUL-separated key=value pairs. _, err = f.WriteString("PATH=/usr/bin\x00JAVA_TOOL_OPTIONS=-Xms48m -Xmx160m -XX:MaxRAMPercentage=65 -Dfrom=tooloptions\x00OTHER=x\x00") @@ -101,7 +101,7 @@ func TestParseNSpid(t *testing.T) { wantOK bool }{ { - name: "nested namespace - takes last value", + name: "nested namespace - takes last value", content: "Name:\tjava\nPid:\t12345\nNSpid:\t12345\t67\nTgid:\t12345\n", wantPid: 67, wantOK: true, diff --git a/internal/nodemon/jvm_hsperfdata.go b/internal/nodemon/jvm_hsperfdata.go index e8061538..9d3d4993 100644 --- a/internal/nodemon/jvm_hsperfdata.go +++ b/internal/nodemon/jvm_hsperfdata.go @@ -18,7 +18,7 @@ func readHsperfdata(path string) (map[string]any, error) { if err != nil { return nil, err } - defer f.Close() + defer func() { _ = f.Close() }() data, err := io.ReadAll(io.LimitReader(f, maxHsperfBytes)) if err != nil { diff --git a/internal/nodemon/jvm_types.go b/internal/nodemon/jvm_types.go index 6b716e59..f1a2489a 100644 --- a/internal/nodemon/jvm_types.go +++ b/internal/nodemon/jvm_types.go @@ -23,10 +23,10 @@ type JVMMetric struct { SafepointTimeSecondsTotal float64 `json:"safepoint_time_seconds_total"` SafepointSyncTimeSecondsTotal float64 `json:"safepoint_sync_time_seconds_total"` - FlagsExtracted JVMFlagsExtracted `json:"flags_extracted"` - FlagSources JVMFlagSources `json:"flag_sources,omitempty"` - RawCmdline string `json:"raw_cmdline,omitempty"` - Timestamp time.Time `json:"timestamp"` + FlagsExtracted JVMFlagsExtracted `json:"flags_extracted"` + FlagSources JVMFlagSources `json:"flag_sources,omitempty"` + RawCmdline string `json:"raw_cmdline,omitempty"` + Timestamp time.Time `json:"timestamp"` } // JVMFlagSources describes where each extracted JVM flag value came from. From ec1c64e84f5a2aaebd2b1890b79c7fcbdb85c83d Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Tue, 26 May 2026 13:35:07 +0530 Subject: [PATCH 11/13] get the pod list from informer instead of api calls in nodemon for jvm metrics mapping --- cmd/zxporter-nodemon/main.go | 16 ++- internal/nodemon/jvm_collector.go | 190 +++++++++++++++++-------- internal/nodemon/jvm_collector_test.go | 105 ++++++++++++++ 3 files changed, 249 insertions(+), 62 deletions(-) create mode 100644 internal/nodemon/jvm_collector_test.go diff --git a/cmd/zxporter-nodemon/main.go b/cmd/zxporter-nodemon/main.go index 1f840e1b..0a5ad661 100644 --- a/cmd/zxporter-nodemon/main.go +++ b/cmd/zxporter-nodemon/main.go @@ -17,6 +17,7 @@ import ( "go.uber.org/zap" "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -63,6 +64,12 @@ func main() { os.Exit(1) } + k8sClient, err := kubernetes.NewForConfig(kubeConfig) + if err != nil { + logger.Error(err, "Failed to create kubernetes client") + os.Exit(1) + } + // Create components httpClient := &http.Client{Timeout: 15 * time.Second} scraper := nodemon.NewScraper(httpClient, logger) @@ -80,7 +87,13 @@ func main() { exporter := nodemon.NewExporter(cfg, dynClient, scraper, mapper, logger) // Create JVM collector - jvmCollector := nodemon.NewJVMCollector(cfg.NodeName, dynClient, logger) + jvmCollector := nodemon.NewJVMCollector(cfg.NodeName, k8sClient, logger) + + // Start JVM collector's pod informer + if err := jvmCollector.Start(); err != nil { + logger.Error(err, "Failed to start JVM collector") + os.Exit(1) + } // Create HTTP handlers and server containerMetricsHandler := nodemon.NewContainerMetricsHandler(exporter, logger) @@ -111,6 +124,7 @@ func main() { <-sigChan logger.Info("Shutting down...") + jvmCollector.Stop() shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) defer shutdownCancel() if err := server.Shutdown(shutdownCtx); err != nil { diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index 16e7a8aa..9920bc62 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -5,13 +5,15 @@ import ( "fmt" "os" "strings" + "sync" "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/dynamic" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) type containerInfo struct { @@ -24,39 +26,142 @@ type containerInfo struct { // Requires the pod to run with hostPID: true and as UID 0 to read // /proc//root/tmp/hsperfdata_*/ for other containers. type JVMCollector struct { - nodeName string - dynClient dynamic.Interface - procRoot string - log logr.Logger + nodeName string + k8sClient kubernetes.Interface + informerFactory informers.SharedInformerFactory + procRoot string + log logr.Logger + + mu sync.RWMutex + containerMap map[string]containerInfo // containerID (hex) -> pod metadata + stopCh chan struct{} } // NewJVMCollector creates a JVMCollector. procRoot defaults to "/proc". -func NewJVMCollector(nodeName string, dynClient dynamic.Interface, log logr.Logger) *JVMCollector { +func NewJVMCollector(nodeName string, k8sClient kubernetes.Interface, log logr.Logger) *JVMCollector { return &JVMCollector{ - nodeName: nodeName, - dynClient: dynClient, - procRoot: "/proc", - log: log.WithName("jvm-collector"), + nodeName: nodeName, + k8sClient: k8sClient, + procRoot: "/proc", + log: log.WithName("jvm-collector"), + containerMap: make(map[string]containerInfo), + stopCh: make(chan struct{}), } } -// QueryJVMMetrics returns JVM metrics for all discovered Java containers on this node. -func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) { - // Pod listing can be slow on some clusters/APIServer conditions. Bound it so - // JVM metrics scraping doesn't wedge the HTTP server. - ctxPods, cancelPods := context.WithTimeout(ctx, 500*time.Millisecond) - defer cancelPods() - start := time.Now() +// Start creates a node-scoped pod informer and waits for the cache to sync. +// Must be called exactly once before serving HTTP requests. +func (c *JVMCollector) Start() error { + if c.informerFactory != nil { + return fmt.Errorf("JVMCollector already started") + } + + c.informerFactory = informers.NewSharedInformerFactoryWithOptions( + c.k8sClient, + 0, + informers.WithTweakListOptions(func(opts *metav1.ListOptions) { + if c.nodeName != "" { + opts.FieldSelector = "spec.nodeName=" + c.nodeName + } + }), + ) - containerMap, err := c.buildContainerMap(ctxPods) + podInformer := c.informerFactory.Core().V1().Pods().Informer() + + _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return + } + c.updateContainerMap(pod) + }, + UpdateFunc: func(_, newObj interface{}) { + pod, ok := newObj.(*corev1.Pod) + if !ok { + return + } + c.updateContainerMap(pod) + }, + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + return + } + pod, ok = tombstone.Obj.(*corev1.Pod) + if !ok { + return + } + } + c.removeFromContainerMap(pod) + }, + }) if err != nil { - // Non-fatal: continue with empty map; pod/namespace/container fields will be blank. - c.log.Error(err, "Failed to build container map; pod metadata will be missing") - containerMap = map[string]containerInfo{} + return fmt.Errorf("adding pod event handler: %w", err) } - c.log.Info("Built container map", "count", len(containerMap), "took", time.Since(start).String()) - start = time.Now() + c.informerFactory.Start(c.stopCh) + + c.log.Info("Waiting for pod informer cache to sync") + if !cache.WaitForCacheSync(c.stopCh, podInformer.HasSynced) { + return fmt.Errorf("timed out waiting for pod informer cache to sync") + } + c.log.Info("Pod informer cache synced") + + return nil +} + +// Stop shuts down the informer factory. Safe to call multiple times. +func (c *JVMCollector) Stop() { + select { + case <-c.stopCh: + // already closed + default: + close(c.stopCh) + } +} + +func (c *JVMCollector) updateContainerMap(pod *corev1.Pod) { + c.mu.Lock() + defer c.mu.Unlock() + for _, cs := range pod.Status.ContainerStatuses { + id := stripContainerIDScheme(cs.ContainerID) + if id == "" { + continue + } + c.containerMap[id] = containerInfo{ + Pod: pod.Name, + Namespace: pod.Namespace, + Container: cs.Name, + } + } +} + +func (c *JVMCollector) removeFromContainerMap(pod *corev1.Pod) { + c.mu.Lock() + defer c.mu.Unlock() + for _, cs := range pod.Status.ContainerStatuses { + id := stripContainerIDScheme(cs.ContainerID) + if id == "" { + continue + } + delete(c.containerMap, id) + } +} + +// QueryJVMMetrics returns JVM metrics for all discovered Java containers on this node. +func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) { + c.mu.RLock() + containerMap := make(map[string]containerInfo, len(c.containerMap)) + for k, v := range c.containerMap { + containerMap[k] = v + } + c.mu.RUnlock() + c.log.Info("Container map snapshot", "count", len(containerMap)) + + start := time.Now() procs, err := discoverJavaProcesses(c.procRoot) if err != nil { return nil, fmt.Errorf("discovering java processes: %w", err) @@ -66,7 +171,6 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) start = time.Now() metrics := make([]JVMMetric, 0, len(procs)) for _, proc := range procs { - // Bound each read so a single bad proc file can’t wedge the request. c.log.Info("Reading hsperfdata", "pid", proc.PidHost, "path", proc.HsperfDataPath) if st, err := os.Stat(proc.HsperfDataPath); err == nil { c.log.Info("hsperfdata stat", "pid", proc.PidHost, "sizeBytes", st.Size()) @@ -89,42 +193,6 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) return metrics, nil } -// buildContainerMap lists running pods on this node and returns a map of -// containerID (hex, no scheme prefix) → containerInfo. -func (c *JVMCollector) buildContainerMap(ctx context.Context) (map[string]containerInfo, error) { - fieldSelector := "status.phase=Running" - if c.nodeName != "" { - fieldSelector = fmt.Sprintf("%s,spec.nodeName=%s", fieldSelector, c.nodeName) - } - - podList, err := c.dynClient.Resource(podGVR). - Namespace(""). - List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}) - if err != nil { - return nil, fmt.Errorf("listing pods: %w", err) - } - - result := make(map[string]containerInfo, len(podList.Items)) - for _, item := range podList.Items { - var pod corev1.Pod - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(item.Object, &pod); err != nil { - continue - } - for _, cs := range pod.Status.ContainerStatuses { - id := stripContainerIDScheme(cs.ContainerID) - if id == "" { - continue - } - result[id] = containerInfo{ - Pod: pod.Name, - Namespace: pod.Namespace, - Container: cs.Name, - } - } - } - return result, nil -} - // stripContainerIDScheme strips the URL scheme (e.g., "containerd://") from a container ID. func stripContainerIDScheme(raw string) string { if i := strings.LastIndex(raw, "://"); i >= 0 { diff --git a/internal/nodemon/jvm_collector_test.go b/internal/nodemon/jvm_collector_test.go new file mode 100644 index 00000000..f9eada80 --- /dev/null +++ b/internal/nodemon/jvm_collector_test.go @@ -0,0 +1,105 @@ +package nodemon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestUpdateContainerMap(t *testing.T) { + c := &JVMCollector{ + containerMap: make(map[string]containerInfo), + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-app-abc", + Namespace: "default", + }, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "app", + ContainerID: "containerd://abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789", + }, + { + Name: "sidecar", + ContainerID: "containerd://1111111111111111111111111111111111111111111111111111111111111111", + }, + }, + }, + } + + c.updateContainerMap(pod) + + assert.Equal(t, containerInfo{Pod: "my-app-abc", Namespace: "default", Container: "app"}, + c.containerMap["abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"]) + assert.Equal(t, containerInfo{Pod: "my-app-abc", Namespace: "default", Container: "sidecar"}, + c.containerMap["1111111111111111111111111111111111111111111111111111111111111111"]) + assert.Len(t, c.containerMap, 2) +} + +func TestRemoveFromContainerMap(t *testing.T) { + c := &JVMCollector{ + containerMap: map[string]containerInfo{ + "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789": {Pod: "my-app", Namespace: "default", Container: "app"}, + "1111111111111111111111111111111111111111111111111111111111111111": {Pod: "my-app", Namespace: "default", Container: "sidecar"}, + "2222222222222222222222222222222222222222222222222222222222222222": {Pod: "other-pod", Namespace: "kube-system", Container: "dns"}, + }, + } + + pod := &corev1.Pod{ + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + {ContainerID: "containerd://abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"}, + {ContainerID: "containerd://1111111111111111111111111111111111111111111111111111111111111111"}, + }, + }, + } + + c.removeFromContainerMap(pod) + + assert.Len(t, c.containerMap, 1) + assert.Contains(t, c.containerMap, "2222222222222222222222222222222222222222222222222222222222222222") +} + +func TestUpdateContainerMap_SkipsEmptyID(t *testing.T) { + c := &JVMCollector{ + containerMap: make(map[string]containerInfo), + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "pending-pod", Namespace: "default"}, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + {Name: "app", ContainerID: ""}, + }, + }, + } + + c.updateContainerMap(pod) + assert.Empty(t, c.containerMap) +} + +func TestUpdateContainerMap_OverwritesOnUpdate(t *testing.T) { + cid := "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + c := &JVMCollector{ + containerMap: map[string]containerInfo{ + cid: {Pod: "old-pod", Namespace: "default", Container: "app"}, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "new-pod", Namespace: "staging"}, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + {Name: "web", ContainerID: "containerd://" + cid}, + }, + }, + } + + c.updateContainerMap(pod) + assert.Equal(t, containerInfo{Pod: "new-pod", Namespace: "staging", Container: "web"}, c.containerMap[cid]) +} From fae204e7829054175eb80cfaf4da5c99840b4563 Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Tue, 26 May 2026 16:08:03 +0530 Subject: [PATCH 12/13] some small bug fixes --- internal/collector/container_resource_collector.go | 5 ++++- internal/collector/nodemon_client.go | 4 +++- internal/collector/nodemon_jvm_client.go | 4 +++- internal/nodemon/jvm_collector.go | 7 +++++++ internal/nodemon/jvm_discovery.go | 2 +- internal/nodemon/jvm_types.go | 5 ++++- 6 files changed, 22 insertions(+), 5 deletions(-) diff --git a/internal/collector/container_resource_collector.go b/internal/collector/container_resource_collector.go index 0148706a..7b67b1a9 100644 --- a/internal/collector/container_resource_collector.go +++ b/internal/collector/container_resource_collector.go @@ -415,7 +415,6 @@ func (c *ContainerResourceCollector) collectAllContainerResources(ctx context.Co var cancel context.CancelFunc if c.prometheusAPI != nil { queryCtx, cancel = context.WithTimeout(ctx, c.config.QueryTimeout) - defer cancel() } // Fetch network metrics @@ -562,6 +561,10 @@ func (c *ContainerResourceCollector) collectAllContainerResources(ctx context.Co throttleFraction, ) } + + if cancel != nil { + cancel() + } } } diff --git a/internal/collector/nodemon_client.go b/internal/collector/nodemon_client.go index 3f058473..7989c2e4 100644 --- a/internal/collector/nodemon_client.go +++ b/internal/collector/nodemon_client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "math" "net/http" "sync" @@ -257,8 +258,9 @@ func (c *NodemonClient) fetchMetrics( return nil, fmt.Errorf("nodemon returned status %d", resp.StatusCode) } + const maxResponseBytes = 16 << 20 // 16MiB safety cap var metrics []NodemonMetric - if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil { + if err := json.NewDecoder(io.LimitReader(resp.Body, maxResponseBytes)).Decode(&metrics); err != nil { return nil, fmt.Errorf("decoding nodemon response: %w", err) } diff --git a/internal/collector/nodemon_jvm_client.go b/internal/collector/nodemon_jvm_client.go index 3e9a13d1..9fb27f64 100644 --- a/internal/collector/nodemon_jvm_client.go +++ b/internal/collector/nodemon_jvm_client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "time" ) @@ -76,8 +77,9 @@ func (c *NodemonClient) fetchJVMMetrics(ctx context.Context, url string) ([]Node return nil, fmt.Errorf("nodemon returned status %d", resp.StatusCode) } + const maxResponseBytes = 16 << 20 // 16MiB safety cap var metrics []NodemonJVMMetrics - if err := json.NewDecoder(resp.Body).Decode(&metrics); err != nil { + if err := json.NewDecoder(io.LimitReader(resp.Body, maxResponseBytes)).Decode(&metrics); err != nil { return nil, fmt.Errorf("decoding nodemon jvm response: %w", err) } return metrics, nil diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index 9920bc62..b6f03278 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -171,6 +171,13 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) start = time.Now() metrics := make([]JVMMetric, 0, len(procs)) for _, proc := range procs { + select { + case <-ctx.Done(): + c.log.Info("JVM metrics query cancelled", "collected", len(metrics), "remaining", len(procs)-len(metrics)) + return metrics, ctx.Err() + default: + } + c.log.Info("Reading hsperfdata", "pid", proc.PidHost, "path", proc.HsperfDataPath) if st, err := os.Stat(proc.HsperfDataPath); err == nil { c.log.Info("hsperfdata stat", "pid", proc.PidHost, "sizeBytes", st.Size()) diff --git a/internal/nodemon/jvm_discovery.go b/internal/nodemon/jvm_discovery.go index 6ce30619..e87a461b 100644 --- a/internal/nodemon/jvm_discovery.go +++ b/internal/nodemon/jvm_discovery.go @@ -51,7 +51,7 @@ func discoverJavaProcesses(procRoot string) ([]JavaProcess, error) { return nil, fmt.Errorf("reading %s: %w", procRoot, err) } - procs := make([]JavaProcess, 0, len(entries)) + procs := make([]JavaProcess, 0, 64) for _, e := range entries { if !e.IsDir() { continue diff --git a/internal/nodemon/jvm_types.go b/internal/nodemon/jvm_types.go index f1a2489a..24f79b03 100644 --- a/internal/nodemon/jvm_types.go +++ b/internal/nodemon/jvm_types.go @@ -25,7 +25,10 @@ type JVMMetric struct { FlagsExtracted JVMFlagsExtracted `json:"flags_extracted"` FlagSources JVMFlagSources `json:"flag_sources,omitempty"` - RawCmdline string `json:"raw_cmdline,omitempty"` + // RawCmdline is the effective command line including env-injected options + // (JAVA_TOOL_OPTIONS, JDK_JAVA_OPTIONS, JAVA_OPTS). Note: this may contain + // sensitive values if secrets are passed via JVM system properties or env vars. + RawCmdline string `json:"raw_cmdline,omitempty"` Timestamp time.Time `json:"timestamp"` } From 0adba2eaebeea24550394a0e82f9b9fde7873d6a Mon Sep 17 00:00:00 2001 From: Parthiba-Hazra Date: Tue, 26 May 2026 22:58:17 +0530 Subject: [PATCH 13/13] fix perser to collect correct jvm heap usage --- internal/nodemon/jvm_collector.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/internal/nodemon/jvm_collector.go b/internal/nodemon/jvm_collector.go index b6f03278..d347a3fc 100644 --- a/internal/nodemon/jvm_collector.go +++ b/internal/nodemon/jvm_collector.go @@ -200,6 +200,22 @@ func (c *JVMCollector) QueryJVMMetrics(ctx context.Context) ([]JVMMetric, error) return metrics, nil } +// sumSpaceCounters sums sun.gc.generation.*.space.*.{metric} across all generations +// and spaces. This handles JVMs where the aggregate sun.gc.heap.{metric} counter +// doesn't exist (Serial GC, some G1 configurations). +func sumSpaceCounters(counters map[string]any, metric string) int64 { + var total int64 + for gen := 0; gen < 4; gen++ { + for space := 0; space < 4; space++ { + key := fmt.Sprintf("sun.gc.generation.%d.space.%d.%s", gen, space, metric) + if v, ok := hsInt(counters, key); ok { + total += v + } + } + } + return total +} + // stripContainerIDScheme strips the URL scheme (e.g., "containerd://") from a container ID. func stripContainerIDScheme(raw string) string { if i := strings.LastIndex(raw, "://"); i >= 0 { @@ -226,21 +242,19 @@ func buildJVMMetric(counters map[string]any, proc JavaProcess, info containerInf m.JavaCommand, _ = hsStr(counters, "sun.rt.javaCommand") m.JavaVersion, _ = hsStr(counters, "java.property.java.version") - // Heap capacity: try aggregate heap counters first, then sum generation counters. + // Heap capacity: try aggregate heap counter first, then sum per-space counters. if cap, ok := hsInt(counters, "sun.gc.heap.capacity"); ok { m.HeapSizeBytes = cap } else { - gen0, _ := hsInt(counters, "sun.gc.generation.0.capacity") - gen1, _ := hsInt(counters, "sun.gc.generation.1.capacity") - m.HeapSizeBytes = gen0 + gen1 + m.HeapSizeBytes = sumSpaceCounters(counters, "capacity") } if used, ok := hsInt(counters, "sun.gc.heap.used"); ok { m.HeapUsedBytes = used } else { - gen0, _ := hsInt(counters, "sun.gc.generation.0.used") - gen1, _ := hsInt(counters, "sun.gc.generation.1.used") - m.HeapUsedBytes = gen0 + gen1 + // Aggregate heap counter missing (common with Serial/G1 GC). + // Sum all generation.*.space.*.used counters instead. + m.HeapUsedBytes = sumSpaceCounters(counters, "used") } if maxCap, ok := hsInt(counters, "sun.gc.heap.maxCapacity"); ok {