From a57d65aec72f76610406432887a3810415702883 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 09:49:16 +0000 Subject: [PATCH 01/10] feat(agent): scrape component_latency_mean_seconds from Vector 0.54.0 --- agent/internal/agent/heartbeat.go | 17 +++++++++-------- agent/internal/client/client.go | 3 ++- agent/internal/metrics/scraper.go | 8 +++++++- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/agent/internal/agent/heartbeat.go b/agent/internal/agent/heartbeat.go index 088172e4..50581ac7 100644 --- a/agent/internal/agent/heartbeat.go +++ b/agent/internal/agent/heartbeat.go @@ -42,14 +42,15 @@ func buildHeartbeat(sup *supervisor.Supervisor, vectorVersion string, deployment // Map per-component metrics for editor node overlays for _, cm := range sr.Components { ps.ComponentMetrics = append(ps.ComponentMetrics, client.ComponentMetric{ - ComponentID: cm.ComponentID, - ComponentKind: cm.ComponentKind, - ReceivedEvents: cm.ReceivedEvents, - SentEvents: cm.SentEvents, - ReceivedBytes: cm.ReceivedBytes, - SentBytes: cm.SentBytes, - ErrorsTotal: cm.ErrorsTotal, - DiscardedEvents: cm.DiscardedEvents, + ComponentID: cm.ComponentID, + ComponentKind: cm.ComponentKind, + ReceivedEvents: cm.ReceivedEvents, + SentEvents: cm.SentEvents, + ReceivedBytes: cm.ReceivedBytes, + SentBytes: cm.SentBytes, + ErrorsTotal: cm.ErrorsTotal, + DiscardedEvents: cm.DiscardedEvents, + LatencyMeanSeconds: cm.LatencyMeanSeconds, }) } diff --git a/agent/internal/client/client.go b/agent/internal/client/client.go index 9fd39297..b06d5d82 100644 --- a/agent/internal/client/client.go +++ b/agent/internal/client/client.go @@ -178,7 +178,8 @@ type ComponentMetric struct { ReceivedBytes int64 `json:"receivedBytes,omitempty"` SentBytes int64 `json:"sentBytes,omitempty"` ErrorsTotal int64 `json:"errorsTotal,omitempty"` - DiscardedEvents int64 `json:"discardedEvents,omitempty"` + DiscardedEvents int64 `json:"discardedEvents,omitempty"` + LatencyMeanSeconds float64 `json:"latencyMeanSeconds,omitempty"` } // HostMetrics holds system-level metrics from the Vector host diff --git a/agent/internal/metrics/scraper.go b/agent/internal/metrics/scraper.go index 45351fbe..c6358f25 100644 --- a/agent/internal/metrics/scraper.go +++ b/agent/internal/metrics/scraper.go @@ -28,7 +28,8 @@ type ComponentMetrics struct { ReceivedBytes int64 SentBytes int64 ErrorsTotal int64 - DiscardedEvents int64 + DiscardedEvents int64 + LatencyMeanSeconds float64 // mean event time in component (seconds) } // HostMetrics holds system-level metrics from Vector's host_metrics source. @@ -137,6 +138,11 @@ func ScrapePrometheus(metricsPort int) ScrapeResult { } getOrCreate(componentMap, componentID, componentKind).DiscardedEvents += v + case "vector_component_latency_mean_seconds", "component_latency_mean_seconds": + if !isInternal { + getOrCreate(componentMap, componentID, componentKind).LatencyMeanSeconds = value + } + // Host metrics – use += to aggregate across CPU cores, devices, interfaces, etc. case "host_memory_total_bytes": sr.Host.MemoryTotalBytes += int64(value) From 284a5e2e27d27c53b6390ddc48df255e3cbe21ce Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 09:55:04 +0000 Subject: [PATCH 02/10] feat(db): add latencyMeanMs column to PipelineMetric --- .../20260311000000_add_pipeline_latency_mean/migration.sql | 2 ++ prisma/schema.prisma | 1 + 2 files changed, 3 insertions(+) create mode 100644 prisma/migrations/20260311000000_add_pipeline_latency_mean/migration.sql diff --git a/prisma/migrations/20260311000000_add_pipeline_latency_mean/migration.sql b/prisma/migrations/20260311000000_add_pipeline_latency_mean/migration.sql new file mode 100644 index 00000000..377a4497 --- /dev/null +++ b/prisma/migrations/20260311000000_add_pipeline_latency_mean/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable: Add latencyMeanMs column to PipelineMetric +ALTER TABLE "PipelineMetric" ADD COLUMN "latencyMeanMs" DOUBLE PRECISION; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 3a244320..650a4b86 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -321,6 +321,7 @@ model PipelineMetric { bytesIn BigInt @default(0) bytesOut BigInt @default(0) utilization Float @default(0) + latencyMeanMs Float? // pipeline-level weighted mean latency (ms) @@index([pipelineId, timestamp]) @@index([timestamp]) From 6a0ad6a78c693285494c9765d36c10f261cd6efb Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 09:55:08 +0000 Subject: [PATCH 03/10] feat: accept and store component latency in heartbeat pipeline --- src/app/api/agent/heartbeat/route.ts | 23 +++++++++++++++++++++++ src/server/services/metric-store.ts | 5 +++++ src/server/services/metrics-ingest.ts | 13 +++++++++++++ 3 files changed, 41 insertions(+) diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 88338c6b..0391adab 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -14,6 +14,23 @@ import { deliverToChannels } from "@/server/services/channels"; import { DeploymentMode } from "@/generated/prisma"; import { isVersionOlder } from "@/lib/version"; +/** Compute pipeline-level weighted mean latency (ms) from per-component metrics. */ +function computeWeightedLatency( + components?: Array<{ receivedEvents: number; sentEvents: number; latencyMeanSeconds?: number }>, +): number | null { + if (!components || components.length === 0) return null; + let weightedSum = 0; + let totalEvents = 0; + for (const cm of components) { + if (cm.latencyMeanSeconds == null || cm.latencyMeanSeconds === 0) continue; + const events = cm.receivedEvents + cm.sentEvents; + weightedSum += cm.latencyMeanSeconds * 1000 * events; // convert seconds → ms + totalEvents += events; + } + if (totalEvents === 0) return null; + return weightedSum / totalEvents; +} + const heartbeatSchema = z.object({ agentVersion: z.string().max(100).optional(), vectorVersion: z.string().max(100).optional(), @@ -39,6 +56,7 @@ const heartbeatSchema = z.object({ sentBytes: z.number().optional(), errorsTotal: z.number().optional(), discardedEvents: z.number().optional(), + latencyMeanSeconds: z.number().optional(), // NEW })).optional(), utilization: z.number().optional(), recentLogs: z.array(z.string()).optional(), @@ -94,6 +112,7 @@ interface PipelineStatus { sentBytes?: number; errorsTotal?: number; discardedEvents?: number; + latencyMeanSeconds?: number; // NEW }>; utilization?: number; recentLogs?: string[]; @@ -319,6 +338,7 @@ export async function POST(request: Request) { bytesIn: BigInt(p.bytesIn ?? 0), bytesOut: BigInt(p.bytesOut ?? 0), utilization: p.utilization ?? 0, + latencyMeanMs: computeWeightedLatency(p.componentMetrics), })); if (metricsData.length > 0) { @@ -336,6 +356,9 @@ export async function POST(request: Request) { sentEventsTotal: cm.sentEvents, receivedBytesTotal: cm.receivedBytes ?? 0, sentBytesTotal: cm.sentBytes ?? 0, + errorsTotal: cm.errorsTotal ?? 0, + discardedTotal: cm.discardedEvents ?? 0, + latencyMeanSeconds: cm.latencyMeanSeconds, }); } } diff --git a/src/server/services/metric-store.ts b/src/server/services/metric-store.ts index ae7772fb..e7d8907c 100644 --- a/src/server/services/metric-store.ts +++ b/src/server/services/metric-store.ts @@ -7,6 +7,7 @@ export interface MetricSample { errorCount: number; errorsRate: number; discardedRate: number; + latencyMeanMs: number | null; // mean component latency in ms } interface PrevTotals { @@ -17,6 +18,7 @@ interface PrevTotals { sentBytesTotal: number; errorsTotal: number; discardedTotal: number; + latencyMeanSeconds: number | null; } const MAX_SAMPLES = 240; // 1 hour at 15s intervals @@ -36,6 +38,7 @@ class MetricStore { sentBytesTotal?: number; errorsTotal?: number; discardedTotal?: number; + latencyMeanSeconds?: number; }, ): MetricSample | null { const key = `${nodeId}:${pipelineId}:${componentId}`; @@ -50,6 +53,7 @@ class MetricStore { sentBytesTotal: totals.sentBytesTotal ?? 0, errorsTotal: totals.errorsTotal ?? 0, discardedTotal: totals.discardedTotal ?? 0, + latencyMeanSeconds: totals.latencyMeanSeconds ?? null, }); if (!prev) return null; @@ -66,6 +70,7 @@ class MetricStore { errorCount: totals.errorsTotal ?? 0, errorsRate: Math.max(0, ((totals.errorsTotal ?? 0) - prev.errorsTotal) / elapsedSec), discardedRate: Math.max(0, ((totals.discardedTotal ?? 0) - prev.discardedTotal) / elapsedSec), + latencyMeanMs: totals.latencyMeanSeconds != null ? totals.latencyMeanSeconds * 1000 : null, }; const arr = this.samples.get(key) ?? []; diff --git a/src/server/services/metrics-ingest.ts b/src/server/services/metrics-ingest.ts index 9d6c7354..3920f992 100644 --- a/src/server/services/metrics-ingest.ts +++ b/src/server/services/metrics-ingest.ts @@ -10,6 +10,7 @@ export interface MetricsDataPoint { bytesIn: bigint; bytesOut: bigint; utilization: number; + latencyMeanMs: number | null; } export interface PreviousSnapshot { @@ -95,6 +96,7 @@ export async function ingestMetrics( bytesIn: { increment: deltaBytesIn }, bytesOut: { increment: deltaBytesOut }, utilization: dp.utilization, + ...(dp.latencyMeanMs != null ? { latencyMeanMs: dp.latencyMeanMs } : {}), }, }); } else { @@ -110,6 +112,7 @@ export async function ingestMetrics( bytesIn: deltaBytesIn, bytesOut: deltaBytesOut, utilization: dp.utilization, + ...(dp.latencyMeanMs != null ? { latencyMeanMs: dp.latencyMeanMs } : {}), }, }); } @@ -134,6 +137,8 @@ export async function ingestMetrics( let totalBytesIn = BigInt(0); let totalBytesOut = BigInt(0); let totalUtil = 0; + let latencyWeightedSum = 0; + let latencyWeightCount = 0; for (const row of nodeRows) { totalEventsIn += row.eventsIn; @@ -143,9 +148,15 @@ export async function ingestMetrics( totalBytesIn += row.bytesIn; totalBytesOut += row.bytesOut; totalUtil += row.utilization; + if (row.latencyMeanMs != null) { + const rowEvents = Number(row.eventsIn) + Number(row.eventsOut); + latencyWeightedSum += row.latencyMeanMs * rowEvents; + latencyWeightCount += rowEvents; + } } const avgUtil = nodeRows.length > 0 ? totalUtil / nodeRows.length : 0; + const avgLatencyMs = latencyWeightCount > 0 ? latencyWeightedSum / latencyWeightCount : null; const existingAgg = await prisma.pipelineMetric.findFirst({ where: { @@ -166,6 +177,7 @@ export async function ingestMetrics( bytesIn: totalBytesIn, bytesOut: totalBytesOut, utilization: avgUtil, + ...(avgLatencyMs != null ? { latencyMeanMs: avgLatencyMs } : {}), }, }); } else { @@ -181,6 +193,7 @@ export async function ingestMetrics( bytesIn: totalBytesIn, bytesOut: totalBytesOut, utilization: avgUtil, + ...(avgLatencyMs != null ? { latencyMeanMs: avgLatencyMs } : {}), }, }); } From 7ef5861c321f0fd29c868b4de2891f48b06c89ab Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 09:57:51 +0000 Subject: [PATCH 04/10] feat(api): expose latency data in metrics and dashboard endpoints --- src/server/routers/dashboard.ts | 24 ++++++++++++++++++++---- src/server/routers/metrics.ts | 8 ++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/server/routers/dashboard.ts b/src/server/routers/dashboard.ts index a441b8fd..dd189f03 100644 --- a/src/server/routers/dashboard.ts +++ b/src/server/routers/dashboard.ts @@ -682,6 +682,7 @@ export const dashboardRouter = router({ bytesOut: true, errorsTotal: true, eventsDiscarded: true, + latencyMeanMs: true, }, }), effectiveNodeIds.length > 0 @@ -741,22 +742,27 @@ export const dashboardRouter = router({ const bytesOut: TSMap = {}; const errors: TSMap = {}; const discarded: TSMap = {}; + const latency: TSMap = {}; if (input.groupBy === "node") { // Sum pipeline values per (node, timestamp) since multiple pipelines on one node produce multiple rows - const acc = new Map>(); + const acc = new Map>(); for (const row of pipelineRows) { const label = nodeNameMap.get(row.nodeId ?? "") ?? row.nodeId ?? "unknown"; const t = new Date(row.timestamp).getTime(); if (!acc.has(label)) acc.set(label, new Map()); const timeMap = acc.get(label)!; - const s = timeMap.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0 }; + const s = timeMap.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0, lat: 0, latC: 0 }; s.ei += Number(row.eventsIn) / 60; s.eo += Number(row.eventsOut) / 60; s.bi += Number(row.bytesIn) / 60; s.bo += Number(row.bytesOut) / 60; s.er += Number(row.errorsTotal) / 60; s.di += Number(row.eventsDiscarded) / 60; + if (row.latencyMeanMs != null) { + s.lat += row.latencyMeanMs; + s.latC++; + } timeMap.set(t, s); } for (const [label, timeMap] of acc) { @@ -767,20 +773,25 @@ export const dashboardRouter = router({ addPoint(bytesOut, label, t, s.bo); addPoint(errors, label, t, s.er); addPoint(discarded, label, t, s.di); + if (s.latC > 0) addPoint(latency, label, t, s.lat / s.latC); } } } else if (input.groupBy === "aggregate") { // Sum all pipelines into a single "Total" series per timestamp - const acc = new Map(); + const acc = new Map(); for (const row of pipelineRows) { const t = new Date(row.timestamp).getTime(); - const s = acc.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0 }; + const s = acc.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0, lat: 0, latC: 0 }; s.ei += Number(row.eventsIn) / 60; s.eo += Number(row.eventsOut) / 60; s.bi += Number(row.bytesIn) / 60; s.bo += Number(row.bytesOut) / 60; s.er += Number(row.errorsTotal) / 60; s.di += Number(row.eventsDiscarded) / 60; + if (row.latencyMeanMs != null) { + s.lat += row.latencyMeanMs; + s.latC++; + } acc.set(t, s); } for (const [t, s] of acc) { @@ -790,6 +801,7 @@ export const dashboardRouter = router({ addPoint(bytesOut, "Total", t, s.bo); addPoint(errors, "Total", t, s.er); addPoint(discarded, "Total", t, s.di); + if (s.latC > 0) addPoint(latency, "Total", t, s.lat / s.latC); } } else { // groupBy === "pipeline" — direct mapping, one series per pipeline @@ -802,6 +814,9 @@ export const dashboardRouter = router({ addPoint(bytesOut, label, t, Number(row.bytesOut) / 60); addPoint(errors, label, t, Number(row.errorsTotal) / 60); addPoint(discarded, label, t, Number(row.eventsDiscarded) / 60); + if (row.latencyMeanMs != null) { + addPoint(latency, label, t, row.latencyMeanMs); + } } } @@ -920,6 +935,7 @@ export const dashboardRouter = router({ bytesOut: downsample(bytesOut), errors: downsample(errors), discarded: downsample(discarded), + latency: downsample(latency), }, system: { cpu: downsample(cpu), diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts index 75fe96c6..581dad25 100644 --- a/src/server/routers/metrics.ts +++ b/src/server/routers/metrics.ts @@ -34,6 +34,7 @@ export const metricsRouter = router({ bytesIn: true, bytesOut: true, utilization: true, + latencyMeanMs: true, }, }); @@ -113,6 +114,7 @@ export const metricsRouter = router({ bytesInRate: number; bytesOutRate: number; errorsRate: number; + latencyMeanMs: number | null; }> = {}; for (const [componentId, samples] of nodeMetrics) { @@ -126,6 +128,7 @@ export const metricsRouter = router({ const existing = rates[matchingNode.pipelineId] ?? { eventsInRate: 0, eventsOutRate: 0, bytesInRate: 0, bytesOutRate: 0, errorsRate: 0, + latencyMeanMs: null, }; if (matchingNode.kind === "SOURCE") { existing.eventsInRate += latest.receivedEventsRate; @@ -135,6 +138,11 @@ export const metricsRouter = router({ existing.bytesOutRate += latest.sentBytesRate; } existing.errorsRate += latest.errorsRate; + if (latest.latencyMeanMs != null) { + existing.latencyMeanMs = existing.latencyMeanMs != null + ? (existing.latencyMeanMs + latest.latencyMeanMs) / 2 + : latest.latencyMeanMs; + } rates[matchingNode.pipelineId] = existing; } From 6f549c6b1f798328cf13cfd99cd5a34d2989d9f9 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 10:00:44 +0000 Subject: [PATCH 05/10] feat(ui): add formatLatency helper and Component Latency chart to dashboard --- src/app/(dashboard)/page.tsx | 12 +++++++++++- src/lib/format.ts | 8 ++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/app/(dashboard)/page.tsx b/src/app/(dashboard)/page.tsx index 11467976..84961472 100644 --- a/src/app/(dashboard)/page.tsx +++ b/src/app/(dashboard)/page.tsx @@ -15,6 +15,7 @@ import { Plus, Pencil, Trash2, + Timer, } from "lucide-react"; import { Card, CardContent } from "@/components/ui/card"; import { Skeleton } from "@/components/ui/skeleton"; @@ -29,7 +30,7 @@ import { MetricsSection } from "@/components/dashboard/metrics-section"; import { MetricChart } from "@/components/dashboard/metric-chart"; import { ViewBuilderDialog } from "@/components/dashboard/view-builder-dialog"; import { CustomView } from "@/components/dashboard/custom-view"; -import { formatSI, formatBytesRate, formatEventsRate } from "@/lib/format"; +import { formatSI, formatBytesRate, formatEventsRate, formatLatency } from "@/lib/format"; import { PageHeader } from "@/components/page-header"; import { cn } from "@/lib/utils"; @@ -380,6 +381,15 @@ export default function DashboardPage() { timeRange={timeRange} height={200} /> + } + data={chartData.data?.pipeline.latency ?? {}} + variant="area" + yFormatter={formatLatency} + timeRange={timeRange} + height={200} + /> {/* System Metrics */} diff --git a/src/lib/format.ts b/src/lib/format.ts index a7dfc5aa..b3b44710 100644 --- a/src/lib/format.ts +++ b/src/lib/format.ts @@ -72,3 +72,11 @@ export function formatTimeAxis(timestamp: number | string, range: string): strin } return d.toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }); } + +/** Format latency in milliseconds to a human-readable string. */ +export function formatLatency(ms: number): string { + if (ms >= 1000) return `${(ms / 1000).toFixed(2)}s`; + if (ms >= 1) return `${ms.toFixed(1)}ms`; + if (ms >= 0.001) return `${(ms * 1000).toFixed(0)}us`; + return `${ms.toFixed(3)}ms`; +} From c790f5452c74f29bf2d2301fd8e8ea312a15f598 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 10:00:48 +0000 Subject: [PATCH 06/10] feat(ui): add Component Latency chart to pipeline metrics page --- .../(dashboard)/pipelines/[id]/metrics/page.tsx | 9 +++++++++ src/components/metrics/component-chart.tsx | 15 +++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx index 45517b49..85e0edb8 100644 --- a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx +++ b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx @@ -115,6 +115,15 @@ export default function PipelineMetricsPage() { + + + + Component Latency + + + + + )} diff --git a/src/components/metrics/component-chart.tsx b/src/components/metrics/component-chart.tsx index 7c34a9d3..575e5502 100644 --- a/src/components/metrics/component-chart.tsx +++ b/src/components/metrics/component-chart.tsx @@ -10,7 +10,7 @@ import { } from "recharts"; import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart"; import { Inbox } from "lucide-react"; -import { formatBytesRate } from "@/lib/format"; +import { formatBytesRate, formatLatency } from "@/lib/format"; interface MetricRow { timestamp: Date; @@ -20,11 +20,12 @@ interface MetricRow { bytesOut: bigint; errorsTotal: bigint; eventsDiscarded: bigint; + latencyMeanMs?: number | null; } interface MetricsChartProps { rows: MetricRow[]; - dataKey: "events" | "bytes" | "errors"; + dataKey: "events" | "bytes" | "errors" | "latency"; height?: number; } @@ -38,12 +39,14 @@ const colorMap = { events: { in: "#22c55e", out: "#3b82f6" }, bytes: { in: "#f59e0b", out: "#8b5cf6" }, errors: { in: "#ef4444", out: "#f97316" }, + latency: { in: "#ec4899", out: "#ec4899" }, } as const; const labelMap = { events: { in: "Events In/s", out: "Events Out/s" }, bytes: { in: "Bytes In/s", out: "Bytes Out/s" }, errors: { in: "Errors/s", out: "Discarded/s" }, + latency: { in: "Mean Latency", out: "Mean Latency" }, } as const; export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps) { @@ -51,9 +54,11 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps) time: new Date(m.timestamp).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }), in: dataKey === "events" ? Number(m.eventsIn) / 60 : dataKey === "bytes" ? Number(m.bytesIn) / 60 + : dataKey === "latency" ? (m.latencyMeanMs ?? 0) : Number(m.errorsTotal) / 60, out: dataKey === "events" ? Number(m.eventsOut) / 60 : dataKey === "bytes" ? Number(m.bytesOut) / 60 + : dataKey === "latency" ? (m.latencyMeanMs ?? 0) : Number(m.eventsDiscarded) / 60, })); @@ -71,7 +76,7 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps) ); } - const formatter = dataKey === "bytes" ? formatBytesRate : formatEventsRate; + const formatter = dataKey === "bytes" ? formatBytesRate : dataKey === "latency" ? formatLatency : formatEventsRate; return ( @@ -96,7 +101,9 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps) } /> - + {dataKey !== "latency" && ( + + )} ); From 4b4d2d66aea653d568dea224858fa6b5adf8890c Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 10:04:09 +0000 Subject: [PATCH 07/10] feat(ui): add latency to flow editor overlay and show-metrics panel --- src/app/(dashboard)/pipelines/[id]/page.tsx | 1 + src/components/flow/node-metrics-format.ts | 2 +- src/components/flow/sink-node.tsx | 5 ++- src/components/flow/source-node.tsx | 5 ++- src/components/flow/transform-node.tsx | 29 ++++++++------ src/components/pipeline/metrics-chart.tsx | 44 ++++++++++++++++++++- src/server/routers/pipeline.ts | 1 + src/stores/flow-store.ts | 1 + 8 files changed, 73 insertions(+), 15 deletions(-) diff --git a/src/app/(dashboard)/pipelines/[id]/page.tsx b/src/app/(dashboard)/pipelines/[id]/page.tsx index fe2d3dd3..eefb9fa4 100644 --- a/src/app/(dashboard)/pipelines/[id]/page.tsx +++ b/src/app/(dashboard)/pipelines/[id]/page.tsx @@ -216,6 +216,7 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) { ...(entry.kind === "TRANSFORM" ? { eventsInPerSec: latest.receivedEventsRate } : {}), status: eventsPerSec > 0 ? "healthy" : "degraded", samples: entry.samples, + latencyMs: latest.latencyMeanMs, }); } diff --git a/src/components/flow/node-metrics-format.ts b/src/components/flow/node-metrics-format.ts index 53f1d35d..fdbff25a 100644 --- a/src/components/flow/node-metrics-format.ts +++ b/src/components/flow/node-metrics-format.ts @@ -1 +1 @@ -export { formatRate, formatBytesRate } from "@/lib/format"; +export { formatRate, formatBytesRate, formatLatency } from "@/lib/format"; diff --git a/src/components/flow/sink-node.tsx b/src/components/flow/sink-node.tsx index 74a7c02e..b3e60500 100644 --- a/src/components/flow/sink-node.tsx +++ b/src/components/flow/sink-node.tsx @@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types"; import type { NodeMetricsData } from "@/stores/flow-store"; import { getIcon } from "./node-icon"; import { NodeSparkline } from "./node-sparkline"; -import { formatRate, formatBytesRate } from "./node-metrics-format"; +import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format"; import { StatusDot } from "@/components/ui/status-dot"; import { nodeStatusVariant } from "@/lib/status"; @@ -68,6 +68,9 @@ function SinkNodeComponent({ data, selected }: NodeProps) { {metrics && (

{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)} + {metrics.latencyMs != null && metrics.latencyMs > 0 && ( + <>{" "}{formatLatency(metrics.latencyMs)} + )}

)} diff --git a/src/components/flow/source-node.tsx b/src/components/flow/source-node.tsx index 87b1372c..75b97faa 100644 --- a/src/components/flow/source-node.tsx +++ b/src/components/flow/source-node.tsx @@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types"; import type { NodeMetricsData } from "@/stores/flow-store"; import { getIcon } from "./node-icon"; import { NodeSparkline } from "./node-sparkline"; -import { formatRate, formatBytesRate } from "./node-metrics-format"; +import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format"; import { StatusDot } from "@/components/ui/status-dot"; import { nodeStatusVariant } from "@/lib/status"; @@ -66,6 +66,9 @@ function SourceNodeComponent({ data, selected }: NodeProps) { {metrics && (

{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)} + {metrics.latencyMs != null && metrics.latencyMs > 0 && ( + <>{" "}{formatLatency(metrics.latencyMs)} + )}

)} diff --git a/src/components/flow/transform-node.tsx b/src/components/flow/transform-node.tsx index dbe824f2..2cc45996 100644 --- a/src/components/flow/transform-node.tsx +++ b/src/components/flow/transform-node.tsx @@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types"; import type { NodeMetricsData } from "@/stores/flow-store"; import { getIcon } from "./node-icon"; import { NodeSparkline } from "./node-sparkline"; -import { formatRate, formatBytesRate } from "./node-metrics-format"; +import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format"; import { StatusDot } from "@/components/ui/status-dot"; import { nodeStatusVariant } from "@/lib/status"; @@ -69,16 +69,23 @@ function TransformNodeComponent({ {displayName &&

{displayName}

} {metrics && ( - metrics.eventsInPerSec != null ? ( -
- {formatRate(metrics.eventsInPerSec)} ev/s in - {formatRate(metrics.eventsPerSec)} ev/s out -
- ) : ( -

- {formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)} -

- ) + <> + {metrics.eventsInPerSec != null ? ( +
+ {formatRate(metrics.eventsInPerSec)} ev/s in + {formatRate(metrics.eventsPerSec)} ev/s out +
+ ) : ( +

+ {formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)} +

+ )} + {metrics.latencyMs != null && metrics.latencyMs > 0 && ( +

+ {formatLatency(metrics.latencyMs)} +

+ )} + )} diff --git a/src/components/pipeline/metrics-chart.tsx b/src/components/pipeline/metrics-chart.tsx index ddcb0894..e6592564 100644 --- a/src/components/pipeline/metrics-chart.tsx +++ b/src/components/pipeline/metrics-chart.tsx @@ -12,7 +12,7 @@ import { import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart"; import { Skeleton } from "@/components/ui/skeleton"; import { Inbox } from "lucide-react"; -import { formatBytesRate } from "@/lib/format"; +import { formatBytesRate, formatLatency } from "@/lib/format"; interface PipelineMetricsChartProps { pipelineId: string; @@ -35,6 +35,10 @@ const bytesChartConfig = { bytesOut: { label: "Bytes Out/s", color: "#8b5cf6" }, } satisfies ChartConfig; +const latencyChartConfig = { + latency: { label: "Mean Latency", color: "#ec4899" }, +} satisfies ChartConfig; + export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetricsChartProps) { const trpc = useTRPC(); @@ -51,6 +55,7 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics bytesIn: Number(m.bytesIn) / 60, bytesOut: Number(m.bytesOut) / 60, errors: Number(m.errorsTotal), + latency: m.latencyMeanMs ?? 0, })); if (metricsQuery.isLoading) { @@ -160,6 +165,43 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics + + {/* Latency chart */} +
+

Component Latency

+ + + + + formatLatency(v)} + /> + ( +
+ {latencyChartConfig[name as keyof typeof latencyChartConfig]?.label ?? name} + {formatLatency(Number(value) ?? 0)} +
+ )} + /> + } + /> + +
+
+
); } diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index d82f3535..79f33743 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -1075,6 +1075,7 @@ export const pipelineRouter = router({ bytesIn: true, bytesOut: true, utilization: true, + latencyMeanMs: true, }, }); }), diff --git a/src/stores/flow-store.ts b/src/stores/flow-store.ts index 1386a351..763b00af 100644 --- a/src/stores/flow-store.ts +++ b/src/stores/flow-store.ts @@ -57,6 +57,7 @@ export interface NodeMetricsData { eventsInPerSec?: number; status: string; samples?: import("@/server/services/metric-store").MetricSample[]; + latencyMs?: number | null; } export interface FlowState { From ce6abcb5858c67c4b32529142f2d5fee2d52dbae Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 10:06:02 +0000 Subject: [PATCH 08/10] feat(ui): add Avg Latency column to fleet node pipeline table --- src/app/(dashboard)/fleet/[nodeId]/page.tsx | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/app/(dashboard)/fleet/[nodeId]/page.tsx b/src/app/(dashboard)/fleet/[nodeId]/page.tsx index 8a76bbeb..c3a2fbfd 100644 --- a/src/app/(dashboard)/fleet/[nodeId]/page.tsx +++ b/src/app/(dashboard)/fleet/[nodeId]/page.tsx @@ -30,6 +30,7 @@ import { formatCount, formatBytes, formatBytesRate, + formatLatency, } from "@/lib/format"; import { nodeStatusVariant, nodeStatusLabel, pipelineStatusVariant, pipelineStatusLabel } from "@/lib/status"; @@ -521,6 +522,7 @@ export default function NodeDetailPage() { Errors Bytes In Bytes Out + Avg Latency Uptime @@ -557,6 +559,11 @@ export default function NodeDetailPage() {
{formatBytes(ps.bytesOut)}
{rates &&
{formatBytesRate(rates.bytesOutRate)}
} + + {rates?.latencyMeanMs != null + ? formatLatency(rates.latencyMeanMs) + : "—"} + {formatUptime(ps.uptimeSeconds)} From 5e7524c4cd9b26b6aa664c5caba25f64a24809e6 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 10:06:06 +0000 Subject: [PATCH 09/10] feat: add latency_mean SLI metric type for pipeline health evaluation --- prisma/schema.prisma | 2 +- src/server/services/sli-evaluator.ts | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 650a4b86..dc1b8d9c 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -331,7 +331,7 @@ model PipelineSli { id String @id @default(cuid()) pipelineId String pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade) - metric String // "error_rate" | "throughput_floor" | "discard_rate" + metric String // "error_rate" | "throughput_floor" | "discard_rate" | "latency_mean" condition String // "lt" | "gt" threshold Float windowMinutes Int @default(5) diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts index 5328dcfb..4a72c0ab 100644 --- a/src/server/services/sli-evaluator.ts +++ b/src/server/services/sli-evaluator.ts @@ -74,6 +74,25 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ value = totalEventsIn / windowSeconds; break; } + case "latency_mean": { + const latencyAgg = await prisma.pipelineMetric.aggregate({ + where: { pipelineId, timestamp: { gte: since }, latencyMeanMs: { not: null } }, + _avg: { latencyMeanMs: true }, + _count: true, + }); + if (latencyAgg._count === 0) { + results.push({ + metric: sli.metric, + status: "no_data", + value: null, + threshold: sli.threshold, + condition: sli.condition, + }); + continue; + } + value = latencyAgg._avg.latencyMeanMs ?? 0; + break; + } default: value = 0; } From ef105321d1a56b5855db740afc10e95bbe7c168e Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 10:10:51 +0000 Subject: [PATCH 10/10] fix: use proper mean for latency aggregation and handle zero formatting - Fix running-average bug in getNodePipelineRates that gave incorrect latency means when aggregating across >2 components - Add early return for formatLatency(0) to display "0ms" instead of "0.000ms" --- src/lib/format.ts | 1 + src/server/routers/metrics.ts | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/lib/format.ts b/src/lib/format.ts index b3b44710..6b3e90fe 100644 --- a/src/lib/format.ts +++ b/src/lib/format.ts @@ -75,6 +75,7 @@ export function formatTimeAxis(timestamp: number | string, range: string): strin /** Format latency in milliseconds to a human-readable string. */ export function formatLatency(ms: number): string { + if (ms === 0) return "0ms"; if (ms >= 1000) return `${(ms / 1000).toFixed(2)}s`; if (ms >= 1) return `${ms.toFixed(1)}ms`; if (ms >= 0.001) return `${(ms * 1000).toFixed(0)}us`; diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts index 581dad25..75583eff 100644 --- a/src/server/routers/metrics.ts +++ b/src/server/routers/metrics.ts @@ -116,6 +116,8 @@ export const metricsRouter = router({ errorsRate: number; latencyMeanMs: number | null; }> = {}; + // Accumulate latency sum+count for proper averaging across components + const latencyAcc: Record = {}; for (const [componentId, samples] of nodeMetrics) { if (samples.length === 0) continue; @@ -139,13 +141,21 @@ export const metricsRouter = router({ } existing.errorsRate += latest.errorsRate; if (latest.latencyMeanMs != null) { - existing.latencyMeanMs = existing.latencyMeanMs != null - ? (existing.latencyMeanMs + latest.latencyMeanMs) / 2 - : latest.latencyMeanMs; + const acc = latencyAcc[matchingNode.pipelineId] ?? { sum: 0, count: 0 }; + acc.sum += latest.latencyMeanMs; + acc.count++; + latencyAcc[matchingNode.pipelineId] = acc; } rates[matchingNode.pipelineId] = existing; } + // Compute proper mean latency per pipeline + for (const [pipelineId, acc] of Object.entries(latencyAcc)) { + if (rates[pipelineId] && acc.count > 0) { + rates[pipelineId].latencyMeanMs = acc.sum / acc.count; + } + } + return { rates }; }),