From d67745c7a208ea2df8a443c33ac395d8fb573617 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:24:47 +0000 Subject: [PATCH 01/12] feat: add componentId column to PipelineMetric for per-component latency --- .../migration.sql | 5 +++++ prisma/schema.prisma | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql diff --git a/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql b/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql new file mode 100644 index 0000000..248d2cf --- /dev/null +++ b/prisma/migrations/20260311030000_add_component_id_to_pipeline_metric/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable: Add componentId column to PipelineMetric for per-component latency +ALTER TABLE "PipelineMetric" ADD COLUMN "componentId" TEXT; + +-- CreateIndex: Compound index for per-component latency queries +CREATE INDEX "PipelineMetric_pipelineId_componentId_timestamp_idx" ON "PipelineMetric"("pipelineId", "componentId", "timestamp"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index fb9734d..abbff76 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -327,6 +327,7 @@ model PipelineMetric { pipelineId String pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade) nodeId String? + componentId String? // null = aggregate row, non-null = per-component timestamp DateTime eventsIn BigInt @default(0) eventsOut BigInt @default(0) @@ -335,9 +336,10 @@ model PipelineMetric { bytesIn BigInt @default(0) bytesOut BigInt @default(0) utilization Float @default(0) - latencyMeanMs Float? // pipeline-level weighted mean latency (ms) + latencyMeanMs Float? // pipeline-level weighted mean latency (ms) @@index([pipelineId, timestamp]) + @@index([pipelineId, componentId, timestamp]) @@index([timestamp]) } From 25bb6f4ce5b37acec37470dd856b315bb3ccff86 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:26:29 +0000 Subject: [PATCH 02/12] fix: add componentId null filter to metrics-ingest queries --- src/server/services/metrics-ingest.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/server/services/metrics-ingest.ts b/src/server/services/metrics-ingest.ts index 3920f99..e4c55ec 100644 --- a/src/server/services/metrics-ingest.ts +++ b/src/server/services/metrics-ingest.ts @@ -81,6 +81,7 @@ export async function ingestMetrics( where: { pipelineId: dp.pipelineId, nodeId: dp.nodeId, + componentId: null, timestamp: now, }, }); @@ -126,6 +127,7 @@ export async function ingestMetrics( where: { pipelineId, nodeId: { not: null }, + componentId: null, timestamp: now, }, }); @@ -162,6 +164,7 @@ export async function ingestMetrics( where: { pipelineId, nodeId: null, + componentId: null, timestamp: now, }, }); From 509a0992633f595ad0ad5960228e647b9a7fffa3 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:26:32 +0000 Subject: [PATCH 03/12] fix: add componentId null filter to SLI evaluator queries --- src/server/services/sli-evaluator.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts index 4a72c0a..60f3a9f 100644 --- a/src/server/services/sli-evaluator.ts +++ b/src/server/services/sli-evaluator.ts @@ -27,7 +27,7 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ // Use aggregate to avoid transferring all metric rows to the application const agg = await prisma.pipelineMetric.aggregate({ - where: { pipelineId, timestamp: { gte: since } }, + where: { pipelineId, componentId: null, timestamp: { gte: since } }, _sum: { eventsIn: true, errorsTotal: true, eventsDiscarded: true }, _count: true, }); @@ -76,7 +76,7 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ } case "latency_mean": { const latencyAgg = await prisma.pipelineMetric.aggregate({ - where: { pipelineId, timestamp: { gte: since }, latencyMeanMs: { not: null } }, + where: { pipelineId, componentId: null, timestamp: { gte: since }, latencyMeanMs: { not: null } }, _avg: { latencyMeanMs: true }, _count: true, }); From c85ad9fb7ad8bc131900359f2ac7b8d48b67539b Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:28:24 +0000 Subject: [PATCH 04/12] fix: add componentId null filter to all dashboard metric queries --- src/server/routers/dashboard.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/server/routers/dashboard.ts b/src/server/routers/dashboard.ts index dd189f0..a71407b 100644 --- a/src/server/routers/dashboard.ts +++ b/src/server/routers/dashboard.ts @@ -27,6 +27,7 @@ export const dashboardRouter = router({ prisma.pipelineMetric.aggregate({ where: { nodeId: null, // aggregate rows only + componentId: null, timestamp: { gte: oneHourAgo }, pipeline: { environmentId: input.environmentId }, }, @@ -279,6 +280,7 @@ export const dashboardRouter = router({ where: { pipelineId: { in: pipelineIds }, nodeId: null, + componentId: null, timestamp: { gte: oneHourAgo }, }, orderBy: { timestamp: "asc" }, @@ -470,6 +472,7 @@ export const dashboardRouter = router({ prisma.pipelineMetric.aggregate({ where: { nodeId: null, + componentId: null, timestamp: { gte: fiveMinAgo }, ...(user?.isSuperAdmin ? {} : { pipeline: teamFilter }), }, @@ -503,6 +506,7 @@ export const dashboardRouter = router({ const current = await prisma.pipelineMetric.aggregate({ where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: since }, }, _sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true }, @@ -512,6 +516,7 @@ export const dashboardRouter = router({ const previous = await prisma.pipelineMetric.aggregate({ where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: prevSince, lt: since }, }, _sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true }, @@ -522,6 +527,7 @@ export const dashboardRouter = router({ by: ["pipelineId"], where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: since }, }, _sum: { eventsIn: true, eventsOut: true, bytesIn: true, bytesOut: true }, @@ -554,6 +560,7 @@ export const dashboardRouter = router({ const rawMetrics = await prisma.pipelineMetric.findMany({ where: { pipeline: { environmentId: input.environmentId }, + componentId: null, timestamp: { gte: since }, }, select: { @@ -666,6 +673,7 @@ export const dashboardRouter = router({ prisma.pipelineMetric.findMany({ where: { pipelineId: { in: effectivePipelineIds }, + componentId: null, timestamp: { gte: since }, ...(input.groupBy === "node" ? { nodeId: { in: effectiveNodeIds } } From d49ca58c85ccfbc7c11b4d6ba7574fccb5a91abd Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:28:27 +0000 Subject: [PATCH 05/12] fix: add componentId null filter to metrics and pipeline router queries --- src/server/routers/metrics.ts | 1 + src/server/routers/pipeline.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts index 75583ef..6c6ca83 100644 --- a/src/server/routers/metrics.ts +++ b/src/server/routers/metrics.ts @@ -22,6 +22,7 @@ export const metricsRouter = router({ where: { pipelineId: input.pipelineId, nodeId: null, + componentId: null, timestamp: { gte: since }, }, orderBy: { timestamp: "asc" }, diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index b645d6f..3db25a3 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -1063,6 +1063,7 @@ export const pipelineRouter = router({ where: { pipelineId: input.pipelineId, nodeId: null, + componentId: null, timestamp: { gte: since }, }, orderBy: { timestamp: "asc" }, From 6cb28a6ee09cbbbb37130d71b8d33766e348d507 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:29:45 +0000 Subject: [PATCH 06/12] feat: write per-component latency rows to PipelineMetric --- src/app/api/agent/heartbeat/route.ts | 34 ++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 7508e5d..7214b6c 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -343,6 +343,10 @@ export async function POST(request: Request) { .catch((err) => console.error("Node metrics insert error:", err)); } + // Shared minute-truncated timestamp for all metric rows this heartbeat + const minuteTimestamp = new Date(); + minuteTimestamp.setSeconds(0, 0); + // Ingest metrics from pipelines that report counter data const metricsData = pipelines .filter((p) => p.eventsIn !== undefined) @@ -365,6 +369,36 @@ export async function POST(request: Request) { ); } + // Write per-component latency rows (direct create, bypasses delta-tracking) + const componentLatencyRows: Array<{ + pipelineId: string; + nodeId: string; + componentId: string; + timestamp: Date; + latencyMeanMs: number; + }> = []; + + for (const ps of pipelines) { + if (!Array.isArray(ps.componentMetrics)) continue; + for (const cm of ps.componentMetrics) { + if (cm.latencyMeanSeconds != null && cm.latencyMeanSeconds > 0) { + componentLatencyRows.push({ + pipelineId: ps.pipelineId, + nodeId: agent.nodeId, + componentId: cm.componentId, + timestamp: minuteTimestamp, + latencyMeanMs: cm.latencyMeanSeconds * 1000, + }); + } + } + } + + if (componentLatencyRows.length > 0) { + prisma.pipelineMetric + .createMany({ data: componentLatencyRows }) + .catch((err) => console.error("Per-component latency insert error:", err)); + } + // Feed per-component metrics into the in-memory MetricStore for editor overlays for (const ps of pipelines) { if (Array.isArray(ps.componentMetrics) && ps.componentMetrics.length > 0) { From 1e03431c2d8cbbb2121d23500faf10de4ec971d2 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:30:48 +0000 Subject: [PATCH 07/12] feat: add getComponentLatencyHistory tRPC procedure --- src/server/routers/metrics.ts | 40 +++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts index 6c6ca83..c2039c5 100644 --- a/src/server/routers/metrics.ts +++ b/src/server/routers/metrics.ts @@ -42,6 +42,46 @@ export const metricsRouter = router({ return { rows }; }), + /** + * Per-component historical latency from the database. + * Used by the pipeline metrics page for the multi-line transform latency chart. + */ + getComponentLatencyHistory: protectedProcedure + .input( + z.object({ + pipelineId: z.string(), + minutes: z.number().int().min(1).max(1440).default(60), + }), + ) + .query(async ({ input }) => { + const since = new Date(Date.now() - input.minutes * 60 * 1000); + + const rows = await prisma.pipelineMetric.findMany({ + where: { + pipelineId: input.pipelineId, + componentId: { not: null }, + timestamp: { gte: since }, + }, + orderBy: { timestamp: "asc" }, + select: { + componentId: true, + timestamp: true, + latencyMeanMs: true, + }, + }); + + // Group by componentId + const components: Record> = {}; + for (const row of rows) { + if (!row.componentId || row.latencyMeanMs == null) continue; + const arr = components[row.componentId] ?? []; + arr.push({ timestamp: row.timestamp, latencyMeanMs: row.latencyMeanMs }); + components[row.componentId] = arr; + } + + return { components }; + }), + /** * Per-component live metrics from the in-memory store. * Used by the flow editor to overlay throughput on nodes. From cab6120558df0f6c2e40745d2f312049759c9067 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:33:13 +0000 Subject: [PATCH 08/12] feat: multi-line transform latency chart on pipeline metrics page --- .../pipelines/[id]/metrics/page.tsx | 119 +++++++++++++++++- 1 file changed, 116 insertions(+), 3 deletions(-) diff --git a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx index 5836a1a..c574c47 100644 --- a/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx +++ b/src/app/(dashboard)/pipelines/[id]/metrics/page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useState } from "react"; +import { useState, useMemo } from "react"; import { useParams } from "next/navigation"; import { useQuery } from "@tanstack/react-query"; import { useTRPC } from "@/trpc/client"; @@ -10,6 +10,9 @@ import { Skeleton } from "@/components/ui/skeleton"; import { SummaryCards } from "@/components/metrics/summary-cards"; import { MetricsChart } from "@/components/metrics/component-chart"; import { PipelineLogs } from "@/components/pipeline/pipeline-logs"; +import { LineChart, Line, XAxis, YAxis, CartesianGrid } from "recharts"; +import { ChartContainer, ChartTooltip, ChartTooltipContent, type ChartConfig } from "@/components/ui/chart"; +import { formatLatency } from "@/lib/format"; const TIME_RANGES = [ { label: "5m", minutes: 5 }, @@ -35,6 +38,13 @@ export default function PipelineMetricsPage() { ), ); + const componentLatencyQuery = useQuery( + trpc.metrics.getComponentLatencyHistory.queryOptions( + { pipelineId: params.id, minutes }, + { refetchInterval: 15000 }, + ), + ); + if (pipelineQuery.isLoading) { return (
@@ -118,10 +128,13 @@ export default function PipelineMetricsPage() { - Pipeline Latency + Transform Latency - + @@ -141,3 +154,103 @@ export default function PipelineMetricsPage() {
); } + +// Deterministic color palette for per-component latency lines +const LATENCY_COLORS = [ + "#ec4899", "#8b5cf6", "#3b82f6", "#06b6d4", "#10b981", + "#f59e0b", "#ef4444", "#6366f1", "#14b8a6", "#f97316", +]; + +function TransformLatencyChart({ + components, + height = 220, +}: { + components: Record>; + height?: number; +}) { + const { data, config, componentIds } = useMemo(() => { + const ids = Object.keys(components).sort(); + if (ids.length === 0) return { data: [], config: {} as ChartConfig, componentIds: [] }; + + // Collect all unique timestamps (keyed by epoch ms for correct sorting) + const timeMap = new Map>(); + for (const id of ids) { + for (const point of components[id]) { + const ms = new Date(point.timestamp).getTime(); + const entry = timeMap.get(ms) ?? {}; + entry[id] = point.latencyMeanMs; + timeMap.set(ms, entry); + } + } + + // Build chart data sorted by timestamp, display as locale time + const chartData = Array.from(timeMap.entries()) + .sort(([a], [b]) => a - b) + .map(([ms, values]) => ({ + time: new Date(ms).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" }), + ...values, + })); + + // Build chart config with deterministic colors + const chartConfig: ChartConfig = {}; + for (let i = 0; i < ids.length; i++) { + chartConfig[ids[i]] = { + label: ids[i], + color: LATENCY_COLORS[i % LATENCY_COLORS.length], + }; + } + + return { data: chartData, config: chartConfig, componentIds: ids }; + }, [components]); + + if (data.length === 0) { + return ( +
+

No transform latency data yet

+
+ ); + } + + return ( + + + + + formatLatency(v)} + /> + ( +
+ {String(name)} + + {formatLatency(Number(value) ?? 0)} + +
+ )} + /> + } + /> + {componentIds.map((id) => ( + + ))} +
+
+ ); +} From 7c29b0946dae099755a60fefd8363cfd2841197d Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:33:52 +0000 Subject: [PATCH 09/12] fix: rename Component Latency to Transform Latency in dashboard labels --- src/app/(dashboard)/page.tsx | 2 +- src/components/pipeline/metrics-chart.tsx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/app/(dashboard)/page.tsx b/src/app/(dashboard)/page.tsx index a9b58a2..341064a 100644 --- a/src/app/(dashboard)/page.tsx +++ b/src/app/(dashboard)/page.tsx @@ -382,7 +382,7 @@ export default function DashboardPage() { height={200} /> } data={chartData.data?.pipeline.latency ?? {}} variant="area" diff --git a/src/components/pipeline/metrics-chart.tsx b/src/components/pipeline/metrics-chart.tsx index b35a8cf..24f653f 100644 --- a/src/components/pipeline/metrics-chart.tsx +++ b/src/components/pipeline/metrics-chart.tsx @@ -168,7 +168,7 @@ export function PipelineMetricsChart({ pipelineId, hours = 24 }: PipelineMetrics {/* Latency chart */}
-

Pipeline Latency

+

Transform Latency

From c8c47779a16be85ab249d5000d624ce2a1307e27 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:42:54 +0000 Subject: [PATCH 10/12] fix: address greptile review findings - Add withTeamAccess("VIEWER") to getComponentLatencyHistory procedure - Replace createMany with findFirst+upsert to deduplicate per-component latency rows within the same minute bucket --- src/app/api/agent/heartbeat/route.ts | 23 ++++++++++++++++++++--- src/server/routers/metrics.ts | 1 + 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 7214b6c..97a73b6 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -394,9 +394,26 @@ export async function POST(request: Request) { } if (componentLatencyRows.length > 0) { - prisma.pipelineMetric - .createMany({ data: componentLatencyRows }) - .catch((err) => console.error("Per-component latency insert error:", err)); + Promise.all( + componentLatencyRows.map(async (row) => { + const existing = await prisma.pipelineMetric.findFirst({ + where: { + pipelineId: row.pipelineId, + nodeId: row.nodeId, + componentId: row.componentId, + timestamp: row.timestamp, + }, + }); + if (existing) { + await prisma.pipelineMetric.update({ + where: { id: existing.id }, + data: { latencyMeanMs: row.latencyMeanMs }, + }); + } else { + await prisma.pipelineMetric.create({ data: row }); + } + }), + ).catch((err) => console.error("Per-component latency upsert error:", err)); } // Feed per-component metrics into the in-memory MetricStore for editor overlays diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts index c2039c5..dd10cb1 100644 --- a/src/server/routers/metrics.ts +++ b/src/server/routers/metrics.ts @@ -53,6 +53,7 @@ export const metricsRouter = router({ minutes: z.number().int().min(1).max(1440).default(60), }), ) + .use(withTeamAccess("VIEWER")) .query(async ({ input }) => { const since = new Date(Date.now() - input.minutes * 60 * 1000); From e5a881cda1d2dae9dd4f2d5d5233bad5064812bf Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 13:54:12 +0000 Subject: [PATCH 11/12] fix: address greptile review findings Average per-component latency across nodes in getComponentLatencyHistory to handle multi-node pipeline deployments correctly --- src/server/routers/metrics.ts | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/server/routers/metrics.ts b/src/server/routers/metrics.ts index dd10cb1..7ccbc06 100644 --- a/src/server/routers/metrics.ts +++ b/src/server/routers/metrics.ts @@ -71,13 +71,28 @@ export const metricsRouter = router({ }, }); - // Group by componentId + // Average across nodes per (componentId, timestamp) to handle multi-node deployments const components: Record> = {}; + const acc: Record> = {}; + for (const row of rows) { if (!row.componentId || row.latencyMeanMs == null) continue; - const arr = components[row.componentId] ?? []; - arr.push({ timestamp: row.timestamp, latencyMeanMs: row.latencyMeanMs }); - components[row.componentId] = arr; + const tsMs = row.timestamp.getTime(); + const byTs = acc[row.componentId] ?? new Map(); + const bucket = byTs.get(tsMs) ?? { sum: 0, count: 0 }; + bucket.sum += row.latencyMeanMs; + bucket.count++; + byTs.set(tsMs, bucket); + acc[row.componentId] = byTs; + } + + for (const [cid, byTs] of Object.entries(acc)) { + components[cid] = Array.from(byTs.entries()) + .map(([tsMs, { sum, count }]) => ({ + timestamp: new Date(tsMs), + latencyMeanMs: sum / count, + })) + .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime()); } return { components }; From 3691f2b88ea361d148a806a5da599d83eec2c1a9 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Wed, 11 Mar 2026 14:00:13 +0000 Subject: [PATCH 12/12] fix: address greptile review findings Await per-component latency upserts sequentially to eliminate TOCTOU race between concurrent heartbeat requests --- src/app/api/agent/heartbeat/route.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index 97a73b6..f26a76b 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -394,8 +394,8 @@ export async function POST(request: Request) { } if (componentLatencyRows.length > 0) { - Promise.all( - componentLatencyRows.map(async (row) => { + try { + for (const row of componentLatencyRows) { const existing = await prisma.pipelineMetric.findFirst({ where: { pipelineId: row.pipelineId, @@ -412,8 +412,10 @@ export async function POST(request: Request) { } else { await prisma.pipelineMetric.create({ data: row }); } - }), - ).catch((err) => console.error("Per-component latency upsert error:", err)); + } + } catch (err) { + console.error("Per-component latency upsert error:", err); + } } // Feed per-component metrics into the in-memory MetricStore for editor overlays