diff --git a/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql b/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql
new file mode 100644
index 0000000..248d2cf
--- /dev/null
+++ b/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql
@@ -0,0 +1,5 @@
+-- AlterTable: Add componentId column to PipelineMetric for per-component latency
+ALTER TABLE "PipelineMetric" ADD COLUMN "componentId" TEXT;
+
+-- CreateIndex: Compound index for per-component latency queries
+CREATE INDEX "PipelineMetric_pipelineId_componentId_timestamp_idx" ON "PipelineMetric"("pipelineId", "componentId", "timestamp");
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index fb9734d..abbff76 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -327,6 +327,7 @@ model PipelineMetric {
pipelineId String
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
nodeId String?
+ componentId String? // null = aggregate row, non-null = per-component
timestamp DateTime
eventsIn BigInt @default(0)
eventsOut BigInt @default(0)
@@ -335,9 +336,10 @@ model PipelineMetric {
bytesIn BigInt @default(0)
bytesOut BigInt @default(0)
utilization Float @default(0)
- latencyMeanMs Float? // pipeline-level weighted mean latency (ms)
+ latencyMeanMs Float? // pipeline-level weighted mean latency (ms)
@@index([pipelineId, timestamp])
+ @@index([pipelineId, componentId, timestamp])
@@index([timestamp])
}
diff --git a/src/app/(dashboard)/page.tsx b/src/app/(dashboard)/page.tsx
index a9b58a2..341064a 100644
--- a/src/app/(dashboard)/page.tsx
+++ b/src/app/(dashboard)/page.tsx
@@ -382,7 +382,7 @@ export default function DashboardPage() {
height={200}
/>
}
data={chartData.data?.pipeline.latency ?? {}}
variant="area"
diff --git a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
index 5836a1a..c574c47 100644
--- a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
+++ b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx
@@ -1,6 +1,6 @@
"use client";
-import { useState } from "react";
+import { useState, useMemo } from "react";
import { useParams } from "next/navigation";
import { useQuery } from "@tanstack/react-query";
import { useTRPC } from "@/trpc/client";
@@ -10,6 +10,9 @@ import { Skeleton } from "@/components/ui/skeleton";
import { SummaryCards } from "@/components/metrics/summary-cards";
import { MetricsChart } from "@/components/metrics/component-chart";
import { PipelineLogs } from "@/components/pipeline/pipeline-logs";
+import { LineChart, Line, XAxis, YAxis, CartesianGrid } from "recharts";
+import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart";
+import { formatLatency } from "@/lib/format";
const TIME_RANGES = [
{ label: "5m", minutes: 5 },
@@ -35,6 +38,13 @@ export default function PipelineMetricsPage() {
),
);
+ const componentLatencyQuery = useQuery(
+ trpc.metrics.getComponentLatencyHistory.queryOptions(
+ { pipelineId: params.id, minutes },
+ { refetchInterval: 15000 },
+ ),
+ );
+
if (pipelineQuery.isLoading) {
return (
@@ -118,10 +128,13 @@ export default function PipelineMetricsPage() {
- Pipeline Latency
+ Transform Latency
-
+
>
@@ -141,3 +154,103 @@ export default function PipelineMetricsPage() {
);
}
+
+// Deterministic color palette for per-component latency lines
+const LATENCY_COLORS = [
+ "#ec4899", "#8b5cf6", "#3b82f6", "#06b6d4", "#10b981",
+ "#f59e0b", "#ef4444", "#6366f1", "#14b8a6", "#f97316",
+];
+
+function TransformLatencyChart({
+ components,
+ height = 220,
+}: {
+ components: Record>;
+ height?: number;
+}) {
+ const { data, config, componentIds } = useMemo(() => {
+ const ids = Object.keys(components).sort();
+ if (ids.length === 0) return { data: [], config: {} as ChartConfig, componentIds: [] };
+
+ // Collect all unique timestamps (keyed by epoch ms for correct sorting)
+ const timeMap = new Map>();
+ for (const id of ids) {
+ for (const point of components[id]) {
+ const ms = new Date(point.timestamp).getTime();
+ const entry = timeMap.get(ms) ?? {};
+ entry[id] = point.latencyMeanMs;
+ timeMap.set(ms, entry);
+ }
+ }
+
+ // Build chart data sorted by timestamp, display as locale time
+ const chartData = Array.from(timeMap.entries())
+ .sort(([a], [b]) => a - b)
+ .map(([ms, values]) => ({
+ time: new Date(ms).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }),
+ ...values,
+ }));
+
+ // Build chart config with deterministic colors
+ const chartConfig: ChartConfig = {};
+ for (let i = 0; i < ids.length; i++) {
+ chartConfig[ids[i]] = {
+ label: ids[i],
+ color: LATENCY_COLORS[i % LATENCY_COLORS.length],
+ };
+ }
+
+ return { data: chartData, config: chartConfig, componentIds: ids };
+ }, [components]);
+
+ if (data.length === 0) {
+ return (
+
+
No transform latency data yet
+
+ );
+ }
+
+ return (
+
+
+
+
+ formatLatency(v)}
+ />
+ (
+
+ {String(name)}
+
+ {formatLatency(Number(value) ?? 0)}
+
+
+ )}
+ />
+ }
+ />
+ {componentIds.map((id) => (
+
+ ))}
+
+
+ );
+}
diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts
index 7508e5d..f26a76b 100644
--- a/src/app/api/agent/heartbeat/route.ts
+++ b/src/app/api/agent/heartbeat/route.ts
@@ -343,6 +343,10 @@ export async function POST(request: Request) {
.catch((err) => console.error("Node metrics insert error:", err));
}
+ // Shared minute-truncated timestamp for all metric rows this heartbeat
+ const minuteTimestamp = new Date();
+ minuteTimestamp.setSeconds(0, 0);
+
// Ingest metrics from pipelines that report counter data
const metricsData = pipelines
.filter((p) => p.eventsIn !== undefined)
@@ -365,6 +369,55 @@ export async function POST(request: Request) {
);
}
+ // Write per-component latency rows (direct create, bypasses delta-tracking)
+ const componentLatencyRows: Array<{
+ pipelineId: string;
+ nodeId: string;
+ componentId: string;
+ timestamp: Date;
+ latencyMeanMs: number;
+ }> = [];
+
+ for (const ps of pipelines) {
+ if (!Array.isArray(ps.componentMetrics)) continue;
+ for (const cm of ps.componentMetrics) {
+ if (cm.latencyMeanSeconds != null && cm.latencyMeanSeconds > 0) {
+ componentLatencyRows.push({
+ pipelineId: ps.pipelineId,
+ nodeId: agent.nodeId,
+ componentId: cm.componentId,
+ timestamp: minuteTimestamp,
+ latencyMeanMs: cm.latencyMeanSeconds * 1000,
+ });
+ }
+ }
+ }
+
+ if (componentLatencyRows.length > 0) {
+ try {
+ for (const row of componentLatencyRows) {
+ const existing = await prisma.pipelineMetric.findFirst({
+ where: {
+ pipelineId: row.pipelineId,
+ nodeId: row.nodeId,
+ componentId: row.componentId,
+ timestamp: row.timestamp,
+ },
+ });
+ if (existing) {
+ await prisma.pipelineMetric.update({
+ where: { id: existing.id },
+ data: { latencyMeanMs: row.latencyMeanMs },
+ });
+ } else {
+ await prisma.pipelineMetric.create({ data: row });
+ }
+ }
+ } catch (err) {
+ console.error("Per-component latency upsert error:", err);
+ }
+ }
+
// Feed per-component metrics into the in-memory MetricStore for editor overlays
for (const ps of pipelines) {
if (Array.isArray(ps.componentMetrics) && ps.componentMetrics.length > 0) {
diff --git a/src/components/pipeline/metrics-chart.tsx b/src/components/pipeline/metrics-chart.tsx
index b35a8cf..24f653f 100644
--- a/src/components/pipeline/metrics-chart.tsx
+++ b/src/components/pipeline/metrics-chart.tsx
@@ -168,7 +168,7 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics
{/* Latency chart */}
-
Pipeline Latency
+
Transform Latency
diff --git a/src/server/routers/dashboard.ts b/src/server/routers/dashboard.ts
index dd189f0..a71407b 100644
--- a/src/server/routers/dashboard.ts
+++ b/src/server/routers/dashboard.ts
@@ -27,6 +27,7 @@ export const dashboardRouter = router({
prisma.pipelineMetric.aggregate({
where: {
nodeId: null, // aggregate rows only
+ componentId: null,
timestamp: { gte: oneHourAgo },
pipeline: { environmentId: input.environmentId },
},
@@ -279,6 +280,7 @@ export const dashboardRouter = router({
where: {
pipelineId: { in: pipelineIds },
nodeId: null,
+ componentId: null,
timestamp: { gte: oneHourAgo },
},
orderBy: { timestamp: "asc" },
@@ -470,6 +472,7 @@ export const dashboardRouter = router({
prisma.pipelineMetric.aggregate({
where: {
nodeId: null,
+ componentId: null,
timestamp: { gte: fiveMinAgo },
...(user?.isSuperAdmin ? {} : { pipeline: teamFilter }),
},
@@ -503,6 +506,7 @@ export const dashboardRouter = router({
const current = await prisma.pipelineMetric.aggregate({
where: {
pipeline: { environmentId: input.environmentId },
+ componentId: null,
timestamp: { gte: since },
},
_sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true },
@@ -512,6 +516,7 @@ export const dashboardRouter = router({
const previous = await prisma.pipelineMetric.aggregate({
where: {
pipeline: { environmentId: input.environmentId },
+ componentId: null,
timestamp: { gte: prevSince, lt: since },
},
_sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true },
@@ -522,6 +527,7 @@ export const dashboardRouter = router({
by: ["pipelineId"],
where: {
pipeline: { environmentId: input.environmentId },
+ componentId: null,
timestamp: { gte: since },
},
_sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true },
@@ -554,6 +560,7 @@ export const dashboardRouter = router({
const rawMetrics = await prisma.pipelineMetric.findMany({
where: {
pipeline: { environmentId: input.environmentId },
+ componentId: null,
timestamp: { gte: since },
},
select: {
@@ -666,6 +673,7 @@ export const dashboardRouter = router({
prisma.pipelineMetric.findMany({
where: {
pipelineId: { in: effectivePipelineIds },
+ componentId: null,
timestamp: { gte: since },
...(input.groupBy === "node"
? { nodeId: { in: effectiveNodeIds } }
diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts
index 75583ef..7ccbc06 100644
--- a/src/server/routers/metrics.ts
+++ b/src/server/routers/metrics.ts
@@ -22,6 +22,7 @@ export const metricsRouter = router({
where: {
pipelineId: input.pipelineId,
nodeId: null,
+ componentId: null,
timestamp: { gte: since },
},
orderBy: { timestamp: "asc" },
@@ -41,6 +42,62 @@ export const metricsRouter = router({
return { rows };
}),
+ /**
+ * Per-component historical latency from the database.
+ * Used by the pipeline metrics page for the multi-line transform latency chart.
+ */
+ getComponentLatencyHistory: protectedProcedure
+ .input(
+ z.object({
+ pipelineId: z.string(),
+ minutes: z.number().int().min(1).max(1440).default(60),
+ }),
+ )
+ .use(withTeamAccess("VIEWER"))
+ .query(async ({ input }) => {
+ const since = new Date(Date.now() - input.minutes * 60 * 1000);
+
+ const rows = await prisma.pipelineMetric.findMany({
+ where: {
+ pipelineId: input.pipelineId,
+ componentId: { not: null },
+ timestamp: { gte: since },
+ },
+ orderBy: { timestamp: "asc" },
+ select: {
+ componentId: true,
+ timestamp: true,
+ latencyMeanMs: true,
+ },
+ });
+
+ // Average across nodes per (componentId, timestamp) to handle multi-node deployments
+ const components: Record> = {};
+ const acc: Record> = {};
+
+ for (const row of rows) {
+ if (!row.componentId || row.latencyMeanMs == null) continue;
+ const tsMs = row.timestamp.getTime();
+ const byTs = acc[row.componentId] ?? new Map();
+ const bucket = byTs.get(tsMs) ?? { sum: 0, count: 0 };
+ bucket.sum += row.latencyMeanMs;
+ bucket.count++;
+ byTs.set(tsMs, bucket);
+ acc[row.componentId] = byTs;
+ }
+
+ for (const [cid, byTs] of Object.entries(acc)) {
+ components[cid] = Array.from(byTs.entries())
+ .map(([tsMs, { sum, count }]) => ({
+ timestamp: new Date(tsMs),
+ latencyMeanMs: sum / count,
+ }))
+ .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
+ }
+
+ return { components };
+ }),
+
/**
* Per-component live metrics from the in-memory store.
* Used by the flow editor to overlay throughput on nodes.
diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts
index b645d6f..3db25a3 100644
--- a/src/server/routers/pipeline.ts
+++ b/src/server/routers/pipeline.ts
@@ -1063,6 +1063,7 @@ export const pipelineRouter = router({
where: {
pipelineId: input.pipelineId,
nodeId: null,
+ componentId: null,
timestamp: { gte: since },
},
orderBy: { timestamp: "asc" },
diff --git a/src/server/services/metrics-ingest.ts b/src/server/services/metrics-ingest.ts
index 3920f99..e4c55ec 100644
--- a/src/server/services/metrics-ingest.ts
+++ b/src/server/services/metrics-ingest.ts
@@ -81,6 +81,7 @@ export async function ingestMetrics(
where: {
pipelineId: dp.pipelineId,
nodeId: dp.nodeId,
+ componentId: null,
timestamp: now,
},
});
@@ -126,6 +127,7 @@ export async function ingestMetrics(
where: {
pipelineId,
nodeId: { not: null },
+ componentId: null,
timestamp: now,
},
});
@@ -162,6 +164,7 @@ export async function ingestMetrics(
where: {
pipelineId,
nodeId: null,
+ componentId: null,
timestamp: now,
},
});
diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts
index 4a72c0a..60f3a9f 100644
--- a/src/server/services/sli-evaluator.ts
+++ b/src/server/services/sli-evaluator.ts
@@ -27,7 +27,7 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{
// Use aggregate to avoid transferring all metric rows to the application
const agg = await prisma.pipelineMetric.aggregate({
- where: { pipelineId, timestamp: { gte: since } },
+ where: { pipelineId, componentId: null, timestamp: { gte: since } },
_sum: { eventsIn: true, errorsTotal: true, eventsDiscarded: true },
_count: true,
});
@@ -76,7 +76,7 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{
}
case "latency_mean": {
const latencyAgg = await prisma.pipelineMetric.aggregate({
- where: { pipelineId, timestamp: { gte: since }, latencyMeanMs: { not: null } },
+ where: { pipelineId, componentId: null, timestamp: { gte: since }, latencyMeanMs: { not: null } },
_avg: { latencyMeanMs: true },
_count: true,
});