Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions agent/internal/agent/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ func buildHeartbeat(sup *supervisor.Supervisor, vectorVersion string, deployment
// Map per-component metrics for editor node overlays
for _, cm := range sr.Components {
ps.ComponentMetrics = append(ps.ComponentMetrics, client.ComponentMetric{
ComponentID: cm.ComponentID,
ComponentKind: cm.ComponentKind,
ReceivedEvents: cm.ReceivedEvents,
SentEvents: cm.SentEvents,
ReceivedBytes: cm.ReceivedBytes,
SentBytes: cm.SentBytes,
ErrorsTotal: cm.ErrorsTotal,
DiscardedEvents: cm.DiscardedEvents,
ComponentID: cm.ComponentID,
ComponentKind: cm.ComponentKind,
ReceivedEvents: cm.ReceivedEvents,
SentEvents: cm.SentEvents,
ReceivedBytes: cm.ReceivedBytes,
SentBytes: cm.SentBytes,
ErrorsTotal: cm.ErrorsTotal,
DiscardedEvents: cm.DiscardedEvents,
LatencyMeanSeconds: cm.LatencyMeanSeconds,
})
}

Expand Down
3 changes: 2 additions & 1 deletion agent/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ type ComponentMetric struct {
ReceivedBytes int64 `json:"receivedBytes,omitempty"`
SentBytes int64 `json:"sentBytes,omitempty"`
ErrorsTotal int64 `json:"errorsTotal,omitempty"`
DiscardedEvents int64 `json:"discardedEvents,omitempty"`
DiscardedEvents int64 `json:"discardedEvents,omitempty"`
LatencyMeanSeconds float64 `json:"latencyMeanSeconds,omitempty"`
}

// HostMetrics holds system-level metrics from the Vector host
Expand Down
8 changes: 7 additions & 1 deletion agent/internal/metrics/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type ComponentMetrics struct {
ReceivedBytes int64
SentBytes int64
ErrorsTotal int64
DiscardedEvents int64
DiscardedEvents int64
LatencyMeanSeconds float64 // mean event time in component (seconds)
}

// HostMetrics holds system-level metrics from Vector's host_metrics source.
Expand Down Expand Up @@ -137,6 +138,11 @@ func ScrapePrometheus(metricsPort int) ScrapeResult {
}
getOrCreate(componentMap, componentID, componentKind).DiscardedEvents += v

case "vector_component_latency_mean_seconds", "component_latency_mean_seconds":
if !isInternal {
getOrCreate(componentMap, componentID, componentKind).LatencyMeanSeconds = value
}

// Host metrics – use += to aggregate across CPU cores, devices, interfaces, etc.
case "host_memory_total_bytes":
sr.Host.MemoryTotalBytes += int64(value)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable: Add latencyMeanMs column to PipelineMetric
ALTER TABLE "PipelineMetric" ADD COLUMN "latencyMeanMs" DOUBLE PRECISION;
3 changes: 2 additions & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ model PipelineMetric {
bytesIn BigInt @default(0)
bytesOut BigInt @default(0)
utilization Float @default(0)
latencyMeanMs Float? // pipeline-level weighted mean latency (ms)

@@index([pipelineId, timestamp])
@@index([timestamp])
Expand All @@ -330,7 +331,7 @@ model PipelineSli {
id String @id @default(cuid())
pipelineId String
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
metric String // "error_rate" | "throughput_floor" | "discard_rate"
metric String // "error_rate" | "throughput_floor" | "discard_rate" | "latency_mean"
condition String // "lt" | "gt"
threshold Float
windowMinutes Int @default(5)
Expand Down
7 changes: 7 additions & 0 deletions src/app/(dashboard)/fleet/[nodeId]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
formatCount,
formatBytes,
formatBytesRate,
formatLatency,
} from "@/lib/format";
import { nodeStatusVariant, nodeStatusLabel, pipelineStatusVariant, pipelineStatusLabel } from "@/lib/status";

Expand Down Expand Up @@ -521,6 +522,7 @@ export default function NodeDetailPage() {
<TableHead className="text-right">Errors</TableHead>
<TableHead className="text-right">Bytes In</TableHead>
<TableHead className="text-right">Bytes Out</TableHead>
<TableHead className="text-right">Avg Latency</TableHead>
<TableHead className="text-right">Uptime</TableHead>
</TableRow>
</TableHeader>
Expand Down Expand Up @@ -557,6 +559,11 @@ export default function NodeDetailPage() {
<div>{formatBytes(ps.bytesOut)}</div>
{rates && <div className="text-xs text-muted-foreground">{formatBytesRate(rates.bytesOutRate)}</div>}
</TableCell>
<TableCell className="text-right font-mono text-sm">
{rates?.latencyMeanMs != null
? formatLatency(rates.latencyMeanMs)
: "—"}
</TableCell>
<TableCell className="text-right font-mono text-sm">
{formatUptime(ps.uptimeSeconds)}
</TableCell>
Expand Down
12 changes: 11 additions & 1 deletion src/app/(dashboard)/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
Plus,
Pencil,
Trash2,
Timer,
} from "lucide-react";
import { Card, CardContent } from "@/components/ui/card";
import { Skeleton } from "@/components/ui/skeleton";
Expand All @@ -29,7 +30,7 @@ import { MetricsSection } from "@/components/dashboard/metrics-section";
import { MetricChart } from "@/components/dashboard/metric-chart";
import { ViewBuilderDialog } from "@/components/dashboard/view-builder-dialog";
import { CustomView } from "@/components/dashboard/custom-view";
import { formatSI, formatBytesRate, formatEventsRate } from "@/lib/format";
import { formatSI, formatBytesRate, formatEventsRate, formatLatency } from "@/lib/format";
import { PageHeader } from "@/components/page-header";
import { cn } from "@/lib/utils";

Expand Down Expand Up @@ -380,6 +381,15 @@ export default function DashboardPage() {
timeRange={timeRange}
height={200}
/>
<MetricChart
title="Component Latency"
icon={<Timer className="h-4 w-4" />}
data={chartData.data?.pipeline.latency ?? {}}
variant="area"
yFormatter={formatLatency}
timeRange={timeRange}
height={200}
/>
</MetricsSection>

{/* System Metrics */}
Expand Down
9 changes: 9 additions & 0 deletions src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ export default function PipelineMetricsPage() {
<MetricsChart rows={rows} dataKey="errors" height={220} />
</CardContent>
</Card>

<Card>
<CardHeader>
<CardTitle>Component Latency</CardTitle>
</CardHeader>
<CardContent>
<MetricsChart rows={rows} dataKey="latency" height={220} />
</CardContent>
</Card>
</>
)}

Expand Down
1 change: 1 addition & 0 deletions src/app/(dashboard)/pipelines/[id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
...(entry.kind === "TRANSFORM" ? { eventsInPerSec: latest.receivedEventsRate } : {}),
status: eventsPerSec > 0 ? "healthy" : "degraded",
samples: entry.samples,
latencyMs: latest.latencyMeanMs,
});
}

Expand Down
23 changes: 23 additions & 0 deletions src/app/api/agent/heartbeat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,23 @@ import { deliverToChannels } from "@/server/services/channels";
import { DeploymentMode } from "@/generated/prisma";
import { isVersionOlder } from "@/lib/version";

/** Compute pipeline-level weighted mean latency (ms) from per-component metrics. */
function computeWeightedLatency(
components?: Array<{ receivedEvents: number; sentEvents: number; latencyMeanSeconds?: number }>,
): number | null {
if (!components || components.length === 0) return null;
let weightedSum = 0;
let totalEvents = 0;
for (const cm of components) {
if (cm.latencyMeanSeconds == null || cm.latencyMeanSeconds === 0) continue;
const events = cm.receivedEvents + cm.sentEvents;
weightedSum += cm.latencyMeanSeconds * 1000 * events; // convert seconds → ms
totalEvents += events;
}
if (totalEvents === 0) return null;
return weightedSum / totalEvents;
}

const heartbeatSchema = z.object({
agentVersion: z.string().max(100).optional(),
vectorVersion: z.string().max(100).optional(),
Expand All @@ -39,6 +56,7 @@ const heartbeatSchema = z.object({
sentBytes: z.number().optional(),
errorsTotal: z.number().optional(),
discardedEvents: z.number().optional(),
latencyMeanSeconds: z.number().optional(), // NEW
})).optional(),
utilization: z.number().optional(),
recentLogs: z.array(z.string()).optional(),
Expand Down Expand Up @@ -94,6 +112,7 @@ interface PipelineStatus {
sentBytes?: number;
errorsTotal?: number;
discardedEvents?: number;
latencyMeanSeconds?: number; // NEW
}>;
utilization?: number;
recentLogs?: string[];
Expand Down Expand Up @@ -319,6 +338,7 @@ export async function POST(request: Request) {
bytesIn: BigInt(p.bytesIn ?? 0),
bytesOut: BigInt(p.bytesOut ?? 0),
utilization: p.utilization ?? 0,
latencyMeanMs: computeWeightedLatency(p.componentMetrics),
}));

