diff --git a/agent/internal/agent/heartbeat.go b/agent/internal/agent/heartbeat.go
index 088172e4..50581ac7 100644
--- a/agent/internal/agent/heartbeat.go
+++ b/agent/internal/agent/heartbeat.go
@@ -42,14 +42,15 @@ func buildHeartbeat(sup *supervisor.Supervisor, vectorVersion string, deployment
// Map per-component metrics for editor node overlays
for _, cm := range sr.Components {
ps.ComponentMetrics = append(ps.ComponentMetrics, client.ComponentMetric{
- ComponentID: cm.ComponentID,
- ComponentKind: cm.ComponentKind,
- ReceivedEvents: cm.ReceivedEvents,
- SentEvents: cm.SentEvents,
- ReceivedBytes: cm.ReceivedBytes,
- SentBytes: cm.SentBytes,
- ErrorsTotal: cm.ErrorsTotal,
- DiscardedEvents: cm.DiscardedEvents,
+ ComponentID: cm.ComponentID,
+ ComponentKind: cm.ComponentKind,
+ ReceivedEvents: cm.ReceivedEvents,
+ SentEvents: cm.SentEvents,
+ ReceivedBytes: cm.ReceivedBytes,
+ SentBytes: cm.SentBytes,
+ ErrorsTotal: cm.ErrorsTotal,
+ DiscardedEvents: cm.DiscardedEvents,
+ LatencyMeanSeconds: cm.LatencyMeanSeconds,
})
}
diff --git a/agent/internal/client/client.go b/agent/internal/client/client.go
index 9fd39297..b06d5d82 100644
--- a/agent/internal/client/client.go
+++ b/agent/internal/client/client.go
@@ -178,7 +178,8 @@ type ComponentMetric struct {
ReceivedBytes int64 `json:"receivedBytes,omitempty"`
SentBytes int64 `json:"sentBytes,omitempty"`
ErrorsTotal int64 `json:"errorsTotal,omitempty"`
- DiscardedEvents int64 `json:"discardedEvents,omitempty"`
+ DiscardedEvents int64 `json:"discardedEvents,omitempty"`
+ LatencyMeanSeconds float64 `json:"latencyMeanSeconds,omitempty"`
}
// HostMetrics holds system-level metrics from the Vector host
diff --git a/agent/internal/metrics/scraper.go b/agent/internal/metrics/scraper.go
index 45351fbe..c6358f25 100644
--- a/agent/internal/metrics/scraper.go
+++ b/agent/internal/metrics/scraper.go
@@ -28,7 +28,8 @@ type ComponentMetrics struct {
ReceivedBytes int64
SentBytes int64
ErrorsTotal int64
- DiscardedEvents int64
+ DiscardedEvents int64
+ LatencyMeanSeconds float64 // mean event time in component (seconds)
}
// HostMetrics holds system-level metrics from Vector's host_metrics source.
@@ -137,6 +138,11 @@ func ScrapePrometheus(metricsPort int) ScrapeResult {
}
getOrCreate(componentMap, componentID, componentKind).DiscardedEvents += v
+ case "vector_component_latency_mean_seconds", "component_latency_mean_seconds":
+ if !isInternal {
+ getOrCreate(componentMap, componentID, componentKind).LatencyMeanSeconds = value
+ }
+
// Host metrics – use += to aggregate across CPU cores, devices, interfaces, etc.
case "host_memory_total_bytes":
sr.Host.MemoryTotalBytes += int64(value)
diff --git a/prisma/migrations/20260311000000_add_pipeline_latency_mean/migration.sql b/prisma/migrations/20260311000000_add_pipeline_latency_mean/migration.sql
new file mode 100644
index 00000000..377a4497
--- /dev/null
+++ b/prisma/migrations/20260311000000_add_pipeline_latency_mean/migration.sql
@@ -0,0 +1,2 @@
+-- AlterTable: Add latencyMeanMs column to PipelineMetric
+ALTER TABLE "PipelineMetric" ADD COLUMN "latencyMeanMs" DOUBLE PRECISION;
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index 3a244320..dc1b8d9c 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -321,6 +321,7 @@ model PipelineMetric {
bytesIn BigInt @default(0)
bytesOut BigInt @default(0)
utilization Float @default(0)
+ latencyMeanMs Float? // pipeline-level weighted mean latency (ms)
@@index([pipelineId, timestamp])
@@index([timestamp])
@@ -330,7 +331,7 @@ model PipelineSli {
id String @id @default(cuid())
pipelineId String
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
- metric String // "error_rate" | "throughput_floor" | "discard_rate"
+ metric String // "error_rate" | "throughput_floor" | "discard_rate" | "latency_mean"
condition String // "lt" | "gt"
threshold Float
windowMinutes Int @default(5)
diff --git a/src/app/(dashboard)/fleet/[nodeId]/page.tsx b/src/app/(dashboard)/fleet/[nodeId]/page.tsx
index 8a76bbeb..c3a2fbfd 100644
--- a/src/app/(dashboard)/fleet/[nodeId]/page.tsx
+++ b/src/app/(dashboard)/fleet/[nodeId]/page.tsx
@@ -30,6 +30,7 @@ import {
formatCount,
formatBytes,
formatBytesRate,
+ formatLatency,
} from "@/lib/format";
import { nodeStatusVariant, nodeStatusLabel, pipelineStatusVariant, pipelineStatusLabel } from "@/lib/status";
@@ -521,6 +522,7 @@ export default function NodeDetailPage() {
Errors
Bytes In
Bytes Out
+ Avg Latency
Uptime
@@ -557,6 +559,11 @@ export default function NodeDetailPage() {
{formatBytes(ps.bytesOut)}
{rates && {formatBytesRate(rates.bytesOutRate)}
}
+
+ {rates?.latencyMeanMs != null
+ ? formatLatency(rates.latencyMeanMs)
+ : "—"}
+
{formatUptime(ps.uptimeSeconds)}
diff --git a/src/app/(dashboard)/page.tsx b/src/app/(dashboard)/page.tsx
index 11467976..84961472 100644
--- a/src/app/(dashboard)/page.tsx
+++ b/src/app/(dashboard)/page.tsx
@@ -15,6 +15,7 @@ import {
Plus,
Pencil,
Trash2,
+ Timer,
} from "lucide-react";
import { Card, CardContent } from "@/components/ui/card";
import { Skeleton } from "@/components/ui/skeleton";
@@ -29,7 +30,7 @@ import { MetricsSection } from "@/components/dashboard/metrics-section";
import { MetricChart } from "@/components/dashboard/metric-chart";
import { ViewBuilderDialog } from "@/components/dashboard/view-builder-dialog";
import { CustomView } from "@/components/dashboard/custom-view";
-import { formatSI, formatBytesRate, formatEventsRate } from "@/lib/format";
+import { formatSI, formatBytesRate, formatEventsRate, formatLatency } from "@/lib/format";
import { PageHeader } from "@/components/page-header";
import { cn } from "@/lib/utils";
@@ -380,6 +381,15 @@ export default function DashboardPage() {
timeRange={timeRange}
height={200}
/>
+ }
+ data={chartData.data?.pipeline.latency ?? {}}
+ variant="area"
+ yFormatter={formatLatency}
+ timeRange={timeRange}
+ height={200}
+ />
{/* System Metrics */}
diff --git a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
index 45517b49..85e0edb8 100644
--- a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
+++ b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
@@ -115,6 +115,15 @@ export default function PipelineMetricsPage() {
+
+
+
+ Component Latency
+
+
+
+
+
>
)}
diff --git a/src/app/(dashboard)/pipelines/[id]/page.tsx b/src/app/(dashboard)/pipelines/[id]/page.tsx
index fe2d3dd3..eefb9fa4 100644
--- a/src/app/(dashboard)/pipelines/[id]/page.tsx
+++ b/src/app/(dashboard)/pipelines/[id]/page.tsx
@@ -216,6 +216,7 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
...(entry.kind === "TRANSFORM" ? { eventsInPerSec: latest.receivedEventsRate } : {}),
status: eventsPerSec > 0 ? "healthy" : "degraded",
samples: entry.samples,
+ latencyMs: latest.latencyMeanMs,
});
}
diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts
index 88338c6b..0391adab 100644
--- a/src/app/api/agent/heartbeat/route.ts
+++ b/src/app/api/agent/heartbeat/route.ts
@@ -14,6 +14,23 @@ import { deliverToChannels } from "@/server/services/channels";
import { DeploymentMode } from "@/generated/prisma";
import { isVersionOlder } from "@/lib/version";
+/** Compute pipeline-level weighted mean latency (ms) from per-component metrics. */
+function computeWeightedLatency(
+ components?: Array<{ receivedEvents: number; sentEvents: number; latencyMeanSeconds?: number }>,
+): number | null {
+ if (!components || components.length === 0) return null;
+ let weightedSum = 0;
+ let totalEvents = 0;
+ for (const cm of components) {
+ if (cm.latencyMeanSeconds == null || cm.latencyMeanSeconds === 0) continue;
+ const events = cm.receivedEvents + cm.sentEvents;
+ weightedSum += cm.latencyMeanSeconds * 1000 * events; // convert seconds → ms
+ totalEvents += events;
+ }
+ if (totalEvents === 0) return null;
+ return weightedSum / totalEvents;
+}
+
const heartbeatSchema = z.object({
agentVersion: z.string().max(100).optional(),
vectorVersion: z.string().max(100).optional(),
@@ -39,6 +56,7 @@ const heartbeatSchema = z.object({
sentBytes: z.number().optional(),
errorsTotal: z.number().optional(),
discardedEvents: z.number().optional(),
+ latencyMeanSeconds: z.number().optional(), // NEW
})).optional(),
utilization: z.number().optional(),
recentLogs: z.array(z.string()).optional(),
@@ -94,6 +112,7 @@ interface PipelineStatus {
sentBytes?: number;
errorsTotal?: number;
discardedEvents?: number;
+ latencyMeanSeconds?: number; // NEW
}>;
utilization?: number;
recentLogs?: string[];
@@ -319,6 +338,7 @@ export async function POST(request: Request) {
bytesIn: BigInt(p.bytesIn ?? 0),
bytesOut: BigInt(p.bytesOut ?? 0),
utilization: p.utilization ?? 0,
+ latencyMeanMs: computeWeightedLatency(p.componentMetrics),
}));
if (metricsData.length > 0) {
@@ -336,6 +356,9 @@ export async function POST(request: Request) {
sentEventsTotal: cm.sentEvents,
receivedBytesTotal: cm.receivedBytes ?? 0,
sentBytesTotal: cm.sentBytes ?? 0,
+ errorsTotal: cm.errorsTotal ?? 0,
+ discardedTotal: cm.discardedEvents ?? 0,
+ latencyMeanSeconds: cm.latencyMeanSeconds,
});
}
}
diff --git a/src/components/flow/node-metrics-format.ts b/src/components/flow/node-metrics-format.ts
index 53f1d35d..fdbff25a 100644
--- a/src/components/flow/node-metrics-format.ts
+++ b/src/components/flow/node-metrics-format.ts
@@ -1 +1 @@
-export { formatRate, formatBytesRate } from "@/lib/format";
+export { formatRate, formatBytesRate, formatLatency } from "@/lib/format";
diff --git a/src/components/flow/sink-node.tsx b/src/components/flow/sink-node.tsx
index 74a7c02e..b3e60500 100644
--- a/src/components/flow/sink-node.tsx
+++ b/src/components/flow/sink-node.tsx
@@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types";
import type { NodeMetricsData } from "@/stores/flow-store";
import { getIcon } from "./node-icon";
import { NodeSparkline } from "./node-sparkline";
-import { formatRate, formatBytesRate } from "./node-metrics-format";
+import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format";
import { StatusDot } from "@/components/ui/status-dot";
import { nodeStatusVariant } from "@/lib/status";
@@ -68,6 +68,9 @@ function SinkNodeComponent({ data, selected }: NodeProps) {
{metrics && (
{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
+ {metrics.latencyMs != null && metrics.latencyMs > 0 && (
+ <>{" "}{formatLatency(metrics.latencyMs)}>
+ )}
)}
diff --git a/src/components/flow/source-node.tsx b/src/components/flow/source-node.tsx
index 87b1372c..75b97faa 100644
--- a/src/components/flow/source-node.tsx
+++ b/src/components/flow/source-node.tsx
@@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types";
import type { NodeMetricsData } from "@/stores/flow-store";
import { getIcon } from "./node-icon";
import { NodeSparkline } from "./node-sparkline";
-import { formatRate, formatBytesRate } from "./node-metrics-format";
+import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format";
import { StatusDot } from "@/components/ui/status-dot";
import { nodeStatusVariant } from "@/lib/status";
@@ -66,6 +66,9 @@ function SourceNodeComponent({ data, selected }: NodeProps) {
{metrics && (
{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
+ {metrics.latencyMs != null && metrics.latencyMs > 0 && (
+ <>{" "}{formatLatency(metrics.latencyMs)}>
+ )}
)}
diff --git a/src/components/flow/transform-node.tsx b/src/components/flow/transform-node.tsx
index dbe824f2..2cc45996 100644
--- a/src/components/flow/transform-node.tsx
+++ b/src/components/flow/transform-node.tsx
@@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types";
import type { NodeMetricsData } from "@/stores/flow-store";
import { getIcon } from "./node-icon";
import { NodeSparkline } from "./node-sparkline";
-import { formatRate, formatBytesRate } from "./node-metrics-format";
+import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format";
import { StatusDot } from "@/components/ui/status-dot";
import { nodeStatusVariant } from "@/lib/status";
@@ -69,16 +69,23 @@ function TransformNodeComponent({
{displayName && {displayName}
}
{metrics && (
- metrics.eventsInPerSec != null ? (
-
- {formatRate(metrics.eventsInPerSec)} ev/s in
- {formatRate(metrics.eventsPerSec)} ev/s out
-
- ) : (
-
- {formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
-
- )
+ <>
+ {metrics.eventsInPerSec != null ? (
+
+ {formatRate(metrics.eventsInPerSec)} ev/s in
+ {formatRate(metrics.eventsPerSec)} ev/s out
+
+ ) : (
+
+ {formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
+
+ )}
+ {metrics.latencyMs != null && metrics.latencyMs > 0 && (
+
+ {formatLatency(metrics.latencyMs)}
+
+ )}
+ >
)}
diff --git a/src/components/metrics/component-chart.tsx b/src/components/metrics/component-chart.tsx
index 7c34a9d3..575e5502 100644
--- a/src/components/metrics/component-chart.tsx
+++ b/src/components/metrics/component-chart.tsx
@@ -10,7 +10,7 @@ import {
} from "recharts";
import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart";
import { Inbox } from "lucide-react";
-import { formatBytesRate } from "@/lib/format";
+import { formatBytesRate, formatLatency } from "@/lib/format";
interface MetricRow {
timestamp: Date;
@@ -20,11 +20,12 @@ interface MetricRow {
bytesOut: bigint;
errorsTotal: bigint;
eventsDiscarded: bigint;
+ latencyMeanMs?: number | null;
}
interface MetricsChartProps {
rows: MetricRow[];
- dataKey: "events" | "bytes" | "errors";
+ dataKey: "events" | "bytes" | "errors" | "latency";
height?: number;
}
@@ -38,12 +39,14 @@ const colorMap = {
events: { in: "#22c55e", out: "#3b82f6" },
bytes: { in: "#f59e0b", out: "#8b5cf6" },
errors: { in: "#ef4444", out: "#f97316" },
+ latency: { in: "#ec4899", out: "#ec4899" },
} as const;
const labelMap = {
events: { in: "Events In/s", out: "Events Out/s" },
bytes: { in: "Bytes In/s", out: "Bytes Out/s" },
errors: { in: "Errors/s", out: "Discarded/s" },
+ latency: { in: "Mean Latency", out: "Mean Latency" },
} as const;
export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps) {
@@ -51,9 +54,11 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps)
time: new Date(m.timestamp).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }),
in: dataKey === "events" ? Number(m.eventsIn) / 60
: dataKey === "bytes" ? Number(m.bytesIn) / 60
+ : dataKey === "latency" ? (m.latencyMeanMs ?? 0)
: Number(m.errorsTotal) / 60,
out: dataKey === "events" ? Number(m.eventsOut) / 60
: dataKey === "bytes" ? Number(m.bytesOut) / 60
+ : dataKey === "latency" ? (m.latencyMeanMs ?? 0)
: Number(m.eventsDiscarded) / 60,
}));
@@ -71,7 +76,7 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps)
);
}
- const formatter = dataKey === "bytes" ? formatBytesRate : formatEventsRate;
+ const formatter = dataKey === "bytes" ? formatBytesRate : dataKey === "latency" ? formatLatency : formatEventsRate;
return (
@@ -96,7 +101,9 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps)
}
/>
-
+ {dataKey !== "latency" && (
+
+ )}
);
diff --git a/src/components/pipeline/metrics-chart.tsx b/src/components/pipeline/metrics-chart.tsx
index ddcb0894..e6592564 100644
--- a/src/components/pipeline/metrics-chart.tsx
+++ b/src/components/pipeline/metrics-chart.tsx
@@ -12,7 +12,7 @@ import {
import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart";
import { Skeleton } from "@/components/ui/skeleton";
import { Inbox } from "lucide-react";
-import { formatBytesRate } from "@/lib/format";
+import { formatBytesRate, formatLatency } from "@/lib/format";
interface PipelineMetricsChartProps {
pipelineId: string;
@@ -35,6 +35,10 @@ const bytesChartConfig = {
bytesOut: { label: "Bytes Out/s", color: "#8b5cf6" },
} satisfies ChartConfig;
+const latencyChartConfig = {
+ latency: { label: "Mean Latency", color: "#ec4899" },
+} satisfies ChartConfig;
+
export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetricsChartProps) {
const trpc = useTRPC();
@@ -51,6 +55,7 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics
bytesIn: Number(m.bytesIn) / 60,
bytesOut: Number(m.bytesOut) / 60,
errors: Number(m.errorsTotal),
+ latency: m.latencyMeanMs ?? 0,
}));
if (metricsQuery.isLoading) {
@@ -160,6 +165,43 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics
+
+ {/* Latency chart */}
+
+
Component Latency
+
+
+
+
+ formatLatency(v)}
+ />
+ (
+
+ {latencyChartConfig[name as keyof typeof latencyChartConfig]?.label ?? name}
+ {formatLatency(Number(value) ?? 0)}
+
+ )}
+ />
+ }
+ />
+
+
+
+
);
}
diff --git a/src/lib/format.ts b/src/lib/format.ts
index a7dfc5aa..6b3e90fe 100644
--- a/src/lib/format.ts
+++ b/src/lib/format.ts
@@ -72,3 +72,12 @@ export function formatTimeAxis(timestamp: number | string, range: string): strin
}
return d.toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" });
}
+
+/** Format latency in milliseconds to a human-readable string. */
+export function formatLatency(ms: number): string {
+ if (ms === 0) return "0ms";
+ if (ms >= 1000) return `${(ms / 1000).toFixed(2)}s`;
+ if (ms >= 1) return `${ms.toFixed(1)}ms`;
+ if (ms >= 0.001) return `${(ms * 1000).toFixed(0)}us`;
+ return `${ms.toFixed(3)}ms`;
+}
diff --git a/src/server/routers/dashboard.ts b/src/server/routers/dashboard.ts
index a441b8fd..dd189f03 100644
--- a/src/server/routers/dashboard.ts
+++ b/src/server/routers/dashboard.ts
@@ -682,6 +682,7 @@ export const dashboardRouter = router({
bytesOut: true,
errorsTotal: true,
eventsDiscarded: true,
+ latencyMeanMs: true,
},
}),
effectiveNodeIds.length > 0
@@ -741,22 +742,27 @@ export const dashboardRouter = router({
const bytesOut: TSMap = {};
const errors: TSMap = {};
const discarded: TSMap = {};
+ const latency: TSMap = {};
if (input.groupBy === "node") {
// Sum pipeline values per (node, timestamp) since multiple pipelines on one node produce multiple rows
- const acc = new Map>();
+ const acc = new Map>();
for (const row of pipelineRows) {
const label = nodeNameMap.get(row.nodeId ?? "") ?? row.nodeId ?? "unknown";
const t = new Date(row.timestamp).getTime();
if (!acc.has(label)) acc.set(label, new Map());
const timeMap = acc.get(label)!;
- const s = timeMap.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0 };
+ const s = timeMap.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0, lat: 0, latC: 0 };
s.ei += Number(row.eventsIn) / 60;
s.eo += Number(row.eventsOut) / 60;
s.bi += Number(row.bytesIn) / 60;
s.bo += Number(row.bytesOut) / 60;
s.er += Number(row.errorsTotal) / 60;
s.di += Number(row.eventsDiscarded) / 60;
+ if (row.latencyMeanMs != null) {
+ s.lat += row.latencyMeanMs;
+ s.latC++;
+ }
timeMap.set(t, s);
}
for (const [label, timeMap] of acc) {
@@ -767,20 +773,25 @@ export const dashboardRouter = router({
addPoint(bytesOut, label, t, s.bo);
addPoint(errors, label, t, s.er);
addPoint(discarded, label, t, s.di);
+ if (s.latC > 0) addPoint(latency, label, t, s.lat / s.latC);
}
}
} else if (input.groupBy === "aggregate") {
// Sum all pipelines into a single "Total" series per timestamp
- const acc = new Map();
+ const acc = new Map();
for (const row of pipelineRows) {
const t = new Date(row.timestamp).getTime();
- const s = acc.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0 };
+ const s = acc.get(t) ?? { ei: 0, eo: 0, bi: 0, bo: 0, er: 0, di: 0, lat: 0, latC: 0 };
s.ei += Number(row.eventsIn) / 60;
s.eo += Number(row.eventsOut) / 60;
s.bi += Number(row.bytesIn) / 60;
s.bo += Number(row.bytesOut) / 60;
s.er += Number(row.errorsTotal) / 60;
s.di += Number(row.eventsDiscarded) / 60;
+ if (row.latencyMeanMs != null) {
+ s.lat += row.latencyMeanMs;
+ s.latC++;
+ }
acc.set(t, s);
}
for (const [t, s] of acc) {
@@ -790,6 +801,7 @@ export const dashboardRouter = router({
addPoint(bytesOut, "Total", t, s.bo);
addPoint(errors, "Total", t, s.er);
addPoint(discarded, "Total", t, s.di);
+ if (s.latC > 0) addPoint(latency, "Total", t, s.lat / s.latC);
}
} else {
// groupBy === "pipeline" — direct mapping, one series per pipeline
@@ -802,6 +814,9 @@ export const dashboardRouter = router({
addPoint(bytesOut, label, t, Number(row.bytesOut) / 60);
addPoint(errors, label, t, Number(row.errorsTotal) / 60);
addPoint(discarded, label, t, Number(row.eventsDiscarded) / 60);
+ if (row.latencyMeanMs != null) {
+ addPoint(latency, label, t, row.latencyMeanMs);
+ }
}
}
@@ -920,6 +935,7 @@ export const dashboardRouter = router({
bytesOut: downsample(bytesOut),
errors: downsample(errors),
discarded: downsample(discarded),
+ latency: downsample(latency),
},
system: {
cpu: downsample(cpu),
diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts
index 75fe96c6..75583eff 100644
--- a/src/server/routers/metrics.ts
+++ b/src/server/routers/metrics.ts
@@ -34,6 +34,7 @@ export const metricsRouter = router({
bytesIn: true,
bytesOut: true,
utilization: true,
+ latencyMeanMs: true,
},
});
@@ -113,7 +114,10 @@ export const metricsRouter = router({
bytesInRate: number;
bytesOutRate: number;
errorsRate: number;
+ latencyMeanMs: number | null;
}> = {};
+ // Accumulate latency sum+count for proper averaging across components
+ const latencyAcc: Record = {};
for (const [componentId, samples] of nodeMetrics) {
if (samples.length === 0) continue;
@@ -126,6 +130,7 @@ export const metricsRouter = router({
const existing = rates[matchingNode.pipelineId] ?? {
eventsInRate: 0, eventsOutRate: 0,
bytesInRate: 0, bytesOutRate: 0, errorsRate: 0,
+ latencyMeanMs: null,
};
if (matchingNode.kind === "SOURCE") {
existing.eventsInRate += latest.receivedEventsRate;
@@ -135,9 +140,22 @@ export const metricsRouter = router({
existing.bytesOutRate += latest.sentBytesRate;
}
existing.errorsRate += latest.errorsRate;
+ if (latest.latencyMeanMs != null) {
+ const acc = latencyAcc[matchingNode.pipelineId] ?? { sum: 0, count: 0 };
+ acc.sum += latest.latencyMeanMs;
+ acc.count++;
+ latencyAcc[matchingNode.pipelineId] = acc;
+ }
rates[matchingNode.pipelineId] = existing;
}
+ // Compute proper mean latency per pipeline
+ for (const [pipelineId, acc] of Object.entries(latencyAcc)) {
+ if (rates[pipelineId] && acc.count > 0) {
+ rates[pipelineId].latencyMeanMs = acc.sum / acc.count;
+ }
+ }
+
return { rates };
}),
diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts
index d82f3535..79f33743 100644
--- a/src/server/routers/pipeline.ts
+++ b/src/server/routers/pipeline.ts
@@ -1075,6 +1075,7 @@ export const pipelineRouter = router({
bytesIn: true,
bytesOut: true,
utilization: true,
+ latencyMeanMs: true,
},
});
}),
diff --git a/src/server/services/metric-store.ts b/src/server/services/metric-store.ts
index ae7772fb..e7d8907c 100644
--- a/src/server/services/metric-store.ts
+++ b/src/server/services/metric-store.ts
@@ -7,6 +7,7 @@ export interface MetricSample {
errorCount: number;
errorsRate: number;
discardedRate: number;
+ latencyMeanMs: number | null; // mean component latency in ms
}
interface PrevTotals {
@@ -17,6 +18,7 @@ interface PrevTotals {
sentBytesTotal: number;
errorsTotal: number;
discardedTotal: number;
+ latencyMeanSeconds: number | null;
}
const MAX_SAMPLES = 240; // 1 hour at 15s intervals
@@ -36,6 +38,7 @@ class MetricStore {
sentBytesTotal?: number;
errorsTotal?: number;
discardedTotal?: number;
+ latencyMeanSeconds?: number;
},
): MetricSample | null {
const key = `${nodeId}:${pipelineId}:${componentId}`;
@@ -50,6 +53,7 @@ class MetricStore {
sentBytesTotal: totals.sentBytesTotal ?? 0,
errorsTotal: totals.errorsTotal ?? 0,
discardedTotal: totals.discardedTotal ?? 0,
+ latencyMeanSeconds: totals.latencyMeanSeconds ?? null,
});
if (!prev) return null;
@@ -66,6 +70,7 @@ class MetricStore {
errorCount: totals.errorsTotal ?? 0,
errorsRate: Math.max(0, ((totals.errorsTotal ?? 0) - prev.errorsTotal) / elapsedSec),
discardedRate: Math.max(0, ((totals.discardedTotal ?? 0) - prev.discardedTotal) / elapsedSec),
+ latencyMeanMs: totals.latencyMeanSeconds != null ? totals.latencyMeanSeconds * 1000 : null,
};
const arr = this.samples.get(key) ?? [];
diff --git a/src/server/services/metrics-ingest.ts b/src/server/services/metrics-ingest.ts
index 9d6c7354..3920f992 100644
--- a/src/server/services/metrics-ingest.ts
+++ b/src/server/services/metrics-ingest.ts
@@ -10,6 +10,7 @@ export interface MetricsDataPoint {
bytesIn: bigint;
bytesOut: bigint;
utilization: number;
+ latencyMeanMs: number | null;
}
export interface PreviousSnapshot {
@@ -95,6 +96,7 @@ export async function ingestMetrics(
bytesIn: { increment: deltaBytesIn },
bytesOut: { increment: deltaBytesOut },
utilization: dp.utilization,
+ ...(dp.latencyMeanMs != null ? { latencyMeanMs: dp.latencyMeanMs } : {}),
},
});
} else {
@@ -110,6 +112,7 @@ export async function ingestMetrics(
bytesIn: deltaBytesIn,
bytesOut: deltaBytesOut,
utilization: dp.utilization,
+ ...(dp.latencyMeanMs != null ? { latencyMeanMs: dp.latencyMeanMs } : {}),
},
});
}
@@ -134,6 +137,8 @@ export async function ingestMetrics(
let totalBytesIn = BigInt(0);
let totalBytesOut = BigInt(0);
let totalUtil = 0;
+ let latencyWeightedSum = 0;
+ let latencyWeightCount = 0;
for (const row of nodeRows) {
totalEventsIn += row.eventsIn;
@@ -143,9 +148,15 @@ export async function ingestMetrics(
totalBytesIn += row.bytesIn;
totalBytesOut += row.bytesOut;
totalUtil += row.utilization;
+ if (row.latencyMeanMs != null) {
+ const rowEvents = Number(row.eventsIn) + Number(row.eventsOut);
+ latencyWeightedSum += row.latencyMeanMs * rowEvents;
+ latencyWeightCount += rowEvents;
+ }
}
const avgUtil = nodeRows.length > 0 ? totalUtil / nodeRows.length : 0;
+ const avgLatencyMs = latencyWeightCount > 0 ? latencyWeightedSum / latencyWeightCount : null;
const existingAgg = await prisma.pipelineMetric.findFirst({
where: {
@@ -166,6 +177,7 @@ export async function ingestMetrics(
bytesIn: totalBytesIn,
bytesOut: totalBytesOut,
utilization: avgUtil,
+ ...(avgLatencyMs != null ? { latencyMeanMs: avgLatencyMs } : {}),
},
});
} else {
@@ -181,6 +193,7 @@ export async function ingestMetrics(
bytesIn: totalBytesIn,
bytesOut: totalBytesOut,
utilization: avgUtil,
+ ...(avgLatencyMs != null ? { latencyMeanMs: avgLatencyMs } : {}),
},
});
}
diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts
index 5328dcfb..4a72c0ab 100644
--- a/src/server/services/sli-evaluator.ts
+++ b/src/server/services/sli-evaluator.ts
@@ -74,6 +74,25 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{
value = totalEventsIn / windowSeconds;
break;
}
+ case "latency_mean": {
+ const latencyAgg = await prisma.pipelineMetric.aggregate({
+ where: { pipelineId, timestamp: { gte: since }, latencyMeanMs: { not: null } },
+ _avg: { latencyMeanMs: true },
+ _count: true,
+ });
+ if (latencyAgg._count === 0) {
+ results.push({
+ metric: sli.metric,
+ status: "no_data",
+ value: null,
+ threshold: sli.threshold,
+ condition: sli.condition,
+ });
+ continue;
+ }
+ value = latencyAgg._avg.latencyMeanMs ?? 0;
+ break;
+ }
default:
value = 0;
}
diff --git a/src/stores/flow-store.ts b/src/stores/flow-store.ts
index 1386a351..763b00af 100644
--- a/src/stores/flow-store.ts
+++ b/src/stores/flow-store.ts
@@ -57,6 +57,7 @@ export interface NodeMetricsData {
eventsInPerSec?: number;
status: string;
samples?: import("@/server/services/metric-store").MetricSample[];
+ latencyMs?: number | null;
}
export interface FlowState {