if (metricsData.length > 0) {
Expand All @@ -336,6 +356,9 @@ export async function POST(request: Request) {
sentEventsTotal: cm.sentEvents,
receivedBytesTotal: cm.receivedBytes ?? 0,
sentBytesTotal: cm.sentBytes ?? 0,
errorsTotal: cm.errorsTotal ?? 0,
discardedTotal: cm.discardedEvents ?? 0,
latencyMeanSeconds: cm.latencyMeanSeconds,
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/components/flow/node-metrics-format.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export { formatRate, formatBytesRate } from "@/lib/format";
export { formatRate, formatBytesRate, formatLatency } from "@/lib/format";
5 changes: 4 additions & 1 deletion src/components/flow/sink-node.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types";
import type { NodeMetricsData } from "@/stores/flow-store";
import { getIcon } from "./node-icon";
import { NodeSparkline } from "./node-sparkline";
import { formatRate, formatBytesRate } from "./node-metrics-format";
import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format";
import { StatusDot } from "@/components/ui/status-dot";
import { nodeStatusVariant } from "@/lib/status";

Expand Down Expand Up @@ -68,6 +68,9 @@ function SinkNodeComponent({ data, selected }: NodeProps<SinkNodeType>) {
{metrics && (
<p className="truncate text-xs font-mono text-purple-400">
{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
{metrics.latencyMs != null && metrics.latencyMs > 0 && (
<>{" "}{formatLatency(metrics.latencyMs)}</>
)}
</p>
)}
</div>
Expand Down
5 changes: 4 additions & 1 deletion src/components/flow/source-node.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types";
import type { NodeMetricsData } from "@/stores/flow-store";
import { getIcon } from "./node-icon";
import { NodeSparkline } from "./node-sparkline";
import { formatRate, formatBytesRate } from "./node-metrics-format";
import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format";
import { StatusDot } from "@/components/ui/status-dot";
import { nodeStatusVariant } from "@/lib/status";

Expand Down Expand Up @@ -66,6 +66,9 @@ function SourceNodeComponent({ data, selected }: NodeProps<SourceNodeType>) {
{metrics && (
<p className="truncate text-xs font-mono text-emerald-400">
{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
{metrics.latencyMs != null && metrics.latencyMs > 0 && (
<>{" "}{formatLatency(metrics.latencyMs)}</>
)}
</p>
)}
</div>
Expand Down
29 changes: 18 additions & 11 deletions src/components/flow/transform-node.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { VectorComponentDef } from "@/lib/vector/types";
import type { NodeMetricsData } from "@/stores/flow-store";
import { getIcon } from "./node-icon";
import { NodeSparkline } from "./node-sparkline";
import { formatRate, formatBytesRate } from "./node-metrics-format";
import { formatRate, formatBytesRate, formatLatency } from "./node-metrics-format";
import { StatusDot } from "@/components/ui/status-dot";
import { nodeStatusVariant } from "@/lib/status";

Expand Down Expand Up @@ -69,16 +69,23 @@ function TransformNodeComponent({
{displayName && <p className="truncate text-xs font-medium text-foreground">{displayName}</p>}

{metrics && (
metrics.eventsInPerSec != null ? (
<div className="flex justify-between text-xs font-mono text-blue-400">
<span>{formatRate(metrics.eventsInPerSec)} ev/s in</span>
<span>{formatRate(metrics.eventsPerSec)} ev/s out</span>
</div>
) : (
<p className="truncate text-xs font-mono text-blue-400">
{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
</p>
)
<>
{metrics.eventsInPerSec != null ? (
<div className="flex justify-between text-xs font-mono text-blue-400">
<span>{formatRate(metrics.eventsInPerSec)} ev/s in</span>
<span>{formatRate(metrics.eventsPerSec)} ev/s out</span>
</div>
) : (
<p className="truncate text-xs font-mono text-blue-400">
{formatRate(metrics.eventsPerSec)} ev/s{" "}{formatBytesRate(metrics.bytesPerSec)}
</p>
)}
{metrics.latencyMs != null && metrics.latencyMs > 0 && (
<p className="truncate text-xs font-mono text-blue-400/70">
{formatLatency(metrics.latencyMs)}
</p>
)}
</>
)}
</div>

Expand Down
15 changes: 11 additions & 4 deletions src/components/metrics/component-chart.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from "recharts";
import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart";
import { Inbox } from "lucide-react";
import { formatBytesRate } from "@/lib/format";
import { formatBytesRate, formatLatency } from "@/lib/format";

interface MetricRow {
timestamp: Date;
Expand All @@ -20,11 +20,12 @@ interface MetricRow {
bytesOut: bigint;
errorsTotal: bigint;
eventsDiscarded: bigint;
latencyMeanMs?: number | null;
}

interface MetricsChartProps {
rows: MetricRow[];
dataKey: "events" | "bytes" | "errors";
dataKey: "events" | "bytes" | "errors" | "latency";
height?: number;
}

Expand All @@ -38,22 +39,26 @@ const colorMap = {
events: { in: "#22c55e", out: "#3b82f6" },
bytes: { in: "#f59e0b", out: "#8b5cf6" },
errors: { in: "#ef4444", out: "#f97316" },
latency: { in: "#ec4899", out: "#ec4899" },
} as const;

const labelMap = {
events: { in: "Events In/s", out: "Events Out/s" },
bytes: { in: "Bytes In/s", out: "Bytes Out/s" },
errors: { in: "Errors/s", out: "Discarded/s" },
latency: { in: "Mean Latency", out: "Mean Latency" },
} as const;

export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps) {
const data = rows.map((m) => ({
time: new Date(m.timestamp).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }),
in: dataKey === "events" ? Number(m.eventsIn) / 60
: dataKey === "bytes" ? Number(m.bytesIn) / 60
: dataKey === "latency" ? (m.latencyMeanMs ?? 0)
: Number(m.errorsTotal) / 60,
out: dataKey === "events" ? Number(m.eventsOut) / 60
: dataKey === "bytes" ? Number(m.bytesOut) / 60
: dataKey === "latency" ? (m.latencyMeanMs ?? 0)
: Number(m.eventsDiscarded) / 60,
}));

Expand All @@ -71,7 +76,7 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps)
);
}

const formatter = dataKey === "bytes" ? formatBytesRate : formatEventsRate;
const formatter = dataKey === "bytes" ? formatBytesRate : dataKey === "latency" ? formatLatency : formatEventsRate;

return (
<ChartContainer config={chartConfig} className="w-full" style={{ height }}>
Expand All @@ -96,7 +101,9 @@ export function MetricsChart({ rows, dataKey, height = 200 }: MetricsChartProps)
}
/>
<Area type="monotone" dataKey="in" name={labelMap[dataKey].in} stroke="var(--color-in)" fill="var(--color-in)" fillOpacity={0.1} strokeWidth={1.5} />
<Area type="monotone" dataKey="out" name={labelMap[dataKey].out} stroke="var(--color-out)" fill="var(--color-out)" fillOpacity={0.1} strokeWidth={1.5} />
{dataKey !== "latency" && (
<Area type="monotone" dataKey="out" name={labelMap[dataKey].out} stroke="var(--color-out)" fill="var(--color-out)" fillOpacity={0.1} strokeWidth={1.5} />
)}
</AreaChart>
</ChartContainer>
);
Expand Down
Loading
Loading